Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

803 823 #904

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kfake/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ outer:
// Control is a function to call on any client request the cluster handles.
//
// If the control function returns true, then either the response is written
// back to the client or, if there the control function returns an error, the
// back to the client or, if the control function returns an error, the
// client connection is closed. If both returns are nil, then the cluster will
// loop continuing to read from the client and the client will likely have a
// read timeout at some point.
Expand All @@ -467,7 +467,7 @@ func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool)) {
// handles.
//
// If the control function returns true, then either the response is written
// back to the client or, if there the control function returns an error, the
// back to the client or, if the control function returns an error, the
// client connection is closed. If both returns are nil, then the cluster will
// loop continuing to read from the client and the client will likely have a
// read timeout at some point.
Expand Down
36 changes: 21 additions & 15 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ type Client struct {
producer producer
consumer consumer

compressor *compressor
decompressor *decompressor

coordinatorsMu sync.Mutex
coordinators map[coordinatorKey]*coordinatorLoad

Expand Down Expand Up @@ -117,7 +114,7 @@ type hostport struct {

// ValidateOpts returns an error if the options are invalid.
func ValidateOpts(opts ...Opt) error {
_, _, _, err := validateCfg(opts...)
_, _, err := validateCfg(opts...)
return err
}

Expand All @@ -136,23 +133,29 @@ func parseSeeds(addrs []string) ([]hostport, error) {
// This function validates the configuration and returns a few things that we
// initialize while validating. The difference between this and NewClient
// initialization is all NewClient initialization is infallible.
func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) {
func validateCfg(opts ...Opt) (cfg, []hostport, error) {
cfg := defaultCfg()
for _, opt := range opts {
opt.apply(&cfg)
}
if err := cfg.validate(); err != nil {
return cfg, nil, nil, err
return cfg, nil, err
}
seeds, err := parseSeeds(cfg.seedBrokers)
if err != nil {
return cfg, nil, nil, err
return cfg, nil, err
}
compressor, err := newCompressor(cfg.compression...)
if err != nil {
return cfg, nil, nil, err
if cfg.compressor == nil {
cfg.compressor, err = DefaultCompressor(cfg.compression...)
if err != nil {
return cfg, nil, err
}
}
return cfg, seeds, compressor, nil
if cfg.decompressor == nil {
cfg.decompressor = DefaultDecompressor(cfg.pools...)
}

return cfg, seeds, nil
}

func namefn(fn any) string {
Expand Down Expand Up @@ -279,6 +282,8 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.sasls}
case namefn(WithHooks):
return []any{cfg.hooks}
case namefn(WithPools):
return []any{cfg.pools}
case namefn(ConcurrentTransactionsBackoff):
return []any{cfg.txnBackoff}
case namefn(ConsiderMissingTopicDeletedAfter):
Expand All @@ -294,6 +299,8 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.maxProduceInflight}
case namefn(ProducerBatchCompression):
return []any{cfg.compression}
case namefn(WithCompressor):
return []any{cfg.compressor}
case namefn(ProducerBatchMaxBytes):
return []any{cfg.maxRecordBatchBytes}
case namefn(MaxBufferedRecords):
Expand Down Expand Up @@ -330,6 +337,8 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.partitions}
case namefn(ConsumePreferringLagFn):
return []any{cfg.preferLagFn}
case namefn(WithDecompressor):
return []any{cfg.decompressor}
case namefn(ConsumeRegex):
return []any{cfg.regex}
case namefn(ConsumeResetOffset):
Expand Down Expand Up @@ -416,7 +425,7 @@ func (cl *Client) OptValues(opt any) []any {
// NewClient also launches a goroutine which periodically updates the cached
// topic metadata.
func NewClient(opts ...Opt) (*Client, error) {
cfg, seeds, compressor, err := validateCfg(opts...)
cfg, seeds, err := validateCfg(opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -482,9 +491,6 @@ func NewClient(opts ...Opt) (*Client, error) {
bufPool: newBufPool(),
prsPool: newPrsPool(),

compressor: compressor,
decompressor: newDecompressor(),

coordinators: make(map[coordinatorKey]*coordinatorLoad),

updateMetadataCh: make(chan string, 1),
Expand Down
Loading