2 # Copyright (C) 2019 EfficiOS Inc.
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 class QueryExecutorTestCase(unittest
.TestCase
):
27 class MySink(bt2
._UserSinkComponent
):
32 def _query(cls
, query_exec
, obj
, params
, log_level
):
42 'array': ['coucou', 23, None],
51 res
= bt2
.QueryExecutor().query(MySink
, 'obj', params
)
52 self
.assertEqual(query_params
, params
)
53 self
.assertEqual(res
, {
59 def test_query_params_none(self
):
60 class MySink(bt2
._UserSinkComponent
):
65 def _query(cls
, query_exec
, obj
, params
, log_level
):
70 res
= bt2
.QueryExecutor().query(MySink
, 'obj', None)
71 self
.assertEqual(query_params
, None)
74 def test_query_logging_level(self
):
75 class MySink(bt2
._UserSinkComponent
):
80 def _query(cls
, query_exec
, obj
, params
, log_level
):
81 nonlocal query_log_level
82 query_log_level
= log_level
84 query_log_level
= None
85 res
= bt2
.QueryExecutor().query(MySink
, 'obj', None,
86 bt2
.LoggingLevel
.INFO
)
87 self
.assertEqual(query_log_level
, bt2
.LoggingLevel
.INFO
)
90 def test_query_gen_error(self
):
91 class MySink(bt2
._UserSinkComponent
):
96 def _query(cls
, query_exec
, obj
, params
, log_level
):
99 with self
.assertRaises(bt2
.Error
):
100 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
102 def test_query_invalid_object(self
):
103 class MySink(bt2
._UserSinkComponent
):
108 def _query(cls
, query_exec
, obj
, params
, log_level
):
109 raise bt2
.InvalidQueryObject
111 with self
.assertRaises(bt2
.InvalidQueryObject
):
112 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
114 def test_query_logging_level_invalid_type(self
):
115 class MySink(bt2
._UserSinkComponent
):
120 def _query(cls
, query_exec
, obj
, params
, log_level
):
123 with self
.assertRaises(TypeError):
124 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23], 'yeah')
126 def test_query_logging_level_invalid_value(self
):
127 class MySink(bt2
._UserSinkComponent
):
132 def _query(cls
, query_exec
, obj
, params
, log_level
):
135 with self
.assertRaises(ValueError):
136 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23], 12345)
138 def test_query_invalid_params(self
):
139 class MySink(bt2
._UserSinkComponent
):
144 def _query(cls
, query_exec
, obj
, params
, log_level
):
145 raise bt2
.InvalidQueryParams
147 with self
.assertRaises(bt2
.InvalidQueryParams
):
148 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
150 def test_query_try_again(self
):
151 class MySink(bt2
._UserSinkComponent
):
156 def _query(cls
, query_exec
, obj
, params
, log_level
):
159 with self
.assertRaises(bt2
.TryAgain
):
160 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
162 def test_cancel_no_query(self
):
163 query_exec
= bt2
.QueryExecutor()
164 self
.assertFalse(query_exec
.is_canceled
)
166 self
.assertTrue(query_exec
.is_canceled
)
168 def test_query_canceled(self
):
169 class MySink(bt2
._UserSinkComponent
):
174 def _query(cls
, query_exec
, obj
, params
, log_level
):
177 query_exec
= bt2
.QueryExecutor()
180 with self
.assertRaises(bt2
.QueryExecutorCanceled
):
181 res
= query_exec
.query(MySink
, 'obj', [17, 23])
This page took 0.034506 seconds and 5 git commands to generate.