[CI] Add launch modes and available blocks tests in e2e test #57

Merged: 4 commits, Oct 17, 2024
5 changes: 5 additions & 0 deletions Makefile
@@ -32,6 +32,7 @@ test: check_pytest_installed
@pytest -x -v --ignore=third_party/ --ignore=tests/e2e_test --disable-warnings
@python examlpes/offline_inference.py
@pytest -v tests/e2e_test/test_e2e.py
@pytest -v -x ./tests/e2e_test/test_migration.py

.PHONY: unit_test
unit_test: check_pytest_installed
@@ -49,6 +50,10 @@ e2e_test:
bench_test:
@pytest -v ./tests/e2e_test/test_bench.py

.PHONY: migration_test
migration_test:
@pytest -v -x ./tests/e2e_test/test_migration.py

#################### pygloo install for gloo migration backend begin ####################

BAZEL_CMD = bazel
16 changes: 13 additions & 3 deletions llumnix/llm_engine_manager.py
@@ -77,8 +77,6 @@ def __init__(self,
self.instance_migrating: Dict[str, bool] = {}
self.pending_rebuild_migration_instances = 0
self.global_scheduler = GlobalScheduler(global_scheduler_config)
# When manager starts, it automatically connects to all existing instances.
self._connect_to_instances()

self.polling_interval = engine_manager_args.polling_interval
asyncio.create_task(self._update_instance_info_loop(self.polling_interval))
@@ -106,6 +104,10 @@ def __init__(self,
self.log_instance_info = engine_manager_args.log_instance_info
if self.log_instance_info:
self._init_instance_info_csv(engine_manager_args)
self.instance_last_logged_empty = {}

# When manager starts, it automatically connects to all existing instances.
self._connect_to_instances()

async def generate(
self,
@@ -352,6 +354,8 @@ def scale_up(self, instance_id: Union[str, Iterable[str]], llumlet_actor_handles
indeed_update = True
self.instances[ins_id] = llumlet_actor_handles[idx]
self.instance_migrating[ins_id] = False
if self.log_instance_info:
self.instance_last_logged_empty[ins_id] = False
self.pending_rebuild_migration_instances += 1
self.global_scheduler.scale_up(instance_ids)
self.num_instances = len(self.instances)
@@ -378,6 +382,8 @@ def scale_down(self, instance_id: Union[str, Iterable[str]], rebuild_migrate_bac
indeed_update = True
del self.instances[ins_id]
del self.instance_migrating[ins_id]
if self.log_instance_info:
del self.instance_last_logged_empty[ins_id]
self.pending_rebuild_migration_instances += 1
self.global_scheduler.scale_down(instance_ids)
self.num_instances = len(self.instances)
@@ -521,7 +527,11 @@ def _init_instance_info_csv(self, engine_manager_args: EngineManagerArgs) -> Non

def _log_instance_infos_to_csv(self, instance_infos: List[InstanceInfo]) -> None:
for instance_info in instance_infos:
if instance_info.gpu_cache_usage > 0:
instance_id = instance_info.instance_id
gpu_cache_usage = instance_info.gpu_cache_usage
should_log = (gpu_cache_usage > 0) or (gpu_cache_usage == 0 and not self.instance_last_logged_empty[instance_id])
if should_log:
self.instance_last_logged_empty[instance_id] = (gpu_cache_usage == 0)
self.instance_info_csv.writerow([
instance_info.timestamp,
instance_info.instance_id,
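The change to _log_instance_infos_to_csv above can be summarized as: a row is always written while an instance's GPU cache is in use, and a zero-usage row is written only once per idle stretch, so the CSV still records when an instance drains without repeating identical empty rows. A minimal standalone sketch of that rule (hypothetical helper name, assuming only a per-instance last-logged-empty flag like the one initialized in scale_up; not the manager code itself):

from typing import Dict

def should_log_row(instance_id: str, gpu_cache_usage: float,
                   last_logged_empty: Dict[str, bool]) -> bool:
    # Log whenever the cache is in use, or the first time it drops back to zero.
    log = gpu_cache_usage > 0 or not last_logged_empty[instance_id]
    if log:
        last_logged_empty[instance_id] = (gpu_cache_usage == 0)
    return log

# Repeated zero-usage snapshots yield a single logged row until usage rises again.
flags = {"instance_0": False}
print([should_log_row("instance_0", u, flags) for u in (0.3, 0.0, 0.0, 0.2)])
# -> [True, True, False, True]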
19 changes: 1 addition & 18 deletions tests/e2e_test/test_bench.py
@@ -16,11 +16,10 @@
import os
import subprocess
import pytest
import ray
import torch
import numpy as np

from .test_e2e import generate_launch_command
from .test_e2e import generate_launch_command, clear_ray_state
from .utils import to_markdown_table

def launch_llumnix_service(command):
@@ -56,22 +55,6 @@ def shutdown_llumnix_service():
except Exception:
pass

def clear_ray_state():
named_actors = ray.util.list_named_actors(True)
for actor in named_actors:
try:
actor_handle = ray.get_actor(actor['name'], namespace=actor['namespace'])
# pylint: disable=bare-except
except:
continue

try:
ray.kill(actor_handle)
# pylint: disable=bare-except
except:
continue
ray.shutdown()

def parse_log_file():
json_files = [f for f in os.listdir('.') if f.endswith('_latency_info.json')]

39 changes: 33 additions & 6 deletions tests/e2e_test/test_e2e.py
@@ -20,15 +20,38 @@

from vllm import LLM, SamplingParams


def parse_launch_mode(launch_mode: str):
# 'eief' means enable init instance by manager and enable fixed node init instance; the other modes follow the same naming scheme.
if launch_mode == 'eief':
disable_init_instance_by_manager = False
disable_fixed_node_init_instance = False
elif launch_mode == 'eidf':
disable_init_instance_by_manager = False
disable_fixed_node_init_instance = True
elif launch_mode == 'dief':
disable_init_instance_by_manager = True
disable_fixed_node_init_instance = False
else:
disable_init_instance_by_manager = True
disable_fixed_node_init_instance = True
return disable_init_instance_by_manager, disable_fixed_node_init_instance

def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool = True, HEAD_NODE_IP: str = "127.0.0.1",
ip: str = "127.0.0.1", port: int = 1234, instances_num = 1, dispatch_policy: str = "load",
migration_backend = "rpc", model = "facebook/opt-125m", max_model_len: int = 2048):
ip: str = "127.0.0.1", port: int = 37000, instances_num = 1, dispatch_policy: str = "load",
migration_backend = "gloo", model = "facebook/opt-125m", max_model_len: int = 2048,
launch_mode: str = 'eief', log_instance_info: bool = False):
disable_init_instance_by_manager, disable_fixed_node_init_instance = parse_launch_mode(launch_mode)
command = (
f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 "
f"nohup python -m llumnix.entrypoints.vllm.api_server "
f"--host {ip} "
f"--port {port} "
f"{'--disable-init-instance-by-manager ' if disable_init_instance_by_manager else ''}"
f"{'--disable-fixed-node-init-instance ' if disable_fixed_node_init_instance else ''}"
f"--initial-instances {instances_num} "
f"{'--log-filename manager ' if log_instance_info else ''}"
f"{'--log-instance-info ' if log_instance_info else ''}"
f"--enable-migration "
f"--model {model} "
f"--engine-use-ray "
@@ -46,9 +69,10 @@ def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool
)
return command

def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str):
def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str, launch_mode: str):
command = generate_launch_command(model=model, max_model_len=max_model_len,
port=port, migration_backend=migration_backend)
port=port, migration_backend=migration_backend,
launch_mode=launch_mode)
subprocess.run(command, shell=True, check=True)

def shutdown_llumnix_service():
@@ -110,7 +134,10 @@ def run_vllm(model, max_model_len, sampling_params):
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for e2e test")
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl'])
async def test_e2e(model, migration_backend):
@pytest.mark.parametrize("launch_mode", ['eief', 'eidf', 'dief', 'didf'])
async def test_e2e(model, migration_backend, launch_mode):
if migration_backend == 'gloo' and launch_mode != 'eief':
pytest.skip("When the migration backend is gloo, the launch mode of llumnix can only be eief")
max_model_len = 370
sampling_params = {
"n": 1,
@@ -123,7 +150,7 @@ async def test_e2e(model, migration_backend):

# generate llumnix outputs
base_port = 37037
launch_llumnix_service(model, max_model_len, base_port, migration_backend)
launch_llumnix_service(model, max_model_len, base_port, migration_backend, launch_mode)
await asyncio.sleep(60)

llumnix_output = {}
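The four parametrized launch modes are simply the 2x2 combinations of the two init toggles. A hedged sketch of how each mode string expands into CLI flags (illustration only, mirroring parse_launch_mode above; the flag names come from the generated command, the helper itself is hypothetical):

def mode_to_flags(launch_mode: str) -> str:
    # First letter: init instance by manager; third letter: fixed node init.
    # 'e' keeps the feature enabled, 'd' adds the corresponding --disable-* flag.
    flags = []
    if launch_mode[0] == 'd':
        flags.append("--disable-init-instance-by-manager")
    if launch_mode[2] == 'd':
        flags.append("--disable-fixed-node-init-instance")
    return " ".join(flags)

for mode in ('eief', 'eidf', 'dief', 'didf'):
    print(mode, "->", mode_to_flags(mode) or "(no extra flags)")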
26 changes: 20 additions & 6 deletions tests/e2e_test/test_migration.py
@@ -17,6 +17,7 @@
import subprocess
import pytest
import torch
import pandas as pd

from .test_e2e import generate_launch_command
from .test_bench import generate_bench_command, clear_ray_state, shutdown_llumnix_service
@@ -25,7 +26,8 @@
size_pattern = re.compile(r'total_kv_cache_size:\s*([\d.]+)\s*(B|KB|MB|GB|KB|TB)')
speed_pattern = re.compile(r'speed:\s*([\d.]+)GB/s')

def parse_log_file(log_files):

def parse_instance_log_file(log_files):
speed_dict = defaultdict(list)

for log_file in log_files:
@@ -52,6 +54,14 @@ def parse_log_file(log_files):

return averger_speed

def parse_manager_log_file(log_file):
df = pd.read_csv(log_file)
instance_id_set = set(df["instance_id"])
for instance_id in instance_id_set:
df_instance = df[df["instance_id"] == instance_id]
num_available_gpu_blocks_list = df_instance["num_available_gpu_blocks"].to_numpy().tolist()
assert num_available_gpu_blocks_list[0] == num_available_gpu_blocks_list[-1]

@pytest.mark.asyncio
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="at least 2 gpus required for migration bench")
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@@ -65,7 +75,8 @@ async def test_migration_benchmark(model, migration_backend):
output_log = f"{base_port+i}.out"
instance_output_logs.append("instance_"+output_log)
launch_command = generate_launch_command(result_filename=output_log, launch_ray_cluster=False, port=base_port+i,
model=model, dispatch_policy="flood", migration_backend=migration_backend)
model=model, dispatch_policy="flood", migration_backend=migration_backend,
log_instance_info=True)
subprocess.run(launch_command, shell=True, check=True)
await asyncio.sleep(60)

@@ -76,12 +87,15 @@ async def run_bench_command(command):

for i in range(device_count//2):
bench_command = generate_bench_command(ip_ports=f"127.0.0.1:{base_port+i}", model=model, num_prompts=300,
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
qps=10)
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
qps=10)
await asyncio.wait_for(run_bench_command(bench_command), timeout=60*30)
await asyncio.sleep(30)

parse_manager_log_file("manager_instance.csv")

averger_speed = parse_log_file(instance_output_logs)
averger_speed = parse_instance_log_file(instance_output_logs)

sorted_keys = sorted(averger_speed.keys(), key=lambda x: float(x.split()[0]))

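What parse_manager_log_file enforces is that, once the benchmark drains, every instance's num_available_gpu_blocks ends at the same value it started with, i.e. migration releases all the KV-cache blocks it touched. A toy example with a hypothetical single-instance CSV (illustration only, not the real manager output, and only the columns the check needs):

import io
import pandas as pd

toy_csv = io.StringIO(
    "timestamp,instance_id,num_available_gpu_blocks\n"
    "0.0,instance_0,4096\n"
    "1.0,instance_0,2048\n"   # blocks held while requests are running or migrating
    "2.0,instance_0,4096\n"   # everything freed again once the benchmark drains
)
df = pd.read_csv(toy_csv)
for instance_id in set(df["instance_id"]):
    blocks = df[df["instance_id"] == instance_id]["num_available_gpu_blocks"].tolist()
    assert blocks[0] == blocks[-1], f"{instance_id} leaked {blocks[0] - blocks[-1]} blocks"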
2 changes: 1 addition & 1 deletion tools/migration_test.sh
@@ -3,4 +3,4 @@ set -ex

nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \
registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \
bash -c "pip install -e . > /dev/null && pytest -v ./tests/e2e_test/test_migration.py"
bash -c "pip install -e . > /dev/null && make migration_test"