Skip to content

Commit

Permalink
fix thread work pool concurrent bug (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dobiichi-Origami authored Feb 2, 2024
1 parent 67ba140 commit 7af9a5d
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions python/qianfan/resources/llm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
# This constant is used to express no model is spcified,
# so that SDK still can get the requirements of API from _supported_models()
UNSPECIFIED_MODEL = "UNSPECIFIED_MODEL"
MAX_WORKER_THREAD_COUNT = 100000


class BatchRequestFuture(object):
Expand All @@ -62,16 +63,19 @@ def __init__(
"""
Init batch request future
"""
future_list: List[Future[Union[QfResponse, Iterator[QfResponse]]]] = []
max_workers = worker_num if worker_num else len(tasks) + 1
if max_workers > MAX_WORKER_THREAD_COUNT:
max_workers = MAX_WORKER_THREAD_COUNT

self._future_list: List[Future[Union[QfResponse, Iterator[QfResponse]]]] = []
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._finished_count = 0
self._task_count = len(tasks)
self._lock = threading.Lock()
for task in tasks:
future = self._executor.submit(task)
future.add_done_callback(self._future_callback)
future_list.append(future)
self._future_list = future_list
self._finished_count = 0
self._lock = threading.Lock()
self._future_list.append(future)

def _future_callback(
self, fn: Future[Union[QfResponse, Iterator[QfResponse]]]
Expand All @@ -81,7 +85,7 @@ def _future_callback(
"""
with self._lock:
self._finished_count += 1
if self._finished_count == len(self._future_list):
if self._finished_count == self._task_count:
log_info("All tasks finished, exeutor will be shutdown")
self._executor.shutdown(wait=False)

Expand Down

0 comments on commit 7af9a5d

Please sign in to comment.