From 16b14d7c0e8010cc15c803f6bbcb3717a295fcd6 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 28 Aug 2024 11:49:28 +0800 Subject: [PATCH] fix: fix metrics (#3147) Signed-off-by: Song Gao --- .gitignore | 1 + internal/topo/node/batch_op.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 85f78b1dbc..11e0e5cd2b 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ corss_build_for_rpm.tar .vscode/ tools/lint/bin/ +go.work.sum diff --git a/internal/topo/node/batch_op.go b/internal/topo/node/batch_op.go index a8f8e29e64..00bc6b5a64 100644 --- a/internal/topo/node/batch_op.go +++ b/internal/topo/node/batch_op.go @@ -117,7 +117,6 @@ func (b *BatchOp) ingest(ctx api.StreamContext, item any, checkSize bool) { b.currIndex++ if checkSize && b.currIndex >= b.batchSize { b.send() - b.statManager.IncTotalRecordsOut() } b.statManager.ProcessTimeEnd() b.statManager.IncTotalMessagesProcessed(1) @@ -133,6 +132,7 @@ func (b *BatchOp) send() { }) b.Broadcast(b.buffer) + b.statManager.IncTotalRecordsOut() // Reset buffer b.buffer = &xsql.WindowTuples{ Content: make([]xsql.Row, 0, b.batchSize),