From b5e83860a6dc212be9db33d923add2aa2fc900a8 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 23 Feb 2025 14:03:43 -0800 Subject: [PATCH] Remove weird recursive call Signed-off-by: Marc Handalian --- .../DataFrameStreamProducer.java | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/libs/datafusion/src/main/java/org.opensearch.datafusion/DataFrameStreamProducer.java b/libs/datafusion/src/main/java/org.opensearch.datafusion/DataFrameStreamProducer.java index 4d98dd69e3ae7..305ce852c39d1 100644 --- a/libs/datafusion/src/main/java/org.opensearch.datafusion/DataFrameStreamProducer.java +++ b/libs/datafusion/src/main/java/org.opensearch.datafusion/DataFrameStreamProducer.java @@ -67,28 +67,16 @@ public BatchedJob createJob(BufferAllocator allocator) { public void run(VectorSchemaRoot root, FlushSignal flushSignal) { try { assert rootTicket != null; - pollUntilFalse(() -> recordBatchStream.loadNextBatch(), flushSignal); + // loadNextBatch will execute async in datafusion + while (recordBatchStream.loadNextBatch().join()) { + flushSignal.awaitConsumption(TimeValue.timeValueMillis(1000)); + } close(); } catch (Exception e) { throw new RuntimeException(e); } } - private CompletableFuture pollUntilFalse( - Supplier> pollingFunction, FlushSignal signal) { - return pollingFunction.get() - .thenCompose(result -> { - if (result) { - // If true, continue polling - signal.awaitConsumption(TimeValue.timeValueMillis(1000)); - return pollUntilFalse(pollingFunction, signal); - } else { - // If false, stop polling - return CompletableFuture.completedFuture(null); - } - }); - } - @Override public void onCancel() { try {