Confluent Cloud for Apache Flink (CCAF) integrates Confluent Cloud, a fully managed Apache Kafka service, with Apache Flink, a powerful stream processing framework. This integration enables real-time data processing, analytics, and complex event processing on data streams managed by Confluent Cloud. Your Kafka topics appear automatically as queryable Flink tables, with schemas and metadata attached by Confluent Cloud.
Confluent Cloud for Apache Flink supports creating stream-processing applications by using Flink SQL, the Flink Table API (Java and Python), and custom user-defined functions.
Because Flink applications require AWS SSO credentials to run, I created Bash scripts that retrieve and inject these credentials. This approach ensures the necessary authentication is in place, allowing the Flink applications to execute successfully.
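In broad strokes, the pattern these scripts follow looks roughly like the sketch below; the profile name is a placeholder and the exact commands in the repo's scripts may differ:

```bash
#!/usr/bin/env bash
# Hedged sketch of the credential-injection pattern, not the repo's exact script.
PROFILE_NAME="my-sso-profile"   # hypothetical AWS SSO profile name

# Ensure an active AWS SSO session for the profile.
aws sso login --profile "${PROFILE_NAME}"

# Export the short-lived credentials as environment variables so that
# anything launched afterwards (e.g., the Flink app) can authenticate.
eval "$(aws configure export-credentials --profile "${PROFILE_NAME}" --format env)"
```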
Flink App | Run Script |
---|---|
`avro_flight_consolidator_app` | `scripts/run-flight-consolidator-ccaf-app-locally.sh --profile=<AWS_SSO_PROFILE_NAME> --catalog-name=<CATALOG_NAME> --database-name=<DATABASE_NAME>` |

Argument placeholder | Replace with |
---|---|
`<AWS_SSO_PROFILE_NAME>` | your AWS SSO profile name for the AWS infrastructure that hosts your AWS Secrets Manager |
`<CATALOG_NAME>` | the Environment name of the Kafka Cluster |
`<DATABASE_NAME>` | the Database name of the Kafka Cluster |
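For example, invoking the local run script with hypothetical values substituted for the placeholders:

```bash
# Hypothetical values; use your own AWS SSO profile, Confluent Cloud
# environment (catalog), and Kafka cluster (database) names.
scripts/run-flight-consolidator-ccaf-app-locally.sh \
    --profile=my-sso-profile \
    --catalog-name=my-environment \
    --database-name=my-kafka-cluster
```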
Flink App | Run Script |
---|---|
`avro_flight_consolidator_app` | `scripts/run-flight-consolidator-ccaf-docker-locally.sh --profile=<AWS_SSO_PROFILE_NAME> --catalog-name=<CATALOG_NAME> --database-name=<DATABASE_NAME>` |

Argument placeholder | Replace with |
---|---|
`<AWS_SSO_PROFILE_NAME>` | your AWS SSO profile name for the AWS infrastructure that hosts your AWS Secrets Manager |
`<CATALOG_NAME>` | the Environment name of the Kafka Cluster |
`<DATABASE_NAME>` | the Database name of the Kafka Cluster |
You may be asking yourself why. Well, `uv` is an incredibly fast Python package installer and dependency resolver, written in Rust, and designed to seamlessly replace `pip`, `pipx`, `poetry`, `pyenv`, `twine`, `virtualenv`, and more in your workflows. By prefixing a command with `uv run`, you ensure that the command runs in an optimal Python environment.
Now, let's go a little deeper into the magic behind `uv run`:

- When you use it with a file ending in `.py` or an HTTP(S) URL, `uv` treats it as a script and runs it with a Python interpreter. In other words, `uv run file.py` is equivalent to `uv run python file.py`. If you're working with a URL, `uv` even downloads it temporarily to execute it. Any inline dependency metadata is installed into an isolated, temporary environment, meaning zero leftover mess! When used with `-`, the input is read from `stdin` and treated as a Python script (see the example commands after this list).
- If used in a project directory, `uv` will automatically create or update the project environment before running the command.
- Outside of a project, if there is a virtual environment in your current directory (or any parent directory), `uv` runs the command in that environment. If no environment is found, it uses the interpreter's environment.
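To make those behaviors concrete, here are a few illustrative invocations (the script name and URL are hypothetical):

```bash
# Run a local script; equivalent to `uv run python my_script.py`
uv run my_script.py

# Read a script from stdin and treat it as Python
echo 'print("hello from uv")' | uv run -

# Run a script fetched over HTTP(S); uv downloads it temporarily and installs
# any inline dependency metadata into an isolated, throwaway environment
uv run https://example.com/my_script.py
```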
So what does this mean when we put `uv run` before `flight_consolidator`? It means `uv` takes care of all the setup, fast and seamless, right in your local Docker container. If you think AI/ML is magic, the work the folks at Astral have done with `uv` is pure wizardry!
Curious to learn more about Astral's `uv`? Check these out:

- Documentation: Learn about `uv`.
- Video: `uv` IS THE FUTURE OF PYTHON PACKAGING!
Table API on Confluent Cloud for Apache Flink