[server][dvc] add snapshot creation listener for flush and sync offset before create snapshot #1711
Conversation
...a-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java (outdated review thread, resolved)
This reverts commit 5f49dfc.
// 2. Create snapshot for blob transfer
storageEngine.createSnapshot(storagePartitionConfig);
So we will create a snapshot for hybrid stores in the Server all the time when this feature is enabled?
Who will clean this up if there is no blob transfer request?
Unused checkpoints can keep stale data lingering around.
LOGGER
    .info("Beginning pre-snapshot offset sync for store: {}, partition: {}", storeNameAndVersion, partitionId);
try {
  syncOffset(storeNameAndVersion, getPartitionConsumptionState(partitionId));
This can trigger race conditions.
So far, this function is mainly triggered in the drainer thread, and if we invoke it here, a race condition can happen.
If we make it synchronized, it may not solve all the issues, since we would like to invoke this function as the last step of processing a record.
Some related javadoc:
/**
* Syncing offset checking in syncOffset() should be the very last step for processing a record.
*
* Check whether offset metadata checkpoint will happen; if so, update the producer states recorded in OffsetRecord
* with the updated producer states maintained in {@link #drainerDiv}
*/
if (shouldSyncOffset(partitionConsumptionState, record, leaderProducedRecordContext)) {
updateOffsetMetadataAndSyncOffset(partitionConsumptionState);
}
Can we leverage something like this?
CompletableFuture<Void> cmdFuture = storeBufferService.execSyncOffsetCommandAsync(topicPartition, this);
waitForSyncOffsetCmd(cmdFuture, topicPartition);
Changed the syncOffset call to the asynchronous execSyncOffsetCommandAsync command, because we need to wait for this command to complete before creating a snapshot for the partition. As a result, we moved some of the snapshot creation logic into SIT.
The updated approach is as follows:
(1) In SIT, we add a snapshot creation listener for all stores and all partitions.
(2) For batch stores, we notify the listener to trigger snapshot creation after EOP. For hybrid stores, we fetch the partition and then notify the listener based on blob transfer requests.
(3) When the listener receives the notification, it executes syncOffsetAndCreateSnapshot, a method overridden in the SIT class, to synchronize the offset and create the snapshot (see the sketch below).
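A minimal sketch of this listener wiring, under the assumption that SIT registers itself with each partition; the class and method names here are illustrative and do not match the actual Venice classes:

import java.util.ArrayList;
import java.util.List;

// Illustrative wiring only; names are sketches, not the real Venice classes.
public class SnapshotListenerWiringSketch {
  /** Listener that SIT registers for every store and partition; fired when a snapshot is wanted. */
  interface SnapshotCreationListener {
    void syncOffsetAndCreateSnapshot(int partitionId);
  }

  /** Stand-in for a storage partition that keeps its registered listeners. */
  static class PartitionSketch {
    private final List<SnapshotCreationListener> listeners = new ArrayList<>();
    private final int partitionId;

    PartitionSketch(int partitionId) {
      this.partitionId = partitionId;
    }

    void registerSnapshotCreationListener(SnapshotCreationListener listener) {
      listeners.add(listener);
    }

    /** Called after EOP for batch stores, or on a blob transfer request for hybrid stores. */
    void notifySnapshotCreationListener() {
      for (SnapshotCreationListener listener: listeners) {
        listener.syncOffsetAndCreateSnapshot(partitionId);
      }
    }
  }

  public static void main(String[] args) {
    PartitionSketch partition = new PartitionSketch(0);
    // SIT would register itself here, supplying its overridden syncOffsetAndCreateSnapshot.
    partition.registerSnapshotCreationListener(p -> System.out.println("sync offset + create snapshot for partition " + p));
    partition.notifySnapshotCreationListener();
  }
}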
Let’s use Utils.getReplicaId(pubSubTopic, partitionId)
to log topic and partition information consistently. A standardized format makes it easier to search and filter logs.
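For example, the quoted log line could become (a hypothetical rewrite, assuming pubSubTopic is in scope):

// Hypothetical rewrite of the quoted log line using the suggested helper.
LOGGER.info("Beginning pre-snapshot offset sync for replica: {}", Utils.getReplicaId(pubSubTopic, partitionId));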
Problem Statement
RocksDB keeps new data in memory before writing it to disk. For example, with 200 partitions at 16MB each, in the extreme case 3.2GB of data (in small key-value pairs) can exist only in memory.
Since snapshots only capture data already written to disk, failing to flush before creating a snapshot means those in-memory records won't be included.
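As a rough illustration of the flush-before-snapshot requirement, here is a minimal sketch using the RocksDB Java API directly; the real change goes through RocksDBStoragePartition, so this class and its method are hypothetical:

import org.rocksdb.Checkpoint;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class FlushThenSnapshotSketch {
  public static void createSnapshot(RocksDB db, String snapshotPath) throws RocksDBException {
    // Flush memtables first, so records still held only in memory reach SST files on disk.
    try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
      db.flush(flushOptions);
    }
    // A checkpoint only contains data already on disk, which is why the flush above matters.
    try (Checkpoint checkpoint = Checkpoint.create(db)) {
      checkpoint.createCheckpoint(snapshotPath);
    }
  }
}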
Solution
Previously, we created the snapshot by directly calling getPartitionOrThrow(partitionId).createSnapshot(). Now, however, we want to call execSyncOffsetCommandAsync in SIT before creating the snapshot, so this PR restructures the snapshot creation logic:
(1) Add a snapshot creation listener to each partition.
(2) When snapshot creation is needed, the partition triggers the notifySnapshotCreationListener event.
(3) The listener then executes syncOffsetAndCreateSnapshot, which is overridden in SIT.
(4) syncOffsetAndCreateSnapshot initiates execSyncOffsetCommandAsync to send the SYNC OFFSET command to the drainer. Upon asynchronous completion of this command, the partition triggers the snapshot creation, as sketched below.
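A minimal sketch of step (4), with placeholder bodies standing in for the real drainer command and storage engine calls:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of step (4) only; placeholder bodies replace the real drainer and storage engine calls.
public class SyncOffsetThenSnapshotSketch {
  /** Overridden in SIT: sync the offset first, create the snapshot only once the sync completes. */
  void syncOffsetAndCreateSnapshot(String topicPartition) throws Exception {
    CompletableFuture<Void> cmdFuture = execSyncOffsetCommandAsync(topicPartition);
    // Wait for the drainer to persist the offset; a timeout guards against a stuck drainer.
    cmdFuture.get(30, TimeUnit.SECONDS);
    createSnapshot(topicPartition);
  }

  CompletableFuture<Void> execSyncOffsetCommandAsync(String topicPartition) {
    // Placeholder: the real command is queued to the drainer through the store buffer service.
    return CompletableFuture.completedFuture(null);
  }

  void createSnapshot(String topicPartition) {
    // Placeholder: the real call flushes RocksDB and creates the checkpoint used for blob transfer.
    System.out.println("snapshot created for " + topicPartition);
  }

  public static void main(String[] args) throws Exception {
    new SyncOffsetThenSnapshotSketch().syncOffsetAndCreateSnapshot("test_store_v1-0");
  }
}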
Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify:
(1) Locks (e.g., synchronized, RWLock) are used where needed.
(2) Thread-safe collections are used where needed (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
How was this PR tested?
Does this PR introduce any user-facing or breaking changes?