Commit e29b97a

update
ZeldaHuang committed Dec 19, 2024
1 parent ffc42a8 commit e29b97a
Showing 5 changed files with 3 additions and 41 deletions.
38 changes: 0 additions & 38 deletions llumnix/backends/vllm/llm_engine.py
@@ -55,44 +55,6 @@ def create(seq_group: SequenceGroupLlumnix, use_cache: bool = False):
         else:
             return RequestOutput.from_seq_group(seq_group, use_cache), seq_group.server_info

-class AsyncPutQueueActor:
-    def __init__(self, instance_id, request_output_queue_type: QueueType):
-        self.instance_id = instance_id
-        self.request_output_queue_type = request_output_queue_type
-        self.request_output_queue_client: QueueClientBase = init_request_output_queue_client(request_output_queue_type)
-        self.engine_actor_handle = None
-
-    async def put_nowait_to_servers(self,
-                                    server_request_outputs: Dict[str, List[RequestOutput]],
-                                    server_info_dict: Dict[str, ServerInfo]) -> None:
-        try:
-            if self.engine_actor_handle is None:
-                self.engine_actor_handle = ray.get_actor("instance_{}".format(self.instance_id), namespace="llumnix")
-            tasks = []
-            for server_id, req_outputs in server_request_outputs.items():
-                server_info = server_info_dict[server_id]
-                for req_output in req_outputs:
-                    if hasattr(req_output, 'request_timestamps'):
-                        req_output.request_timestamps.engine_actor_put_queue_timestamp = time.time()
-                tasks.append(asyncio.create_task(self.request_output_queue_client.put_nowait(req_outputs, server_info)))
-            rets = await asyncio.gather(*tasks, return_exceptions=True)
-            for idx, ret in enumerate(rets):
-                if isinstance(ret, Exception):
-                    server_id = list(server_request_outputs.keys())[idx]
-                    server_info = server_info_dict[server_id]
-                    logger.info("server {} is dead".format(server_id))
-                    if self.request_output_queue_type == QueueType.ZMQ:
-                        logger.info("request output queue ip: {}, port: {}".format(server_info.request_output_queue_ip,
-                                                                                   server_info.request_output_queue_port))
-                    req_outputs = list(server_request_outputs.values())[idx]
-                    request_ids = [req_output.request_id for req_output in req_outputs]
-                    self.engine_actor_handle.abort_request.remote(request_ids)
-        # pylint: disable=W0703
-        except Exception as e:
-            logger.error("Error in engine loop: {}".format(e))
-            logger.error("exception traceback: {}".format(traceback.format_exc()))
-
-
 class LLMEngineLlumnix(_AsyncLLMEngine):
     def __init__(self,
                  instance_id: str,
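
Note: the removed AsyncPutQueueActor hinges on asyncio.gather(..., return_exceptions=True): each put to a server runs as its own task, and a failed put comes back as an exception object in that task's slot instead of cancelling the other puts. A minimal standalone sketch of that pattern (server names and the simulated failure are hypothetical):

import asyncio

async def put_nowait(server_id: str, outputs: list) -> None:
    # Stand-in for request_output_queue_client.put_nowait; "server-1" simulates a dead server.
    if server_id == "server-1":
        raise ConnectionError(f"{server_id} unreachable")

async def put_to_servers(server_request_outputs: dict) -> None:
    tasks = [asyncio.create_task(put_nowait(server_id, outputs))
             for server_id, outputs in server_request_outputs.items()]
    # return_exceptions=True: results arrive in task order, failures as exception objects.
    rets = await asyncio.gather(*tasks, return_exceptions=True)
    for server_id, ret in zip(server_request_outputs, rets):
        if isinstance(ret, Exception):
            # This is where the actor logged "server ... is dead" and aborted its requests.
            print(f"server {server_id} is dead: {ret}")

asyncio.run(put_to_servers({"server-0": ["out-a"], "server-1": ["out-b"]}))
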
2 changes: 1 addition & 1 deletion llumnix/entrypoints/bladellm/client.py
@@ -62,7 +62,7 @@ async def background_process_outputs(self):
                 del self.request_streams[request_id]

     async def _add_request(self, request: ServerRequest) -> LLMResponse:
-        if request.sampling_params.n > 1 or request.sampling_params.use_beam_search:
+        if request.sampling_params.n > 1:
             return error_resp(request.id, err_code=400, err_msg="Unsupported feature: multiple sequence decoding in Llumnix.")

         llumnix_id = random.randint(0, 2147483647) # 1<<31-1
1 change: 1 addition & 0 deletions llumnix/llm_engine_manager.py
@@ -489,6 +489,7 @@ def init_llumlets(self, engine_args, node_id: str, request_output_queue_type: Qu
                 world_size,
                 engine_manager_args.create_migration_config(),
                 engine_manager_args.profiling_result_file_path,
+                engine_args,
                 *args,
                 **kwargs
             )
2 changes: 1 addition & 1 deletion llumnix/llumlet/llumlet.py
@@ -117,7 +117,7 @@ def from_args(cls,
         else: # backend_type == backend_type.SIM_VLLM:
             kwargs["node_id"] = node_id
             engine_class = ray.remote(num_cpus=1,
-                                      num_gpu=num_gpu,
+                                      num_gpus=num_gpu,
                                       name=actor_name,
                                       namespace='llumnix',
                                       max_concurrency=4,
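
Note: the one-character fix above matters because Ray's resource option is num_gpus (plural); an unrecognized keyword such as num_gpu makes ray.remote raise (the exact error varies by Ray version) rather than being silently ignored, so this branch would fail as soon as it ran. A minimal sketch of the corrected call (the Worker body is hypothetical, and num_gpus is set to 0 so the sketch runs on a CPU-only machine):

import ray

ray.init(ignore_reinit_error=True)

@ray.remote(num_cpus=1, num_gpus=0)  # 'num_gpus', not 'num_gpu'
class Worker:
    def ping(self) -> str:
        return "ok"

worker = Worker.options(name="worker_0", namespace="llumnix", max_concurrency=4).remote()
print(ray.get(worker.ping.remote()))  # -> ok
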
1 change: 0 additions & 1 deletion tests/e2e_test/test_e2e.py
@@ -71,7 +71,6 @@ async def test_e2e(cleanup_ray_env, shutdown_llumnix_service, model, migration_b
     sampling_params = {
         "n": 1,
         "best_of": 1,
-        "use_beam_search": False,
         "temperature": 0.0,
         "top_k": 1,
         "ignore_eos": False,
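
Note: dropping "use_beam_search" here (and the matching check in client.py above) tracks vLLM's API: as of vLLM 0.6.3 the use_beam_search field was removed from SamplingParams and beam search moved to a separate code path, so a request still carrying the field is rejected. A minimal sketch of the slimmed-down parameters, assuming such a vLLM version:

from vllm import SamplingParams

# use_beam_search is no longer a SamplingParams field; passing it raises an error.
sampling_params = SamplingParams(
    n=1,
    best_of=1,
    temperature=0.0,
    top_k=1,
    ignore_eos=False,
)
print(sampling_params)
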
