[server][dvc] add snapshot creation listener for flush and sync offset before create snapshot #1711


Open

jingy-li wants to merge 5 commits into main

Conversation

jingy-li
Contributor

@jingy-li jingy-li commented Apr 18, 2025

Problem Statement

RocksDB keeps new data in memory (memtables) before writing it to disk. For example, with 200 partitions at 16 MB each, in the extreme case 3.2 GB of data (in small key-value pairs) can exist only in memory.

Since snapshots only capture data already written to disk, failing to flush before creating a snapshot means those in-memory records won't be included.
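
To make this concrete, here is a minimal sketch using the plain RocksJava API (not this PR's actual code; Venice wraps RocksDB behind its own storage engine abstraction) of why the flush must happen before the checkpoint:

import org.rocksdb.Checkpoint;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class FlushThenSnapshot {
  /**
   * Flush memtables to SST files first, then create a checkpoint (a hard-link
   * based snapshot). Without the flush, records still in memory would be
   * missing from the checkpoint.
   */
  public static void createSnapshot(RocksDB db, String checkpointPath) throws RocksDBException {
    try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
      db.flush(flushOptions); // persist in-memory writes to disk
    }
    try (Checkpoint checkpoint = Checkpoint.create(db)) {
      checkpoint.createCheckpoint(checkpointPath); // checkpoint now includes the flushed data
    }
  }
}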

Previously, we created the snapshot by directly calling getPartitionOrThrow(partitionId).createSnapshot(). Now, however, we want to call execSyncOffsetCommandAsync in SIT (StoreIngestionTask) before creating the snapshot, so this PR restructures the snapshot creation logic:

(1) Add a snapshot creation listener to each partition.
(2) When a snapshot needs to be created, the partition triggers the notifySnapshotCreationListener event.
(3) The listener then executes syncOffsetAndCreateSnapshot, which is overridden in SIT.
(4) syncOffsetAndCreateSnapshot initiates execSyncOffsetCommandAsync to send the SYNC OFFSET command to the drainer. Upon asynchronous completion of this command, the partition then triggers snapshot creation.

Solution

  1. Add a snapshot creation listener.
  2. Trigger the listener's snapshot-creation event handler to sync the offset and create the snapshot in SIT (see the sketch below).
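
As a hedged illustration of the listener wiring (the class and method shapes here are guesses; only the names notifySnapshotCreationListener and syncOffsetAndCreateSnapshot come from this PR):

// Sketch only; Venice's actual types and signatures may differ.
interface SnapshotCreationListener {
  // Overridden in SIT so the offset sync happens before the snapshot.
  void syncOffsetAndCreateSnapshot(int partitionId);
}

class Partition {
  private SnapshotCreationListener snapshotCreationListener;

  // (1) Each partition registers a snapshot creation listener.
  void registerSnapshotCreationListener(SnapshotCreationListener listener) {
    this.snapshotCreationListener = listener;
  }

  // (2) When a snapshot is needed, the partition fires the event,
  // (3) and the listener runs the SIT override.
  void notifySnapshotCreationListener(int partitionId) {
    if (snapshotCreationListener != null) {
      snapshotCreationListener.syncOffsetAndCreateSnapshot(partitionId);
    }
  }
}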

Code changes

  • Added new code behind a config. If so, list the config names and their default values in the PR description.
  • Introduced new log lines.
  • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

@jingy-li jingy-li requested a review from gaojieliu April 22, 2025 20:37
@jingy-li jingy-li changed the title [server][dvc] flush RocksDB before create snapshot [server][dvc] add pre snapshot creation listener for disk flush and offset sync Apr 22, 2025
}
}

// 2. Create snapshot for blob transfer
storageEngine.createSnapshot(storagePartitionConfig);
Contributor

So we will create a snapshot for hybrid stores in the Server all the time when this feature is enabled?
Who will clean this up if there is no blob transfer request?
An unused checkpoint can keep stale data lingering around.

LOGGER
    .info("Beginning pre-snapshot offset sync for store: {}, partition: {}", storeNameAndVersion, partitionId);
try {
  syncOffset(storeNameAndVersion, getPartitionConsumptionState(partitionId));
Contributor

This can trigger race conditions.
So far, this function is mainly triggered in the drainer thread, and if we invoke it here, a race condition can happen.
Making it synchronized may not solve all the issues, since we would like to invoke this function as the last step of record processing.
Some related javadoc:

/**
 * Syncing offset checking in syncOffset() should be the very last step for processing a record.
 *
 * Check whether offset metadata checkpoint will happen; if so, update the producer states recorded in OffsetRecord
 * with the updated producer states maintained in {@link #drainerDiv}
 */
if (shouldSyncOffset(partitionConsumptionState, record, leaderProducedRecordContext)) {
  updateOffsetMetadataAndSyncOffset(partitionConsumptionState);
}

Can we leverage something like this?

CompletableFuture<Void> cmdFuture = storeBufferService.execSyncOffsetCommandAsync(topicPartition, this);
waitForSyncOffsetCmd(cmdFuture, topicPartition);

Contributor Author

Changed the synchronous syncOffset call to the asynchronous execSyncOffsetCommandAsync command.

Because we need to wait for this command to complete before creating a snapshot for the partition, we moved some of the snapshot creation logic to SIT.

The updated approach is as follows:
(1) In SIT, we add a snapshot creation listener for all partitions of all stores.
(2) For batch stores, we notify the listener to trigger snapshot creation after EOP. For hybrid stores, we fetch the partition and then notify the listener based on blob transfer requests.
(3) When the listener receives the notification, it executes syncOffsetAndCreateSnapshot, a method overridden in the SIT class, to synchronize the offset and create the snapshot.
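
A rough sketch of that completion chaining (execSyncOffsetCommandAsync and createSnapshot appear elsewhere in this thread; the error handling and surrounding shape are illustrative, not the PR's exact code):

// Sketch: send the SYNC OFFSET command to the drainer and create the snapshot
// only after the offset has been durably checkpointed.
CompletableFuture<Void> cmdFuture = storeBufferService.execSyncOffsetCommandAsync(topicPartition, this);
cmdFuture.whenComplete((ignored, throwable) -> {
  if (throwable != null) {
    LOGGER.error("Offset sync failed for {}; skipping snapshot creation", topicPartition, throwable);
    return;
  }
  storageEngine.createSnapshot(storagePartitionConfig); // safe: offset and flushed data are on disk
});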

Collaborator

@sushantmane sushantmane left a comment

Let’s use Utils.getReplicaId(pubSubTopic, partitionId) to log topic and partition information consistently. A standardized format makes it easier to search and filter logs.
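
For example, a hypothetical log line using that helper:

// Illustrative only: one standardized replica id field instead of separate store/partition fields.
LOGGER.info("Beginning pre-snapshot offset sync for replica: {}", Utils.getReplicaId(pubSubTopic, partitionId));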

@jingy-li jingy-li requested a review from gaojieliu April 29, 2025 19:49
@jingy-li jingy-li changed the title [server][dvc] add pre snapshot creation listener for disk flush and offset sync [server][dvc] add snapshot creation listener for disk flush and offset sync Apr 29, 2025
@jingy-li jingy-li changed the title [server][dvc] add snapshot creation listener for disk flush and offset sync [server][dvc] add snapshot creation listener for sync offset and flush before create snapshot Apr 29, 2025
@jingy-li jingy-li changed the title [server][dvc] add snapshot creation listener for sync offset and flush before create snapshot [server][dvc] add snapshot creation listener for flush and sync offset before create snapshot Apr 30, 2025