FlinkKinesisFirehoseProducer#invoke creates a new thread for each added record for single core architectures leading to very high resource utilization #20

Open
bobtiernay-okta opened this issue May 10, 2021 · 1 comment

Comments

bobtiernay-okta commented May 10, 2021

Currently, when running with only 1 CPU (for example, 1 KPU in Kinesis Data Analytics for Apache Flink), FlinkKinesisFirehoseProducer#invoke is extremely inefficient because of its call to handleAsync:

    @Override
    public void invoke(final OUT value, final Context context) throws Exception {
        // ...
        firehoseProducer
                .addUserRecord(new Record().withData(serializedValue))
                .handleAsync((record, throwable) -> {
                    // ...
                });
    }

Looking at the JDK's implementation of CompletableFuture#handleAsync and its call chain:

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(defaultExecutor(), fn);
    }

    public Executor defaultExecutor() {
        return ASYNC_POOL;
    }

    private static final boolean USE_COMMON_POOL =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

    private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    static final class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) { new Thread(r).start(); }
    }

Thus, ThreadPerTaskExecutor is used whenever the common pool's parallelism is not greater than 1, which is the case when only 1 CPU is available on the host machine. In my tests, I can see it creating 20k short-lived threads in one minute, which is very expensive in a tight loop.
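A quick way to check which executor the sink's callbacks land on is a small probe. The sketch below (class and method names are hypothetical) attaches handleAsync callbacks to already-completed futures, the same shape as invoke, and counts the distinct threads that ran them. On a host where ForkJoinPool.getCommonPoolParallelism() is 1, the count should grow with the number of callbacks; on larger hosts it should stay at a small number of common-pool workers.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

public class DefaultExecutorProbe {

    // Attaches `tasks` handleAsync callbacks (default executor) to already-completed
    // futures, as invoke() does, and returns how many distinct threads ran them.
    static int distinctCallbackThreads(int tasks) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CompletableFuture<?>[] callbacks = new CompletableFuture<?>[tasks];
        for (int i = 0; i < tasks; i++) {
            callbacks[i] = CompletableFuture.completedFuture("record")
                    .handleAsync((record, throwable) -> {
                        threads.add(Thread.currentThread());
                        return null;
                    });
        }
        CompletableFuture.allOf(callbacks).join();
        return threads.size();
    }

    public static void main(String[] args) {
        // With parallelism <= 1 the default executor is ThreadPerTaskExecutor, so the
        // count should track the number of callbacks; otherwise it stays small.
        System.out.println("common pool parallelism = " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("distinct callback threads = " + distinctCallbackThreads(1_000));
    }
}
```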

The proposal is to not rely on CompletableFuture's default async executor (and therefore on the common pool's parallelism) and instead pass an explicit Executor to handleAsync, limiting the impact of this type of issue on resource-limited hosts. Ideally, this Executor would be exposed to sink consumers so that it is configurable, per the open-closed principle.
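A minimal sketch of the proposal, assuming the sink accepts an Executor through its constructor (a hypothetical API, not something the connector currently offers). It uses the two-argument CompletableFuture#handleAsync(fn, executor) overload, so every callback runs on the supplied executor instead of CompletableFuture's default async executor. The record type and addUserRecord are simplified stand-ins.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExplicitExecutorSketch {
    private final Executor callbackExecutor;

    // Hypothetical: the executor is supplied by the sink's user, so it can be
    // tuned per deployment without modifying the sink (open-closed principle).
    ExplicitExecutorSketch(Executor callbackExecutor) {
        this.callbackExecutor = callbackExecutor;
    }

    // Stand-in for FirehoseProducer#addUserRecord, which returns a completed future.
    private CompletableFuture<String> addUserRecord(String record) {
        return CompletableFuture.completedFuture(record);
    }

    // Stand-in for invoke(): same shape, but handleAsync gets an explicit Executor.
    CompletableFuture<Void> invoke(String value, Set<Thread> seenThreads) {
        return addUserRecord(value).handleAsync((record, throwable) -> {
            seenThreads.add(Thread.currentThread()); // error handling elided
            return null;
        }, callbackExecutor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ExplicitExecutorSketch sink = new ExplicitExecutorSketch(executor);
        Set<Thread> seenThreads = ConcurrentHashMap.newKeySet();
        CompletableFuture<?>[] pending = new CompletableFuture<?>[1_000];
        for (int i = 0; i < pending.length; i++) {
            pending[i] = sink.invoke("record-" + i, seenThreads);
        }
        CompletableFuture.allOf(pending).join();
        executor.shutdown();
        System.out.println("distinct callback threads = " + seenThreads.size()); // prints "distinct callback threads = 1"
    }
}
```

A single-thread executor is only one option; a small fixed pool, or a direct executor such as Runnable::run, would also avoid creating a thread per record. The right choice depends on how much work the callback does.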

@bobtiernay-okta bobtiernay-okta changed the title FlinkKinesisFirehoseProducer#invoke creates a new thread for each added record for single core architectures leading to high resource utilization FlinkKinesisFirehoseProducer#invoke creates a new thread for each added record for single core architectures leading to very high resource utilization May 10, 2021
bobtiernay-okta commented May 11, 2021

On closer inspection, the use of futures in FirehoseProducer#addUserRecord appears to be completely unnecessary: it adds a great deal of accidental complexity to the implementation, on top of the performance cost described above. To see why, note the return value of FirehoseProducer#addUserRecord:

    public CompletableFuture<O> addUserRecord(final R record, final long timeoutInMillis)
            throws TimeoutExpiredException, InterruptedException {

        // [...]

        UserRecordResult recordResult = new UserRecordResult().setSuccessful(true); // `recordResult` is always non-null and `successful`
        return CompletableFuture.completedFuture((O) recordResult); // This is in the resolved / finished state
    }

And observe the downstream usage in FlinkKinesisFirehoseProducer#invoke:

    @Override
    public void invoke(final OUT value, final Context context) throws Exception {
        // [...]

        // This will always be a no-op given the comments that follow
        propagateAsyncExceptions();

        firehoseProducer
                .addUserRecord(new Record().withData(serializedValue))
                .handleAsync((record, throwable) -> {
                    // This will always be `false`
                    if (throwable != null) {
                        final String msg = "An error has occurred trying to write a record.";
                        if (failOnError) {
                            lastThrownException = throwable;
                        } else {
                            LOGGER.warn(msg, throwable);
                        }
                    }

                    // This will always be `false`
                    if (record != null && !record.isSuccessful()) {
                        final String msg = "Record could not be successfully sent.";
                        if (failOnError && lastThrownException == null) {
                            lastThrownException = new RecordCouldNotBeSentException(msg, record.getException());
                        } else {
                            LOGGER.warn(msg, record.getException());
                        }
                    }

                    return null;
                });
    }

    private void propagateAsyncExceptions() throws Exception {
        // lastThrownException is never set, so this is always true and the method returns early.
        if (lastThrownException == null) {
            return;
        }

        final String msg = "An exception has been thrown while trying to process a record";
        if (failOnError) {
            throw new FlinkKinesisFirehoseException(msg, lastThrownException);
        } else {
            LOGGER.warn(msg, lastThrownException);
            lastThrownException = null;
        }
    }
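The invariant can be checked in isolation with a stand-alone sketch. UserRecordResult here is a hypothetical stand-in exposing only setSuccessful/isSuccessful, and addUserRecord mirrors the quoted return statement. It attaches the same two checks as invoke, using the synchronous handle rather than handleAsync so the sketch does not touch the async executor.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureInvariant {

    // Hypothetical stand-in for the connector's UserRecordResult (only the parts used here).
    static final class UserRecordResult {
        private boolean successful;

        UserRecordResult setSuccessful(boolean successful) {
            this.successful = successful;
            return this;
        }

        boolean isSuccessful() {
            return successful;
        }
    }

    // Mirrors the quoted return path: an already-completed, successful future.
    static CompletableFuture<UserRecordResult> addUserRecord(byte[] data) {
        return CompletableFuture.completedFuture(new UserRecordResult().setSuccessful(true));
    }

    static volatile Throwable lastThrownException;

    public static void main(String[] args) {
        CompletableFuture<UserRecordResult> future = addUserRecord(new byte[] {1, 2, 3});
        // The future is already resolved before any callback is attached...
        System.out.println("done = " + future.isDone()
                + ", exceptional = " + future.isCompletedExceptionally()); // prints "done = true, exceptional = false"
        // ...so neither error branch from invoke() can fire.
        future.handle((record, throwable) -> {
            if (throwable != null || (record != null && !record.isSuccessful())) {
                lastThrownException = new IllegalStateException("unreachable");
            }
            return null;
        }).join();
        System.out.println("lastThrownException = " + lastThrownException); // prints "lastThrownException = null"
    }
}
```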

The new suggestion is to remove futures completely. No code path benefits from them (the code in question simply enqueues records, following the producer-consumer pattern), and the overhead of thread creation and context switching they add is entirely counterproductive.
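For illustration, a futures-free variant could look like the sketch below. Everything in it is hypothetical: QueueingProducer, the sendBatch callback standing in for the real Firehose client call, and the 500-record batch size are assumptions, not the connector's API. addUserRecord just enqueues (blocking when the queue is full), one long-lived thread drains and sends batches, and any send failure is rethrown on the caller's next addUserRecord, playing the role of propagateAsyncExceptions. The failOnError switch is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical futures-free producer: addUserRecord only enqueues, a single
// long-lived flusher thread drains batches, and failures surface on the next call.
public class QueueingProducer implements AutoCloseable {
    private static final int MAX_BATCH = 500; // assumed batch limit for illustration

    private final BlockingQueue<byte[]> queue;
    private final Consumer<List<byte[]>> sendBatch; // stand-in for the real Firehose client call
    private final AtomicReference<Throwable> lastError = new AtomicReference<>();
    private volatile boolean running = true;
    private final Thread flusher;

    QueueingProducer(int capacity, Consumer<List<byte[]>> sendBatch) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.sendBatch = sendBatch;
        this.flusher = new Thread(this::drainLoop, "firehose-flusher");
        this.flusher.start();
    }

    // Called from invoke(): surface any earlier send failure, then enqueue.
    // put() blocks while the queue is full, which gives back-pressure.
    void addUserRecord(byte[] data) throws Exception {
        Throwable error = lastError.getAndSet(null);
        if (error != null) {
            throw new Exception("An exception has been thrown while trying to process a record", error);
        }
        queue.put(data);
    }

    private void drainLoop() {
        List<byte[]> batch = new ArrayList<>(MAX_BATCH);
        while (running || !queue.isEmpty()) {
            try {
                byte[] first = queue.poll(100, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue; // nothing arrived; re-check the loop condition
                }
                batch.add(first);
                queue.drainTo(batch, MAX_BATCH - 1);
                sendBatch.accept(batch); // the consumer must copy the list if it keeps it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                lastError.set(e); // reported to the caller on its next addUserRecord
            } finally {
                batch.clear();
            }
        }
    }

    // Tells the flusher to exit once the queue is empty, then waits for it.
    @Override
    public void close() throws InterruptedException {
        running = false;
        flusher.join();
    }

    public static void main(String[] args) throws Exception {
        List<Integer> batchSizes = new CopyOnWriteArrayList<>();
        try (QueueingProducer producer = new QueueingProducer(1_000, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 10_000; i++) {
                producer.addUserRecord(new byte[] {(byte) i});
            }
        }
        int sent = batchSizes.stream().mapToInt(Integer::intValue).sum();
        System.out.println("records sent = " + sent); // prints "records sent = 10000"
    }
}
```

Compared with the current code, the only thread this creates is the flusher, regardless of how many CPUs the host has.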
