Skip to content

Commit

Permalink
synchronize flushing of snapshot writers in flusher thread with closi…
Browse files Browse the repository at this point in the history
…ng snapshot files (#1302)
  • Loading branch information
makalaaneesh authored Feb 1, 2024
1 parent 7363dda commit afda6c2
Showing 1 changed file with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class YbExporterConsumer extends BaseChangeConsumer {
private RecordTransformer recordTransformer;
Thread flusherThread;
boolean shutDown = false;
Object flushingSnapshotFilesLock = new Object();


public YbExporterConsumer(String dataDir){
this.dataDir = dataDir;
Expand Down Expand Up @@ -121,9 +123,11 @@ else if (exporterRole.equals(TARGET_DB_EXPORTER_FB_ROLE)){
}

while (true) {
for (RecordWriter writer : snapshotWriters.values()) {
writer.flush();
writer.sync();
synchronized (flushingSnapshotFilesLock){
for (RecordWriter writer : snapshotWriters.values()) {
writer.flush();
writer.sync();
}
}
// TODO: doing more than flushing files to disk. maybe move this call to another thread?
if (exportStatus != null) {
Expand Down Expand Up @@ -282,7 +286,9 @@ private void checkIfSnapshotAlreadyComplete(Record r) {
}

private void handleSnapshotComplete() {
closeSnapshotWriters();
synchronized (flushingSnapshotFilesLock){
closeSnapshotWriters();
}
exportStatus.updateMode(ExportMode.STREAMING);
exportStatus.flushToDisk();
openCDCWriter();
Expand Down

0 comments on commit afda6c2

Please sign in to comment.