Skip to content

Commit

Permalink
cli server: better thread name, auto-trimming login timeout connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Fallen-Breath committed Mar 10, 2022
1 parent ae1ad20 commit 8eca77c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 30 deletions.
3 changes: 1 addition & 2 deletions chatbridge/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,7 @@ def reply_command(self, target: str, asker_payload: 'CommandPayload', result: Un
# Keep Alive
# --------------

@classmethod
def _get_keep_alive_thread_name(cls):
def _get_keep_alive_thread_name(self):
return 'KeepAlive'

def _start_keep_alive_thread(self) -> Thread:
Expand Down
3 changes: 1 addition & 2 deletions chatbridge/core/network/basic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from threading import Thread, current_thread, RLock
from typing import NamedTuple, Callable, List, Optional
from typing import NamedTuple, Callable, Optional

from chatbridge.common.logger import ChatBridgeLogger
from chatbridge.core.network.cryptor import AESCryptor
Expand Down Expand Up @@ -41,7 +41,6 @@ def _start_thread(self, target: Callable, name: str) -> Thread:
self.logger.debug('Started thread {}: {}'.format(name, thread))
return thread

@classmethod
def _get_main_loop_thread_name(cls):
return 'MainLoop'

Expand Down
96 changes: 74 additions & 22 deletions chatbridge/core/server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import socket
import time
from concurrent.futures.thread import ThreadPoolExecutor
from threading import Thread, Event, RLock
from typing import Dict, Optional
from threading import Thread, Event, RLock, Lock, current_thread
from typing import Dict, Optional, List, NamedTuple

from chatbridge.common import constants
from chatbridge.core.client import ChatBridgeClient, ClientStatus
Expand All @@ -23,6 +24,12 @@ def __init__(self, server: 'ChatBridgeServer', info: ClientInfo):
def get_logging_name(self) -> str:
return 'Server.{}'.format(self.get_connection_client_name())

def _get_main_loop_thread_name(self):
return super()._get_main_loop_thread_name() + '.' + self.get_connection_client_name()

def _get_keep_alive_thread_name(self):
return super()._get_main_loop_thread_name() + '.' + self.get_connection_client_name()

def get_logging_file_name(self) -> Optional[str]:
return None

Expand Down Expand Up @@ -66,11 +73,26 @@ def restart_connection(self, conn: socket.socket, addr: Address):
self.start()


class ComingConnection(NamedTuple):
sock: socket.socket
addr: Address
thread: Thread
start_time: float

@classmethod
def create(cls, sock: socket.socket, addr: Address) -> 'ComingConnection':
return ComingConnection(sock=sock, addr=addr, thread=current_thread(), start_time=time.time())


class ChatBridgeServer(ChatBridgeBase):
MAXIMUM_LOGIN_DURATION = 20 # 20s

def __init__(self, aes_key: str, server_address: Address):
super().__init__('Server', aes_key)
self.server_address = server_address
self.clients: Dict[str, _ClientConnection] = {}
self.__coming_connections: List[ComingConnection] = []
self.__coming_connections_lock = Lock()
self.__sock: Optional[socket.socket] = None
self.__thread_run: Optional[Thread] = None
self.__stop_lock = RLock()
Expand Down Expand Up @@ -99,11 +121,16 @@ def _main_loop(self):
self.__binding_done.set()
try:
self.__sock.listen(5)
self.__sock.settimeout(3)
self.logger.info('Server started at {}'.format(self.server_address))
counter = 0
while self.is_running():
try:
conn, addr = self.__sock.accept()
self.__trim_coming_connections()
try:
conn, addr = self.__sock.accept()
except socket.timeout:
continue
if not self.is_running():
conn.close()
break
Expand Down Expand Up @@ -145,29 +172,54 @@ def stop(self):
self.__stop()
super().stop()

def __trim_coming_connections(self):
with self.__coming_connections_lock:
to_be_removed = []
current_time = time.time()
for cc in self.__coming_connections:
if current_time - cc.start_time > self.MAXIMUM_LOGIN_DURATION:
self.logger.warning('Terminating coming connection from {} in thread {} due to login timeout'.format(cc.addr, cc.thread.getName()))
try:
cc.sock.close()
except:
self.logger.exception('Error terminating coming connection from {} in thread {}'.format(cc.addr, cc.thread.getName()))
to_be_removed.append(cc)
for cc in to_be_removed:
self.__coming_connections.remove(cc)

def __handle_connection(self, conn: socket, addr: Address):
success = False
cc = ComingConnection.create(conn, addr)
with self.__coming_connections_lock:
self.__coming_connections.append(cc)
try:
recv_str = net_util.receive_data(conn, self._cryptor, timeout=15)
login_packet = LoginPacket.deserialize(json.loads(recv_str))
except Exception as e:
self.logger.error('Failed reading client\'s login packet: {}'.format(e))
return
else:
self.log_packet(login_packet, to_client=False)
client = self.clients.get(login_packet.name, None)
if client is not None:
if client.info.password == login_packet.password:
success = True
self.logger.info('Identification of {} confirmed: {}'.format(addr, client.info.name))
client.restart_connection(conn, addr)
else:
self.logger.warning('Wrong password during login for client {}: expected {} but received {}'.format(client.info.name, client.info.password, login_packet.password))
try:
recv_str = net_util.receive_data(conn, self._cryptor, timeout=15)
login_packet = LoginPacket.deserialize(json.loads(recv_str))
except Exception as e:
self.logger.error('Failed reading client\'s login packet: {}'.format(e))
return
else:
self.logger.warning('Unknown client name during login: {}'.format(login_packet.name))
if not success:
conn.close()
self.logger.warning('Closed connection from {}'.format(addr))
self.log_packet(login_packet, to_client=False)
client = self.clients.get(login_packet.name, None)
if client is not None:
if client.info.password == login_packet.password:
success = True
self.logger.info('Identification of {} confirmed: {}'.format(addr, client.info.name))
client.restart_connection(conn, addr)
else:
self.logger.warning('Wrong password during login for client {}: expected {} but received {}'.format(client.info.name, client.info.password, login_packet.password))
else:
self.logger.warning('Unknown client name during login: {}'.format(login_packet.name))
if not success:
conn.close()
self.logger.warning('Closed connection from {}'.format(addr))
finally:
with self.__coming_connections_lock:
try:
self.__coming_connections.remove(cc)
except ValueError:
pass

def log_packet(self, packet: AbstractPacket, *, to_client: bool, client_name: str = None):
if isinstance(packet, ChatBridgePacket):
Expand Down
6 changes: 2 additions & 4 deletions chatbridge/impl/mcdr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ def __init__(self, config: MCDRClientConfig, server: ServerInterface):
def get_logging_name(self) -> str:
return 'ChatBridge@{}'.format(hex((id(self) >> 16) & (id(self) & 0xFFFF))[2:].rjust(4, '0'))

@classmethod
def _get_main_loop_thread_name(cls):
def _get_main_loop_thread_name(self):
return 'ChatBridge-' + super()._get_main_loop_thread_name()

@classmethod
def _get_keep_alive_thread_name(cls):
def _get_keep_alive_thread_name(self):
return 'ChatBridge-' + super()._get_keep_alive_thread_name()

def _on_stopped(self):
Expand Down

0 comments on commit 8eca77c

Please sign in to comment.