diff --git a/e2e/memory_test.go b/e2e/memory_test.go index 2af0296..8f51c82 100644 --- a/e2e/memory_test.go +++ b/e2e/memory_test.go @@ -1,3 +1,17 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + //go:build e2e package e2e diff --git a/memory/consumer.go b/memory/consumer.go index 9e3fa08..2966923 100644 --- a/memory/consumer.go +++ b/memory/consumer.go @@ -1,3 +1,17 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package memory import ( @@ -104,7 +118,7 @@ func (c *Consumer) Handle(event *Event) { // 设置消费进度 partitionInfo := <-c.receiveCh log.Printf("消费者 %s接收到分区信息 %v", c.name, partitionInfo) - c.partitionRecords = partitionInfo.Data.([]PartitionRecord) + c.partitionRecords, _ = partitionInfo.Data.([]PartitionRecord) // 返回设置完成的信号 c.reportCh <- &Event{ Type: PartitionNotifyAck, diff --git a/memory/consumergroup.go b/memory/consumergroup.go index 11f1dab..1ec2402 100644 --- a/memory/consumergroup.go +++ b/memory/consumergroup.go @@ -1,28 +1,42 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package memory import ( "fmt" "log" - "sync" "sync/atomic" "time" - "github.com/ecodeclub/ekit/syncx" "github.com/ecodeclub/mq-api" + + "github.com/ecodeclub/ekit/syncx" "github.com/pkg/errors" ) var ErrReportOffsetFail = errors.New("非平衡状态,无法上报偏移量") const ( + consumerCap = 16 + defaultEventCap = 16 + msgChannelLength = 1000 + // ExitGroup 退出信号 ExitGroup = "exit_group" - // ExitGroupAck 退出的确认信号 - ExitGroupAck = "exit_group_ack" // ReportOffset 上报偏移量信号 ReportOffset = "report_offset" - // ReportOffsetAck 上报偏移量确认信号 - ReportOffsetAck = "report_offset_ack" // Rejoin 通知consumer重新加入消费组 Rejoin = "rejoin" // RejoinAck 表示客户端收到重新加入消费组的指令并将offset进行上报 @@ -45,10 +59,9 @@ type ConsumerGroup struct { // 消费者平衡器 consumerPartitionBalancer ConsumerPartitionAssigner // 分区消费记录 - partitionRecords syncx.Map[int, PartitionRecord] + partitionRecords *syncx.Map[int, PartitionRecord] // 分区 partitions []*Partition - once sync.Once status int32 // 用于接受在重平衡阶段channel的返回数据 balanceCh chan struct{} @@ -84,17 +97,17 @@ type Event struct { func (c *ConsumerGroup) Handler(name string, event *Event) { switch event.Type { case ExitGroup: - closeCh := event.Data.(chan struct{}) + closeCh, _ := event.Data.(chan struct{}) c.ExitGroup(name, closeCh) case ReportOffset: - data := event.Data.(ReportData) + data, _ := event.Data.(ReportData) var err error err = c.ReportOffset(data.Records) data.ErrChan <- err log.Printf("消费者%s上报offset成功", name) case RejoinAck: // consumer响应重平衡信号返回的数据,返回的是当前所有分区的偏移量 - records := event.Data.([]PartitionRecord) + records, _ := event.Data.([]PartitionRecord) // 不管上报成不成功 _ = c.ReportOffset(records) log.Printf("消费者%s成功接受到重平衡信号,并上报offset", name) @@ -109,16 +122,17 @@ func (c *ConsumerGroup) Handler(name string, event *Event) { func (c *ConsumerGroup) ExitGroup(name string, closeCh chan struct{}) { // 把自己从消费组内摘除 for { - if atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) { - defer atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable) - log.Printf("消费者 %s 准备退出消费组", name) - c.consumers.Delete(name) - c.reBalance() - log.Printf("给消费者 %s 发送退出确认信号", name) - close(closeCh) - log.Printf("消费者 %s 成功退出消费组", name) - return + if !atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) { + continue } + log.Printf("消费者 %s 准备退出消费组", name) + c.consumers.Delete(name) + c.reBalance() + log.Printf("给消费者 %s 发送退出确认信号", name) + close(closeCh) + log.Printf("消费者 %s 成功退出消费组", name) + atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable) + return } } @@ -149,7 +163,7 @@ func (c *ConsumerGroup) reBalance() { log.Println("开始重平衡") // 通知每一个消费者进行偏移量的上报 length := 0 - consumers := make([]string, 0, 20) + consumers := make([]string, 0, consumerCap) log.Println("开始给每个消费者,重平衡信号") c.consumers.Range(func(key string, value *ConsumerMetaData) bool { log.Printf("开始通知消费者%s", key) @@ -198,6 +212,8 @@ func (c *ConsumerGroup) reBalance() { log.Println("重平衡结束") return } + default: + } } log.Println("重平衡结束") @@ -206,50 +222,48 @@ func (c *ConsumerGroup) reBalance() { // JoinGroup 加入消费组 func (c *ConsumerGroup) JoinGroup() *Consumer { for { - if atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) { - var length int - c.consumers.Range(func(key string, value *ConsumerMetaData) bool { - length++ - return true - }) - name := fmt.Sprintf("%s_%d", c.name, length) - reportCh := make(chan *Event, 16) - receiveCh := make(chan *Event, 16) - consumer := &Consumer{ - partitions: c.partitions, - receiveCh: receiveCh, - reportCh: reportCh, - name: name, - msgCh: make(chan *mq.Message, 1000), - partitionRecords: make([]PartitionRecord, 0), - closeCh: make(chan struct{}), - } - c.consumers.Store(name, &ConsumerMetaData{ - reportCh: reportCh, - receiveCh: receiveCh, - name: name, - }) - go c.HandleConsumerSignals(name, reportCh) - go consumer.Run() - log.Println(fmt.Sprintf("新建消费者 %s", name)) - // 重平衡分配分区 - c.reBalance() - atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable) - return consumer + if !atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) { + continue + } + var length int + c.consumers.Range(func(key string, value *ConsumerMetaData) bool { + length++ + return true + }) + name := fmt.Sprintf("%s_%d", c.name, length) + reportCh := make(chan *Event, defaultEventCap) + receiveCh := make(chan *Event, defaultEventCap) + consumer := &Consumer{ + partitions: c.partitions, + receiveCh: receiveCh, + reportCh: reportCh, + name: name, + msgCh: make(chan *mq.Message, msgChannelLength), + partitionRecords: []PartitionRecord{}, + closeCh: make(chan struct{}), } + c.consumers.Store(name, &ConsumerMetaData{ + reportCh: reportCh, + receiveCh: receiveCh, + name: name, + }) + go c.HandleConsumerSignals(name, reportCh) + go consumer.Run() + log.Printf("新建消费者 %s", name) + // 重平衡分配分区 + c.reBalance() + atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable) + return consumer } } // HandleConsumerSignals 处理消费者上报的事件 func (c *ConsumerGroup) HandleConsumerSignals(name string, reportCh chan *Event) { - for { - select { - case event := <-reportCh: - c.Handler(name, event) - if event.Type == ExitGroup { - close(reportCh) - return - } + for event := range reportCh { + c.Handler(name, event) + if event.Type == ExitGroup { + close(reportCh) + return } } } diff --git a/memory/consumerpartitionassigner/equaldivide/balancer.go b/memory/consumerpartitionassigner/equaldivide/balancer.go index ee956dc..d8c7364 100644 --- a/memory/consumerpartitionassigner/equaldivide/balancer.go +++ b/memory/consumerpartitionassigner/equaldivide/balancer.go @@ -26,7 +26,6 @@ func (b *Balancer) AssignPartition(consumers []string, partitions int) map[strin for _, consumer := range consumers { result[consumer] = make([]int, 0) } - // 平均分配 partitions partitionIndex := 0 for i := 0; i < consumerCount; i++ { @@ -43,7 +42,6 @@ func (b *Balancer) AssignPartition(consumers []string, partitions int) map[strin partitionIndex++ } } - return result } diff --git a/memory/consumerpartitionassigner/equaldivide/balancer_test.go b/memory/consumerpartitionassigner/equaldivide/balancer_test.go new file mode 100644 index 0000000..a6f6ea6 --- /dev/null +++ b/memory/consumerpartitionassigner/equaldivide/balancer_test.go @@ -0,0 +1,63 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package equaldivide + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBalancer_AssignPartition(t *testing.T) { + t.Parallel() + balancer := NewBalancer() + testcases := []struct { + name string + consumers []string + partition int + wantAnswer map[string][]int + }{ + { + name: "分区书超过consumer个数", + consumers: []string{"c1", "c2", "c3", "c4"}, + partition: 5, + wantAnswer: map[string][]int{ + "c1": {0, 1}, + "c2": {2}, + "c3": {3}, + "c4": {4}, + }, + }, + { + name: "分区数小于consumer个数", + consumers: []string{"c1", "c2", "c3", "c4"}, + partition: 3, + wantAnswer: map[string][]int{ + "c1": {0}, + "c2": {1}, + "c3": {2}, + "c4": {}, + }, + }, + } + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + actualVal := balancer.AssignPartition(tc.consumers, tc.partition) + assert.Equal(t, tc.wantAnswer, actualVal) + }) + } +} diff --git a/memory/mq.go b/memory/mq.go index 720bfc6..ca93ce9 100644 --- a/memory/mq.go +++ b/memory/mq.go @@ -1,3 +1,17 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package memory import ( @@ -13,6 +27,8 @@ import ( "github.com/ecodeclub/mq-api/mqerr" ) +const defaultBalanceChLen = 10 + type MQ struct { locker sync.RWMutex closed bool @@ -86,7 +102,7 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) { consumers: syncx.Map[string, *ConsumerMetaData]{}, consumerPartitionBalancer: t.consumerPartitionBalancer, partitions: t.partitions, - balanceCh: make(chan struct{}, 10), + balanceCh: make(chan struct{}, defaultBalanceChLen), status: StatusStable, } // 初始化分区消费进度 @@ -97,7 +113,7 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) { Cursor: 0, }) } - group.partitionRecords = partitionRecords + group.partitionRecords = &partitionRecords } consumer := group.JoinGroup() t.consumerGroups.Store(groupID, group) diff --git a/memory/partition.go b/memory/partition.go index 348f54d..9342dc1 100644 --- a/memory/partition.go +++ b/memory/partition.go @@ -1,3 +1,17 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package memory import ( diff --git a/memory/producer.go b/memory/producer.go index ab5f42e..2186b21 100644 --- a/memory/producer.go +++ b/memory/producer.go @@ -1,3 +1,17 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package memory import ( diff --git a/memory/produceridgetter/hash/get.go b/memory/produceridgetter/hash/get.go index 7c74645..a6836e3 100644 --- a/memory/produceridgetter/hash/get.go +++ b/memory/produceridgetter/hash/get.go @@ -1,3 +1,17 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package hash import "hash/fnv" @@ -6,8 +20,8 @@ type Getter struct { Partition int } -// GetPartitionId 暂时使用hash,保证同一个key的值,在同一个分区。 -func (g *Getter) GetPartitionId(key string) int64 { +// GetPartitionID 暂时使用hash,保证同一个key的值,在同一个分区。 +func (g *Getter) GetPartitionID(key string) int64 { return hashString(key, g.Partition) } diff --git a/memory/produceridgetter/hash/get_test.go b/memory/produceridgetter/hash/get_test.go index 7adc22f..6ea0383 100644 --- a/memory/produceridgetter/hash/get_test.go +++ b/memory/produceridgetter/hash/get_test.go @@ -1 +1,32 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package hash + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetter(t *testing.T) { + t.Parallel() + // 测试两个相同的key返回的partition是同一个 + getter := Getter{ + 3, + } + partition1 := getter.GetPartitionID("msg1") + partition2 := getter.GetPartitionID("msg2") + assert.Equal(t, partition1, partition2) +} diff --git a/memory/topic.go b/memory/topic.go index 2f4f4c2..6a3ebb8 100644 --- a/memory/topic.go +++ b/memory/topic.go @@ -1,3 +1,17 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package memory import ( @@ -54,11 +68,13 @@ func (t *Topic) addProducer(producer mq.Producer) error { // addMessage 往分区里面添加消息 func (t *Topic) addMessage(msg *mq.Message, partition ...int64) error { var partitionID int64 - if len(partition) == 0 { - partitionID = t.partitionIDGetter.GetPartitionId(string(msg.Key)) - } else if len(partition) == 1 { + partitionLen := len(partition) + switch partitionLen { + case 0: + partitionID = t.partitionIDGetter.GetPartitionID(string(msg.Key)) + case 1: partitionID = partition[0] - } else { + default: return mqerr.ErrInvalidPartition } if partitionID < 0 || int(partitionID) >= len(t.partitions) { diff --git a/memory/type.go b/memory/type.go index e79f867..2e6ce7f 100644 --- a/memory/type.go +++ b/memory/type.go @@ -1,9 +1,23 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package memory // PartitionIDGetter 此抽象用于Producer获取对应分区号 type PartitionIDGetter interface { // GetPartitionId 用于Producer获取分区号,返回值就是分区号 - GetPartitionId(key string) int64 + GetPartitionID(key string) int64 } // ConsumerPartitionAssigner 此抽象是给消费组使用,用于将分区分配给消费组内的消费者。