From 83ef028aa911c0b29bd57bfff545d818c81f42f4 Mon Sep 17 00:00:00 2001 From: Deng Ming Date: Sat, 6 Apr 2024 21:36:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=20kafka=20=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=20config=20map=EF=BC=8C=E5=8E=BB=E9=99=A4?= =?UTF-8?q?=E4=B8=8D=E5=BF=85=E8=A6=81=E7=9A=84=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- kafka/mq.go | 35 ++++++++--------------------------- kafka/producer.go | 14 ++++---------- 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 3762514..d001659 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ecodeclub/mq-api -go 1.21.0 +go 1.21 require ( github.com/ecodeclub/ekit v0.0.8 diff --git a/kafka/mq.go b/kafka/mq.go index 3fe3575..7eee7d7 100644 --- a/kafka/mq.go +++ b/kafka/mq.go @@ -41,10 +41,8 @@ type MQ struct { closed bool closeErr error - // 方便释放资源 - topicConfigMapping map[string]kafkago.TopicConfig - producers []mq.Producer - consumers []mq.Consumer + producers []mq.Producer + consumers []mq.Consumer } func NewMQ(network string, address []string) (mq.MQ, error) { @@ -64,10 +62,9 @@ func NewMQ(network string, address []string) (mq.MQ, error) { return nil, err } return &MQ{ - address: address, - controllerConn: controllerConn, - replicationFactor: defaultReplicationFactor, - topicConfigMapping: make(map[string]kafkago.TopicConfig), + address: address, + controllerConn: controllerConn, + replicationFactor: defaultReplicationFactor, }, nil } @@ -91,12 +88,8 @@ func (m *MQ) CreateTopic(ctx context.Context, name string, partitions int) error return ctx.Err() } - if _, ok := m.topicConfigMapping[name]; ok { - return fmt.Errorf("kafka: %w", mqerr.ErrCreatedTopic) - } - - m.topicConfigMapping[name] = kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor} - return m.controllerConn.CreateTopics(m.topicConfigMapping[name]) + cfg := kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor} + return m.controllerConn.CreateTopics(cfg) } // DeleteTopics 删除topic @@ -112,10 +105,6 @@ func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error { return ctx.Err() } - for _, topic := range topics { - delete(m.topicConfigMapping, topic) - } - err := m.controllerConn.DeleteTopics(topics...) var val kafkago.Error if errors.As(err, &val) && val == kafkago.UnknownTopicOrPartition { @@ -132,12 +121,8 @@ func (m *MQ) Producer(topic string) (mq.Producer, error) { return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed) } - if _, ok := m.topicConfigMapping[topic]; !ok { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic) - } - balancer, _ := NewSpecifiedPartitionBalancer(&kafkago.Hash{}) - p := NewProducer(m.address, topic, m.topicConfigMapping[topic].NumPartitions, balancer) + p := NewProducer(m.address, topic, balancer) m.producers = append(m.producers, p) return p, nil } @@ -150,10 +135,6 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) { return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed) } - if _, ok := m.topicConfigMapping[topic]; !ok { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic) - } - c := NewConsumer(m.address, topic, groupID) m.consumers = append(m.consumers, c) diff --git a/kafka/producer.go b/kafka/producer.go index 6a2fc8f..6487155 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -30,9 +30,7 @@ import ( ) type Producer struct { - topic string - partitions int - + topic string writer *kafkago.Writer locker *sync.RWMutex @@ -41,11 +39,10 @@ type Producer struct { closeErr error } -func NewProducer(address []string, topic string, partitions int, balancer kafkago.Balancer) *Producer { +func NewProducer(address []string, topic string, balancer kafkago.Balancer) *Producer { return &Producer{ - topic: topic, - partitions: partitions, - locker: &sync.RWMutex{}, + topic: topic, + locker: &sync.RWMutex{}, writer: &kafkago.Writer{ Addr: kafkago.TCP(address...), Topic: topic, @@ -61,9 +58,6 @@ func (p *Producer) Produce(ctx context.Context, m *mq.Message) (*mq.ProducerResu } func (p *Producer) ProduceWithPartition(ctx context.Context, m *mq.Message, partition int) (*mq.ProducerResult, error) { - if partition < 0 || partition >= p.partitions { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrInvalidPartition) - } return p.produce(ctx, m, metaMessage{SpecifiedPartitionKey: partition}) }