From 22f5969c36136cea26832a25389ae25ac9527d6a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Jan 2024 16:45:19 +0000 Subject: [PATCH] Bump github.com/segmentio/kafka-go from 0.4.46 to 0.4.47 Bumps [github.com/segmentio/kafka-go](https://github.com/segmentio/kafka-go) from 0.4.46 to 0.4.47. - [Release notes](https://github.com/segmentio/kafka-go/releases) - [Commits](https://github.com/segmentio/kafka-go/compare/v0.4.46...v0.4.47) --- updated-dependencies: - dependency-name: github.com/segmentio/kafka-go dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- .../github.com/segmentio/kafka-go/balancer.go | 12 +- .../segmentio/kafka-go/protocol/protocol.go | 31 ++++++ .../protocol/rawproduce/rawproduce.go | 91 ++++++++++++++++ .../segmentio/kafka-go/protocol/record.go | 40 +++++++ .../segmentio/kafka-go/protocol/request.go | 6 + .../segmentio/kafka-go/protocol/response.go | 6 + .../segmentio/kafka-go/rawproduce.go | 103 ++++++++++++++++++ vendor/modules.txt | 3 +- 10 files changed, 290 insertions(+), 8 deletions(-) create mode 100644 vendor/github.com/segmentio/kafka-go/protocol/rawproduce/rawproduce.go create mode 100644 vendor/github.com/segmentio/kafka-go/rawproduce.go diff --git a/go.mod b/go.mod index 7494d4641..36dfa7352 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/common v0.45.0 - github.com/segmentio/kafka-go v0.4.46 + github.com/segmentio/kafka-go v0.4.47 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 7b6f4094b..ab3dda4b6 100644 --- a/go.sum +++ b/go.sum @@ -760,8 +760,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= -github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI= -github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= diff --git a/vendor/github.com/segmentio/kafka-go/balancer.go b/vendor/github.com/segmentio/kafka-go/balancer.go index 4136fce7b..ee3a25885 100644 --- a/vendor/github.com/segmentio/kafka-go/balancer.go +++ b/vendor/github.com/segmentio/kafka-go/balancer.go @@ -7,7 +7,6 @@ import ( "math/rand" "sort" "sync" - "sync/atomic" ) // The Balancer interface provides an abstraction of the message distribution @@ -42,8 +41,10 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int { type RoundRobin struct { ChunkSize int // Use a 32 bits integer so RoundRobin values don't need to be aligned to - // apply atomic increments. + // apply increments. counter uint32 + + mutex sync.Mutex } // Balance satisfies the Balancer interface. @@ -52,14 +53,17 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int { } func (rr *RoundRobin) balance(partitions []int) int { + rr.mutex.Lock() + defer rr.mutex.Unlock() + if rr.ChunkSize < 1 { rr.ChunkSize = 1 } length := len(partitions) - counterNow := atomic.LoadUint32(&rr.counter) + counterNow := rr.counter offset := int(counterNow / uint32(rr.ChunkSize)) - atomic.AddUint32(&rr.counter, 1) + rr.counter++ return partitions[offset%length] } diff --git a/vendor/github.com/segmentio/kafka-go/protocol/protocol.go b/vendor/github.com/segmentio/kafka-go/protocol/protocol.go index ebf91a798..3d0a7b8dd 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/protocol.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/protocol.go @@ -213,6 +213,37 @@ func Register(req, res Message) { } } +// OverrideTypeMessage is an interface implemented by messages that want to override the standard +// request/response types for a given API. +type OverrideTypeMessage interface { + TypeKey() OverrideTypeKey +} + +type OverrideTypeKey int16 + +const ( + RawProduceOverride OverrideTypeKey = 0 +) + +var overrideApiTypes [numApis]map[OverrideTypeKey]apiType + +func RegisterOverride(req, res Message, key OverrideTypeKey) { + k1 := req.ApiKey() + k2 := res.ApiKey() + + if k1 != k2 { + panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2)) + } + + if overrideApiTypes[k1] == nil { + overrideApiTypes[k1] = make(map[OverrideTypeKey]apiType) + } + overrideApiTypes[k1][key] = apiType{ + requests: typesOf(req), + responses: typesOf(res), + } +} + func typesOf(v interface{}) []messageType { return makeTypes(reflect.TypeOf(v).Elem()) } diff --git a/vendor/github.com/segmentio/kafka-go/protocol/rawproduce/rawproduce.go b/vendor/github.com/segmentio/kafka-go/protocol/rawproduce/rawproduce.go new file mode 100644 index 000000000..bad83138d --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/rawproduce/rawproduce.go @@ -0,0 +1,91 @@ +package rawproduce + +import ( + "fmt" + + "github.com/segmentio/kafka-go/protocol" + "github.com/segmentio/kafka-go/protocol/produce" +) + +func init() { + // Register a type override so that raw produce requests will be encoded with the correct type. + req := &Request{} + protocol.RegisterOverride(req, &produce.Response{}, req.TypeKey()) +} + +type Request struct { + TransactionalID string `kafka:"min=v3,max=v8,nullable"` + Acks int16 `kafka:"min=v0,max=v8"` + Timeout int32 `kafka:"min=v0,max=v8"` + Topics []RequestTopic `kafka:"min=v0,max=v8"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.Produce } + +func (r *Request) TypeKey() protocol.OverrideTypeKey { return protocol.RawProduceOverride } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + broker := protocol.Broker{ID: -1} + + for i := range r.Topics { + t := &r.Topics[i] + + topic, ok := cluster.Topics[t.Topic] + if !ok { + return broker, NewError(protocol.NewErrNoTopic(t.Topic)) + } + + for j := range t.Partitions { + p := &t.Partitions[j] + + partition, ok := topic.Partitions[p.Partition] + if !ok { + return broker, NewError(protocol.NewErrNoPartition(t.Topic, p.Partition)) + } + + if b, ok := cluster.Brokers[partition.Leader]; !ok { + return broker, NewError(protocol.NewErrNoLeader(t.Topic, p.Partition)) + } else if broker.ID < 0 { + broker = b + } else if b.ID != broker.ID { + return broker, NewError(fmt.Errorf("mismatching leaders (%d!=%d)", b.ID, broker.ID)) + } + } + } + + return broker, nil +} + +func (r *Request) HasResponse() bool { + return r.Acks != 0 +} + +type RequestTopic struct { + Topic string `kafka:"min=v0,max=v8"` + Partitions []RequestPartition `kafka:"min=v0,max=v8"` +} + +type RequestPartition struct { + Partition int32 `kafka:"min=v0,max=v8"` + RecordSet protocol.RawRecordSet `kafka:"min=v0,max=v8"` +} + +var ( + _ protocol.BrokerMessage = (*Request)(nil) +) + +type Error struct { + Err error +} + +func NewError(err error) *Error { + return &Error{Err: err} +} + +func (e *Error) Error() string { + return fmt.Sprintf("fetch request error: %v", e.Err) +} + +func (e *Error) Unwrap() error { + return e.Err +} diff --git a/vendor/github.com/segmentio/kafka-go/protocol/record.go b/vendor/github.com/segmentio/kafka-go/protocol/record.go index 84594868b..e11af4dcc 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/record.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/record.go @@ -292,6 +292,46 @@ func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) { return n, nil } +// RawRecordSet represents a record set for a RawProduce request. The record set is +// represented as a raw sequence of pre-encoded record set bytes. +type RawRecordSet struct { + // Reader exposes the raw sequence of record set bytes. + Reader io.Reader +} + +// ReadFrom reads the representation of a record set from r into rrs. It re-uses the +// existing RecordSet.ReadFrom implementation to first read/decode data into a RecordSet, +// then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet. +// +// Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a +// performance standpoint as it require an extra copy of the record bytes. Holding off +// on optimizing, as this code path is only invoked in tests. +func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) { + rs := &RecordSet{} + n, err := rs.ReadFrom(r) + if err != nil { + return 0, err + } + + buf := &bytes.Buffer{} + rs.WriteTo(buf) + *rrs = RawRecordSet{ + Reader: buf, + } + + return n, nil +} + +// WriteTo writes the RawRecordSet to an io.Writer. Since this is a raw record set representation, all that is +// done here is copying bytes from the underlying reader to the specified writer. +func (rrs *RawRecordSet) WriteTo(w io.Writer) (int64, error) { + if rrs.Reader == nil { + return 0, ErrNoRecord + } + + return io.Copy(w, rrs.Reader) +} + func makeTime(t int64) time.Time { return time.Unix(t/1000, (t%1000)*int64(time.Millisecond)) } diff --git a/vendor/github.com/segmentio/kafka-go/protocol/request.go b/vendor/github.com/segmentio/kafka-go/protocol/request.go index 8b99e0537..135b938bb 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/request.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/request.go @@ -81,6 +81,12 @@ func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID s return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) } + if typedMessage, ok := msg.(OverrideTypeMessage); ok { + typeKey := typedMessage.TypeKey() + overrideType := overrideApiTypes[apiKey][typeKey] + t = &overrideType + } + minVersion := t.minVersion() maxVersion := t.maxVersion() diff --git a/vendor/github.com/segmentio/kafka-go/protocol/response.go b/vendor/github.com/segmentio/kafka-go/protocol/response.go index 619480313..a43bd0237 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/response.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/response.go @@ -95,6 +95,12 @@ func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Messa return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) } + if typedMessage, ok := msg.(OverrideTypeMessage); ok { + typeKey := typedMessage.TypeKey() + overrideType := overrideApiTypes[apiKey][typeKey] + t = &overrideType + } + minVersion := t.minVersion() maxVersion := t.maxVersion() diff --git a/vendor/github.com/segmentio/kafka-go/rawproduce.go b/vendor/github.com/segmentio/kafka-go/rawproduce.go new file mode 100644 index 000000000..5928cb2f8 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/rawproduce.go @@ -0,0 +1,103 @@ +package kafka + +import ( + "context" + "errors" + "fmt" + "net" + + "github.com/segmentio/kafka-go/protocol" + produceAPI "github.com/segmentio/kafka-go/protocol/produce" + "github.com/segmentio/kafka-go/protocol/rawproduce" +) + +// RawProduceRequest represents a request sent to a kafka broker to produce records +// to a topic partition. The request contains a pre-encoded/raw record set. +type RawProduceRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // The topic to produce the records to. + Topic string + + // The partition to produce the records to. + Partition int + + // The level of required acknowledgements to ask the kafka broker for. + RequiredAcks RequiredAcks + + // The message format version used when encoding the records. + // + // By default, the client automatically determine which version should be + // used based on the version of the Produce API supported by the server. + MessageVersion int + + // An optional transaction id when producing to the kafka broker is part of + // a transaction. + TransactionalID string + + // The sequence of records to produce to the topic partition. + RawRecords protocol.RawRecordSet +} + +// RawProduce sends a raw produce request to a kafka broker and returns the response. +// +// If the request contained no records, an error wrapping protocol.ErrNoRecord +// is returned. +// +// When the request is configured with RequiredAcks=none, both the response and +// the error will be nil on success. +func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error) { + m, err := c.roundTrip(ctx, req.Addr, &rawproduce.Request{ + TransactionalID: req.TransactionalID, + Acks: int16(req.RequiredAcks), + Timeout: c.timeoutMs(ctx, defaultProduceTimeout), + Topics: []rawproduce.RequestTopic{{ + Topic: req.Topic, + Partitions: []rawproduce.RequestPartition{{ + Partition: int32(req.Partition), + RecordSet: req.RawRecords, + }}, + }}, + }) + + switch { + case err == nil: + case errors.Is(err, protocol.ErrNoRecord): + return new(ProduceResponse), nil + default: + return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", err) + } + + if req.RequiredAcks == RequireNone { + return nil, nil + } + + res := m.(*produceAPI.Response) + if len(res.Topics) == 0 { + return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoTopic) + } + topic := &res.Topics[0] + if len(topic.Partitions) == 0 { + return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoPartition) + } + partition := &topic.Partitions[0] + + ret := &ProduceResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Error: makeError(partition.ErrorCode, partition.ErrorMessage), + BaseOffset: partition.BaseOffset, + LogAppendTime: makeTime(partition.LogAppendTime), + LogStartOffset: partition.LogStartOffset, + } + + if len(partition.RecordErrors) != 0 { + ret.RecordErrors = make(map[int]error, len(partition.RecordErrors)) + + for _, recErr := range partition.RecordErrors { + ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage) + } + } + + return ret, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e67333824..3f7302f7e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -341,7 +341,7 @@ github.com/sagikazarmark/locafero # github.com/sagikazarmark/slog-shim v0.1.0 ## explicit; go 1.20 github.com/sagikazarmark/slog-shim -# github.com/segmentio/kafka-go v0.4.46 +# github.com/segmentio/kafka-go v0.4.47 ## explicit; go 1.15 github.com/segmentio/kafka-go github.com/segmentio/kafka-go/compress @@ -386,6 +386,7 @@ github.com/segmentio/kafka-go/protocol/offsetcommit github.com/segmentio/kafka-go/protocol/offsetdelete github.com/segmentio/kafka-go/protocol/offsetfetch github.com/segmentio/kafka-go/protocol/produce +github.com/segmentio/kafka-go/protocol/rawproduce github.com/segmentio/kafka-go/protocol/saslauthenticate github.com/segmentio/kafka-go/protocol/saslhandshake github.com/segmentio/kafka-go/protocol/syncgroup