From 6d4080c1e3eb7cafe3e43dcf893150d464542f24 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 24 Feb 2025 16:48:33 +0800 Subject: [PATCH] fix: fix kafka sink panic (#3566) Signed-off-by: Song Gao --- extensions/impl/kafka/sink.go | 3 +++ extensions/impl/kafka/sink_test.go | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/extensions/impl/kafka/sink.go b/extensions/impl/kafka/sink.go index d4c67c01ef..9453c764c7 100644 --- a/extensions/impl/kafka/sink.go +++ b/extensions/impl/kafka/sink.go @@ -304,6 +304,9 @@ func (k *KafkaSink) ingest(ctx api.StreamContext, d *kafkago.Message, checkSize } func (k *KafkaSink) send(ctx api.StreamContext) { + if len(k.messages) < 1 { + return + } KafkaSinkCounter.WithLabelValues(LblSend, LblReq, k.ruleID, k.opID).Inc() start := time.Now() defer func() { diff --git a/extensions/impl/kafka/sink_test.go b/extensions/impl/kafka/sink_test.go index bc9e6ef811..084d4fb5dd 100644 --- a/extensions/impl/kafka/sink_test.go +++ b/extensions/impl/kafka/sink_test.go @@ -180,3 +180,9 @@ func TestToCompression(t *testing.T) { require.Equal(t, tc.expect, e) } } + +func TestKafkaSinkEmpty(t *testing.T) { + ks := &KafkaSink{} + ctx := mockContext.NewMockContext("1", "2") + ks.send(ctx) +}