-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[DOP-16662] Add Kafka consumer based on FastStream
- Loading branch information
Showing
33 changed files
with
725 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# SPDX-FileCopyrightText: 2024 MTS PJSC | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import logging | ||
|
||
from faststream import FastStream | ||
from faststream.kafka import KafkaBroker | ||
|
||
import arrakis | ||
from arrakis.consumer.handlers import router | ||
from arrakis.consumer.settings import ConsumerApplicationSettings | ||
from arrakis.consumer.settings.security import get_broker_security | ||
from arrakis.logging.setup_logging import setup_logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker: | ||
broker = KafkaBroker( | ||
bootstrap_servers=settings.kafka.bootstrap_servers, | ||
security=get_broker_security(settings.kafka.security), | ||
logger=logger, | ||
) | ||
broker.include_router(router) | ||
return broker | ||
|
||
|
||
def application_factory(settings: ConsumerApplicationSettings) -> FastStream: | ||
return FastStream( | ||
broker=broker_factory(settings), | ||
title="Arrakis", | ||
description="Arrakis is a nextgen DataLineage service", | ||
version=arrakis.__version__, | ||
logger=logger, | ||
) | ||
|
||
|
||
def get_application(): | ||
settings = ConsumerApplicationSettings() | ||
setup_logging(settings.logging) | ||
return application_factory(settings=settings) | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
#!/bin/env python3 | ||
# SPDX-FileCopyrightText: 2024 MTS PJSC | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from __future__ import annotations | ||
|
||
import os | ||
import sys | ||
from pathlib import Path | ||
|
||
from faststream.cli.main import cli | ||
|
||
here = Path(__file__).resolve() | ||
|
||
|
||
def main(prog_name: str | None = None, args: list[str] | None = None): | ||
"""Run FastStream and pass the command line arguments to it.""" | ||
if args is None: | ||
args = sys.argv.copy() | ||
prog_name = args.pop(0) | ||
|
||
if not prog_name: | ||
prog_name = os.fspath(here) | ||
|
||
args = args.copy() | ||
# prepend config path before command line arguments | ||
args.insert(0, "run") | ||
args.insert(1, "arrakis.consumer:get_application") | ||
args.insert(2, "--factory") | ||
|
||
# call uvicorn | ||
cli(args, prog_name=prog_name) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# SPDX-FileCopyrightText: 2024 MTS PJSC | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from faststream import Logger | ||
from faststream.kafka import KafkaRouter | ||
|
||
router = KafkaRouter() | ||
|
||
|
||
@router.subscriber("input") | ||
@router.publisher("output") | ||
async def base_handler(body: dict, logger: Logger): | ||
logger.info("Test handler, %s", body) | ||
return {"handler": body} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# SPDX-FileCopyrightText: 2024 MTS PJSC | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from pydantic import Field | ||
from pydantic_settings import BaseSettings, SettingsConfigDict | ||
|
||
from arrakis.consumer.settings.kafka import KafkaSettings | ||
from arrakis.db.settings import DatabaseSettings | ||
from arrakis.logging.settings import LoggingSettings | ||
|
||
|
||
class ConsumerApplicationSettings(BaseSettings): | ||
"""Arrakis Kafka consumer settings. | ||
Application can be configured in 2 ways: | ||
* By explicitly passing ``settings`` object as an argument to :obj:`application_factory <arrakis.consumer.main.application_factory>` | ||
* By setting up environment variables matching a specific key. | ||
All environment variable names are written in uppercase and should be prefixed with ``ARRAKIS__``. | ||
Nested items are delimited with ``__``. | ||
More details can be found in `Pydantic documentation <https://docs.pydantic.dev/latest/concepts/pydantic_settings/>`_. | ||
Examples | ||
-------- | ||
.. code-block:: bash | ||
# same as settings.database.url = "postgresql+asyncpg://postgres:postgres@localhost:5432/arrakis" | ||
ARRAKIS__DATABASE__URL=postgresql+asyncpg://postgres:postgres@localhost:5432/arrakis | ||
# same as settings.kafka.bootstrap_servers = "postgresql+asyncpg://postgres:postgres@localhost:5432/arrakis" | ||
ARRAKIS__KAFKA__BOOTSTRAP_SERVERS=postgresql+asyncpg://postgres:postgres@localhost:5432/arrakis | ||
# same as settings.logging.preset = "json" | ||
ARRAKIS__LOGGING__PRESET=json | ||
""" | ||
|
||
database: DatabaseSettings = Field(description=":ref:`Database settings <configuration-consumer-database>`") | ||
kafka: KafkaSettings = Field( | ||
description=":ref:`Kafka settings <configuration-consumer-kafka>`", | ||
) | ||
logging: LoggingSettings = Field( | ||
default_factory=LoggingSettings, | ||
description=":ref:`Logging settings <configuration-consumer-logging>`", | ||
) | ||
|
||
model_config = SettingsConfigDict(env_prefix="ARRAKIS__", env_nested_delimiter="__", extra="forbid") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# SPDX-FileCopyrightText: 2024 MTS PJSC | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from pydantic import BaseModel, Field | ||
|
||
from arrakis.consumer.settings.security import KafkaSecuritySettings | ||
|
||
|
||
class KafkaSettings(BaseModel): | ||
"""Arrakis consumer Kafka-specific settings. | ||
Examples | ||
-------- | ||
.. code-block:: bash | ||
ARRAKIS__KAFKA__BOOTSTRAP_SERVERS=localhost:9092 | ||
ARRAKIS__KAFKA__SECURITY__TYPE=scram-256 | ||
""" | ||
|
||
bootstrap_servers: str = Field( | ||
description="List of Kafka bootstrap servers", | ||
min_length=1, | ||
) | ||
security: KafkaSecuritySettings = Field( | ||
default_factory=KafkaSecuritySettings, | ||
description=":ref:`Kafka security settings <configuration-consumer-kafka-security>`", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# SPDX-FileCopyrightText: 2024 MTS PJSC | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from typing import Literal | ||
|
||
from faststream.security import BaseSecurity, SASLPlaintext, SASLScram256, SASLScram512 | ||
from pydantic import BaseModel, Field, SecretStr, ValidationInfo, field_validator | ||
|
||
|
||
class KafkaSecuritySettings(BaseModel): | ||
"""Kafka security specific settings. | ||
Examples | ||
-------- | ||
.. code-block:: bash | ||
ARRAKIS__KAFKA__SECURITY__TYPE=scram-256 | ||
ARRAKIS__KAFKA__SECURITY__USER=dummy | ||
ARRAKIS__KAFKA__SECURITY__PASSWORD=changeme | ||
""" | ||
|
||
type: Literal["plaintext", "scram-sha256", "scram-sha512"] | None = Field( | ||
default=None, | ||
description="Kafka security type", | ||
) | ||
user: str | None = Field(default=None, description="Kafka security username") | ||
password: SecretStr | None = Field(default=None, description="Kafka security password") | ||
|
||
@field_validator("user", "password", mode="after") | ||
@classmethod | ||
def check_security(cls, value: str, info: ValidationInfo): | ||
security_type = info.data.get("type") | ||
if security_type is None: | ||
return value | ||
if value is None: | ||
raise ValueError(f"User or password is required for security type {security_type!r}") | ||
return value | ||
|
||
|
||
def get_broker_security(settings: KafkaSecuritySettings) -> BaseSecurity: | ||
if not settings.type: | ||
return BaseSecurity() | ||
|
||
security_class: type[BaseSecurity] | ||
match settings.type: | ||
case "plaintext": | ||
security_class = SASLPlaintext | ||
case "scram-sha256": | ||
security_class = SASLScram256 | ||
case "scram-sha512": | ||
security_class = SASLScram512 | ||
|
||
return security_class(settings.user, settings.password.get_secret_value()) # type: ignore[union-attr, arg-type] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.