diff --git a/docs/Arguments.md b/docs/Arguments.md
index 6d8a3c0d..dfe06df9 100644
--- a/docs/Arguments.md
+++ b/docs/Arguments.md
@@ -32,7 +32,7 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
                 [--profiling-result-file-path PROFILING_RESULT_FILE_PATH]
                 [--gpu-type GPU_TYPE]
                 [--polling-interval POLLING_INTERVAL]
-                [--migration-backend {gloo,nccl,rpc,grpc,kvtransfer}]
+                [--migration-backend {gloo,nccl,rayrpc,grpc,kvtransfer}]
                 [--migration-buffer-blocks MIGRATION_BUFFER_BLOCKS]
                 [--migration-backend-transfer-type {cuda_ipc,rdma,}]
                 [--migration-backend-kvtransfer-naming-url MIGRATION_BACKEND_KVTRANSFER_NAMING_URL]
@@ -147,8 +147,8 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
 `--migration-backend`
 - Communication backend of migration.
-- Possible choices: gloo, rpc, grpc, kvtransfer. [gloo, rpc] are available for vllm and [grpc, kvtransfer] are available for bladellm.
-- Default: "rpc"
+- Possible choices: gloo, rayrpc, grpc, kvtransfer. [gloo, rayrpc] are available for vllm and [grpc, kvtransfer] are available for bladellm.
+- Default: "rayrpc"
 
 `--migration-backend-transfer-type`
 - Transfer type for migration backend grpc and kvTransfer.
diff --git a/docs/Quickstart.md b/docs/Quickstart.md
index 78024cb5..4fcd605f 100644
--- a/docs/Quickstart.md
+++ b/docs/Quickstart.md
@@ -22,7 +22,7 @@ cd llumnix
 make install
 ```
 
-The default migration backend is RPC. If you want to use NCCL as the migration backend, run `make cupy-cuda` to install [cupy-cuda](https://pypi.org/search/?q=cupy-cuda) manually, as it is related to the CUDA version.
+The default migration backend is rayrpc. If you want to use NCCL as the migration backend, run `make cupy-cuda` to install [cupy-cuda](https://pypi.org/search/?q=cupy-cuda) manually, as it is related to the CUDA version.
 
 If you want to use Gloo as migration backend, **in addition to installing cupy-cuda**, please refer to [this link](https://github.com/ZeldaHuang/pygloo/blob/main/.github/workflows/ubuntu_basic.yml#L24C1-L26C1) to install [Bazel](https://github.com/bazelbuild/bazel) >= 5.1.0. Then, run `make pygloo` to install [pygloo](https://github.com/ZeldaHuang/pygloo).
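With the rename, both the documented default and the accepted values change; the old `rpc` spelling is no longer a valid choice. A minimal standalone sketch (plain argparse, not the actual llumnix parser) of how the documented `--migration-backend` choices behave after this change:

```python
# Standalone illustration only: mirrors the documented choices, not llumnix's own parser.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--migration-backend', type=str, default='rayrpc',
                    choices=['gloo', 'nccl', 'rayrpc', 'grpc', 'kvtransfer'])

args = parser.parse_args(['--migration-backend', 'rayrpc'])
print(args.migration_backend)  # rayrpc

try:
    parser.parse_args(['--migration-backend', 'rpc'])  # old spelling, rejected
except SystemExit:
    print("'rpc' is no longer a valid choice")
```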
diff --git a/llumnix/arg_utils.py b/llumnix/arg_utils.py
index 4ff3102e..4bad0cd4 100644
--- a/llumnix/arg_utils.py
+++ b/llumnix/arg_utils.py
@@ -317,8 +317,8 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
                             help='profiling result file path')
         parser.add_argument('--migration-backend',
                             type=str,
-                            choices=['gloo','nccl','rpc','grpc','kvtransfer'],
-                            help='communication backend of migration, [gloo, rpc] are available for vllm \
+                            choices=['gloo','nccl','rayrpc','grpc','kvtransfer'],
+                            help='communication backend of migration, [gloo, rayrpc] are available for vllm \
                             and [grpc, kvtransfer] are available for bladellm')
         parser.add_argument('--migration-backend-transfer-type',
                             type=str,
diff --git a/llumnix/backends/bladellm/llm_engine.py b/llumnix/backends/bladellm/llm_engine.py
index 83e3a9f4..557b1bc1 100644
--- a/llumnix/backends/bladellm/llm_engine.py
+++ b/llumnix/backends/bladellm/llm_engine.py
@@ -39,7 +39,7 @@ from llumnix.instance_info import InstanceInfo
 from llumnix.queue.queue_type import QueueType
 
 
-class AsyncBackQueue(APIWrapper):
+class AsyncBackQueueWrapper(APIWrapper):
     def __init__(self, placement_group, node_id, instance_id, output_queue_type) -> None:
         super().__init__(args=None, resp_queue=None)
         if placement_group:
@@ -144,7 +144,7 @@ def instance_info(self) -> InstanceInfo:
     def start(self, loop: asyncio.AbstractEventLoop):
         super().start(loop)
         self._client = self.init_client_from_engine()
-        self.trans_wrapper: AsyncBackQueue = AsyncBackQueue(self.placement_group,
+        self.trans_wrapper: AsyncBackQueueWrapper = AsyncBackQueueWrapper(self.placement_group,
             self.node_id, self.instance_id, self.output_queue_type)
         self._scheduler.llumnix_metrics.engine_init_metrics(self)
@@ -202,11 +202,9 @@ def __init__(self,
                  node_id: Optional[str],
                  *args, **kwargs,
                  ) -> None:
-        logger.info("aaa")
         AsyncLLMEngine.__init__(self, *args, **kwargs)
-        logger.info("bbb")
         AsyncLLMEngineLlumnixMixin.__init__(self, instance_id, output_queue_type, migration_config, placement_group, node_id)
-        logger.info("ccc")
+
 
 class PrefillAsyncLLMEngineLlumnix(AsyncLLMEngineLlumnixMixin, PrefillAsyncLLMEngine):
     def __init__(self, instance_id: str,
diff --git a/llumnix/backends/vllm/migration_backend.py b/llumnix/backends/vllm/migration_backend.py
index a6f2c375..f21c2bab 100644
--- a/llumnix/backends/vllm/migration_backend.py
+++ b/llumnix/backends/vllm/migration_backend.py
@@ -75,7 +75,7 @@ def __init__(self, migration_config: MigrationConfig, cache_engine: CacheEngine,
         self.migration_stream = torch.cuda.Stream()
 
     def init_backend(self, group_name, world_size, rank) -> bool:
-        logger.info("create rpc migration backend successfully.")
+        logger.info("create rayrpc migration backend successfully.")
         return True
 
     def destory_backend(self) -> None:
@@ -85,7 +85,7 @@ def destory_backend(self) -> None:
 
     def warmup(self) -> bool:
         self.actor.exec_method.remote(self.is_driver_worker, "do_send", [0])
-        logger.info("rpc migration backend warmup successfully.")
+        logger.info("rayrpc migration backend warmup successfully.")
         return True
 
     # The src actor will pack the kv-cache data layer by layer. Specifically, NumPy is used for the transfer
@@ -285,7 +285,7 @@ def get_migration_backend(migration_config: MigrationConfig, cache_engine: CacheEngine,
     target_migration_backend = None
     backend = migration_config.migration_backend
 
-    assert backend in ['nccl', 'gloo', 'rpc'], "Unsupported migration backend: {} for llumnix".format(backend)
+    assert backend in ['nccl', 'gloo', 'rayrpc'], "Unsupported migration backend: {} for llumnix".format(backend)
 
     if backend in ['nccl', 'gloo']:
         target_migration_backend = RayColMigrationBackend(migration_config, cache_engine, local_rank, scheduling_strategy,
diff --git a/llumnix/global_scheduler/migration_scheduler.py b/llumnix/global_scheduler/migration_scheduler.py
index 9c448ebf..61516dd7 100644
--- a/llumnix/global_scheduler/migration_scheduler.py
+++ b/llumnix/global_scheduler/migration_scheduler.py
@@ -33,8 +33,8 @@ def __init__(self,
         # to prevent instances of migration backends that have not been initialized from participating in migration.
         migration_backend_init_filter = CustomFilter()
         migration_backend_init_filter.set_filter_condtition(
-            src_filter=lambda _: migration_backend == 'rpc',
-            dst_filter=lambda _: migration_backend == 'rpc')
+            src_filter=lambda _: migration_backend not in ['gloo', 'nccl'],
+            dst_filter=lambda _: migration_backend not in ['gloo', 'nccl'])
 
         self.migration_filter.register_filter("migration_backend_init_filter",
                                               migration_backend_init_filter)
diff --git a/tests/e2e_test/test_e2e.py b/tests/e2e_test/test_e2e.py
index 24dd896e..87b03417 100644
--- a/tests/e2e_test/test_e2e.py
+++ b/tests/e2e_test/test_e2e.py
@@ -62,7 +62,7 @@ def run_vllm(model, max_model_len, sampling_params):
 @pytest.mark.asyncio
 @pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for e2e test")
 @pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
-@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo'])
+@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo'])
 @pytest.mark.parametrize("launch_mode", ['eief', 'eidf', 'dief', 'didf'])
 async def test_e2e(cleanup_ray_env, shutdown_llumnix_service, model, migration_backend, launch_mode):
     if migration_backend == 'gloo' and launch_mode != 'eief':
diff --git a/tests/e2e_test/test_migration.py b/tests/e2e_test/test_migration.py
index 700b8da3..ba0793da 100644
--- a/tests/e2e_test/test_migration.py
+++ b/tests/e2e_test/test_migration.py
@@ -90,11 +90,11 @@ def get_instance_num_blocks():
 @pytest.mark.asyncio
 @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="at least 2 gpus required for migration bench")
 @pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
-@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo'])
+@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo'])
 @pytest.mark.parametrize("migrated_request_status", ['running', 'waiting'])
 async def test_migration_benchmark(cleanup_ray_env, shutdown_llumnix_service, model, migration_backend, migrated_request_status):
-    if migrated_request_status == 'waiting' and migration_backend != 'rpc':
-        pytest.skip("When the migrated request status is waiting, only test the rpc migration backend.")
+    if migrated_request_status == 'waiting' and migration_backend != 'rayrpc':
+        pytest.skip("When the migrated request status is waiting, only test the rayrpc migration backend.")
 
     request_migration_policy = 'SR' if migrated_request_status == 'running' else 'FCW'
     ip = "127.0.0.1"
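The `migration_scheduler.py` hunk above is the behavioural core of this change: instead of whitelisting only the old `rpc` backend, the init filter now lets every backend except the collective ones (`gloo`, `nccl`), which must first build a process group, participate in migration without waiting for backend initialization. A hypothetical, self-contained sketch of that predicate (illustrative names only, not the `CustomFilter` API):

```python
# Hypothetical sketch of the predicate change; function names are illustrative, not llumnix API.
def ready_without_backend_init_old(migration_backend: str) -> bool:
    # old behaviour: only the rpc backend was treated as always initialized
    return migration_backend == 'rpc'

def ready_without_backend_init_new(migration_backend: str) -> bool:
    # new behaviour: everything except the collective backends, which need a
    # process group set up first, is treated as always initialized
    return migration_backend not in ['gloo', 'nccl']

for backend in ['rayrpc', 'grpc', 'kvtransfer', 'gloo', 'nccl']:
    print(backend, ready_without_backend_init_new(backend))
```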
diff --git a/tests/unit_test/backends/vllm/test_migration.py b/tests/unit_test/backends/vllm/test_migration.py
index d247e160..d73b130e 100644
--- a/tests/unit_test/backends/vllm/test_migration.py
+++ b/tests/unit_test/backends/vllm/test_migration.py
@@ -83,7 +83,7 @@ async def step_async_try_schedule():
 
     self.backend_engine.engine.step_async = step_async_try_schedule
 
-@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl'])
+@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo', 'nccl'])
 @pytest.mark.parametrize("migration_request_status", ['waiting', 'running'])
 @pytest.mark.asyncio
 async def test_migration_correctness(setup_ray_env, migration_backend, migration_request_status):
@@ -203,7 +203,7 @@ async def test_correctness(prompt):
         await test_correctness(prompt)
     que.cleanup()
 
-@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl'])
+@pytest.mark.parametrize("migration_backend", ['rayrpc', 'gloo', 'nccl'])
 @pytest.mark.asyncio
 async def test_pd_diaggregation_correctness(setup_ray_env, migration_backend):
     engine_args = EngineArgs(model="facebook/opt-125m", worker_use_ray=True)
diff --git a/tests/unit_test/backends/vllm/test_migration_backend.py b/tests/unit_test/backends/vllm/test_migration_backend.py
index c6e23e10..5b92fb9c 100644
--- a/tests/unit_test/backends/vllm/test_migration_backend.py
+++ b/tests/unit_test/backends/vllm/test_migration_backend.py
@@ -37,7 +37,7 @@ def get_gpu_cache(self):
         return self.gpu_cache
 
 @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
-@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
+@pytest.mark.parametrize("backend", ['rayrpc', 'gloo', 'nccl'])
 def test_migrate_cache(setup_ray_env, backend):
     engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
     migraiton_config = EngineManagerArgs(migration_buffer_blocks=3, migration_num_layers=5).create_migration_config()
diff --git a/tests/unit_test/backends/vllm/test_worker.py b/tests/unit_test/backends/vllm/test_worker.py
index a42b9f28..440bf6e9 100644
--- a/tests/unit_test/backends/vllm/test_worker.py
+++ b/tests/unit_test/backends/vllm/test_worker.py
@@ -56,7 +56,7 @@ def create_worker(rank: int, local_rank: int, engine_config: EngineConfig,
 
     return worker
 
-@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
+@pytest.mark.parametrize("backend", ['rayrpc', 'gloo', 'nccl'])
 def test_reserve_memory_for_migration(setup_ray_env, backend):
     engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
     migration_config = EngineManagerArgs(migration_buffer_blocks=1).create_migration_config()
@@ -77,7 +77,7 @@ def test_reserve_memory_for_migration(setup_ray_env, backend):
     assert migration_cache_size == occupy_memory
 
 @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need at least 2 GPU to run the test.")
-@pytest.mark.parametrize("backend", ['rpc', 'gloo', 'nccl'])
+@pytest.mark.parametrize("backend", ['rayrpc', 'gloo', 'nccl'])
 def test_rebuild_migration_backend(setup_ray_env, backend):
     engine_config = EngineArgs(model='facebook/opt-125m', max_model_len=8, enforce_eager=True).create_engine_config()
     migration_config = EngineManagerArgs(migration_buffer_blocks=1).create_migration_config()
diff --git a/tests/unit_test/global_scheduler/test_global_scheduler.py b/tests/unit_test/global_scheduler/test_global_scheduler.py
index 18c83f85..7079c96f 100644
--- a/tests/unit_test/global_scheduler/test_global_scheduler.py
+++ b/tests/unit_test/global_scheduler/test_global_scheduler.py
@@ -25,7 +25,7 @@ def init_global_scheduler():
     global_scheduler_config = GlobalSchedulerConfig(0, 'remaining_steps', 'load', math.inf,
                                                     'defrag_constrained', 3.0, True, 'avg_load',
-                                                    10, 60, False, 'rpc')
+                                                    10, 60, False, 'rayrpc')
     global_scheduler = GlobalScheduler(global_scheduler_config)
     return global_scheduler
 
diff --git a/tests/unit_test/global_scheduler/test_llm_engine_manager.py b/tests/unit_test/global_scheduler/test_llm_engine_manager.py
index 16befae9..595512c1 100644
--- a/tests/unit_test/global_scheduler/test_llm_engine_manager.py
+++ b/tests/unit_test/global_scheduler/test_llm_engine_manager.py
@@ -105,7 +105,7 @@ def _get_lantecy_mem(self, *args, **kwargs):
 
 def init_manager():
     try:
-        engine_manager_args = EngineManagerArgs(migration_backend="rpc", enable_migration=True)
+        engine_manager_args = EngineManagerArgs(migration_backend="rayrpc", enable_migration=True)
         engine_manager_args.log_instance_info = False
         engine_manager = LLMEngineManager.from_args(engine_manager_args, None)
     except ValueError:
diff --git a/tests/unit_test/global_scheduler/test_migration_scheduler.py b/tests/unit_test/global_scheduler/test_migration_scheduler.py
index 89b813c3..3ed1655a 100644
--- a/tests/unit_test/global_scheduler/test_migration_scheduler.py
+++ b/tests/unit_test/global_scheduler/test_migration_scheduler.py
@@ -27,7 +27,7 @@ def init_migration_scheduler(policy='balanced'):
     instance_load_calculator = InstanceLoadCalculator('remaining_steps', True)
-    migration_scheduler = MigrationScheduler(policy, MIGRATE_OUT_LOAD_THRESHOLD, instance_load_calculator, 'rpc')
+    migration_scheduler = MigrationScheduler(policy, MIGRATE_OUT_LOAD_THRESHOLD, instance_load_calculator, 'rayrpc')
     return migration_scheduler
 
 @pytest.fixture
diff --git a/tests/unit_test/llumlet/test_engine_step_exception.py b/tests/unit_test/llumlet/test_engine_step_exception.py
index 736520d2..50c621c0 100644
--- a/tests/unit_test/llumlet/test_engine_step_exception.py
+++ b/tests/unit_test/llumlet/test_engine_step_exception.py
@@ -51,7 +51,7 @@ async def raise_error_step():
 
 @pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Need at least 1 GPU to run the test.")
 def test_engine_step_exception(setup_ray_env):
     engine_args = EngineArgs(model="facebook/opt-125m", max_model_len=8, worker_use_ray=True)
-    migration_config = MigrationConfig("SR", "rpc", 16, 1, 4, 5, 20)
+    migration_config = MigrationConfig("SR", "rayrpc", 16, 1, 4, 5, 20)
     node_id = ray.get_runtime_context().get_node_id()
     scheduling_strategy = NodeAffinitySchedulingStrategy(node_id=node_id, soft=False)
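For downstream callers, the only visible change is the backend name. The sketch below mirrors the calls that appear in the updated tests (`EngineManagerArgs(...)`, `create_migration_config()`); it is a usage illustration under the assumption that `EngineManagerArgs` is importable from `llumnix.arg_utils`, not a definitive API reference:

```python
# Hedged usage sketch based on the calls visible in the updated tests.
from llumnix.arg_utils import EngineManagerArgs  # import path assumed

# "rayrpc" replaces the old "rpc" value; passing "rpc" would now be rejected.
engine_manager_args = EngineManagerArgs(migration_backend="rayrpc", enable_migration=True)
migration_config = engine_manager_args.create_migration_config()

# For the vllm path, get_migration_backend() asserts the value is one of these.
assert migration_config.migration_backend in ['nccl', 'gloo', 'rayrpc']
```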