diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index c3d5a9a7..d4146add 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -915,10 +915,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error { return err } - if err = kerr.ErrorForCode(resp.ErrorCode); err != nil { - if resp.ErrorMessage != nil { - return fmt.Errorf("%s: %w", *resp.ErrorMessage, err) - } + if err := errCodeMessage(resp.ErrorCode, resp.ErrorMessage); err != nil { return err } challenge = resp.SASLAuthBytes diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 6100f248..48b20df3 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -427,7 +427,8 @@ func NewClient(opts ...Opt) (*Client, error) { switch key { case ((*kmsg.JoinGroupRequest)(nil)).Key(), ((*kmsg.SyncGroupRequest)(nil)).Key(), - ((*kmsg.HeartbeatRequest)(nil)).Key(): + ((*kmsg.HeartbeatRequest)(nil)).Key(), + ((*kmsg.ConsumerGroupHeartbeatRequest)(nil)).Key(): return cfg.sessionTimeout } return 30 * time.Second @@ -584,7 +585,9 @@ func (cl *Client) Ping(ctx context.Context) error { // topic no longer exists, or if you are consuming via regex and know that some // previously consumed topics no longer exist, or if you simply do not want to // ever consume from a topic again. If you are group consuming, this function -// will likely cause a rebalance. +// will likely cause a rebalance. If you are consuming via regex and the topic +// still exists on the broker, this function will at most only temporarily +// remove the topic from the client and the topic will be re-discovered. // // For admin requests, this deletes the topic from the cached metadata map for // sharded requests. Metadata for sharded admin requests is only cached for @@ -817,6 +820,18 @@ func (cl *Client) supportsOffsetForLeaderEpoch() bool { return cl.supportsKeyVersion(int16(kmsg.OffsetForLeaderEpoch), 2) } +// Called after the first metadata request, before we go into either +// (*groupConsumer).manage or (*groupConsumer).manage848. +func (cl *Client) supportsKIP848() bool { + return cl.supportsKeyVersion(int16(kmsg.ConsumerGroupHeartbeat), 0) +} + +// v1 introduces support for regex and requires the client to generate +// the member ID. +func (cl *Client) supportsKIP848v1() bool { + return cl.supportsKeyVersion(int16(kmsg.ConsumerGroupHeartbeat), 1) +} + // A broker may not support some requests we want to make. This function checks // support. This should only be used *after* at least one successful response. 
func (cl *Client) supportsKeyVersion(key, version int16) bool { @@ -1362,7 +1377,8 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo return shards(cl.handleAdminReq(ctx, t)), nil case kmsg.GroupCoordinatorRequest, - kmsg.TxnCoordinatorRequest: + kmsg.TxnCoordinatorRequest, + *kmsg.ConsumerGroupHeartbeatRequest: return shards(cl.handleCoordinatorReq(ctx, t)), nil case *kmsg.ApiVersionsRequest: @@ -1851,6 +1867,8 @@ func (cl *Client) handleCoordinatorReq(ctx context.Context, req kmsg.Request) Re return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req) case *kmsg.OffsetDeleteRequest: return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req) + case *kmsg.ConsumerGroupHeartbeatRequest: + return cl.handleCoordinatorReqSimple(ctx, coordinatorTypeGroup, t.Group, req) } } @@ -1913,6 +1931,8 @@ func (cl *Client) handleReqWithCoordinator( code = t.ErrorCode case *kmsg.SyncGroupResponse: code = t.ErrorCode + case *kmsg.ConsumerGroupHeartbeatResponse: + code = t.ErrorCode } // ListGroups, OffsetFetch, DeleteGroups, DescribeGroups, and diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 60222497..07f4ed27 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -727,8 +727,20 @@ func (c *consumer) purgeTopics(topics []string) { // The difference for groups is we need to lock the group and there is // a slight type difference in g.using vs d.using. + // + // assignPartitions removes the topics from 'tps', which removes them + // from FUTURE metadata requests meaning they will not be repopulated + // in the future. Any loaded tps is fine; metadata updates the topic + // pointers underneath -- metadata does not re-store the tps itself + // with any new topics that we just purged (except in the case of regex, + // which is impossible to permanently purge anyway). + // + // We are guarded from adding back to 'using' via the consumer mu; + // this cannot run concurrent with findNewAssignments. Thus, we first + // purge tps, then clear using, and once the lock releases, findNewAssignments + // will use the now-purged tps and will not add back to using. 
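// Illustrative, not part of the patch: this path is reached from the public
// purge APIs, e.g. cl.PurgeTopicsFromClient("foo") or
// cl.PurgeTopicsFromConsuming("foo") while group consuming. After the purge,
// "foo" is gone from both tps and using, so neither the metadata loop nor
// findNewAssignments re-adds it unless the user adds it back or is consuming
// via a regex that still matches it.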
if c.g != nil { - c.g.mu.Lock() + c.g.mu.Lock() // required when updating using defer c.g.mu.Unlock() c.assignPartitions(purgeAssignments, assignPurgeMatching, c.g.tps, fmt.Sprintf("purge of %v requested", topics)) for _, topic := range topics { @@ -742,6 +754,7 @@ func (c *consumer) purgeTopics(topics []string) { delete(c.d.using, topic) delete(c.d.reSeen, topic) delete(c.d.m, topic) + delete(c.d.ps, topic) } } } diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index 0dcbf989..9f808a16 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -5,7 +5,7 @@ type directConsumer struct { tps *topicsPartitions // data for topics that the user assigned using mtmps // topics we are currently using m mtmps // mirrors cfg.topics and cfg.partitions, but can change with Purge or Add - ps map[string]map[int32]Offset // mirrors cfg.partitions, changed in Purge or Add + ps map[string]map[int32]Offset // mirrors cfg.partitions, changed in Purge or Add, for direct partition consuming reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not } diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index cde63f54..8d4d9cee 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -683,3 +683,40 @@ func TestIssue865(t *testing.T) { t.Errorf("got byte buffered %d != 0", nbytebuf) } } + +func TestGroupSimple(t *testing.T) { + t.Parallel() + + t1, cleanup := tmpTopicPartitions(t, 1) + defer cleanup() + g1, gcleanup := tmpGroup(t) + defer gcleanup() + + cl, _ := newTestClient( + DefaultProduceTopic(t1), + ConsumeTopics(t1), + ConsumerGroup(g1), + MetadataMinAge(100*time.Millisecond), + FetchMaxWait(time.Second), + UnknownTopicRetries(-1), + ) + defer cl.Close() + + for i := 0; i < 2; i++ { + if err := cl.ProduceSync(context.Background(), StringRecord("foo")).FirstErr(); err != nil { + t.Fatal(err) + } + + fs := cl.PollFetches(context.Background()) + + if errs := fs.Errors(); errs != nil { + t.Errorf("unexpected fetch errors: %v", errs) + } + if num := fs.NumRecords(); num != 1 { + t.Errorf("expected only one record, got %d", num) + } + if err := cl.CommitUncommittedOffsets(context.Background()); err != nil { + t.Errorf("unexpected err: %v", err) + } + } +} diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 8a01afb6..fbfc1332 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -29,6 +29,14 @@ type groupConsumer struct { // The data for topics that the user assigned. Metadata updates the // atomic.Value in each pointer atomically. + // + // We initialize tps with zero-value *topicPartitions in initGroup + // if we are directly consuming topics. If we are regex consuming, + // the metadata loop itself requests all topics, filters the topics + // against our regex, and then adds matching topics to tps. + // + // This, effectively, is the set of all candidate topics we could + // theoretically consume. Purging topics removes from this. tps *topicsPartitions reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not @@ -95,11 +103,13 @@ type groupConsumer struct { // topics that we are not interested in consuming. We track the entire // group's topics in external, and our fetchMetadata loop uses this. // We store this as a pointer for address comparisons. + // Not relevant if using KIP-848. external atomic.Value // *groupExternal // See the big comment on `commit`. 
If we allow committing between // join&sync, we occasionally see RebalanceInProgress or // IllegalGeneration errors while cooperative consuming. + // Not relevant if using KIP-848. noCommitDuringJoinAndSync sync.RWMutex ////////////// @@ -109,8 +119,12 @@ type groupConsumer struct { // using is updated when finding new assignments, we always add to this // if we want to consume a topic (or see there are more potential - // partitions). Only the leader can trigger a new group session if there - // are simply more partitions for existing topics. + // partitions). The difference between 'using' and 'tps' is that + // 'using' is used FOR joining. We add topics to this when we learn + // about them and want to consume them, and the topics here + // are used in the JoinGroup metadata. There may be a small delta + // between 'tps' before topics are in 'using', and 'using' tracks + // the last known partition count for if we are leader. // // This is read when joining a group or leaving a group. using map[string]int // topics *we* are currently using => # partitions known in that topic @@ -146,7 +160,9 @@ type groupConsumer struct { blockAuto bool // We set this once to manage the group lifecycle once. + // If we detect we should run in 848 mode, we set is848 true. managing bool + is848 bool dying bool // set when closing, read in findNewAssignments left chan struct{} @@ -189,6 +205,10 @@ func (g *groupMemberGen) storeMember(memberID string) { g.store(memberID, g.generation()) } +func (g *groupMemberGen) storeGeneration(generation int32) { + g.store(g.memberID(), generation) +} + // LeaveGroup leaves a group. Close automatically leaves the group, so this is // only necessary to call if you plan to leave the group but continue to use // the client. If a rebalance is in progress, this function waits for the @@ -366,6 +386,93 @@ func (c *consumer) initGroup() { } } +func (g *groupConsumer) manageFailWait(consecutiveErrors int, err error) (ctxCanceled bool) { + // If the user has BlockPollOnRebalance enabled, we have to + // block around the onLost and assigning. + g.c.waitAndAddRebalance() + + if errors.Is(err, context.Canceled) && g.cfg.onRevoked != nil { + // The cooperative consumer does not revoke everything + // while rebalancing, meaning if our context is + // canceled, we may have uncommitted data. Rather than + // diving into onLost, we should go into onRevoked, + // because for the most part, a context cancelation + // means we are leaving the group. Going into onRevoked + // gives us an opportunity to commit outstanding + // offsets. For the eager consumer, since we always + // revoke before exiting the heartbeat loop, we do not + // really care so much about *needing* to call + // onRevoked, but since we are handling this case for + // the cooperative consumer we may as well just also + // include the eager consumer. + g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read()) + } else { + // Any other error is perceived as a fatal error, + // and we go into onLost as appropriate. + if g.cfg.onLost != nil { + g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read()) + } + g.cfg.hooks.each(func(h Hook) { + if h, ok := h.(HookGroupManageError); ok { + h.OnGroupManageError(err) + } + }) + g.c.addFakeReadyForDraining("", 0, &ErrGroupSession{err}, "notification of group management loop error") + } + + // If we are eager, we should have invalidated everything + // before getting here, but we do so doubly just in case. 
+ // + // If we are cooperative, the join and sync could have failed + // during the cooperative rebalance where we were still + // consuming. We need to invalidate everything. Waiting to + // resume from poll is necessary, but the user will likely be + // unable to commit. + { + g.c.mu.Lock() + g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session") + g.mu.Lock() // before allowing poll to touch uncommitted, lock the group + g.c.mu.Unlock() // now part of poll can continue + g.uncommitted = nil + g.mu.Unlock() + + g.nowAssigned.store(nil) + g.lastAssigned = nil + g.fetching = nil + + g.leader.Store(false) + g.resetExternal() + } + + // Unblock polling now that we have called onLost and + // re-assigned. + g.c.unaddRebalance() + + if errors.Is(err, context.Canceled) { // context was canceled, quit now + return true + } + + // Waiting for the backoff is a good time to update our + // metadata; maybe the error is from stale metadata. + backoff := g.cfg.retryBackoff(consecutiveErrors) + g.cfg.logger.Log(LogLevelError, "group manage loop errored", + "group", g.cfg.group, + "err", err, + "consecutive_errors", consecutiveErrors, + "backoff", backoff, + ) + deadline := time.Now().Add(backoff) + g.cl.waitmeta(g.ctx, backoff, "waitmeta during group manage backoff") + after := time.NewTimer(time.Until(deadline)) + select { + case <-g.ctx.Done(): + after.Stop() + return true + case <-after.C: + } + return false +} + // Manages the group consumer's join / sync / heartbeat / fetch offset flow. // // Once a group is assigned, we fire a metadata request for all topics the @@ -388,7 +495,7 @@ func (g *groupConsumer) manage() { } err := g.joinAndSync(joinWhy) if err == nil { - if joinWhy, err = g.setupAssignedAndHeartbeat(); err != nil { + if joinWhy, err = g.setupAssignedAndHeartbeat(g.cfg.heartbeatInterval, g.heartbeatFn()); err != nil { if errors.Is(err, kerr.RebalanceInProgress) { err = nil } @@ -400,89 +507,10 @@ } joinWhy = "rejoining after we previously errored and backed off" - // If the user has BlockPollOnRebalance enabled, we have to - // block around the onLost and assigning. - g.c.waitAndAddRebalance() - - if errors.Is(err, context.Canceled) && g.cfg.onRevoked != nil { - // The cooperative consumer does not revoke everything - // while rebalancing, meaning if our context is - // canceled, we may have uncommitted data. Rather than - // diving into onLost, we should go into onRevoked, - // because for the most part, a context cancelation - // means we are leaving the group. Going into onRevoked - // gives us an opportunity to commit outstanding - // offsets. For the eager consumer, since we always - // revoke before exiting the heartbeat loop, we do not - // really care so much about *needing* to call - // onRevoked, but since we are handling this case for - // the cooperative consumer we may as well just also - // include the eager consumer. - g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned.read()) - } else { - // Any other error is perceived as a fatal error, - // and we go into onLost as appropriate.
- if g.cfg.onLost != nil { - g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned.read()) - } - g.cfg.hooks.each(func(h Hook) { - if h, ok := h.(HookGroupManageError); ok { - h.OnGroupManageError(err) - } - }) - g.c.addFakeReadyForDraining("", 0, &ErrGroupSession{err}, "notification of group management loop error") - } - - // If we are eager, we should have invalidated everything - // before getting here, but we do so doubly just in case. - // - // If we are cooperative, the join and sync could have failed - // during the cooperative rebalance where we were still - // consuming. We need to invalidate everything. Waiting to - // resume from poll is necessary, but the user will likely be - // unable to commit. - { - g.c.mu.Lock() - g.c.assignPartitions(nil, assignInvalidateAll, nil, "clearing assignment at end of group management session") - g.mu.Lock() // before allowing poll to touch uncommitted, lock the group - g.c.mu.Unlock() // now part of poll can continue - g.uncommitted = nil - g.mu.Unlock() - - g.nowAssigned.store(nil) - g.lastAssigned = nil - g.fetching = nil - - g.leader.Store(false) - g.resetExternal() - } - - // Unblock bolling now that we have called onLost and - // re-assigned. - g.c.unaddRebalance() - - if errors.Is(err, context.Canceled) { // context was canceled, quit now - return - } - - // Waiting for the backoff is a good time to update our - // metadata; maybe the error is from stale metadata. consecutiveErrors++ - backoff := g.cfg.retryBackoff(consecutiveErrors) - g.cfg.logger.Log(LogLevelError, "join and sync loop errored", - "group", g.cfg.group, - "err", err, - "consecutive_errors", consecutiveErrors, - "backoff", backoff, - ) - deadline := time.Now().Add(backoff) - g.cl.waitmeta(g.ctx, backoff, "waitmeta during join & sync error backoff") - after := time.NewTimer(time.Until(deadline)) - select { - case <-g.ctx.Done(): - after.Stop() + ctxCanceled := g.manageFailWait(consecutiveErrors, err) + if ctxCanceled { return - case <-after.C: } } } @@ -494,6 +522,7 @@ func (g *groupConsumer) leave(ctx context.Context) { wasDead := g.dying g.dying = true wasManaging := g.managing + is848 := g.is848 g.cancel() g.mu.Unlock() @@ -512,11 +541,23 @@ func (g *groupConsumer) leave(ctx context.Context) { defer close(g.left) + // If we JUST started a group but do not yet have a + // member ID, there's nothing we can do. + memberID := g.memberGen.memberID() + if memberID == "" { + g.cfg.logger.Log(LogLevelInfo, "tried to leave group but we have no member ID yet, returning early", g.cfg.group) + return + } + + if is848 { + g.leave848(ctx) + return + } + if g.cfg.instanceID != nil { return } - memberID := g.memberGen.memberID() g.cfg.logger.Log(LogLevelInfo, "leaving group", "group", g.cfg.group, "member_id", memberID, @@ -654,7 +695,8 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi switch stage { case revokeLastSession: - // we use lost in this case + // we use lost in this case; this is the case where we are + // rejoining after losing some partitions (cooperative or KIP-848) case revokeThisSession: // lost is nil for cooperative assigning. 
Instead, we determine @@ -825,7 +867,7 @@ func (s *assignRevokeSession) revoke(g *groupConsumer, leaving bool) <-chan stru // - which ensures that pre revoking is complete // - fetching is complete // - heartbeating is complete -func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { +func (g *groupConsumer) setupAssignedAndHeartbeat(initialHb time.Duration, hbfn func() (time.Duration, error)) (string, error) { type hbquit struct { rejoinWhy string err error @@ -847,7 +889,7 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { go func() { defer cancel() // potentially kill offset fetching g.cfg.logger.Log(LogLevelInfo, "beginning heartbeat loop", "group", g.cfg.group) - rejoinWhy, err := g.heartbeat(fetchErrCh, s) + rejoinWhy, err := g.heartbeat(initialHb, fetchErrCh, s, hbfn) hbErrCh <- hbquit{rejoinWhy, err} }() @@ -910,6 +952,23 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { return done.rejoinWhy, done.err } +func (g *groupConsumer) heartbeatFn() func() (time.Duration, error) { + return func() (time.Duration, error) { + req := kmsg.NewPtrHeartbeatRequest() + req.Group = g.cfg.group + memberID, generation := g.memberGen.load() + req.Generation = generation + req.MemberID = memberID + req.InstanceID = g.cfg.instanceID + var resp *kmsg.HeartbeatResponse + resp, err := req.RequestWith(g.ctx, g.cl) + if err == nil { + err = kerr.ErrorForCode(resp.ErrorCode) + } + return g.cfg.heartbeatInterval, err + } +} + // heartbeat issues heartbeat requests to Kafka for the duration of a group // session. // @@ -920,15 +979,20 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { // // If the offset fetch is successful, then we basically sit in this function // until a heartbeat errors or we, being the leader, decide to re-join. -func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSession) (string, error) { - ticker := time.NewTicker(g.cfg.heartbeatInterval) - defer ticker.Stop() +func (g *groupConsumer) heartbeat(initialHb time.Duration, fetchErrCh <-chan error, s *assignRevokeSession, hbfn func() (time.Duration, error)) (string, error) { + g.mu.Lock() + is848 := g.is848 + g.mu.Unlock() + + timer := time.NewTimer(initialHb) + defer timer.Stop() // We issue one heartbeat quickly if we are cooperative because // cooperative consumers rejoin the group immediately, and we want to - // detect that in 500ms rather than 3s. + // detect that in 500ms rather than 3s. We only want this in non-848 + // mode.
var cooperativeFastCheck <-chan time.Time - if g.cooperative.Load() { + if g.cooperative.Load() && !is848 { cooperativeFastCheck = time.After(500 * time.Millisecond) } @@ -946,7 +1010,7 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio select { case <-cooperativeFastCheck: heartbeat = true - case <-ticker.C: + case <-timer.C: heartbeat = true case force = <-g.heartbeatForceCh: heartbeat = true @@ -971,16 +1035,9 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio if heartbeat { g.cfg.logger.Log(LogLevelDebug, "heartbeating", "group", g.cfg.group) - req := kmsg.NewPtrHeartbeatRequest() - req.Group = g.cfg.group - memberID, generation := g.memberGen.load() - req.Generation = generation - req.MemberID = memberID - req.InstanceID = g.cfg.instanceID - var resp *kmsg.HeartbeatResponse - if resp, err = req.RequestWith(g.ctx, g.cl); err == nil { - err = kerr.ErrorForCode(resp.ErrorCode) - } + var reset time.Duration + reset, err = hbfn() + timer.Reset(reset) g.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "group", g.cfg.group, "err", err) if force != nil { force(err) @@ -999,7 +1056,11 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio } if lastErr == nil { - g.cfg.logger.Log(LogLevelInfo, "heartbeat errored", "group", g.cfg.group, "err", err) + if is848 && errors.Is(err, kerr.RebalanceInProgress) { + g.cfg.logger.Log(LogLevelInfo, "heartbeat saw a change in group status; partitions were added or lost", "group", g.cfg.group) + } else { + g.cfg.logger.Log(LogLevelInfo, "heartbeat errored", "group", g.cfg.group, "err", err) + } } else { g.cfg.logger.Log(LogLevelInfo, "heartbeat errored again while waiting for user revoke to finish", "group", g.cfg.group, "err", err) } @@ -1759,6 +1820,11 @@ func (g *groupConsumer) findNewAssignments() { if !g.managing { g.managing = true + if g.should848() { + g.is848 = true + go g.manage848() + return + } go g.manage() return } diff --git a/pkg/kgo/consumer_group_848.go b/pkg/kgo/consumer_group_848.go new file mode 100644 index 00000000..39a80094 --- /dev/null +++ b/pkg/kgo/consumer_group_848.go @@ -0,0 +1,330 @@ +package kgo + +import ( + "context" + "crypto/rand" + "encoding/base64" + "errors" + "fmt" + "io" + "slices" + "time" + + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func (g *groupConsumer) should848() bool { + if !g.cl.supportsKIP848() { + return false + } + switch g.cfg.balancers[0].(type) { + case *stickyBalancer: + case *rangeBalancer: + default: + return false + } + return true +} + +func (g *groupConsumer) manage848() { + var serverAssignor string + switch g.cfg.balancers[0].(type) { + case *stickyBalancer: + serverAssignor = "uniform" + case *rangeBalancer: + serverAssignor = "range" + } + + var known848Support bool + defer func() { + if known848Support { + close(g.manageDone) + } + }() + optInKnown := func() { + if known848Support { + return + } + known848Support = true + g.cfg.logger.Log(LogLevelInfo, "beginning to manage the next-gen group lifecycle", "group", g.cfg.group) + g.cooperative.Store(true) // next gen is always cooperative + } + + g848 := &g848{g: g, serverAssignor: serverAssignor} + var consecutiveErrors int + for { + initialHb, err := g848.initialJoin() + + // Even if Kafka replies that the API is available, if we use it + // and the broker is not configured to support it, we receive + // UnsupportedVersion. 
On the first loop, we use the result to decide whether the broker truly supports the new protocol or whether we must fall back. + if !known848Support { + if err != nil { + var ke *kerr.Error + if errors.As(err, &ke) { + if ke.Code == kerr.UnsupportedVersion.Code { + // It's okay to update is848 here. This is used while leaving + // and while heartbeating. We have not yet entered heartbeating, + // and if the user is concurrently leaving, the lack of a memberID + // means both 848 and old group mgmt leaves return early. + g.mu.Lock() + g.is848 = false + g.mu.Unlock() + g.cfg.logger.Log(LogLevelInfo, "falling back to standard consumer group management due to lack of broker support", "group", g.cfg.group) + go g.manage() + return + } + optInKnown() // A kerr that is NOT UnsupportedVersion means this is supported. + } + // For non-kerr errors, we fall into normal logic below and retry. + } else { + optInKnown() + } + } + + for err == nil { + consecutiveErrors = 0 + var nowAssigned map[string][]int32 + var haveUpdate bool + // setupAssignedAndHeartbeat + // * First revokes partitions we lost from our last session + // * Starts heartbeating + // * Starts fetching offsets for what we're newly assigned + // + // In heartbeating, if we lose or gain partitions, we need to + // exit the old heartbeat and re-enter setupAssignedAndHeartbeat. + // Otherwise, we heartbeat exactly the same as the old. + // + // This results in a few more heartbeats than necessary + // when things are changing, but keeps all the old + // logic that handles all edge conditions. + _, err = g.setupAssignedAndHeartbeat(initialHb, func() (time.Duration, error) { + req := g848.mkreq() + resp, err := req.RequestWith(g.ctx, g.cl) + if err != nil { + return g.cfg.heartbeatInterval, err + } + err = errCodeMessage(resp.ErrorCode, resp.ErrorMessage) + sleep := time.Duration(resp.HeartbeatIntervalMillis) * time.Millisecond + if err != nil { + return sleep, err + } + nowAssigned = g848.handleResp(resp) + if nowAssigned != nil { + err = kerr.RebalanceInProgress + haveUpdate = true + } + return sleep, err + }) + if errors.Is(err, kerr.RebalanceInProgress) { + err = nil + } + if haveUpdate { + g.nowAssigned.store(nowAssigned) + } + } + + // The errors we have to handle are: + // * UnknownMemberID: abandon partitions, rejoin + // * FencedMemberEpoch: abandon partitions, rejoin + // * UnreleasedInstanceID: fatal error, do not rejoin + // * General error: fatal error, do not rejoin + // + // In the latter two cases, we fall into rejoining anyway + // because it is both non-problematic (we will keep failing + // with the same error) and because it will cause the user + // to repeatedly get error logs. + consecutiveErrors++ + ctxCanceled := g.manageFailWait(consecutiveErrors, err) + if ctxCanceled { + return + } + } +} + +func (g *groupConsumer) leave848(ctx context.Context) { + if g.cfg.instanceID != nil { + return + } + + memberID := g.memberGen.memberID() + g.cfg.logger.Log(LogLevelInfo, "leaving next-gen group", + "group", g.cfg.group, + "member_id", memberID, + "instance_id", g.cfg.instanceID, + ) + // If we error when leaving, there is not much + // we can do. We may as well just return.
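// Not part of the patch, a descriptive note on the request built below: per
// KIP-848, MemberEpoch -1 signals that the member is leaving the group, while
// -2 signals that a static member (one with an instance ID) is leaving only
// temporarily, so its partitions are not immediately reassigned.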
+ req := kmsg.NewPtrConsumerGroupHeartbeatRequest() + req.Group = g.cfg.group + req.MemberID = g.memberGen.memberID() + req.MemberEpoch = -1 + if g.cfg.instanceID != nil { + req.MemberEpoch = -2 + } + + resp, err := req.RequestWith(ctx, g.cl) + if err != nil { + g.leaveErr = err + return + } + g.leaveErr = errCodeMessage(resp.ErrorCode, resp.ErrorMessage) +} + +type g848 struct { + g *groupConsumer + + serverAssignor string + + lastSubscribedTopics []string + lastTopics map[string][]int32 +} + +// v1+ requires the end user to generate their own MemberID, with the +// recommendation being v4 uuid base64 encoded so it can be put in URLs. We +// roughly do that (no version nor variant bits). crypto/rand does not fail +// and, in future Go versions, will panic on internal errors. +func newStringUUID() string { + var uuid [16]byte + io.ReadFull(rand.Reader, uuid[:]) // even more random than adding version & variant bits is having full randomness + return base64.URLEncoding.EncodeToString(uuid[:]) +} + +func (g *g848) initialJoin() (time.Duration, error) { + var memberID string + rejoin := true + if g.g.cl.supportsKIP848v1() { + memberID = newStringUUID() + rejoin = false + } + g.g.memberGen.store(memberID, 0) // 0 joins the group +reissue: + req := g.mkreq() + resp, err := req.RequestWith(g.g.ctx, g.g.cl) + if err == nil { + err = errCodeMessage(resp.ErrorCode, resp.ErrorMessage) + } + if err != nil { + return 0, err + } + // On the initial join, IF we are v0, we need to rejoin with the + // server-provided memberID. We should not be assigned anything yet... + nowAssigned := g.handleResp(resp) + if len(nowAssigned) == 0 && rejoin { + rejoin = false + goto reissue + } + g.g.nowAssigned.store(nowAssigned) + return time.Duration(resp.HeartbeatIntervalMillis) * time.Millisecond, nil +} + +func (g *g848) handleResp(resp *kmsg.ConsumerGroupHeartbeatResponse) map[string][]int32 { + if resp.MemberID != nil { + g.g.memberGen.store(*resp.MemberID, resp.MemberEpoch) + } else { + g.g.memberGen.storeGeneration(resp.MemberEpoch) + } + + if resp.Assignment == nil { + return nil + } + + id2t := g.g.cl.id2tMap() + newAssigned := make(map[string][]int32) + for _, t := range resp.Assignment.Topics { + name := id2t[t.TopicID] + if name == "" { + // If we do not recognize the topic ID, we do not keep it for + // assignment yet, but we immediately trigger a metadata update + // to hopefully discover this topic by the time we are assigned + // the topic again. + g.g.cl.triggerUpdateMetadataNow(fmt.Sprintf("consumer group heartbeat returned topic ID %s that we do not recognize", topicID(t.TopicID))) + continue + } + slices.Sort(t.Partitions) + newAssigned[name] = t.Partitions + } + current := g.g.nowAssigned.read() + if !mapi32sDeepEq(current, newAssigned) { + return newAssigned + } + return nil +} + +func (g *g848) mkreq() *kmsg.ConsumerGroupHeartbeatRequest { + req := kmsg.NewPtrConsumerGroupHeartbeatRequest() + req.Group = g.g.cfg.group + req.MemberID, req.MemberEpoch = g.g.memberGen.load() + + // Most fields in the request can be null if the field is equal to the + // last time we sent the request. The first time we write, we include + // all information. + // + // Our initial set of subscribed topics is specified in our config. + // For non-regex consuming, topics are directly created into g.tps. + // As well, g.tps is added to or purged from in AddConsumeTopics or + // PurgeConsumeTopics. We can always use g.tps for direct topics the + // user wants to consume. 
+ // + // For regex topics, they cannot add or remove after client creation. + // We just use the initial config field. + + switch req.MemberEpoch { + case 0: + req.InstanceID = g.g.cfg.instanceID + if g.g.cfg.rack != "" { + req.RackID = &g.g.cfg.rack + } + req.RebalanceTimeoutMillis = int32(g.g.cfg.rebalanceTimeout.Milliseconds()) + req.ServerAssignor = &g.serverAssignor + + // SubscribedTopics must always exist when epoch == 0. + // We specifically 'make' the slice to ensure it is non-nil. + tps := g.g.tps.load() + subscribedTopics := make([]string, 0, len(tps)) + for t := range tps { + subscribedTopics = append(subscribedTopics, t) + } + g.lastSubscribedTopics = subscribedTopics + req.SubscribedTopicNames = subscribedTopics + + // Topics must be empty when we do the initial join. + // We sanity-check that we are not assigned anything. + if len(g.g.nowAssigned.read()) > 0 { + panic("issuing ConsumerGroupHeartbeat while we are currently assigned something, bug, please open an issue!") + } + g.lastTopics = noConsumerGroupHeartbeatReqTopics + req.Topics = []kmsg.ConsumerGroupHeartbeatRequestTopic{} // must always exist and be empty when joining + + default: + tps := g.g.tps.load() + subscribedTopics := make([]string, 0, len(tps)) + for t := range tps { + subscribedTopics = append(subscribedTopics, t) + } + if !slicesDeepEq(g.lastSubscribedTopics, subscribedTopics) { + req.SubscribedTopicNames = subscribedTopics + } + g.lastSubscribedTopics = subscribedTopics + + nowAssigned := g.g.nowAssigned.clone() // always returns non-nil + if !mapi32sDeepEq(g.lastTopics, nowAssigned) { + req.Topics = []kmsg.ConsumerGroupHeartbeatRequestTopic{} // ALWAYS initialize: len 0 is significantly different than nil (nil means same as last time) + tps := g.g.tps.load() + for t, ps := range nowAssigned { + rt := kmsg.NewConsumerGroupHeartbeatRequestTopic() + rt.Partitions = slices.Clone(ps) + rt.TopicID = tps[t].load().id + req.Topics = append(req.Topics, rt) + } + } + g.lastTopics = nowAssigned + } + + // TODO regex v1+ + + return req +} + +var noConsumerGroupHeartbeatReqTopics = make(map[string][]int32) diff --git a/pkg/kgo/errors.go b/pkg/kgo/errors.go index 74913254..35322a03 100644 --- a/pkg/kgo/errors.go +++ b/pkg/kgo/errors.go @@ -7,6 +7,8 @@ import ( "io" "net" "os" + + "github.com/twmb/franz-go/pkg/kerr" ) func isRetryableBrokerErr(err error) bool { @@ -334,3 +336,13 @@ func isDecompressErr(err error) bool { var ed *errDecompress return errors.As(err, &ed) } + +func errCodeMessage(code int16, errMessage *string) error { + if err := kerr.ErrorForCode(code); err != nil { + if errMessage != nil { + return fmt.Errorf("%s: %w", *errMessage, err) + } + return err + } + return nil +} diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index a6aade33..9be95f83 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -75,12 +75,6 @@ func (cl *Client) PartitionLeader(topic string, partition int32) (leader, leader return p.leader, p.leaderEpoch, p.loadErr } -func (cl *Client) id2topic(id [16]byte) string { - m := cl.id2tMap() - t := m[id] - return t -} - var noid2t = make(map[[16]byte]string) func (cl *Client) id2tMap() map[[16]byte]string {
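A minimal usage sketch, not part of the patch (broker address, topic, and group names are placeholders): from the application's side the KIP-848 path is transparent. The same group configuration works, and the client only takes the ConsumerGroupHeartbeat flow when the broker supports the API and the first balancer is sticky or range, which manage848 maps to the "uniform" and "range" server-side assignors.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("example-topic"),
		kgo.ConsumerGroup("example-group"),
		kgo.Balancers(kgo.StickyBalancer()), // mapped to the "uniform" server assignor in should848/manage848
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Polling and committing are identical under both group protocols; the
	// client internally chooses manage848 or the classic join/sync flow.
	fetches := cl.PollFetches(context.Background())
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Println(string(r.Value))
	})
	if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
		fmt.Println("commit error:", err)
	}
}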