forked from infiniflow/infinity
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_chaos.py
147 lines (126 loc) · 5.91 KB
/
test_chaos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import time
import infinity.index as index
import pandas
import pytest
import random
from threading import Thread
from infinity.common import ConflictType
from infinity.errors import ErrorCode
from infinity.connection_pool import ConnectionPool
from infinity.table import Table
# Fixture locations, resolved relative to the current working directory.
TEST_DATA_DIR = "/test/data/"
fulltext_file_path = os.getcwd() + TEST_DATA_DIR + "csv/enwiki_99.csv"
vector_file_path = os.getcwd() + TEST_DATA_DIR + "csv/pysdk_test_knn.csv"
kRunningTime = 30  # seconds: each worker keeps issuing random ops until this deadline
kNumThread = 4  # number of concurrent worker threads launched by test_chaos
data_size = 100000  # target row count produced by read_out_data()
insert_delete_size = 100  # rows touched per insert/delete batch
class TestIndexParallel:
    #@pytest.mark.skip(reason="To pass benchmark, use wrong row count in knn scan")
    def test_chaos(self, get_infinity_connection_pool):
        """Hammer one table with concurrent random inserts/deletes/updates/searches.

        Creates ``chaos_test`` with a fulltext index and an HNSW index, runs
        kNumThread workers for kRunningTime seconds, then drops the table.
        """
        dataset = read_out_data()
        pool = get_infinity_connection_pool

        # --- setup: fresh table plus both secondary indexes ---
        conn = pool.get_conn()
        database = conn.get_database("default_db")
        drop_res = database.drop_table("chaos_test", ConflictType.Ignore)
        assert drop_res.error_code == ErrorCode.OK
        table = database.create_table("chaos_test", {
            "index": {"type": "int"}, "body": {"type": "varchar"}, "other_vector": {"type": "vector,4,float"}},
            ConflictType.Error)
        index_res = table.create_index("body_index",
                                       [index.IndexInfo("body",
                                                        index.IndexType.FullText,
                                                        [])])
        assert index_res.error_code == ErrorCode.OK
        index_res = table.create_index("other_index",
                                       [index.IndexInfo("other_vector", index.IndexType.Hnsw,
                                                        [index.InitParameter("M", "16"),
                                                         index.InitParameter("ef_construction", "50"),
                                                         index.InitParameter("ef", "50"),
                                                         index.InitParameter("metric", "l2")])],
                                       ConflictType.Error)
        assert index_res.error_code == ErrorCode.OK
        pool.release_conn(conn)

        # --- run: each worker loops until the shared deadline ---
        deadline = time.time() + kRunningTime
        workers = [Thread(target=random_exec, args=[pool, dataset, deadline, tid])
                   for tid in range(kNumThread)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # --- teardown: the table must still exist and drop cleanly ---
        conn = pool.get_conn()
        database = conn.get_database("default_db")
        drop_res = database.drop_table("chaos_test", ConflictType.Error)
        assert drop_res.error_code == ErrorCode.OK
        pool.release_conn(conn)
def read_out_data():
    """Build the chaos-test dataset as a dict of column-name -> list of values.

    Reads the enwiki fulltext CSV (tab-separated, no header), pairs each
    "body" with one of ten fixed 4-d vectors, tiles both columns by
    data_size / 10 copies, and joins them on the materialised row number.

    Returns:
        dict with keys "index", "other_vector", "body" mapping to lists.
    """
    column_names = ["doctitle", "docdate", "body"]
    df_fulltext = pandas.read_csv(fulltext_file_path, delimiter="\t", header=None, names=column_names)["body"]
    df_vector = pandas.DataFrame({"other_vector": [[0.0, 0.0, 0.0, 0.0], [1.1, 1.1, 1.1, 1.1], [2.2, 2.2, 2.2, 2.2],
                                                   [3.3, 3.3, 3.3, 3.3], [4.4, 4.4, 4.4, 4.4], [5.5, 5.5, 5.5, 5.5],
                                                   [6.6, 6.6, 6.6, 6.6], [7.7, 7.7, 7.7, 7.7], [8.8, 8.8, 8.8, 8.8],
                                                   [9.9, 9.9, 9.9, 9.9]]})
    print(df_vector)
    # Tile both columns toward data_size rows; the merge below keeps only
    # row numbers present in both frames.
    df_fulltext = pandas.concat([df_fulltext] * int(data_size / 10), ignore_index=True)
    df_vector = pandas.concat([df_vector] * int(data_size / 10), ignore_index=True)
    # reset_index() materialises the row number as an "index" column, which
    # doubles as the join key and as the table's integer "index" column.
    df_fulltext = df_fulltext.reset_index()
    df_vector = df_vector.reset_index()
    df = pandas.merge(df_vector, df_fulltext, on='index')
    df["body"] = df["body"].astype(str)
    print(df.dtypes)
    # to_dict(orient="list") yields {column: [values...]} directly, replacing
    # the manual {k: list(v.values())} rebuild of the nested to_dict() output.
    return df.to_dict(orient="list")
def search_fulltext(table_obj: Table):
    """Run a top-3 fulltext match on "body" and print the resulting frame."""
    query = table_obj.output(["index", "body", "other_vector", "_row_id", "_score"])
    query = query.match("body^5", "harmful chemical", "topn=3")
    print(query.to_pl())
def search_vector(table_obj: Table):
    """Run a top-3 l2 KNN search with a constant query vector and print it."""
    query_vector = [2, 2, 2, 2]
    result = table_obj.output(["*"]).knn("other_vector", query_vector, "float", "l2", 3)
    print(result.to_pl())
def insert(table_obj: Table, data):
    """Insert insert_delete_size consecutive rows starting at a random offset.

    Any SDK error is printed and swallowed — this is deliberate best-effort
    chaos traffic, not an assertion point.
    """
    try:
        start = random.randint(0, int(data_size / insert_delete_size) - 1)
        batch = [{"index": data["index"][start + offset],
                  "body": data["body"][start + offset],
                  "other_vector": data["other_vector"][start + offset]}
                 for offset in range(insert_delete_size)]
        table_obj.insert(batch)
    except Exception as e:
        print(e)
def delete(table_obj: Table):
    """Delete a random half-open index range of insert_delete_size rows."""
    lower = random.randint(0, int(data_size / insert_delete_size) - 1)
    upper = lower + insert_delete_size
    condition = f"index >= {lower} and index < {upper}"
    try:
        table_obj.delete(condition)
    except Exception as e:
        # Best-effort chaos traffic: log and keep going.
        print(e)
def updata(table_obj: Table):
    """Overwrite one random row (matched by exact index) with a fixed record.

    NOTE(review): the name is a typo for "update" but is kept — random_exec
    calls it by this name.
    """
    target = random.randint(0, data_size - 1)
    replacement = [{"index": target, "body": "infinity", "other_vector": [0.0, 0.0, 0.0, 0.0]}]
    try:
        table_obj.update(f"index = {target}", replacement)
    except Exception as e:
        # Best-effort chaos traffic: log and keep going.
        print(e)
def random_exec(connection_pool: ConnectionPool, data, end_time, thread_id):
    """Worker loop: run uniformly random operations on chaos_test until end_time.

    Each iteration picks one of five actions (insert / delete / update /
    fulltext search / vector search) and logs the choice with the thread id.
    """
    conn = connection_pool.get_conn()
    table = conn.get_database("default_db").get_table("chaos_test")
    # Position in this list matches the original randint(0, 4) mapping.
    actions = [
        ("insert", lambda: insert(table, data)),
        ("delete", lambda: delete(table)),
        ("update", lambda: updata(table)),
        ("search fulltext", lambda: search_fulltext(table)),
        ("search vector", lambda: search_vector(table)),
    ]
    while time.time() < end_time:
        label, action = actions[random.randint(0, 4)]
        print(thread_id, label)
        action()
    connection_pool.release_conn(conn)