[BugFix][Refactor] Fix some bugs and refine code for large-scale simulator test #93

Open
wants to merge 59 commits into main
Commits (59)
16ec8af
Refine logger output text
s5u13b Jan 13, 2025
e99e519
Customize prefix for actor logs
s5u13b Jan 13, 2025
11377bf
Upgrade logger to vLLM v0.6.6.post1
s5u13b Jan 13, 2025
bc2b83b
Reorganize logger
s5u13b Jan 14, 2025
8b1533d
Remove date format
s5u13b Jan 14, 2025
4ef0c7d
Add constants module
s5u13b Jan 14, 2025
df92ab8
Log ray id for logging
s5u13b Jan 14, 2025
24383cd
Refine logging handlers configuration
s5u13b Jan 14, 2025
a4df07c
Fix lint
s5u13b Jan 14, 2025
f3c427b
Refine logger
s5u13b Jan 14, 2025
fea7fa7
Optimize constants
s5u13b Jan 17, 2025
09519bc
Fix constants
s5u13b Jan 15, 2025
eec7a0d
Refactor
s5u13b Jan 16, 2025
425c543
Fix benchmark_serving
s5u13b Jan 16, 2025
1361bf6
Minors
s5u13b Jan 16, 2025
6d90be2
Minors
s5u13b Jan 17, 2025
e7bdcd3
Add poll instance infos and migration tasks log
s5u13b Jan 17, 2025
ac2d33c
Minors
s5u13b Jan 20, 2025
a86ae92
Minors
s5u13b Jan 20, 2025
a45721b
Minors
s5u13b Jan 20, 2025
7936ff0
Fix
s5u13b Jan 20, 2025
e59cbc7
Minors
s5u13b Jan 20, 2025
22b7561
Fix
s5u13b Jan 20, 2025
63862f9
Minors
s5u13b Jan 20, 2025
30fa6f0
Fix
s5u13b Jan 20, 2025
29821e9
Minors
s5u13b Jan 21, 2025
ca7aa5b
Reorg simulator files
s5u13b Jan 21, 2025
6783a2e
Minors
s5u13b Jan 22, 2025
9bac210
Assert enable_scaling
s5u13b Jan 22, 2025
4a2fd26
Minors
s5u13b Jan 22, 2025
1c1097c
Set max_instances for auto scale up
s5u13b Jan 22, 2025
7ef370d
Add retry bind address for zmq server
s5u13b Jan 22, 2025
7856ca0
Fix lint
s5u13b Jan 22, 2025
f6bd44c
Fix unit test
s5u13b Jan 22, 2025
bfc7b54
Refine dispatch scheduler implementation
s5u13b Jan 23, 2025
e3ace2d
Support power-of-k-choice for dispatch
s5u13b Jan 23, 2025
87aa594
Fix lint
s5u13b Jan 23, 2025
653a3a0
Fix lint
s5u13b Feb 7, 2025
e9bfeb3
Fix global scheduler unit test
s5u13b Feb 7, 2025
331cd7f
Fix entrypoints unit test
s5u13b Feb 7, 2025
7a65ae6
Squashed commit of the following:
s5u13b Feb 7, 2025
5b6325c
Fix host, num_cpus, serve
s5u13b Feb 8, 2025
41588ad
Minors
s5u13b Feb 8, 2025
19c9f0d
Simulator test done
s5u13b Feb 11, 2025
2ad98e7
Fix manager unit test
s5u13b Feb 11, 2025
dd93cf0
Fix init_instances and simulator test
s5u13b Feb 12, 2025
05fc34d
Fix simulator test
s5u13b Feb 12, 2025
814bf96
Minors
s5u13b Feb 12, 2025
10c49ae
Fix ip address
s5u13b Feb 12, 2025
895d68b
Refine instance ready & migration size sort
s5u13b Feb 12, 2025
ed8588b
Fix lint
s5u13b Feb 12, 2025
73f0f15
Refine timestamps
s5u13b Feb 12, 2025
59f2b08
Resort manager and launcher functions & Fix test_manager
s5u13b Feb 12, 2025
608f17b
Fix lint
s5u13b Feb 12, 2025
a38a374
Fix correctness test
s5u13b Feb 12, 2025
58e2647
Rename power-of-k-choice to topk-random-dispatch
s5u13b Feb 19, 2025
23510f2
Fix proxy_actor
s5u13b Feb 19, 2025
876d862
Fix ray_env
s5u13b Feb 19, 2025
9b3fdd3
Fix import time in conftest
s5u13b Feb 19, 2025
16 changes: 8 additions & 8 deletions benchmark/benchmark_serving.py
@@ -372,7 +372,7 @@ def __init__(self):
self._decode_sum_latencies = []
self._all_decode_token_latencies = []
self._inference_latencies = []
self._per_token_latencies_breakdown_dict = []
self._per_token_latency_breakdown_list = []

def measure(self, f):
async def measured(*args, **kwargs):
@@ -400,9 +400,9 @@ async def measured(*args, **kwargs):
self._all_token_latencies.append(lat_arr)
self._decode_sum_latencies.append(decode_sum_latency)
self._all_decode_token_latencies.extend(lat_arr[1:,1])
if 'per_token_latency_breakdown_dict' in output:
self._inference_latencies.append(np.mean(output['per_token_latency_breakdown_dict']['step_latency_engine']))
self._per_token_latencies_breakdown_dict.append(output['per_token_latency_breakdown_dict'])
self._inference_latencies.append(0.0)
if 'per_token_latency_breakdown_list' in output:
self._per_token_latency_breakdown_list.append(output['per_token_latency_breakdown_list'])
return prompt, output
return measured

@@ -494,7 +494,7 @@ async def benchmark(
m._decode_sum_latencies, \
m._request_lens, \
m._all_decode_token_latencies, \
m._per_token_latencies_breakdown_dict
m._per_token_latency_breakdown_list

def gen_random_response_lens(distribution: str, len_mean, len_range, num_prompts):
if distribution == 'uniform':
@@ -785,7 +785,7 @@ def main():
decode_sum_latencies, \
request_lens, \
all_decode_token_latencies, \
per_token_latencies_breakdown_dict = asyncio.run(benchmark(
per_token_latency_breakdown_list = asyncio.run(benchmark(
backend,
tokenizer,
prompts,
@@ -823,8 +823,8 @@ def main():
"decode_sum_latencies": decode_sum_latencies,
"all_decode_token_latencies": all_decode_token_latencies,
"inference_latencies": inference_latencies,
"per_token_latencies_breakdown_dict": per_token_latencies_breakdown_dict,
"throughput": throughput,
"per_token_latency_breakdown_list": per_token_latency_breakdown_list,
"throughput": throughput,
"instance_num": avg_instance_num})
json.dump(results, f)

11 changes: 8 additions & 3 deletions docs/Arguments.md
@@ -25,10 +25,11 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--scaling-load-metric {remaining_steps,usage_ratio}]
[--polling-interval POLLING_INTERVAL]
[--dispatch-policy {balanced,load,queue,rr}]
[--topk-random-dispatch TOPK_RANDOM_DISPATCH]
[--enable-migration]
[--enable-defrag]
[--pair-migration-frequency PAIR_MIGRATION_FREQUENCY]
[--pair-migration-policy {balanced,defrag_constrained,defrag_relaxed}]
[--pair-migration-policy {balanced,defrag}]
[--migrate-out-threshold MIGRATE_OUT_THRESHOLD]
[--request-migration-policy {LCR,SR,LR,FCW,FCWSR}]
[--enable-scaling]
@@ -139,6 +140,10 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
- Possible choices: balanced, load, queue, rr
- Default: "load"

`--topk-random-dispatch`
- Number of candidate instances for random dispatch under the dispatch policy.
- Default: 1

`--enable-migration`
- Enable migrating requests between instances.

@@ -151,8 +156,8 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]

`--pair-migration-policy`
- Pair migration policy.
- Possible choices: balanced, defrag_constrained, defrag_relaxed
- Default: "defrag_constrained"
- Possible choices: balanced, defrag
- Default: "defrag"

`--migrate-out-threshold`
- Migrate out instance load threshold.
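Note on `--topk-random-dispatch`: the option documented above (and its help text in `llumnix/arg_utils.py` further down) picks the k best candidates under the active dispatch policy and then chooses one of them at random. The sketch below only illustrates that idea; the function name and the load representation are invented here and are not the PR's dispatch scheduler implementation.

```python
import random
from typing import Dict

def topk_random_dispatch(instance_loads: Dict[str, float], k: int = 1) -> str:
    """Illustrative only: keep the k least-loaded instances under the
    dispatch policy's load metric, then pick one uniformly at random."""
    candidates = sorted(instance_loads, key=instance_loads.get)[:k]
    return random.choice(candidates)

# With k=1 this degenerates to plain load-based dispatch; with k=2 the request
# below lands on inst_0 or inst_2, never on the heavily loaded inst_1.
loads = {"inst_0": 0.2, "inst_1": 0.9, "inst_2": 0.4}
print(topk_random_dispatch(loads, k=2))
```

Randomizing over the top-k candidates trades a little per-request optimality for better load balancing when many requests arrive between load updates; with the default of 1 the behavior is unchanged.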
2 changes: 1 addition & 1 deletion docs/Simulator.md
@@ -3,7 +3,7 @@ Llumnix can generate latency data from logs. After run a real benchmark with `--

After running profiling with `python llumnix.backends.profiling.py`, you can get a `$PROFILING_RESULT_FILE_PATH.pkl`.

Then, you can run simulator with `--profiling-result-file-path PROFILING_RESULT_FILE_PATH`.
Then, you can run the simulator with `--simulator-mode` and `--profiling-result-file-path PROFILING_RESULT_FILE_PATH`.


```
23 changes: 18 additions & 5 deletions llumnix/arg_utils.py
@@ -44,6 +44,7 @@ def add_argument(self, *args, **kwargs):
kwargs['default'] = None
super().add_argument(*args, **kwargs)


@dataclass
class EntrypointsArgs:
host: str = None
@@ -112,13 +113,15 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
help="path to config file of arguments")
return parser


@dataclass
class ManagerArgs:
initial_instances: int = None

polling_interval: float = None
dispatch_policy: str = None
scaling_load_metric: str = None
topk_random_dispatch: int = None

enable_migration: bool = None
pair_migration_frequency: int = None
@@ -174,6 +177,7 @@ def create_global_scheduler_config(self, is_group_kind_migration_backend) -> Tup
# Create the GlobalScheduler Configuration.
global_scheduler_config = GlobalSchedulerConfig(self.initial_instances,
self.dispatch_policy,
self.topk_random_dispatch,
self.pair_migration_policy,
self.migrate_out_threshold,
self.scaling_policy,
@@ -205,6 +209,8 @@ def check_args(cls, args: 'ManagerArgs', parser: argparse.ArgumentParser):
assert not args.enable_port_offset_store or args.enable_port_increment, \
"Set enable_port_increment when enable_port_offset_store"

assert not args.enable_scaling, "Proactive auto-scaling is deprecated now; all auto-scaling related args will not take effect."

@staticmethod
def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
parser.add_argument('--initial-instances',
@@ -226,6 +232,13 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
'* "queue" dispatch request to the instance with minimum waiting request queue length.\n'
'* "flood" dispatch request to the instance with maximum requests dispatched.\n'
'* "rr" dispatch requests with round-robin policy.\n')
parser.add_argument('--topk-random-dispatch',
type=int,
help='number of candidate instances for random dispatch under the dispatch policy.\n\n'
'The candidate instances are first selected according to the load '
'(including factors such as load, queue size, etc.) based on the dispatch policy, '
'and then one of them is randomly chosen to receive the request for better load balancing.')

parser.add_argument('--enable-migration',
action='store_true',
help='enable migrate requests between instances')
@@ -234,13 +247,11 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
help='pair migration frequency')
parser.add_argument('--pair-migration-policy',
type=str,
choices=['balanced', 'defrag_constrained', 'defrag_relaxed'],
choices=['balanced', 'defrag'],
help='The pair migration policy.\n\n'
'* "balanced" pair migration to make the instance load of instance more balanced.\n'
'* "defrag_constrained" pair migration without balanced constraint to '
'achieve defragmentation thoroughly (with instance constraints).\n'
'* "defrag_relaxed" pair migration to without balanced constraint '
'to achieve defragmentation thoroughly (without instance constraints).\n')
'* "defrag" pair migration without balanced constraint to '
'achieve defragmentation thoroughly (with instance constraints).\n')
parser.add_argument('--migrate-out-threshold',
type=float,
help='migrate out instance load threshold')
@@ -289,11 +300,13 @@ def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
help='the prefill decode ratio used in global launch model e.g. "1:1"')
return parser


@dataclass
class LaunchArgs:
launch_mode: LaunchMode = None
backend_type: BackendType = None


@dataclass
class InstanceArgs:
instance_type: str = None
4 changes: 4 additions & 0 deletions llumnix/backends/profiling.py
@@ -35,6 +35,7 @@
def _pad_to_alignment(x, multiple_of):
return x + ((-1*x) % multiple_of)


@dataclasses.dataclass
class LatencyMemData:
# The latency of each stage
@@ -69,6 +70,7 @@ def get_prefill_dict_kv(self):
def get_decode_dict_kv(self):
return map(list, zip(*self.decode_latency.items()))


@dataclasses.dataclass
class ProfilingResult:
"""Store the profiling result of a model."""
@@ -127,6 +129,7 @@ def fit_from_database(self, parallel_config: SimParallelConfig):
avg_loss += abs(sim_lat - latency_list[idx])
print(f"decode sim avg_loss={avg_loss/len(latency_list)}")


class ProfilingDatabase:
"""Store the profiling results of all the models"""
def __init__(self, database_filename: str, new_database: bool = False):
@@ -198,6 +201,7 @@ def get_latency_mem(backend_type: BackendType, profiling_database: ProfilingData
return latency_mem
raise ValueError(f'Unsupported simulator backend: {backend_type}')


if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
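Side note on `_pad_to_alignment` (unchanged at the top of this file): it rounds a value up to the next multiple of `multiple_of`, which is how the simulator buckets batch sizes and sequence lengths before its latency-table lookups (see the `SimGPUExecutor` code removed from `llumnix/backends/vllm/executor.py` below). A tiny self-contained check, with the function body copied from the diff:

```python
def _pad_to_alignment(x, multiple_of):
    # Round x up to the next multiple of multiple_of.
    return x + ((-1*x) % multiple_of)

# A decode batch size of 13 is padded to 16 before the table lookup;
# already-aligned values are left unchanged.
assert _pad_to_alignment(13, 8) == 16
assert _pad_to_alignment(16, 8) == 16
assert _pad_to_alignment(1, 8) == 8
```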
7 changes: 3 additions & 4 deletions llumnix/backends/utils.py
@@ -27,6 +27,7 @@
from llumnix.logging.logger import init_logger
from llumnix.utils import get_instance_name
from llumnix.internal_config import MigrationConfig
from llumnix.metrics.timestamps import set_timestamp

logger = init_logger(__name__)

@@ -55,16 +56,14 @@ async def put_nowait_to_servers(self,
tasks = []
for server_id, req_outputs in server_request_outputs.items():
server_info = server_info_dict[server_id]
for req_output in req_outputs:
if hasattr(req_output, 'request_timestamps'):
req_output.request_timestamps.engine_actor_put_queue_timestamp = time.time()
set_timestamp(req_outputs, 'engine_actor_put_queue_timestamp', time.time())
tasks.append(asyncio.create_task(self.request_output_queue_client.put_nowait(req_outputs, server_info)))
rets = await asyncio.gather(*tasks, return_exceptions=True)
for idx, ret in enumerate(rets):
if isinstance(ret, Exception):
server_id = list(server_request_outputs.keys())[idx]
server_info = server_info_dict[server_id]
logger.warning("Server {} is dead.".format(server_id))
logger.error("Server {} is dead, exception: {}".format(server_id, ret))
if self.request_output_queue_type == QueueType.ZMQ:
logger.warning("request output queue ip: {}, port: {}".format(server_info.request_output_queue_ip,
server_info.request_output_queue_port))
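The hunk above swaps a hand-written `hasattr` loop for `set_timestamp` from `llumnix.metrics.timestamps`. That helper's implementation is not part of this diff, so the sketch below is only an assumption that mirrors the loop being removed, not the actual Llumnix code:

```python
import time
from types import SimpleNamespace
from typing import Any, Iterable

def set_timestamp(request_outputs: Iterable[Any], attr_name: str, now: float) -> None:
    """Assumed behavior: stamp `now` under `attr_name` on every output that
    carries request timestamps, mirroring the loop this PR removes."""
    for req_output in request_outputs:
        if hasattr(req_output, 'request_timestamps'):
            setattr(req_output.request_timestamps, attr_name, now)

# Usage mirroring the call site in put_nowait_to_servers, with a stand-in output:
req_outputs = [SimpleNamespace(request_timestamps=SimpleNamespace())]
set_timestamp(req_outputs, 'engine_actor_put_queue_timestamp', time.time())
```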
67 changes: 0 additions & 67 deletions llumnix/backends/vllm/executor.py
@@ -12,7 +12,6 @@
# limitations under the License.

import time
import asyncio

from collections import defaultdict
from typing import Callable, Dict, List, Optional, Tuple, Type
@@ -22,7 +21,6 @@
from ray.util.placement_group import PlacementGroup

from vllm.executor.executor_base import ExecutorBase
from vllm.model_executor.layers.sampler import SamplerOutput, CompletionSequenceGroupOutput
from vllm.executor.ray_gpu_executor import RayGPUExecutor, RayGPUExecutorAsync, RayWorkerWrapper, envs, \
get_ip, get_vllm_instance_id, get_distributed_init_method, get_open_port
from vllm.worker.worker_base import WorkerBase
@@ -263,68 +261,3 @@ async def execute_model_async(self, *args, **kwargs):
t1 = time.time()
self.last_inference_latency = (t1 - t0) * 1000
return outputs

class SimGPUExecutor(RayGPUExecutor):
latency_mem: LatencyMemData = None
def __init__(self, *args, **kwargs) -> None:
RayGPUExecutor.__init__(self, *args, **kwargs)
self.last_inference_latency = 0
self.migration_bandwidth = self.latency_mem.migration_bandwidth
# TODO(ZeldaHuang): add swap bandwidth

self.cache_block_size = get_cache_block_size(
self.cache_config.block_size, self.model_config, self.parallel_config)
self.cache_block_size /= GiB_bytes
self.sim_cache_config = SimCacheConfig(self.cache_config.gpu_memory_utilization,
self.cache_config.block_size,
self.scheduler_config.max_num_batched_tokens)

def _init_executor(self) -> None:
pass

def determine_num_available_blocks(self) -> Tuple[int, int]:
num_gpu_blocks = self.latency_mem.cache_dict.get(self.sim_cache_config, 880)
num_cpu_blocks = 2048
return (num_gpu_blocks, num_cpu_blocks)

def initialize_cache(self, num_gpu_blocks: int,
num_cpu_blocks: int) -> None:
logger.info("# GPU blocks: {}, # CPU blocks: {}".format(num_gpu_blocks, num_cpu_blocks))

async def execute_model_async(
self,
execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
prefill_seq_len = 0
decode_seq_len = 0
decode_bs = 0
for meta_data in execute_model_req.seq_group_metadata_list:
if meta_data.is_prompt:
prefill_seq_len += meta_data.token_chunk_size
else:
decode_bs += meta_data.token_chunk_size
decode_seq_len += list(meta_data.seq_data.values())[0].get_len()
decode_bs = _pad_to_alignment(decode_bs, 8)
prefill_seq_len = _pad_to_alignment(prefill_seq_len, 8)
latency = 0
if prefill_seq_len:
latency += self.latency_mem.prefill_latency[prefill_seq_len][0] if prefill_seq_len in self.latency_mem.prefill_latency \
else model_prefill(prefill_seq_len, *self.latency_mem.prefill_model_params)
if decode_bs:
decode_meta_data = (decode_bs, decode_seq_len)
latency += self.latency_mem.decode_latency[decode_meta_data][0] if decode_meta_data in self.latency_mem.decode_latency \
else model_decode((decode_bs, decode_seq_len), *self.latency_mem.decode_model_params)
await asyncio.sleep(latency/1000)
sampler_outputs = []
for meta_data in execute_model_req.seq_group_metadata_list:
samples = []
for seq_id in meta_data.seq_data.keys():
dummy_sample_output = SequenceOutput(seq_id, 20, {20: Logprob(1.0)})
samples.append(dummy_sample_output)
if samples:
output = CompletionSequenceGroupOutput(samples, None)
sampler_outputs.append(output)
return [SamplerOutput(outputs=sampler_outputs)]

async def send_blocks(self, blocks_len) -> None:
migration_latency = (self.cache_block_size * blocks_len) / self.migration_bandwidth
await asyncio.sleep(migration_latency)
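The `SimGPUExecutor` deleted above implements the simulator backend's latency model; given the "Reorg simulator files" commit in this PR, it appears to be relocated rather than dropped. Its core pattern is: use the profiled latency if the padded shape was measured, otherwise fall back to a fitted cost model. A self-contained restatement of that pattern (the table contents and the linear fit below are invented for illustration; the real code uses the `model_prefill`/`model_decode` fits from `LatencyMemData`):

```python
from typing import Dict, Tuple

def estimate_prefill_latency(prefill_seq_len: int,
                             prefill_latency: Dict[int, Tuple[float, ...]],
                             prefill_model_params: Tuple[float, float]) -> float:
    """Lookup-or-fit sketch: prefer the profiled latency for this (padded)
    sequence length, otherwise fall back to a fitted cost model."""
    if prefill_seq_len in prefill_latency:
        return prefill_latency[prefill_seq_len][0]
    a, b = prefill_model_params  # illustrative linear stand-in for model_prefill(...)
    return a * prefill_seq_len + b

# 512 was profiled, so it comes from the table; 768 uses the fitted model.
table = {512: (35.0,)}
print(estimate_prefill_latency(512, table, (0.06, 5.0)))  # -> 35.0
print(estimate_prefill_latency(768, table, (0.06, 5.0)))  # -> 51.08
```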