Stateful stream processing in Kafka Streams creates and uses local state on the node where the application instance is running. If the application runs in distributed mode on multiple nodes, each node holds only its own portion of that state. Kafka Streams does not publish a unifying API that lets you query state across all the nodes. However, it does provide a set of infrastructure components that can be used to implement a query service over your preferred endpoints.
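As a rough illustration of why each node ends up holding only part of the state, the sketch below assigns keys to partitions and partitions to application instances. Everything in it is an assumption made for illustration: the partition count, the host names, the round-robin partition assignment, and the use of String.hashCode. Kafka's default partitioner actually hashes the serialized key bytes with murmur2, and Kafka Streams uses its own partition assignor.

```scala
// Hypothetical illustration of how keyed state ends up split across instances.
// Assumptions: String.hashCode stands in for Kafka's murmur2-based partitioner,
// and partitions are assigned to instances round-robin.
object StatePartitioningSketch {
  val numPartitions = 6                       // assumed partition count
  val instances = Vector("ip1", "ip2", "ip3") // assumed application instances

  // Map a key to a partition; floorMod keeps the result non-negative.
  def partitionFor(key: String): Int = Math.floorMod(key.hashCode, numPartitions)

  // Assumed round-robin assignment of partitions to instances.
  def ownerOf(partition: Int): String = instances(partition % instances.size)

  // Group keys by the instance whose local store would hold their state.
  def localStates(keys: Seq[String]): Map[String, Seq[String]] =
    keys.groupBy(k => ownerOf(partitionFor(k)))

  def main(args: Array[String]): Unit = {
    val keys = Seq("alice", "bob", "carol", "dave", "erin")
    localStates(keys).toSeq.sortBy(_._1).foreach { case (host, ks) =>
      println(s"$host holds state for: ${ks.mkString(", ")}")
    }
  }
}
```

No single instance sees every key, which is why a query for an arbitrary key may have to be answered by another node.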
Interactive Queries were introduced in Kafka 0.10.1, and their main goal is stated as follows:
This feature allows you to treat the stream processing layer as a lightweight embedded database and, more concretely, to directly query the latest state of your stream processing application, without needing to materialize that state to external databases or external storage first.
However, the Kafka Streams documentation also makes it clear that a query layer for the global state of your application does not come out of the box:
Kafka Streams provides all the required functionality for interactively querying your application’s state out of the box, with but one exception: if you want to expose your application’s full state via interactive queries, then – for reasons we explain further down below – it is your responsibility to add an appropriate RPC layer to your application that allows application instances to communicate over the network. If, however, you only need to let your application instances access their own local state, then you do not need to add such an RPC layer at all.
The goal of this small library is to offer such a query layer based on akka-http.
kafka-streams-query is published and cross-built for Scala 2.11 and 2.12, so you can just add the following to your build:
val kafka_streams_query_version = "0.1.1"
libraryDependencies ++= Seq(
  "com.lightbend" %% "kafka-streams-query" % kafka_streams_query_version
)
Note: kafka-streams-query supports Kafka Streams 1.0.0.
The API docs for kafka-streams-query are available here for Scala 2.12 and here for Scala 2.11.
The library is organized around three main packages:
- http: The main endpoint implementations, including a class InteractiveQueryHttpService that provides methods for starting and stopping the HTTP service. The other classes provided are HttpRequester, which handles the request, performs some validations and forwards the request to KeyValueFetcher, which in turn invokes the actual service for fetching the state information.
- services: This layer interacts with the underlying Kafka Streams APIs to fetch data from the local state. The two classes in this layer are (a) MetadataService, which uses the Kafka Streams API to fetch the metadata for the state, and (b) LocalStateStoreQueryService, which performs the actual query for the state.
- serializers: A set of serializers, useful for application development, that help you serialize your model structures.
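What a serializer for a model structure does can be sketched for a small case class. This sketch is not taken from the serializers package: PageCount, SimpleSerde and PageCountSerde are hypothetical names, and SimpleSerde only mirrors the (topic, data) shape of Kafka's Serializer and Deserializer interfaces so that the example needs no Kafka dependency.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical model structure (not part of kafka-streams-query).
final case class PageCount(page: String, count: Long)

// Hypothetical serializer/deserializer pair, shaped like Kafka's interfaces
// but defined locally so the sketch is self-contained.
trait SimpleSerde[T] {
  def serialize(topic: String, data: T): Array[Byte]
  def deserialize(topic: String, bytes: Array[Byte]): T
}

object PageCountSerde extends SimpleSerde[PageCount] {
  // Layout: 4-byte length of the UTF-8 page name, the name bytes, then an 8-byte count.
  def serialize(topic: String, data: PageCount): Array[Byte] = {
    val name = data.page.getBytes(StandardCharsets.UTF_8)
    ByteBuffer.allocate(4 + name.length + 8)
      .putInt(name.length)
      .put(name)
      .putLong(data.count)
      .array()
  }

  // Reads the fields back in the same order they were written.
  def deserialize(topic: String, bytes: Array[Byte]): PageCount = {
    val buf = ByteBuffer.wrap(bytes)
    val name = new Array[Byte](buf.getInt())
    buf.get(name)
    PageCount(new String(name, StandardCharsets.UTF_8), buf.getLong())
  }
}
```

A length prefix is used for the string so that the deserializer knows where the name ends and the count begins.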
If the application runs in distributed mode across multiple physical nodes, local state is spread across all of them. The http services that the library offers handle this and provide a unified view of the global application state.
Consider the following scenario:
- The application is deployed on 3 nodes with IPs ip1, ip2 and ip3. Assuming the application uses this library, the HTTP services run on port 7070 on each of the nodes.
- The user queries for some information from http://ip1:7070/<path>/<to>/<key>.
The <key> she is looking for may not reside on host ip1. The query service handles this situation by interacting with the MetadataService as follows:
- The user queries from host ip1.
- Check the MetadataService to get information about the key that the user is looking for.
- If the metadata for the key indicates that the data is part of the local state on ip1, then we are done: return the query result.
- Otherwise, get from the metadata the host where this state resides.
- Query the appropriate node by reissuing the HTTP request to get the state information.
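The routing steps above can be sketched as a small decision function. This is a minimal sketch under stated assumptions, not the library's implementation: HostInfo, KeyLocator, Route and Router are hypothetical names, KeyLocator stands in for the MetadataService lookup, and the request path /counts/<key> used in the demo is invented.

```scala
// Hypothetical sketch; these names are illustrative, not kafka-streams-query's API.
final case class HostInfo(host: String, port: Int)

// Stands in for the metadata lookup: which host owns this key in this store?
trait KeyLocator {
  def hostFor(store: String, key: String): Option[HostInfo]
}

sealed trait Route
final case class QueryLocal(store: String, key: String) extends Route
final case class Forward(url: String) extends Route
case object NotAvailable extends Route

final class Router(localHost: HostInfo, locator: KeyLocator) {
  // requestPath is the path of the incoming request, e.g. "/counts/alice" (assumed).
  def route(store: String, key: String, requestPath: String): Route =
    locator.hostFor(store, key) match {
      // The key is part of this node's local state: query the local store.
      case Some(owner) if owner == localHost => QueryLocal(store, key)
      // Another node owns the key: reissue the same request against that node.
      case Some(owner) => Forward(s"http://${owner.host}:${owner.port}$requestPath")
      // No metadata available, e.g. while a rebalance is in progress.
      case None => NotAvailable
    }
}

object RouterDemo extends App {
  // Assumed layout: "alice" lives on ip1, every other key on ip2.
  val locator = new KeyLocator {
    def hostFor(store: String, key: String): Option[HostInfo] =
      Some(if (key == "alice") HostInfo("ip1", 7070) else HostInfo("ip2", 7070))
  }
  val router = new Router(HostInfo("ip1", 7070), locator)
  println(router.route("counts", "alice", "/counts/alice")) // QueryLocal(counts,alice)
  println(router.route("counts", "bob", "/counts/bob"))     // Forward(http://ip2:7070/counts/bob)
}
```

Returning a Route value instead of performing the HTTP call keeps the decision easy to test; a real service would then query the local store or issue the forwarded request with its HTTP client.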
When the user issues a query, Kafka Streams may be in the middle of a partition rebalance, during which state may migrate from one store (node) to another. In that situation, Kafka Streams throws an InvalidStateStoreException.
Migration typically happens when new instances of the application come up or when Kafka Streams otherwise rebalances. The library handles such situations through retry semantics: the query API keeps retrying until the rebalancing is complete or the retry count is exhausted.
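The retry behaviour can be sketched as a bounded loop around the query. In this minimal sketch, StoreUnavailable is a hypothetical stand-in for InvalidStateStoreException (so the code has no Kafka dependency), and the fixed delay between attempts is an assumption; the library's actual retry count and back-off policy may differ.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Try}

// Hypothetical stand-in for Kafka Streams' InvalidStateStoreException.
final class StoreUnavailable(msg: String) extends RuntimeException(msg)

object RetrySketch {
  // Runs `op`, retrying up to maxRetries more times while it fails with
  // StoreUnavailable and sleeping delayMs between attempts (assumed fixed delay).
  // Any other failure, or success, is returned immediately.
  def withRetry[T](maxRetries: Int, delayMs: Long)(op: => T): Try[T] = {
    @tailrec
    def loop(remaining: Int): Try[T] =
      Try(op) match {
        case Failure(_: StoreUnavailable) if remaining > 0 =>
          Thread.sleep(delayMs) // give the rebalance time to complete
          loop(remaining - 1)
        case other => other
      }
    loop(maxRetries)
  }
}

object RetryDemo extends App {
  var calls = 0
  // Simulated store that becomes queryable on the third attempt.
  val result = RetrySketch.withRetry(maxRetries = 5, delayMs = 10) {
    calls += 1
    if (calls < 3) throw new StoreUnavailable("rebalancing") else 42
  }
  println(s"$result after $calls calls") // Success(42) after 3 calls
}
```

Only the rebalancing-related exception is retried; other failures are returned at once, since retrying them would not help.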