Skip to content

Commit

Permalink
querier: add query interval delay to queries to cover scrape failures (
Browse files Browse the repository at this point in the history
…#4)

Add twice scrape interval to queries to cover scrape/ingestion delays.

Signed-off-by: Seena Fallah <seenafallah@gmail.com>
  • Loading branch information
clwluvw authored Jun 15, 2024
1 parent c27660a commit c97f43f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
4 changes: 2 additions & 2 deletions internal/querier/promquerier/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func New(address string, headers map[string]string) (*PromQuerier, error) {
}, nil
}

func (pq *PromQuerier) Query(ctx context.Context, metric string, interval time.Duration, upLabels []string) (model.Vector, error) {
func (pq *PromQuerier) Query(ctx context.Context, ts time.Time, metric string, interval time.Duration, upLabels []string) (model.Vector, error) {
// Metirc has only one data point in the interval and it is present in now time
// and the up metric has two data points in the interval.
// This is to ensure that the metric has a missing data point in past not present and the exporter was up in the interval.
Expand All @@ -54,7 +54,7 @@ func (pq *PromQuerier) Query(ctx context.Context, metric string, interval time.D
// Timeout should be half of the interval
timeout := interval / 2 //nolint:gomnd,mnd

result, warnings, err := pq.v1api.Query(ctx, query, time.Now(), v1.WithTimeout(timeout))
result, warnings, err := pq.v1api.Query(ctx, query, ts, v1.WithTimeout(timeout))
if err != nil {
return nil, err
}
Expand Down
24 changes: 16 additions & 8 deletions internal/querier/promquerier/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ func TestQuery(t *testing.T) {
}
)

queryTS := time.Now()

promQuerier := newPromQuerier(t)
promQuerier.mockV1API(t).EXPECT().Query(
gomock.Any(),
"count_over_time(metric[1m0s]) == 1 and metric and on(job,instance) (count_over_time(up[1m0s]) == 2)",
gomock.Any(),
queryTS,
gomock.Any(),
).Return(expectedVector, nil, nil)

v, err := promQuerier.Query(context.Background(), metric, interval, upLabels)
v, err := promQuerier.Query(context.Background(), queryTS, metric, interval, upLabels)
require.NoError(t, err)
require.Equal(t, expectedVector, v)
}
Expand Down Expand Up @@ -99,15 +101,17 @@ func TestQuery_WithWarnings(t *testing.T) {
}
)

queryTS := time.Now()

promQuerier := newPromQuerier(t)
promQuerier.mockV1API(t).EXPECT().Query(
gomock.Any(),
"count_over_time(metric[1m0s]) == 1 and metric and on(job,instance) (count_over_time(up[1m0s]) == 2)",
gomock.Any(),
queryTS,
gomock.Any(),
).Return(expectedVector, v1.Warnings{"warning!"}, nil)

v, err := promQuerier.Query(context.Background(), metric, interval, upLabels)
v, err := promQuerier.Query(context.Background(), queryTS, metric, interval, upLabels)
require.NoError(t, err)
require.Equal(t, expectedVector, v)
}
Expand Down Expand Up @@ -136,15 +140,17 @@ func TestQuery_UnexpectedResult(t *testing.T) {
}
)

queryTS := time.Now()

promQuerier := newPromQuerier(t)
promQuerier.mockV1API(t).EXPECT().Query(
gomock.Any(),
"count_over_time(metric[1m0s]) == 1 and metric and on(job,instance) (count_over_time(up[1m0s]) == 2)",
gomock.Any(),
queryTS,
gomock.Any(),
).Return(returnValue, nil, nil)

v, err := promQuerier.Query(context.Background(), metric, interval, upLabels)
v, err := promQuerier.Query(context.Background(), queryTS, metric, interval, upLabels)
require.Error(t, err)
require.Nil(t, v)
}
Expand All @@ -160,15 +166,17 @@ func TestQuery_WithErrors(t *testing.T) {
expectedErr = &v1.Error{Type: v1.ErrTimeout}
)

queryTS := time.Now()

promQuerier := newPromQuerier(t)
promQuerier.mockV1API(t).EXPECT().Query(
gomock.Any(),
"count_over_time(metric[1m0s]) == 1 and metric and on(job,instance) (count_over_time(up[1m0s]) == 2)",
gomock.Any(),
queryTS,
gomock.Any(),
).Return(nil, nil, expectedErr)

v, err := promQuerier.Query(context.Background(), metric, interval, upLabels)
v, err := promQuerier.Query(context.Background(), queryTS, metric, interval, upLabels)
require.ErrorIs(t, err, expectedErr)
require.Nil(t, v)
}
2 changes: 1 addition & 1 deletion internal/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (
)

type Querier interface {
Query(ctx context.Context, metric string, interval time.Duration, upLabels []string) (model.Vector, error)
Query(ctx context.Context, ts time.Time, metric string, interval time.Duration, upLabels []string) (model.Vector, error)
}
5 changes: 4 additions & 1 deletion internal/zerome/zerome.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ func (c *Client) ZeroMe(ctx context.Context, metric Metric) error {
// Query twice the interval to ensure that the metric has a missing data point in the past.
queryInterval := metric.Interval * 2 //nolint:gomnd,mnd

vector, err := metric.querier.Query(ctx, metric.Name, queryInterval, metric.UpLabels)
// Add query interval as a delay to cover exporter scrape failures.
ts := time.Now().Add(-queryInterval)

vector, err := metric.querier.Query(ctx, ts, metric.Name, queryInterval, metric.UpLabels)
if err != nil {
slog.ErrorContext(ctx, "Failed to query metric", "metric", metric.Name, "error", err)

Expand Down

0 comments on commit c97f43f

Please sign in to comment.