Skip to content

Commit

Permalink
fix: according to feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
KennyChenFight committed Sep 5, 2024
1 parent 31525a1 commit 358aafe
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions pkg/source/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,13 @@ func (a *ackTrackers) key(msg pulsar.MessageID) string {
return fmt.Sprintf("%d:%d", msg.LedgerID(), msg.EntryID())
}

func (a *ackTrackers) tryAdd(msg pulsar.MessageID) bool {
_, loaded := a.trackers.LoadOrStore(a.key(msg), newAckTracker(uint(msg.BatchSize())))
return !loaded
func (a *ackTrackers) tryAdd(msg pulsar.MessageID) (ok bool) {
key := a.key(msg)
_, ok = a.trackers.Load(key)
if !ok {
_, ok = a.trackers.LoadOrStore(key, newAckTracker(uint(msg.BatchSize())))
}
return !ok
}

func (a *ackTrackers) tryAck(msg pulsar.MessageID) (success bool, exist bool) {
Expand Down

0 comments on commit 358aafe

Please sign in to comment.