Skip to content

Commit

Permalink
fix: fix kafka sink panic (#3566)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Feb 24, 2025
1 parent e067ff3 commit 6d4080c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
3 changes: 3 additions & 0 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ func (k *KafkaSink) ingest(ctx api.StreamContext, d *kafkago.Message, checkSize
}

func (k *KafkaSink) send(ctx api.StreamContext) {
if len(k.messages) < 1 {
return
}
KafkaSinkCounter.WithLabelValues(LblSend, LblReq, k.ruleID, k.opID).Inc()
start := time.Now()
defer func() {
Expand Down
6 changes: 6 additions & 0 deletions extensions/impl/kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,9 @@ func TestToCompression(t *testing.T) {
require.Equal(t, tc.expect, e)
}
}

func TestKafkaSinkEmpty(t *testing.T) {
ks := &KafkaSink{}
ctx := mockContext.NewMockContext("1", "2")
ks.send(ctx)
}

0 comments on commit 6d4080c

Please sign in to comment.