Dekaf materialization endpoint support #1840
Merged
jshearer merged 18 commits into master from jshearer/dekaf_materialization_endpoint_support on Feb 19, 2025
Conversation
…s using `/authorize/dekaf` and `/authorize/task`. Also add a hint for passing a collection name as a topic name when the binding has renamed that topic.
* Connector projections: emit recommended constraints for all fields -- you get everything by default, and you can modify the selection however you like
* Schema: build a schema from the materialization's built spec's `field_selection` and the collection's projections that will match the extracted documents
* Extraction: implement field extraction using the `extractors` crate to emit documents that match the "value schema"
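As a rough illustration of just the selection step (the real implementation works over the built spec's protobuf types and the `extractors` crate; `Projection` and `selected_projections` here are hypothetical stand-ins):

```rust
// Illustrative stand-ins only, not the real flow protobuf types.
struct Projection {
    field: String,
    ptr: String, // JSON pointer into the source document
}

/// Keep just the projections named by the field selection, in field order.
fn selected_projections<'a>(
    selected_fields: &[String],
    projections: &'a [Projection],
) -> Vec<&'a Projection> {
    selected_fields
        .iter()
        .filter_map(|field| projections.iter().find(|p| p.field == *field))
        .collect()
}
```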
… a Session and write them to the correct ops logs journal. Also support filtering logs by the requested shard log level.
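A tiny sketch of that level filter, assuming the usual trace < debug < info < warn < error ranking; `Level` and `should_emit` are illustrative names:

```rust
// Deriving PartialOrd on a fieldless enum orders variants by declaration,
// which gives the usual verbosity ranking.
#[derive(PartialEq, PartialOrd, Clone, Copy)]
enum Level {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

/// Emit a log document only if it's at or above the shard's requested level,
/// e.g. a shard requesting `Info` drops `Trace` and `Debug` documents.
fn should_emit(doc_level: Level, requested: Level) -> bool {
    doc_level >= requested
}
```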
Then implement some tests to validate field selection logic.
As part of dekaf: Improvements to handle higher scale #1876, we want to implement broker fallback so that Dekaf can connect to any of the brokers in the cluster if one doesn't respond. An improvement here would be to periodically fetch metadata from at least one of the responding brokers and update this list of addresses, so that future sessions can know about and use any newly created members of the cluster. That said, I don't anticipate changing the topology of our cluster that frequently, and if we do, updating Dekaf's deployment configs isn't that big of a deal. I may eat my hat on this; we'll see. In addition, we want to move people over to the new MSK cluster, so this also implements routing new-style connections to a separate cluster with separate credentials.
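A minimal sketch of the fallback idea, assuming plain TCP connections; `connect_first_available` is a hypothetical helper, not Dekaf's actual API:

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Try each known broker in order and return the first reachable connection.
fn connect_first_available(brokers: &[&str]) -> std::io::Result<TcpStream> {
    let timeout = Duration::from_secs(5);
    for broker in brokers {
        // A broker that fails to resolve shouldn't abort the whole fallback.
        let addrs = match broker.to_socket_addrs() {
            Ok(addrs) => addrs,
            Err(err) => {
                eprintln!("broker {broker} failed to resolve: {err}");
                continue;
            }
        };
        // A hostname may resolve to several addresses; try them all.
        for addr in addrs {
            match TcpStream::connect_timeout(&addr, timeout) {
                Ok(stream) => return Ok(stream),
                Err(err) => eprintln!("broker {broker} unavailable: {err}"),
            }
        }
    }
    Err(std::io::Error::new(
        std::io::ErrorKind::NotConnected,
        "no broker in the cluster responded",
    ))
}
```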
A couple of things to note:
* I originally tried to create a single `journal::Client` responsible for appending both logs and stats, but I realized that `/authorize/task` only allows authorizing a token for a single task/prefix at a time. So I took the simpler route of creating two clients, rather than teaching `/authorize/task` how to handle multiple tasks, which has some fairly delicate requirements.
* As it turns out, the stats rollups assume the presence of a `shard` field on both logs and stats. So I needed to craft a `ShardRef` that just contains the Dekaf materialization name, and attach it to both the logs and stats documents that get emitted.
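A minimal sketch of that stamping, assuming `serde`/`serde_json`; the field names on this `ShardRef` stand-in are illustrative, not the real schema:

```rust
use serde::Serialize;
use serde_json::{json, Value};

// Illustrative fields: just enough identity for the stats rollups to key on.
#[derive(Serialize)]
struct ShardRef {
    kind: String, // e.g. "materialization"
    name: String, // the Dekaf materialization's task name
}

/// Stamp the same `shard` field onto a log or stats document before append.
fn stamp_shard(mut doc: Value, shard: &ShardRef) -> Value {
    doc["shard"] = serde_json::to_value(shard).expect("ShardRef serializes");
    doc
}

fn main() {
    let shard = ShardRef {
        kind: "materialization".to_string(),
        name: "acmeCo/my-dekaf".to_string(),
    };
    let log = stamp_shard(json!({"level": "info", "message": "session started"}), &shard);
    println!("{log}");
}
```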
I noticed that after roughly 1-2 hours, Dekaf would stop writing logs and stats. I tracked that down to an error appending logs, specifically:

```
Grpc(
    Status {
        code: DeadlineExceeded,
        message: "context deadline exceeded",
    }
)
```

It turns out that this is the error Gazette returns when the auth token you pass it is expired, and the appending machinery in Dekaf wasn't taking token expiry into account. So this commit refactors `GazetteWriter` to be composed of two `GazetteAppender`s, one for logs and one for stats. Each `GazetteAppender` is capable of refreshing its internal client when necessary.
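A sketch of the refresh-on-expiry shape, under the assumption that each appender tracks its token's expiry; `Appender` and its fields are illustrative, not the real `GazetteAppender`:

```rust
use std::time::{Duration, Instant};

// Stand-in for the real appender: it remembers when its auth token expires
// and rebuilds its client rather than appending with a stale one.
struct Appender {
    token_expires_at: Instant,
    // ...wraps a journal client in the real implementation...
}

impl Appender {
    fn append(&mut self, payload: &[u8]) {
        // Refresh slightly ahead of expiry so an in-flight append can't race it.
        if Instant::now() + Duration::from_secs(60) >= self.token_expires_at {
            self.refresh();
        }
        // ...perform the append with a known-fresh client...
        let _ = payload;
    }

    fn refresh(&mut self) {
        // Fetch a new token (e.g. via `/authorize/task`) and rebuild the client.
        self.token_expires_at = Instant::now() + Duration::from_secs(3600);
    }
}
```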
We can still fetch suspended journals with a regular `ListRequest`. This will return journal specs which contain a `suspend` field. If `journal.spec.suspend.level` is `FULL`, it's not possible to read from that journal, so we need to:
* Report both the low and high watermarks as `journal.spec.suspend.offset`
* Serve empty result sets for any read against this partition
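A hedged sketch of that handling, with stand-in types rather than the real journal spec protos:

```rust
// Stand-ins for the journal spec's suspension fields.
enum SuspendLevel {
    None,
    Partial,
    Full,
}

struct Suspend {
    level: SuspendLevel,
    offset: i64,
}

/// Returns (low_watermark, high_watermark, serve_empty) for one partition.
fn partition_offsets(suspend: Option<&Suspend>, write_head: i64) -> (i64, i64, bool) {
    match suspend {
        // Fully suspended: collapse both watermarks to the suspend offset and
        // serve empty result sets for any read against this partition.
        Some(Suspend { level: SuspendLevel::Full, offset }) => (*offset, *offset, true),
        // Otherwise the journal remains readable as usual.
        _ => (0, write_head, false),
    }
}
```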
…is in progress and send them as a batch next time it's able to.
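The shape of that buffering, as a hypothetical sketch (`BatchingAppender` is an illustrative name, not the real type):

```rust
// While an append is in flight, queue additional payloads and flush them
// together once the appender is free again.
struct BatchingAppender {
    in_flight: bool,
    pending: Vec<Vec<u8>>,
}

impl BatchingAppender {
    fn write(&mut self, payload: Vec<u8>) {
        self.pending.push(payload);
        if !self.in_flight {
            self.in_flight = true;
            // ...start an append of everything in `self.pending`; when it
            // completes, clear `in_flight` and send anything that queued up
            // in the meantime as the next batch...
        }
    }
}
```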
Description:
This adds the server side of Dekaf's support for materialization endpoints. At a high level, Dekaf is just another way to get data out of Flow, and we already have a well fleshed-out concept for exactly that: materializations. So back in #1665 we introduced support for a new materialization "endpoint type": `dekaf`. It lives alongside `local` and `connector` as the third kind of materialization, and is configured like so:
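The config snippet itself didn't survive extraction; as a hedged sketch only, a `dekaf` materialization spec looks roughly like the following (field names such as `variant`, `token`, and `strict_topic_names` are recalled from the `dekaf` endpoint work and may not match the shipped schema exactly):

```yaml
materializations:
  acmeCo/my-dekaf:
    endpoint:
      dekaf:
        variant: generic
        config:
          token: "a-secret-token"
          strict_topic_names: false
    bindings:
      - source: acmeCo/my-collection
        resource:
          topic_name: my-topic
```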
The second part of this work is for Dekaf the server to support this mode of operation. Briefly, it needs to:
* Authenticate sessions using `/authorize/dekaf` and `/authorize/task`
* Serve collection data that respects each binding's field selection
* Emit logs and stats documents to the materialization's ops journals

I still have a couple of things on my list before this is fully wrapped up:
* Routing `SessionAuthentication::Task` sessions to a new/different backing store for migration purposes