`FlinkKinesisFirehoseProducer#invoke` creates a new thread for each added record on single-core architectures, leading to very high resource utilization #20
On closer inspection, it looks as though the use of futures here provides no benefit. Consider `addUserRecord`:

    public CompletableFuture<O> addUserRecord(final R record, final long timeoutInMillis)
            throws TimeoutExpiredException, InterruptedException {
        // [...]
        UserRecordResult recordResult = new UserRecordResult().setSuccessful(true); // `recordResult` is always non-null and `successful`
        return CompletableFuture.completedFuture((O) recordResult); // This is in the resolved / finished state
    }

Now observe the downstream usage in `invoke`:

    @Override
    public void invoke(final OUT value, final Context context) throws Exception {
        // [...]
        // This will always be a no-op given the comments that follow
        propagateAsyncExceptions();
        firehoseProducer
            .addUserRecord(new Record().withData(serializedValue))
            .handleAsync((record, throwable) -> {
                // This will always be `false`
                if (throwable != null) {
                    final String msg = "An error has occurred trying to write a record.";
                    if (failOnError) {
                        lastThrownException = throwable;
                    } else {
                        LOGGER.warn(msg, throwable);
                    }
                }
                // This will always be `false`
                if (record != null && !record.isSuccessful()) {
                    final String msg = "Record could not be successfully sent.";
                    if (failOnError && lastThrownException == null) {
                        lastThrownException = new RecordCouldNotBeSentException(msg, record.getException());
                    } else {
                        LOGGER.warn(msg, record.getException());
                    }
                }
                return null;
            });
    }

    private void propagateAsyncExceptions() throws Exception {
        // This will always be `true`, so the method always returns early.
        if (lastThrownException == null) {
            return;
        }
        final String msg = "An exception has been thrown while trying to process a record";
        if (failOnError) {
            throw new FlinkKinesisFirehoseException(msg, lastThrownException);
        } else {
            LOGGER.warn(msg, lastThrownException);
            lastThrownException = null;
        }
    }

The new suggestion is to remove futures completely: no clear implementation would benefit from them (the code in question simply enqueues records, following the producer-consumer pattern), and the added overhead from thread creation and context switching is entirely counterproductive.
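
To illustrate the point about already-completed futures, here is a minimal, self-contained sketch that uses only `java.util.concurrent.CompletableFuture`. The class and method names (`CompletedFutureDemo`, `callbackThread`, `errorIsAlwaysNull`) are hypothetical and not part of the connector, and the string `"ok"` merely stands in for a `UserRecordResult`.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureDemo {

    // Name of the thread that runs a synchronous `handle` callback
    // attached to an already-completed future.
    public static String callbackThread() {
        return CompletableFuture.completedFuture("ok")
                .handle((value, error) -> Thread.currentThread().getName())
                .join();
    }

    // A future built with completedFuture(...) never carries an exception,
    // so the error argument of the callback is always null.
    public static boolean errorIsAlwaysNull() {
        return CompletableFuture.completedFuture("ok")
                .handle((value, error) -> error == null)
                .join();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("callback thread: " + callbackThread());
        System.out.println("error is null:   " + errorIsAlwaysNull());
    }
}
```

Because the future is complete before the callback is attached, a plain `handle` (or no future at all) performs the same work on the calling thread without scheduling anything.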
Currently, when running under 1 CPU (such as using 1 KPU in Kinesis Data Analytics for Apache Flink), `FlinkKinesisFirehoseProducer#invoke` is extremely inefficient. This is because of the call to `handleAsync`. Looking at the implementation of `handleAsync` and its call chain, `ThreadPerTaskExecutor` is used when only 1 CPU is available on the host machine. In my tests, I can see that it creates 20k short-lived threads in one minute, which is very expensive in a tight loop.

The proposal is to not rely on the default behavior of the `ForkJoinPool` and to instead pass an `Executor` to limit the impact of this type of issue on resource-limited hosts. Ideally this can be exposed to sink consumers so that it is configurable, per the open-closed principle.
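
As a rough sketch of the proposal, the two-argument overload `handleAsync(BiFunction, Executor)` lets the caller choose where callbacks run. The example below is an assumption-laden illustration, not the connector's code: the class name `SharedExecutorDemo`, the method `callbackThreads`, and the thread name `firehose-callback` are all hypothetical, and a single-thread executor is just one possible choice.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedExecutorDemo {

    // Runs `count` async callbacks on one shared single-thread executor and
    // returns the distinct names of the threads that executed them.
    public static Set<String> callbackThreads(int count) {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "firehose-callback"); // hypothetical name
            thread.setDaemon(true);
            return thread;
        });
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            CompletableFuture<?>[] pending = new CompletableFuture<?>[count];
            for (int i = 0; i < count; i++) {
                // The explicit executor replaces the default async executor.
                pending[i] = CompletableFuture.completedFuture(i)
                        .handleAsync((value, error) ->
                                names.add(Thread.currentThread().getName()), executor);
            }
            CompletableFuture.allOf(pending).join();
        } finally {
            executor.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + callbackThreads(20_000));
    }
}
```

With a shared executor, the number of threads is bounded by the executor's configuration rather than by the number of records, regardless of how many CPUs the host reports.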