diff --git a/Makefile b/Makefile index 9194429..1ab2822 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ ut: e2e: @make dev_3rd_down @make dev_3rd_up - @go test -tags=e2e -race -cover -coverprofile=e2e.out -failfast -shuffle=on ./e2e/... + @go test -tags=e2e -race -cover -coverprofile=e2e.out -failfast -shuffle=on ./internal/e2e/... @make dev_3rd_down # 启动本地研发 docker 依赖 diff --git a/go.mod b/go.mod index 3762514..d001659 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ecodeclub/mq-api -go 1.21.0 +go 1.21 require ( github.com/ecodeclub/ekit v0.0.8 diff --git a/e2e/base_test.go b/internal/e2e/base_test.go similarity index 92% rename from e2e/base_test.go rename to internal/e2e/base_test.go index f7ff251..9d063c3 100644 --- a/e2e/base_test.go +++ b/internal/e2e/base_test.go @@ -23,8 +23,9 @@ import ( "testing" "time" + "github.com/ecodeclub/mq-api/internal/errs" + "github.com/ecodeclub/mq-api" - "github.com/ecodeclub/mq-api/mqerr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -159,7 +160,7 @@ func (b *TestSuite) TestMQ_CreateTopic() { for _, invalidTopic := range invalidTopics { err := b.messageQueue.CreateTopic(context.Background(), invalidTopic, partitions) - assert.ErrorIs(t, err, mqerr.ErrInvalidTopic, invalidTopic) + assert.ErrorIs(t, err, errs.ErrInvalidTopic, invalidTopic) } require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), invalidTopics...)) @@ -179,11 +180,11 @@ func (b *TestSuite) TestMQ_CreateTopic() { partitions := -1 validTopic1 := "invalidPartitions1" - assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic1, partitions), mqerr.ErrInvalidPartition) + assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic1, partitions), errs.ErrInvalidPartition) partitions = 0 validTopic2 := "invalidPartitions2" - assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic2, partitions), mqerr.ErrInvalidPartition) + assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic2, partitions), errs.ErrInvalidPartition) require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), validTopic1, validTopic2)) }) @@ -196,7 +197,7 @@ func (b *TestSuite) TestMQ_CreateTopic() { require.NoError(t, err, createdTopic) err = b.messageQueue.CreateTopic(context.Background(), createdTopic, partitions) - require.Error(t, err) + require.NoError(t, err) require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), createdTopic)) }) @@ -259,22 +260,19 @@ func (b *TestSuite) TestMQ_Producer() { err := b.messageQueue.CreateTopic(context.Background(), unknownTopic, 1) require.NoError(t, err) require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), unknownTopic)) - - _, err = b.messageQueue.Producer(unknownTopic) - require.ErrorIs(t, err, mqerr.ErrUnknownTopic) } func (b *TestSuite) TestMQ_Consumer() { t := b.T() t.Parallel() - unknownTopic, groupID := "consumer_unknownTopic", "c1" - err := b.messageQueue.CreateTopic(context.Background(), unknownTopic, 1) + topic, groupID := "topic_a", "c1" + err := b.messageQueue.CreateTopic(context.Background(), topic, 1) require.NoError(t, err) - require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), unknownTopic)) + require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), topic)) - _, err = b.messageQueue.Consumer(unknownTopic, groupID) - require.ErrorIs(t, err, mqerr.ErrUnknownTopic) + _, err = b.messageQueue.Consumer(topic, groupID) + require.NoError(t, err) } func (b *TestSuite) TestMQ_Close() { @@ -305,30 +303,30 @@ func (b *TestSuite) TestMQ_Close() { // 调用producer上的方法会返回ErrProducerIsClosed _, err = p.Produce(context.Background(), &mq.Message{}) - require.ErrorIs(t, err, mqerr.ErrProducerIsClosed) + require.ErrorIs(t, err, errs.ErrProducerIsClosed) _, err = p.ProduceWithPartition(context.Background(), &mq.Message{}, partitions-1) - require.ErrorIs(t, err, mqerr.ErrProducerIsClosed) + require.ErrorIs(t, err, errs.ErrProducerIsClosed) // 调用consumer上的方法会返回ErrConsumerIsClosed _, err = c.ConsumeChan(context.Background()) - require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed) + require.ErrorIs(t, err, errs.ErrConsumerIsClosed) _, err = c.Consume(context.Background()) - require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed) + require.ErrorIs(t, err, errs.ErrConsumerIsClosed) // 再次调用MQ上的方法会返回ErrMQIsClosed err = messageQueue.CreateTopic(context.Background(), topic11, partitions) - require.ErrorIs(t, err, mqerr.ErrMQIsClosed) + require.ErrorIs(t, err, errs.ErrMQIsClosed) _, err = messageQueue.Producer(topic11) - require.ErrorIs(t, err, mqerr.ErrMQIsClosed) + require.ErrorIs(t, err, errs.ErrMQIsClosed) _, err = messageQueue.Consumer(topic11, consumerGroupID) - require.ErrorIs(t, err, mqerr.ErrMQIsClosed) + require.ErrorIs(t, err, errs.ErrMQIsClosed) err = messageQueue.DeleteTopics(context.Background(), topic11) - require.ErrorIs(t, err, mqerr.ErrMQIsClosed) + require.ErrorIs(t, err, errs.ErrMQIsClosed) } func (b *TestSuite) TestProducer_Produce() { @@ -410,27 +408,6 @@ func (b *TestSuite) TestProducer_ProduceWithPartition() { t.Parallel() }) - t.Run("分区ID非法_返回错误", func(t *testing.T) { - t.Parallel() - - topic14, partitions := "topic14", 2 - producers, _ := b.newProducersAndConsumers(t, topic14, partitions, producerInfo{Num: 1}, consumerInfo{}) - - ctx, cancelFunc := context.WithCancel(context.Background()) - cancelFunc() - - p := producers[0] - - _, err := p.ProduceWithPartition(ctx, &mq.Message{Value: []byte("hello")}, partitions) - require.ErrorIs(t, err, mqerr.ErrInvalidPartition) - - _, err = p.ProduceWithPartition(ctx, &mq.Message{Value: []byte("hello")}, partitions+1) - require.ErrorIs(t, err, mqerr.ErrInvalidPartition) - - _, err = p.ProduceWithPartition(ctx, &mq.Message{Value: []byte("hello")}, -1) - require.ErrorIs(t, err, mqerr.ErrInvalidPartition) - }) - t.Run("多分区_并发发送", func(t *testing.T) { t.Parallel() @@ -548,9 +525,9 @@ func (b *TestSuite) TestProducer_Close() { // 调用Close后 _, err = p.Produce(context.Background(), &mq.Message{Value: []byte("hello")}) - require.ErrorIs(t, err, mqerr.ErrProducerIsClosed) + require.ErrorIs(t, err, errs.ErrProducerIsClosed) _, err = p.ProduceWithPartition(context.Background(), &mq.Message{Value: []byte("world")}, partitions-1) - require.ErrorIs(t, err, mqerr.ErrProducerIsClosed) + require.ErrorIs(t, err, errs.ErrProducerIsClosed) } func (b *TestSuite) TestConsumer_Close() { @@ -592,10 +569,10 @@ func (b *TestSuite) TestConsumer_Close() { // 再调用Consumer上的其他方法将返回error _, err = c.ConsumeChan(context.Background()) - require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed) + require.ErrorIs(t, err, errs.ErrConsumerIsClosed) _, err = c.Consume(context.Background()) - require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed) + require.ErrorIs(t, err, errs.ErrConsumerIsClosed) } func (b *TestSuite) TestConsumer_ConsumeChan() { diff --git a/e2e/kafka_test.go b/internal/e2e/kafka_test.go similarity index 97% rename from e2e/kafka_test.go rename to internal/e2e/kafka_test.go index d312515..bfc15b2 100644 --- a/e2e/kafka_test.go +++ b/internal/e2e/kafka_test.go @@ -27,7 +27,7 @@ import ( ) func TestKafka(t *testing.T) { - address := []string{"127.0.0.1:9094"} + address := []string{"127.0.0.1:9092"} suite.Run(t, NewTestSuite(KafkaCreator{address: address})) } diff --git a/mqerr/error.go b/internal/errs/error.go similarity index 88% rename from mqerr/error.go rename to internal/errs/error.go index 39fb6d8..2192e52 100644 --- a/mqerr/error.go +++ b/internal/errs/error.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package mqerr +package errs import "errors" @@ -21,7 +21,5 @@ var ( ErrProducerIsClosed = errors.New("生产者已经关闭") ErrMQIsClosed = errors.New("mq已经关闭") ErrInvalidTopic = errors.New("topic非法") - ErrCreatedTopic = errors.New("topic已创建") ErrInvalidPartition = errors.New("partition非法") - ErrUnknownTopic = errors.New("未知topic") ) diff --git a/kafka/consumer.go b/kafka/consumer.go index f956741..83cb3e2 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -22,9 +22,10 @@ import ( "log" "sync" + "github.com/ecodeclub/mq-api/internal/errs" + "github.com/ecodeclub/mq-api" "github.com/ecodeclub/mq-api/kafka/common" - "github.com/ecodeclub/mq-api/mqerr" kafkago "github.com/segmentio/kafka-go" ) @@ -68,7 +69,7 @@ func (c *Consumer) Consume(ctx context.Context) (*mq.Message, error) { return nil, ctx.Err() case m, ok := <-c.msgCh: if !ok { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrConsumerIsClosed) + return nil, fmt.Errorf("kafka: %w", errs.ErrConsumerIsClosed) } return m, nil } @@ -76,7 +77,7 @@ func (c *Consumer) Consume(ctx context.Context) (*mq.Message, error) { func (c *Consumer) ConsumeChan(ctx context.Context) (<-chan *mq.Message, error) { if c.closeCtx.Err() != nil { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrConsumerIsClosed) + return nil, fmt.Errorf("kafka: %w", errs.ErrConsumerIsClosed) } if ctx.Err() != nil { return nil, ctx.Err() diff --git a/kafka/mq.go b/kafka/mq.go index 3fe3575..6a546f8 100644 --- a/kafka/mq.go +++ b/kafka/mq.go @@ -21,9 +21,10 @@ import ( "strconv" "sync" + "github.com/ecodeclub/mq-api/internal/errs" + "github.com/ecodeclub/mq-api" "github.com/ecodeclub/mq-api/internal/pkg/validator" - "github.com/ecodeclub/mq-api/mqerr" "github.com/pkg/errors" kafkago "github.com/segmentio/kafka-go" "go.uber.org/multierr" @@ -41,10 +42,8 @@ type MQ struct { closed bool closeErr error - // 方便释放资源 - topicConfigMapping map[string]kafkago.TopicConfig - producers []mq.Producer - consumers []mq.Consumer + producers []mq.Producer + consumers []mq.Consumer } func NewMQ(network string, address []string) (mq.MQ, error) { @@ -64,39 +63,34 @@ func NewMQ(network string, address []string) (mq.MQ, error) { return nil, err } return &MQ{ - address: address, - controllerConn: controllerConn, - replicationFactor: defaultReplicationFactor, - topicConfigMapping: make(map[string]kafkago.TopicConfig), + address: address, + controllerConn: controllerConn, + replicationFactor: defaultReplicationFactor, }, nil } func (m *MQ) CreateTopic(ctx context.Context, name string, partitions int) error { if !validator.IsValidTopic(name) { - return fmt.Errorf("%w: %s", mqerr.ErrInvalidTopic, name) + return fmt.Errorf("%w: %s", errs.ErrInvalidTopic, name) } if partitions <= 0 { - return fmt.Errorf("%w: %d", mqerr.ErrInvalidPartition, partitions) + return fmt.Errorf("%w: %d", errs.ErrInvalidPartition, partitions) } m.locker.Lock() defer m.locker.Unlock() if m.closed { - return fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed) + return fmt.Errorf("kafka: %w", errs.ErrMQIsClosed) } if ctx.Err() != nil { return ctx.Err() } - if _, ok := m.topicConfigMapping[name]; ok { - return fmt.Errorf("kafka: %w", mqerr.ErrCreatedTopic) - } - - m.topicConfigMapping[name] = kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor} - return m.controllerConn.CreateTopics(m.topicConfigMapping[name]) + cfg := kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor} + return m.controllerConn.CreateTopics(cfg) } // DeleteTopics 删除topic @@ -105,17 +99,13 @@ func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error { defer m.locker.Unlock() if m.closed { - return fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed) + return fmt.Errorf("kafka: %w", errs.ErrMQIsClosed) } if ctx.Err() != nil { return ctx.Err() } - for _, topic := range topics { - delete(m.topicConfigMapping, topic) - } - err := m.controllerConn.DeleteTopics(topics...) var val kafkago.Error if errors.As(err, &val) && val == kafkago.UnknownTopicOrPartition { @@ -129,15 +119,11 @@ func (m *MQ) Producer(topic string) (mq.Producer, error) { defer m.locker.Unlock() if m.closed { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed) - } - - if _, ok := m.topicConfigMapping[topic]; !ok { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic) + return nil, fmt.Errorf("kafka: %w", errs.ErrMQIsClosed) } balancer, _ := NewSpecifiedPartitionBalancer(&kafkago.Hash{}) - p := NewProducer(m.address, topic, m.topicConfigMapping[topic].NumPartitions, balancer) + p := NewProducer(m.address, topic, balancer) m.producers = append(m.producers, p) return p, nil } @@ -147,11 +133,7 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) { defer m.locker.Unlock() if m.closed { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed) - } - - if _, ok := m.topicConfigMapping[topic]; !ok { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic) + return nil, fmt.Errorf("kafka: %w", errs.ErrMQIsClosed) } c := NewConsumer(m.address, topic, groupID) diff --git a/kafka/producer.go b/kafka/producer.go index 6a2fc8f..c58c03b 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -21,18 +21,17 @@ import ( "sync" "time" + "github.com/ecodeclub/mq-api/internal/errs" + "github.com/ecodeclub/ekit/retry" "github.com/ecodeclub/mq-api" "github.com/ecodeclub/mq-api/kafka/common" - "github.com/ecodeclub/mq-api/mqerr" "github.com/pkg/errors" kafkago "github.com/segmentio/kafka-go" ) type Producer struct { - topic string - partitions int - + topic string writer *kafkago.Writer locker *sync.RWMutex @@ -41,11 +40,10 @@ type Producer struct { closeErr error } -func NewProducer(address []string, topic string, partitions int, balancer kafkago.Balancer) *Producer { +func NewProducer(address []string, topic string, balancer kafkago.Balancer) *Producer { return &Producer{ - topic: topic, - partitions: partitions, - locker: &sync.RWMutex{}, + topic: topic, + locker: &sync.RWMutex{}, writer: &kafkago.Writer{ Addr: kafkago.TCP(address...), Topic: topic, @@ -60,10 +58,8 @@ func (p *Producer) Produce(ctx context.Context, m *mq.Message) (*mq.ProducerResu return p.produce(ctx, m, nil) } +// ProduceWithPartition 并没有校验 partition 的正确性。 func (p *Producer) ProduceWithPartition(ctx context.Context, m *mq.Message, partition int) (*mq.ProducerResult, error) { - if partition < 0 || partition >= p.partitions { - return nil, fmt.Errorf("kafka: %w", mqerr.ErrInvalidPartition) - } return p.produce(ctx, m, metaMessage{SpecifiedPartitionKey: partition}) } @@ -84,7 +80,7 @@ func (p *Producer) produce(ctx context.Context, m *mq.Message, meta metaMessage) return &mq.ProducerResult{}, nil } if errors.Is(err, io.ErrClosedPipe) { - return &mq.ProducerResult{}, fmt.Errorf("kafka: %w", mqerr.ErrProducerIsClosed) + return &mq.ProducerResult{}, fmt.Errorf("kafka: %w", errs.ErrProducerIsClosed) } // 控制流走到这Topic和Partition已经验证合法 // 要么选主阶段、要么分区在broker间移动,因此这两种情况需要重试