Add async support for the OpenAI-compatible server to enable faster inference
kcz358 committed Aug 27, 2024
1 parent cbc8599 commit 0d02bad
Showing 1 changed file with 75 additions and 90 deletions.
165 changes: 75 additions & 90 deletions lmms_eval/models/srt_api.py
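In outline, the commit replaces the blocking OpenAI client with AsyncOpenAI and issues requests concurrently, capped by a semaphore. A minimal, self-contained sketch of that pattern follows; the base URL, model name, and prompts are placeholders, not values from this commit:

# Sketch of the pattern this commit adopts: an AsyncOpenAI client plus a
# semaphore that caps how many requests are in flight at once.
# base_url, model, and the prompts below are hypothetical placeholders.
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="EMPTY", base_url="http://127.0.0.1:30000/v1")
sem = asyncio.Semaphore(8)  # bounds concurrency, like num_processes in the commit

async def ask(prompt: str) -> str:
    async with sem:
        response = await client.chat.completions.create(
            model="placeholder-model",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=64,
        )
        return response.choices[0].message.content

async def main():
    prompts = ["What is 2 + 2?", "Name a prime number."]
    print(await asyncio.gather(*(ask(p) for p in prompts)))

asyncio.run(main())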
@@ -1,10 +1,12 @@
from accelerate import Accelerator, DistributedType
import asyncio
import base64
from io import BytesIO
from copy import deepcopy
from decord import VideoReader, cpu
import numpy as np
from openai import OpenAI
from multiprocessing import cpu_count
from openai import AsyncOpenAI
from PIL import Image
import os
import json
@@ -43,6 +45,7 @@ def __init__(
        chunked_prefill_size: int = 16384,
        continual_mode: bool = False,
        response_persistent_folder: str = None,
        num_processes: int = cpu_count() // 2,
        **kwargs,
    ) -> None:
        super().__init__()
@@ -88,7 +91,8 @@ def __init__(
            other_args=other_args,
        )
        self.base_url += "/v1"
        self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
        self.client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
        self.num_processes = num_processes
        # assert self.batch_size_per_gpu == 1, "Llava currently does not support batched generation. See https://github.com/haotian-liu/LLaVA/issues/754. HF Llava also has this issue."
        if accelerator.num_processes > 1:
            assert accelerator.distributed_type in [DistributedType.FSDP, DistributedType.MULTI_GPU, DistributedType.DEEPSPEED], "Unsupported distributed type provided. Only DDP and FSDP are supported."
@@ -155,100 +159,81 @@ def flatten(self, input):
                new_list.append(j)
        return new_list

    async def generate(self, request):
        contexts, gen_kwargs, doc_to_visual, doc_id, task, split = request.args
        visuals = [doc_to_visual(self.task_dict[task][split][doc_id])]
        visuals = self.flatten(visuals)
        imgs = []  # multiple images or frames for video
        for visual in visuals:
            if self.modality == "image":
                img = self.encode_image(visual)
                imgs.append(img)
            elif self.modality == "video":
                try:
                    frames = self.encode_video(visual, self.max_frames_num)
                    imgs.extend(frames)
                except Exception as e:
                    eval_logger.error(f"Exception: {e} while loading video {visual}")
                    imgs = None
                    break

        # Handle video decode errors: if the video can't be loaded even with pyav, skip it
        if imgs is None:
            resps = ""
            return resps

        messages = []

        # put the images first
        content = []
        for img in imgs:
            content.append({"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img}"}})

        content.append({"type": "text", "text": contexts})
        messages.append({"role": "user", "content": content})

        if "max_new_tokens" not in gen_kwargs:
            gen_kwargs["max_new_tokens"] = 1024

        if "temperature" not in gen_kwargs:
            gen_kwargs["temperature"] = 0

        for attempt in range(5):
            try:
                response = await self.client.chat.completions.create(model=self.model_version, messages=messages, temperature=gen_kwargs["temperature"], max_tokens=gen_kwargs["max_new_tokens"], timeout=self.timeout)
                response_text = response.choices[0].message.content.strip()
                break  # If successful, break out of the loop

            except Exception as e:
                eval_logger.info(f"Attempt {attempt + 1} failed with error: {str(e)}.")
                if attempt < 4:
                    await asyncio.sleep(NUM_SECONDS_TO_SLEEP)  # asyncio.sleep, so retries don't block the event loop
                else:  # If this was the last attempt, log and return an empty string
                    eval_logger.error(f"All 5 attempts failed. Last error message: {str(e)}.")
                    response_text = ""

        return response_text

    def generate_until(self, requests) -> List[str]:
        res = []
        pbar = tqdm(total=len(requests), disable=(self.rank != 0), desc="Model Responding")

        for contexts, gen_kwargs, doc_to_visual, doc_id, task, split in [reg.args for reg in requests]:
            if self.continual_mode is True and self.cache_mode == "resume":
                doc_uuid = f"{task}___{split}___{doc_id}"
                if doc_uuid in self.response_cache:
                    response_text = self.response_cache[doc_uuid]
                    if response_text:
                        res.append(response_text)
                        pbar.update(1)
                        continue

            visuals = [doc_to_visual(self.task_dict[task][split][doc_id])]
            visuals = self.flatten(visuals)
            imgs = []  # multiple images or frames for video
            for visual in visuals:
                if self.modality == "image":
                    img = self.encode_image(visual)
                    imgs.append(img)
                elif self.modality == "video":
                    try:
                        frames = self.encode_video(visual, self.max_frames_num)
                        imgs.extend(frames)
                    except Exception as e:
                        eval_logger.error(f"Exception: {e} while loading video {visual}")
                        imgs = None
                        break

            # Handle video decode errors: if the video can't be loaded even with pyav, skip it
            if imgs is None:
                resps = ""
                res.append(resps)
        async def run(requests):
            sem = asyncio.Semaphore(self.num_processes)

            async def _process(request):
                async with sem:
                    return await self.generate(request)

            tasks = [asyncio.create_task(_process(request)) for request in requests]
            for completed_task in asyncio.as_completed(tasks):
                result = await completed_task
                res.append(result)

Maxwell-Lyu commented on Sep 11, 2024:

This needs a serious review. The responses may come back in a different order from the requests, because as_completed is a non-blocking method that yields results from "faster" tasks before "slower" ones, without respecting the order in which the tasks were created.
This makes srt_api a roll of the dice.
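
For reference, one order-preserving variant swaps as_completed for asyncio.gather, which returns results in the same order its coroutines are passed. This is an illustrative sketch, not code from this commit:

# Hypothetical rewrite of run(): asyncio.gather preserves input order,
# so res[i] corresponds to requests[i] even when faster tasks finish first.
async def run(requests):
    sem = asyncio.Semaphore(self.num_processes)

    async def _process(request):
        async with sem:
            result = await self.generate(request)
            pbar.update(1)  # progress still advances as each task completes
            return result

    res.extend(await asyncio.gather(*(_process(r) for r in requests)))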

                pbar.update(1)
                continue

            messages = []

            # put the images first
            content = []
            for img in imgs:
                content.append({"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img}"}})

            content.append({"type": "text", "text": contexts})
            messages.append({"role": "user", "content": content})
            # if self.image_token not in contexts:  # single image format
            #     content = []
            #     for img in imgs:
            #         content.append({"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img}"}})

            #     content.append({"type": "text", "text": contexts})
            #     messages.append({"role": "user", "content": content})
            # else:  # interleaved format
            #     contexts = contexts.split(self.image_token)
            #     for idx, img in enumerate(imgs):
            #         content = [
            #             {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img}"}},
            #             {"type": "text", "text": contexts[idx]},
            #         ]
            #         messages.append({"role": "user", "content": content})
            #     messages.append({"role": "user", "content": [{"type": "text", "text": contexts[-1]}]})

            if "max_new_tokens" not in gen_kwargs:
                gen_kwargs["max_new_tokens"] = 1024

            if "temperature" not in gen_kwargs:
                gen_kwargs["temperature"] = 0

            for attempt in range(5):
                try:
                    response = self.client.chat.completions.create(model=self.model_version, messages=messages, temperature=gen_kwargs["temperature"], max_tokens=gen_kwargs["max_new_tokens"], timeout=self.timeout)
                    response_text = response.choices[0].message.content.strip()
                    break  # If successful, break out of the loop

                except Exception as e:
                    eval_logger.info(f"Attempt {attempt + 1} failed with error: {str(e)}.")
                    if attempt < 4:
                        time.sleep(NUM_SECONDS_TO_SLEEP)
                    else:  # If this was the last attempt, log and return an empty string
                        eval_logger.error(f"All 5 attempts failed. Last error message: {str(e)}.")
                        response_text = ""

            res.append(response_text)
            pbar.update(1)

            if self.continual_mode is True:  # Cache the response
                doc_uuid = f"{task}___{split}___{doc_id}"
                self.response_cache[doc_uuid] = response_text
                with open(self.response_persistent_file, "w") as f:
                    json.dump(self.response_cache, f)

        pbar.close()
        asyncio.run(run(requests))

        return res

    def loglikelihood(self, requests: List[Instance]) -> List[Tuple[float, bool]]:
