Add e2e ack, checkpointing and metrics to Postgres stream processing #5375

Merged · 6 commits · Feb 4, 2025
@@ -83,9 +83,9 @@ public Map<String, ParentTable> getParentTableMap(StreamPartition streamPartitio

/**
* Detects if a binlog event contains cascading updates and if detected, creates resync partitions
* @param event event
* @param event binlog event
* @param parentTableMap parent table map
* @param tableMetadata table meta data
* @param tableMetadata table metadata
*/
public void detectCascadingUpdates(Event event, Map<String, ParentTable> parentTableMap, TableMetadata tableMetadata) {
final UpdateRowsEventData data = event.getData();
@@ -143,9 +143,9 @@ public void detectCascadingUpdates(Event event, Map<String, ParentTable> parentT

/**
* Detects if a binlog event contains cascading deletes and if detected, creates resync partitions
* @param event event
* @param event binlog event
* @param parentTableMap parent table map
* @param tableMetadata table meta data
* @param tableMetadata table metadata
*/
public void detectCascadingDeletes(Event event, Map<String, ParentTable> parentTableMap, TableMetadata tableMetadata) {
if (parentTableMap.containsKey(tableMetadata.getFullTableName())) {
@@ -130,7 +130,8 @@ public BinlogEventListener(final StreamPartition streamPartition,
this.dbTableMetadata = dbTableMetadata;
this.streamCheckpointManager = new StreamCheckpointManager(
streamCheckpointer, sourceConfig.isAcknowledgmentsEnabled(),
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout());
acknowledgementSetManager, this::stopClient, sourceConfig.getStreamAcknowledgmentTimeout(),
sourceConfig.getEngine(), pluginMetrics);
streamCheckpointManager.start();

this.cascadeActionDetector = cascadeActionDetector;
@@ -200,7 +201,7 @@ void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) {

// Trigger a checkpoint update for this rotate when there're no row mutation events being processed
if (streamCheckpointManager.getChangeEventStatuses().isEmpty()) {
ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate);
ChangeEventStatus changeEventStatus = streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, 0);
if (isAcknowledgmentsEnabled) {
changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK);
}
@@ -347,9 +348,10 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
LOG.debug("Current binlog coordinate after receiving a row change event: " + currentBinlogCoordinate);
}

final long recordCount = rows.size();
AcknowledgementSet acknowledgementSet = null;
if (isAcknowledgmentsEnabled) {
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate);
acknowledgementSet = streamCheckpointManager.createAcknowledgmentSet(currentBinlogCoordinate, recordCount);
}

final long bytes = event.toString().getBytes().length;
@@ -398,7 +400,7 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
if (isAcknowledgmentsEnabled) {
acknowledgementSet.complete();
} else {
streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate);
streamCheckpointManager.saveChangeEventsStatus(currentBinlogCoordinate, recordCount);
}
}

@@ -6,11 +6,14 @@
package org.opensearch.dataprepper.plugins.source.rds.stream;

import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.postgresql.replication.LogSequenceNumber;

public class ChangeEventStatus {

private final BinlogCoordinate binlogCoordinate;
private final LogSequenceNumber logSequenceNumber;
private final long timestamp;
private final long recordCount;
private volatile AcknowledgmentStatus acknowledgmentStatus;

public enum AcknowledgmentStatus {
@@ -19,9 +22,19 @@ public enum AcknowledgmentStatus {
NO_ACK
}

public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp) {
public ChangeEventStatus(final BinlogCoordinate binlogCoordinate, final long timestamp, final long recordCount) {
this.binlogCoordinate = binlogCoordinate;
this.logSequenceNumber = null;
Member:
Why do we need this constructor?

Collaborator (Author):
In StreamCheckpointManager, we use a ConcurrentLinkedQueue<ChangeEventStatus> to track change event status. To make ChangeEventStatus compatible with both MySQL and Postgres, I added logSequenceNumber to it, so MySQL will use binlogCoordinate and Postgres will use logSequenceNumber. (A sketch of this dual usage follows this file's diff below.)

this.timestamp = timestamp;
this.recordCount = recordCount;
acknowledgmentStatus = AcknowledgmentStatus.NO_ACK;
}

public ChangeEventStatus(final LogSequenceNumber logSequenceNumber, final long timestamp, final long recordCount) {
this.binlogCoordinate = null;
this.logSequenceNumber = logSequenceNumber;
this.timestamp = timestamp;
this.recordCount = recordCount;
acknowledgmentStatus = AcknowledgmentStatus.NO_ACK;
}

@@ -45,7 +58,15 @@ public BinlogCoordinate getBinlogCoordinate() {
return binlogCoordinate;
}

public LogSequenceNumber getLogSequenceNumber() {
return logSequenceNumber;
}

public long getTimestamp() {
return timestamp;
}

public long getRecordCount() {
return recordCount;
}
}
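
To make the reviewer exchange above concrete, here is a minimal sketch of how a checkpoint manager could feed the two ChangeEventStatus constructors, keeping MySQL on binlog coordinates and Postgres on log sequence numbers. Only the ChangeEventStatus, BinlogCoordinate, and LogSequenceNumber types come from this PR and its dependencies; the surrounding class name, method names, and queue handling are illustrative assumptions, not code from the PR.

import java.util.concurrent.ConcurrentLinkedQueue;

import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.stream.ChangeEventStatus;
import org.postgresql.replication.LogSequenceNumber;

// Illustrative sketch only; class and method names are assumptions.
class EngineAwareCheckpointTracker {

    // Statuses are queued in arrival order and later checkpointed/acknowledged.
    private final ConcurrentLinkedQueue<ChangeEventStatus> changeEventStatuses = new ConcurrentLinkedQueue<>();

    // MySQL path: progress is tracked by binlog coordinate.
    ChangeEventStatus trackMySqlEvents(final BinlogCoordinate coordinate, final long recordCount) {
        final ChangeEventStatus status =
                new ChangeEventStatus(coordinate, System.currentTimeMillis(), recordCount);
        changeEventStatuses.add(status);
        return status;
    }

    // Postgres path: progress is tracked by log sequence number (LSN).
    ChangeEventStatus trackPostgresEvents(final LogSequenceNumber lsn, final long recordCount) {
        final ChangeEventStatus status =
                new ChangeEventStatus(lsn, System.currentTimeMillis(), recordCount);
        changeEventStatuses.add(status);
        return status;
    }
}
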
@@ -47,6 +47,7 @@ public LogicalReplicationClient(final ConnectionManager connectionManager,

@Override
public void connect() {
LOG.debug("Start connecting logical replication stream. ");
PGReplicationStream stream;
try (Connection conn = connectionManager.getConnection()) {
PGConnection pgConnection = conn.unwrap(PGConnection.class);
@@ -62,6 +63,7 @@ public void connect() {
logicalStreamBuilder.withStartPosition(startLsn);
}
stream = logicalStreamBuilder.start();
LOG.debug("Logical replication stream started. ");

if (eventProcessor != null) {
while (!disconnectRequested) {
@@ -88,7 +90,8 @@
}

stream.close();
LOG.info("Replication stream closed successfully.");
disconnectRequested = false;
LOG.debug("Replication stream closed successfully.");
} catch (Exception e) {
LOG.error("Exception while creating Postgres replication stream. ", e);
}
@@ -97,6 +100,7 @@
@Override
public void disconnect() {
disconnectRequested = true;
LOG.debug("Requested to disconnect logical replication stream.");
}

public void setEventProcessor(LogicalReplicationEventProcessor eventProcessor) {
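
As an aside on the disconnect handling above: disconnect() only flips a volatile flag, the read loop in connect() checks that flag before each iteration, and the PR adds a reset of the flag once the stream is closed so a later reconnect starts clean. A minimal sketch of that cooperative-shutdown pattern follows; the loop body is a placeholder, not the real PGReplicationStream handling from this PR.

// Sketch of the cooperative-shutdown pattern used by the replication client.
// readAndProcessPendingEvents() is a placeholder, not a method from this PR.
class ReplicationShutdownSketch {

    private volatile boolean disconnectRequested = false;

    void connect() {
        while (!disconnectRequested) {
            readAndProcessPendingEvents();
        }
        // Reset the flag after the loop exits so a subsequent connect() is not
        // immediately cancelled, mirroring the reset added in this PR.
        disconnectRequested = false;
    }

    void disconnect() {
        disconnectRequested = true;
    }

    private void readAndProcessPendingEvents() {
        // Placeholder for reading pending replication messages and handing
        // them to the event processor.
    }
}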