Skip to content

Commit

Permalink
Remove weird recursive call
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Feb 23, 2025
1 parent cce4cf9 commit b5e8386
Showing 1 changed file with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,16 @@ public BatchedJob createJob(BufferAllocator allocator) {
public void run(VectorSchemaRoot root, FlushSignal flushSignal) {
try {
assert rootTicket != null;
pollUntilFalse(() -> recordBatchStream.loadNextBatch(), flushSignal);
// loadNextBatch will execute async in datafusion
while (recordBatchStream.loadNextBatch().join()) {
flushSignal.awaitConsumption(TimeValue.timeValueMillis(1000));
}
close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private CompletableFuture<Void> pollUntilFalse(
Supplier<CompletableFuture<Boolean>> pollingFunction, FlushSignal signal) {
return pollingFunction.get()
.thenCompose(result -> {
if (result) {
// If true, continue polling
signal.awaitConsumption(TimeValue.timeValueMillis(1000));
return pollUntilFalse(pollingFunction, signal);
} else {
// If false, stop polling
return CompletableFuture.completedFuture(null);
}
});
}

@Override
public void onCancel() {
try {
Expand Down

0 comments on commit b5e8386

Please sign in to comment.