Kafka adopts an event-based communication paradigm instead of remote procedure calls or remote service invocations.
The main benefit is decoupling: space decoupling (producers and consumers do not need to know each other), time decoupling, and synchronization decoupling.
graph LR
subgraph Kafka Cluster
Broker1[Broker]
Broker2[Broker]
Broker3[Broker]
end
Producer1[Producer] -->|Writes to| Broker1
Producer2[Producer] -->|Writes to| Broker2
Producer3[Producer] -->|Writes to| Broker3
Consumer1[Consumer] -->|Reads from| Broker1
Consumer2[Consumer] -->|Reads from| Broker2
Consumer3[Consumer] -->|Reads from| Broker3
The most important Kafka concepts:
- Topic: A category or feed name to which messages are published.
- Partition: A split within a topic for parallel processing.
- Producer: Component that publishes messages to Kafka topics.
- Consumer: Component that subscribes to topics and processes messages.
- Consumer Group: A group of consumers acting as a single logical subscriber, where each consumer reads messages from a distinct set of partitions.
- Broker: A Kafka server that stores data and serves clients.
- ZooKeeper: A service for coordinating and managing the Kafka brokers.
- Coordinates and manages Kafka brokers.
- Maintains cluster membership and metadata.
- Handles leader election for partitions.
- Essential for Kafka's distributed nature.
- Being replaced by Kafka's own internal quorum mechanism (KRaft) in newer versions.
- Replication: Kafka's method of fault tolerance by duplicating partitions across multiple brokers.
- Replicates partitions across multiple brokers.
- Designates one broker as the leader for each partition.
- Followers replicate data from the leader.
- On leader failure, a follower is promoted.
- Ensures no data loss and service continuity.
- Consumer Offset:
- Marks position of a consumer in a partition.
- A sequential ID that uniquely identifies each message within a partition.
- Allows consumers to resume from where they left off.
- Committed periodically (auto-commit) or manually for reliability (a manual-commit sketch follows this list).
- Managed per consumer in a group for distributed processing.
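As a hedged illustration (not in the original notes), offsets can also be committed manually instead of relying on auto-commit. A minimal sketch, assuming a KafkaConsumer<String, String> named consumer that is already subscribed to a topic:
final ConsumerRecords<String, String> records = consumer.poll(Duration.of(5, ChronoUnit.MINUTES));
for (final ConsumerRecord<String, String> record : records) {
    // process the record ...
}
// Synchronously commit the offsets of the records returned by the last poll,
// so the group resumes from here after a restart
consumer.commitSync();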
Consumers pull messages from brokers, which offers several advantages. Unlike other messaging systems that use a push-based interaction, Kafka's pull-based approach:
- Eliminates the need for brokers to store consumer state. Instead, consumers are responsible for storing the offset of the last position they have read. This flexibility enables adding more consumers to the system without reconfiguring the cluster.
- Enables consumers to resume from where they left off when they come back online after being offline.
- Allows consumers to pull and process data at their own sustained speed, without negatively impacting producers or the cluster.
A basic consumer is suitable for simple consumption patterns where exactly-once processing is not critical.
// props holds the consumer configuration properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
    final ConsumerRecords<String, String> records = consumer.poll(Duration.of(5, ChronoUnit.MINUTES));
    // operations with records ...
}
Some useful consumer properties (a full configuration sketch follows the list):
- Auto-Commit Enabled: Automatically commits offsets at regular intervals.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true));
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(autoCommitIntervalMs));
- Offset Reset Strategy: Starts from the latest offset if no previous offset is found.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
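For reference, a minimal sketch of how the props object used above might be assembled; the broker address and group id are placeholder values:
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// Keys and values are plain strings in these examples
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Auto-commit offsets and start from the latest offset when none is stored
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(true));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");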
A basic producer is suitable for general-purpose message production where transactional guarantees are not required:
- Send and Optionally Wait for Ack: Sends messages without transactional control, with optional wait for acknowledgment.
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
final Future<RecordMetadata> future = producer.send(record);
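For completeness, a minimal sketch of the producer setup assumed by the snippet above, and of optionally waiting for the broker's acknowledgment (placeholder broker address, checked exceptions omitted):
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
final Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, value));
// Optionally block until the broker acknowledges the write
final RecordMetadata metadata = future.get();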
The TopicManager class is a Kafka administration utility designed to manage Kafka topics. It's useful for creating, deleting, and listing topics in a Kafka cluster.
Note that by default, a topic is created when it is first used.
NewTopic newTopic = new NewTopic(topicName, topicPartitions, replicationFactor);
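The notes do not show the TopicManager internals; a minimal sketch of what such a utility might do with Kafka's Admin client (broker address, topic name, partition count, and replication factor are placeholders):
final Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (final Admin admin = Admin.create(adminProps)) {
    // Create a topic explicitly instead of relying on auto-creation
    admin.createTopics(Collections.singletonList(newTopic)).all().get();
    // List the existing topics
    final Set<String> topicNames = admin.listTopics().names().get();
    // Delete a topic
    admin.deleteTopics(Collections.singletonList(topicName)).all().get();
}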
In communication theory, message delivery can be provided with different guarantees:
- At-most-once semantics: no duplicates, but messages can be lost.
- At-least-once semantics: messages can be delivered more than once, but they are not lost.
- Exactly-once semantics (EOS): the system behaves as if each message were delivered once and only once. In Kafka this is achieved through idempotent operations and transactional updates of offsets (a rough mapping to producer configuration follows this list).
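As a rough, simplified mapping (an illustration, not an exhaustive rule), the producer-side guarantee follows mainly from the acknowledgment and retry settings:
// At-most-once (roughly): fire and forget, no retries, so a failed write is simply lost
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.RETRIES_CONFIG, "0");
// At-least-once (roughly): wait for acks and retry, so a retried write may be duplicated
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
// Exactly-once: idempotent producer, plus transactions for consume-process-produce pipelines
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true));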
Idempotence means that a producer can safely retry sending messages without worrying about duplicate messages being created. This is part of Kafka's "exactly once" semantics, which ensures that each message is delivered exactly once to the intended recipients. The following line sets the producer's ENABLE_IDEMPOTENCE_CONFIG property to true, which enables idempotence.
// Idempotence = exactly once semantics between the producer and the partition
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true));
If a producer sends messages as part of a transaction, you can use the isolation level to control whether a consumer reads these messages before the transaction is committed. The isolation level determines how the consumer reads messages that are part of a transaction:
Ideal for scenarios requiring data consistency and exactly-once processing in a transactional context.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
//or also messages that are part of a transaction that hasn't been committed yet:
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
A Transactional Producer is designed for transactional message delivery, ensuring atomicity and consistency in message production. It is configured with a transactional ID, which identifies the producer across sessions:
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
Transaction Management: transactions are managed explicitly with beginTransaction, commitTransaction, and abortTransaction.
// This must be called before any method that involves transactions
producer.initTransactions();
producer.beginTransaction();
producer.send(record);
// Commit or abort based on some condition
producer.commitTransaction();
// or
producer.abortTransaction();
Example:
producer.initTransactions();
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.beginTransaction();
producer.send(record);
if (condition) producer.commitTransaction();
else {
// If not flushed, aborted messages are deleted from the outgoing buffer
producer.flush();
producer.abortTransaction();
}
An object/instance can act as both a consumer and a producer. We call it an AtomicForwarder, which is designed for transactional message forwarding, ensuring EOS.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
Sets up consumer properties including server address, group ID, deserializers, and isolation level.
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));
Configures the producer with a transactional ID and enables idempotence:
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, producerTransactionalId);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, String.valueOf(true));
Polling and forwarding: the forwarder continuously polls for new records and forwards them to another topic within a transaction.
// This must be called before any method that involves transactions
producer.initTransactions();
//...
final ConsumerRecords<String, String> records = consumer.poll(Duration.of(5, ChronoUnit.MINUTES));
producer.beginTransaction();
for (final ConsumerRecord<String, String> record : records){
producer.send(new ProducerRecord<>(outputTopic,record.key(),record.value()));
}
// Manual offset management within the transaction (the offsets map is shown below)
producer.sendOffsetsToTransaction(map, consumer.groupMetadata());
producer.commitTransaction();
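The map passed to sendOffsetsToTransaction is not defined in the snippet above; a minimal sketch of how it is typically built from the polled records (the offset to commit is the next one to read, i.e. offset + 1):
final Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
for (final ConsumerRecord<String, String> record : records) {
    map.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
}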
If you have one partition and two consumers in the same consumer group, only one of the consumers will be able to read from the partition. This is because in Kafka, each partition is consumed by exactly one consumer in a group at a time. The other consumer will be idle and won't receive any messages unless the active consumer goes down. If the active consumer fails, the other consumer will take over and start consuming messages from the partition.
Partitions in Kafka primarily speed up the consumption of data. By having multiple partitions, you can have multiple consumers in a consumer group each reading from a different partition concurrently, thus allowing for data to be read in parallel. This increases the throughput of data consumption.
If there are two consumers and two partitions, each consumer will be reading from one partition. If one consumer fails, Kafka's consumer group coordinator will notice the failure (due to the lack of heartbeat from the failed consumer) and trigger a rebalance of the consumer group. The remaining consumer will take over the partition that was being read by the failed consumer, in addition to the partition it was already consuming. So, the remaining consumer will start consuming messages from both partitions.
Rebalancing: when a consumer fails (or a new consumer joins, or a topic's partition count changes), Kafka triggers a rebalance. During a rebalance, the partitions are reassigned among the available consumers in the group.
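To observe a rebalance from the consumer side, a ConsumerRebalanceListener can be registered when subscribing; a minimal sketch:
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away from this consumer (e.g. commit offsets here)
        System.out.println("Revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called once the rebalance has assigned partitions to this consumer
        System.out.println("Assigned: " + partitions);
    }
});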