Skip to content

Commit 37b8c5d

Browse files
committed
Initial commit
0 parents  commit 37b8c5d

8 files changed

+107
-0
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
venv
2+
.pytest_cache
3+
.idea

docker-compose.yml

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
version: '3'
2+
3+
services:
4+
rabbit:
5+
image: rabbitmq:3-management-alpine
6+
ports:
7+
- 8888:15672
8+
- 5672:5672

eggplant/__init__.py

Whitespace-only changes.

eggplant/core.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
class Eggplant:
2+
def __init__(self, broker):
3+
self._broker = broker
4+
self._handlers = {}
5+
6+
def handler(self, message_name):
7+
def decorator_handler(func):
8+
self._handlers.update({message_name: func})
9+
10+
return decorator_handler
11+
12+
def start(self):
13+
self._broker.consume(topics=self._handlers.keys(), callback=self._on_message)
14+
15+
def _on_message(self, message, delivery_info):
16+
self._handlers[delivery_info['topic']](message)
17+
18+
def stop(self):
19+
self._broker.stop()

eggplant/kumbo.py

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from kombu import Connection, Exchange, Queue, binding
2+
from kombu.mixins import ConsumerMixin
3+
4+
5+
class Worker(ConsumerMixin):
6+
def __init__(self, connection, queues, callback):
7+
self._connection = connection
8+
self._queues = queues
9+
self._callback = callback
10+
11+
def get_consumers(self, consumer, channel):
12+
return [consumer(queues=self._queues,
13+
callbacks=[self.on_message])]
14+
15+
def on_message(self, body, message):
16+
self._callback(
17+
body,
18+
{'topic': message.delivery_info['routing_key']})
19+
message.ack()
20+
21+
22+
class RabbitKombuConsumer:
23+
24+
def __init__(self, amqp_uri, exchange, queue):
25+
self._amqp_uri = amqp_uri
26+
self._exchange = exchange
27+
self._queue = queue
28+
self._worker = None
29+
30+
def consume(self, topics, callback):
31+
queue = self._create_queue_for(topics)
32+
with Connection(self._amqp_uri, heartbeat=4) as conn:
33+
self._worker = Worker(conn, [queue], callback)
34+
self._worker.run()
35+
36+
def _create_queue_for(self, topics):
37+
exchange = Exchange(self._exchange, type="topic")
38+
queue = Queue(
39+
self._queue,
40+
exchange,
41+
bindings=[binding(exchange, routing_key=t) for t in topics])
42+
return queue
43+
44+
def stop(self):
45+
self._worker.should_stop = True

requirements.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pytest==5.3.5
2+
pika==1.1.0
3+
busypie==0.3.0

tests/__init__.py

Whitespace-only changes.

tests/test_consuming.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from threading import Thread
2+
from time import sleep
3+
4+
from busypie import wait, FIVE_SECONDS
5+
from kombu import Connection, Exchange, Producer
6+
7+
from eggplant.core import Eggplant
8+
from eggplant.kumbo import RabbitKombuConsumer
9+
10+
11+
def test_consume_using_function_handler():
12+
app = Eggplant(RabbitKombuConsumer(amqp_uri='amqp://localhost', exchange='eggplant-exchange', queue='test_queue'))
13+
received_messages = []
14+
15+
@app.handler('status_changed')
16+
def status_changed_handler(message):
17+
received_messages.append(message)
18+
19+
Thread(target=app.start).start()
20+
sleep(1)
21+
_send_message('status_changed', 'enabled')
22+
wait().at_most(FIVE_SECONDS).until(lambda: 'enabled' in received_messages)
23+
app.stop()
24+
25+
26+
def _send_message(topic, message):
27+
with Connection('amqp://localhost') as conn:
28+
exchange = Exchange("eggplant-exchange", type="topic")
29+
Producer(exchange=exchange, channel=conn.channel(), routing_key=topic).publish(message)

0 commit comments

Comments
 (0)