Skip to content

Commit

Permalink
删除 kafka 实现中的 config map,去除不必要的校验
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Apr 6, 2024
1 parent b3c489c commit 83ef028
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ecodeclub/mq-api

go 1.21.0
go 1.21

require (
github.com/ecodeclub/ekit v0.0.8
Expand Down
35 changes: 8 additions & 27 deletions kafka/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ type MQ struct {
closed bool
closeErr error

// 方便释放资源
topicConfigMapping map[string]kafkago.TopicConfig
producers []mq.Producer
consumers []mq.Consumer
producers []mq.Producer
consumers []mq.Consumer
}

func NewMQ(network string, address []string) (mq.MQ, error) {
Expand All @@ -64,10 +62,9 @@ func NewMQ(network string, address []string) (mq.MQ, error) {
return nil, err
}
return &MQ{
address: address,
controllerConn: controllerConn,
replicationFactor: defaultReplicationFactor,
topicConfigMapping: make(map[string]kafkago.TopicConfig),
address: address,
controllerConn: controllerConn,
replicationFactor: defaultReplicationFactor,
}, nil
}

Expand All @@ -91,12 +88,8 @@ func (m *MQ) CreateTopic(ctx context.Context, name string, partitions int) error
return ctx.Err()
}

if _, ok := m.topicConfigMapping[name]; ok {
return fmt.Errorf("kafka: %w", mqerr.ErrCreatedTopic)
}

m.topicConfigMapping[name] = kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor}
return m.controllerConn.CreateTopics(m.topicConfigMapping[name])
cfg := kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor}
return m.controllerConn.CreateTopics(cfg)
}

// DeleteTopics 删除topic
Expand All @@ -112,10 +105,6 @@ func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error {
return ctx.Err()
}

for _, topic := range topics {
delete(m.topicConfigMapping, topic)
}

err := m.controllerConn.DeleteTopics(topics...)
var val kafkago.Error
if errors.As(err, &val) && val == kafkago.UnknownTopicOrPartition {
Expand All @@ -132,12 +121,8 @@ func (m *MQ) Producer(topic string) (mq.Producer, error) {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed)
}

if _, ok := m.topicConfigMapping[topic]; !ok {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic)
}

balancer, _ := NewSpecifiedPartitionBalancer(&kafkago.Hash{})
p := NewProducer(m.address, topic, m.topicConfigMapping[topic].NumPartitions, balancer)
p := NewProducer(m.address, topic, balancer)
m.producers = append(m.producers, p)
return p, nil
}
Expand All @@ -150,10 +135,6 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed)
}

if _, ok := m.topicConfigMapping[topic]; !ok {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic)
}

c := NewConsumer(m.address, topic, groupID)
m.consumers = append(m.consumers, c)

Expand Down
14 changes: 4 additions & 10 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
)

type Producer struct {
topic string
partitions int

topic string
writer *kafkago.Writer
locker *sync.RWMutex

Expand All @@ -41,11 +39,10 @@ type Producer struct {
closeErr error
}

func NewProducer(address []string, topic string, partitions int, balancer kafkago.Balancer) *Producer {
func NewProducer(address []string, topic string, balancer kafkago.Balancer) *Producer {
return &Producer{
topic: topic,
partitions: partitions,
locker: &sync.RWMutex{},
topic: topic,
locker: &sync.RWMutex{},
writer: &kafkago.Writer{
Addr: kafkago.TCP(address...),
Topic: topic,
Expand All @@ -61,9 +58,6 @@ func (p *Producer) Produce(ctx context.Context, m *mq.Message) (*mq.ProducerResu
}

func (p *Producer) ProduceWithPartition(ctx context.Context, m *mq.Message, partition int) (*mq.ProducerResult, error) {
if partition < 0 || partition >= p.partitions {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrInvalidPartition)
}
return p.produce(ctx, m, metaMessage{SpecifiedPartitionKey: partition})
}

Expand Down

0 comments on commit 83ef028

Please sign in to comment.