diff --git a/extensions/impl/kafka/sink.go b/extensions/impl/kafka/sink.go index d4c67c01ef..9453c764c7 100644 --- a/extensions/impl/kafka/sink.go +++ b/extensions/impl/kafka/sink.go @@ -304,6 +304,9 @@ func (k *KafkaSink) ingest(ctx api.StreamContext, d *kafkago.Message, checkSize } func (k *KafkaSink) send(ctx api.StreamContext) { + if len(k.messages) < 1 { + return + } KafkaSinkCounter.WithLabelValues(LblSend, LblReq, k.ruleID, k.opID).Inc() start := time.Now() defer func() { diff --git a/extensions/impl/kafka/sink_test.go b/extensions/impl/kafka/sink_test.go index bc9e6ef811..084d4fb5dd 100644 --- a/extensions/impl/kafka/sink_test.go +++ b/extensions/impl/kafka/sink_test.go @@ -180,3 +180,9 @@ func TestToCompression(t *testing.T) { require.Equal(t, tc.expect, e) } } + +func TestKafkaSinkEmpty(t *testing.T) { + ks := &KafkaSink{} + ctx := mockContext.NewMockContext("1", "2") + ks.send(ctx) +}