Skip to content

Commit

Permalink
introduce min reping interval for cached connection testing
Browse files Browse the repository at this point in the history
  • Loading branch information
pedohorse committed Sep 3, 2024
1 parent 27517f2 commit 57eaf53
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions src/lifeblood/net_messages/impl/tcp_message_stream_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from lifeblood.logging import get_logger
from datetime import datetime
from dataclasses import dataclass
from dataclasses import dataclass, field
from ..exceptions import MessageTransferError, MessageTransferTimeoutError
from ..interfaces import MessageStreamFactory
from ..stream_wrappers import MessageSendStream, MessageSendStreamBase
Expand All @@ -30,6 +30,7 @@ class ConnectionPoolEntry:
writer: asyncio.StreamWriter
last_used: datetime
users_count: int
last_ping_time: datetime = field(default_factory=lambda: datetime.now())
close_when_user_count_zero: bool = False
bad: bool = False

Expand Down Expand Up @@ -100,6 +101,8 @@ def __init__(self,
self.__open_connection_calls_count = 0
self.__pool_closed = asyncio.Event()
self.__timeout = timeout
# below is some arbitrary heuristics
self.__minimal_reping_interval = max(1, int(timeout/2))
if self._logger is None:
TcpMessageStreamPooledFactory._logger = get_logger('TcpMessageStreamPooledFactory')

Expand Down Expand Up @@ -173,7 +176,12 @@ async def open_sending_stream(self, destination: DirectAddress, source: DirectAd
stream_timeout=self.__timeout,
confirmation_timeout=self.__timeout)
try:
await stream.send_ping()
# this is a heuristics base on connection "freshness"
# "fresh" connections will most likely work, so extra ping will only slow things down
ping_now = datetime.now()
if (ping_now - entry.last_ping_time).total_seconds() > self.__minimal_reping_interval:
await stream.send_ping()
entry.last_ping_time = ping_now
except MessageTransferError as e:
self._logger.debug('ping failed due to %s', e)
entry.bad = True
Expand All @@ -187,7 +195,8 @@ async def open_sending_stream(self, destination: DirectAddress, source: DirectAd
entry = ConnectionPoolEntry(reader,
writer,
datetime.now(),
0)
0,
datetime.now())
self.__pool.setdefault(key, []).append(entry)
assert entry is not None

Expand Down

0 comments on commit 57eaf53

Please sign in to comment.