Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ASGI-Tools for HTTP/WS handling #1268

Merged
merged 6 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/about/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Unreleased
- :pull:`1113` - Renamed the ``use_location`` hook's ``search`` attribute to ``query_string``.
- :pull:`1113` - Renamed the ``use_location`` hook's ``pathname`` attribute to ``path``.
- :pull:`1113` - Renamed ``reactpy.config.REACTPY_DEBUG_MODE`` to ``reactpy.config.REACTPY_DEBUG``.
- :pull:`1113` - ``@reactpy/client`` now exports ``React`` and ``ReactDOM``.
- :pull:`1263` - ReactPy no longer auto-converts ``snake_case`` props to ``camelCase``. It is now the responsibility of the user to ensure that props are in the correct format.

**Removed**
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"lxml >=4",
"servestatic >=3.0.0",
"orjson >=3",
"asgi-tools",
]
dynamic = ["version"]
urls.Changelog = "https://reactpy.dev/docs/about/changelog.html"
Expand Down
93 changes: 55 additions & 38 deletions src/reactpy/asgi/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any

import orjson
from asgi_tools import ResponseWebSocket
from asgiref import typing as asgi_types
from asgiref.compatibility import guarantee_single_callable
from servestatic import ServeStaticASGI
Expand All @@ -26,6 +27,8 @@
AsgiHttpApp,
AsgiLifespanApp,
AsgiWebsocketApp,
AsgiWebsocketReceive,
AsgiWebsocketSend,
Connection,
Location,
ReactPyConfig,
Expand Down Expand Up @@ -153,41 +156,56 @@ async def __call__(
send: asgi_types.ASGISendCallable,
) -> None:
"""ASGI app for rendering ReactPy Python components."""
dispatcher: asyncio.Task[Any] | None = None
recv_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()

# Start a loop that handles ASGI websocket events
while True:
event = await receive()
if event["type"] == "websocket.connect":
await send(
{"type": "websocket.accept", "subprotocol": None, "headers": []}
)
dispatcher = asyncio.create_task(
self.run_dispatcher(scope, receive, send, recv_queue)
)

elif event["type"] == "websocket.disconnect":
if dispatcher:
dispatcher.cancel()
break

elif event["type"] == "websocket.receive" and event["text"]:
queue_put_func = recv_queue.put(orjson.loads(event["text"]))
await queue_put_func

async def run_dispatcher(
async with ReactPyWebsocket(scope, receive, send, parent=self.parent) as ws: # type: ignore
while True:
# Wait for the webserver to notify us of a new event
event: dict[str, Any] = await ws.receive(raw=True) # type: ignore

# If the event is a `receive` event, parse the message and send it to the rendering queue
if event["type"] == "websocket.receive":
msg: dict[str, str] = orjson.loads(event["text"])
if msg.get("type") == "layout-event":
await ws.rendering_queue.put(msg)
else: # pragma: no cover
await asyncio.to_thread(
_logger.warning, f"Unknown message type: {msg.get('type')}"
)

# If the event is a `disconnect` event, break the rendering loop and close the connection
elif event["type"] == "websocket.disconnect":
break


class ReactPyWebsocket(ResponseWebSocket):
def __init__(
self,
scope: asgi_types.WebSocketScope,
receive: asgi_types.ASGIReceiveCallable,
send: asgi_types.ASGISendCallable,
recv_queue: asyncio.Queue[dict[str, Any]],
receive: AsgiWebsocketReceive,
send: AsgiWebsocketSend,
parent: ReactPyMiddleware,
) -> None:
"""Asyncio background task that renders and transmits layout updates of ReactPy components."""
super().__init__(scope=scope, receive=receive, send=send) # type: ignore
self.scope = scope
self.parent = parent
self.rendering_queue: asyncio.Queue[dict[str, str]] = asyncio.Queue()
self.dispatcher: asyncio.Task[Any] | None = None

async def __aenter__(self) -> ReactPyWebsocket:
self.dispatcher = asyncio.create_task(self.run_dispatcher())
return await super().__aenter__() # type: ignore

async def __aexit__(self, *_: Any) -> None:
if self.dispatcher:
self.dispatcher.cancel()
await super().__aexit__() # type: ignore

async def run_dispatcher(self) -> None:
"""Async background task that renders ReactPy components over a websocket."""
try:
# Determine component to serve by analyzing the URL and/or class parameters.
if self.parent.multiple_root_components:
url_match = re.match(self.parent.dispatcher_pattern, scope["path"])
url_match = re.match(self.parent.dispatcher_pattern, self.scope["path"])
if not url_match: # pragma: no cover
raise RuntimeError("Could not find component in URL path.")
dotted_path = url_match["dotted_path"]
Expand All @@ -203,10 +221,10 @@ async def run_dispatcher(

# Create a connection object by analyzing the websocket's query string.
ws_query_string = urllib.parse.parse_qs(
scope["query_string"].decode(), strict_parsing=True
self.scope["query_string"].decode(), strict_parsing=True
)
connection = Connection(
scope=scope,
scope=self.scope,
location=Location(
path=ws_query_string.get("http_pathname", [""])[0],
query_string=ws_query_string.get("http_query_string", [""])[0],
Expand All @@ -217,20 +235,19 @@ async def run_dispatcher(
# Start the ReactPy component rendering loop
await serve_layout(
Layout(ConnectionContext(component(), value=connection)),
lambda msg: send(
{
"type": "websocket.send",
"text": orjson.dumps(msg).decode(),
"bytes": None,
}
),
recv_queue.get, # type: ignore
self.send_json,
self.rendering_queue.get, # type: ignore
)

# Manually log exceptions since this function is running in a separate asyncio task.
except Exception as error:
await asyncio.to_thread(_logger.error, f"{error}\n{traceback.format_exc()}")

async def send_json(self, data: Any) -> None:
return await self._send(
{"type": "websocket.send", "text": orjson.dumps(data).decode()}
)


@dataclass
class StaticFileApp:
Expand Down
37 changes: 11 additions & 26 deletions src/reactpy/asgi/standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@
from logging import getLogger
from typing import Callable, Literal, cast, overload

from asgi_tools import ResponseHTML
from asgiref import typing as asgi_types
from typing_extensions import Unpack

from reactpy import html
from reactpy.asgi.middleware import ReactPyMiddleware
from reactpy.asgi.utils import (
dict_to_byte_list,
http_response,
import_dotted_path,
vdom_head_to_html,
)
from reactpy.asgi.utils import import_dotted_path, vdom_head_to_html
from reactpy.types import (
AsgiApp,
AsgiHttpApp,
Expand All @@ -40,7 +36,7 @@ def __init__(
self,
root_component: RootComponentConstructor,
*,
http_headers: dict[str, str | int] | None = None,
http_headers: dict[str, str] | None = None,
html_head: VdomDict | None = None,
html_lang: str = "en",
**settings: Unpack[ReactPyConfig],
Expand Down Expand Up @@ -182,44 +178,33 @@ async def __call__(

# Response headers for `index.html` responses
request_headers = dict(scope["headers"])
response_headers: dict[str, str | int] = {
response_headers: dict[str, str] = {
"etag": self._etag,
"last-modified": self._last_modified,
"access-control-allow-origin": "*",
"cache-control": "max-age=60, public",
"content-length": len(self._cached_index_html),
"content-length": str(len(self._cached_index_html)),
"content-type": "text/html; charset=utf-8",
**self.parent.extra_headers,
}

# Browser is asking for the headers
if scope["method"] == "HEAD":
return await http_response(
send=send,
method=scope["method"],
headers=dict_to_byte_list(response_headers),
)
response = ResponseHTML("", headers=response_headers)
return await response(scope, receive, send) # type: ignore

# Browser already has the content cached
if (
request_headers.get(b"if-none-match") == self._etag.encode()
or request_headers.get(b"if-modified-since") == self._last_modified.encode()
):
response_headers.pop("content-length")
return await http_response(
send=send,
method=scope["method"],
code=304,
headers=dict_to_byte_list(response_headers),
)
response = ResponseHTML("", headers=response_headers, status_code=304)
return await response(scope, receive, send) # type: ignore

# Send the index.html
await http_response(
send=send,
method=scope["method"],
message=self._cached_index_html,
headers=dict_to_byte_list(response_headers),
)
response = ResponseHTML(self._cached_index_html, headers=response_headers)
await response(scope, receive, send) # type: ignore

def process_index_html(self) -> None:
"""Process the index.html and store the results in memory."""
Expand Down
43 changes: 0 additions & 43 deletions src/reactpy/asgi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from importlib import import_module
from typing import Any

from asgiref import typing as asgi_types

from reactpy._option import Option
from reactpy.types import ReactPyConfig, VdomDict
from reactpy.utils import vdom_to_html
Expand Down Expand Up @@ -55,18 +53,6 @@ def check_path(url_path: str) -> str: # pragma: no cover
return ""


def dict_to_byte_list(
data: dict[str, str | int],
) -> list[tuple[bytes, bytes]]:
"""Convert a dictionary to a list of byte tuples."""
result: list[tuple[bytes, bytes]] = []
for key, value in data.items():
new_key = key.encode()
new_value = value.encode() if isinstance(value, str) else str(value).encode()
result.append((new_key, new_value))
return result


def vdom_head_to_html(head: VdomDict) -> str:
if isinstance(head, dict) and head.get("tagName") == "head":
return vdom_to_html(head)
Expand All @@ -76,35 +62,6 @@ def vdom_head_to_html(head: VdomDict) -> str:
)


async def http_response(
*,
send: asgi_types.ASGISendCallable,
method: str,
code: int = 200,
message: str = "",
headers: Iterable[tuple[bytes, bytes]] = (),
) -> None:
"""Sends a HTTP response using the ASGI `send` API."""
start_msg: asgi_types.HTTPResponseStartEvent = {
"type": "http.response.start",
"status": code,
"headers": [*headers],
"trailers": False,
}
body_msg: asgi_types.HTTPResponseBodyEvent = {
"type": "http.response.body",
"body": b"",
"more_body": False,
}

# Add the content type and body to everything other than a HEAD request
if method != "HEAD":
body_msg["body"] = message.encode()

await send(start_msg)
await send(body_msg)


def process_settings(settings: ReactPyConfig) -> None:
"""Process the settings and return the final configuration."""
from reactpy import config
Expand Down
5 changes: 3 additions & 2 deletions tests/test_asgi/test_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from collections.abc import MutableMapping

import pytest
from asgi_tools import ResponseText
from asgiref.testing import ApplicationCommunicator
from requests import request

import reactpy
from reactpy import html
from reactpy.asgi.standalone import ReactPy
from reactpy.asgi.utils import http_response
from reactpy.testing import BackendFixture, DisplayFixture, poll
from reactpy.testing.common import REACTPY_TESTS_DEFAULT_TIMEOUT
from reactpy.types import Connection, Location
Expand Down Expand Up @@ -180,7 +180,8 @@ async def custom_http_app(scope, receive, send) -> None:
raise ValueError("Custom HTTP app received a non-HTTP scope")

rendered.current = True
await http_response(send=send, method=scope["method"], message="Hello World")
response = ResponseText("Hello World")
await response(scope, receive, send)

scope = {
"type": "http",
Expand Down