diff --git a/pkg/source/pulsar.go b/pkg/source/pulsar.go index fd332c0..7024b87 100644 --- a/pkg/source/pulsar.go +++ b/pkg/source/pulsar.go @@ -307,9 +307,13 @@ func (a *ackTrackers) key(msg pulsar.MessageID) string { return fmt.Sprintf("%d:%d", msg.LedgerID(), msg.EntryID()) } -func (a *ackTrackers) tryAdd(msg pulsar.MessageID) bool { - _, loaded := a.trackers.LoadOrStore(a.key(msg), newAckTracker(uint(msg.BatchSize()))) - return !loaded +func (a *ackTrackers) tryAdd(msg pulsar.MessageID) (ok bool) { + key := a.key(msg) + _, ok = a.trackers.Load(key) + if !ok { + _, ok = a.trackers.LoadOrStore(key, newAckTracker(uint(msg.BatchSize()))) + } + return !ok } func (a *ackTrackers) tryAck(msg pulsar.MessageID) (success bool, exist bool) {