# See the License for the specific language governing permissions and
# limitations under the License.
import functools
- import multiprocessing
import threading
import time
import traceback
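
The hunks below wrap every workload in a `trace_unhandled_exceptions` decorator whose body lies outside the diff context; only its closing `return wrapped_func` is visible in the first hunk. Judging from the `functools` and `traceback` imports, a minimal sketch of such a decorator could look like the following (an illustration, not necessarily the file's exact implementation):

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Surface the traceback that a worker thread would otherwise swallow.
            traceback.print_exc()
            raise
    return wrapped_func
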
@@ -37,37 +36,22 @@ def wrapped_func(*args, **kwargs):
    return wrapped_func


- @trace_unhandled_exceptions
- def go():
-     print(1)
-     raise Exception()
-     print(2)
+ def worker_thread(thread_id, num_iterations, some_function, ip='127.0.0.1', port=9090):
+     infinity_obj = infinity.connect(NetworkAddress(ip, port))
+     try:
+         for j in range(num_iterations):
+             some_function(infinity_obj, port, thread_id, j)
+     except Exception as e:
+         print(f"Exception: {e}")
+     finally:
+         infinity_obj.disconnect()


- def test():
-     p = multiprocessing.Pool(1)
-
-     p.apply_async(go)
-     p.close()
-     p.join()
-
-
- def worker_thread(process_id, thread_id, num_iterations, some_function, ip='0.0.0.0', port=9090):
-     for j in range(num_iterations):
-         infinity_obj = infinity.connect(NetworkAddress(ip, port))
-         try:
-             some_function(infinity_obj, port, process_id, thread_id, j)
-         except Exception as e:
-             print(f"Exception: {e}")
-         finally:
-             infinity_obj.disconnect()
-
-
- def worker_internal_connection(process_id, num_threads, num_iterations, some_function, ip=None, port=None):
+ def worker_internal_connection(num_threads, num_iterations, some_function, ip=None, port=None):
    threads = []
    for j in range(num_threads):
        thread = threading.Thread(target=worker_thread, args=(
-             process_id, j, num_iterations, some_function, ip, port))
+             j, num_iterations, some_function, ip, port))
        threads.append(thread)
        thread.start()

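The net effect of this hunk: the old multiprocessing demo (`go`/`test`) is gone, and each worker thread now opens one connection up front and reuses it across all of its iterations, where the old `worker_thread` reconnected on every iteration. A minimal standalone call of the refactored helper might look like this (the `create_database` workload and port 23817 come from later hunks; values are illustrative):

# Fan out 4 threads, each running the workload 10 times on its own connection.
worker_internal_connection(4, 10, create_database, ip='127.0.0.1', port=23817)
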
@@ -76,38 +60,29 @@ def worker_internal_connection(process_id, num_threads, num_iterations, some_fun
        thread.join()


- def measure_time_internal(num_processes, num_threads, num_times, some_function, ip=None, port=None):
-     # Calculate how many iterations each process should do
-     num_iterations = num_times // num_processes // num_threads
+ def measure_time_internal(num_threads, num_times, some_function, ip=None, port=None):
+     # Calculate how many iterations each thread should do
+     num_iterations = num_times // num_threads

    start_time = time.perf_counter()
-     processes = []
-     for i in range(num_processes):
-         process = multiprocessing.Process(target=worker_internal_connection,
-                                           args=(i, num_threads, num_iterations, some_function, ip, port))
-         processes.append(process)
-         process.start()
-
-     # Wait for all threads to finish
-     for process in processes:
-         process.join()
-
+     worker_internal_connection(
+         num_threads, num_iterations, some_function, ip, port)
    end_time = time.perf_counter()

    elapsed_time = end_time - start_time
    return elapsed_time


- def execute(some_functions: list, protocols: list, num_processes, num_threads, num_times) -> pd.DataFrame:
+ def execute(some_functions: list, protocols: list, num_threads, num_times) -> pd.DataFrame:
    results = pd.DataFrame(
-         columns=['rpc name', 'function', 'qps', 'elapsed_time', 'average_latency', 'num_processes', 'num_threads',
+         columns=['rpc name', 'function', 'qps', 'elapsed_time', 'average_latency', 'num_threads',
                 'num_times'])
    print(f"\n")

    for (protocol, ip, port) in protocols:
        for some_function in some_functions:
            elapsed_time = measure_time_internal(
-                 num_processes, num_threads, num_times, some_function, ip, port)
+                 num_threads, num_times, some_function, ip, port)
            qps = num_times / elapsed_time  # queries per second
            avg_latency = (elapsed_time / num_times) * 1000  # in ms

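A quick worked example of the two metrics computed above, with illustrative numbers:

num_times, elapsed_time = 10, 0.5                 # 10 queries in 0.5 s wall-clock
qps = num_times / elapsed_time                    # 20.0 queries per second
avg_latency = (elapsed_time / num_times) * 1000   # 50.0 ms per query

Since avg_latency divides wall-clock time by query count, it understates true per-query latency by roughly the thread count whenever num_threads > 1.
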
@@ -116,7 +91,6 @@ def execute(some_functions: list, protocols: list, num_processes, num_threads, n
                qps,
                elapsed_time,
                avg_latency,
-                 num_processes,
                num_threads,
                num_times]

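The append site itself sits outside the diff context, but each row presumably lines up with the seven columns declared in `execute` above, e.g. (the `results.loc[...]` idiom and all values here are illustrative, not part of the commit):

results.loc[len(results)] = [
    "Thread Pool Thrift",   # rpc name
    "create_database",      # function
    20.0,                   # qps
    0.5,                    # elapsed_time, seconds
    50.0,                   # average_latency, ms
    1,                      # num_threads
    10]                     # num_times
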
@@ -129,117 +103,118 @@ class TestBenchmark:

    def test_measure_time(self):
        @trace_unhandled_exceptions
-         def create_database(infinity_obj, port, process_id, thread_id, num_iteration):
+         def create_database(infinity_obj, port, thread_id, num_iteration):
            res = infinity_obj.create_database(
-                 f"my_database_{port}_{process_id}_{thread_id}_{num_iteration}")
+                 f"my_database_{port}_{thread_id}_{num_iteration}")
            if res.error_code != ErrorCode.OK:
                raise Exception(f"create_database failed: {res.error_msg}")

        @trace_unhandled_exceptions
-         def get_database(infinity_obj, port, process_id, thread_id, num_iteration):
+         def get_database(infinity_obj, port, thread_id, num_iteration):
            db_obj = infinity_obj.get_database(f"default_db")
            if db_obj is None:
                raise Exception(f"get_database failed")

        @trace_unhandled_exceptions
-         def list_databases(infinity_obj, port, process_id, thread_id, num_iteration):
+         def list_databases(infinity_obj, port, thread_id, num_iteration):
            res = infinity_obj.list_databases()
            if res.error_code != ErrorCode.OK:
                raise Exception(f"list_databases failed: {res.error_msg}")

        @trace_unhandled_exceptions
-         def drop_database(infinity_obj, port, process_id, thread_id, num_iteration):
+         def drop_database(infinity_obj, port, thread_id, num_iteration):
            res = infinity_obj.drop_database(
-                 f"my_database_{port}_{process_id}_{thread_id}_{num_iteration}")
+                 f"my_database_{port}_{thread_id}_{num_iteration}")
            if res.error_code != ErrorCode.OK:
                raise Exception(f"drop_database failed: {res.error_msg}")

        @trace_unhandled_exceptions
-         def create_table(infinity_obj, port, process_id, thread_id, num_iteration):
+         def create_table(infinity_obj, port, thread_id, num_iteration):
            res = infinity_obj.get_database(f"default_db").create_table(
-                 f"table_{port}_{process_id}_{thread_id}_{num_iteration}",
+                 f"table_{port}_{thread_id}_{num_iteration}",
                {"c1": {"type": "int", "constraints": ["primary key"]}, "c2": {"type": "float"}})
            if res.error_code != ErrorCode.OK:
                raise Exception(f"create_table failed: {res.error_msg}")

        @trace_unhandled_exceptions
-         def insert_table(infinity_obj, port, process_id, thread_id, num_iteration):
+         def insert_table(infinity_obj, port, thread_id, num_iteration):
            res = (infinity_obj
                   .get_database(f"default_db")
-                    .get_table(f"table_{port}_{process_id}_{thread_id}_{num_iteration}")
+                    .get_table(f"table_{port}_{thread_id}_{num_iteration}")
                   .insert([{"c1": 1, "c2": 1.1}, {"c1": 2, "c2": 2.2}]))
            if res.error_code != ErrorCode.OK:
                raise Exception(f"insert_table failed: {res.error_msg}")

        @trace_unhandled_exceptions
-         def list_tables(infinity_obj, port, process_id, thread_id, num_iteration):
+         def list_tables(infinity_obj, port, thread_id, num_iteration):
            (infinity_obj
             .get_database(f"default_db")
             .list_tables())

        @trace_unhandled_exceptions
-         def select_table(infinity_obj, port, process_id, thread_id, num_iteration):
+         def select_table(infinity_obj, port, thread_id, num_iteration):
            res = (infinity_obj
                   .get_database(f"default_db")
-                    .get_table(f"table_{port}_{process_id}_{thread_id}_{num_iteration}")
+                    .get_table(f"table_{port}_{thread_id}_{num_iteration}")
                   .query_builder()
                   .output(["*"])
                   .filter("c1 > 1").to_df())
            if res is None:
                raise Exception(f"select_table failed: {res}")

        @trace_unhandled_exceptions
-         def drop_table(infinity_obj, port, process_id, thread_id, num_iteration):
+         def drop_table(infinity_obj, port, thread_id, num_iteration):
            res = (infinity_obj
                   .get_database(f"default_db")
-                    .drop_table(f"table_{port}_{process_id}_{thread_id}_{num_iteration}"))
+                    .drop_table(f"table_{port}_{thread_id}_{num_iteration}"))
            if res.error_code != ErrorCode.OK:
                raise Exception(f"drop_table failed: {res.error_msg}")

        @trace_unhandled_exceptions
-         def create_index(infinity_obj, port, process_id, thread_id, num_iteration):
+         def create_index(infinity_obj, port, thread_id, num_iteration):
            res = (infinity_obj
                   .get_database(f"default_db")
-                    .get_table(f"table_{port}_{process_id}_{thread_id}_{num_iteration}")
+                    .get_table(f"table_{port}_{thread_id}_{num_iteration}")
                   .create_index("my_index", ["c1"], "IVF_FLAT", None))
            if res.error_code != ErrorCode.OK:
                raise Exception(f"create_index failed: {res.error_msg}")

        @trace_unhandled_exceptions
-         def drop_index(infinity_obj, port, process_id, thread_id, num_iteration):
+         def drop_index(infinity_obj, port, thread_id, num_iteration):
            res = (infinity_obj
                   .get_database(f"default_db")
-                    .get_table(f"table_{port}_{process_id}_{thread_id}_{num_iteration}")
+                    .get_table(f"table_{port}_{thread_id}_{num_iteration}")
                   .drop_index("my_index"))
            if res.error_code != ErrorCode.OK:
                raise Exception(f"drop_index failed: {res.error_msg}")

        ############################################
        # Using the tune

-         ip: str = '0.0.0.0'
-         thrift = ("Thrift", ip, 9090)
+         ip: str = '127.0.0.1'
+         thrift = ("Thrift", ip, 23817)
        thread_pool_thrift = ("Thread Pool Thrift", ip, 23817)
-         async_thrift = ("AsyncThrift", ip, 9070)
-         num_processes = 16
-         num_threads = 16
-         num_times = 16 * 16 * 10
+         async_thrift = ("AsyncThrift", ip, 23817)
+         num_threads = 1
+         num_times = 10
        protocols = [thread_pool_thrift]

-         database_functions = [create_database,
-                               get_database, list_databases, drop_database]
+         database_functions = [create_database]

        db_df = execute(database_functions, protocols,
-                         num_processes, num_threads, num_times)
+                         num_threads, num_times)

-         table_functions = [create_table, insert_table,
-                            select_table, list_tables, drop_table]
+         table_functions = []
        tbl_df = execute(table_functions, protocols,
-                          num_processes, num_threads, num_times)
+                          num_threads, num_times)

        # index_functions = []
-         # idx_df = execute(index_functions, protocols, num_processes, num_threads, num_times)
+         # idx_df = execute(index_functions, protocols, num_threads, num_times)

        df = pd.concat([db_df, tbl_df])
        print(df)
        df.to_excel(f"{datetime.now()}_benchmark.xlsx")
+
+
+ if __name__ == "__main__":
+     TestBenchmark().test_measure_time()
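
With the `__main__` guard added above, the file can be run directly as well as collected by pytest (invocations illustrative):

# python test_benchmark.py     # runs TestBenchmark().test_measure_time() directly
# pytest -s test_benchmark.py  # pytest discovers the TestBenchmark class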