Skip to content

Commit

Permalink
静态代码检查修改
Browse files Browse the repository at this point in the history
  • Loading branch information
juniaoshaonian committed Mar 9, 2024
1 parent 8e1f243 commit 5b949ca
Show file tree
Hide file tree
Showing 12 changed files with 293 additions and 71 deletions.
14 changes: 14 additions & 0 deletions e2e/memory_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build e2e

package e2e
Expand Down
16 changes: 15 additions & 1 deletion memory/consumer.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
Expand Down Expand Up @@ -104,7 +118,7 @@ func (c *Consumer) Handle(event *Event) {
// 设置消费进度
partitionInfo := <-c.receiveCh
log.Printf("消费者 %s接收到分区信息 %v", c.name, partitionInfo)
c.partitionRecords = partitionInfo.Data.([]PartitionRecord)
c.partitionRecords, _ = partitionInfo.Data.([]PartitionRecord)
// 返回设置完成的信号
c.reportCh <- &Event{
Type: PartitionNotifyAck,
Expand Down
132 changes: 73 additions & 59 deletions memory/consumergroup.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"

"github.com/ecodeclub/ekit/syncx"
"github.com/ecodeclub/mq-api"

"github.com/ecodeclub/ekit/syncx"
"github.com/pkg/errors"
)

var ErrReportOffsetFail = errors.New("非平衡状态,无法上报偏移量")

const (
consumerCap = 16
defaultEventCap = 16
msgChannelLength = 1000

// ExitGroup 退出信号
ExitGroup = "exit_group"
// ExitGroupAck 退出的确认信号
ExitGroupAck = "exit_group_ack"
// ReportOffset 上报偏移量信号
ReportOffset = "report_offset"
// ReportOffsetAck 上报偏移量确认信号
ReportOffsetAck = "report_offset_ack"
// Rejoin 通知consumer重新加入消费组
Rejoin = "rejoin"
// RejoinAck 表示客户端收到重新加入消费组的指令并将offset进行上报
Expand All @@ -45,10 +59,9 @@ type ConsumerGroup struct {
// 消费者平衡器
consumerPartitionBalancer ConsumerPartitionAssigner
// 分区消费记录
partitionRecords syncx.Map[int, PartitionRecord]
partitionRecords *syncx.Map[int, PartitionRecord]
// 分区
partitions []*Partition
once sync.Once
status int32
// 用于接受在重平衡阶段channel的返回数据
balanceCh chan struct{}
Expand Down Expand Up @@ -84,17 +97,17 @@ type Event struct {
func (c *ConsumerGroup) Handler(name string, event *Event) {
switch event.Type {
case ExitGroup:
closeCh := event.Data.(chan struct{})
closeCh, _ := event.Data.(chan struct{})
c.ExitGroup(name, closeCh)
case ReportOffset:
data := event.Data.(ReportData)
data, _ := event.Data.(ReportData)
var err error
err = c.ReportOffset(data.Records)
data.ErrChan <- err
log.Printf("消费者%s上报offset成功", name)
case RejoinAck:
// consumer响应重平衡信号返回的数据,返回的是当前所有分区的偏移量
records := event.Data.([]PartitionRecord)
records, _ := event.Data.([]PartitionRecord)
// 不管上报成不成功
_ = c.ReportOffset(records)
log.Printf("消费者%s成功接受到重平衡信号,并上报offset", name)
Expand All @@ -109,16 +122,17 @@ func (c *ConsumerGroup) Handler(name string, event *Event) {
func (c *ConsumerGroup) ExitGroup(name string, closeCh chan struct{}) {
// 把自己从消费组内摘除
for {
if atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) {
defer atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable)
log.Printf("消费者 %s 准备退出消费组", name)
c.consumers.Delete(name)
c.reBalance()
log.Printf("给消费者 %s 发送退出确认信号", name)
close(closeCh)
log.Printf("消费者 %s 成功退出消费组", name)
return
if !atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) {
continue
}
log.Printf("消费者 %s 准备退出消费组", name)
c.consumers.Delete(name)
c.reBalance()
log.Printf("给消费者 %s 发送退出确认信号", name)
close(closeCh)
log.Printf("消费者 %s 成功退出消费组", name)
atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable)
return
}
}

Expand Down Expand Up @@ -149,7 +163,7 @@ func (c *ConsumerGroup) reBalance() {
log.Println("开始重平衡")
// 通知每一个消费者进行偏移量的上报
length := 0
consumers := make([]string, 0, 20)
consumers := make([]string, 0, consumerCap)
log.Println("开始给每个消费者,重平衡信号")
c.consumers.Range(func(key string, value *ConsumerMetaData) bool {
log.Printf("开始通知消费者%s", key)
Expand Down Expand Up @@ -198,6 +212,8 @@ func (c *ConsumerGroup) reBalance() {
log.Println("重平衡结束")
return
}
default:

}
}
log.Println("重平衡结束")
Expand All @@ -206,50 +222,48 @@ func (c *ConsumerGroup) reBalance() {
// JoinGroup 加入消费组
func (c *ConsumerGroup) JoinGroup() *Consumer {
for {
if atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) {
var length int
c.consumers.Range(func(key string, value *ConsumerMetaData) bool {
length++
return true
})
name := fmt.Sprintf("%s_%d", c.name, length)
reportCh := make(chan *Event, 16)
receiveCh := make(chan *Event, 16)
consumer := &Consumer{
partitions: c.partitions,
receiveCh: receiveCh,
reportCh: reportCh,
name: name,
msgCh: make(chan *mq.Message, 1000),
partitionRecords: make([]PartitionRecord, 0),
closeCh: make(chan struct{}),
}
c.consumers.Store(name, &ConsumerMetaData{
reportCh: reportCh,
receiveCh: receiveCh,
name: name,
})
go c.HandleConsumerSignals(name, reportCh)
go consumer.Run()
log.Println(fmt.Sprintf("新建消费者 %s", name))
// 重平衡分配分区
c.reBalance()
atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable)
return consumer
if !atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) {
continue
}
var length int
c.consumers.Range(func(key string, value *ConsumerMetaData) bool {
length++
return true
})
name := fmt.Sprintf("%s_%d", c.name, length)
reportCh := make(chan *Event, defaultEventCap)
receiveCh := make(chan *Event, defaultEventCap)
consumer := &Consumer{
partitions: c.partitions,
receiveCh: receiveCh,
reportCh: reportCh,
name: name,
msgCh: make(chan *mq.Message, msgChannelLength),
partitionRecords: []PartitionRecord{},
closeCh: make(chan struct{}),
}
c.consumers.Store(name, &ConsumerMetaData{
reportCh: reportCh,
receiveCh: receiveCh,
name: name,
})
go c.HandleConsumerSignals(name, reportCh)
go consumer.Run()
log.Printf("新建消费者 %s", name)
// 重平衡分配分区
c.reBalance()
atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable)
return consumer
}
}

// HandleConsumerSignals 处理消费者上报的事件
func (c *ConsumerGroup) HandleConsumerSignals(name string, reportCh chan *Event) {
for {
select {
case event := <-reportCh:
c.Handler(name, event)
if event.Type == ExitGroup {
close(reportCh)
return
}
for event := range reportCh {
c.Handler(name, event)
if event.Type == ExitGroup {
close(reportCh)
return
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions memory/consumerpartitionassigner/equaldivide/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func (b *Balancer) AssignPartition(consumers []string, partitions int) map[strin
for _, consumer := range consumers {
result[consumer] = make([]int, 0)
}

// 平均分配 partitions
partitionIndex := 0
for i := 0; i < consumerCount; i++ {
Expand All @@ -43,7 +42,6 @@ func (b *Balancer) AssignPartition(consumers []string, partitions int) map[strin
partitionIndex++
}
}

return result
}

Expand Down
63 changes: 63 additions & 0 deletions memory/consumerpartitionassigner/equaldivide/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package equaldivide

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBalancer_AssignPartition(t *testing.T) {
t.Parallel()
balancer := NewBalancer()
testcases := []struct {
name string
consumers []string
partition int
wantAnswer map[string][]int
}{
{
name: "分区书超过consumer个数",
consumers: []string{"c1", "c2", "c3", "c4"},
partition: 5,
wantAnswer: map[string][]int{
"c1": {0, 1},
"c2": {2},
"c3": {3},
"c4": {4},
},
},
{
name: "分区数小于consumer个数",
consumers: []string{"c1", "c2", "c3", "c4"},
partition: 3,
wantAnswer: map[string][]int{
"c1": {0},
"c2": {1},
"c3": {2},
"c4": {},
},
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
actualVal := balancer.AssignPartition(tc.consumers, tc.partition)
assert.Equal(t, tc.wantAnswer, actualVal)
})
}
}
20 changes: 18 additions & 2 deletions memory/mq.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
Expand All @@ -13,6 +27,8 @@ import (
"github.com/ecodeclub/mq-api/mqerr"
)

const defaultBalanceChLen = 10

type MQ struct {
locker sync.RWMutex
closed bool
Expand Down Expand Up @@ -86,7 +102,7 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) {
consumers: syncx.Map[string, *ConsumerMetaData]{},
consumerPartitionBalancer: t.consumerPartitionBalancer,
partitions: t.partitions,
balanceCh: make(chan struct{}, 10),
balanceCh: make(chan struct{}, defaultBalanceChLen),
status: StatusStable,
}
// 初始化分区消费进度
Expand All @@ -97,7 +113,7 @@ func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) {
Cursor: 0,
})
}
group.partitionRecords = partitionRecords
group.partitionRecords = &partitionRecords
}
consumer := group.JoinGroup()
t.consumerGroups.Store(groupID, group)
Expand Down
Loading

0 comments on commit 5b949ca

Please sign in to comment.