This repository contains all the code snippets used in my Data+AI Summit 2024 talk: https://www.databricks.com/dataaisummit/session/unit-tests-how-overcome-challenges-structured-streaming
The business logic implements a simple sessionization pipeline. Sessionization was chosen because it exercises a wide range of Apache Spark Structured Streaming features: transformations, triggers, state management, and output modes, which together give a good overview of what Structured Streaming is capable of.
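Conceptually, sessionization groups a user's events into sessions separated by an inactivity gap. Here is a minimal, framework-free sketch of that idea; the function name and the gap value are illustrative and not taken from the repository, whose real implementation relies on Spark's state management:

```python
from typing import Iterable, List

# Hypothetical inactivity gap (30 minutes); the pipeline's actual timeout may differ.
SESSION_GAP_SECONDS = 30 * 60

def sessionize(event_times: Iterable[int], gap: int = SESSION_GAP_SECONDS) -> List[List[int]]:
    """Group event timestamps (epoch seconds) into sessions: a new session
    starts whenever the gap since the previous event exceeds `gap`."""
    sessions: List[List[int]] = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][-1] <= gap:
            sessions[-1].append(t)  # still within the current session
        else:
            sessions.append([t])    # inactivity gap exceeded, open a new session
    return sessions
```

In the streaming version, the same per-user grouping is expressed with stateful operators and watermarks instead of an in-memory sort.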
You can follow either the Python (`python-project`) or the Scala (`scala-project`) example. You can play with the code in two ways:
- either run the tests directly, from the CLI or from your IDE
- or run the application itself; this requires starting the Docker containers with Apache Kafka and the data generator:

```shell
cd docker
docker-compose down --volumes; docker-compose up
```
Next, start the job with the following input parameters for Scala...
```shell
--kafkaBootstrapServers localhost:9094 --kafkaInputTopic visits \
--kafkaOutputTopic sessions --devicesTableLocation /tmp/dais2024/devices \
--checkpointLocation /tmp/dais24_checkpoint/
```
...or Python:
```shell
--kafka_bootstrap_servers localhost:9094 --kafka_input_topic visits \
--kafka_output_topic sessions --devices_table_location /tmp/dais2024/devices \
--checkpoint_location /tmp/dais24_checkpoint/
```
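The Python job would typically read these flags with `argparse`. The parameter names below match the invocation above, but the parser itself is an illustrative sketch, not the repository's actual entry point:

```python
import argparse

def parse_job_args(argv=None):
    # Flag names mirror the Python invocation shown above.
    parser = argparse.ArgumentParser(description="Sessionization job parameters")
    parser.add_argument("--kafka_bootstrap_servers", required=True)
    parser.add_argument("--kafka_input_topic", required=True)
    parser.add_argument("--kafka_output_topic", required=True)
    parser.add_argument("--devices_table_location", required=True)
    parser.add_argument("--checkpoint_location", required=True)
    return parser.parse_args(argv)

# Example: parse the same arguments as the CLI invocation above.
args = parse_job_args([
    "--kafka_bootstrap_servers", "localhost:9094",
    "--kafka_input_topic", "visits",
    "--kafka_output_topic", "sessions",
    "--devices_table_location", "/tmp/dais2024/devices",
    "--checkpoint_location", "/tmp/dais24_checkpoint/",
])
```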
For debugging purposes, you can connect to the Apache Kafka container with:
```shell
docker exec dais_24_kafka kafka-console-consumer.sh --topic sessions --bootstrap-server localhost:9092
```
- Streaming: The world beyond batch, part 1, part 2 by Tyler Akidau
- Streaming Systems by Tyler Akidau, Slava Chernyak, Reuven Lax
- Learn to Efficiently Test ETL Pipelines by Jacqueline Bilston
- Productizing Structured Streaming Jobs by Burak Yavuz
- My 25 Laws of Test Driven Development by Dennis Doomen
Before running the tests, generate the Great Expectations configuration with:

```shell
python setup/expectations_preparator.py
```

After running the command you should see a new `great_expectations_spec` directory:
```shell
$ tree great_expectations_spec/ --charset unicode
great_expectations_spec/
`-- gx
    |-- checkpoints
    |-- expectations
    |   |-- devices_expectations.json
    |   `-- visits_expectations.json
    |-- great_expectations.yml
    |-- plugins
    |   `-- custom_data_docs
    |       |-- renderers
    |       |-- styles
    |       |   `-- data_docs_custom_styles.css
    |       `-- views
    |-- profilers
    `-- uncommitted
        |-- config_variables.yml
        |-- data_docs
        `-- validations

12 directories, 5 files
```
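For orientation, a Great Expectations suite file such as `visits_expectations.json` generally contains entries of the following shape. This is an illustrative minimal example, not the repository's actual expectations; the column name is hypothetical:

```json
{
  "expectation_suite_name": "visits_expectations",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "visit_id"}
    }
  ]
}
```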
Next, uncomment this line in `test/conftest.py`:

```python
#request.addfinalizer(validate_test_datasets)
```
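Uncommenting the line registers `validate_test_datasets` as a pytest finalizer, i.e. a callback that runs after the test completes. A pytest-free sketch of that mechanism; `FakeRequest` is purely illustrative and only mimics the relevant part of pytest's `request` fixture:

```python
class FakeRequest:
    """Mimics the finalizer part of pytest's `request` fixture object."""
    def __init__(self):
        self._finalizers = []

    def addfinalizer(self, fn):
        self._finalizers.append(fn)

    def teardown(self):
        # pytest invokes finalizers in reverse registration order.
        while self._finalizers:
            self._finalizers.pop()()

calls = []
request = FakeRequest()
request.addfinalizer(lambda: calls.append("validate_test_datasets"))
request.teardown()  # after teardown, the validation callback has run once
```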