Skip to content

Commit

Permalink
fix flushing upon empty batches with ordered execution
Browse files Browse the repository at this point in the history
when running a pipeline with ordered execution, flushes on the pipeline
were no longer being called when compute is called with an empty batch, causing
issues with the aggregate filter, for example, not being able to push events on
timeout.
  • Loading branch information
colinsurprenant committed Aug 26, 2020
1 parent 75ac8eb commit 0ce6614
Showing 1 changed file with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,23 +306,35 @@ private boolean isOutput(final Vertex vertex) {

public final class CompiledOrderedExecution extends CompiledExecution {

@SuppressWarnings({"unchecked"}) private final RubyArray<RubyEvent> EMPTY_ARRAY = RubyUtil.RUBY.newEmptyArray();

@Override
public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
compute(batch.events(), flush, shutdown);
}

@Override
public void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
// send batch one-by-one as single-element batches down the filters
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
for (final RubyEvent e : batch) {
filterBatch.set(0, e);
final Collection<RubyEvent> result = compiledFilters.compute(filterBatch, flush, shutdown);
copyNonCancelledEvents(result, outputBatch);
compiledFilters.clear();
if (!batch.isEmpty()) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
// send batch one-by-one as single-element batches down the filters
for (final RubyEvent e : batch) {
filterBatch.set(0, e);
_compute(filterBatch, outputBatch, flush, shutdown);
}
compiledOutputs.compute(outputBatch, flush, shutdown);
} else if (flush || shutdown) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
_compute(EMPTY_ARRAY, outputBatch, flush, shutdown);
compiledOutputs.compute(outputBatch, flush, shutdown);
}
compiledOutputs.compute(outputBatch, flush, shutdown);
}

private void _compute(final RubyArray<RubyEvent> batch, final RubyArray<RubyEvent> outputBatch, final boolean flush, final boolean shutdown) {
final Collection<RubyEvent> result = compiledFilters.compute(batch, flush, shutdown);
copyNonCancelledEvents(result, outputBatch);
compiledFilters.clear();
}
}

Expand Down

0 comments on commit 0ce6614

Please sign in to comment.