Skip to content

Commit

Permalink
Updated backoff handler to user KafkaProducer.
Browse files Browse the repository at this point in the history
  • Loading branch information
ksekou committed Oct 9, 2018
1 parent 7ad404f commit f5332be
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 17 deletions.
3 changes: 1 addition & 2 deletions guillotina_kafka/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@


def deserializer(msg):
result = {}
try:
msg = msg.decode('utf-8')
result = json.loads(msg)
except JSONDecodeError as e:
pass
result = {}
return result


Expand Down
12 changes: 9 additions & 3 deletions guillotina_kafka/consumers/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from guillotina_kafka.consumers import make_consumer
from guillotina.interfaces import ICatalogUtility
from guillotina.component import get_utility
from guillotina_kafka.util import send_to_kafka
from guillotina_kafka.util import get_kafka_producer

logger = logging.getLogger('guillotina_kafka')

Expand All @@ -23,8 +23,9 @@ def log_result(result, label):


async def backoff_hdlr(details):
kafka_producer = get_kafka_producer()
topic = 'T-elasticsearch-dead-letter'
await send_to_kafka(topic, details)
await kafka_producer.send(topic, details)


@backoff.on_exception(
Expand Down Expand Up @@ -67,13 +68,18 @@ async def update_elasticsearch(index_name, action, data):
}[action](util.conn, index_name, data)


def parser(payload):
if sorted(payload.keys()) == sorted(['index', 'action', 'data']):
return payload['action'] in ['index', 'delete', 'delete_children']
return False

async def es_consumer(kafka_hosts, topics, group_id='es_consumer'):
print(f'Starting es_consumer:{group_id} <= {topics!r}')
consumer = await make_consumer(kafka_hosts, topics, group_id)
await consumer.start()
try:
async for msg in consumer:
if msg.value:
if msg.value and parser(msg.value):
_ = await update_elasticsearch(
msg.value.get('index'),
msg.value.get('action'),
Expand Down
12 changes: 0 additions & 12 deletions guillotina_kafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ def get_kafa_host():
return f'{host}:{port}'


def get_producer_api_url():
return app_settings['kafka'].get('producer_api_url')


async def get_kafka_producer():
global kafka_producer
if kafka_producer is None:
Expand All @@ -30,14 +26,6 @@ async def get_kafka_producer():
return kafka_producer


async def send_to_kafka(topic, payload):
url = f'{get_producer_api_url()}/{topic}'
auth = aiohttp.BasicAuth(login='root', password='root')
async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
respons = await session.post(url, json=payload, auth=auth)
return (respons.status, await respons.json())


class KafkaProducer:

def __init__(self, loop, bootstrap_servers):
Expand Down

0 comments on commit f5332be

Please sign in to comment.