
Commit

update
ZeldaHuang committed Dec 19, 2024
1 parent e29b97a commit e9d1eff
Showing 10 changed files with 18 additions and 20 deletions.
3 changes: 1 addition & 2 deletions docs/Quickstart.md
@@ -82,11 +82,10 @@ HEAD_NODE=1 python -m llumnix.entrypoints.vllm.api_server \
 --initial-instances $INITIAL_INSTANCES \
 --launch-ray-cluster \
 --model $MODEL_PATH \
---engine-use-ray \
 --worker-use-ray \
 --max-model-len 4096
 ```
-`CONFIG_PATH` is the path to the configuration file for Llumnix, and we give an example configuration file [here](../configs/base.yml). `MODEL_PATH` defines the location of your model. `INITIAL_INSTANCES` determines the number of instances to be launched on the current node,
+`CONFIG_PATH` is the path to the configuration file for Llumnix, and we give an example configuration file [here](../configs/base.yml). `MODEL_PATH` defines the location of your model. `INITIAL_INSTANCES` determines the number of instances to be launched on the current node,

 Second, you can run the benchmark to evaluate the serving performance:

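To make the placeholders concrete, here is a small Python sketch (mirroring the command-builder style of tests/e2e_test/utils.py further down) that assembles the abridged launch command from this hunk. The model path and instance count are illustrative values, not defaults, and flags outside the visible hunk are omitted; note that `--engine-use-ray` no longer appears after this commit.

```python
# Hypothetical sketch: assemble the (abridged) launch command from the hunk above
# with example placeholder values; flags not shown in the hunk are omitted.
def build_launch_command(model_path: str, initial_instances: int,
                         max_model_len: int = 4096) -> str:
    return ("HEAD_NODE=1 python -m llumnix.entrypoints.vllm.api_server "
            f"--initial-instances {initial_instances} "
            "--launch-ray-cluster "
            f"--model {model_path} "
            "--worker-use-ray "  # --engine-use-ray is dropped by this commit
            f"--max-model-len {max_model_len}")

print(build_launch_command("/path/to/your/model", initial_instances=1))
```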
6 changes: 3 additions & 3 deletions examlpes/offline_inference.py
@@ -74,10 +74,10 @@ async def main():
     for request in prompts:
         request_id = random_uuid()
         await manager.generate.remote(request_id=request_id,
-                                      server_info=server_info,
+                                      server_info=server_info,
                                       prompt=request,
-                                      sampling_params=sampling_params,)
+                                      params=sampling_params,)

     await output_task

 asyncio.run(main())
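As a side note, the rename only touches the keyword at the call site. A self-contained toy version of this submission loop, with a dummy async `generate` standing in for `manager.generate.remote` and hypothetical prompts and parameter values, might look like this:

```python
import asyncio
import uuid


async def generate(request_id: str, server_info: dict, prompt: str, params: dict) -> str:
    # Dummy stand-in for manager.generate.remote(...); it just echoes the prompt.
    await asyncio.sleep(0)
    return f"[{request_id[:8]}] {prompt}"


async def main():
    prompts = ["Hello, Llumnix!", "What is request migration?"]
    sampling_params = {"temperature": 0.0, "max_tokens": 16}  # hypothetical values
    server_info = {"host": "127.0.0.1"}                       # hypothetical value

    tasks = []
    for request in prompts:
        request_id = str(uuid.uuid4())
        tasks.append(generate(request_id=request_id,
                              server_info=server_info,
                              prompt=request,
                              params=sampling_params))  # keyword renamed from sampling_params=

    for output in await asyncio.gather(*tasks):
        print(output)


asyncio.run(main())
```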
4 changes: 2 additions & 2 deletions llumnix/backends/migration_backend_interface.py
@@ -33,9 +33,9 @@ def migrate_cache(self, src_handle, src_blocks: List[int], dst_blocks: List[int]
         raise NotImplementedError

     @abstractmethod
-    def do_send(self, dst_handle, blocks: List[int]):
+    def do_send(self, dst_handle, blocks: List[int], virtuel_engine: int):
         raise NotImplementedError

     @abstractmethod
-    def do_recv(self, src_handle, blocks: List[int]):
+    def do_recv(self, src_handle, blocks: List[int], virtuel_engine: int):
         raise NotImplementedError
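For illustration, a minimal self-contained sketch of a backend conforming to the updated interface; the class names and the in-memory "cache" are hypothetical, and only the `do_send`/`do_recv` signatures (now carrying `virtuel_engine`) come from the diff above.

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class MigrationBackendSketch(ABC):
    """Stand-in for the abstract interface; only do_send/do_recv changed in this commit."""

    @abstractmethod
    def do_send(self, dst_handle, blocks: List[int], virtuel_engine: int):
        raise NotImplementedError

    @abstractmethod
    def do_recv(self, src_handle, blocks: List[int], virtuel_engine: int):
        raise NotImplementedError


class DummyMigrationBackend(MigrationBackendSketch):
    """Toy backend that 'migrates' KV blocks between plain dicts, one per virtual engine."""

    def __init__(self, num_virtual_engines: int = 1):
        self.caches: List[Dict[int, bytes]] = [dict() for _ in range(num_virtual_engines)]

    def do_send(self, dst_handle, blocks: List[int], virtuel_engine: int):
        # Return the requested blocks from the cache of the given virtual engine.
        cache = self.caches[virtuel_engine]
        return {b: cache.get(b, b"") for b in blocks}

    def do_recv(self, src_handle, blocks: List[int], virtuel_engine: int):
        # Here src_handle is simply the payload produced by do_send.
        self.caches[virtuel_engine].update({b: src_handle[b] for b in blocks})


backend = DummyMigrationBackend(num_virtual_engines=2)
backend.caches[0].update({3: b"kv-3", 5: b"kv-5"})
payload = backend.do_send(None, [3, 5], virtuel_engine=0)
backend.do_recv(payload, [3, 5], virtuel_engine=1)
print(backend.caches[1])  # {3: b'kv-3', 5: b'kv-5'}
```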
4 changes: 1 addition & 3 deletions llumnix/backends/vllm/executor.py
@@ -258,9 +258,7 @@ def sort_by_driver_then_worker_ip(worker):
     def _get_worker_module_and_class(
             self) -> Tuple[str, str, Optional[Callable[[], Type[WorkerBase]]]]:
         worker_class_fn = None
-        if self.scheduler_config.is_multi_step:
-            raise NotImplementedError
-        elif self.speculative_config:
+        if self.scheduler_config.is_multi_step or self.speculative_config:
             raise NotImplementedError
         else:
             worker_module_name = "llumnix.backends.vllm.worker"
3 changes: 2 additions & 1 deletion llumnix/backends/vllm/llm_engine.py
@@ -13,7 +13,7 @@

 import time
 import traceback
-from typing import Any, List, Optional, Dict, Union, Iterable, Deque, Tuple
+from typing import Any, List, Optional, Union, Iterable, Deque, Tuple
 from collections import defaultdict
 import threading
 import asyncio
@@ -62,6 +62,7 @@ def __init__(self,
                  placement_group: Optional[PlacementGroup],
                  node_id: Optional[str],
                  *arg, **kwargs) -> None:
+        # pylint: disable=import-outside-toplevel
         import vllm.outputs
         vllm.outputs.RequestOutputFactory.create = LlumnixOutput.create
         super().__init__(*arg, **kwargs)
12 changes: 8 additions & 4 deletions llumnix/backends/vllm/migration_backend.py
@@ -115,7 +115,8 @@ def do_send(self, dst_handle, blocks: List[int], virtuel_engine: int=0):
                                             device="cpu", pin_memory=True).view(-1, 2)
         with torch.cuda.stream(self.migration_stream):
             for layer_idx in range(self.num_layers):
-                self.cache_engine[virtuel_engine].attn_backend.swap_blocks(self.gpu_cache[virtuel_engine][layer_idx], send_cache[layer_idx], block_mapping_tensor)
+                self.cache_engine[virtuel_engine].attn_backend \
+                    .swap_blocks(self.gpu_cache[virtuel_engine][layer_idx], send_cache[layer_idx], block_mapping_tensor)
         torch.cuda.Stream.synchronize(self.migration_stream)
         return send_cache.to(self.rpc_dtype).numpy()

@@ -134,7 +135,8 @@ def do_recv(self, src_handle, blocks: List[int], virtuel_engine: int=0):

         with torch.cuda.stream(self.migration_stream):
             for layer_idx in range(self.num_layers):
-                self.cache_engine[virtuel_engine].attn_backend.swap_blocks(recv_cache[layer_idx], self.gpu_cache[virtuel_engine][layer_idx], block_mapping_tensor)
+                self.cache_engine[virtuel_engine].attn_backend \
+                    .swap_blocks(recv_cache[layer_idx], self.gpu_cache[virtuel_engine][layer_idx], block_mapping_tensor)
         torch.cuda.Stream.synchronize(self.migration_stream)

 def try_import_gloo():
@@ -271,7 +273,8 @@ def do_send(self, dst_handle, blocks: List[int], virtuel_engine: int=0):
         with self.migration_stream:
             for layer_idx in range(self.cache_engine[0].num_attention_layers):
                 cache_idx = layer_idx % self.migration_num_layers
-                self.cache_engine[virtuel_engine].attn_backend.swap_blocks(self.gpu_cache[virtuel_engine][layer_idx], send_cache[cache_idx], block_mapping_tensor)
+                self.cache_engine[virtuel_engine].attn_backend \
+                    .swap_blocks(self.gpu_cache[virtuel_engine][layer_idx], send_cache[cache_idx], block_mapping_tensor)
                 if cache_idx + 1 == self.migration_num_layers or layer_idx + 1 == self.cache_engine[0].num_attention_layers:
                     # TODO(KuilongCui): check the error code if peer is dead
                     col.send(send_cache, dst_handle, self.group_name)
@@ -292,7 +295,8 @@ def do_recv(self, src_handle, blocks: List[int], virtuel_engine: int=0):
                 cache_idx = layer_idx % self.migration_num_layers
                 if cache_idx == 0:
                     col.recv(recv_cache, src_handle, self.group_name)
-                self.cache_engine[virtuel_engine].attn_backend.swap_blocks(recv_cache[cache_idx], self.gpu_cache[virtuel_engine][layer_idx], block_mapping_tensor)
+                self.cache_engine[virtuel_engine].attn_backend \
+                    .swap_blocks(recv_cache[cache_idx], self.gpu_cache[virtuel_engine][layer_idx], block_mapping_tensor)
         self.migration_stream.synchronize()

 def get_migration_backend(migration_config: MigrationConfig, cache_engine: List[CacheEngine], worker_handle_list, scheduling_strategy,
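All four hunks follow the same pattern: for each attention layer, the selected KV blocks are copied between the per-virtual-engine GPU cache and a staging buffer via the attention backend's `swap_blocks`. Below is a simplified, CPU-only sketch of that pattern; the tensor shapes, the `swap_blocks` stand-in, and the absence of CUDA streams are assumptions made purely for illustration.

```python
import torch


def swap_blocks(src: torch.Tensor, dst: torch.Tensor, block_mapping: torch.Tensor) -> None:
    # CPU stand-in for attn_backend.swap_blocks: copy each (src_block, dst_block) pair.
    for src_block, dst_block in block_mapping.tolist():
        dst[dst_block].copy_(src[src_block])


num_layers, num_blocks, block_numel = 4, 16, 8
virtuel_engine = 0
blocks = [2, 7, 11]  # block ids selected for migration (illustrative)

# Dummy per-virtual-engine GPU cache: one tensor per layer, indexed by virtual engine.
gpu_cache = [[torch.randn(num_blocks, block_numel) for _ in range(num_layers)]]
# Staging buffer playing the role of send_cache in do_send.
send_cache = torch.zeros(num_layers, len(blocks), block_numel)

block_mapping_tensor = torch.tensor([[src, dst] for dst, src in enumerate(blocks)],
                                    dtype=torch.int64)

for layer_idx in range(num_layers):
    swap_blocks(gpu_cache[virtuel_engine][layer_idx],
                send_cache[layer_idx],
                block_mapping_tensor)

print(torch.equal(send_cache[0][0], gpu_cache[0][0][blocks[0]]))  # True
```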
2 changes: 0 additions & 2 deletions llumnix/backends/vllm/scheduler.py
@@ -109,8 +109,6 @@ def remove_running_request(self, request_id: str) -> bool:
             if seq_group.request_id == request_id:
                 self.running.remove(seq_group)
                 seq_group.set_status(RequestStatus.RUNNING_MIGRATING)
-                logger.info(f"remove running req {request_id}")
-                logger.info(f"len:{len(self.running)}")
                 return True
         return False

1 change: 0 additions & 1 deletion llumnix/backends/vllm/utils.py
@@ -17,7 +17,6 @@

 from vllm.config import ModelConfig, ParallelConfig
 from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
-from vllm.model_executor.sampling_metadata import SamplingMetadata
 from vllm.sampling_params import SamplingType
 from vllm.model_executor.layers.sampler import SamplingMetadata, SamplingTensors, SampleResultArgsType, SampleReturnType, \
     SampleResultsDictType, SampleMetadataType, MultinomialSamplesType, \
2 changes: 1 addition & 1 deletion llumnix/entrypoints/vllm/client.py
@@ -42,7 +42,7 @@ async def generate(self,
                        **kwargs) -> AsyncStream:
         if sampling_params.n > 1:
             raise ValueError("Unsupported feature: multiple sequence decoding")
-
+        # pylint: disable=unexpected-keyword-arg
         results_generator = AsyncStream(request_id, cancel=None)
         self.request_streams[request_id] = results_generator
         server_info_copy = copy.deepcopy(self.server_info)
1 change: 0 additions & 1 deletion tests/e2e_test/utils.py
@@ -61,7 +61,6 @@ def generate_launch_command(result_filename: str = "",
                       f"{'--log-instance-info ' if log_instance_info else ''}"
                       f"--enable-migration "
                       f"--model {model} "
-                      f"--engine-use-ray "
                       f"--worker-use-ray "
                       f"--max-model-len {max_model_len} "
                       f"--dispatch-policy {dispatch_policy} "

