Skip to content

Commit

Permalink
Fixes #RHINENG-5922 - support multiple kafka brokers (#69)
Browse files Browse the repository at this point in the history
* Fixes #RHINENG-5922 - support multiple kafka brokers

* rename insights_kafka_address
  • Loading branch information
r14chandra authored Jan 3, 2024
1 parent 3d5ef0d commit 66e92f1
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
6 changes: 3 additions & 3 deletions yuptoo/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ def kafka_auth_config(connection_object):

if CLOWDER_ENABLED:
LOG.info("Using Clowder Operator...")
from app_common_python import LoadedConfig, KafkaTopics
from app_common_python import LoadedConfig, KafkaTopics, KafkaServers
KAFKA_BROKER = LoadedConfig.kafka.brokers[0]
INSIGHTS_KAFKA_ADDRESS = KAFKA_BROKER.hostname + ":" + str(KAFKA_BROKER.port)
BOOTSTRAP_SERVERS = KafkaServers
ANNOUNCE_TOPIC = KafkaTopics["platform.upload.announce"].name
UPLOAD_TOPIC = KafkaTopics["platform.inventory.host-ingress"].name
VALIDATION_TOPIC = KafkaTopics["platform.upload.validation"].name
Expand All @@ -33,7 +33,7 @@ def kafka_auth_config(connection_object):
else:
INSIGHTS_KAFKA_HOST = os.getenv('INSIGHTS_KAFKA_HOST', 'localhost')
INSIGHTS_KAFKA_PORT = os.getenv('INSIGHTS_KAFKA_PORT', '29092')
INSIGHTS_KAFKA_ADDRESS = f'{INSIGHTS_KAFKA_HOST}:{INSIGHTS_KAFKA_PORT}'
BOOTSTRAP_SERVERS = f'{INSIGHTS_KAFKA_HOST}:{INSIGHTS_KAFKA_PORT}'
KAFKA_BROKER = None
ANNOUNCE_TOPIC = os.getenv('ANNOUNCE_TOPIC', 'platform.upload.announce')
VALIDATION_TOPIC = os.getenv('VALIDATION_TOPIC', 'platform.upload.validation')
Expand Down
4 changes: 2 additions & 2 deletions yuptoo/lib/consume.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from confluent_kafka import Consumer
from yuptoo.lib.config import (
KAFKA_AUTO_COMMIT,
INSIGHTS_KAFKA_ADDRESS,
BOOTSTRAP_SERVERS,
ANNOUNCE_TOPIC,
KAFKA_CONSUMER_GROUP_ID,
kafka_auth_config
Expand All @@ -10,7 +10,7 @@

def init_consumer():
connection_object = {
'bootstrap.servers': INSIGHTS_KAFKA_ADDRESS,
'bootstrap.servers': ",".join(BOOTSTRAP_SERVERS),
'group.id': KAFKA_CONSUMER_GROUP_ID,
'enable.auto.commit': KAFKA_AUTO_COMMIT
}
Expand Down
4 changes: 2 additions & 2 deletions yuptoo/lib/produce.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from confluent_kafka import Producer, KafkaException
from yuptoo.lib.config import (INSIGHTS_KAFKA_ADDRESS, KAFKA_PRODUCER_OVERRIDE_MAX_REQUEST_SIZE,
from yuptoo.lib.config import (BOOTSTRAP_SERVERS, KAFKA_PRODUCER_OVERRIDE_MAX_REQUEST_SIZE,
kafka_auth_config, UPLOAD_TOPIC)
from functools import partial
from yuptoo.lib.metrics import host_uploaded, host_upload_failures
Expand All @@ -13,7 +13,7 @@
def init_producer():
global producer
connection_object = {
'bootstrap.servers': INSIGHTS_KAFKA_ADDRESS,
'bootstrap.servers': ",".join(BOOTSTRAP_SERVERS),
'message.max.bytes': KAFKA_PRODUCER_OVERRIDE_MAX_REQUEST_SIZE
}
kafka_auth_config(connection_object)
Expand Down

0 comments on commit 66e92f1

Please sign in to comment.