-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstkserver_wrapper.py
1325 lines (1242 loc) · 67.5 KB
/
stkserver_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
* stkserver_wrapper.py - main file for starting servers
* License: GNU LGPL v2.1
* Author: DernisNW (a.k.a. NobWow)
"""
import asyncio
import logging
import traceback
import os
import re
import shlex
# import traceback
# from shutil import rmtree
# from zipfile import ZipFile
# from math import floor
# from defusedxml import ElementTree as dElementTree
from logging.handlers import TimedRotatingFileHandler
from admin_console import AdminCommandExecutor, AdminCommandExtension, basic_command_set, paginate_range
from admin_console.ainput import colors, ARILogHandler
from admin_console.ainput import ansi_escape as ansi_escape_
from aiohndchain import AIOHandlerChain
from enum import IntEnum
from packaging.version import parse as parseVersion
from functools import partial
from contextlib import asynccontextmanager
from typing import Sequence, MutableSequence, Optional, Mapping, MutableMapping, Callable, Any
# Matches ANSI CSI escape sequences: either the single byte 0x9B or ESC '[',
# followed by parameter bytes, intermediate bytes and one final byte.
# NOTE(review): the visible code strips escapes with ``ansi_escape_`` imported
# from admin_console.ainput; this local pattern looks unused here - confirm.
ansi_escape = re.compile(r'(?:\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
# NOTE(review): presumably the wrapper's config file name; not referenced in
# the visible part of the file.
_cfgfile_path = 'config.json'
# Lower-case log level labels (presumably the names STK prints - TODO confirm).
debug_l = 'debug'
verbose_l = 'verbose'
info_l = 'info'
warn_l = 'warn'
error_l = 'error'
fatal_l = 'fatal'
# Names of the STKServer attributes that are copied between server objects and
# their config entries (load_config sets them, STKServer.save exports them).
# create_server zips its positional values against this tuple, so the order
# of the entries matters.
server_attribs = ('cfgpath', 'datapath', 'executable_path', 'cwd',
'autostart', 'autorestart', 'timed_autorestart',
'timed_autorestart_interval', 'startup_timeout', 'shutdown_timeout',
'extra_env', 'extra_args')
# NOTE(review): presumably indexed with a bool to render "no"/"yes".
no_yes = ('no', 'yes')
# Affirmative answer: optional leading spaces, then one of y/Y/+/1 followed by
# any run of the listed letters or spaces.
yes_match = re.compile(r' *[yY+1][yYeEaAhHpPsS ]*')
# Negative answer: optional leading spaces, then one of n/N/-/0 followed by
# any run of the listed letters or spaces.
no_match = re.compile(r' *[nN\-0][nNoOpPeE ]*')
# Separator for list-like input: a comma or a space followed by extra spaces.
splitter = re.compile(r'[, ] *')
# Hint appended to prompts that accept an empty answer. Formatted with ``cwd``
# (directory used for resolving relative paths) and ``default`` (value used
# when the user just presses return).
make_server_skipmsg = ("""Current working directory: "{cwd}" for relative path reference\n"""
                       """Press return to skip and set the default value "{default}" """)
# Prompts for the interactive server creation, one entry per question.
# make_server unpacks this tuple into exactly 12 names, so the number and the
# order of the entries must be preserved.
make_server_msg = (
    # name
    """Enter the server name. It will be used for further interaction with the server,\nYou cannot change it later\n"""
    """but every name should be unique.""",
    # cfgpath
    """Enter the path to configuration file. It must have the XML format (.xml).\nYou can change it later\n{skipmsg}""",
    # datapath
    """Enter the path to the "data" directory that will be used for the new server.\nYou can change it later\n{skipmsg}\n"""
    """TIP: it is usually either /usr/share/supertuxkart\n or in case of GIT version /path/to/stk-code""",
    # exec
    """Enter the path to supertuxkart executable file (program).\nYou can change it later\n{skipmsg}""",
    # cwd
    """Enter the path to server's working directory.\nYou can change it later\n{skipmsg}""",
    # autostart
    """Should server automatically start after wrapper has been launched? Hit y for yes or n for no.\nYou can change it later""",
    # autorestart
    """In case the server crashes, does it require automatic restart? Hit y for yes or n for no.\nYou can change it later""",
    # timed_autorestart + timed_autorestart_interval
    """Is it needed to restart the server every N minutes? Leave empty string or 0 if autorestarts aren't required.\n"""
    """You can change it later\n"""
    """Note: the server will not restart if there are players at the moment""",
    # startup_timeout
    """How many seconds the server has to initialize?"""
    """ When this timeout exceeds during server startup, the process is killed.\n{skipmsg}""",
    # shutdown_timeout
    # (the leading space of the second literal was missing, which glued the
    # two sentences together as "shutdown?When")
    """How many seconds the server has to shutdown?"""
    """ When this timeout exceeds during server shutdown, the process is killed.\n{skipmsg}""",
    # extra_env
    """Advanced: which additional environment variables to pass to the process?\n"""
    """For example, you can specify XDG_DATA_HOME=/path/to/directory HOME=/some/directory/path\n"""
    """You can change it later\nTo clear extra argument, specify -""",
    # extra_args
    """Advanced: any additional arguments to the command line? Just leave it empty if you have no idea.\n"""
    """You can change it later\nTo clear extra argument, specify -"""
)
def load_config(ace: AdminCommandExecutor):
    """
    (Re)load the wrapper configuration and synchronise ``ace.servers`` with it.

    Fills in missing global options with their defaults, writes the config
    back, rebuilds ``ace.global_logignores`` and stores the parsed STK version
    in ``ace.stuff['stk_version']``. Then, for every entry of
    ``ace.config['servers']``:

    * a server already present in ``ace.servers`` gets its attributes listed
      in ``server_attribs`` and its log ignores refreshed in place;
    * an unknown server is constructed as a new ``STKServer``.

    Per-server values that are absent fall back to the global config value,
    mirroring ``STKServer.save`` which omits values equal to the global one.

    Raises:
        FileNotFoundError: propagated from the ``STKServer`` constructor when
            a configured executable, working directory or data directory is
            missing.
    """
    ace.load_config()
    _ver = ace.config['stk_version'] = ace.config.get('stk_version', '1.4.0')
    ace.config['logpath'] = ace.config.get('logpath', 'logs')
    ace.config['servers'] = ace.config.get('servers', {})
    ace.config['datapath'] = ace.config.get('datapath', 'stk-code')
    ace.config['executable_path'] = ace.config.get('executable_path', 'supertuxkart')
    ace.config['autostart'] = ace.config.get('autostart', False)
    ace.config['autorestart'] = ace.config.get('autorestart', True)
    ace.config['autorestart_pause'] = ace.config.get('autorestart_pause', 10.0)
    ace.config['timed_autorestart'] = ace.config.get('timed_autorestart', False)
    ace.config['timed_autorestart_interval'] = ace.config.get('timed_autorestart_interval', False)
    ace.config['extra_env'] = ace.config.get('extra_env', None)
    ace.config['extra_args'] = ace.config.get('extra_args', [])  # json doesn't support immutable sequences, use mutable instead
    _global_logignores = ace.config['global_logignores'] = ace.config.get('global_logignores', {})
    ace.save_config()
    ace.global_logignores = make_logignores(_global_logignores)
    ace.stuff['stk_version'] = parseVersion(_ver)
    for servername, serverdata in ace.config['servers'].items():
        if servername in ace.servers:
            server: STKServer = ace.servers[servername]
            for item in server_attribs:
                # save() leaves out attributes equal to the global value, so a
                # missing key is normal: fall back to the global option and,
                # as a last resort, keep what the server currently has.
                if item in serverdata:
                    value = serverdata[item]
                else:
                    value = ace.config.get(item, getattr(server, item))
                setattr(server, item, value)
            # ace.global_logignores was rebuilt above; re-link it so the
            # running server does not keep filtering with the stale mapping.
            server.global_logignores = ace.global_logignores
            # Compile first, so a broken pattern does not wipe the old ignores.
            new_ignores = make_logignores(serverdata.get('log_ignores', {}))
            # log_ignores is a plain mapping when set by STKServer.__init__;
            # a ChainMap-like object (with .maps) is supported as well, in
            # which case only its first, mutable layer is refreshed.
            local_logignore = getattr(server.log_ignores, 'maps', (server.log_ignores,))[0]
            local_logignore.clear()
            local_logignore.update(new_ignores)
        else:
            ace.servers[servername] = STKServer(
                ace.logger, ace.ainput.writeln, servername, cfgpath=serverdata['cfgpath'],
                restarter_cond=ace.server_restart_cond, start_stop_guard=ace.start_stop_guard,
                datapath=serverdata.get('datapath', ace.config['datapath']),
                executable_path=serverdata.get('executable_path', ace.config['executable_path']),
                cwd=serverdata.get('cwd', ace.config.get('cwd', os.getcwd())),
                autostart=serverdata.get('autostart', ace.config['autostart']),
                autorestart=serverdata.get('autorestart', ace.config['autorestart']),
                autorestart_pause=serverdata.get('autorestart_pause', ace.config['autorestart_pause']),
                timed_autorestart=serverdata.get('timed_autorestart', ace.config['timed_autorestart']),
                timed_autorestart_interval=serverdata.get('timed_autorestart_interval', ace.config['timed_autorestart_interval']),
                # the timeouts are exported by save() but used to be dropped here
                startup_timeout=serverdata.get('startup_timeout', ace.config.get('startup_timeout', None)),
                shutdown_timeout=serverdata.get('shutdown_timeout', ace.config.get('shutdown_timeout', None)),
                extra_env=serverdata.get('extra_env', ace.config.get('extra_env', None)),
                extra_args=serverdata.get('extra_args', ace.config.get('extra_args', tuple())),
                global_logignores=ace.global_logignores,
                logignores=make_logignores(serverdata.get('log_ignores', {}))
            )
def make_logignores(logignores: Mapping[str, Mapping[str, Sequence[str]]]) -> MutableMapping[str, MutableMapping[int, MutableSequence[re.Pattern]]]:
    """
    Convert log ignores from their config form into the runtime form.

    The input maps a module name to a mapping of level (given as a string,
    e.g. a JSON object key) to a sequence of regex sources. The result has
    the same layout, but levels are converted with ``int()`` and every
    pattern is compiled with ``re.compile``.
    """
    compiled = {}
    for modname, modignores in logignores.items():
        per_level = {}
        for level, patterns in modignores.items():
            per_level[int(level)] = [re.compile(pattern) for pattern in patterns]
        compiled[modname] = per_level
    return compiled
async def _trigger_restart(ace) -> bool:
try:
async with ace.server_restart_cond:
ace.server_restart_cond.notify_all()
return True
except RuntimeError:
return False
def server_restart_clk(ace):
    """
    Schedule a restart notification for the servers.

    Spawns a task running ``_trigger_restart(ace)``; the call itself does not
    wait for it. Per the original note, an automatic restart only happens
    when no players are online at the moment.
    """
    notification = _trigger_restart(ace)
    asyncio.create_task(notification)
class STKLogFilter(logging.Filter):
    """
    Logging filter that keeps a reference to the command executor.

    No custom filtering is implemented yet: records are judged by the stock
    ``logging.Filter`` rules (logger-name prefix match, everything passes
    when no name is given).
    """

    def __init__(self, ace: 'AdminCommandExecutor', *args, **kwargs):
        """Remember *ace*; remaining arguments go to ``logging.Filter``."""
        self.ace = ace
        super().__init__(*args, **kwargs)

    def filter(self, record: logging.LogRecord):
        """
        Decide whether *record* should be logged.

        The method used to lack ``self`` (so any call raised TypeError) and
        returned None, which the logging machinery treats as "drop".
        """
        return super().filter(record)
class LogLevel(IntEnum):
    """Log severities as an IntEnum, with the values of the stdlib logging levels."""
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR
    # logging.FATAL is the stdlib alias of logging.CRITICAL
    FATAL = logging.CRITICAL
class STKServer:
    """
    Controller of a single supertuxkart server subprocess.

    Launches the executable with ``--server-config`` and ``--network-console``,
    reads and parses its stdout, tracks readiness and whether the server is
    empty, and implements stopping, automatic restart after exit, timed
    restart and restart on an external condition.
    """
    # Line written to the server console that it does not understand; the
    # resulting "Unknown command" reply is dropped by handle_stdout.
    idle_command = '\x01'
    # Splits a log line into (level name, object name, message); an optional
    # date/time prefix is skipped.
    logstrip = re.compile(r'(?:\w+ +\w+ +\d+ +\d+:\d+:\d+ +\d+ )?\[(\w+) *\] +([^:]+)?: (.*)''\n?')
    ignore_idle = re.compile(f'Unknown command: {idle_command}')
    # Log line that marks the server as ready.
    ready_loglevel = logging.INFO
    ready_objectname = 'ServerLobby'
    ready_pattern = re.compile(r'Server (\d+) is now online.')
    # Log line that reports the current number of peers.
    joinleave_objectname = 'STKHost'
    joinleave_pattern = re.compile(r'[a-f0-9.:]+ has just (?:dis)?connected. There are now (\d+) peers.')
    # NOTE(review): not referenced by the methods of this class. The last
    # pattern has unescaped parentheses, so they form a group instead of
    # matching literal "(...)" - verify against real server output.
    extra_leave_patterns = [
        ('STKHost', logging.INFO, re.compile(r'[a-f0-9.:]+ has not been validated for more than [0-9.]+ seconds, disconnect it by force.')),
        ('STKHost', logging.INFO, re.compile(r'[a-f0-9.:]+ \S+ with ping \d+ is higher than \d+ ms when not in game, kick.')),
        ('ServerLobby', logging.INFO, re.compile(r'\S+ banned by .+: \S+ (rowid: \d+, description: \S+).')),
    ]
    # Console command that asks the server to shut down.
    stop_command = b'quit\n'

    def __init__(self, logger: logging.Logger, writeln: Callable[[str], Any],
                 name: str, cfgpath: str, datapath: str, executable_path: str,
                 cwd: str, autostart=False, autorestart=True, autorestart_pause=10.0, timed_autorestart=False,
                 timed_autorestart_interval: Optional[float] = None,
                 startup_timeout: Optional[float] = None,
                 shutdown_timeout: Optional[float] = None,
                 restarter_cond: Optional[asyncio.Condition] = None,
                 extra_env: Optional[Mapping[str, str]] = None,
                 extra_args: Optional[Sequence[str]] = tuple(),
                 global_logignores: Optional[Mapping[str, Mapping[int, Sequence[re.Pattern]]]] = None,
                 logignores: Optional[MutableMapping[str, MutableMapping[int, MutableSequence[re.Pattern]]]] = None,
                 start_stop_guard: Optional[asyncio.Lock] = None):
        """
        Prepare a server description; the process itself is started by launch().

        Args:
            logger: logger that receives wrapper and server messages.
            writeln: callable used to print unparsed server output.
            name: server name, used in log messages and as the config key.
            cfgpath: value passed to the server as ``--server-config``.
            datapath: directory exported as ``SUPERTUXKART_DATADIR``.
            executable_path: path to the supertuxkart executable.
            cwd: working directory of the process.
            autostart: stored only; not evaluated by this class.
            autorestart: relaunch the server after its process exits.
            autorestart_pause: delay in seconds before relaunching after a
                non-zero exit code.
            timed_autorestart: arm a timer that stops the server after
                ``timed_autorestart_interval`` seconds.
            startup_timeout: seconds the server may take to become ready.
            shutdown_timeout: default timeout of stop().
            restarter_cond: condition that, when notified, makes the server
                stop as soon as it is empty.
            extra_env: additional environment variables for the process.
            extra_args: additional command line arguments.
            global_logignores: shared ignore patterns, module -> level -> list.
            logignores: ignore patterns of this server, same layout.
            start_stop_guard: lock that serialises startup and shutdown.

        Raises:
            FileNotFoundError: the executable, the working directory or the
                data directory does not exist.
        """
        self.process: Optional[asyncio.subprocess.Process] = None
        if not os.path.isfile(executable_path):
            raise FileNotFoundError(f'supertuxkart executable "{executable_path}" not found', 'executable_path', executable_path)
        self.executable_path = executable_path
        if not os.path.isdir(cwd):
            raise FileNotFoundError(f'working directory "{cwd}" not found', 'cwd', cwd)
        self.cwd = cwd
        self.writeln = writeln
        self.active = False
        self.ready = False
        self.restart = False
        self.autostart = autostart
        self.autorestart = autorestart
        self.autorestart_pause = autorestart_pause
        self.timed_autorestart = timed_autorestart
        self.timed_autorestart_interval = timed_autorestart_interval
        self.startup_timeout = startup_timeout
        self.shutdown_timeout = shutdown_timeout
        # set while no peers are connected
        self.empty_server = asyncio.Event()
        self.empty_server.set()
        self.name = name
        self.cfgpath = cfgpath
        if not os.path.isdir(datapath):
            raise FileNotFoundError(f'assets directory "{datapath}" not found', 'datapath', datapath)
        self.datapath = datapath
        self.logger = logger
        self.log_event = AIOHandlerChain()
        self.log_event.on_handler_error = self._loghandler_error
        self.ready_event = AIOHandlerChain(cancellable=False)
        self.restarter_task: Optional[asyncio.Task] = None
        self.restarter_cond = restarter_cond
        self.reader_task: Optional[asyncio.Task] = None
        self.errreader_task: Optional[asyncio.Task] = None
        self.timer_task: Optional[asyncio.Task] = None
        self.server_ready_task: Optional[asyncio.Task] = None
        self.show_stderr = False
        self.idle_cancellable = False
        self.lock = asyncio.Lock()
        # since 1.4, concurrent startup of servers is broken
        self.start_stop_guard = start_stop_guard
        if logignores is None:
            logignores = {}
        # layout example: 'STKHost': {logging.WARNING: [re.compile(r'bad addon: asdasdasd')]}
        self.log_ignores: MutableMapping[str, MutableMapping[int, MutableSequence[re.Pattern]]] = logignores
        self.global_logignores = global_logignores
        self.extra_args = extra_args
        self.extra_env = extra_env
        self.show_plain = False

    async def _loghandler_error(self, hndid: int, exc: Exception, *args, **kw):
        """Installed as ``log_event.on_handler_error``: log the handler failure."""
        self.logger.exception(f"An exception is occurred when invoking handler #{hndid}:")

    def __del__(self):
        """Cancel the background tasks that are still pending."""
        # __init__ may raise before the task attributes exist, hence getattr.
        for attr in ('restarter_task', 'reader_task', 'errreader_task'):
            task = getattr(self, attr, None)
            if task is not None and not task.done():
                task.cancel()

    def add_logignore(self, modname: str, level: int, pattern: str):
        """
        Compile *pattern* and append it to the ignores of *modname* at *level*.

        Missing module and level entries are created. Returns the compiled
        pattern.
        """
        _pattern = re.compile(pattern)
        self.log_ignores.setdefault(modname, {}).setdefault(level, []).append(_pattern)
        return _pattern

    def del_logignore(self, modname: str, level: int, id_: int):
        """
        Delete the ignore pattern with index *id_* of *modname* at *level*.

        Raises KeyError or IndexError when there is no such entry.
        """
        del self.log_ignores[modname][level][id_]

    async def launch(self):
        """
        Start the server process together with its helper tasks.

        When a start_stop_guard is configured it is acquired here and released
        by the _waitready() task once the server is ready, the startup timeout
        expires, or that task is cancelled.

        Raises:
            RuntimeError: a process object is still attached to this server
                (it is running, or its exit has not been processed yet).
        """
        # The previous check only raised for an already exited process and
        # silently spawned a second process when one was still running.
        if self.process is not None:
            raise RuntimeError("the server is already running")
        _env = os.environ.copy()
        if self.extra_env is not None:
            # pass extra environment to the process
            _env.update(self.extra_env)
        _env['SUPERTUXKART_DATADIR'] = self.datapath
        # lock is released at ready call
        if self.start_stop_guard is not None:
            await self.start_stop_guard.acquire()
        try:
            self.process = await asyncio.create_subprocess_exec(
                self.executable_path,
                f'--server-config={self.cfgpath}',
                *self.extra_args,
                '--network-console',
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=_env,
                cwd=self.cwd
            )
        except BaseException:
            # Nothing was started, so nobody else would ever release the guard.
            if self.start_stop_guard is not None:
                self.start_stop_guard.release()
            raise
        # Created before the reader task, so it is scheduled first and is
        # already waiting when the first output line is handled.
        self.server_ready_task = asyncio.create_task(self._waitready(self.startup_timeout))
        self.restart = self.autorestart
        self.active = True
        self.reader_task = asyncio.create_task(self._reader(self.process.stdout))
        self.errreader_task = asyncio.create_task(self._error_reader(self.process.stderr))
        if self.timed_autorestart:
            self.timer_task = asyncio.create_task(self._timed_restarter())
        if self.restarter_cond is not None:
            self.restarter_task = asyncio.create_task(self._restarter())

    async def _waitready(self, timeout: Optional[float] = None):
        """
        Releases the start_stop_guard lock (if any) when the server becomes ready.
        If server doesn't become ready within a timeout, kills the server.
        """
        try:
            await asyncio.wait_for(self.ready_event.wait_for_successful(), timeout)
            self.ready = True
        except asyncio.TimeoutError:
            self.logger.warning(f'STK {self.name} has not become ready within {timeout} seconds, killing')
            if self.process is not None:
                self.process.kill()
        finally:
            # the guard is optional; it used to be asserted as mandatory
            if self.start_stop_guard is not None:
                self.start_stop_guard.release()

    async def stop(self, timeout: Optional[float] = None, from_timer=False, no_lock=False) -> Optional[bool]:
        """
        Sends quit command to the server and waits until the process exit.
        On timeout kills the process
        If timeout is 0, kills the process without waiting for it
        Set self.restart = True to restart, False to stop

        Args:
            timeout: seconds to wait for the exit; None means
                ``self.shutdown_timeout`` (wait forever if that is None too).
            from_timer: the call comes from the timed restarter, so the timer
                task must not be cancelled.
            no_lock: write the command without interrupting the output reader.

        Returns:
            True if the process exited, False if it was killed after the
            timeout, None after a forced kill (timeout 0) or when an error
            occurred; errors, including "not running", are logged rather
            than raised.
        """
        guard = self.start_stop_guard
        # Track ownership: a cancellation while waiting for the guard must not
        # release a lock that is held by somebody else (e.g. a launch()).
        guard_held = False
        try:
            # sorry, can't use context manager here
            if guard is not None:
                await guard.acquire()
                guard_held = True
            if self.process is None:
                self.logger.debug('STKServer.stop: the server is not running')
                raise RuntimeError("the server is not running")
            elif self.process.returncode is not None:
                self.logger.debug('STKServer.stop: the server is already stopped')
                raise RuntimeError("the server is already stopped")
            if self.restarter_task is not None:
                self.restarter_task.cancel()
                self.restarter_task = None
            if self.timer_task is not None and not from_timer:
                self.timer_task.cancel()
                self.timer_task = None
            if timeout == 0:
                # Forcibly stop a server
                self.logger.warning(f'STK {self.name} was forcefully shut down.')
                self.process.kill()
                return
            elif timeout is None:
                timeout = self.shutdown_timeout
            if no_lock:
                self.logger.debug('STKServer.stop: no-lock stop command')
                self.process.stdin.write(self.stop_command)
                await self.process.stdin.drain()
            else:
                self.logger.debug('STKServer.stop: pre-interrupt')
                async with self.interrupting_idle_for():
                    self.logger.debug('STKServer.stop: interrupt')
                    self.process.stdin.write(self.stop_command)
                    await self.process.stdin.drain()
                    self.logger.debug('STKServer.stop: command sent')
                self.logger.debug('STKServer.stop: post-interrupt')
            if timeout is None:
                await self.process.wait()
                self.ready = False
                return True
            else:
                try:
                    await asyncio.wait_for(self.process.wait(), timeout)
                    self.ready = False
                    return True
                except asyncio.TimeoutError:
                    self.logger.warning(f'STK {self.name} shutdown operation timed out, killing.')
                    self.process.kill()
                    return False
        except Exception:
            self.logger.exception('stop() failed:')
        finally:
            if guard_held:
                guard.release()

    async def _error_reader(self, _stderr: asyncio.StreamReader):
        """Drain the server's stderr; lines are handled only while show_stderr is set."""
        while not _stderr.at_eof():
            line = await _stderr.readline()
            if not self.show_stderr:
                continue
            if asyncio.iscoroutinefunction(self.handle_stderr):
                await self.handle_stderr(line.decode())
            else:
                self.handle_stderr(line.decode())

    async def _reader(self, _stdout: asyncio.StreamReader):
        """
        Read stdout line by line until EOF, then process the exit.

        While it waits for a line the task may be cancelled through
        idleCancel(); it then resumes as soon as ``self.lock`` is free again.
        After EOF the state is reset and, when autorestart is enabled, the
        server is launched again.
        """
        self.logger.debug('_reader: start')
        while not _stdout.at_eof():
            try:
                # wait until a writer that interrupted us is done with the lock
                async with self.lock:
                    pass
                self.idle_cancellable = True
                async with self.lock:
                    line = await _stdout.readline()
                    self.idle_cancellable = False
                if asyncio.iscoroutinefunction(self.handle_stdout):
                    await self.handle_stdout(ansi_escape_.sub('', line.decode()))
                else:
                    self.handle_stdout(ansi_escape_.sub('', line.decode()))
            except asyncio.CancelledError:
                if not self.active:
                    return
                else:
                    continue
            except Exception:
                self.logger.error(f'_reader: exception caught\n{traceback.format_exc()}')
        _returncode = self.process.returncode
        if _returncode is None:
            self.idle_cancellable = True
            await self.process.wait()
            _returncode = self.process.returncode
        self.logger.log(logging.ERROR if _returncode != 0 else logging.INFO, f'Server {self.name} exited with returncode {_returncode}')
        if self.server_ready_task is not None:
            if not self.server_ready_task.done():
                self.server_ready_task.cancel()
                # start_stop_guard is now unlocked by cancelling this task
        self.process = None
        self.ready = False
        self.active = False
        self.empty_server.set()
        if self.autorestart and self.restart:
            if _returncode != 0:
                self.logger.info(f'Server {self.name} returned non-zero returncode, restart delay applied: {self.autorestart_pause}')
                await asyncio.sleep(self.autorestart_pause)
            self.logger.debug('_reader: restart server')
            await self.launch()
        self.logger.debug('_reader: end')

    async def _timed_restarter(self):
        """Sleep for ``timed_autorestart_interval`` seconds, then stop the server."""
        self.logger.info(f'Timed autorestarter for server {self.name} launched. Interval = {self.timed_autorestart_interval}')
        await asyncio.sleep(self.timed_autorestart_interval)
        try:
            self.logger.info(f'Timed autorestarter for server {self.name} schedules the restart')
            await self.stop(timeout=60.0, from_timer=True)
        except RuntimeError:
            pass

    async def _restarter(self):
        """
        Wait for a notification on ``restarter_cond`` and for the server to be
        empty, then stop it.

        stop() cancels ``restarter_task``, i.e. the task that runs this method.
        """
        self.logger.debug('_restarter: start')
        while self.process is not None:
            self.logger.debug('_restarter: returncode is None')
            async with self.restarter_cond:
                self.logger.debug('_restarter: restarter condition entered')
                await self.restarter_cond.wait()
                self.logger.debug('_restarter: restarter condition received, waiting for empty server')
                self.logger.debug(f'_restarter: lock is currently {self.restarter_cond.locked()}.')
                await self.empty_server.wait()
                self.logger.debug(f'_restarter: lock is currently {self.restarter_cond.locked()}.')
                self.logger.debug('_restarter: restarter condition received, empty server reached')
                await self.stop(timeout=60.0, no_lock=True)
                self.logger.debug(f'_restarter: lock is currently {self.restarter_cond.locked()}.')

    def handle_stderr(self, line: str):
        """Log one stderr line of the server as an error."""
        self.logger.error(f'STK-Stderr {self.name}: {line}')

    async def handle_stdout(self, line: str):
        """
        Process one stdout line of the server.

        Lines that do not look like log messages are printed via writeln when
        show_plain is set and otherwise dropped. Parsed messages update
        ``empty_server``, emit ``ready_event`` and ``log_event``, and are
        logged unless ``log_event.emit`` returns a false value or an ignore
        pattern matches the message.
        """
        if self.ignore_idle.fullmatch(line):
            return
        # handle log message
        _match = self.logstrip.fullmatch(line)
        if _match:
            levelname, objectname, message = _match.groups()
        else:
            if self.show_plain:
                self.writeln(line[:-1])
            return
        if self.joinleave_objectname == objectname:
            _matchjl = self.joinleave_pattern.fullmatch(message)
            if _matchjl:
                _curPeers = int(_matchjl.groups()[0])
                if _curPeers:
                    self.empty_server.clear()
                else:
                    self.empty_server.set()
        # level names unknown to the logging module are treated as DEBUG
        level = getattr(logging, levelname.upper(), logging.DEBUG)
        if self.ready_objectname == objectname and self.ready_loglevel == level:
            _matchready = self.ready_pattern.fullmatch(message)
            if _matchready is not None:
                await self.ready_event.emit(int(_matchready.group(1)))
        if not (await self.log_event.emit(message, levelname=levelname, level=level, objectname=objectname)):
            return
        # ignores layout: 'STKHost': {logging.WARNING: [re.compile(r'bad addon: asdasdasd')]}
        if self.global_logignores is not None:
            try:
                for pattern in self.global_logignores[objectname][level]:
                    if pattern.fullmatch(message):
                        return
            except KeyError:
                # no global ignores for this module/level
                pass
        try:
            for pattern in self.log_ignores[objectname][level]:
                if pattern.fullmatch(message):
                    return
        except KeyError:
            # no local ignores for this module/level
            pass
        self.logger.log(level, f'STK [{self.name}] {objectname}: {message}')

    async def stuff(self, cmdline: str, noblock=False):
        """
        Write *cmdline* followed by a newline to the server's stdin.

        Unless *noblock* is set, the output reader is interrupted and
        ``self.lock`` is held while writing.
        """
        _b = cmdline.encode() + b'\n'
        if noblock:
            self.process.stdin.write(_b)
            await self.process.stdin.drain()
        else:
            async with self.interrupting_idle_for():
                self.process.stdin.write(_b)
                await self.process.stdin.drain()

    def idleCancel(self) -> bool:
        """
        Cancel the reader task if it is pending and marked as cancellable.

        Returns True if a cancellation was requested, False otherwise.
        """
        if self.reader_task is not None and not self.reader_task.done() and self.idle_cancellable:
            self.reader_task.cancel()
            return True
        return False

    @asynccontextmanager
    async def interrupting_idle_for(self):
        """Acquire a lock for interrupting output handler"""
        self.idleCancel()
        # Acquire outside of the try block: if the wait is cancelled, the lock
        # is not ours and must not be released.
        acquired = await self.lock.acquire()
        try:
            yield acquired
        finally:
            self.lock.release()

    def save(self, ace: 'AdminCommandExecutor'):
        """
        Export this server into ``ace.config['servers']`` and save the config.

        Only attributes that differ from the global option of the same name
        are written; log ignores are stored as pattern sources keyed by the
        level converted to a string. Failures are reported via ``ace.error``
        instead of being raised.
        """
        try:
            export_data = ace.config['servers'][self.name] = {}
            for item in server_attribs:
                _item = getattr(self, item)
                if ace.config.get(item, None) != _item:
                    export_data[item] = _item
            export_data['log_ignores'] = {
                modname: {
                    str(level): [pattern.pattern for pattern in patterns]
                    for level, patterns in modignores.items()
                }
                for modname, modignores in self.log_ignores.items()
            }
            ace.save_config()
        except Exception:
            ace.error(traceback.format_exc())
def stkwrapper_command_set(ace: AdminCommandExecutor):
async def create_server(cmd: AdminCommandExecutor, name: str,
                        cfgpath: Optional[str] = "",
                        datapath: Optional[str] = "",
                        exec_: Optional[str] = "",
                        cwd: Optional[str] = "",
                        autostart: Optional[bool] = False,
                        autorestart: Optional[bool] = False,
                        timed_autorestart: Optional[str] = False,
                        timed_autorestart_interval: Optional[int] = 0,
                        startup_timeout: Optional[float] = None,
                        shutdown_timeout: Optional[float] = None,
                        extra_env: Optional[str] = None,
                        extra_args=tuple()):
    """
    Non-interactive server creation command.

    Builds an STKServer called *name* from the given values and saves it into
    the config. Values that are empty or false are not passed on, so the
    STKServer defaults apply to them.
    """
    if name in ace.config['servers']:
        cmd.error(f'server {name} already exists, specify another name', log=False)
        return
    cmd.print('Note: for interactive server creation use stk-make-server')
    # The positional order below must match server_attribs.
    _values = (cfgpath, datapath, exec_, cwd, autostart, autorestart, timed_autorestart,
               timed_autorestart_interval, startup_timeout, shutdown_timeout,
               extra_env, extra_args)
    # The loop variable used to be called "name" as well, which overwrote the
    # server name with the last attribute name ("extra_args").
    _kwargs = {attrib: item for item, attrib in zip(_values, server_attribs) if item}
    # NOTE(review): cfgpath, datapath, executable_path and cwd are required by
    # STKServer; leaving one of them empty ends in a TypeError here.
    # NOTE(review): extra_env arrives as str while STKServer expects a mapping
    # - confirm how it should be parsed.
    try:
        _server = STKServer(ace.logger, ace.ainput.writeln, name, restarter_cond=ace.server_restart_cond,
                            start_stop_guard=ace.start_stop_guard, **_kwargs)
    except FileNotFoundError as exc:
        cmd.error(f'Failed, {exc}, re-check the path', log=False)
        return
    _server.save(ace)
    cmd.print(f'Server "{name}" created. To start it, do stk-start {name}')
ace.add_command(create_server, 'stk-create-server', ((str, 'name'), ),
((str, 'path/to/config.xml'), (str, 'path/to/stk-assets dir'),
(str, 'path/to/supertuxkart exec'), (str, 'path/to/server/workdir'),
(bool, 'autostart with wrapper?'),
(bool, 'autorestart on crash?'), (bool, 'autorestart every n-seconds?'),
(int, 'autorestart seconds interval'),
(float, 'max startup seconds'), (float, 'max shutdown seconds'),
(str, 'extra environment variables'),
(None, 'extra arguments space sep')))
def _startswith_predicate(name: str, stkservername: str):
return stkservername.startswith(name)
async def stkserver_tab(cmd: AdminCommandExecutor, name: str = '', *, argl: str):
    """
    Tab completion of server names.

    Returns all known server names when *argl* is non-empty, otherwise only
    the names that start with *name*.
    """
    known_names = ace.servers.keys()
    if not argl:
        return [servername for servername in known_names if _startswith_predicate(name, servername)]
    return list(known_names)
async def _ask_yes_no(cmd: AdminCommandExecutor, prompt: str, default: bool) -> bool:
    # Read one keystroke; a yes/no match decides, any other key keeps `default`.
    answer = await cmd.ainput.prompt_keystroke(prompt)
    if yes_match.fullmatch(answer):
        return True
    if no_match.fullmatch(answer):
        return False
    return default


async def _ask_seconds(cmd: AdminCommandExecutor, prompt: str, default, example: str):
    # Prompt until the input parses as a float; empty input keeps `default`.
    while True:
        try:
            raw = await cmd.ainput.prompt_line(prompt, history_disabled=True)
            if not raw:
                return default
            return float(raw)
        except ValueError:
            cmd.print(f'Specify valid floating point number, for example {example}')


async def make_server(cmd: AdminCommandExecutor, name: str = '', edit_existing=False):
    # Interactive wizard behind stk-make-server and stk-edit-server.
    #
    # cmd           -- executor providing print / error / ainput / config.
    # name          -- server name; prompted for when empty.
    # edit_existing -- True: modify ace.servers[name] in place (defaults come
    #                  from that server); False: create a new STKServer
    #                  (defaults come from cmd.config).
    #
    # Each question shows its default; empty input keeps it. Any exception is
    # reported through cmd.error as a traceback and the wizard ends.
    try:
        (_name_msg, _cfgpath_msg, _assets_msg, _exec_msg,
         _cwd_msg, _as_msg, _ar_msg, _tar_msg, _startt_msg,
         _stopt_msg, _env_msg, _ea_msg) = make_server_msg
        if not name:
            cmd.print(_name_msg)
        while not name:
            name = await cmd.ainput.prompt_line('name: ', history_disabled=True)
        if edit_existing and name not in ace.servers:
            cmd.error('This server doesn\'t exist', log=False)
            return
        elif not edit_existing and name in ace.servers:
            cmd.error('This server already exists, specify another name', log=False)
            return
        _cwd = os.getcwd()
        if edit_existing:
            _server: STKServer = ace.servers[name]
            _cfgpath_default = _server.cfgpath
            _datapath_default = _server.datapath
            _exec_default = _server.executable_path
            _cwd_default = _server.cwd
            _autostart_default = _server.autostart
            _autorestart_default = _server.autorestart
            _timed_autorestart_interval_default = _server.timed_autorestart_interval
            _startup_timeout_default = _server.startup_timeout
            _shutdown_timeout_default = _server.shutdown_timeout
            _extra_env_default = _server.extra_env
            _extra_arguments_default = ' '.join(_server.extra_args)
        else:
            _cfgpath_default = cmd.config.get('cfgpath', '')
            _datapath_default = cmd.config.get('datapath', '')
            _exec_default = cmd.config.get('executable_path', '')
            _cwd_default = cmd.config.get('cwd', os.path.join(_cwd, name))
            _autostart_default = cmd.config.get('autostart', False)
            _autorestart_default = cmd.config.get('autorestart', True)
            _timed_autorestart_interval_default = cmd.config.get('timed_autorestart_interval', 0)
            _startup_timeout_default = cmd.config.get('server_startup_timeout', None)
            # Was read from 'server_startup_timeout' (copy-paste); stop_server
            # uses the 'server_shutdown_timeout' key.
            _shutdown_timeout_default = cmd.config.get('server_shutdown_timeout', None)
            _extra_env_default = cmd.config.get('extra_env', None)
            _extra_arguments_default = cmd.config.get('extra_args', '')
        # dict(...) accepts both a mapping and an iterable of pairs, so the
        # NAME=value preview is built from items in either case.
        _extra_env_default_text = (
            shlex.join(f"{key}={value}" for key, value in dict(_extra_env_default).items())
            if _extra_env_default is not None else None)

        # working directory (offered for creation when missing)
        while True:
            cmd.print(_cwd_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=_cwd_default)))
            cwd = (await cmd.ainput.prompt_line('cwd: ', history_disabled=True)) or _cwd_default
            cwd = os.path.normpath(os.path.abspath(os.path.expanduser(cwd)))
            if os.path.isdir(cwd):
                break
            cmd.print('This directory doesn\'t exist. Create one?')
            if yes_match.fullmatch(await cmd.ainput.prompt_keystroke(f'create dir "{os.path.abspath(cwd)}"? ')):
                os.makedirs(cwd)
                cmd.print('Directory created for server')
                break

        # config file, resolved against the chosen cwd; may be skipped
        while True:
            cmd.print(_cfgpath_msg.format(skipmsg=make_server_skipmsg.format(cwd=cwd, default=_cfgpath_default)))
            cfgpath = (await cmd.ainput.prompt_line('cfgpath: ', history_disabled=True)) or _cfgpath_default
            cfgpath = os.path.join(cwd, os.path.expanduser(cfgpath))
            if os.path.isfile(cfgpath):
                break
            cmd.print('This file doesn\'t exist. Skip?')
            if yes_match.fullmatch(await cmd.ainput.prompt_keystroke('skip? ')):
                break

        # assets directory, resolved against cwd like the other paths
        # (it used to be joined onto itself, turning "foo" into "foo/foo")
        while True:
            cmd.print(_assets_msg.format(skipmsg=make_server_skipmsg.format(cwd=cwd, default=_datapath_default)))
            datapath = (await cmd.ainput.prompt_line('datapath: ', history_disabled=True)) or _datapath_default
            datapath = os.path.normpath(os.path.join(cwd, os.path.expanduser(datapath)))
            if os.path.isdir(datapath):
                break
            cmd.print('This directory doesn\'t exist. Re-check your path')

        # server executable
        while True:
            cmd.print(_exec_msg.format(skipmsg=make_server_skipmsg.format(cwd=cwd, default=_exec_default)))
            executable_path = (await cmd.ainput.prompt_line('exec: ', history_disabled=True)) or _exec_default
            executable_path = os.path.join(cwd, os.path.expanduser(executable_path))
            if os.path.isfile(executable_path):
                break
            cmd.print('This executable doesn\'t exist. Re-check your path')

        # autostart
        cmd.print(_as_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=no_yes[int(_autostart_default)])))
        autostart = await _ask_yes_no(cmd, 'autostart? ', _autostart_default)

        # autorestart
        cmd.print(_ar_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=no_yes[int(_autorestart_default)])))
        autorestart = await _ask_yes_no(cmd, 'autorestart? ', _autorestart_default)

        # timed autorestart: entered in minutes, stored in seconds;
        # empty input or "0" disables it
        cmd.print(_tar_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=_timed_autorestart_interval_default)))
        while True:
            _interval_text = await cmd.ainput.prompt_line('timed autorestart minutes (or empty): ',
                                                          history_disabled=True)
            if not _interval_text or _interval_text == '0':
                timed_autorestart = False
                timed_autorestart_interval = 0
                break
            try:
                timed_autorestart_interval = int(_interval_text) * 60
            except ValueError:
                # Ask again instead of aborting the whole wizard.
                cmd.print('Specify valid whole number of minutes, for example 30')
                continue
            timed_autorestart = True
            break

        # startup / shutdown timeouts
        cmd.print(_startt_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=_startup_timeout_default)))
        startup_timeout = await _ask_seconds(cmd, 'startup timeout (n.n): ', _startup_timeout_default, '120.0')
        cmd.print(_stopt_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=_shutdown_timeout_default)))
        shutdown_timeout = await _ask_seconds(cmd, 'shutdown timeout (n.n): ', _shutdown_timeout_default, '60.0')

        # extra environment: shell-style "A=1 B=2" parsed into a dict
        while True:
            try:
                cmd.print(_env_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=_extra_env_default_text)))
                extra_env = await cmd.ainput.prompt_line('extra environment variables: ', history_disabled=True)
                if not extra_env:
                    extra_env = _extra_env_default
                    break
                # partition() -> (key, '=', value); [::2] keeps key and value.
                extra_env = dict(entity.partition('=')[::2] for entity in shlex.split(extra_env))
                break
            except ValueError:
                cmd.print('Specify valid sequence of environment variables, for example: "A=1 B=2 C=something"')

        # extra command-line arguments; "-" clears them
        cmd.print(_ea_msg.format(skipmsg=make_server_skipmsg.format(cwd=_cwd, default=_extra_arguments_default)))
        extra_args = (await cmd.ainput.prompt_line('extra args: ', history_disabled=True)) or _extra_arguments_default
        if not extra_args or extra_args == '-':
            extra_args = tuple()
        else:
            extra_args = splitter.split(extra_args)

        if edit_existing:
            _server: STKServer
            _server.cfgpath = cfgpath
            _server.datapath = datapath
            _server.executable_path = executable_path
            _server.cwd = cwd
            _server.autostart = autostart
            _server.autorestart = autorestart
            _server.timed_autorestart = timed_autorestart
            _server.timed_autorestart_interval = timed_autorestart_interval
            _server.startup_timeout = startup_timeout
            _server.shutdown_timeout = shutdown_timeout
            _server.extra_env = extra_env
            _server.extra_args = extra_args
            _server.save(ace)
            cmd.print('Server successfully edited.')
        else:
            _server = STKServer(
                cmd.logger, cmd.ainput.writeln, name,
                cfgpath=cfgpath,
                datapath=datapath,
                executable_path=executable_path,
                cwd=cwd,
                autostart=autostart,
                autorestart=autorestart,
                timed_autorestart=timed_autorestart,
                timed_autorestart_interval=timed_autorestart_interval,
                startup_timeout=startup_timeout,
                shutdown_timeout=shutdown_timeout,
                restarter_cond=ace.server_restart_cond, start_stop_guard=ace.start_stop_guard,
                extra_env=extra_env,
                extra_args=extra_args
            )
            _server.save(ace)
            ace.servers[name] = _server
            cmd.print('Server successfully created. Start it right now?')
            if yes_match.fullmatch(await cmd.ainput.prompt_keystroke(f'start {name}? ')):
                await _server.launch()
                cmd.print(f'Starting server {name}')
    except Exception:
        cmd.error(traceback.format_exc(), log=False)
        return


ace.add_command(partial(make_server, edit_existing=False), 'stk-make-server', optargs=((str, 'name'), ),
                description='Interactive command for registering STK servers into the wrapper.')
ace.add_command(partial(make_server, edit_existing=True), 'stk-edit-server', args=((str, 'name'), ),
                description='Interactive command for editing STK servers in case you messed up.',
                atabcomplete=stkserver_tab)
async def start_server(cmd: AdminCommandExecutor, name: str, autorestart: Optional[bool] = None):
    # Launch a registered, currently inactive STK server in a background task.
    #
    # name        -- key in ace.servers.
    # autorestart -- when given, overrides the server's `autorestart` flag
    #                before launching; None leaves the flag as configured.
    #                (This argument used to be accepted and then ignored.)
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    _server: STKServer = ace.servers[name]
    if _server.active:
        cmd.error(f'Server {name} is already running. To stop it, do stk-stop {name}', log=False)
        return
    if autorestart is not None:
        _server.autorestart = autorestart
    # The launch runs as its own task, tracked in ace.tasks under the task name.
    _tsk = asyncio.create_task(_server.launch())
    ace.tasks[_tsk.get_name()] = _tsk
    cmd.print(f'Starting STK Server {name}')


ace.add_command(start_server, 'stk-start', ((str, 'name'), ), ((bool, 'autorestart?'), ), 'Launch an STK server', stkserver_tab)
async def stop_server(cmd: AdminCommandExecutor, name: str, force=False, timeout: Optional[float] = None):
    # Stop a running STK server in a background task.
    #
    # name    -- key in ace.servers.
    # force   -- stop even when the server's `empty_server` event is not set
    #            (i.e. it reports players).
    # timeout -- value handed to the server's stop(); falls back to the
    #            'server_shutdown_timeout' config entry when omitted.
    if timeout is None:
        timeout = cmd.config['server_shutdown_timeout']
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    target: STKServer = ace.servers[name]
    if not target.active:
        cmd.error(f'Server {name} is already stopped. To start it, do stk-start {name}', log=False)
        return
    if not (target.empty_server.is_set() or force):
        cmd.error(f'Server {name} currently has players. To stop it anyway, specify second argument as yes', log=False)
        return
    # Clear the restart flag first: this is a plain stop, not a restart.
    target.restart = False
    stop_task = asyncio.create_task(target.stop(timeout))
    ace.tasks[stop_task.get_name()] = stop_task
    cmd.print(f'Stopping server {name}')


ace.add_command(stop_server, 'stk-stop', ((str, 'name'), ), ((bool, 'even if players'), (float, 'timeout'), ), 'Stop an STK server. When timeout reaches, the process is killed', stkserver_tab)
async def restart_server(cmd: AdminCommandExecutor, name: str, force=False):
    # Restart a running STK server: set its `restart` flag and stop it in a
    # background task with a fixed timeout of 60.
    #
    # name  -- key in ace.servers.
    # force -- proceed even when the server's `empty_server` event is not set.
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    target: STKServer = ace.servers[name]
    if not target.active:
        cmd.error(f'Server {name} is already stopped. To start it, do stk-start {name}', log=False)
        return
    if not (target.empty_server.is_set() or force):
        cmd.error(f'Server {name} currently has players. To stop it anyway, specify second argument as yes', log=False)
        return
    target.restart = True
    stop_task = asyncio.create_task(target.stop(60))
    ace.tasks[stop_task.get_name()] = stop_task
    cmd.print(f'Restarting server {name}')


ace.add_command(restart_server, 'stk-restart', ((str, 'name'), ), ((bool, 'force'), ), 'Restart an STK server.', stkserver_tab)
async def server_ncsend(cmd, name: str, line: str):
    # Forward one command line to a running STK server through its stuff()
    # coroutine. Errors are reported when the server is unknown or inactive.
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    target: STKServer = ace.servers[name]
    if target.active:
        await target.stuff(line)
        return
    cmd.error(f'Server {name} is stopped. To start it, do stk-start {name}', log=False)


ace.add_command(server_ncsend, 'stk-cmd', ((str, 'name'), (None, 'cmd')), description='Send a command to STK server', atabcomplete=stkserver_tab)
async def server_enternc(cmd: AdminCommandExecutor, name: str, quitword: str = 'quit'):
    # Interactive "network console": every entered line is forwarded to the
    # server via stuff() until the user types "." followed by `quitword`.
    #
    # name     -- key in ace.servers; the server must be active.
    # quitword -- word that, prefixed with a dot, leaves the console.
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    _server: STKServer = ace.servers[name]
    if not _server.active:
        cmd.error(f'Server {name} is stopped. To start it, do stk-start {name}', log=False)
        return
    quitword = '.' + quitword
    _prompt = f'{name}> '
    cmd.ainput.writeln(f'Entered "network console" for {name}. Type {quitword} to return back to command prompt', fgcolor=colors.YELLOW)
    _server.show_plain = True
    try:
        while True:
            line = await cmd.ainput.prompt_line(_prompt, prompt_formats={'fgcolor': colors.YELLOW}, input_formats={'fgcolor': 14})
            if line == quitword:
                break
            await _server.stuff(line)
    finally:
        # Reset the flag even when the loop is left by an exception or task
        # cancellation; otherwise show_plain would stay switched on.
        _server.show_plain = False


ace.add_command(server_enternc, 'stk-nc', ((str, 'name'), ), ((None, 'quitword'), ), 'Enter into interactive network console of the server. To return back, send ".quit".', stkserver_tab)
async def list_servers(cmd, cpage: int = 1):
    # Print one page (10 entries) of registered STK servers with the pid of
    # each server's process, or -1 when `server.process` has no `pid`.
    #
    # cpage -- page number passed to paginate_range.
    ace.servers: Mapping[str, STKServer]
    _len = len(ace.servers)
    _maxpage, _start, _end = paginate_range(_len, 10, cpage)
    # Header wording fixed from "page X or Y" to "page X of Y", matching the
    # other paginated listings.
    cmd.print(f'STK servers: (page {cpage} of {_maxpage})')
    cmd.print('\n'.join(f'{name}: pid {getattr(server.process, "pid", -1)}'
                        for name, server in tuple(ace.servers.items())[_start:_end]))


ace.add_command(list_servers, 'stk-servers', optargs=((int, 'page'), ))
async def server_norestart(cmd: AdminCommandExecutor, name: str):
    # Cancel the server's pending restart timer task (if any) and toggle its
    # `autorestart` flag, reporting the new state.
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    target: STKServer = ace.servers[name]
    timer = target.timer_task
    if timer is not None and not timer.done():
        timer.cancel()
        cmd.print(f'Timer task has been killed for {name}')
    if not target.autorestart:
        cmd.print(f'Autorestart for server "{name}" has been enabled')
        target.autorestart = True
        return
    cmd.print(f'Autorestart for server "{name}" has been disabled.')
    target.autorestart = False


ace.add_command(server_norestart, 'stk-norestart', ((str, 'name'), ), description='Stop autorestart timer for STK server')
async def server_timedrestart(cmd: AdminCommandExecutor, name: str, interval_mins: int):
    # (Re)arm the timed restart of a server: cancel a still-pending timer
    # task, store the new interval in seconds and start a fresh
    # _timed_restarter() task.
    #
    # interval_mins -- restart interval in minutes.
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    target: STKServer = ace.servers[name]
    previous_timer = target.timer_task
    if previous_timer is not None and not previous_timer.done():
        previous_timer.cancel()
        cmd.print(f'Timer task has been killed for {name}')
    target.timed_autorestart = True
    target.timed_autorestart_interval = interval_mins * 60
    target.timer_task = asyncio.create_task(target._timed_restarter())
    cmd.print(f'Restarter enabled with {interval_mins} minutes.')


ace.add_command(server_timedrestart, 'stk-timed-restart', ((str, 'name'), (int, 'interval_mins')), description='Enable autorestart timer for STK server')
async def stk_stopall(cmd: AdminCommandExecutor):
    # Stop every active STK server, each in its own background task tracked
    # in ace.tasks. The restart flag is cleared so the servers stay down.
    cmd.print('Stopping all the servers forcefully')
    # Iterate the server objects: iterating ace.servers directly yields the
    # name keys, which have no `active` / `stop`.
    for server in ace.servers.values():
        if server.active:
            server.restart = False
            _tsk = asyncio.create_task(server.stop())
            ace.tasks[_tsk.get_name()] = _tsk


ace.add_command(stk_stopall, 'stk-stopall', description='Stops all servers')
async def wrapper_reloadcfg(cmd: AdminCommandExecutor, full=False):
    # Reload the configuration through load_config().
    #
    # full -- when true, first clear every server's restart flag, stop all
    #         active servers concurrently (stop(10)) and empty ace.servers.
    if full:
        registered = ace.servers.values()
        for server in registered:
            server.restart = False
        pending_stops = [server.stop(10) for server in registered if server.active]
        await asyncio.gather(*pending_stops)
        ace.servers.clear()
    load_config()
    cmd.print('Configuration reloaded and changes are reverted.')


ace.add_command(wrapper_reloadcfg, 'reloadcfg', optargs=((bool, 'hard reload?'), ),
                description='Reloads config.json. When hard reload is enabled, turns all servers off within 10 seconds')
async def list_globallogignore(cmd: AdminCommandExecutor, modname: str, levelname: str, cpage=1):
    # Print one page (10 entries) of the global Log-Ignore patterns stored
    # for log object `modname` at level `levelname`.
    #
    # levelname -- LogLevel member name, case-insensitive; an unknown name
    #              raises KeyError from the LogLevel lookup.
    # cpage     -- page number passed to paginate_range.
    level = LogLevel[levelname.upper()].value
    registry = ace.global_logignores
    if modname not in registry or level not in registry[modname]:
        cmd.print(f'No Log-Ignores exist for {modname}: {level}')
        return
    patterns = registry[modname][level]
    _maxpage, first, last = paginate_range(len(patterns), 10, cpage)
    cmd.print(f'Global Log-Ignore patterns (page {cpage} of {_maxpage}):')
    numbered = (f'#{index}: {patterns[index].pattern}' for index in range(first, last))
    cmd.print(*numbered, sep='\n')


ace.add_command(list_globallogignore, 'stk-global-logignores', ((str, 'object name'), (str, 'levelname')), ((int, 'page'), ), 'Shows the list of filters for module (or log object) and log level.')
async def list_globallogignorelevels(cmd: AdminCommandExecutor, modname: str):
    # Print the names of the log levels that have global Log-Ignore entries
    # for log object `modname`.
    if modname in ace.global_logignores:
        level_names = ", ".join(LogLevel(value).name for value in ace.global_logignores[modname])
        cmd.print(f'Next levels exist for {modname}:\n{level_names}')
        return
    cmd.print(f'No Log-Ignores exist for {modname}')


ace.add_command(list_globallogignorelevels, 'stk-global-logignore-levels', ((str, 'object name'), ), description='Shows the list of levels that exists for a specific log-object')
async def list_globallogignoreobjects(cmd: AdminCommandExecutor):
    # Print the names of all log objects that have global Log-Ignore entries.
    object_names = ", ".join(ace.global_logignores)
    cmd.print(f'Next logobjects (or modules) exist in Log-Ignore:\n{object_names}')


ace.add_command(list_globallogignoreobjects, 'stk-global-logignore-objects', description='Shows which log objects are registered in Log-Ignore')
async def list_logignore(cmd: AdminCommandExecutor, name: str, modname: str, levelname: str, cpage=1):
    # Print one page (10 entries) of the Log-Ignore patterns of one server
    # for log object `modname` at level `levelname`. Only the first mapping
    # of the server's `log_ignores` chain (maps[0]) is consulted.
    #
    # name      -- key in ace.servers.
    # levelname -- LogLevel member name, case-insensitive.
    # cpage     -- page number passed to paginate_range.
    if name not in ace.servers:
        cmd.error('Server doesn\'t exist', log=False)
        return
    target: STKServer = ace.servers[name]
    level = LogLevel[levelname.upper()].value
    own_ignores = target.log_ignores.maps[0]
    if modname not in own_ignores or level not in own_ignores[modname]:
        cmd.print(f'No Log-Ignores exist for {modname}: {level} at server {name}')
        return
    patterns = own_ignores[modname][level]
    _maxpage, first, last = paginate_range(len(patterns), 10, cpage)
    cmd.print(f'Log-Ignores for server {name} (logobject {modname}: {level}) (page {cpage} of {_maxpage}):')
    numbered = (f'#{index}: {patterns[index].pattern}' for index in range(first, last))
    cmd.print(*numbered, sep='\n')


ace.add_command(list_logignore, 'stk-logignores',
                ((str, 'server name'), (str, 'logobject'), (str, 'loglevel')), ((int, 'page'), ),
                description='Shows the Log-Ignore patterns of logobject and loglevel for specific STK server.',
                atabcomplete=stkserver_tab)
async def list_logignorelevels(cmd: AdminCommandExecutor, name: str, modname: str):