Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…achdb#126114

125571: roachtest: grafana annotations read creds from cloud storage r=herkolategan,renatolabs a=DarrylWong

As of cockroachdb#124099 we now store the service account creds in cloud storage. We already use this to access prometheus when generating dynamic configs. This change does the same for Grafana annotations by extracting the common logic into a helper.

This will allow users to have access to Grafana annotations out of the box locally, and limit the amount of benign but potentially confusing warnings about invalid credentials.

Release note: none
Fixes: none
Epic: none

126084: jobs: limit number of retained dsp-diag-url info rows r=dt a=dt

Fixes cockroachdb#126083.

126109: kvserver: deflake `WALBytesWritten` metric r=raduberinde a=aadityasondhi

There is a race condition in Pebble metrics where sometimes the WAL is rotated prior to updating the BytesIn metric to account for the previous WAL. The metrics collection call happens asynchronously, so it can sometimes cause this metric to decrease for a scrape window.

Fixes: cockroachdb#125736.

Release note: None

126114: sql: avoid slow lock verification in TestSchemaChangeAfterCreateInTxn r=rafiss a=rafiss

The addition of test-only verification pushed this test over the timeout sometimes, such that running it under the deadlock detector would cause spurious failures. We avoid this by making the test smaller under deadlock, like we do for race builds.

fixes cockroachdb#126075
Release justification: test only change
Release note: None

Co-authored-by: DarrylWong <darryl@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
  • Loading branch information
5 people committed Jun 24, 2024
5 parents 4848b25 + 079a0f5 + 69b285c + 82cf310 + c5b018e commit 6693f4a
Show file tree
Hide file tree
Showing 20 changed files with 368 additions and 186 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,7 @@ GO_TARGETS = [
"//pkg/roachprod/prometheus:prometheus_test",
"//pkg/roachprod/promhelperclient:promhelperclient",
"//pkg/roachprod/promhelperclient:promhelperclient_test",
"//pkg/roachprod/roachprodutil:roachprodutil",
"//pkg/roachprod/ssh:ssh",
"//pkg/roachprod/ssh:ssh_test",
"//pkg/roachprod/ui:ui",
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachprod/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/roachprod/fluentbit",
"//pkg/roachprod/install",
"//pkg/roachprod/opentelemetry",
"//pkg/roachprod/roachprodutil",
"//pkg/roachprod/ssh",
"//pkg/roachprod/ui",
"//pkg/roachprod/vm",
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachprod/grafana/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/cmd/roachprod/grafana",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachprod/roachprodutil",
"//pkg/util/httputil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_go_openapi_strfmt//:strfmt",
Expand Down
30 changes: 8 additions & 22 deletions pkg/cmd/roachprod/grafana/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ package grafana
import (
"context"
"fmt"
"os"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachprod/roachprodutil"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/errors"
"github.com/go-openapi/strfmt"
Expand All @@ -24,14 +24,10 @@ import (
"google.golang.org/api/idtoken"
)

const ServiceAccountJson = "GRAFANA_SERVICE_ACCOUNT_JSON"
const ServiceAccountAudience = "GRAFANA_SERVICE_ACCOUNT_AUDIENCE"

// newGrafanaClient is a helper function that creates an HTTP client to
// create grafana api calls with. If secure is true, it tries to get a
// GCS identity token by using the service account specified by the env
// variable ServiceAccountJson. This identity token is passed in
// every request to authenticate with grafana.
// GCS identity token by using the service account helpers in `roachprodutil/identity.go`.
// This identity token is passed in every request to authenticate with grafana.
func newGrafanaClient(
ctx context.Context, host string, secure bool,
) (*grafana.GrafanaHTTPAPI, error) {
Expand All @@ -42,25 +38,15 @@ func newGrafanaClient(
scheme = "https"

// Read in the service account key and audience, so we can retrieve the identity token.
grafanaKey := os.Getenv(ServiceAccountJson)
if grafanaKey == "" {
return nil, errors.Newf("%s env variable was not found", ServiceAccountJson)
}
grafanaAudience := os.Getenv(ServiceAccountAudience)
if grafanaAudience == "" {
return nil, errors.Newf("%s env variable was not found", ServiceAccountAudience)
if _, err := roachprodutil.SetServiceAccountCredsEnv(ctx, false); err != nil {
return nil, err
}

ts, err := idtoken.NewTokenSource(ctx, grafanaAudience, idtoken.WithCredentialsJSON([]byte(grafanaKey)))
token, err := roachprodutil.GetServiceAccountToken(ctx, idtoken.NewTokenSource)
if err != nil {
return nil, errors.Wrap(err, "Error creating GCS oauth token source from specified credential")
return nil, err
}
token, err := ts.Token()
if err != nil {
return nil, errors.Wrap(err, "Error getting identity token")
}

headers["Authorization"] = fmt.Sprintf("Bearer %s", token.AccessToken)
headers["Authorization"] = fmt.Sprintf("Bearer %s", token)
}

headers[httputil.ContentTypeHeader] = httputil.JSONContentType
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachprod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/config"
rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/roachprodutil"
"github.com/cockroachdb/cockroach/pkg/roachprod/ui"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm/gce"
Expand Down Expand Up @@ -1372,7 +1373,7 @@ creates an annotation over time range.
Example:
# Create an annotation over time range 1-100 on the centralized grafana instance, which needs authentication.
roachprod grafana-annotation grafana.testeng.crdb.io example-annotation-event --tags my-cluster --tags test-run-1 --dashboard-uid overview --time-range 1,100
`, grafana.ServiceAccountJson, grafana.ServiceAccountAudience),
`, roachprodutil.ServiceAccountJson, roachprodutil.ServiceAccountAudience),
Args: cobra.ExactArgs(2),
Run: wrap(func(cmd *cobra.Command, args []string) error {
req := grafana.AddAnnotationRequest{
Expand Down
57 changes: 49 additions & 8 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,59 @@ func (i InfoStorage) Delete(ctx context.Context, infoKey string) error {

// DeleteRange removes the info records between the provided
// start key (inclusive) and end key (exclusive).
func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey string) error {
func (i InfoStorage) DeleteRange(
ctx context.Context, startInfoKey, endInfoKey string, limit int,
) error {
return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
j.ID(), startInfoKey, endInfoKey,
)
return err
if limit > 0 {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3 "+
"ORDER BY info_key ASC LIMIT $4",
j.ID(), startInfoKey, endInfoKey, limit,
)
return err
} else {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
j.ID(), startInfoKey, endInfoKey,
)
return err
}
})
}

// Count returns the number of info records belonging to the job whose info
// keys fall in the range [startInfoKey, endInfoKey). It requires the
// InfoStorage to have an associated txn.
func (i InfoStorage) Count(ctx context.Context, startInfoKey, endInfoKey string) (int, error) {
	if i.txn == nil {
		return 0, errors.New("cannot access the job info table without an associated txn")
	}

	ctx, sp := tracing.ChildSpan(ctx, "count-job-info")
	defer sp.Finish()

	const countStmt = "SELECT count(*) FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3"
	row, err := i.txn.QueryRowEx(
		ctx, "job-info-count", i.txn.KV(),
		sessiondata.NodeUserSessionDataOverride,
		countStmt,
		i.j.ID(), startInfoKey, endInfoKey,
	)
	if err != nil {
		return 0, err
	}
	// A missing row without an error is reported as a zero count.
	if row == nil {
		return 0, nil
	}

	n, isInt := row[0].(*tree.DInt)
	if !isInt {
		return 0, errors.AssertionFailedf("job info: expected value to be DInt (was %T)", row[0])
	}
	return int(*n), nil
}

// Iterate iterates through the info records for a given job and info key prefix.
func (i InfoStorage) Iterate(
ctx context.Context, infoPrefix string, fn func(infoKey string, value []byte) error,
Expand Down
71 changes: 69 additions & 2 deletions pkg/jobs/job_info_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestJobInfoAccessors(t *testing.T) {
job1 := createJob(1)
job2 := createJob(2)
job3 := createJob(3)
kPrefix, kA, kB, kC, kD := "🔑", "🔑A", "🔑B", "🔑C", "🔑D"
kPrefix, kA, kB, kC, kD, kE, kF, kG, kZ := "🔑", "🔑A", "🔑B", "🔑C", "🔑D", "🔑E", "🔑F", "🔑G", "🔑Z"
v1, v2, v3 := []byte("val1"), []byte("val2"), []byte("val3")

// Key doesn't exist yet.
Expand Down Expand Up @@ -156,6 +156,13 @@ func TestJobInfoAccessors(t *testing.T) {
}))
require.Equal(t, 3, i)

require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
require.Equal(t, 3, count)
return err
}))

// Add a new revision to kC.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
Expand Down Expand Up @@ -189,7 +196,7 @@ func TestJobInfoAccessors(t *testing.T) {
// Delete kA-kB.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.DeleteRange(ctx, kA, kC)
return infoStorage.DeleteRange(ctx, kA, kC, 0)
}))
// Verify only kC remains.
i = 0
Expand All @@ -202,6 +209,66 @@ func TestJobInfoAccessors(t *testing.T) {
})
}))
require.Equal(t, 1, i)
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
require.Equal(t, 1, count)
return err
}))

// Write kE, kF, kG.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
for _, k := range []string{kE, kF, kG} {
if err := infoStorage.Write(ctx, k, v2); err != nil {
return err
}
}
return nil
}))

// Verify we see 4 rows (c, e, f, g) in the prefix.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
if err != nil {
return err
}
require.Equal(t, 4, count)
_, ok, err := infoStorage.Get(ctx, kC)
if err != nil {
return err
}
require.True(t, ok)
return nil
}))

// Delete [kC, kZ) but with a limit of 2, so just kC and kE.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.DeleteRange(ctx, kC, kZ, 2)
}))

// Verify we see 2 rows (F, G) in the prefix and C and E are missing.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
count, err := infoStorage.Count(ctx, kPrefix, kZ)
if err != nil {
return err
}
require.Equal(t, 2, count)
_, ok, err := infoStorage.Get(ctx, kC)
if err != nil {
return err
}
require.False(t, ok)
_, ok, err = infoStorage.Get(ctx, kF)
if err != nil {
return err
}
require.True(t, ok)
return nil
}))

// Iterate a different job.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
Expand Down
6 changes: 4 additions & 2 deletions pkg/jobs/job_info_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ func WriteChunkedFileToJobInfo(
jobInfo := InfoStorageForJob(txn, jobID)

// Clear any existing chunks with the same filename before writing new chunks.
// We clear all rows that with info keys in [filename, filename#_final~).
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
// We clear all rows with info keys in [filename, filename#_final~). The
// trailing "~" makes the exclusive end-key inclusive of all possible chunks
// as "~" sorts after all digits.
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~", 0); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/jobsprofiler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
"//pkg/sql",
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/sql/physicalplan",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
Expand Down
25 changes: 25 additions & 0 deletions pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const MaxRetainedDSPDiagramsPerJob = 5

// dspDiagMaxCulledPerWrite limits how many old diagrams writing a new one will
// cull to try to maintain the limit of 5; typically it would cull no more than
// one in the steady-state but an upgrading cluster that has accumulated many
// old rows might try to cull more, so we bound how many are eligible at a time
// to some large but finite upper-bound.
const dspDiagMaxCulledPerWrite = 100

// StorePlanDiagram stores the DistSQL diagram generated from p in the job info
// table. The generation of the plan diagram and persistence to the info table
// are done asynchronously and this method does not block on their completion.
Expand All @@ -46,6 +55,22 @@ func StorePlanDiagram(

dspKey := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
infoStorage := jobs.InfoStorageForJob(txn, jobID)

// Limit total retained diagrams by culling older ones as needed.
count, err := infoStorage.Count(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, profilerconstants.DSPDiagramInfoKeyMax)
if err != nil {
return err
}
const keep = MaxRetainedDSPDiagramsPerJob - 1
if toCull := min(count-keep, dspDiagMaxCulledPerWrite); toCull > 0 {
if err := infoStorage.DeleteRange(
ctx, profilerconstants.DSPDiagramInfoKeyPrefix, profilerconstants.DSPDiagramInfoKeyMax, toCull,
); err != nil {
return err
}
}

// Write the new diagram.
return infoStorage.Write(ctx, dspKey, []byte(diagURL.String()))
})
// Don't log the error if the context has been canceled. This will likely be
Expand Down
24 changes: 24 additions & 0 deletions pkg/jobs/jobsprofiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand All @@ -51,6 +52,29 @@ func TestProfilerStorePlanDiagram(t *testing.T) {
ctx := context.Background()
defer s.Stopper().Stop(ctx)

// First verify that directly calling StorePlanDiagram n times results in
// the expected number of persisted rows in job_info, respecting the limit.
db := s.ExecutorConfig().(sql.ExecutorConfig).InternalDB
const fakeJobID = 4567
plan := &sql.PhysicalPlan{PhysicalPlan: physicalplan.MakePhysicalPlan(&physicalplan.PhysicalInfrastructure{})}
for i := 1; i < 10; i++ {
jobsprofiler.StorePlanDiagram(ctx, s.ApplicationLayer().AppStopper(), plan, db, fakeJobID)
testutils.SucceedsSoon(t, func() error {
var count int
if err := sqlDB.QueryRow(
`SELECT count(*) FROM system.job_info WHERE job_id = $1`, fakeJobID,
).Scan(&count); err != nil {
return err
}
if expected := min(i, jobsprofiler.MaxRetainedDSPDiagramsPerJob); count != expected {
return errors.Errorf("expected %d rows, got %d", expected, count)
}
return nil
})
}

// Now run various jobs that have been extended to persist diagrams and make
// sure that they also create persisted diagram rows.
_, err := sqlDB.Exec(`CREATE DATABASE test`)
require.NoError(t, err)
_, err = sqlDB.Exec(`CREATE TABLE foo (id INT PRIMARY KEY)`)
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/jobsprofiler/profilerconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (

const DSPDiagramInfoKeyPrefix = "~dsp-diag-url-"

// DSPDiagramInfoKeyMax sorts after any diagram info key, because `:` > [0-9].
const DSPDiagramInfoKeyMax = DSPDiagramInfoKeyPrefix + ":"

// MakeDSPDiagramInfoKey constructs an ephemeral DSP diagram info key.
func MakeDSPDiagramInfoKey(timestampInNanos int64) string {
return fmt.Sprintf("%s%d", DSPDiagramInfoKeyPrefix, timestampInNanos)
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3786,7 +3786,10 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.FlushableIngestTableCount.Update(int64(m.Flush.AsIngestTableCount))
sm.FlushableIngestTableSize.Update(int64(m.Flush.AsIngestBytes))
sm.IngestCount.Update(int64(m.Ingest.Count))
sm.WALBytesWritten.Update(int64(m.WAL.BytesWritten))
// NB: `UpdateIfHigher` is used here since there is a race in pebble where
// sometimes the WAL is rotated but metrics are retrieved prior to the update
// to BytesIn to account for the previous WAL.
sm.WALBytesWritten.UpdateIfHigher(int64(m.WAL.BytesWritten))
sm.WALBytesIn.Update(int64(m.WAL.BytesIn))
sm.WALFailoverSwitchCount.Update(m.WAL.Failover.DirSwitchCount)
sm.WALFailoverPrimaryDuration.Update(m.WAL.Failover.PrimaryWriteDuration.Nanoseconds())
Expand Down
Loading

0 comments on commit 6693f4a

Please sign in to comment.