-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrmqclient.py
68 lines (50 loc) · 1.39 KB
/
rmqclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import aiormq
import asyncio
import logging
from signal import SIGINT, SIGTERM
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class RMQClient:
    """Small wrapper around one aiormq connection and the channels opened on it.

    The client remembers every channel created through :meth:`channel` so that
    :meth:`close` can shut all of them down together with the connection.

    Attributes:
        on_close: Optional zero-argument callable.  It is invoked every time
            the connection or any tracked channel reports that it is closing,
            so it may fire more than once.
        closed: ``True`` once the connection or a channel has closed, or once
            :meth:`close` has been called.
    """

    def __init__(self):
        self._conn = None  # set by connect()
        self._ch = []  # every channel handed out by channel()
        self.on_close = None
        self.closed = False

    async def connect(self, *args, **kwargs):
        """Open the connection; all arguments are forwarded to ``aiormq.connect``.

        Raises:
            ConnectionError: per the original author's note, when the
                connection cannot be established.
        """
        # NOTE(review): ``args`` usually carries the broker URL, which may
        # embed credentials; they end up in the debug log.
        log.debug("Connect to %r", args)
        self._conn = await aiormq.connect(*args, **kwargs)
        self._conn.closing.add_done_callback(self.on_connection_close)

    def _notify_closed(self):
        """Mark the client as closed and invoke ``on_close`` when one is set."""
        self.closed = True
        if self.on_close:
            self.on_close()

    def on_connection_close(self, fut):
        """Done-callback attached to the connection's ``closing`` future."""
        log.debug("Connection closed")
        self._notify_closed()

    def on_channel_close(self, fut):
        """Done-callback attached to each tracked channel's ``closing`` future."""
        log.debug("Channel closed")
        self._notify_closed()

    async def channel(self):
        """Open a new channel on the current connection and start tracking it.

        Returns:
            The channel object returned by the connection's ``channel()``.

        Raises:
            AssertionError: if no connection has been established yet.
            Exception: whatever the underlying ``channel()`` call raises.
        """
        assert self._conn, "connect() must be called before channel()"
        log.debug("New channel")
        ch = await self._conn.channel()
        self._ch.append(ch)
        ch.closing.add_done_callback(self.on_channel_close)
        return ch

    async def close(self):
        """Close every tracked channel and then the connection (best effort).

        Failures are logged at debug level and never propagated.  Each close
        is guarded separately so that one channel failing to close does not
        prevent the remaining channels, or the connection itself, from being
        closed.
        """
        log.debug("Close")
        self.closed = True

        for ch in self._ch:
            try:
                await ch.close()
            except Exception as ex:
                log.debug("Exception while closing: %s", ex)

        if self._conn:
            try:
                await self._conn.close()
                # Hack: Wait until aiormq's auxiliary tasks are done
                # (they are not awaited for some reason)
                await asyncio.sleep(0.1)
            except Exception as ex:
                log.debug("Exception while closing: %s", ex)