kgo: support KIP-848 group consuming
twmb committed Feb 20, 2025
1 parent 8d5d008 commit 6dbfe4a
Showing 7 changed files with 474 additions and 112 deletions.
5 changes: 1 addition & 4 deletions pkg/kgo/broker.go
@@ -915,10 +915,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
     return err
 }
 
-if err = kerr.ErrorForCode(resp.ErrorCode); err != nil {
-    if resp.ErrorMessage != nil {
-        return fmt.Errorf("%s: %w", *resp.ErrorMessage, err)
-    }
+if err := errCodeMessage(resp.ErrorCode, resp.ErrorMessage); err != nil {
     return err
 }
 challenge = resp.SASLAuthBytes
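
The new errCodeMessage helper called above is not visible in the rendered hunks. A minimal sketch of what it presumably does, reconstructed from the inline logic it replaces (the name and call shape come from the diff; the body is an assumption):

package kgo

import (
    "fmt"

    "github.com/twmb/franz-go/pkg/kerr"
)

// errCodeMessage (presumed shape): convert a response error code into an
// error, wrapping the optional broker-provided error message when present.
func errCodeMessage(code int16, msg *string) error {
    if err := kerr.ErrorForCode(code); err != nil {
        if msg != nil {
            return fmt.Errorf("%s: %w", *msg, err)
        }
        return err
    }
    return nil
}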
23 changes: 21 additions & 2 deletions pkg/kgo/client.go
@@ -427,7 +427,8 @@ func NewClient(opts ...Opt) (*Client, error) {
 switch key {
 case ((*kmsg.JoinGroupRequest)(nil)).Key(),
     ((*kmsg.SyncGroupRequest)(nil)).Key(),
-    ((*kmsg.HeartbeatRequest)(nil)).Key():
+    ((*kmsg.HeartbeatRequest)(nil)).Key(),
+    ((*kmsg.ConsumerGroupHeartbeatRequest)(nil)).Key():
     return cfg.sessionTimeout
 }
 return 30 * time.Second
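
The switch above appears to be the client's default retry-timeout function: group-membership requests, now including ConsumerGroupHeartbeat, are bounded by the configured session timeout, and everything else by 30 seconds. A hedged sketch of overriding that default from user code, assuming the exported kgo.RetryTimeoutFn option accepts a per-request-key function:

package example

import (
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

func newClient() (*kgo.Client, error) {
    return kgo.NewClient(
        // Mirror the default shown in the diff, but allow a full minute
        // for group-membership requests (illustrative values only).
        kgo.RetryTimeoutFn(func(key int16) time.Duration {
            switch key {
            case ((*kmsg.JoinGroupRequest)(nil)).Key(),
                ((*kmsg.SyncGroupRequest)(nil)).Key(),
                ((*kmsg.HeartbeatRequest)(nil)).Key(),
                ((*kmsg.ConsumerGroupHeartbeatRequest)(nil)).Key():
                return time.Minute
            }
            return 30 * time.Second
        }),
    )
}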
@@ -584,7 +585,9 @@ func (cl *Client) Ping(ctx context.Context) error {
 // topic no longer exists, or if you are consuming via regex and know that some
 // previously consumed topics no longer exist, or if you simply do not want to
 // ever consume from a topic again. If you are group consuming, this function
-// will likely cause a rebalance.
+// will likely cause a rebalance. If you are consuming via regex and the topic
+// still exists on the broker, this function will at most only temporarily
+// remove the topic from the client and the topic will be re-discovered.
 //
 // For admin requests, this deletes the topic from the cached metadata map for
 // sharded requests. Metadata for sharded admin requests is only cached for
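
This doc comment belongs to the client's topic purge method (PurgeTopicsFromClient in franz-go). A short usage sketch of the behavior it describes; the group and topic names are placeholders:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
    cl, err := kgo.NewClient(
        kgo.ConsumerGroup("example-group"),     // placeholder group
        kgo.ConsumeTopics("orders", "returns"), // placeholder topics
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    // Stop consuming "returns" client-side, e.g. because the topic was
    // deleted on the broker. With a group consumer this likely triggers a
    // rebalance; under regex consuming, a topic that still exists would
    // only be removed temporarily and then re-discovered.
    cl.PurgeTopicsFromClient("returns")
}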
@@ -817,6 +820,18 @@ func (cl *Client) supportsOffsetForLeaderEpoch() bool {
     return cl.supportsKeyVersion(int16(kmsg.OffsetForLeaderEpoch), 2)
 }
 
+// Called after the first metadata request, before we go into either
+// (*groupConsumer).manage or (*groupConsumer).manage848.
+func (cl *Client) supportsKIP848() bool {
+    return cl.supportsKeyVersion(int16(kmsg.ConsumerGroupHeartbeat), 0)
+}
+
+// v1 introduces support for regex and requires the client to generate
+// the member ID.
+func (cl *Client) supportsKIP848v1() bool {
+    return cl.supportsKeyVersion(int16(kmsg.ConsumerGroupHeartbeat), 1)
+}
+
 // A broker may not support some requests we want to make. This function checks
 // support. This should only be used *after* at least one successful response.
 func (cl *Client) supportsKeyVersion(key, version int16) bool {
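
The two new helpers gate whether the group consumer enters (*groupConsumer).manage (classic join/sync/heartbeat) or (*groupConsumer).manage848 (KIP-848). As a standalone, hedged illustration of the same capability check, the broker's advertised ConsumerGroupHeartbeat versions can be inspected with public kgo/kmsg APIs (the seed broker and client software strings are placeholders):

package main

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
    cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    req := kmsg.NewPtrApiVersionsRequest()
    req.ClientSoftwareName = "example"
    req.ClientSoftwareVersion = "v0.0.1"
    resp, err := req.RequestWith(context.Background(), cl)
    if err != nil {
        panic(err)
    }

    hb := ((*kmsg.ConsumerGroupHeartbeatRequest)(nil)).Key()
    for _, k := range resp.ApiKeys {
        if k.ApiKey == hb {
            // v0 means the KIP-848 path is available; v1 additionally
            // brings regex subscriptions and client-generated member IDs.
            fmt.Printf("ConsumerGroupHeartbeat supported: v%d-v%d\n", k.MinVersion, k.MaxVersion)
        }
    }
}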
@@ -1851,6 +1866,8 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re
         return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req)
     case *kmsg.OffsetDeleteRequest:
         return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req)
+    case *kmsg.ConsumerGroupHeartbeatRequest:
+        return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req)
     }
 }
 
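With the new case above, a ConsumerGroupHeartbeatRequest issued through the client is routed to the group's coordinator, just like the other group requests. A hedged sketch of issuing one directly; normally the group consumer sends these itself, and the field values here are only illustrative (epoch -1 is the KIP-848 leave-group signal):

package example

import (
    "context"

    "github.com/twmb/franz-go/pkg/kerr"
    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/kmsg"
)

// leaveGroup848 sends a ConsumerGroupHeartbeat with MemberEpoch -1, which
// per KIP-848 tells the coordinator this member is leaving the group.
func leaveGroup848(ctx context.Context, cl *kgo.Client, group, memberID string) error {
    req := kmsg.NewPtrConsumerGroupHeartbeatRequest()
    req.Group = group
    req.MemberID = memberID
    req.MemberEpoch = -1
    resp, err := req.RequestWith(ctx, cl) // routed to the group coordinator by the case above
    if err != nil {
        return err
    }
    return kerr.ErrorForCode(resp.ErrorCode)
}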
@@ -1913,6 +1930,8 @@ func (cl *Client) handleReqWithCoordinator(
         code = t.ErrorCode
     case *kmsg.SyncGroupResponse:
         code = t.ErrorCode
+    case *kmsg.ConsumerGroupHeartbeatResponse:
+        code = t.ErrorCode
     }
 
     // ListGroups, OffsetFetch, DeleteGroups, DescribeGroups, and
15 changes: 14 additions & 1 deletion pkg/kgo/consumer.go
@@ -727,8 +727,20 @@ func (c *consumer) purgeTopics(topics []string) {
 
     // The difference for groups is we need to lock the group and there is
     // a slight type difference in g.using vs d.using.
+    //
+    // assignPartitions removes the topics from 'tps', which removes them
+    // from FUTURE metadata requests meaning they will not be repopulated
+    // in the future. Any loaded tps is fine; metadata updates the topic
+    // pointers underneath -- metadata does not re-store the tps itself
+    // with any new topics that we just purged (except in the case of regex,
+    // which is impossible to permanently purge anyway).
+    //
+    // We are guarded from adding back to 'using' via the consumer mu;
+    // this cannot run concurrent with findNewAssignments. Thus, we first
+    // purge tps, then clear using, and once the lock releases, findNewAssignments
+    // will use the now-purged tps and will not add back to using.
     if c.g != nil {
-        c.g.mu.Lock()
+        c.g.mu.Lock() // required when updating using
         defer c.g.mu.Unlock()
         c.assignPartitions(purgeAssignments, assignPurgeMatching, c.g.tps, fmt.Sprintf("purge of %v requested", topics))
         for _, topic := range topics {
@@ -742,6 +754,7 @@ func (c *consumer) purgeTopics(topics []string) {
             delete(c.d.using, topic)
             delete(c.d.reSeen, topic)
             delete(c.d.m, topic)
+            delete(c.d.ps, topic)
         }
     }
 }
2 changes: 1 addition & 1 deletion pkg/kgo/consumer_direct.go
@@ -5,7 +5,7 @@ type directConsumer struct {
     tps    *topicsPartitions           // data for topics that the user assigned
     using  mtmps                       // topics we are currently using
     m      mtmps                       // mirrors cfg.topics and cfg.partitions, but can change with Purge or Add
-    ps     map[string]map[int32]Offset // mirrors cfg.partitions, changed in Purge or Add
+    ps     map[string]map[int32]Offset // mirrors cfg.partitions, changed in Purge or Add, for direct partition consuming
     reSeen map[string]bool             // topics we evaluated against regex, and whether we want them or not
 }
 