Skip to content

Commit 791ed3a

Browse files
committed
Let thrift client reconnect on insert failure
Updated requirements.txt
1 parent 848b3a8 commit 791ed3a

File tree

7 files changed

+61
-32
lines changed

7 files changed

+61
-32
lines changed

python/benchmark/clients/base_client.py

+15-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from urllib.parse import urlparse
99
import time
1010

11+
1112
class BaseClient:
1213
"""
1314
Base class for all clients(Qdrant, ES, infinity).
@@ -25,14 +26,21 @@ def __init__(self,
2526
"""
2627
pass
2728

29+
@abstractmethod
30+
def upload(self):
31+
"""
32+
Upload data and build indexes (parameters are parsed by __init__).
33+
"""
34+
pass
35+
2836
@abstractmethod
2937
def search(self) -> list[list[Any]]:
3038
"""
3139
Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters.
3240
The function returns id list.
3341
"""
3442
pass
35-
43+
3644
def download_data(self, url, target_path):
3745
"""
3846
Download dataset and extract it into path.
@@ -59,6 +67,11 @@ def run_experiment(self, args):
5967
"""
6068
run experiment and save results.
6169
"""
70+
if args.import_data:
71+
start_time = time.time()
72+
self.upload()
73+
finish_time = time.time()
74+
print(f"upload finish, cost time = {finish_time - start_time}")
6275
if args.query:
6376
results = self.search()
64-
self.check_and_save_results(results)
77+
self.check_and_save_results(results)

python/benchmark/configs/infinity_enwiki.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"query_link": "to_be_set",
1010
"mode": "fulltext",
1111
"topK": 10,
12-
"use_import": true,
12+
"use_import": false,
1313
"schema": {
1414
"doctitle": {"type": "varchar", "default":""},
1515
"docdate": {"type": "varchar", "default":""},

python/benchmark/requirements.txt

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
elasticsearch~=8.13.0
2+
h5py~=3.11.0
3+
qdrant_client~=1.9.0
4+

python/infinity/remote_thrift/client.py

+17-8
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
class ThriftInfinityClient:
2828
def __init__(self, uri: URI):
2929
self.uri = uri
30+
self.transport = None
3031
self.reconnect()
3132

3233
def reconnect(self):
34+
if self.transport is not None:
35+
self.transport.close()
36+
self.transport = None
3337
# self.transport = TTransport.TFramedTransport(TSocket.TSocket(self.uri.ip, self.uri.port)) # async
3438
self.transport = TTransport.TBufferedTransport(
3539
TSocket.TSocket(self.uri.ip, self.uri.port)) # sync
@@ -126,7 +130,8 @@ def list_indexes(self, db_name: str, table_name: str):
126130

127131
def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]):
128132
retry = 0
129-
while retry <= 10:
133+
inner_ex = None
134+
while retry <= 2:
130135
try:
131136
res = self.client.Insert(InsertRequest(session_id=self.session_id,
132137
db_name=db_name,
@@ -135,12 +140,12 @@ def insert(self, db_name: str, table_name: str, column_names: list[str], fields:
135140
fields=fields))
136141
return res
137142
except TTransportException as ex:
138-
if ex.type == ex.END_OF_FILE:
139-
self.reconnect()
140-
retry += 1
141-
else:
142-
break
143-
return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "retry insert failed")
143+
#import traceback
144+
#traceback.print_exc()
145+
self.reconnect()
146+
inner_ex = ex
147+
retry += 1
148+
return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "insert failed with exception: " + str(inner_ex))
144149

145150
# Can be used in compact mode
146151
# def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]):
@@ -198,7 +203,11 @@ def update(self, db_name: str, table_name: str, where_expr, update_expr_array):
198203
update_expr_array=update_expr_array))
199204

200205
def disconnect(self):
201-
res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
206+
res = None
207+
try:
208+
res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
209+
except Exception:
210+
pass
202211
self.transport.close()
203212
return res
204213

python/pyproject.toml

+10-10
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22
name = "infinity_sdk"
33
version = "0.2.0.dev1"
44
dependencies = [
5-
"sqlglot==11.7.1",
6-
"pydantic",
7-
"thrift",
8-
"setuptools",
9-
"pytest",
10-
"pandas",
11-
"numpy",
12-
"pyarrow",
13-
"openpyxl",
14-
"polars"
5+
"sqlglot~=23.12.2",
6+
"pydantic~=2.7.1",
7+
"thrift~=0.20.0",
8+
"setuptools~=69.5.1",
9+
"pytest~=8.2.0",
10+
"pandas~=2.2.2",
11+
"numpy~=1.26.4",
12+
"pyarrow~=16.0.0",
13+
"polars~=0.20.23",
14+
"openpyxl~=3.1.2"
1515
]
1616
description = "infinity"
1717
readme = "README.md"

python/requirements.txt

+9-9
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
sqlglot==11.7.1
2-
pydantic~=1.10.12
1+
sqlglot~=23.12.2
2+
pydantic~=2.7.1
33
thrift~=0.20.0
4-
setuptools~=68.0.0
5-
pytest~=7.4.0
6-
pandas~=2.1.1
7-
openpyxl
8-
numpy~=1.26.0
9-
polars~=0.19.0
10-
pyarrow
4+
setuptools~=69.5.1
5+
pytest~=8.2.0
6+
pandas~=2.2.2
7+
numpy~=1.26.4
8+
pyarrow~=16.0.0
9+
polars~=0.20.23
10+
openpyxl~=3.1.2

src/network/thrift_server.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
module;
1616

17+
#include <cstdio>
1718
#include <memory>
19+
#include <thrift/TOutput.h>
1820
#include <thrift/TToString.h>
1921
#include <thrift/concurrency/ThreadFactory.h>
2022
#include <thrift/concurrency/ThreadManager.h>
@@ -108,10 +110,11 @@ class InfinityServiceCloneFactory final : public infinity_thrift_rpc::InfinitySe
108110
void releaseHandler(infinity_thrift_rpc::InfinityServiceIf *handler) final { delete handler; }
109111
};
110112

111-
// Thrift server
113+
static void ThriftPrintf(const char *msg) { printf("%s\n", msg); }
112114

115+
// Thrift server
113116
void ThreadedThriftServer::Init(i32 port_no) {
114-
117+
GlobalOutput.setOutputFunction(ThriftPrintf);
115118
std::cout << "API server listen on: 0.0.0.0:" << port_no << std::endl;
116119
SharedPtr<TBinaryProtocolFactory> binary_protocol_factory = MakeShared<TBinaryProtocolFactory>();
117120
binary_protocol_factory->setStrict(true, true);

0 commit comments

Comments
 (0)