Skip to content

Commit

Permalink
blackfml.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgene committed Nov 16, 2024
1 parent 18a97f3 commit 6c7603d
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 159 deletions.
22 changes: 14 additions & 8 deletions aiokafka/coordinator/assignors/abstract.pyi
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
import abc
from typing import Dict, Iterable, Mapping
from aiokafka.cluster import ClusterMetadata
from aiokafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata
from aiokafka.coordinator.protocol import (
ConsumerProtocolMemberAssignment,
ConsumerProtocolMemberMetadata,
)

log = ...

class AbstractPartitionAssignor(abc.ABC):
"""Abstract assignor implementation which does some common grunt work (in particular
collecting partition counts which are always needed in assignors).
"""

@property
@abc.abstractmethod
def name(self) -> str:
""".name should be a string identifying the assignor"""
...

@classmethod
@abc.abstractmethod
def assign(cls, cluster: ClusterMetadata, members: Mapping[str, ConsumerProtocolMemberMetadata]) -> Dict[str, ConsumerProtocolMemberAssignment]:
def assign(
cls,
cluster: ClusterMetadata,
members: Mapping[str, ConsumerProtocolMemberMetadata],
) -> Dict[str, ConsumerProtocolMemberAssignment]:
"""Perform group assignment given cluster metadata and member subscriptions
Arguments:
Expand All @@ -28,7 +37,7 @@ class AbstractPartitionAssignor(abc.ABC):
dict: {member_id: MemberAssignment}
"""
...

@classmethod
@abc.abstractmethod
def metadata(cls, topics: Iterable[str]) -> ConsumerProtocolMemberMetadata:
Expand All @@ -41,7 +50,7 @@ class AbstractPartitionAssignor(abc.ABC):
MemberMetadata struct
"""
...

@classmethod
@abc.abstractmethod
def on_assignment(cls, assignment: ConsumerProtocolMemberAssignment) -> None:
Expand All @@ -54,6 +63,3 @@ class AbstractPartitionAssignor(abc.ABC):
assignment (MemberAssignment): the member's assignment
"""
...



11 changes: 2 additions & 9 deletions aiokafka/coordinator/protocol.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,20 @@ class ConsumerProtocolMemberMetadata(Struct):
user_data: bytes
SCHEMA = ...


class ConsumerProtocolMemberAssignment(Struct):
class Assignment(NamedTuple):
topic: str
partitions: List[int]
...



version: int
assignment: List[Assignment]
user_data: bytes
SCHEMA = ...
def partitions(self) -> List[TopicPartition]:
...


def partitions(self) -> List[TopicPartition]: ...

class ConsumerProtocol:
PROTOCOL_TYPE = ...
ASSIGNMENT_STRATEGIES = ...
METADATA = ConsumerProtocolMemberMetadata
ASSIGNMENT = ConsumerProtocolMemberAssignment


3 changes: 1 addition & 2 deletions aiokafka/producer/message_accumulator.pyi
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
class BatchBuilder:
...
class BatchBuilder: ...
146 changes: 100 additions & 46 deletions aiokafka/producer/producer.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ from .message_accumulator import BatchBuilder
log = ...
_missing = ...
_DEFAULT_PARTITIONER = ...
def _identity(data: bytes) -> bytes:
...

def _identity(data: bytes) -> bytes: ...

KT = TypeVar("KT", contravariant=True)
VT = TypeVar("VT", contravariant=True)
ET = TypeVar("ET", bound=BaseException)

class AIOKafkaProducer(Generic[KT, VT]):
"""A Kafka client that publishes records to the Kafka cluster.
Expand Down Expand Up @@ -162,33 +164,85 @@ class AIOKafkaProducer(Generic[KT, VT]):
Many configuration parameters are taken from the Java client:
https://kafka.apache.org/documentation.html#producerconfigs
"""

_PRODUCER_CLIENT_ID_SEQUENCE = ...
_COMPRESSORS = ...
_closed = ...
_source_traceback = ...
def __init__(self, *, loop: asyncio.AbstractEventLoop | None = ..., bootstrap_servers: str | list[str] = ..., client_id: str | None = ..., metadata_max_age_ms: int = ..., request_timeout_ms: int = ..., api_version: str = ..., acks: Literal[0] | Literal[1] | Literal["all"] | object = ..., key_serializer: Callable[[KT], bytes] = _identity, value_serializer: Callable[[VT], bytes] = _identity, compression_type: Literal["gzip"] | Literal["snappy"] | Literal["lz4"] | Literal["zstd"] | None = ..., max_batch_size: int = ..., partitioner: Callable[[bytes, list[int], list[int]], int] = ..., max_request_size: int = ..., linger_ms: int = ..., retry_backoff_ms: int = ..., security_protocol: Literal["PLAINTEXT"] | Literal["SSL"] | Literal["SASL_PLAINTEXT"] | Literal["SASL_SSL"] = ..., ssl_context: SSLContext | None = ..., connections_max_idle_ms: int = ..., enable_idempotence: bool = ..., transactional_id: int | str | None = ..., transaction_timeout_ms: int = ..., sasl_mechanism: Literal["PLAIN"] | Literal["GSSAPI"] | Literal["SCRAM-SHA-256"] | Literal["SCRAM-SHA-512"] | Literal["OAUTHBEARER"] = ..., sasl_plain_password: str | None = ..., sasl_plain_username: str | None = ..., sasl_kerberos_service_name: str = ..., sasl_kerberos_domain_name: str | None = ..., sasl_oauth_token_provider: AbstractTokenProvider | None = ...) -> None:
...

def __del__(self, _warnings: ModuleType = ...) -> None:
...

def __init__(
self,
*,
loop: asyncio.AbstractEventLoop | None = ...,
bootstrap_servers: str | list[str] = ...,
client_id: str | None = ...,
metadata_max_age_ms: int = ...,
request_timeout_ms: int = ...,
api_version: str = ...,
acks: Literal[0] | Literal[1] | Literal["all"] | object = ...,
key_serializer: Callable[[KT], bytes] = _identity,
value_serializer: Callable[[VT], bytes] = _identity,
compression_type: (
Literal["gzip"]
| Literal["snappy"]
| Literal["lz4"]
| Literal["zstd"]
| None
) = ...,
max_batch_size: int = ...,
partitioner: Callable[[bytes, list[int], list[int]], int] = ...,
max_request_size: int = ...,
linger_ms: int = ...,
retry_backoff_ms: int = ...,
security_protocol: (
Literal["PLAINTEXT"]
| Literal["SSL"]
| Literal["SASL_PLAINTEXT"]
| Literal["SASL_SSL"]
) = ...,
ssl_context: SSLContext | None = ...,
connections_max_idle_ms: int = ...,
enable_idempotence: bool = ...,
transactional_id: int | str | None = ...,
transaction_timeout_ms: int = ...,
sasl_mechanism: (
Literal["PLAIN"]
| Literal["GSSAPI"]
| Literal["SCRAM-SHA-256"]
| Literal["SCRAM-SHA-512"]
| Literal["OAUTHBEARER"]
) = ...,
sasl_plain_password: str | None = ...,
sasl_plain_username: str | None = ...,
sasl_kerberos_service_name: str = ...,
sasl_kerberos_domain_name: str | None = ...,
sasl_oauth_token_provider: AbstractTokenProvider | None = ...,
) -> None: ...
def __del__(self, _warnings: ModuleType = ...) -> None: ...
async def start(self) -> None:
"""Connect to Kafka cluster and check server version"""
...

async def flush(self) -> None:
"""Wait until all batches are Delivered and futures resolved"""
...

async def stop(self) -> None:
"""Flush all pending data and close all connections to kafka cluster"""
...

async def partitions_for(self, topic: str) -> set[int]:
"""Returns set of all known partitions for the topic."""
...

async def send(self, topic: str, value: VT | None = ..., key: KT | None = ..., partition: int | None = ..., timestamp_ms: int | None = ..., headers: Iterable[tuple[str, bytes]] | None = ...) -> asyncio.Future[RecordMetadata]:

async def send(
self,
topic: str,
value: VT | None = ...,
key: KT | None = ...,
partition: int | None = ...,
timestamp_ms: int | None = ...,
headers: Iterable[tuple[str, bytes]] | None = ...,
) -> asyncio.Future[RecordMetadata]:
"""Publish a message to a topic.
Arguments:
Expand Down Expand Up @@ -233,11 +287,19 @@ class AIOKafkaProducer(Generic[KT, VT]):
**will**.
"""
...

async def send_and_wait(self, topic: str, value: VT | None = ..., key: KT | None = ..., partition: int | None = ..., timestamp_ms: int | None = ..., headers: Iterable[tuple[str, bytes]] | None = ...) -> RecordMetadata:

async def send_and_wait(
self,
topic: str,
value: VT | None = ...,
key: KT | None = ...,
partition: int | None = ...,
timestamp_ms: int | None = ...,
headers: Iterable[tuple[str, bytes]] | None = ...,
) -> RecordMetadata:
"""Publish a message to a topic and wait the result"""
...

def create_batch(self) -> BatchBuilder:
"""Create and return an empty :class:`.BatchBuilder`.
Expand All @@ -247,8 +309,10 @@ class AIOKafkaProducer(Generic[KT, VT]):
BatchBuilder: empty batch to be filled and submitted by the caller.
"""
...

async def send_batch(self, batch: BatchBuilder, topic: str, *, partition: int) -> asyncio.Future[RecordMetadata]:

async def send_batch(
self, batch: BatchBuilder, topic: str, *, partition: int
) -> asyncio.Future[RecordMetadata]:
"""Submit a BatchBuilder for publication.
Arguments:
Expand All @@ -261,37 +325,27 @@ class AIOKafkaProducer(Generic[KT, VT]):
delivered.
"""
...

async def begin_transaction(self) -> None:
...

async def commit_transaction(self) -> None:
...

async def abort_transaction(self) -> None:
...


async def begin_transaction(self) -> None: ...
async def commit_transaction(self) -> None: ...
async def abort_transaction(self) -> None: ...
def transaction(self) -> TransactionContext:
"""Start a transaction context"""
...

async def send_offsets_to_transaction(self, offsets: dict[TopicPartition, int | tuple[int, str] | OffsetAndMetadata], group_id: str) -> None:
...

async def __aenter__(self) -> AIOKafkaProducer[KT, VT]:
...

async def __aexit__(self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None) -> None:
...


async def send_offsets_to_transaction(
self,
offsets: dict[TopicPartition, int | tuple[int, str] | OffsetAndMetadata],
group_id: str,
) -> None: ...
async def __aenter__(self) -> AIOKafkaProducer[KT, VT]: ...
async def __aexit__(
self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None
) -> None: ...

class TransactionContext:
def __init__(self, producer: AIOKafkaProducer[KT, VT]) -> None:
...

async def __aenter__(self) -> TransactionContext:
...

async def __aexit__(self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None) -> None:
...
def __init__(self, producer: AIOKafkaProducer[KT, VT]) -> None: ...
async def __aenter__(self) -> TransactionContext: ...
async def __aexit__(
self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None
) -> None: ...
15 changes: 4 additions & 11 deletions aiokafka/protocol/abstract.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,13 @@ from io import BytesIO
from typing import Generic, TypeVar

T = TypeVar("T")

class AbstractType(Generic[T], metaclass=abc.ABCMeta):
@classmethod
@abc.abstractmethod
def encode(cls, value: T) -> bytes:
...

def encode(cls, value: T) -> bytes: ...
@classmethod
@abc.abstractmethod
def decode(cls, data: BytesIO) -> T:
...

def decode(cls, data: BytesIO) -> T: ...
@classmethod
def repr(cls, value: T) -> str:
...



def repr(cls, value: T) -> str: ...
Loading

0 comments on commit 6c7603d

Please sign in to comment.