From 22cbe6fed9daab91d14dddb296fc3f609281e187 Mon Sep 17 00:00:00 2001 From: Seena Fallah Date: Thu, 4 Jul 2024 23:02:33 +0200 Subject: [PATCH] zerome: retry until nothing is found (#10) * zerome: retry until nothing is found Signed-off-by: Seena Fallah * ci: update golangci-lint to 1.59.1 Signed-off-by: Seena Fallah --------- Signed-off-by: Seena Fallah --- .github/workflows/go.yml | 2 +- internal/zerome/zerome.go | 36 ++++++++++++++++++++++------------ internal/zerome/zerome_test.go | 17 +++++++++++----- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index c8cd278..4a95c6f 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -64,6 +64,6 @@ jobs: - name: Golangci-lint uses: golangci/golangci-lint-action@v4 with: - version: v1.59.0 + version: v1.59.1 args: --timeout 3m skip-pkg-cache: true diff --git a/internal/zerome/zerome.go b/internal/zerome/zerome.go index 96b6fe2..f429a2f 100644 --- a/internal/zerome/zerome.go +++ b/internal/zerome/zerome.go @@ -21,32 +21,42 @@ func New(metrics []Metric) *Client { } } -func (c *Client) Run(ctx context.Context, wg *sync.WaitGroup) { +func (c *Client) Run(ctx context.Context, outerWG *sync.WaitGroup) { for _, metric := range c.metrics { - go func(metric Metric, wg *sync.WaitGroup) { - defer wg.Done() + go func(metric Metric, outerWG *sync.WaitGroup) { + defer outerWG.Done() + + var wg sync.WaitGroup for { select { case <-ctx.Done(): + wg.Wait() + return case <-time.After(metric.Interval): - err := c.ZeroMe(ctx, metric) - if err != nil { - slog.ErrorContext(ctx, "Failed to zero metric", "metric", metric.Name, "error", err) - } + wg.Add(1) + + go func() { + defer wg.Done() + + err := c.ZeroMe(ctx, time.Now(), metric) + if err != nil { + slog.ErrorContext(ctx, "Failed to zero metric", "metric", metric.Name, "error", err) + } + }() } } - }(metric, wg) + }(metric, outerWG) } } -func (c *Client) ZeroMe(ctx context.Context, metric Metric) error { +func (c *Client) ZeroMe(ctx context.Context, nowTime time.Time, metric Metric) error { // Query twice the interval to ensure that the metric has a missing data point in the past. queryInterval := metric.Interval * 2 //nolint:gomnd,mnd // Add query interval as a delay to cover exporter scrape failures. - ts := time.Now().Add(-queryInterval) + ts := nowTime.Add(-queryInterval) vector, err := metric.querier.Query(ctx, ts, metric.Name, queryInterval, metric.UpLabels) if err != nil { @@ -67,9 +77,11 @@ func (c *Client) ZeroMe(ctx context.Context, metric Metric) error { return err } - slog.InfoContext(ctx, "Zeroed metric", "metric", metric.Name, "vector", vector) + for _, s := range vector { + slog.InfoContext(ctx, "Zeroed sample", "metric", metric.Name, "sample", s.String()) + } - return nil + return c.ZeroMe(ctx, nowTime, metric) // Retry until nothing to zero } func (c *Client) zeroTimeSeries(metric Metric, vector model.Vector) []prompb.TimeSeries { diff --git a/internal/zerome/zerome_test.go b/internal/zerome/zerome_test.go index f730c24..dbd511d 100644 --- a/internal/zerome/zerome_test.go +++ b/internal/zerome/zerome_test.go @@ -74,7 +74,14 @@ func TestZeroMe(t *testing.T) { // setup querier mock mockQuerier := mock.NewMockAPI(ctrl) querier.SetV1API(mockQuerier) - mockQuerier.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(queryResult, nil, nil) + mockQuerier.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_, _, _ any, _ ...any) (model.Vector, *v1.Warnings, error) { + r := queryResult + queryResult = model.Vector{} // return empty result on next call so it will exit the loop + + return r, nil, nil + }, + ).Times(2) // setup writer mock { @@ -91,7 +98,7 @@ func TestZeroMe(t *testing.T) { client := New([]Metric{metric}) - err := client.ZeroMe(context.Background(), metric) + err := client.ZeroMe(context.Background(), nowTime, metric) require.NoError(t, err) } @@ -126,7 +133,7 @@ func TestZeroMe_QueryError(t *testing.T) { client := New([]Metric{metric}) - err := client.ZeroMe(context.Background(), metric) + err := client.ZeroMe(context.Background(), time.Now(), metric) require.ErrorIs(t, err, queryErr) } @@ -160,7 +167,7 @@ func TestZeroMe_EmptyResult(t *testing.T) { client := New([]Metric{metric}) - err := client.ZeroMe(context.Background(), metric) + err := client.ZeroMe(context.Background(), time.Now(), metric) require.NoError(t, err) } @@ -207,7 +214,7 @@ func TestZeroMe_WriteError(t *testing.T) { client := New([]Metric{metric}) - err := client.ZeroMe(context.Background(), metric) + err := client.ZeroMe(context.Background(), nowTime, metric) require.ErrorIs(t, err, writeErr) }