Problem with await self.task_manager.on_send_task_subscribe call #256

joyoungzhang opened this issue Apr 21, 2025

Problem Description

In the current implementation, there is an issue with the await self.task_manager.on_send_task_subscribe(json_rpc_request) call in server.py. The problem occurs because:

  1. The method on_send_task_subscribe() is declared to return Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]

  2. When it returns an AsyncIterable (asynchronous generator), we cannot directly use await on it

  3. This causes the error: TypeError: object async_generator can't be used in 'await' expression
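
A minimal standalone sketch (hypothetical names, not the actual server.py code) reproduces the same TypeError:

import asyncio

async def fake_stream():
    # A plain async generator, analogous to what on_send_task_subscribe returns.
    yield "event-1"
    yield "event-2"

async def main():
    # Correct usage: iterate the generator.
    async for event in fake_stream():
        print(event)

    # Incorrect usage: awaiting the generator object raises
    # TypeError: object async_generator can't be used in 'await' expression
    await fake_stream()

asyncio.run(main())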

Expected Behavior

For streaming requests (SendTaskStreamingRequest), the system should properly handle the asynchronous streaming response without raising exceptions.

Current Behavior

The code attempts to await an asynchronous generator, which is not a valid operation in Python, resulting in the mentioned error.

Solution Options

Option 1: Change the calling code (Recommended)

elif isinstance(json_rpc_request, SendTaskStreamingRequest):
    # Don't await the async generator, pass it directly
    result = self.task_manager.on_send_task_subscribe(json_rpc_request)
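
With this change, result may be either an async iterable of SendTaskStreamingResponse objects or a plain JSONRPCResponse, so the response-building code has to distinguish the two cases. A rough sketch of that branch (helper and variable names here are illustrative, not the actual server.py ones):

from collections.abc import AsyncIterable

async def forward_result(result):
    if isinstance(result, AsyncIterable):
        # Streaming case: relay each SendTaskStreamingResponse as it is produced,
        # e.g. as server-sent events.
        async for item in result:
            yield item
    else:
        # Non-streaming case: a single JSONRPCResponse.
        yield result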

Option 2: Modify the method implementation (if keeping await is necessary)
Change on_send_task_subscribe() to be a coroutine that collects all streaming responses and returns them as a single list:

async def on_send_task_subscribe(
    self, request: SendTaskStreamingRequest
) -> Union[List[SendTaskStreamingResponse], JSONRPCResponse]:
    # `original_async_gen` stands for the existing async-generator
    # implementation that yields SendTaskStreamingResponse items.
    responses = []
    async for item in original_async_gen:
        responses.append(item)
    return responses
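
If Option 2 were adopted, the existing call site would keep working unchanged, but every streamed item would be buffered before the client sees anything:

# The await now succeeds, but responses arrive only after the generator is exhausted.
result = await self.task_manager.on_send_task_subscribe(json_rpc_request)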

Recommendation

Option 1 is preferred as it maintains the streaming nature of the response. Option 2 would convert the streaming response into a batch response, which may not be desirable for the intended use case.
