Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
KuilongCui committed Dec 17, 2024
1 parent d21d8e8 commit febf19c
Show file tree
Hide file tree
Showing 15 changed files with 27 additions and 29 deletions.
6 changes: 3 additions & 3 deletions docs/Arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--profiling-result-file-path PROFILING_RESULT_FILE_PATH]
[--gpu-type GPU_TYPE]
[--polling-interval POLLING_INTERVAL]
[--migration-backend {gloo,nccl,rpc,grpc,kvtransfer}]
[--migration-backend {gloo,nccl,rayrpc,grpc,kvtransfer}]
[--migration-buffer-blocks MIGRATION_BUFFER_BLOCKS]
[--migration-backend-transfer-type {cuda_ipc,rdma,}]
[--migration-backend-kvtransfer-naming-url MIGRATION_BACKEND_KVTRANSFER_NAMING_URL]
Expand Down Expand Up @@ -147,8 +147,8 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]

`--migration-backend`
- Communication backend of migration.
- Possible choices: gloo, rpc, grpc, kvtransfer. [gloo, rpc] are available for vllm and [grpc, kvtransfer] are available for bladellm.
- Default: "rpc"
- Possible choices: gloo, nccl, rayrpc, grpc, kvtransfer. [gloo, nccl, rayrpc] are available for vllm and [grpc, kvtransfer] are available for bladellm.
- Default: "rayrpc"

`--migration-backend-transfer-type`
- Transfer type for the grpc and kvtransfer migration backends.
Expand Down
2 changes: 1 addition & 1 deletion docs/Quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ cd llumnix
make install
```

The default migration backend is RPC. If you want to use NCCL as the migration backend, run `make cupy-cuda` to install [cupy-cuda](https://pypi.org/search/?q=cupy-cuda) manually, as it is related to the CUDA version.
The default migration backend is rayrpc. If you want to use NCCL as the migration backend, run `make cupy-cuda` to install [cupy-cuda](https://pypi.org/search/?q=cupy-cuda) manually, as it is related to the CUDA version.

If you want to use Gloo as the migration backend, **in addition to installing cupy-cuda**, please refer to [this link](https://github.com/ZeldaHuang/pygloo/blob/main/.github/workflows/ubuntu_basic.yml#L24C1-L26C1) to install [Bazel](https://github.com/bazelbuild/bazel) >= 5.1.0. Then, run `make pygloo` to install [pygloo](https://github.com/ZeldaHuang/pygloo).

Expand Down
4 changes: 2 additions & 2 deletions llumnix/arg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
help='profiling result file path')
parser.add_argument('--migration-backend',
type=str,
choices=['gloo','nccl','rpc','grpc','kvtransfer'],
help='communication backend of migration, [gloo, rpc] are available for vllm \
choices=['gloo','nccl','rayrpc','grpc','kvtransfer'],
help='communication backend of migration, [gloo, rayrpc] are available for vllm \
and [grpc, kvtransfer] are available for bladellm')
parser.add_argument('--migration-backend-transfer-type',
type=str,
Expand Down
8 changes: 3 additions & 5 deletions llumnix/backends/bladellm/llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from llumnix.instance_info import InstanceInfo
from llumnix.queue.queue_type import QueueType

class AsyncBackQueue(APIWrapper):
class AsyncBackQueueWrapper(APIWrapper):
def __init__(self, placement_group, node_id, instance_id, output_queue_type) -> None:
super().__init__(args=None, resp_queue=None)
if placement_group:
Expand Down Expand Up @@ -144,7 +144,7 @@ def instance_info(self) -> InstanceInfo:
def start(self, loop: asyncio.AbstractEventLoop):
super().start(loop)
self._client = self.init_client_from_engine()
self.trans_wrapper: AsyncBackQueue = AsyncBackQueue(self.placement_group,
self.trans_wrapper: AsyncBackQueueWrapper = AsyncBackQueueWrapper(self.placement_group,
self.node_id, self.instance_id, self.output_queue_type)
self._scheduler.llumnix_metrics.engine_init_metrics(self)

Expand Down Expand Up @@ -202,11 +202,9 @@ def __init__(self,
node_id: Optional[str],
*args, **kwargs,
) -> None:
logger.info("aaa")
AsyncLLMEngine.__init__(self, *args, **kwargs)
logger.info("bbb")
AsyncLLMEngineLlumnixMixin.__init__(self, instance_id, output_queue_type, migration_config, placement_group, node_id)
logger.info("ccc")

class PrefillAsyncLLMEngineLlumnix(AsyncLLMEngineLlumnixMixin, PrefillAsyncLLMEngine):
def __init__(self,
instance_id: str,
Expand Down
6 changes: 3 additions & 3 deletions llumnix/backends/vllm/migration_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(self, migration_config: MigrationConfig, cache_engine: CacheEngine,
self.migration_stream = torch.cuda.Stream()

def init_backend(self, group_name, world_size, rank) -> bool:
logger.info("create rpc migration backend successfully.")
logger.info("create rayrpc migration backend successfully.")
return True

def destory_backend(self) -> None:
Expand All @@ -85,7 +85,7 @@ def destory_backend(self) -> None:

def warmup(self) -> bool:
self.actor.exec_method.remote(self.is_driver_worker, "do_send", [0])
logger.info("rpc migration backend warmup successfully.")
logger.info("rayrpc migration backend warmup successfully.")
return True

# The src actor will pack the kv-cache data layer by layer. Specifically, NumPy is used for the transfer
Expand Down Expand Up @@ -285,7 +285,7 @@ def get_migration_backend(migration_config: MigrationConfig, cache_engine: Cache
target_migration_backend = None
backend = migration_config.migration_backend

assert backend in ['nccl', 'gloo', 'rpc'], "Unsupported migration backend: {} for llumnix".format(backend)
assert backend in ['nccl', 'gloo', 'rayrpc'], "Unsupported migration backend: {} for llumnix".format(backend)

if backend in ['nccl', 'gloo']:
target_migration_backend = RayColMigrationBackend(migration_config, cache_engine, local_rank, scheduling_strategy,
Expand Down
4 changes: 2 additions & 2 deletions llumnix/global_scheduler/migration_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def __init__(self,
# to prevent instances of migration backends that have not been initialized from participating in migration.
migration_backend_init_filter = CustomFilter()
migration_backend_init_filter.set_filter_condtition(
src_filter=lambda _: migration_backend == 'rpc',
dst_filter=lambda _: migration_backend == 'rpc')
src_filter=lambda _: migration_backend not in ['gloo', 'nccl'],
dst_filter=lambda _: migration_backend not in ['gloo', 'nccl'])
self.migration_filter.register_filter("migration_backend_init_filter",
migration_backend_init_filter)

Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_test/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def run_vllm(model, max_model_len, sampling_params):
@pytest.mark.asyncio
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for e2e test")
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo'])
@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo'])
@pytest.mark.parametrize("launch_mode", ['eief', 'eidf', 'dief', 'didf'])
async def test_e2e(cleanup_ray_env, shutdown_llumnix_service, model, migration_backend, launch_mode):
if migration_backend == 'gloo' and launch_mode != 'eief':
Expand Down
6 changes: 3 additions & 3 deletions tests/e2e_test/test_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ def get_instance_num_blocks():
@pytest.mark.asyncio
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="at least 2 gpus required for migration bench")
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo'])
@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo'])
@pytest.mark.parametrize("migrated_request_status", ['running', 'waiting'])
async def test_migration_benchmark(cleanup_ray_env, shutdown_llumnix_service, model, migration_backend, migrated_request_status):
if migrated_request_status == 'waiting' and migration_backend != 'rpc':
pytest.skip("When the migrated request status is waiting, only test the rpc migration backend.")
if migrated_request_status == 'waiting' and migration_backend != 'rayrpc':
pytest.skip("When the migrated request status is waiting, only test the rayrpc migration backend.")

request_migration_policy = 'SR' if migrated_request_status == 'running' else 'FCW'
ip = "127.0.0.1"
Expand Down
4 changes: 2 additions & 2 deletions tests/unit_test/backends/vllm/test_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def step_async_try_schedule():

self.backend_engine.engine.step_async = step_async_try_schedule

@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl'])
@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo', 'nccl'])
@pytest.mark.parametrize("migration_request_status", ['waiting', 'running'])
@pytest.mark.asyncio
async def test_migration_correctness(setup_ray_env, migration_backend, migration_request_status):
Expand Down Expand Up @@ -203,7 +203,7 @@ async def test_correctness(prompt):
await test_correctness(prompt)
que.cleanup()

@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl'])
@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo', 'nccl'])
@pytest.mark.asyncio
async def test_pd_diaggregation_correctness(setup_ray_env, migration_backend):
engine_args = EngineArgs(model="facebook/opt-125m", worker_use_ray=True)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_test/backends/vllm/test_migration_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_gpu_cache(self):
return self.gpu_cache

@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
@pytest.mark.parametrize("backend", ['rayrpc', 'gloo', 'nccl'])
def test_migrate_cache(setup_ray_env, backend):
engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
migraiton_config = EngineManagerArgs(migration_buffer_blocks=3, migration_num_layers=5).create_migration_config()
Expand Down
4 changes: 2 additions & 2 deletions tests/unit_test/backends/vllm/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def create_worker(rank: int, local_rank: int, engine_config: EngineConfig,

return worker

@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
@pytest.mark.parametrize("backend", ['rayrpc', 'gloo', 'nccl'])
def test_reserve_memory_for_migration(setup_ray_env, backend):
engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
migration_config = EngineManagerArgs(migration_buffer_blocks=1).create_migration_config()
Expand All @@ -77,7 +77,7 @@ def test_reserve_memory_for_migration(setup_ray_env, backend):
assert migration_cache_size == occupy_memory

@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
@pytest.mark.parametrize("backend", ['rayrpc', 'gloo', 'nccl'])
def test_rebuild_migration_backend(setup_ray_env, backend):
engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
migration_config = EngineManagerArgs(migration_buffer_blocks=1).create_migration_config()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_test/global_scheduler/test_global_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
def init_global_scheduler():
global_scheduler_config = GlobalSchedulerConfig(0, 'remaining_steps', 'load', math.inf,
'defrag_constrained', 3.0, True, 'avg_load',
10, 60, False, 'rpc')
10, 60, False, 'rayrpc')
global_scheduler = GlobalScheduler(global_scheduler_config)
return global_scheduler

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _get_lantecy_mem(self, *args, **kwargs):

def init_manager():
try:
engine_manager_args = EngineManagerArgs(migration_backend="rpc", enable_migration=True)
engine_manager_args = EngineManagerArgs(migration_backend="rayrpc", enable_migration=True)
engine_manager_args.log_instance_info = False
engine_manager = LLMEngineManager.from_args(engine_manager_args, None)
except ValueError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

def init_migration_scheduler(policy='balanced'):
instance_load_calculator = InstanceLoadCalculator('remaining_steps', True)
migration_scheduler = MigrationScheduler(policy, MIGRATE_OUT_LOAD_THRESHOLD, instance_load_calculator, 'rpc')
migration_scheduler = MigrationScheduler(policy, MIGRATE_OUT_LOAD_THRESHOLD, instance_load_calculator, 'rayrpc')
return migration_scheduler

@pytest.fixture
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_test/llumlet/test_engine_step_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def raise_error_step():
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Need at least 1 GPU to run the test.")
def test_engine_step_exception(setup_ray_env):
engine_args = EngineArgs(model="facebook/opt-125m", max_model_len=8, worker_use_ray=True)
migration_config = MigrationConfig("SR", "rpc", 16, 1, 4, 5, 20)
migration_config = MigrationConfig("SR", "rayrpc", 16, 1, 4, 5, 20)
node_id = ray.get_runtime_context().get_node_id()
scheduling_strategy = NodeAffinitySchedulingStrategy(node_id=node_id, soft=False)

Expand Down

0 comments on commit febf19c

Please sign in to comment.