Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dekaf materialization endpoint support #1840

Merged
merged 18 commits into from
Feb 19, 2025

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Dec 18, 2024

Description:

This adds support for the server side of Dekaf's support for materialization endpoints. At a high level, Dekaf is just another way to get data out of Flow. We already have a well fleshed out concept for these things: materializations. So back in #1665 we introduced support for a new materialization "endpoint type": dekaf. This lives alongside local and connector as the third kind of materialization, and is configured like so:

    endpoint:
      dekaf:
        variant: some-dekaf-variant
        config:
          token: "foo"
          ...other configuration for the behavior of your Dekaf task... 

The second part of this work is for Dekaf the server to support this mode of operation. Briefly, it needs to:

  • Support authentication and authorization using the control plane. This means that conceptually, Dekaf is authenticating you based on the task name and token specified in the endpoint config, and authorizing you based on the access granted to that task
    • Specifically, this means that you now use the task name for your username, and the token specified in the endpoint config when connecting to Dekaf.
  • Act more like a regular materialization. This means...
    • Support for field selection. Materializations let you specify which field(s) you want included in your destination using field selection. This fundamentally looks like a document transformation, where some fields may be removed and some fields may be projected from their original home to a new location in the output.
    • Support for task logs. Just like captures, materialization log output is captured and presented to users for status monitoring and debugging. As Dekaf is multi-tenant at its core, presenting these logs requires identifying which log messages are associated with which task, and then capturing and writing them to the corresponding logs journal.
    • [Fast-Follow] Support for task stats. In order to monitor the status of your running tasks, as well as aggregate usage information for billing, all tasks need to periodically emit information about how much work they've been doing. While Dekaf was in beta, all usage was free so this was less of a priority, but now that there's a task to associate stats with, implementing stats will likely be one of the last things to do before going GA.

I still have a couple of things on my list before this is fully wrapped up:

  • Implement emitting stats
  • Implement routing SessionAuthentication::Task sessions to a new/different backing store for migration purposes
  • Support journal suspension
  • Implement CI task for Dekaf integration tests
  • Later: out how to make shard statuses show green in the UI for Dekaf tasks

This change is Reviewable

@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 4 times, most recently from 173b9a2 to 2528553 Compare January 6, 2025 19:14
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 7 times, most recently from 41b05c2 to 457cb62 Compare January 13, 2025 14:50
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 14 times, most recently from cd55a23 to a16d4c5 Compare January 15, 2025 21:53
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 7 times, most recently from b5fd089 to ec16708 Compare February 14, 2025 15:58
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch 2 times, most recently from db7776d to 095cc75 Compare February 19, 2025 15:52
…s using `/authorize/dekaf` and `/authorize/task`

Also add a hint for passing a collection name as a topic name, when the binding has renamed that topic
* Connector projections: emit recommended constraints for all fields -- you get everything by default, and you can modify the selection however you like
* Schema: Build a schema from the materialization's built spec's `field_selection` and the collection's projections that will match the extracted documents
* Extraction: Implement field extraction using the `extractors` crate to emit documents that match the "value schema"
… a Session and write them to the correct ops logs journal

Also support filtering logs by the requested shard log level
Then implement some tests to validate field selection logic
Part of dekaf: Improvements to handle higher scale #1876, we want to implement broker fallback so Dekaf can connect to any of the brokers in the cluster if one doesn't respond. An improvement here would be to periodically fetch the metadata from at least one of the responding brokers and update this list of addresses so that future sessions can know about/use any newly created members of the cluster. I don't anticipate changing the topology of our cluster that frequently, and if we do then updating Dekaf's deployment configs isn't that big of a deal. I may eat my hat on this, we'll see.

In addition, we want to move people over to the new MSK cluster, so this implements routing new-style connections to a separate cluster with separate credentials.
A couple things to note:

* I originally tried to create a single `journal::Client` responsible for appending both logs and stats, but I ended up realizing that `/authorize/task` only allows authoring a token for a single task/prefix at a time. So I took the simpler route of creating two clients, rather than teaching `/authorize/task` how to handle multiple tasks, which has some fairly delicate requirements.
* As it turns out, the stats rollups assume the presence of a `shard` field on both logs and stats. So I ended up needing to craft a `ShardRef` that just contains the Dekaf materialization name, and attach it to both the logs and stats documents that get emitted.
I noticed that after roughly 1-2 hours, Dekaf would stop writing logs and stats. I tracked that down to an error appending logs, specifically:

```
Grpc(
  Status {
    code: DeadlineExceeded,
    message: "context deadline exceeded"
  }
)
```

It turns out that this is the error Gazette returns when the auth token you pass it is expired, and the appending machinery in Dekaf wasn't taking into account token expiry. So this commit refactors `GazetteWriter` to be composed of two `GazetteAppender`s, one for logs and one for stats. Each `GazetteAppender` is capable of refreshing its internal client when neccesary
We can still fetch suspended journals with a regular `ListRequest`. This will return journal specs which contain a `suspend` field. If `journal.spec.suspend.level` is `FULL`, it's not possible to read from that journal. So we need to:
* Report both low and high-watermarks as `journal.spec.suspend.offset`
* Serve empty resultsets for any read against this partition
…is in progress and send them as a batch next time it's able to
@jshearer jshearer force-pushed the jshearer/dekaf_materialization_endpoint_support branch from 095cc75 to 38122fd Compare February 19, 2025 15:55
@jshearer jshearer merged commit b575a43 into master Feb 19, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants