Skip to content

Commit

Permalink
[DOP-16662] Add Kafka consumer based on FastStream
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jul 4, 2024
1 parent 861e18e commit 1912bc0
Show file tree
Hide file tree
Showing 33 changed files with 725 additions and 9 deletions.
21 changes: 21 additions & 0 deletions .env.docker
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,30 @@ POSTGRES_DB=arrakis
POSTGRES_USER=arrakis
POSTGRES_PASSWORD=changeme

# Init Kafka
KAFKA_CFG_NODE_ID=0
KAFKA_CFG_PROCESS_ROLES=controller,broker
KAFKA_CFG_LISTENERS=DOCKER://:9092,LOCALHOST://:9093,CONTROLLER://:9094,INTERBROKER://:9095
KAFKA_CFG_ADVERTISED_LISTENERS=DOCKER://kafka:9092,LOCALHOST://localhost:9093,INTERBROKER://kafka:9095
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERBROKER:PLAINTEXT,DOCKER:SASL_PLAINTEXT,LOCALHOST:SASL_PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9094
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=DOCKER
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
KAFKA_CLIENT_USERS=arrakis
KAFKA_CLIENT_PASSWORDS=changeme
KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256

# Common backend config
ARRAKIS__LOGGING__PRESET=colored
ARRAKIS__DATABASE__URL=postgresql+asyncpg://arrakis:changeme@db:5432/arrakis

# See Backend -> Server -> Configuration documentation
ARRAKIS__SERVER__DEBUG=false

# See Backend -> Consumer -> Configuration documentation
ARRAKIS__KAFKA__BOOTSTRAP_SERVERS=kafka:9092
ARRAKIS__KAFKA__SECURITY__TYPE=scram-sha256
ARRAKIS__KAFKA__SECURITY__USER=arrakis
ARRAKIS__KAFKA__SECURITY__PASSWORD=changeme
5 changes: 5 additions & 0 deletions .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@ export ARRAKIS__LOGGING__PRESET=colored

export ARRAKIS__DATABASE__URL=postgresql+asyncpg://arrakis:changeme@localhost:5432/arrakis

export ARRAKIS__KAFKA__BOOTSTRAP_SERVERS=localhost:9093
export ARRAKIS__KAFKA__SECURITY__TYPE=scram-sha256
export ARRAKIS__KAFKA__SECURITY__USER=arrakis
export ARRAKIS__KAFKA__SECURITY__PASSWORD=changeme

export ARRAKIS__SERVER__DEBUG=true
2 changes: 0 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ repos:
- id: check-added-large-files
- id: check-ast
- id: check-merge-conflict
- id: check-yaml
args: [--unsafe]
- id: debug-statements
- id: detect-private-key
- id: end-of-file-fixer
Expand Down
14 changes: 12 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ db-partitions: ##@DB Create partitions
${POETRY} run python -m arrakis.db.scripts.create_partitions $(ARGS)


test: db-start ##@Test Run tests
broker: broker-start ##@Broker Prepare broker (in docker)

broker-start: ##@Broker Start broker (in docker)
docker compose -f docker-compose.test.yml up -d --wait kafka $(DOCKER_COMPOSE_ARGS)


test: db-start broker-start ##@Test Run tests
${POETRY} run pytest $(PYTEST_ARGS)

check-fixtures: ##@Test Check declared fixtures
Expand All @@ -69,11 +75,15 @@ cleanup: ##@Test Cleanup tests dependencies



dev: db-start ##@Application Run development server (without docker)
dev-server: db-start ##@Application Run development server (without docker)
${POETRY} run python -m arrakis.server $(ARGS)

dev-consumer: broker-start ##@Application Run development consumer (without docker)
${POETRY} run python -m arrakis.consumer $(ARGS)

prod-build: ##@Application Build docker image
docker build --progress=plain --network=host -t mtsrus/arrakis-server:develop -f ./docker/Dockerfile.server $(ARGS) .
docker build --progress=plain --network=host -t mtsrus/arrakis-consumer:develop -f ./docker/Dockerfile.consumer $(ARGS) .

prod: ##@Application Run production server (with docker)
docker compose up -d
Expand Down
41 changes: 41 additions & 0 deletions arrakis/consumer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

import logging

from faststream import FastStream
from faststream.kafka import KafkaBroker

import arrakis
from arrakis.consumer.handlers import router
from arrakis.consumer.settings import ConsumerApplicationSettings
from arrakis.consumer.settings.security import get_broker_security
from arrakis.logging.setup_logging import setup_logging

logger = logging.getLogger(__name__)


def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
    """Build a :obj:`KafkaBroker` from application settings.

    Connects to the configured bootstrap servers with the configured
    security scheme and attaches the consumer's handler router.
    """
    kafka = settings.kafka
    result = KafkaBroker(
        bootstrap_servers=kafka.bootstrap_servers,
        security=get_broker_security(kafka.security),
        logger=logger,
    )
    result.include_router(router)
    return result


def application_factory(settings: ConsumerApplicationSettings) -> FastStream:
    """Assemble the FastStream application wrapping the Kafka broker."""
    app = FastStream(
        broker=broker_factory(settings),
        title="Arrakis",
        description="Arrakis is a nextgen DataLineage service",
        version=arrakis.__version__,
        logger=logger,
    )
    return app


def get_application() -> FastStream:
    """Entry point for the FastStream CLI ``--factory`` flag.

    Reads settings from environment variables, configures logging,
    and returns the assembled application.
    """
    app_settings = ConsumerApplicationSettings()
    setup_logging(app_settings.logging)
    return application_factory(settings=app_settings)
36 changes: 36 additions & 0 deletions arrakis/consumer/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/bin/env python3
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import os
import sys
from pathlib import Path

from faststream.cli.main import cli

here = Path(__file__).resolve()


def main(prog_name: str | None = None, args: list[str] | None = None):
    """Run the FastStream CLI, forwarding command-line arguments.

    Prepends ``run arrakis.consumer:get_application --factory`` so the
    consumer application is always what the CLI launches.
    """
    if args is None:
        args = sys.argv.copy()
        prog_name = args.pop(0)

    if not prog_name:
        prog_name = os.fspath(here)

    # Keep the caller's list untouched; prepend the fixed CLI invocation.
    cli_args = ["run", "arrakis.consumer:get_application", "--factory", *args]

    # hand control to the FastStream CLI
    cli(cli_args, prog_name=prog_name)


if __name__ == "__main__":
main()
14 changes: 14 additions & 0 deletions arrakis/consumer/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from faststream import Logger
from faststream.kafka import KafkaRouter

# Shared router collecting all Kafka handlers; included into the broker at startup.
router = KafkaRouter()


@router.subscriber("input")
@router.publisher("output")
async def base_handler(body: dict, logger: Logger):
    """Echo handler: consume a message from the ``input`` topic, log it,
    and publish ``{"handler": <body>}`` to the ``output`` topic."""
    logger.info("Test handler, %s", body)
    return {"handler": body}
49 changes: 49 additions & 0 deletions arrakis/consumer/settings/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from arrakis.consumer.settings.kafka import KafkaSettings
from arrakis.db.settings import DatabaseSettings
from arrakis.logging.settings import LoggingSettings


class ConsumerApplicationSettings(BaseSettings):
    """Arrakis Kafka consumer settings.

    Application can be configured in 2 ways:

    * By explicitly passing ``settings`` object as an argument to :obj:`application_factory <arrakis.consumer.main.application_factory>`
    * By setting up environment variables matching a specific key.

      All environment variable names are written in uppercase and should be prefixed with ``ARRAKIS__``.
      Nested items are delimited with ``__``.

    More details can be found in `Pydantic documentation <https://docs.pydantic.dev/latest/concepts/pydantic_settings/>`_.

    Examples
    --------

    .. code-block:: bash

        # same as settings.database.url = "postgresql+asyncpg://postgres:postgres@localhost:5432/arrakis"
        ARRAKIS__DATABASE__URL=postgresql+asyncpg://postgres:postgres@localhost:5432/arrakis

        # same as settings.kafka.bootstrap_servers = "localhost:9092"
        ARRAKIS__KAFKA__BOOTSTRAP_SERVERS=localhost:9092

        # same as settings.logging.preset = "json"
        ARRAKIS__LOGGING__PRESET=json
    """

    database: DatabaseSettings = Field(description=":ref:`Database settings <configuration-consumer-database>`")
    kafka: KafkaSettings = Field(
        description=":ref:`Kafka settings <configuration-consumer-kafka>`",
    )
    logging: LoggingSettings = Field(
        default_factory=LoggingSettings,
        description=":ref:`Logging settings <configuration-consumer-logging>`",
    )

    # Env vars look like ARRAKIS__<FIELD>__<NESTED>; unknown keys are rejected.
    model_config = SettingsConfigDict(env_prefix="ARRAKIS__", env_nested_delimiter="__", extra="forbid")
28 changes: 28 additions & 0 deletions arrakis/consumer/settings/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import BaseModel, Field

from arrakis.consumer.settings.security import KafkaSecuritySettings


class KafkaSettings(BaseModel):
    """Arrakis consumer Kafka-specific settings.

    Examples
    --------

    .. code-block:: bash

        ARRAKIS__KAFKA__BOOTSTRAP_SERVERS=localhost:9092
        ARRAKIS__KAFKA__SECURITY__TYPE=scram-sha256
    """

    # Comma-separated host:port list passed straight to the Kafka client.
    bootstrap_servers: str = Field(
        description="List of Kafka bootstrap servers",
        min_length=1,
    )
    security: KafkaSecuritySettings = Field(
        default_factory=KafkaSecuritySettings,
        description=":ref:`Kafka security settings <configuration-consumer-kafka-security>`",
    )
54 changes: 54 additions & 0 deletions arrakis/consumer/settings/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from typing import Literal

from faststream.security import BaseSecurity, SASLPlaintext, SASLScram256, SASLScram512
from pydantic import BaseModel, Field, SecretStr, ValidationInfo, field_validator


class KafkaSecuritySettings(BaseModel):
    """Kafka security specific settings.

    Examples
    --------

    .. code-block:: bash

        ARRAKIS__KAFKA__SECURITY__TYPE=scram-sha256
        ARRAKIS__KAFKA__SECURITY__USER=dummy
        ARRAKIS__KAFKA__SECURITY__PASSWORD=changeme
    """

    # None means "no authentication" — the broker connection is plain.
    type: Literal["plaintext", "scram-sha256", "scram-sha512"] | None = Field(
        default=None,
        description="Kafka security type",
    )
    user: str | None = Field(default=None, description="Kafka security username")
    password: SecretStr | None = Field(default=None, description="Kafka security password")

    @field_validator("user", "password", mode="after")
    @classmethod
    def check_security(cls, value: str | None, info: ValidationInfo):
        # Require user/password as soon as a security type is configured.
        # NOTE(review): pydantic does not run field validators for fields left
        # at their defaults (no validate_default here), so an omitted user or
        # password can still slip through when ``type`` is set — verify.
        security_type = info.data.get("type")
        if security_type is None:
            return value
        if value is None:
            raise ValueError(f"User or password is required for security type {security_type!r}")
        return value


def get_broker_security(settings: KafkaSecuritySettings) -> BaseSecurity:
if not settings.type:
return BaseSecurity()

security_class: type[BaseSecurity]
match settings.type:
case "plaintext":
security_class = SASLPlaintext
case "scram-sha256":
security_class = SASLScram256
case "scram-sha512":
security_class = SASLScram512

return security_class(settings.user, settings.password.get_secret_value()) # type: ignore[union-attr, arg-type]
27 changes: 27 additions & 0 deletions arrakis/logging/presets/colored.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,35 @@ filters:
(): asgi_correlation_id.CorrelationIdFilter
uuid_length: 32
default_value: '-'
faststream:
(): faststream.log.logging.ExtendedFilter
default_context:
topic: ''
group_id: ''
message_id_ln: 10

formatters:
colored:
(): coloredlogs.ColoredFormatter
# Add correlation_id to log records
fmt: '%(asctime)s.%(msecs)03d %(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(correlation_id)s %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
kafka_colored:
(): coloredlogs.ColoredFormatter
fmt: '%(asctime)s.%(msecs)03d %(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(topic)s %(group_id)s %(message_id)s %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'

handlers:
main:
class: logging.StreamHandler
formatter: colored
filters: [correlation_id]
stream: ext://sys.stdout
faststream:
class: logging.StreamHandler
formatter: kafka_colored
filters: [faststream]
stream: ext://sys.stdout

loggers:
'':
Expand All @@ -34,3 +49,15 @@ loggers:
handlers: [main]
level: INFO
propagate: false
arrakis.consumer:
handlers: [faststream]
level: INFO
propagate: false
aiokafka:
handlers: [faststream]
level: INFO
propagate: false
aiokafka.conn:
handlers: [faststream]
level: ERROR
propagate: false
Loading

0 comments on commit 1912bc0

Please sign in to comment.