FlinkKinesisFirehoseProducer does not call SerializationSchema#open #14

Open
bobtiernay-okta opened this issue Feb 1, 2021 · 2 comments

Comments

@bobtiernay-okta

Currently, FlinkKinesisFirehoseProducer never calls SerializationSchema#open. Any SerializationSchema passed to the producer that relies on open for initialization (for example, to register metrics) is therefore used without having been initialized.
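
To make the failure mode concrete, here is a minimal, dependency-free sketch. `SchemaStandIn`, `CountingSchema` and `SinkWithoutOpen` are hypothetical stand-ins invented for this illustration, not Flink or connector classes; the counter plays the role of a metric that would normally be registered in open.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a serialization schema with an open() lifecycle hook.
// The sink is expected to call open() once before the first serialize().
interface SchemaStandIn<T> {
    default void open() {}
    byte[] serialize(T element);
}

// A schema whose internal state only exists after open() has run.
class CountingSchema implements SchemaStandIn<String> {
    private AtomicLong serialized; // stands in for a metric counter

    @Override
    public void open() {
        serialized = new AtomicLong();
    }

    @Override
    public byte[] serialize(String element) {
        serialized.incrementAndGet(); // NullPointerException if open() was skipped
        return element.getBytes(StandardCharsets.UTF_8);
    }

    long count() {
        return serialized.get();
    }
}

// Stand-in sink that mirrors the reported behaviour: it never opens its schema.
class SinkWithoutOpen<T> {
    private final SchemaStandIn<T> schema;

    SinkWithoutOpen(SchemaStandIn<T> schema) {
        this.schema = schema;
    }

    void open() {
        // schema.open() is missing here, which is the bug being described.
    }

    byte[] invoke(T value) {
        return schema.serialize(value);
    }
}

class MissingOpenDemo {
    public static void main(String[] args) {
        SinkWithoutOpen<String> sink = new SinkWithoutOpen<>(new CountingSchema());
        sink.open();
        try {
            sink.invoke("record");
        } catch (NullPointerException e) {
            System.out.println("serialize failed: schema was never opened");
        }
    }
}
```

In this sketch the first call to invoke fails because the counter is still null. A real schema would fail in whatever way its own uninitialized state dictates.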

@bobtiernay-okta
Author

To get around this issue for now, I'm doing the following:

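import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;

import java.util.Properties;

/**
 * See <a href="https://github.com/aws/aws-kinesisanalytics-flink-connectors/issues/14">aws-kinesisanalytics-flink-connector #14</a>
 * for description of why this is needed.
 *
 * @param <T> the type to serialize and produce.
 */
public class CustomFlinkKinesisFirehoseProducer<T> extends FlinkKinesisFirehoseProducer<T> {

    private static final long serialVersionUID = 1L;

    // Own reference to the schema, kept so that open() below can reach it.
    private final SerializationSchema<T> schema;

    public CustomFlinkKinesisFirehoseProducer(String deliveryStream, SerializationSchema<T> schema, Properties configProps) {
        super(deliveryStream, schema, configProps);
        this.schema = schema;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Call the hook that the parent class skips. The lambda form assumes that
        // InitializationContext has a single abstract method (getMetricGroup), as in
        // Flink 1.11; later versions add methods and need an explicit implementation.
        schema.open(() -> getRuntimeContext().getMetricGroup());
    }

}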
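
One caveat with this workaround: if a later connector release starts calling open on the schema itself, the subclass above would open the schema a second time. A guard that makes open idempotent avoids that. The sketch below uses hypothetical stand-in types (`OpenableSchema`, `OpenOnceSchema`), not Flink classes, and its open takes no context argument to stay dependency-free.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a schema with an open() lifecycle hook (not a Flink type).
interface OpenableSchema<T> {
    void open();
    byte[] serialize(T element);
}

// Wraps a schema so that repeated open() calls reach the delegate only once.
final class OpenOnceSchema<T> implements OpenableSchema<T> {
    private final OpenableSchema<T> delegate;
    private boolean opened;

    OpenOnceSchema(OpenableSchema<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open() {
        if (opened) {
            return; // already initialized, ignore the duplicate call
        }
        delegate.open();
        opened = true;
    }

    @Override
    public byte[] serialize(T element) {
        return delegate.serialize(element);
    }
}

class OpenOnceDemo {
    public static void main(String[] args) {
        final int[] opens = {0};
        OpenableSchema<String> inner = new OpenableSchema<String>() {
            @Override
            public void open() {
                opens[0]++;
            }

            @Override
            public byte[] serialize(String element) {
                return element.getBytes(StandardCharsets.UTF_8);
            }
        };

        OpenableSchema<String> guarded = new OpenOnceSchema<>(inner);
        guarded.open(); // e.g. called by a fixed producer
        guarded.open(); // e.g. called again by the workaround subclass
        System.out.println("delegate opened " + opens[0] + " time(s)");
    }
}
```

The same guard could instead live inside the schema's own open method. A wrapper just keeps the concern out of the schema.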

@dannycranmer
Contributor

Thanks for reporting the issue; we will include a fix in the next release.
