Apache Kafka

Kafka adopts an event-based communication paradigm instead of remote procedure calls or remote service invocations.

The main benefit is decoupling in space, synchronization, and time: producers and consumers do not need to know each other, do not block waiting for each other, and do not need to be running at the same time.

graph LR
    subgraph Kafka Cluster
        Broker1[Broker]
        Broker2[Broker]
        Broker3[Broker]
    end

    Producer1[Producer] -->|Writes to| Broker1
    Producer2[Producer] -->|Writes to| Broker2
    Producer3[Producer] -->|Writes to| Broker3

    Consumer1[Consumer] -->|Reads from| Broker1
    Consumer2[Consumer] -->|Reads from| Broker2
    Consumer3[Consumer] -->|Reads from| Broker3

Key Kafka concepts:

  • Topic: A category or feed name to which messages are published.
  • Partition: A split within a topic for parallel processing.
  • Producer: Component that publishes messages to Kafka topics.
  • Consumer: Component that subscribes to topics and processes messages.
  • Consumer Group: A group of consumers acting as a single logical subscriber, where each consumer reads messages from a distinct set of partitions.
  • Broker: A Kafka server that stores data and serves clients.
  • ZooKeeper: A service for coordinating and managing the Kafka brokers.
    • Maintains cluster membership and metadata.
    • Elects the controller broker, which in turn assigns partition leaders.
    • Historically essential for Kafka's distributed operation.
    • Replaced in newer versions by KRaft, Kafka's own Raft-based metadata quorum.
  • Replication: Kafka's method of fault tolerance by duplicating partitions across multiple brokers.
    • Designates one broker as the leader for each partition.
    • Followers replicate data from the leader.
    • On leader failure, an in-sync follower is promoted.
    • Protects against data loss and keeps the service available as long as at least one in-sync replica survives.
  • Consumer Offset:
    • Marks the position of a consumer in a partition.
    • Each message in a partition has a sequential offset that identifies it.
    • Allows consumers to resume from where they left off.
    • Committed periodically for reliability.
    • Tracked per partition for each consumer group, which enables distributed processing.

Pull based

Consumers pull messages from brokers, which offers several advantages. Unlike other messaging systems that use a push-based interaction, Kafka's pull-based approach:

  • Eliminates the need for brokers to track what each consumer has received. Instead, consumers are responsible for keeping (and committing) the offset of the last position they have read. This flexibility enables adding more consumers to the system without reconfiguring the cluster.
  • Enables consumers to resume from where they left off when they come back online after being offline.
  • Allows consumers to pull and process data at their own sustained speed, without negatively impacting producers or the cluster.
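The pull model can be illustrated with a small in-memory sketch that does not use Kafka at all. The class and method names (`PullLogSketch`, `append`, `poll`) are hypothetical; the point is only that the log keeps no per-consumer state, while each consumer advances its own offset at its own pace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a single partition; this is not the Kafka API.
class PullLogSketch {
    private final List<String> log = new ArrayList<>();

    // Producers only append; they know nothing about consumers.
    void append(String message) {
        log.add(message);
    }

    // Consumers pull: they pass in their own offset and the batch size they can handle.
    List<String> poll(int fromOffset, int maxRecords) {
        int start = Math.min(Math.max(fromOffset, 0), log.size());
        int end = Math.min(log.size(), start + maxRecords);
        return new ArrayList<>(log.subList(start, end));
    }

    public static void main(String[] args) {
        PullLogSketch partition = new PullLogSketch();
        for (int i = 0; i < 5; i++) {
            partition.append("m" + i);
        }

        int fastOffset = 0; // state owned by the fast consumer
        int slowOffset = 0; // state owned by the slow consumer

        fastOffset += partition.poll(fastOffset, 10).size();
        slowOffset += partition.poll(slowOffset, 2).size();

        System.out.println("fast consumer at offset " + fastOffset); // prints 5
        System.out.println("slow consumer at offset " + slowOffset); // prints 2
    }
}
```

A consumer that was offline simply calls `poll` again with the offset it last stored, which is the resume behaviour described in the list above.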

Basic classes

Basic Consumer

A basic consumer is suitable for simple consumption patterns where exactly-once processing is not critical.

// props is a Properties object holding the consumer configuration
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList(topic));

while (true) {
    // Waits up to 5 minutes for new records before returning
    final ConsumerRecords<String, String> records = consumer.poll(Duration.of(5, ChronoUnit.MINUTES));

    // operations with records ...
}

Some useful properties:

  • Auto-Commit Enabled: Automatically commits offsets at regular intervals.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true));
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(autoCommitIntervalMs));
  • Offset Reset Strategy: Starts from the latest offset if no previous offset is found.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
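For completeness, here is a minimal sketch of how such a `props` object might be assembled. It uses the literal configuration keys that the `ConsumerConfig` constants stand for, so it runs without the Kafka client on the classpath. The helper name `basicConsumerProps`, the server address, and the group ID are assumptions made for illustration.

```java
import java.util.Properties;

class BasicConsumerConfigSketch {
    // Hypothetical helper; the keys are the string values behind the ConsumerConfig constants.
    static Properties basicConsumerProps(String servers, String groupId, int autoCommitIntervalMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Commit offsets automatically at a fixed interval
        props.put("enable.auto.commit", String.valueOf(true));
        props.put("auto.commit.interval.ms", String.valueOf(autoCommitIntervalMs));
        // Start from the latest offset when the group has no committed offset
        props.put("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // Example values only; adjust to the actual cluster and group
        Properties props = basicConsumerProps("localhost:9092", "groupA", 15000);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting object can be passed to the `KafkaConsumer` constructor shown earlier. Setting `auto.offset.reset` to `earliest` instead would make a new group start from the beginning of each partition.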

Basic Producer

Suitable for general-purpose message production where transactional guarantees are not required:

  • Send and Optionally Wait for Ack: Sends messages without transactional control, with optional wait for acknowledgment.
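final ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
final Future<RecordMetadata> future = producer.send(record);
// Optionally block until the broker acknowledges the record (may throw InterruptedException or ExecutionException)
final RecordMetadata ack = future.get();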

Topic Manager

The TopicManager class is a Kafka administration utility for creating, deleting, and listing topics in a Kafka cluster. Note that, unless the broker setting auto.create.topics.enable is turned off, a topic is also created automatically when it is first used.

NewTopic newTopic = new NewTopic(topicName, topicPartitions, replicationFactor);

Semantics and Idempotence

In communication theory, message delivery can be provided with different guarantees:

  1. At-most-once semantics: no duplicates, but messages can be lost.
  2. At-least-once semantics: messages can be delivered more than once, but they are not lost.
  3. Exactly-once semantics (EOS): the system behaves as if each message was delivered exactly once. In Kafka this is achieved through idempotent operations and transactional updates of offsets.
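The difference between the first two guarantees often comes down to when the consumer commits its offset relative to processing. The following sketch simulates this without Kafka; the class `DeliverySemanticsSketch` and its `run` method are hypothetical, and the "crash" is simply a skipped step followed by a restart from the committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeliverySemanticsSketch {
    // Simulates a consumer that crashes once while handling the record at index crashAt,
    // then restarts from its committed offset. Returns the records it actually processed.
    static List<String> run(List<String> log, int crashAt, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0;
        boolean crashed = false;
        while (committed < log.size()) {
            int offset = committed; // (re)start from the last committed position
            String record = log.get(offset);
            boolean crashNow = !crashed && offset == crashAt;
            if (commitBeforeProcessing) {
                committed = offset + 1;      // at-most-once: commit first
                if (crashNow) {              // crash before processing: the record is lost
                    crashed = true;
                    continue;
                }
                processed.add(record);
            } else {
                processed.add(record);       // at-least-once: process first
                if (crashNow) {              // crash before committing: the record is replayed
                    crashed = true;
                    continue;
                }
                committed = offset + 1;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c");
        System.out.println(run(log, 1, true));  // prints [a, c]
        System.out.println(run(log, 1, false)); // prints [a, b, b, c]
    }
}
```

Exactly-once behaviour needs more than reordering these two steps: the processing result and the offset update have to become visible atomically, which is what Kafka transactions provide.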

Idempotence means that a producer can safely retry sending messages without creating duplicates in the log. This is one building block of Kafka's "exactly once" semantics: on its own it guarantees exactly-once delivery between a producer and a partition, while end-to-end guarantees also need transactions. The following line sets the ENABLE_IDEMPOTENCE_CONFIG property of the producer to true, which enables idempotence.

// Idempotence = exactly once semantics between the producer and the partition
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true));
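Conceptually, idempotent producing works because each write carries a producer ID and a sequence number, and the partition ignores a write it has already appended. The sketch below is a simplified model of that idea, not Kafka's implementation: the class `IdempotentAppendSketch` is hypothetical, it numbers single records rather than batches, and it does not reject gaps in the sequence.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one partition that deduplicates producer retries.
class IdempotentAppendSketch {
    private final List<String> log = new ArrayList<>();
    // Highest sequence number appended so far, per producer ID
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if the value was appended, false if it was recognised as a retry and ignored.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: the earlier acknowledgment was probably lost
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> contents() {
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        IdempotentAppendSketch partition = new IdempotentAppendSketch();
        partition.append(7L, 0, "a");
        partition.append(7L, 0, "a"); // retry of the same send, ignored
        partition.append(7L, 1, "b");
        System.out.println(partition.contents()); // prints [a, b]
    }
}
```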

Transactional

If a producer sends messages as part of a transaction, the consumer's isolation level controls whether it sees those messages before the transaction is committed.

Transactional Consumer

Ideal for scenarios requiring data consistency and exactly-once processing in a transactional context.

// Read only messages from committed transactions (plus non-transactional messages)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

// Or read everything, including messages from open or aborted transactions (this is the default)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
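The effect of the two isolation levels can be pictured with a toy model of a partition in which every entry is tagged with the state of its transaction. This is only a sketch under simplifying assumptions: `IsolationLevelSketch`, `Entry`, and `TxState` are hypothetical names, and the real broker tracks transaction markers rather than per-record flags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IsolationLevelSketch {
    enum TxState { COMMITTED, ABORTED, OPEN }

    // Hypothetical log entry: a value plus the state of the transaction it belongs to.
    static final class Entry {
        final String value;
        final TxState state;

        Entry(String value, TxState state) {
            this.value = value;
            this.state = state;
        }
    }

    static List<String> read(List<Entry> log, boolean readCommitted) {
        List<String> visible = new ArrayList<>();
        for (Entry entry : log) {
            if (!readCommitted) {
                visible.add(entry.value); // read_uncommitted: everything is returned
                continue;
            }
            if (entry.state == TxState.OPEN) {
                break; // read_committed: do not read past a transaction that is still open
            }
            if (entry.state == TxState.COMMITTED) {
                visible.add(entry.value); // aborted entries are skipped
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Entry> log = Arrays.asList(
                new Entry("a", TxState.COMMITTED),
                new Entry("b", TxState.ABORTED),
                new Entry("c", TxState.COMMITTED),
                new Entry("d", TxState.OPEN),
                new Entry("e", TxState.COMMITTED));
        System.out.println(read(log, true));  // prints [a, c]
        System.out.println(read(log, false)); // prints [a, b, c, d, e]
    }
}
```

Note that in this model a `read_committed` reader waits behind the open transaction even though a later entry is already committed, which preserves offset order.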

Transactional Producer

A Transactional Producer is designed for transactional message delivery, ensuring atomicity and consistency in message production. It is configured with a transactional ID, which identifies the producer across restarts.

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);

Transaction Management: Manages transactions explicitly with beginTransaction, commitTransaction, and abortTransaction.

// This must be called before any method that involves transactions
producer.initTransactions();

producer.beginTransaction(); 
producer.send(record); 
// Commit or abort based on some condition 
producer.commitTransaction(); 
// or 
producer.abortTransaction();

Example:

producer.initTransactions();

final ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

producer.beginTransaction();

producer.send(record);

if (condition) producer.commitTransaction();
else {
	// flush() forces buffered records to be sent before the abort;
	// without it, records still in the outgoing buffer are simply dropped
	producer.flush();
	producer.abortTransaction();
}

Atomic Forwarder

An object can be both a consumer and a producer. The AtomicForwarder is such a component: it forwards messages from one topic to another inside a transaction, ensuring EOS.

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); 

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

Sets up consumer properties including server address, group ID, deserializers, and isolation level.

consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); 

consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));

Configures the producer with a transactional ID and enables idempotence:

producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, producerTransactionalId); 

producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true));

Polling and forwarding: the forwarder continuously polls for new records and forwards them to another topic within a transaction.

// This must be called before any method that involves transactions
producer.initTransactions();

//...

final ConsumerRecords<String, String> records = consumer.poll(Duration.of(5, ChronoUnit.MINUTES)); 

producer.beginTransaction(); 

for (final ConsumerRecord<String, String> record : records){
	producer.send(new ProducerRecord<>(outputTopic,record.key(),record.value())); 
}

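// Manual offset management **within** the transaction:
// for each partition, commit the offset of the last forwarded record + 1
final Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
for (final TopicPartition partition : records.partitions()) {
	final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
	final long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
	map.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(map, consumer.groupMetadata());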

producer.commitTransaction();

If you have one partition and two consumers in the same consumer group, only one of the consumers will be able to read from the partition. This is because in Kafka, each partition is consumed by exactly one consumer in a group at a time. The other consumer will be idle and won't receive any messages unless the active consumer goes down. If the active consumer fails, the other consumer will take over and start consuming messages from the partition.

Partitions are Kafka's unit of parallelism, and their main effect here is to speed up consumption. With multiple partitions, several consumers in a consumer group can each read from a different partition concurrently, which increases consumption throughput.

If there are two consumers and two partitions, each consumer will be reading from one partition. If one consumer fails, Kafka's consumer group coordinator will notice the failure (due to the lack of heartbeat from the failed consumer) and trigger a rebalance of the consumer group. The remaining consumer will take over the partition that was being read by the failed consumer, in addition to the partition it was already consuming. So, the remaining consumer will start consuming messages from both partitions.

Rebalancing: When a consumer fails (or a new consumer joins, or a topic's partition count changes), Kafka triggers a rebalance. During a rebalance, the partitions are reassigned among the available consumers in the group.
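The scenarios above can be reproduced with a small sketch of partition assignment. It assumes a plain round-robin strategy and uses hypothetical names (`RebalanceSketch`, `assign`); real Kafka clients use configurable assignors, so the exact distribution may differ, but the rule that each partition has exactly one owner per group is the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RebalanceSketch {
    // Assigns partitions 0..partitions-1 to the live consumers in round-robin order.
    // Every partition gets exactly one owner; a consumer may end up with none.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One partition, two consumers: the second consumer stays idle
        System.out.println(assign(Arrays.asList("c1", "c2"), 1)); // prints {c1=[0], c2=[]}
        // Two partitions, two consumers: one partition each
        System.out.println(assign(Arrays.asList("c1", "c2"), 2)); // prints {c1=[0], c2=[1]}
        // c1 fails: a rebalance hands both partitions to c2
        System.out.println(assign(Collections.singletonList("c2"), 2)); // prints {c2=[0, 1]}
    }
}
```

Calling `assign` again with the updated consumer list plays the role of the rebalance: the function is recomputed from scratch whenever group membership or the partition count changes.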