Skip to content

Commit

Permalink
删除 kafka 实现中的 config map,去除不必要的校验
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Apr 6, 2024
1 parent b3c489c commit ccc9c02
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 100 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ut:
e2e:
@make dev_3rd_down
@make dev_3rd_up
@go test -tags=e2e -race -cover -coverprofile=e2e.out -failfast -shuffle=on ./e2e/...
@go test -tags=e2e -race -cover -coverprofile=e2e.out -failfast -shuffle=on ./internal/e2e/...
@make dev_3rd_down

# 启动本地研发 docker 依赖
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ecodeclub/mq-api

go 1.21.0
go 1.21

require (
github.com/ecodeclub/ekit v0.0.8
Expand Down
69 changes: 23 additions & 46 deletions e2e/base_test.go → internal/e2e/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"testing"
"time"

"github.com/ecodeclub/mq-api/internal/errs"

"github.com/ecodeclub/mq-api"
"github.com/ecodeclub/mq-api/mqerr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -159,7 +160,7 @@ func (b *TestSuite) TestMQ_CreateTopic() {

for _, invalidTopic := range invalidTopics {
err := b.messageQueue.CreateTopic(context.Background(), invalidTopic, partitions)
assert.ErrorIs(t, err, mqerr.ErrInvalidTopic, invalidTopic)
assert.ErrorIs(t, err, errs.ErrInvalidTopic, invalidTopic)
}

require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), invalidTopics...))
Expand All @@ -179,11 +180,11 @@ func (b *TestSuite) TestMQ_CreateTopic() {

partitions := -1
validTopic1 := "invalidPartitions1"
assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic1, partitions), mqerr.ErrInvalidPartition)
assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic1, partitions), errs.ErrInvalidPartition)

partitions = 0
validTopic2 := "invalidPartitions2"
assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic2, partitions), mqerr.ErrInvalidPartition)
assert.ErrorIs(t, b.messageQueue.CreateTopic(context.Background(), validTopic2, partitions), errs.ErrInvalidPartition)

require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), validTopic1, validTopic2))
})
Expand All @@ -196,7 +197,7 @@ func (b *TestSuite) TestMQ_CreateTopic() {
require.NoError(t, err, createdTopic)

err = b.messageQueue.CreateTopic(context.Background(), createdTopic, partitions)
require.Error(t, err)
require.NoError(t, err)

require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), createdTopic))
})
Expand Down Expand Up @@ -259,22 +260,19 @@ func (b *TestSuite) TestMQ_Producer() {
err := b.messageQueue.CreateTopic(context.Background(), unknownTopic, 1)
require.NoError(t, err)
require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), unknownTopic))

_, err = b.messageQueue.Producer(unknownTopic)
require.ErrorIs(t, err, mqerr.ErrUnknownTopic)
}

func (b *TestSuite) TestMQ_Consumer() {
t := b.T()
t.Parallel()

unknownTopic, groupID := "consumer_unknownTopic", "c1"
err := b.messageQueue.CreateTopic(context.Background(), unknownTopic, 1)
topic, groupID := "topic_a", "c1"
err := b.messageQueue.CreateTopic(context.Background(), topic, 1)
require.NoError(t, err)
require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), unknownTopic))
require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), topic))

_, err = b.messageQueue.Consumer(unknownTopic, groupID)
require.ErrorIs(t, err, mqerr.ErrUnknownTopic)
_, err = b.messageQueue.Consumer(topic, groupID)
require.NoError(t, err)
}

func (b *TestSuite) TestMQ_Close() {
Expand Down Expand Up @@ -305,30 +303,30 @@ func (b *TestSuite) TestMQ_Close() {

// 调用producer上的方法会返回ErrProducerIsClosed
_, err = p.Produce(context.Background(), &mq.Message{})
require.ErrorIs(t, err, mqerr.ErrProducerIsClosed)
require.ErrorIs(t, err, errs.ErrProducerIsClosed)

_, err = p.ProduceWithPartition(context.Background(), &mq.Message{}, partitions-1)
require.ErrorIs(t, err, mqerr.ErrProducerIsClosed)
require.ErrorIs(t, err, errs.ErrProducerIsClosed)

// 调用consumer上的方法会返回ErrConsumerIsClosed
_, err = c.ConsumeChan(context.Background())
require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed)
require.ErrorIs(t, err, errs.ErrConsumerIsClosed)

_, err = c.Consume(context.Background())
require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed)
require.ErrorIs(t, err, errs.ErrConsumerIsClosed)

// 再次调用MQ上的方法会返回ErrMQIsClosed
err = messageQueue.CreateTopic(context.Background(), topic11, partitions)
require.ErrorIs(t, err, mqerr.ErrMQIsClosed)
require.ErrorIs(t, err, errs.ErrMQIsClosed)

_, err = messageQueue.Producer(topic11)
require.ErrorIs(t, err, mqerr.ErrMQIsClosed)
require.ErrorIs(t, err, errs.ErrMQIsClosed)

_, err = messageQueue.Consumer(topic11, consumerGroupID)
require.ErrorIs(t, err, mqerr.ErrMQIsClosed)
require.ErrorIs(t, err, errs.ErrMQIsClosed)

err = messageQueue.DeleteTopics(context.Background(), topic11)
require.ErrorIs(t, err, mqerr.ErrMQIsClosed)
require.ErrorIs(t, err, errs.ErrMQIsClosed)
}

func (b *TestSuite) TestProducer_Produce() {
Expand Down Expand Up @@ -410,27 +408,6 @@ func (b *TestSuite) TestProducer_ProduceWithPartition() {
t.Parallel()
})

t.Run("分区ID非法_返回错误", func(t *testing.T) {
t.Parallel()

topic14, partitions := "topic14", 2
producers, _ := b.newProducersAndConsumers(t, topic14, partitions, producerInfo{Num: 1}, consumerInfo{})

ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()

p := producers[0]

_, err := p.ProduceWithPartition(ctx, &mq.Message{Value: []byte("hello")}, partitions)
require.ErrorIs(t, err, mqerr.ErrInvalidPartition)

_, err = p.ProduceWithPartition(ctx, &mq.Message{Value: []byte("hello")}, partitions+1)
require.ErrorIs(t, err, mqerr.ErrInvalidPartition)

_, err = p.ProduceWithPartition(ctx, &mq.Message{Value: []byte("hello")}, -1)
require.ErrorIs(t, err, mqerr.ErrInvalidPartition)
})

t.Run("多分区_并发发送", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -548,9 +525,9 @@ func (b *TestSuite) TestProducer_Close() {

// 调用Close后
_, err = p.Produce(context.Background(), &mq.Message{Value: []byte("hello")})
require.ErrorIs(t, err, mqerr.ErrProducerIsClosed)
require.ErrorIs(t, err, errs.ErrProducerIsClosed)
_, err = p.ProduceWithPartition(context.Background(), &mq.Message{Value: []byte("world")}, partitions-1)
require.ErrorIs(t, err, mqerr.ErrProducerIsClosed)
require.ErrorIs(t, err, errs.ErrProducerIsClosed)
}

func (b *TestSuite) TestConsumer_Close() {
Expand Down Expand Up @@ -592,10 +569,10 @@ func (b *TestSuite) TestConsumer_Close() {

// 再调用Consumer上的其他方法将返回error
_, err = c.ConsumeChan(context.Background())
require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed)
require.ErrorIs(t, err, errs.ErrConsumerIsClosed)

_, err = c.Consume(context.Background())
require.ErrorIs(t, err, mqerr.ErrConsumerIsClosed)
require.ErrorIs(t, err, errs.ErrConsumerIsClosed)
}

func (b *TestSuite) TestConsumer_ConsumeChan() {
Expand Down
File renamed without changes.
4 changes: 1 addition & 3 deletions mqerr/error.go → internal/errs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package mqerr
package errs

import "errors"

Expand All @@ -21,7 +21,5 @@ var (
ErrProducerIsClosed = errors.New("生产者已经关闭")
ErrMQIsClosed = errors.New("mq已经关闭")
ErrInvalidTopic = errors.New("topic非法")
ErrCreatedTopic = errors.New("topic已创建")
ErrInvalidPartition = errors.New("partition非法")
ErrUnknownTopic = errors.New("未知topic")
)
7 changes: 4 additions & 3 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"log"
"sync"

"github.com/ecodeclub/mq-api/internal/errs"

"github.com/ecodeclub/mq-api"
"github.com/ecodeclub/mq-api/kafka/common"
"github.com/ecodeclub/mq-api/mqerr"
kafkago "github.com/segmentio/kafka-go"
)

Expand Down Expand Up @@ -68,15 +69,15 @@ func (c *Consumer) Consume(ctx context.Context) (*mq.Message, error) {
return nil, ctx.Err()
case m, ok := <-c.msgCh:
if !ok {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrConsumerIsClosed)
return nil, fmt.Errorf("kafka: %w", errs.ErrConsumerIsClosed)
}
return m, nil
}
}

func (c *Consumer) ConsumeChan(ctx context.Context) (<-chan *mq.Message, error) {
if c.closeCtx.Err() != nil {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrConsumerIsClosed)
return nil, fmt.Errorf("kafka: %w", errs.ErrConsumerIsClosed)
}
if ctx.Err() != nil {
return nil, ctx.Err()
Expand Down
50 changes: 16 additions & 34 deletions kafka/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"strconv"
"sync"

"github.com/ecodeclub/mq-api/internal/errs"

"github.com/ecodeclub/mq-api"
"github.com/ecodeclub/mq-api/internal/pkg/validator"
"github.com/ecodeclub/mq-api/mqerr"
"github.com/pkg/errors"
kafkago "github.com/segmentio/kafka-go"
"go.uber.org/multierr"
Expand All @@ -41,10 +42,8 @@ type MQ struct {
closed bool
closeErr error

// 方便释放资源
topicConfigMapping map[string]kafkago.TopicConfig
producers []mq.Producer
consumers []mq.Consumer
producers []mq.Producer
consumers []mq.Consumer
}

func NewMQ(network string, address []string) (mq.MQ, error) {
Expand All @@ -64,39 +63,34 @@ func NewMQ(network string, address []string) (mq.MQ, error) {
return nil, err
}
return &MQ{
address: address,
controllerConn: controllerConn,
replicationFactor: defaultReplicationFactor,
topicConfigMapping: make(map[string]kafkago.TopicConfig),
address: address,
controllerConn: controllerConn,
replicationFactor: defaultReplicationFactor,
}, nil
}

func (m *MQ) CreateTopic(ctx context.Context, name string, partitions int) error {
if !validator.IsValidTopic(name) {
return fmt.Errorf("%w: %s", mqerr.ErrInvalidTopic, name)
return fmt.Errorf("%w: %s", errs.ErrInvalidTopic, name)
}

if partitions <= 0 {
return fmt.Errorf("%w: %d", mqerr.ErrInvalidPartition, partitions)
return fmt.Errorf("%w: %d", errs.ErrInvalidPartition, partitions)
}

m.locker.Lock()
defer m.locker.Unlock()

if m.closed {
return fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed)
return fmt.Errorf("kafka: %w", errs.ErrMQIsClosed)
}

if ctx.Err() != nil {
return ctx.Err()
}

if _, ok := m.topicConfigMapping[name]; ok {
return fmt.Errorf("kafka: %w", mqerr.ErrCreatedTopic)
}

m.topicConfigMapping[name] = kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor}
return m.controllerConn.CreateTopics(m.topicConfigMapping[name])
cfg := kafkago.TopicConfig{Topic: name, NumPartitions: partitions, ReplicationFactor: m.replicationFactor}
return m.controllerConn.CreateTopics(cfg)
}

// DeleteTopics 删除topic
Expand All @@ -105,17 +99,13 @@ func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error {
defer m.locker.Unlock()

if m.closed {
return fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed)
return fmt.Errorf("kafka: %w", errs.ErrMQIsClosed)
}

if ctx.Err() != nil {
return ctx.Err()
}

for _, topic := range topics {
delete(m.topicConfigMapping, topic)
}

err := m.controllerConn.DeleteTopics(topics...)
var val kafkago.Error
if errors.As(err, &val) && val == kafkago.UnknownTopicOrPartition {
Expand All @@ -129,15 +119,11 @@ func (m *MQ) Producer(topic string) (mq.Producer, error) {
defer m.locker.Unlock()

if m.closed {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed)
}

if _, ok := m.topicConfigMapping[topic]; !ok {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic)
return nil, fmt.Errorf("kafka: %w", errs.ErrMQIsClosed)
}

balancer, _ := NewSpecifiedPartitionBalancer(&kafkago.Hash{})
p := NewProducer(m.address, topic, m.topicConfigMapping[topic].NumPartitions, balancer)
p := NewProducer(m.address, topic, balancer)
m.producers = append(m.producers, p)
return p, nil
}
Expand All @@ -147,11 +133,7 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) {
defer m.locker.Unlock()

if m.closed {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrMQIsClosed)
}

if _, ok := m.topicConfigMapping[topic]; !ok {
return nil, fmt.Errorf("kafka: %w", mqerr.ErrUnknownTopic)
return nil, fmt.Errorf("kafka: %w", errs.ErrMQIsClosed)
}

c := NewConsumer(m.address, topic, groupID)
Expand Down
Loading

0 comments on commit ccc9c02

Please sign in to comment.