Skip to content

Commit

Permalink
Merge pull request #120 from dapper91/dev
Browse files Browse the repository at this point in the history
- initialize dispatcher middleware stack in constructor.
- generic context type added.
  • Loading branch information
dapper91 authored Feb 15, 2025
2 parents 1f490d3 + 7328073 commit e2132c0
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 108 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

1.13.0 (2025-02-15)
------------------

- initialize dispatcher middleware stack in constructor.
- generic context type added.


1.12.2 (2025-01-15)
------------------

Expand Down
2 changes: 1 addition & 1 deletion pjrpc/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
__description__ = 'Extensible JSON-RPC library'
__url__ = 'https://github.com/dapper91/pjrpc'

__version__ = '1.12.2'
__version__ = '1.13.0'

__author__ = 'Dmitry Pershin'
__email__ = 'dapper1291@gmail.com'
Expand Down
74 changes: 39 additions & 35 deletions pjrpc/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ def wrapped(*args: Any, **kwargs: Any) -> 'BaseBatch.BaseProxy':
return wrapped

@abc.abstractmethod
def __call__(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitable[Any], Any]:
def __call__(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Union[Awaitable[Any], Any]:
"""
Makes an RPC call.
:param _trace_ctx: tracers request context
"""

@abc.abstractmethod
def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitable[Any], Any]:
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Union[Awaitable[Any], Any]:
"""
Makes an RPC call.
Expand Down Expand Up @@ -97,7 +97,7 @@ def __call__(self, method: str, *args: Any, **kwargs: Any) -> 'BaseBatch':
return self.add(method, *args, **kwargs)

@abc.abstractmethod
def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitable[Any], Any]:
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Union[Awaitable[Any], Any]:
"""
Makes a JSON-RPC request.
Expand All @@ -107,7 +107,7 @@ def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitab

@abc.abstractmethod
def send(
self, request: BatchRequest, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, request: BatchRequest, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Union[Awaitable[Optional[BatchResponse]], Optional[BatchResponse]]:
"""
Sends a JSON-RPC batch request.
Expand Down Expand Up @@ -183,10 +183,10 @@ class Proxy(BaseBatch.BaseProxy):
def __init__(self, batch: 'Batch'):
super().__init__(batch)

def __call__(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
def __call__(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return self.call(_trace_ctx)

def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return self._batch.call(_trace_ctx)

@property
Expand All @@ -196,13 +196,13 @@ def proxy(self) -> 'Proxy':
def __init__(self, client: 'AbstractClient'):
super().__init__(client)

def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Optional[Any]:
response = self.send(self._requests)
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Optional[Any]:
response = self.send(self._requests, _trace_ctx=_trace_ctx)

return response.result if response is not None else None

def send(
self, request: BatchRequest, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, request: BatchRequest, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Optional[BatchResponse]:
return cast(
Optional[BatchResponse], self._client._send(
Expand All @@ -225,10 +225,10 @@ class Proxy(BaseBatch.BaseProxy):
def __init__(self, batch: 'AsyncBatch'):
super().__init__(batch)

async def __call__(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
async def __call__(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return await self.call(_trace_ctx)

async def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
async def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return await self._batch.call(_trace_ctx)

@property
Expand All @@ -238,13 +238,13 @@ def proxy(self) -> 'Proxy':
def __init__(self, client: 'AbstractAsyncClient'):
super().__init__(client)

async def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Optional[Any]:
async def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Optional[Any]:
response = await self.send(self._requests, _trace_ctx=_trace_ctx)

return response.result if response is not None else None

async def send(
self, request: BatchRequest, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, request: BatchRequest, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Optional[BatchResponse]:
return await cast(
Awaitable[Optional[BatchResponse]], self._client._send(
Expand Down Expand Up @@ -304,7 +304,7 @@ def __init__(
json_decoder: Optional[json.JSONDecoder] = None,
strict: bool = True,
request_args: Optional[Dict[str, Any]] = None,
tracers: Iterable[Tracer] = (),
tracers: Iterable[Tracer[Any]] = (),
retry_strategy: Optional[retry.RetryStrategy] = None,
):
self.request_class = request_class
Expand All @@ -326,7 +326,7 @@ def __call__(
self,
method: str,
*args: Any,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Union[Awaitable[Any], Any]:
"""
Expand All @@ -339,7 +339,7 @@ def __call__(
:returns: response result
"""

return self.call(method, *args, **kwargs)
return self.call(method, *args, _trace_ctx=_trace_ctx, **kwargs)

@property
def proxy(self) -> 'Proxy':
Expand All @@ -359,7 +359,7 @@ def _send(
request: AbstractRequest,
response_class: Type[AbstractResponse],
validator: Callable[..., None],
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Union[Awaitable[Optional[AbstractResponse]], Optional[AbstractResponse]]:
pass
Expand Down Expand Up @@ -407,7 +407,7 @@ def notify(
self,
method: str,
*args: Any,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[Response]:
"""
Expand All @@ -429,7 +429,7 @@ def notify(
return self.send(request, _trace_ctx=_trace_ctx)

def call(
self, method: str, *args: Any, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, method: str, *args: Any, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Any:
"""
Makes JSON-RPC call.
Expand All @@ -456,7 +456,7 @@ def call(
def send(
self,
request: Request,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
_retry_strategy: MaybeSet[retry.RetryStrategy] = UNSET,
**kwargs: Any,
) -> Optional[Response]:
Expand Down Expand Up @@ -486,25 +486,27 @@ def traced(method: Callable[..., Any]) -> Callable[..., Any]:
def wrapper(
self: 'AbstractClient',
request: AbstractRequest,
_trace_ctx: SimpleNamespace,
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[AbstractResponse]:
"""
Adds tracing logic to the method.
"""

trace_ctx = _trace_ctx or SimpleNamespace()

for tracer in self._tracers:
tracer.on_request_begin(_trace_ctx, request)
tracer.on_request_begin(trace_ctx, request)

try:
response = method(self, request, _trace_ctx=_trace_ctx, **kwargs)
response = method(self, request, _trace_ctx=trace_ctx, **kwargs)
except BaseException as e:
for tracer in self._tracers:
tracer.on_error(_trace_ctx, request, e)
tracer.on_error(trace_ctx, request, e)
raise

for tracer in self._tracers:
tracer.on_request_end(_trace_ctx, request, response)
tracer.on_request_end(trace_ctx, request, response)

return response

Expand Down Expand Up @@ -541,7 +543,7 @@ def _send(
request: AbstractRequest,
response_class: Type[AbstractResponse],
validator: Callable[..., None],
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[AbstractResponse]:
kwargs = {**self._request_args, **kwargs}
Expand Down Expand Up @@ -588,7 +590,7 @@ async def _request(self, request_text: str, is_notification: bool = False, **kwa
async def send(
self,
request: Request,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
_retry_strategy: MaybeSet[retry.RetryStrategy] = UNSET,
**kwargs: Any,
) -> Optional[Response]:
Expand Down Expand Up @@ -618,25 +620,27 @@ def traced(method: Callable[..., Any]) -> Callable[..., Any]:
async def wrapper(
self: 'AbstractAsyncClient',
request: Request,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Response:
"""
Adds tracing logic to the method.
"""

trace_ctx = _trace_ctx or SimpleNamespace()

for tracer in self._tracers:
tracer.on_request_begin(_trace_ctx, request)
tracer.on_request_begin(trace_ctx, request)

try:
response = await method(self, request, _trace_ctx=_trace_ctx, **kwargs)
response = await method(self, request, _trace_ctx=trace_ctx, **kwargs)
except BaseException as e:
for tracer in self._tracers:
tracer.on_error(_trace_ctx, request, e)
tracer.on_error(trace_ctx, request, e)
raise

for tracer in self._tracers:
tracer.on_request_end(_trace_ctx, request, response)
tracer.on_request_end(trace_ctx, request, response)

return response

Expand Down Expand Up @@ -673,7 +677,7 @@ async def _send(
request: AbstractRequest,
response_class: Type[AbstractResponse],
validator: Callable[..., None],
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[AbstractResponse]:
kwargs = {**self._request_args, **kwargs}
Expand All @@ -697,7 +701,7 @@ async def notify(
self,
method: str,
*args: Any,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[Response]:
"""
Expand All @@ -719,7 +723,7 @@ async def notify(
return await self.send(request, _trace_ctx=_trace_ctx)

async def call(
self, method: str, *args: Any, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, method: str, *args: Any, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Any:
"""
Makes JSON-RPC call.
Expand Down
21 changes: 11 additions & 10 deletions pjrpc/client/tracer.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import logging
from types import SimpleNamespace
from typing import Optional
from typing import Generic, Optional, TypeVar

from pjrpc import AbstractRequest, AbstractResponse

client_logger = logging.getLogger(__package__)

ContextType = TypeVar('ContextType')

class Tracer:

class Tracer(Generic[ContextType]):
"""
JSON-RPC client tracer.
"""

def on_request_begin(self, trace_context: SimpleNamespace, request: AbstractRequest) -> None:
def on_request_begin(self, trace_context: ContextType, request: AbstractRequest) -> None:
"""
Handler called before JSON-RPC request begins.
Expand All @@ -21,7 +22,7 @@ def on_request_begin(self, trace_context: SimpleNamespace, request: AbstractRequ
"""

def on_request_end(
self, trace_context: SimpleNamespace, request: AbstractRequest, response: Optional[AbstractResponse],
self, trace_context: ContextType, request: AbstractRequest, response: Optional[AbstractResponse],
) -> None:
"""
Handler called after JSON-RPC request ends.
Expand All @@ -32,7 +33,7 @@ def on_request_end(
"""

def on_error(
self, trace_context: SimpleNamespace, request: AbstractRequest, error: BaseException,
self, trace_context: ContextType, request: AbstractRequest, error: BaseException,
) -> None:
"""
Handler called when JSON-RPC request failed.
Expand All @@ -43,7 +44,7 @@ def on_error(
"""


class LoggingTracer(Tracer):
class LoggingTracer(Tracer[ContextType], Generic[ContextType]):
"""
JSON-RPC client logging tracer.
"""
Expand All @@ -52,15 +53,15 @@ def __init__(self, logger: logging.Logger = client_logger, level: int = logging.
self._logger = logger
self._level = level

def on_request_begin(self, trace_context: SimpleNamespace, request: AbstractRequest) -> None:
def on_request_begin(self, trace_context: ContextType, request: AbstractRequest) -> None:
self._logger.log(self._level, "sending request: %r", request)

def on_request_end(
self, trace_context: SimpleNamespace, request: AbstractRequest, response: Optional[AbstractResponse],
self, trace_context: ContextType, request: AbstractRequest, response: Optional[AbstractResponse],
) -> None:
self._logger.log(self._level, "received response: %r", response)

def on_error(
self, trace_context: SimpleNamespace, request: AbstractRequest, error: BaseException,
self, trace_context: ContextType, request: AbstractRequest, error: BaseException,
) -> None:
self._logger.log(self._level, "request '%s' sending error: %r", request, error)
Loading

0 comments on commit e2132c0

Please sign in to comment.