diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md index f96679d4aa8..cbb1f37889c 100644 --- a/changelog/22.0/22.0.0/summary.md +++ b/changelog/22.0/22.0.0/summary.md @@ -15,6 +15,7 @@ - **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)** - **[Update default MySQL version to 8.0.40](#mysql-8-0-40)** - **[Update lite images to Debian Bookworm](#debian-bookworm)** + - **[Support for Filtering Query logs on Error](#query-logs)** - **[Minor Changes](#minor-changes)** - **[VTTablet Flags](#flags-vttablet)** - **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)** @@ -25,7 +26,7 @@ These are the RPC changes made in this release - -1. `GetTransactionInfo` RPC has been added to both `VtctldServer`, and `TabletManagerClient` interface. These RPCs are used to fecilitate the users in reading the state of an unresolved distributed transaction. This can be useful in debugging what went wrong and how to fix the problem. +1. `GetTransactionInfo` RPC has been added to both `VtctldServer`, and `TabletManagerClient` interface. These RPCs are used to facilitate the users in reading the state of an unresolved distributed transaction. This can be useful in debugging what went wrong and how to fix the problem. ### Deprecations and Deletions @@ -132,6 +133,10 @@ This is the last time this will be needed in the `8.0.x` series, as starting wit The base system now uses Debian Bookworm instead of Debian Bullseye for the `vitess/lite` images. This change was brought by [Pull Request #17552]. +### Support for Filtering Query logs on Error + +The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet. + ## Minor Changes #### VTTablet Flags diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index b9da44a5e79..561f6048ce6 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -272,6 +272,7 @@ Flags: --querylog-buffer-size int Maximum number of buffered query logs before throttling log output (default 10) --querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization --querylog-format string format for query logs ("text" or "json") (default "text") + --querylog-mode string Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged. (default "all") --querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged. --querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries) --queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables). diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 5d3f35ef7e1..1b3238bca87 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -177,6 +177,7 @@ Flags: --querylog-buffer-size int Maximum number of buffered query logs before throttling log output (default 10) --querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization --querylog-format string format for query logs ("text" or "json") (default "text") + --querylog-mode string Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged. (default "all") --querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged. --querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries) --redact-debug-ui-queries redact full queries and bind variables from debug UI diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 1532bf870e0..398d10afd7c 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -264,6 +264,7 @@ Flags: --query-log-stream-handler string URL handler for streaming queries log (default "/debug/querylog") --querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization --querylog-format string format for query logs ("text" or "json") (default "text") + --querylog-mode string Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged. (default "all") --querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged. --querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries) --queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables). diff --git a/go/streamlog/streamlog.go b/go/streamlog/streamlog.go index 40d33b840b4..00fc4dd4478 100644 --- a/go/streamlog/streamlog.go +++ b/go/streamlog/streamlog.go @@ -20,7 +20,7 @@ package streamlog import ( "fmt" "io" - rand "math/rand/v2" + "math/rand/v2" "net/http" "net/url" "os" @@ -47,40 +47,42 @@ var ( []string{"Log", "Subscriber"}) ) -var ( - redactDebugUIQueries bool - queryLogFilterTag string - queryLogRowThreshold uint64 - queryLogFormat = "text" - queryLogSampleRate float64 -) +const ( + // QueryLogFormatText is the format specifier for text querylog output + QueryLogFormatText = "text" -func GetRedactDebugUIQueries() bool { - return redactDebugUIQueries -} + // QueryLogFormatJSON is the format specifier for json querylog output + QueryLogFormatJSON = "json" -func SetRedactDebugUIQueries(newRedactDebugUIQueries bool) { - redactDebugUIQueries = newRedactDebugUIQueries -} + // QueryLogModeAll is the mode specifier for logging all queries + QueryLogModeAll = "all" -func SetQueryLogFilterTag(newQueryLogFilterTag string) { - queryLogFilterTag = newQueryLogFilterTag -} + // QueryLogModeError is the mode specifier for logging only queries that return an error + QueryLogModeError = "error" +) -func SetQueryLogRowThreshold(newQueryLogRowThreshold uint64) { - queryLogRowThreshold = newQueryLogRowThreshold +type QueryLogConfig struct { + RedactDebugUIQueries bool + FilterTag string + Format string + Mode string + RowThreshold uint64 + sampleRate float64 } -func SetQueryLogSampleRate(sampleRate float64) { - queryLogSampleRate = sampleRate +var queryLogConfigInstance = QueryLogConfig{ + Format: QueryLogFormatText, + Mode: QueryLogModeAll, } -func GetQueryLogFormat() string { - return queryLogFormat +func GetQueryLogConfig() QueryLogConfig { + return queryLogConfigInstance } -func SetQueryLogFormat(newQueryLogFormat string) { - queryLogFormat = newQueryLogFormat +func NewQueryLogConfigForTest() QueryLogConfig { + return QueryLogConfig{ + Format: QueryLogFormatText, + } } func init() { @@ -91,28 +93,23 @@ func init() { func registerStreamLogFlags(fs *pflag.FlagSet) { // RedactDebugUIQueries controls whether full queries and bind variables are suppressed from debug UIs. - fs.BoolVar(&redactDebugUIQueries, "redact-debug-ui-queries", redactDebugUIQueries, "redact full queries and bind variables from debug UI") + fs.BoolVar(&queryLogConfigInstance.RedactDebugUIQueries, "redact-debug-ui-queries", queryLogConfigInstance.RedactDebugUIQueries, "redact full queries and bind variables from debug UI") // QueryLogFormat controls the format of the query log (either text or json) - fs.StringVar(&queryLogFormat, "querylog-format", queryLogFormat, "format for query logs (\"text\" or \"json\")") + fs.StringVar(&queryLogConfigInstance.Format, "querylog-format", queryLogConfigInstance.Format, "format for query logs (\"text\" or \"json\")") // QueryLogFilterTag contains an optional string that must be present in the query for it to be logged - fs.StringVar(&queryLogFilterTag, "querylog-filter-tag", queryLogFilterTag, "string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization") + fs.StringVar(&queryLogConfigInstance.FilterTag, "querylog-filter-tag", queryLogConfigInstance.FilterTag, "string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization") // QueryLogRowThreshold only log queries returning or affecting this many rows - fs.Uint64Var(&queryLogRowThreshold, "querylog-row-threshold", queryLogRowThreshold, "Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.") + fs.Uint64Var(&queryLogConfigInstance.RowThreshold, "querylog-row-threshold", queryLogConfigInstance.RowThreshold, "Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.") // QueryLogSampleRate causes a sample of queries to be logged - fs.Float64Var(&queryLogSampleRate, "querylog-sample-rate", queryLogSampleRate, "Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)") -} - -const ( - // QueryLogFormatText is the format specifier for text querylog output - QueryLogFormatText = "text" + fs.Float64Var(&queryLogConfigInstance.sampleRate, "querylog-sample-rate", queryLogConfigInstance.sampleRate, "Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)") - // QueryLogFormatJSON is the format specifier for json querylog output - QueryLogFormatJSON = "json" -) + // QueryLogMode controls the mode for logging queries (all or error) + fs.StringVar(&queryLogConfigInstance.Mode, "querylog-mode", queryLogConfigInstance.Mode, `Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged.`) +} // StreamLogger is a non-blocking broadcaster of messages. // Subscribers can use channels or HTTP. @@ -257,27 +254,30 @@ func GetFormatter[T any](logger *StreamLogger[T]) LogFormatter { } } -// shouldSampleQuery returns true if a query should be sampled based on queryLogSampleRate -func shouldSampleQuery() bool { - if queryLogSampleRate <= 0 { +// shouldSampleQuery returns true if a query should be sampled based on sampleRate +func (qlConfig QueryLogConfig) shouldSampleQuery() bool { + if qlConfig.sampleRate <= 0 { return false - } else if queryLogSampleRate >= 1 { + } else if qlConfig.sampleRate >= 1 { return true } - return rand.Float64() <= queryLogSampleRate + return rand.Float64() <= qlConfig.sampleRate } // ShouldEmitLog returns whether the log with the given SQL query // should be emitted or filtered -func ShouldEmitLog(sql string, rowsAffected, rowsReturned uint64) bool { - if shouldSampleQuery() { +func (qlConfig QueryLogConfig) ShouldEmitLog(sql string, rowsAffected, rowsReturned uint64, hasError bool) bool { + if qlConfig.shouldSampleQuery() { return true } - if queryLogRowThreshold > max(rowsAffected, rowsReturned) && queryLogFilterTag == "" { + if qlConfig.RowThreshold > max(rowsAffected, rowsReturned) && qlConfig.FilterTag == "" { return false } - if queryLogFilterTag != "" { - return strings.Contains(sql, queryLogFilterTag) + if qlConfig.FilterTag != "" { + return strings.Contains(sql, qlConfig.FilterTag) + } + if qlConfig.Mode == QueryLogModeError { + return hasError } return true } diff --git a/go/streamlog/streamlog_test.go b/go/streamlog/streamlog_test.go index c1321c92a94..8256ada479e 100644 --- a/go/streamlog/streamlog_test.go +++ b/go/streamlog/streamlog_test.go @@ -266,40 +266,29 @@ func TestFile(t *testing.T) { } func TestShouldSampleQuery(t *testing.T) { - queryLogSampleRate = -1 - assert.False(t, shouldSampleQuery()) + qlConfig := QueryLogConfig{sampleRate: -1} + assert.False(t, qlConfig.shouldSampleQuery()) - queryLogSampleRate = 0 - assert.False(t, shouldSampleQuery()) + qlConfig.sampleRate = 0 + assert.False(t, qlConfig.shouldSampleQuery()) - // for test coverage, can't test a random result - queryLogSampleRate = 0.5 - shouldSampleQuery() + qlConfig.sampleRate = 1.0 + assert.True(t, qlConfig.shouldSampleQuery()) - queryLogSampleRate = 1.0 - assert.True(t, shouldSampleQuery()) - - queryLogSampleRate = 100.0 - assert.True(t, shouldSampleQuery()) + qlConfig.sampleRate = 100.0 + assert.True(t, qlConfig.shouldSampleQuery()) } func TestShouldEmitLog(t *testing.T) { - origQueryLogFilterTag := queryLogFilterTag - origQueryLogRowThreshold := queryLogRowThreshold - origQueryLogSampleRate := queryLogSampleRate - defer func() { - SetQueryLogFilterTag(origQueryLogFilterTag) - SetQueryLogRowThreshold(origQueryLogRowThreshold) - SetQueryLogSampleRate(origQueryLogSampleRate) - }() - tests := []struct { sql string qLogFilterTag string qLogRowThreshold uint64 qLogSampleRate float64 + qLogMode string rowsAffected uint64 rowsReturned uint64 + errored bool ok bool }{ { @@ -356,43 +345,33 @@ func TestShouldEmitLog(t *testing.T) { rowsReturned: 17, ok: true, }, + { + sql: "log only error - no error", + qLogMode: "error", + errored: false, + ok: false, + }, + { + sql: "log only error - errored", + qLogMode: "error", + errored: true, + ok: true, + }, } for _, tt := range tests { t.Run(tt.sql, func(t *testing.T) { - SetQueryLogFilterTag(tt.qLogFilterTag) - SetQueryLogRowThreshold(tt.qLogRowThreshold) - SetQueryLogSampleRate(tt.qLogSampleRate) - - require.Equal(t, tt.ok, ShouldEmitLog(tt.sql, tt.rowsAffected, tt.rowsReturned)) + qlConfig := QueryLogConfig{ + FilterTag: tt.qLogFilterTag, + RowThreshold: tt.qLogRowThreshold, + sampleRate: tt.qLogSampleRate, + Mode: tt.qLogMode, + } + require.Equal(t, tt.ok, qlConfig.ShouldEmitLog(tt.sql, tt.rowsAffected, tt.rowsReturned, tt.errored)) }) } } -func BenchmarkShouldEmitLog(b *testing.B) { - b.Run("default", func(b *testing.B) { - SetQueryLogSampleRate(0.0) - for i := 0; i < b.N; i++ { - ShouldEmitLog("select * from test where user='someone'", 0, 123) - } - }) - b.Run("filter_tag", func(b *testing.B) { - SetQueryLogSampleRate(0.0) - SetQueryLogFilterTag("LOG_QUERY") - defer SetQueryLogFilterTag("") - for i := 0; i < b.N; i++ { - ShouldEmitLog("select /* LOG_QUERY=1 */ * from test where user='someone'", 0, 123) - } - }) - b.Run("50%_sample_rate", func(b *testing.B) { - SetQueryLogSampleRate(0.5) - defer SetQueryLogSampleRate(0.0) - for i := 0; i < b.N; i++ { - ShouldEmitLog("select * from test where user='someone'", 0, 123) - } - }) -} - func TestGetFormatter(t *testing.T) { tests := []struct { name string diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index d45073cd006..155fe6de743 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -74,7 +74,12 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests queryLogBufferSize := 10 plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false) - vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0, vtgate.NewDynamicViperConfig()) + eConfig := vtgate.ExecutorConfig{ + Normalize: opts.Normalize, + StreamSize: streamSize, + AllowScatter: true, + } + vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, eConfig, false, plans, schemaTracker, opts.PlannerVersion, vtgate.NewDynamicViperConfig()) vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)) return nil diff --git a/go/vt/vtgate/bench_test.go b/go/vt/vtgate/bench_test.go index 5c64c7e3473..c2336662c69 100644 --- a/go/vt/vtgate/bench_test.go +++ b/go/vt/vtgate/bench_test.go @@ -80,7 +80,7 @@ func BenchmarkWithNormalizer(b *testing.B) { func BenchmarkWithoutNormalizer(b *testing.B) { vtgateInst, _, ctx := createVtgateEnv(b) - vtgateInst.executor.normalize = false + vtgateInst.executor.config.Normalize = false for i := 0; i < b.N; i++ { _, _, err := vtgateInst.Execute( diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 0281e28700f..c8dfd0b6269 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -107,39 +107,45 @@ func init() { // Executor is the engine that executes queries by utilizing // the abilities of the underlying vttablets. -type Executor struct { - env *vtenv.Environment - serv srvtopo.Server - cell string - resolver *Resolver - scatterConn *ScatterConn - txConn *TxConn +type ( + ExecutorConfig struct { + Normalize bool + StreamSize int + // AllowScatter will fail planning if set to false and a plan contains any scatter queries + AllowScatter bool + WarmingReadsPercent int + QueryLogToFile string + } - mu sync.Mutex - vschema *vindexes.VSchema - streamSize int - vschemaStats *VSchemaStats + Executor struct { + config ExecutorConfig - plans *PlanCache - epoch atomic.Uint32 + env *vtenv.Environment + serv srvtopo.Server + cell string + resolver *Resolver + scatterConn *ScatterConn + txConn *TxConn - normalize bool + mu sync.Mutex + vschema *vindexes.VSchema + vschemaStats *VSchemaStats - vm *VSchemaManager - schemaTracker SchemaInfo + plans *PlanCache + epoch atomic.Uint32 - // allowScatter will fail planning if set to false and a plan contains any scatter queries - allowScatter bool + vm *VSchemaManager + schemaTracker SchemaInfo - // queryLogger is passed in for logging from this vtgate executor. - queryLogger *streamlog.StreamLogger[*logstats.LogStats] + // queryLogger is passed in for logging from this vtgate executor. + queryLogger *streamlog.StreamLogger[*logstats.LogStats] - warmingReadsPercent int - warmingReadsChannel chan bool + warmingReadsChannel chan bool - vConfig econtext.VCursorConfig - ddlConfig dynamicconfig.DDL -} + vConfig econtext.VCursorConfig + ddlConfig dynamicconfig.DDL + } +) var executorOnce sync.Once @@ -163,28 +169,24 @@ func NewExecutor( serv srvtopo.Server, cell string, resolver *Resolver, - normalize, warnOnShardedOnly bool, - streamSize int, + eConfig ExecutorConfig, + warnOnShardedOnly bool, plans *PlanCache, schemaTracker SchemaInfo, - noScatter bool, pv plancontext.PlannerVersion, - warmingReadsPercent int, ddlConfig dynamicconfig.DDL, ) *Executor { e := &Executor{ - env: env, - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - normalize: normalize, - streamSize: streamSize, + config: eConfig, + env: env, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + schemaTracker: schemaTracker, - allowScatter: !noScatter, plans: plans, - warmingReadsPercent: warmingReadsPercent, warmingReadsChannel: make(chan bool, warmingReadsConcurrency), ddlConfig: ddlConfig, } @@ -234,7 +236,7 @@ func (e *Executor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn trace.AnnotateSQL(span, sqlparser.Preview(sql)) defer span.Finish() - logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars) + logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, streamlog.GetQueryLogConfig()) stmtType, result, err := e.execute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats) logStats.Error = err if result == nil { @@ -297,7 +299,7 @@ func (e *Executor) StreamExecute( trace.AnnotateSQL(span, sqlparser.Preview(sql)) defer span.Finish() - logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars) + logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, streamlog.GetQueryLogConfig()) srr := &streaminResultReceiver{callback: callback} var err error @@ -327,7 +329,7 @@ func (e *Executor) StreamExecute( byteCount += col.Len() } - if byteCount >= e.streamSize { + if byteCount >= e.config.StreamSize { err := callback(result) seenResults.Store(true) result = &sqltypes.Result{} @@ -1340,7 +1342,7 @@ func isValidPayloadSize(query string) bool { // Prepare executes a prepare statements. func (e *Executor) Prepare(ctx context.Context, method string, safeSession *econtext.SafeSession, sql string, bindVars map[string]*querypb.BindVariable) (fld []*querypb.Field, err error) { - logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars) + logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, streamlog.GetQueryLogConfig()) fld, err = e.prepare(ctx, safeSession, sql, bindVars, logStats) logStats.Error = err @@ -1419,7 +1421,7 @@ func (e *Executor) initVConfig(warnOnShardedOnly bool, pv plancontext.PlannerVer DBDDLPlugin: dbDDLPlugin, - WarmingReadsPercent: e.warmingReadsPercent, + WarmingReadsPercent: e.config.WarmingReadsPercent, WarmingReadsTimeout: warmingReadsQueryTimeout, WarmingReadsChannel: e.warmingReadsChannel, } @@ -1539,7 +1541,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar } func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.Plan) error { - if e.allowScatter || plan.Instructions == nil || sqlparser.AllowScatterDirective(stmt) { + if e.config.AllowScatter || plan.Instructions == nil || sqlparser.AllowScatterDirective(stmt) { return nil } // we go over all the primitives in the plan, searching for a route that is of SelectScatter opcode @@ -1610,7 +1612,7 @@ func (e *Executor) PlanPrepareStmt(ctx context.Context, vcursor *econtext.VCurso } // creating this log stats to not interfere with the original log stats. - lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.Session().GetSessionUUID(), nil) + lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.Session().GetSessionUUID(), nil, streamlog.GetQueryLogConfig()) plan, err := e.getPlan( ctx, vcursor, diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 503b5e5cd8b..2846c763ae9 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -608,9 +608,8 @@ func TestUpdateComments(t *testing.T) { } func TestUpdateNormalize(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - executor.normalize = true session := &vtgatepb.Session{ TargetString: "@primary", } @@ -1337,7 +1336,7 @@ func TestInsertSharded(t *testing.T) { testQueryLog(t, executor, logChan, "TestExecute", "INSERT", "insert into user2(id, `name`, lastname) values (2, 'myname', 'mylastname')", 1) // insert with binary values - executor.normalize = true + executor.config.Normalize = true sbc1.Queries = nil sbc2.Queries = nil sbclookup.Queries = nil @@ -1368,8 +1367,7 @@ func TestInsertSharded(t *testing.T) { } func TestInsertNegativeValue(t *testing.T) { - executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, sbclookup, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -2528,9 +2526,7 @@ func TestDeleteEqualWithPrepare(t *testing.T) { } func TestUpdateLastInsertID(t *testing.T) { - executor, sbc1, _, _, ctx := createExecutorEnv(t) - - executor.normalize = true + executor, sbc1, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) sql := "update user set a = last_insert_id() where id = 1" session := &vtgatepb.Session{ diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 43987217039..099b785b446 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -65,10 +65,6 @@ var badVSchema = ` } ` -const ( - testBufferSize = 10 -) - type DestinationAnyShardPickerFirstShard struct{} func (dp DestinationAnyShardPickerFirstShard) PickShard(shardCount int) int { @@ -130,7 +126,7 @@ func init() { vindexes.Register("keyrange_lookuper_unique", newKeyRangeLookuperUnique) } -func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn)) (executor *Executor, ctx context.Context) { +func createExecutorEnvCallback(t testing.TB, eConfig ExecutorConfig, eachShard func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn)) (executor *Executor, ctx context.Context) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(context.Background()) cell := "aa" @@ -183,7 +179,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta // one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness. plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, eConfig, false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} @@ -198,7 +194,11 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta } func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { - executor, ctx = createExecutorEnvCallback(t, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { + return createExecutorEnvWithConfig(t, createExecutorConfig()) +} + +func createExecutorEnvWithConfig(t testing.TB, eConfig ExecutorConfig) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { + executor, ctx = createExecutorEnvCallback(t, eConfig, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { switch { case ks == KsTestSharded && shard == "-20": sbc1 = conn @@ -210,7 +210,6 @@ func createExecutorEnv(t testing.TB) (executor *Executor, sbc1, sbc2, sbclookup }) return } - func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(context.Background()) @@ -232,7 +231,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex plans := DefaultPlanCache() env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion}) require.NoError(t, err) - executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) + executor = NewExecutor(ctx, env, serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -244,6 +243,21 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex return executor, sbc1, sbc2, sbclookup, ctx } +func createExecutorConfig() ExecutorConfig { + return ExecutorConfig{ + StreamSize: 10, + AllowScatter: true, + } +} + +func createExecutorConfigWithNormalizer() ExecutorConfig { + return ExecutorConfig{ + StreamSize: 10, + AllowScatter: true, + Normalize: true, + } +} + func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqltypes.Result) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn, ctx context.Context) { var cancel context.CancelFunc ctx, cancel = context.WithCancel(context.Background()) @@ -269,7 +283,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { @@ -294,7 +308,9 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context, replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil) queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) - executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, NewDynamicViperConfig()) + eConfig := createExecutorConfig() + eConfig.WarmingReadsPercent = warmingReadsPercent + executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, eConfig, false, DefaultPlanCache(), nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) t.Cleanup(func() { diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index 16628729ac6..3d8261495fe 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -166,7 +166,7 @@ func TestSelectDBA(t *testing.T) { func TestSystemVariablesMySQLBelow80(t *testing.T) { executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "5.7.0") - executor.normalize = true + executor.config.Normalize = true setVarEnabled = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) @@ -200,7 +200,7 @@ func TestSystemVariablesMySQLBelow80(t *testing.T) { func TestSystemVariablesWithSetVarDisabled(t *testing.T) { executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "8.0.0") - executor.normalize = true + executor.config.Normalize = true executor.vConfig.SetVarEnabled = false session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) @@ -233,7 +233,7 @@ func TestSystemVariablesWithSetVarDisabled(t *testing.T) { func TestSetSystemVariablesTx(t *testing.T) { executor, sbc1, _, _, _ := createCustomExecutor(t, "{}", "8.0.1") - executor.normalize = true + executor.config.Normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: "TestExecutor"}) @@ -278,9 +278,7 @@ func TestSetSystemVariablesTx(t *testing.T) { } func TestSetSystemVariables(t *testing.T) { - executor, _, _, lookup, _ := createExecutorEnv(t) - executor.normalize = true - + executor, _, _, lookup, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: KsTestUnsharded, SystemVariables: map[string]string{}}) // Set @@sql_mode and execute a select statement. We should have SET_VAR in the select statement @@ -389,8 +387,7 @@ func TestSetSystemVariables(t *testing.T) { } func TestSetSystemVariablesWithReservedConnection(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, SystemVariables: map[string]string{}}) @@ -445,8 +442,7 @@ func TestSelectVindexFunc(t *testing.T) { } func TestCreateTableValidTimestamp(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor", SystemVariables: map[string]string{"sql_mode": "ALLOW_INVALID_DATES"}}) @@ -464,8 +460,7 @@ func TestCreateTableValidTimestamp(t *testing.T) { } func TestGen4SelectDBA(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) query := "select * from INFORMATION_SCHEMA.TABLE_CONSTRAINTS" _, err := executor.Execute(context.Background(), nil, "TestSelectDBA", @@ -699,18 +694,19 @@ func TestStreamLimitOffset(t *testing.T) { }}, } - executor, _ := createExecutorEnvCallback(t, func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { - if ks == KsTestSharded { - conn.SetResults([]*sqltypes.Result{{ - Fields: []*querypb.Field{ - {Name: "id", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)}, - {Name: "textcol", Type: sqltypes.VarChar, Charset: uint32(collations.MySQL8().DefaultConnectionCharset())}, - {Name: "weight_string(id)", Type: sqltypes.VarBinary, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_BINARY_FLAG)}, - }, - Rows: returnRows[shard], - }}) - } - }) + executor, _ := createExecutorEnvCallback(t, createExecutorConfig(), + func(shard, ks string, tabletType topodatapb.TabletType, conn *sandboxconn.SandboxConn) { + if ks == KsTestSharded { + conn.SetResults([]*sqltypes.Result{{ + Fields: []*querypb.Field{ + {Name: "id", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)}, + {Name: "textcol", Type: sqltypes.VarChar, Charset: uint32(collations.MySQL8().DefaultConnectionCharset())}, + {Name: "weight_string(id)", Type: sqltypes.VarBinary, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_BINARY_FLAG)}, + }, + Rows: returnRows[shard], + }}) + } + }) results := make(chan *sqltypes.Result, 10) session := &vtgatepb.Session{ @@ -756,12 +752,11 @@ func TestStreamLimitOffset(t *testing.T) { } func TestSelectLastInsertId(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", LastInsertId: 52, } - executor.normalize = true logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -780,7 +775,7 @@ func TestSelectLastInsertId(t *testing.T) { } func TestSelectSystemVariables(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", @@ -790,7 +785,6 @@ func TestSelectSystemVariables(t *testing.T) { SessionTrackGtids: true, }, } - executor.normalize = true logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -840,8 +834,7 @@ func TestSelectSystemVariables(t *testing.T) { } func TestSelectInitializedVitessAwareVariable(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -872,8 +865,7 @@ func TestSelectInitializedVitessAwareVariable(t *testing.T) { } func TestSelectUserDefinedVariable(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -908,8 +900,7 @@ func TestSelectUserDefinedVariable(t *testing.T) { } func TestFoundRows(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -935,8 +926,7 @@ func TestFoundRows(t *testing.T) { } func TestRowCount(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -968,8 +958,7 @@ func testRowCount(t *testing.T, ctx context.Context, executor *Executor, session } func TestSelectLastInsertIdInUnion(t *testing.T) { - executor, sbc1, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", @@ -1002,8 +991,7 @@ func TestSelectLastInsertIdInUnion(t *testing.T) { } func TestSelectLastInsertIdInWhere(t *testing.T) { - executor, _, _, lookup, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, lookup, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -1022,8 +1010,7 @@ func TestSelectLastInsertIdInWhere(t *testing.T) { } func TestLastInsertIDInVirtualTable(t *testing.T) { - executor, sbc1, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) result1 := []*sqltypes.Result{{ Fields: []*querypb.Field{ {Name: "id", Type: sqltypes.Int32, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG)}, @@ -1050,8 +1037,7 @@ func TestLastInsertIDInVirtualTable(t *testing.T) { } func TestLastInsertIDInSubQueryExpression(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", LastInsertId: 12345, @@ -1074,8 +1060,7 @@ func TestLastInsertIDInSubQueryExpression(t *testing.T) { } func TestSelectDatabase(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) sql := "select database()" newSession := &vtgatepb.Session{ TargetString: "@primary", @@ -1334,8 +1319,7 @@ func TestSelectComments(t *testing.T) { } func TestSelectNormalize(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "@primary", @@ -1644,7 +1628,7 @@ func TestSelectListArg(t *testing.T) { func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) + ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) ex.SetQueryLogger(queryLogger) return ex } @@ -3123,8 +3107,7 @@ func TestSelectBindvarswithPrepare(t *testing.T) { } func TestSelectDatabasePrepare(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -3137,8 +3120,7 @@ func TestSelectDatabasePrepare(t *testing.T) { } func TestSelectWithUnionAll(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) sql := "select id from user where id in (1, 2, 3) union all select id from user where id in (1, 2, 3)" bv, _ := sqltypes.BuildBindVariable([]int64{1, 2, 3}) bv1, _ := sqltypes.BuildBindVariable([]int64{1, 2}) @@ -3326,7 +3308,7 @@ func TestStreamOrderByWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start @@ -3369,7 +3351,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { } queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfigWithNormalizer(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() // some sleep for all goroutines to start @@ -3420,7 +3402,7 @@ func TestSelectScatterFails(t *testing.T) { executor := createExecutor(ctx, serv, cell, resolver) defer executor.Close() - executor.allowScatter = false + executor.config.AllowScatter = false logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -3447,8 +3429,7 @@ func TestSelectScatterFails(t *testing.T) { } func TestGen4SelectStraightJoin(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select u.id from user u straight_join user2 u2 on u.id = u2.id" @@ -3469,8 +3450,7 @@ func TestGen4SelectStraightJoin(t *testing.T) { } func TestGen4MultiColumnVindexEqual(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where cola = 1 and colb = 2" @@ -3507,8 +3487,7 @@ func TestGen4MultiColumnVindexEqual(t *testing.T) { } func TestGen4MultiColumnVindexIn(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where cola IN (1,17984) and colb IN (2,3,4)" @@ -3545,8 +3524,7 @@ func TestGen4MultiColumnVindexIn(t *testing.T) { } func TestGen4MultiColMixedColComparision(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where colb = 2 and cola IN (1,17984)" @@ -3581,8 +3559,7 @@ func TestGen4MultiColMixedColComparision(t *testing.T) { } func TestGen4MultiColBestVindexSel(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where colb = 2 and cola IN (1,17984) and cola = 1" @@ -3626,8 +3603,7 @@ func TestGen4MultiColBestVindexSel(t *testing.T) { } func TestGen4MultiColMultiEqual(t *testing.T) { - executor, sbc1, sbc2, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, sbc2, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: "TestExecutor"}) query := "select * from user_region where (cola,colb) in ((17984,2),(17984,3))" @@ -4229,8 +4205,7 @@ func TestSelectAggregationRandom(t *testing.T) { } func TestSelectDateTypes(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) qr, err := executor.Execute(context.Background(), nil, "TestSelectDateTypes", session, "select '2020-01-01' + interval month(date_sub(FROM_UNIXTIME(1234), interval 1 month))-1 month", nil) @@ -4240,8 +4215,7 @@ func TestSelectDateTypes(t *testing.T) { } func TestSelectHexAndBit(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) qr, err := executor.Execute(context.Background(), nil, "TestSelectHexAndBit", session, "select 0b1001, b'1001', 0x9, x'09'", nil) @@ -4256,8 +4230,7 @@ func TestSelectHexAndBit(t *testing.T) { // TestSelectCFC tests validates that cfc vindex plan gets cached and same plan is getting reused. // This also validates that cache_size is able to calculate the cfc vindex plan size. func TestSelectCFC(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) _, err := executor.Execute(context.Background(), nil, "TestSelectCFC", session, "select /*vt+ PLANNER=gen4 */ c2 from tbl_cfc where c1 like 'A%'", nil) @@ -4281,12 +4254,11 @@ func TestSelectCFC(t *testing.T) { } func TestSelectView(t *testing.T) { - executor, sbc, _, _, _ := createExecutorEnv(t) + executor, sbc, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) // add the view to local vschema err := executor.vschema.AddView(KsTestSharded, "user_details_view", "select user.id, user_extra.col from user join user_extra on user.id = user_extra.user_id", executor.vm.parser) require.NoError(t, err) - executor.normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{}) _, err = executor.Execute(context.Background(), nil, "TestSelectView", session, "select * from user_details_view", nil) @@ -4327,7 +4299,7 @@ func TestWarmingReads(t *testing.T) { ctx := utils.LeakCheckContext(t) executor, primary, replica := createExecutorEnvWithPrimaryReplicaConn(t, ctx, 100) - executor.normalize = true + executor.config.Normalize = true session := econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded}) // Since queries on the replica will run in a separate go-routine, we need synchronization for the Queries field in the sandboxconn. replica.RequireQueriesLocking() @@ -4451,8 +4423,7 @@ func TestStreamJoinQuery(t *testing.T) { // It also tests that setting a global variable does not affect the session variable and vice versa. // Also, test what happens on running select @@global and select @@session for a system variable. func TestSysVarGlobalAndSession(t *testing.T) { - executor, sbc1, _, _, _ := createExecutorEnv(t) - executor.normalize = true + executor, sbc1, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, SystemVariables: map[string]string{}}) sbc1.SetResults([]*sqltypes.Result{ diff --git a/go/vt/vtgate/executor_set_test.go b/go/vt/vtgate/executor_set_test.go index f8ed0b558c3..310d885a134 100644 --- a/go/vt/vtgate/executor_set_test.go +++ b/go/vt/vtgate/executor_set_test.go @@ -514,7 +514,7 @@ func createMap(keys []string, values []any) map[string]*querypb.BindVariable { func TestSetVar(t *testing.T) { executor, _, _, sbc, ctx := createCustomExecutor(t, "{}", "8.0.0") - executor.normalize = true + executor.config.Normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: KsTestUnsharded}) @@ -553,7 +553,7 @@ func TestSetVar(t *testing.T) { func TestSetVarShowVariables(t *testing.T) { executor, _, _, sbc, ctx := createCustomExecutor(t, "{}", "8.0.0") - executor.normalize = true + executor.config.Normalize = true session := econtext.NewAutocommitSession(&vtgatepb.Session{EnableSystemSettings: true, TargetString: KsTestUnsharded}) @@ -576,8 +576,7 @@ func TestSetVarShowVariables(t *testing.T) { } func TestExecutorSetAndSelect(t *testing.T) { - e, _, _, sbc, ctx := createExecutorEnv(t) - e.normalize = true + e, _, _, sbc, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) testcases := []struct { sysVar string diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index 8bb10aae8fb..796e0c0466b 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -68,7 +68,7 @@ func TestStreamSQLSharded(t *testing.T) { queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize) plans := DefaultPlanCache() - executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, NewDynamicViperConfig()) + executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, createExecutorConfig(), false, plans, nil, querypb.ExecuteOptions_Gen4, NewDynamicViperConfig()) executor.SetQueryLogger(queryLogger) defer executor.Close() diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 859e6a05973..904805e789b 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -36,6 +36,8 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/streamlog" + econtext "vitess.io/vitess/go/vt/vtgate/executorcontext" "vitess.io/vitess/go/mysql/collations" @@ -196,9 +198,7 @@ func TestExecutorTransactionsNoAutoCommit(t *testing.T) { } func TestDirectTargetRewrites(t *testing.T) { - executor, _, _, sbclookup, ctx := createExecutorEnv(t) - - executor.normalize = true + executor, _, _, sbclookup, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "TestUnsharded/0@primary", @@ -1625,7 +1625,7 @@ func assertCacheContains(t *testing.T, e *Executor, vc *econtext.VCursorImpl, sq } func getPlanCached(t *testing.T, ctx context.Context, e *Executor, vcursor *econtext.VCursorImpl, sql string, comments sqlparser.MarginComments, bindVars map[string]*querypb.BindVariable, skipQueryPlanCache bool) (*engine.Plan, *logstats.LogStats) { - logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) + logStats := logstats.NewLogStats(ctx, "Test", "", "", nil, streamlog.NewQueryLogConfigForTest()) vcursor.SafeSession = &econtext.SafeSession{ Session: &vtgatepb.Session{ Options: &querypb.ExecuteOptions{SkipQueryPlanCache: skipQueryPlanCache}}, @@ -1633,7 +1633,7 @@ func getPlanCached(t *testing.T, ctx context.Context, e *Executor, vcursor *econ stmt, reservedVars, err := parseAndValidateQuery(sql, sqlparser.NewTestParser()) require.NoError(t, err) - plan, err := e.getPlan(context.Background(), vcursor, sql, stmt, comments, bindVars, reservedVars /* normalize */, e.normalize, logStats) + plan, err := e.getPlan(context.Background(), vcursor, sql, stmt, comments, bindVars, reservedVars /* normalize */, e.config.Normalize, logStats) require.NoError(t, err) // Wait for cache to settle @@ -1693,8 +1693,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { - r, _, _, _, ctx := createExecutorEnv(t) - r.normalize = true + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) query1 := "select * from music_user_map where id = 1" @@ -1710,8 +1709,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Skip Cache", func(t *testing.T) { // Skip cache using directive - r, _, _, _, ctx := createExecutorEnv(t) - r.normalize = true + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, r.vConfig) query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" @@ -1735,9 +1733,8 @@ func TestGetPlanCacheNormalized(t *testing.T) { } func TestGetPlanNormalized(t *testing.T) { - r, _, _, _, ctx := createExecutorEnv(t) + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - r.normalize = true emptyvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) unshardedvc, _ := econtext.NewVCursorImpl(econtext.NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) @@ -1792,10 +1789,9 @@ func TestGetPlanPriority(t *testing.T) { testCase := aTestCase t.Run(testCase.name, func(t *testing.T) { - r, _, _, _, ctx := createExecutorEnv(t) + r, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) - r.normalize = true - logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) + logStats := logstats.NewLogStats(ctx, "Test", "", "", nil, streamlog.NewQueryLogConfigForTest()) vCursor, err := econtext.NewVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, nullResultsObserver{}, econtext.VCursorConfig{}) assert.NoError(t, err) @@ -1818,7 +1814,7 @@ func TestGetPlanPriority(t *testing.T) { } func TestPassthroughDDL(t *testing.T) { - executor, sbc1, sbc2, _, ctx := createExecutorEnv(t) + executor, sbc1, sbc2, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{ TargetString: "TestExecutor", } @@ -1841,7 +1837,6 @@ func TestPassthroughDDL(t *testing.T) { // Force the query to go to only one shard. Normalization doesn't make any difference. session.TargetString = "TestExecutor/40-60" - executor.normalize = true _, err = executorExec(ctx, executor, session, alterDDL, nil) require.NoError(t, err) @@ -1853,7 +1848,6 @@ func TestPassthroughDDL(t *testing.T) { // Use range query session.TargetString = "TestExecutor[-]" - executor.normalize = true _, err = executorExec(ctx, executor, session, alterDDL, nil) require.NoError(t, err) @@ -2003,10 +1997,7 @@ func TestExecutorMaxPayloadSizeExceeded(t *testing.T) { } func TestOlapSelectDatabase(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - - executor.normalize = true - + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := &vtgatepb.Session{Autocommit: true} sql := `select database()` @@ -2284,9 +2275,7 @@ func TestExecutorExplainStmt(t *testing.T) { } func TestExecutorVExplain(t *testing.T) { - executor, _, _, _, ctx := createExecutorEnv(t) - - executor.normalize = true + executor, _, _, _, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) logChan := executor.queryLogger.Subscribe("Test") defer executor.queryLogger.Unsubscribe(logChan) @@ -2756,9 +2745,7 @@ func TestExecutorStartTxnStmt(t *testing.T) { } func TestExecutorPrepareExecute(t *testing.T) { - executor, _, _, _, _ := createExecutorEnv(t) - - executor.normalize = true + executor, _, _, _, _ := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) session := econtext.NewAutocommitSession(&vtgatepb.Session{}) // prepare statement. diff --git a/go/vt/vtgate/executorcontext/vcursor_impl_test.go b/go/vt/vtgate/executorcontext/vcursor_impl_test.go index 08e27be4c51..244a80c9ffe 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl_test.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl_test.go @@ -27,6 +27,8 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/streamlog" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -367,7 +369,7 @@ func TestSetExecQueryTimeout(t *testing.T) { func TestRecordMirrorStats(t *testing.T) { safeSession := NewSafeSession(nil) - logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil) + logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil, streamlog.NewQueryLogConfigForTest()) vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, logStats, nil, &vindexes.VSchema{}, nil, nil, fakeObserver{}, VCursorConfig{}) require.NoError(t, err) diff --git a/go/vt/vtgate/logstats/logstats.go b/go/vt/vtgate/logstats/logstats.go index fdc0e69c8db..3ad75d07722 100644 --- a/go/vt/vtgate/logstats/logstats.go +++ b/go/vt/vtgate/logstats/logstats.go @@ -33,6 +33,8 @@ import ( // LogStats records the stats for a single vtgate query type LogStats struct { + Config streamlog.QueryLogConfig + Ctx context.Context Method string TabletType string @@ -59,7 +61,7 @@ type LogStats struct { // NewLogStats constructs a new LogStats with supplied Method and ctx // field values, and the StartTime field set to the present time. -func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindVars map[string]*querypb.BindVariable) *LogStats { +func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindVars map[string]*querypb.BindVariable, config streamlog.QueryLogConfig) *LogStats { return &LogStats{ Ctx: ctx, Method: methodName, @@ -67,6 +69,7 @@ func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindV SessionUUID: sessionUUID, BindVariables: bindVars, StartTime: time.Now(), + Config: config, } } @@ -130,16 +133,15 @@ func (stats *LogStats) MirrorTargetErrorStr() string { // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { - if !streamlog.ShouldEmitLog(stats.SQL, stats.RowsAffected, stats.RowsReturned) { + if !stats.Config.ShouldEmitLog(stats.SQL, stats.RowsAffected, stats.RowsReturned, stats.Error != nil) { return nil } - redacted := streamlog.GetRedactDebugUIQueries() _, fullBindParams := params["full"] remoteAddr, username := stats.RemoteAddrUsername() log := logstats.NewLogger() - log.Init(streamlog.GetQueryLogFormat() == streamlog.QueryLogFormatJSON) + log.Init(stats.Config.Format == streamlog.QueryLogFormatJSON) log.Key("Method") log.StringUnquoted(stats.Method) log.Key("RemoteAddr") @@ -167,7 +169,7 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { log.Key("SQL") log.String(stats.SQL) log.Key("BindVars") - if redacted { + if stats.Config.RedactDebugUIQueries { log.Redacted() } else { log.BindVariables(stats.BindVariables, fullBindParams) diff --git a/go/vt/vtgate/logstats/logstats_test.go b/go/vt/vtgate/logstats/logstats_test.go index 872b34c6964..78b27342a7e 100644 --- a/go/vt/vtgate/logstats/logstats_test.go +++ b/go/vt/vtgate/logstats/logstats_test.go @@ -55,11 +55,7 @@ func testFormat(t *testing.T, stats *LogStats, params url.Values) string { } func TestLogStatsFormat(t *testing.T) { - defer func() { - streamlog.SetRedactDebugUIQueries(false) - streamlog.SetQueryLogFormat("text") - }() - logStats := NewLogStats(context.Background(), "test", "sql1", "suuid", nil) + logStats := NewLogStats(context.Background(), "test", "sql1", "suuid", nil, streamlog.NewQueryLogConfigForTest()) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) logStats.TablesUsed = []string{"ks1.tbl1", "ks2.tbl2"} @@ -125,8 +121,8 @@ func TestLogStatsFormat(t *testing.T) { for _, variable := range logStats.BindVariables { fmt.Println("->" + fmt.Sprintf("%v", variable)) } - streamlog.SetRedactDebugUIQueries(test.redact) - streamlog.SetQueryLogFormat(test.format) + logStats.Config.RedactDebugUIQueries = test.redact + logStats.Config.Format = test.format if test.format == "text" { got := testFormat(t, logStats, params) t.Logf("got: %s", got) @@ -148,9 +144,8 @@ func TestLogStatsFormat(t *testing.T) { } func TestLogStatsFilter(t *testing.T) { - defer func() { streamlog.SetQueryLogFilterTag("") }() - - logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}) + logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", + map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}, streamlog.NewQueryLogConfigForTest()) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) params := map[string][]string{"full": {}} @@ -159,21 +154,20 @@ func TestLogStatsFilter(t *testing.T) { want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) - streamlog.SetQueryLogFilterTag("LOG_THIS_QUERY") + logStats.Config.FilterTag = "LOG_THIS_QUERY" got = testFormat(t, logStats, params) want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) - streamlog.SetQueryLogFilterTag("NOT_THIS_QUERY") + logStats.Config.FilterTag = "NOT_THIS_QUERY" got = testFormat(t, logStats, params) want = "" assert.Equal(t, want, got) } func TestLogStatsRowThreshold(t *testing.T) { - defer func() { streamlog.SetQueryLogRowThreshold(0) }() - - logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}) + logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", + map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}, streamlog.NewQueryLogConfigForTest()) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) params := map[string][]string{"full": {}} @@ -182,11 +176,11 @@ func TestLogStatsRowThreshold(t *testing.T) { want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) - streamlog.SetQueryLogRowThreshold(0) got = testFormat(t, logStats, params) want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t0.000000\t0.000000\t0.000000\t\t\"sql1 /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t0\t0\t\"\"\t\"\"\t\"\"\tfalse\t[]\t\"\"\t0.000000\t0.000000\t\"\"\n" assert.Equal(t, want, got) - streamlog.SetQueryLogRowThreshold(1) + + logStats.Config.RowThreshold = 1 got = testFormat(t, logStats, params) assert.Empty(t, got) } @@ -197,14 +191,14 @@ func TestLogStatsContextHTML(t *testing.T) { Html: testconversions.MakeHTMLForTest(html), } ctx := callinfo.NewContext(context.Background(), callInfo) - logStats := NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats := NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}, streamlog.NewQueryLogConfigForTest()) if logStats.ContextHTML().String() != html { t.Fatalf("expect to get html: %s, but got: %s", html, logStats.ContextHTML().String()) } } func TestLogStatsErrorStr(t *testing.T) { - logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, streamlog.NewQueryLogConfigForTest()) if logStats.ErrorStr() != "" { t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr()) } @@ -216,7 +210,7 @@ func TestLogStatsErrorStr(t *testing.T) { } func TestLogStatsMirrorTargetErrorStr(t *testing.T) { - logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, streamlog.NewQueryLogConfigForTest()) if logStats.MirrorTargetErrorStr() != "" { t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr()) } @@ -228,7 +222,7 @@ func TestLogStatsMirrorTargetErrorStr(t *testing.T) { } func TestLogStatsRemoteAddrUsername(t *testing.T) { - logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, streamlog.NewQueryLogConfigForTest()) addr, user := logStats.RemoteAddrUsername() if addr != "" { t.Fatalf("remote addr should be empty") @@ -244,7 +238,7 @@ func TestLogStatsRemoteAddrUsername(t *testing.T) { User: username, } ctx := callinfo.NewContext(context.Background(), callInfo) - logStats = NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats = NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}, streamlog.NewQueryLogConfigForTest()) addr, user = logStats.RemoteAddrUsername() if addr != remoteAddr { t.Fatalf("expected to get remote addr: %s, but got: %s", remoteAddr, addr) @@ -253,3 +247,18 @@ func TestLogStatsRemoteAddrUsername(t *testing.T) { t.Fatalf("expected to get username: %s, but got: %s", username, user) } } + +// TestLogStatsErrorsOnly tests that LogStats only logs errors when the query log mode is set to errors only for VTGate. +func TestLogStatsErrorsOnly(t *testing.T) { + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, streamlog.NewQueryLogConfigForTest()) + logStats.Config.Mode = streamlog.QueryLogModeError + + // no error, should not log + logOutput := testFormat(t, logStats, url.Values{}) + assert.Empty(t, logOutput) + + // error, should log + logStats.Error = errors.New("test error") + logOutput = testFormat(t, logStats, url.Values{}) + assert.Contains(t, logOutput, "test error") +} diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index db7923c09f0..75a7af6bf2a 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -131,7 +131,7 @@ func (e *Executor) newExecute( // the vtgate to clear the cached plans when processing the new serving vschema. // When buffering ends, many queries might be getting planned at the same time and we then // take full advatange of the cached plan. - plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.normalize, logStats) + plan, err = e.getPlan(ctx, vcursor, query, stmt, comments, bindVars, reservedVars, e.config.Normalize, logStats) execStart := e.logPlanningFinished(logStats, plan) if err != nil { diff --git a/go/vt/vtgate/querylog.go b/go/vt/vtgate/querylog.go index bddc799363d..686ff419764 100644 --- a/go/vt/vtgate/querylog.go +++ b/go/vt/vtgate/querylog.go @@ -49,8 +49,8 @@ func (e *Executor) defaultQueryLogger() error { queryzHandler(e, w, r) }) - if queryLogToFile != "" { - _, err := queryLogger.LogToFile(queryLogToFile, streamlog.GetFormatter(queryLogger)) + if e.config.QueryLogToFile != "" { + _, err := queryLogger.LogToFile(e.config.QueryLogToFile, streamlog.GetFormatter(queryLogger)) if err != nil { return err } diff --git a/go/vt/vtgate/querylogz_test.go b/go/vt/vtgate/querylogz_test.go index 9236b2ac840..2bc5d7e2295 100644 --- a/go/vt/vtgate/querylogz_test.go +++ b/go/vt/vtgate/querylogz_test.go @@ -26,16 +26,17 @@ import ( "testing" "time" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/logstats" - "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" ) func TestQuerylogzHandlerFormatting(t *testing.T) { req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil) - logStats := logstats.NewLogStats(context.Background(), "Execute", "select name, 'inject ' from test_table limit 1000", "suuid", nil) + logStats := logstats.NewLogStats(context.Background(), "Execute", + "select name, 'inject ' from test_table limit 1000", "suuid", nil, streamlog.NewQueryLogConfigForTest()) logStats.StmtType = "select" logStats.RowsAffected = 1000 logStats.ShardQueries = 1 @@ -139,8 +140,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) { checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) // ensure querylogz is not affected by the filter tag - streamlog.SetQueryLogFilterTag("XXX_SKIP_ME") - defer func() { streamlog.SetQueryLogFilterTag("") }() + logStats.Config.FilterTag = "XXX_SKIP_ME" ch = make(chan *logstats.LogStats, 1) ch <- logStats querylogzHandler(ch, response, req, sqlparser.NewTestParser()) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 4a880202b05..8b8302d77d4 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -352,22 +352,15 @@ func Init( plans := DefaultPlanCache() - executor := NewExecutor( - ctx, - env, - serv, - cell, - resolver, - normalizeQueries, - warnShardedOnly, - streamBufferSize, - plans, - si, - noScatter, - pv, - warmingReadsPercent, - dynamicConfig, - ) + eConfig := ExecutorConfig{ + Normalize: normalizeQueries, + StreamSize: streamBufferSize, + AllowScatter: !noScatter, + WarmingReadsPercent: warmingReadsPercent, + QueryLogToFile: queryLogToFile, + } + + executor := NewExecutor(ctx, env, serv, cell, resolver, eConfig, warnShardedOnly, plans, si, pv, dynamicConfig) if err := executor.defaultQueryLogger(); err != nil { log.Fatalf("error initializing query logger: %v", err) diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go index c113ed16308..f05f63474d0 100644 --- a/go/vt/vtgate/vtgate_test.go +++ b/go/vt/vtgate/vtgate_test.go @@ -715,8 +715,7 @@ func createVtgateEnv(t testing.TB) (*VTGate, *sandboxconn.SandboxConn, context.C cell := "aa" sb := createSandbox(KsTestSharded) sb.ShardSpec = "-" - executor, _, _, sbc, ctx := createExecutorEnv(t) - executor.normalize = normalizeQueries + executor, _, _, sbc, ctx := createExecutorEnvWithConfig(t, createExecutorConfigWithNormalizer()) vsm := newVStreamManager(executor.resolver.resolver, executor.serv, cell) vtg := newVTGate(executor, executor.resolver, vsm, nil, executor.scatterConn.gateway) diff --git a/go/vt/vttablet/filelogger/filelogger_test.go b/go/vt/vttablet/filelogger/filelogger_test.go index f747ebba93b..47602e1a855 100644 --- a/go/vt/vttablet/filelogger/filelogger_test.go +++ b/go/vt/vttablet/filelogger/filelogger_test.go @@ -75,11 +75,6 @@ func TestFileLog(t *testing.T) { // TestFileLog sends a stream of five query records to the plugin, and verifies that they are logged. func TestFileLogRedacted(t *testing.T) { - streamlog.SetRedactDebugUIQueries(true) - defer func() { - streamlog.SetRedactDebugUIQueries(false) - }() - dir := t.TempDir() logPath := path.Join(dir, "test.log") @@ -94,7 +89,9 @@ func TestFileLogRedacted(t *testing.T) { log1 := &tabletenv.LogStats{ Ctx: ctx, OriginalSQL: "test 1", + Config: streamlog.NewQueryLogConfigForTest(), } + log1.Config.RedactDebugUIQueries = true log1.AddRewrittenSQL("test 1 PII", time.Time{}) log1.MysqlResponseTime = 0 tabletenv.StatsLogger.Send(log1) @@ -102,7 +99,9 @@ func TestFileLogRedacted(t *testing.T) { log2 := &tabletenv.LogStats{ Ctx: ctx, OriginalSQL: "test 2", + Config: streamlog.NewQueryLogConfigForTest(), } + log2.Config.RedactDebugUIQueries = true log2.AddRewrittenSQL("test 2 PII", time.Time{}) log2.MysqlResponseTime = 0 tabletenv.StatsLogger.Send(log2) diff --git a/go/vt/vttablet/sysloglogger/sysloglogger_test.go b/go/vt/vttablet/sysloglogger/sysloglogger_test.go index 3a06b98ed1c..835f6b71b48 100644 --- a/go/vt/vttablet/sysloglogger/sysloglogger_test.go +++ b/go/vt/vttablet/sysloglogger/sysloglogger_test.go @@ -50,13 +50,14 @@ func (fw *fakeWriter) Info(msg string) error { return fw.write(syslog.LOG_INFO, func (fw *fakeWriter) Close() error { return nil } // mockLogStats generates a dummy tabletserver.LogStats message for testing. -func mockLogStats(originalSQL string) *tabletenv.LogStats { - logstats := tabletenv.NewLogStats(context.Background(), "Execute") +func mockLogStats(originalSQL string, redactUIQuery bool) *tabletenv.LogStats { + logstats := tabletenv.NewLogStats(context.Background(), "Execute", streamlog.NewQueryLogConfigForTest()) logstats.StartTime = time.Time{} logstats.PlanType = "PASS_SELECT" logstats.OriginalSQL = originalSQL logstats.AddRewrittenSQL(originalSQL, time.Now()) logstats.MysqlResponseTime = 0 + logstats.Config.RedactDebugUIQueries = redactUIQuery return logstats } @@ -112,11 +113,11 @@ func TestSyslog(t *testing.T) { }() // Send fake messages to the mock channel, and then close the channel to end the plugin loop - ch <- mockLogStats("select 1") - ch <- mockLogStats("select 2") - ch <- mockLogStats("select 3") - ch <- mockLogStats("select 4") - ch <- mockLogStats("select 5") + ch <- mockLogStats("select 1", false) + ch <- mockLogStats("select 2", false) + ch <- mockLogStats("select 3", false) + ch <- mockLogStats("select 4", false) + ch <- mockLogStats("select 5", false) close(ch) <-syncChannel @@ -142,10 +143,6 @@ func TestSyslog(t *testing.T) { // when redaction is enabled. func TestSyslogRedacted(t *testing.T) { // Overwrite the usual syslog writer and StatsLogger subscription channel with mocks - streamlog.SetRedactDebugUIQueries(true) - defer func() { - streamlog.SetRedactDebugUIQueries(false) - }() mock := newFakeWriter() writer = mock ch = make(chan *tabletenv.LogStats, 10) @@ -158,11 +155,11 @@ func TestSyslogRedacted(t *testing.T) { }() // Send fake messages to the mock channel, and then close the channel to end the plugin loop - ch <- mockLogStats("select 1") - ch <- mockLogStats("select 2") - ch <- mockLogStats("select 3") - ch <- mockLogStats("select 4") - ch <- mockLogStats("select 5") + ch <- mockLogStats("select 1", true) + ch <- mockLogStats("select 2", true) + ch <- mockLogStats("select 3", true) + ch <- mockLogStats("select 4", true) + ch <- mockLogStats("select 5", true) close(ch) <-syncChannel @@ -198,10 +195,10 @@ func TestSyslogWithBadData(t *testing.T) { }() // Send 5 records for logging, one of which is bad - ch <- mockLogStats("select 1") - ch <- mockLogStats("select 2") - ch <- mockLogStats("select 3") - ch <- mockLogStats("select 5") + ch <- mockLogStats("select 1", false) + ch <- mockLogStats("select 2", false) + ch <- mockLogStats("select 3", false) + ch <- mockLogStats("select 5", false) close(ch) <-syncChannel @@ -239,11 +236,11 @@ func TestSyslogWithInterruptedConnection(t *testing.T) { close(syncChannel) }() - ch <- mockLogStats("select 1") - ch <- mockLogStats("select 2") - ch <- mockLogStats("select 3") - ch <- mockLogStats("select 4") // This record will get dropped due to a syslog outage - ch <- mockLogStats("select 5") + ch <- mockLogStats("select 1", false) + ch <- mockLogStats("select 2", false) + ch <- mockLogStats("select 3", false) + ch <- mockLogStats("select 4", false) // This record will get dropped due to a syslog outage + ch <- mockLogStats("select 5", false) close(ch) <-syncChannel diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index b21667392d6..f83d7ff9006 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -31,6 +31,8 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/streamlog" + "vitess.io/vitess/go/event/syslogger" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/mysql/sqlerror" @@ -714,7 +716,7 @@ func TestNoTwopc(t *testing.T) { func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB, closer func()) { db = setUpQueryExecutorTest(t) - logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor") + logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor", streamlog.NewQueryLogConfigForTest()) tsv = newTestTabletServer(ctx, smallTxPool, db) cfg := tabletenv.NewDefaultConfig() cfg.DB = newDBConfigs(db) @@ -741,7 +743,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv // newShortAgeExecutor is same as newTestTxExecutor, but shorter transaction abandon age. func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) { db = setUpQueryExecutorTest(t) - logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor") + logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor", streamlog.NewQueryLogConfigForTest()) tsv = newTestTabletServer(ctx, smallTxPool|shortTwopcAge, db) db.AddQueryPattern("insert into _vt\\.redo_state\\(dtid, state, time_created\\) values \\(_binary'aa', 1,.*", &sqltypes.Result{}) db.AddQueryPattern("insert into _vt\\.redo_statement.*", &sqltypes.Result{}) @@ -758,7 +760,7 @@ func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, ts // newNoTwopcExecutor is same as newTestTxExecutor, but 2pc disabled. func newNoTwopcExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) { db = setUpQueryExecutorTest(t) - logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor") + logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor", streamlog.NewQueryLogConfigForTest()) tsv = newTestTabletServer(ctx, noTwopc, db) return &DTExecutor{ ctx: ctx, diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index d0542165878..4168a11f584 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -196,6 +196,8 @@ type QueryEngine struct { // Loggers accessCheckerLogger *logutil.ThrottledLogger + + redactUIQuery bool } // NewQueryEngine creates a new QueryEngine. @@ -209,6 +211,7 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { se: se, queryRuleSources: rules.NewMap(), enablePerWorkloadTableMetrics: config.EnablePerWorkloadTableMetrics, + redactUIQuery: streamlog.NewQueryLogConfigForTest().RedactDebugUIQueries, } // Cache for query plans: user configured size with a doorkeeper by default to prevent one-off queries @@ -737,7 +740,7 @@ func (qe *QueryEngine) handleHTTPConsolidations(response http.ResponseWriter, re response.Write([]byte(fmt.Sprintf("Length: %d\n", len(items)))) for _, v := range items { var query string - if streamlog.GetRedactDebugUIQueries() { + if qe.redactUIQuery { query, _ = qe.env.Environment().Parser().RedactSQLQuery(v.Query) } else { query = v.Query diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index a1c293ea8ba..c86e8aa163a 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -108,7 +108,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) _, err := qe.GetPlan(ctx, logStats, "", false) require.EqualError(t, err, "Query was empty") } @@ -195,7 +195,7 @@ func TestQueryPlanCache(t *testing.T) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) initialHits := qe.queryEnginePlanCacheHits.Get() initialMisses := qe.queryEnginePlanCacheMisses.Get() @@ -244,7 +244,7 @@ func TestNoQueryPlanCache(t *testing.T) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, true) if err != nil { @@ -273,7 +273,7 @@ func TestNoQueryPlanCacheDirective(t *testing.T) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false) if err != nil { @@ -301,7 +301,7 @@ func TestStreamQueryPlanCache(t *testing.T) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) firstPlan, err := qe.GetStreamPlan(ctx, logStats, firstQuery, false) require.NoError(t, err) @@ -325,7 +325,7 @@ func TestNoStreamQueryPlanCache(t *testing.T) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) firstPlan, err := qe.GetStreamPlan(ctx, logStats, firstQuery, true) require.NoError(t, err) require.NotNil(t, firstPlan) @@ -349,7 +349,7 @@ func TestNoStreamQueryPlanCacheDirective(t *testing.T) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) firstPlan, err := qe.GetStreamPlan(ctx, logStats, firstQuery, false) require.NoError(t, err) require.NotNil(t, firstPlan) @@ -369,7 +369,7 @@ func TestStatsURL(t *testing.T) { defer qe.Close() // warm up cache ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) qe.GetPlan(ctx, logStats, query, false) request, _ := http.NewRequest("GET", "/debug/tablet_plans", nil) @@ -425,18 +425,12 @@ func runConsolidatedQuery(t *testing.T, sql string) *QueryEngine { } func TestConsolidationsUIRedaction(t *testing.T) { - // Reset to default redaction state. - defer func() { - streamlog.SetRedactDebugUIQueries(false) - }() - request, _ := http.NewRequest("GET", "/debug/consolidations", nil) sql := "select * from test_db_01 where col = 'secret'" redactedSQL := "select * from test_db_01 where col = :col" // First with the redaction off - streamlog.SetRedactDebugUIQueries(false) unRedactedResponse := httptest.NewRecorder() qe := runConsolidatedQuery(t, sql) @@ -446,7 +440,7 @@ func TestConsolidationsUIRedaction(t *testing.T) { } // Now with the redaction on - streamlog.SetRedactDebugUIQueries(true) + qe.redactUIQuery = true redactedResponse := httptest.NewRecorder() qe.handleHTTPConsolidations(redactedResponse, request) @@ -473,7 +467,7 @@ func BenchmarkPlanCacheThroughput(b *testing.B) { defer qe.Close() ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) for i := 0; i < b.N; i++ { query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.IntN(500)) @@ -503,7 +497,7 @@ func benchmarkPlanCache(b *testing.B, db *fakesqldb.DB, par int) { b.SetParallelism(par) b.RunParallel(func(pb *testing.PB) { ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) for pb.Next() { query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.IntN(500)) @@ -618,7 +612,7 @@ func TestPlanCachePollution(t *testing.T) { runner := func(totalQueries uint64, stats *Stats, sample func() string) { for i := uint64(0); i < totalQueries; i++ { ctx := context.Background() - logStats := tabletenv.NewLogStats(ctx, "GetPlanStats") + logStats := tabletenv.NewLogStats(ctx, "GetPlanStats", streamlog.NewQueryLogConfigForTest()) query := sample() start := time.Now() diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index ade79ecaef5..6f6becd4038 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -28,6 +28,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/streamlog" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtenv" @@ -840,7 +842,7 @@ func TestQueryExecutorMessageStreamACL(t *testing.T) { ctx: ctx, query: "stream from msg", plan: plan, - logStats: tabletenv.NewLogStats(ctx, "TestQueryExecutor"), + logStats: tabletenv.NewLogStats(ctx, "TestQueryExecutor", streamlog.NewQueryLogConfigForTest()), tsv: tsv, } @@ -1556,7 +1558,7 @@ func newTransaction(tsv *TabletServer, options *querypb.ExecuteOptions) int64 { } func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor { - logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutor") + logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutor", streamlog.NewQueryLogConfigForTest()) plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false) if err != nil { panic(err) @@ -1573,7 +1575,7 @@ func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, tx } func newTestQueryExecutorStreaming(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor { - logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutorStreaming") + logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutorStreaming", streamlog.NewQueryLogConfigForTest()) plan, err := tsv.qe.GetStreamPlan(ctx, logStats, sql, false) if err != nil { panic(err) diff --git a/go/vt/vttablet/tabletserver/query_list.go b/go/vt/vttablet/tabletserver/query_list.go index 60fac1ea3af..7263bea3afc 100644 --- a/go/vt/vttablet/tabletserver/query_list.go +++ b/go/vt/vttablet/tabletserver/query_list.go @@ -63,6 +63,8 @@ type QueryList struct { parser *sqlparser.Parser ca ClusterActionState + + redactUIQuery bool } type ClusterActionState int @@ -76,10 +78,11 @@ const ( // NewQueryList creates a new QueryList func NewQueryList(name string, parser *sqlparser.Parser) *QueryList { return &QueryList{ - name: name, - queryDetails: make(map[int64][]*QueryDetail), - parser: parser, - ca: ClusterActionNotInProgress, + name: name, + queryDetails: make(map[int64][]*QueryDetail), + parser: parser, + ca: ClusterActionNotInProgress, + redactUIQuery: streamlog.GetQueryLogConfig().RedactDebugUIQueries, } } @@ -186,7 +189,7 @@ func (ql *QueryList) AppendQueryzRows(rows []QueryDetailzRow) []QueryDetailzRow for _, qds := range ql.queryDetails { for _, qd := range qds { query := qd.conn.Current() - if streamlog.GetRedactDebugUIQueries() { + if ql.redactUIQuery { query, _ = ql.parser.RedactSQLQuery(query) } row := QueryDetailzRow{ diff --git a/go/vt/vttablet/tabletserver/querylogz_test.go b/go/vt/vttablet/tabletserver/querylogz_test.go index ee26437f330..7f865541295 100644 --- a/go/vt/vttablet/tabletserver/querylogz_test.go +++ b/go/vt/vttablet/tabletserver/querylogz_test.go @@ -35,7 +35,7 @@ import ( func TestQuerylogzHandler(t *testing.T) { req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil) - logStats := tabletenv.NewLogStats(context.Background(), "Execute") + logStats := tabletenv.NewLogStats(context.Background(), "Execute", streamlog.NewQueryLogConfigForTest()) logStats.PlanType = planbuilder.PlanSelect.String() logStats.OriginalSQL = "select name, 'inject ' from test_table limit 1000" logStats.RowsAffected = 1000 @@ -144,8 +144,7 @@ func TestQuerylogzHandler(t *testing.T) { checkQuerylogzHasStats(t, slowQueryPattern, logStats, body) // ensure querylogz is not affected by the filter tag - streamlog.SetQueryLogFilterTag("XXX_SKIP_ME") - defer func() { streamlog.SetQueryLogFilterTag("") }() + logStats.Config.FilterTag = "XXX_SKIP_ME" ch = make(chan *tabletenv.LogStats, 1) ch <- logStats querylogzHandler(ch, response, req, sqlparser.NewTestParser()) diff --git a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go index caa519cc477..c8d9f634384 100644 --- a/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go +++ b/go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "vitess.io/vitess/go/streamlog" querypb "vitess.io/vitess/go/vt/proto/query" "github.com/stretchr/testify/require" @@ -126,7 +127,7 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string defer wg.Done() exporter := servenv.NewExporter("ConsolidatorTest", "") timings := exporter.NewTimings("ConsolidatorWaits", "", "StreamConsolidations") - logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation") + logStats := tabletenv.NewLogStats(context.Background(), "StreamConsolidation", streamlog.NewQueryLogConfigForTest()) query, callback := generateCallback(worker) start := time.Now() err := ct.cc.Consolidate(timings, logStats, query, func(result *sqltypes.Result) error { diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 42cc300f92d..08bf0b1d7c8 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -278,11 +278,12 @@ func Init() { currentConfig.Healthcheck.UnhealthyThreshold = unhealthyThreshold currentConfig.GracePeriods.Transition = transitionGracePeriod - switch streamlog.GetQueryLogFormat() { + logFormat := streamlog.GetQueryLogConfig().Format + switch logFormat { case streamlog.QueryLogFormatText: case streamlog.QueryLogFormatJSON: default: - log.Exitf("Invalid querylog-format value %v: must be either text or json", streamlog.GetQueryLogFormat()) + log.Exitf("Invalid querylog-format value %v: must be either text or json", logFormat) } if queryLogHandler != "" { diff --git a/go/vt/vttablet/tabletserver/tabletenv/logstats.go b/go/vt/vttablet/tabletserver/tabletenv/logstats.go index ad7e09de169..40156f3bcdc 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/logstats.go +++ b/go/vt/vttablet/tabletserver/tabletenv/logstats.go @@ -43,6 +43,8 @@ const ( // LogStats records the stats for a single query type LogStats struct { + Config streamlog.QueryLogConfig + Ctx context.Context Method string Target *querypb.Target @@ -66,11 +68,12 @@ type LogStats struct { // NewLogStats constructs a new LogStats with supplied Method and ctx // field values, and the StartTime field set to the present time. -func NewLogStats(ctx context.Context, methodName string) *LogStats { +func NewLogStats(ctx context.Context, methodName string, config streamlog.QueryLogConfig) *LogStats { return &LogStats{ Ctx: ctx, Method: methodName, StartTime: time.Now(), + Config: config, } } @@ -177,17 +180,16 @@ func (stats *LogStats) CallInfo() (string, string) { // Logf formats the log record to the given writer, either as // tab-separated list of logged fields or as JSON. func (stats *LogStats) Logf(w io.Writer, params url.Values) error { - if !streamlog.ShouldEmitLog(stats.OriginalSQL, uint64(stats.RowsAffected), uint64(len(stats.Rows))) { + if !stats.Config.ShouldEmitLog(stats.OriginalSQL, uint64(stats.RowsAffected), uint64(len(stats.Rows)), stats.Error != nil) { return nil } - redacted := streamlog.GetRedactDebugUIQueries() _, fullBindParams := params["full"] // TODO: remove username here we fully enforce immediate caller id callInfo, username := stats.CallInfo() log := logstats.NewLogger() - log.Init(streamlog.GetQueryLogFormat() == streamlog.QueryLogFormatJSON) + log.Init(stats.Config.Format == streamlog.QueryLogFormatJSON) log.Key("Method") log.StringUnquoted(stats.Method) log.Key("CallInfo") @@ -209,7 +211,7 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { log.Key("OriginalSQL") log.String(stats.OriginalSQL) log.Key("BindVars") - if redacted { + if stats.Config.RedactDebugUIQueries { log.Redacted() } else { log.BindVariables(stats.BindVariables, fullBindParams) @@ -217,7 +219,7 @@ func (stats *LogStats) Logf(w io.Writer, params url.Values) error { log.Key("Queries") log.Int(int64(stats.NumberOfQueries)) log.Key("RewrittenSQL") - if redacted { + if stats.Config.RedactDebugUIQueries { log.Redacted() } else { log.String(stats.RewrittenSQL()) diff --git a/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go b/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go index 7412a0a436c..4c31b890cca 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/logstats_test.go @@ -26,6 +26,8 @@ import ( "time" "github.com/google/safehtml/testconversions" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/streamlog" @@ -35,7 +37,7 @@ import ( ) func TestLogStats(t *testing.T) { - logStats := NewLogStats(context.Background(), "test") + logStats := NewLogStats(context.Background(), "test", streamlog.QueryLogConfig{}) logStats.AddRewrittenSQL("sql1", time.Now()) if !strings.Contains(logStats.RewrittenSQL(), "sql1") { @@ -59,7 +61,7 @@ func testFormat(stats *LogStats, params url.Values) string { } func TestLogStatsFormat(t *testing.T) { - logStats := NewLogStats(context.Background(), "test") + logStats := NewLogStats(context.Background(), "test", streamlog.NewQueryLogConfigForTest()) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) logStats.OriginalSQL = "sql" @@ -70,90 +72,65 @@ func TestLogStatsFormat(t *testing.T) { logStats.Rows = [][]sqltypes.Value{{sqltypes.NewVarBinary("a")}} params := map[string][]string{"full": {}} - streamlog.SetRedactDebugUIQueries(false) - streamlog.SetQueryLogFormat("text") - got := testFormat(logStats, url.Values(params)) + got := testFormat(logStats, params) want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t12345\t1\t\"\"\t\n" - if got != want { - t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) - } + assert.Equal(t, want, got) - streamlog.SetRedactDebugUIQueries(true) - streamlog.SetQueryLogFormat("text") - got = testFormat(logStats, url.Values(params)) + logStats.Config.RedactDebugUIQueries = true + + got = testFormat(logStats, params) want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql\"\t\"[REDACTED]\"\t1\t\"[REDACTED]\"\tmysql\t0.000000\t0.000000\t0\t12345\t1\t\"\"\t\n" - if got != want { - t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) - } + assert.Equal(t, want, got) - streamlog.SetRedactDebugUIQueries(false) - streamlog.SetQueryLogFormat("json") - got = testFormat(logStats, url.Values(params)) + logStats.Config.RedactDebugUIQueries = false + logStats.Config.Format = streamlog.QueryLogFormatJSON + + got = testFormat(logStats, params) var parsed map[string]any err := json.Unmarshal([]byte(got), &parsed) if err != nil { t.Errorf("logstats format: error unmarshaling json: %v -- got:\n%v", err, got) } formatted, err := json.MarshalIndent(parsed, "", " ") - if err != nil { - t.Errorf("logstats format: error marshaling json: %v -- got:\n%v", err, got) - } + require.NoError(t, err) want = "{\n \"BindVars\": {\n \"intVal\": {\n \"type\": \"INT64\",\n \"value\": 1\n }\n },\n \"CallInfo\": \"\",\n \"ConnWaitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ImmediateCaller\": \"\",\n \"Method\": \"test\",\n \"MysqlTime\": 0,\n \"OriginalSQL\": \"sql\",\n \"PlanType\": \"\",\n \"Queries\": 1,\n \"QuerySources\": \"mysql\",\n \"ResponseSize\": 1,\n \"RewrittenSQL\": \"sql with pii\",\n \"RowsAffected\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"TotalTime\": 1.000001,\n \"TransactionID\": 12345,\n \"Username\": \"\"\n}" - if string(formatted) != want { - t.Errorf("logstats format: got:\n%q\nwant:\n%v\n", string(formatted), want) - } + assert.Equal(t, want, string(formatted)) + + logStats.Config.RedactDebugUIQueries = true + logStats.Config.Format = streamlog.QueryLogFormatJSON - streamlog.SetRedactDebugUIQueries(true) - streamlog.SetQueryLogFormat("json") - got = testFormat(logStats, url.Values(params)) + got = testFormat(logStats, params) err = json.Unmarshal([]byte(got), &parsed) - if err != nil { - t.Errorf("logstats format: error unmarshaling json: %v -- got:\n%v", err, got) - } + require.NoError(t, err) formatted, err = json.MarshalIndent(parsed, "", " ") - if err != nil { - t.Errorf("logstats format: error marshaling json: %v -- got:\n%v", err, got) - } + require.NoError(t, err) want = "{\n \"BindVars\": \"[REDACTED]\",\n \"CallInfo\": \"\",\n \"ConnWaitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ImmediateCaller\": \"\",\n \"Method\": \"test\",\n \"MysqlTime\": 0,\n \"OriginalSQL\": \"sql\",\n \"PlanType\": \"\",\n \"Queries\": 1,\n \"QuerySources\": \"mysql\",\n \"ResponseSize\": 1,\n \"RewrittenSQL\": \"[REDACTED]\",\n \"RowsAffected\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"TotalTime\": 1.000001,\n \"TransactionID\": 12345,\n \"Username\": \"\"\n}" - if string(formatted) != want { - t.Errorf("logstats format: got:\n%q\nwant:\n%v\n", string(formatted), want) - } - - streamlog.SetRedactDebugUIQueries(false) + assert.Equal(t, want, string(formatted)) // Make sure formatting works for string bind vars. We can't do this as part of a single // map because the output ordering is undefined. logStats.BindVariables = map[string]*querypb.BindVariable{"strVal": sqltypes.StringBindVariable("abc")} + logStats.Config.RedactDebugUIQueries = false + logStats.Config.Format = streamlog.QueryLogFormatText - streamlog.SetQueryLogFormat("text") - got = testFormat(logStats, url.Values(params)) + got = testFormat(logStats, params) want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql\"\t{\"strVal\": {\"type\": \"VARCHAR\", \"value\": \"abc\"}}\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t12345\t1\t\"\"\t\n" - if got != want { - t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) - } + assert.Equal(t, want, got) + + logStats.Config.RedactDebugUIQueries = false + logStats.Config.Format = streamlog.QueryLogFormatJSON - streamlog.SetQueryLogFormat("json") - got = testFormat(logStats, url.Values(params)) + got = testFormat(logStats, params) err = json.Unmarshal([]byte(got), &parsed) - if err != nil { - t.Errorf("logstats format: error unmarshaling json: %v -- got:\n%v", err, got) - } + require.NoError(t, err) formatted, err = json.MarshalIndent(parsed, "", " ") - if err != nil { - t.Errorf("logstats format: error marshaling json: %v -- got:\n%v", err, got) - } + require.NoError(t, err) want = "{\n \"BindVars\": {\n \"strVal\": {\n \"type\": \"VARCHAR\",\n \"value\": \"abc\"\n }\n },\n \"CallInfo\": \"\",\n \"ConnWaitTime\": 0,\n \"Effective Caller\": \"\",\n \"End\": \"2017-01-01 01:02:04.000001\",\n \"Error\": \"\",\n \"ImmediateCaller\": \"\",\n \"Method\": \"test\",\n \"MysqlTime\": 0,\n \"OriginalSQL\": \"sql\",\n \"PlanType\": \"\",\n \"Queries\": 1,\n \"QuerySources\": \"mysql\",\n \"ResponseSize\": 1,\n \"RewrittenSQL\": \"sql with pii\",\n \"RowsAffected\": 0,\n \"Start\": \"2017-01-01 01:02:03.000000\",\n \"TotalTime\": 1.000001,\n \"TransactionID\": 12345,\n \"Username\": \"\"\n}" - if string(formatted) != want { - t.Errorf("logstats format: got:\n%q\nwant:\n%v\n", string(formatted), want) - } - - streamlog.SetQueryLogFormat("text") + assert.Equal(t, want, string(formatted)) } func TestLogStatsFilter(t *testing.T) { - defer func() { streamlog.SetQueryLogFilterTag("") }() - - logStats := NewLogStats(context.Background(), "test") + logStats := NewLogStats(context.Background(), "test", streamlog.NewQueryLogConfigForTest()) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) logStats.OriginalSQL = "sql /* LOG_THIS_QUERY */" @@ -163,21 +140,21 @@ func TestLogStatsFilter(t *testing.T) { logStats.Rows = [][]sqltypes.Value{{sqltypes.NewVarBinary("a")}} params := map[string][]string{"full": {}} - got := testFormat(logStats, url.Values(params)) + got := testFormat(logStats, params) want := "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t0\t1\t\"\"\t\n" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) } - streamlog.SetQueryLogFilterTag("LOG_THIS_QUERY") - got = testFormat(logStats, url.Values(params)) + logStats.Config.FilterTag = "LOG_THIS_QUERY" + got = testFormat(logStats, params) want = "test\t\t\t''\t''\t2017-01-01 01:02:03.000000\t2017-01-01 01:02:04.000001\t1.000001\t\t\"sql /* LOG_THIS_QUERY */\"\t{\"intVal\": {\"type\": \"INT64\", \"value\": 1}}\t1\t\"sql with pii\"\tmysql\t0.000000\t0.000000\t0\t0\t1\t\"\"\t\n" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) } - streamlog.SetQueryLogFilterTag("NOT_THIS_QUERY") - got = testFormat(logStats, url.Values(params)) + logStats.Config.FilterTag = "NOT_THIS_QUERY" + got = testFormat(logStats, params) want = "" if got != want { t.Errorf("logstats format: got:\n%q\nwant:\n%q\n", got, want) @@ -185,7 +162,7 @@ func TestLogStatsFilter(t *testing.T) { } func TestLogStatsFormatQuerySources(t *testing.T) { - logStats := NewLogStats(context.Background(), "test") + logStats := NewLogStats(context.Background(), "test", streamlog.NewQueryLogConfigForTest()) if logStats.FmtQuerySources() != "none" { t.Fatalf("should return none since log stats does not have any query source, but got: %s", logStats.FmtQuerySources()) } @@ -207,14 +184,14 @@ func TestLogStatsContextHTML(t *testing.T) { Html: testconversions.MakeHTMLForTest(html), } ctx := callinfo.NewContext(context.Background(), callInfo) - logStats := NewLogStats(ctx, "test") + logStats := NewLogStats(ctx, "test", streamlog.NewQueryLogConfigForTest()) if logStats.ContextHTML().String() != html { t.Fatalf("expect to get html: %s, but got: %s", html, logStats.ContextHTML().String()) } } func TestLogStatsErrorStr(t *testing.T) { - logStats := NewLogStats(context.Background(), "test") + logStats := NewLogStats(context.Background(), "test", streamlog.NewQueryLogConfigForTest()) if logStats.ErrorStr() != "" { t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr()) } @@ -226,7 +203,7 @@ func TestLogStatsErrorStr(t *testing.T) { } func TestLogStatsCallInfo(t *testing.T) { - logStats := NewLogStats(context.Background(), "test") + logStats := NewLogStats(context.Background(), "test", streamlog.NewQueryLogConfigForTest()) caller, user := logStats.CallInfo() if caller != "" { t.Fatalf("caller should be empty") @@ -243,7 +220,7 @@ func TestLogStatsCallInfo(t *testing.T) { User: username, } ctx := callinfo.NewContext(context.Background(), callInfo) - logStats = NewLogStats(ctx, "test") + logStats = NewLogStats(ctx, "test", streamlog.NewQueryLogConfigForTest()) caller, user = logStats.CallInfo() wantCaller := remoteAddr + ":FakeExecute(fakeRPC)" if caller != wantCaller { @@ -253,3 +230,18 @@ func TestLogStatsCallInfo(t *testing.T) { t.Fatalf("expected to get username: %s, but got: %s", username, user) } } + +// TestLogStatsErrorsOnly tests that LogStats only logs errors when the query log mode is set to errors only for VTTablet. +func TestLogStatsErrorsOnly(t *testing.T) { + logStats := NewLogStats(context.Background(), "test", streamlog.NewQueryLogConfigForTest()) + logStats.Config.Mode = streamlog.QueryLogModeError + + // no error, should not log + logOutput := testFormat(logStats, url.Values{}) + assert.Empty(t, logOutput) + + // error, should log + logStats.Error = errors.New("test error") + logOutput = testFormat(logStats, url.Values{}) + assert.Contains(t, logOutput, "test error") +} diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index deeac10bd05..30f73d2d818 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/pools/smartconnpool" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/tb" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/callerid" @@ -1551,7 +1552,7 @@ func (tsv *TabletServer) execRequest( defer span.Finish() - logStats := tabletenv.NewLogStats(ctx, requestName) + logStats := tabletenv.NewLogStats(ctx, requestName, streamlog.GetQueryLogConfig()) logStats.Target = target logStats.OriginalSQL = sql logStats.BindVariables = sqltypes.CopyBindVariables(bindVariables) @@ -1875,7 +1876,9 @@ func (tsv *TabletServer) registerQuerylogzHandler() { } func (tsv *TabletServer) registerTxlogzHandler() { - tsv.exporter.HandleFunc("/txlogz", txlogzHandler) + tsv.exporter.HandleFunc("/txlogz", func(w http.ResponseWriter, r *http.Request) { + txlogzHandler(w, r, streamlog.GetQueryLogConfig().RedactDebugUIQueries) + }) } func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) { @@ -1890,7 +1893,7 @@ func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) { func (tsv *TabletServer) registerTwopczHandler() { tsv.exporter.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) { ctx := tabletenv.LocalContext() - txe := NewDTExecutor(ctx, tabletenv.NewLogStats(ctx, "twopcz"), tsv.te, tsv.qe, tsv.getShard) + txe := NewDTExecutor(ctx, tabletenv.NewLogStats(ctx, "twopcz", streamlog.GetQueryLogConfig()), tsv.te, tsv.qe, tsv.getShard) twopczHandler(txe, w, r) }) } diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 621941224a9..e4faf4ee6c9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/mysql/config" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/sidecardb" "vitess.io/vitess/go/vt/vtenv" @@ -1659,7 +1660,7 @@ func TestPurgeMessages(t *testing.T) { func TestHandleExecUnknownError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logStats := tabletenv.NewLogStats(ctx, "TestHandleExecError") + logStats := tabletenv.NewLogStats(ctx, "TestHandleExecError", streamlog.NewQueryLogConfigForTest()) cfg := tabletenv.NewDefaultConfig() srvTopoCounts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type") tsv := NewTabletServer(ctx, vtenv.NewTestEnv(), "TabletServerTest", cfg, memorytopo.NewServer(ctx, ""), &topodatapb.TabletAlias{}, srvTopoCounts) @@ -1676,7 +1677,7 @@ func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) { defer cancel() tl := newTestLogger() defer tl.Close() - logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation") + logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation", streamlog.NewQueryLogConfigForTest()) env, err := vtenv.New(vtenv.Options{ MySQLServerVersion: config.DefaultMySQLVersion, TruncateErrLen: 32, diff --git a/go/vt/vttablet/tabletserver/txlogz.go b/go/vt/vttablet/tabletserver/txlogz.go index 8d1b88c8c85..9a60812ca0b 100644 --- a/go/vt/vttablet/tabletserver/txlogz.go +++ b/go/vt/vttablet/tabletserver/txlogz.go @@ -25,7 +25,6 @@ import ( "github.com/google/safehtml/template" "vitess.io/vitess/go/acl" - "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logz" @@ -76,13 +75,13 @@ var ( // Endpoint: /txlogz?timeout=%d&limit=%d // timeout: the txlogz will keep dumping transactions until timeout // limit: txlogz will keep dumping transactions until it hits the limit -func txlogzHandler(w http.ResponseWriter, req *http.Request) { +func txlogzHandler(w http.ResponseWriter, req *http.Request, redactUIQuery bool) { if err := acl.CheckAccessHTTP(req, acl.DEBUGGING); err != nil { acl.SendError(w, err) return } - if streamlog.GetRedactDebugUIQueries() { + if redactUIQuery { io.WriteString(w, ` diff --git a/go/vt/vttablet/tabletserver/txlogz_test.go b/go/vt/vttablet/tabletserver/txlogz_test.go index 8faec74d07b..4556665a52a 100644 --- a/go/vt/vttablet/tabletserver/txlogz_test.go +++ b/go/vt/vttablet/tabletserver/txlogz_test.go @@ -27,8 +27,6 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" "vitess.io/vitess/go/vt/callerid" - - "vitess.io/vitess/go/streamlog" ) func testNotRedacted(t *testing.T, r *httptest.ResponseRecorder) { @@ -45,11 +43,10 @@ func testRedacted(t *testing.T, r *httptest.ResponseRecorder) { func testHandler(req *http.Request, t *testing.T) { // Test with redactions off to start - streamlog.SetRedactDebugUIQueries(false) response := httptest.NewRecorder() tabletenv.TxLogger.Send("test msg") - txlogzHandler(response, req) + txlogzHandler(response, req, false) if !strings.Contains(response.Body.String(), "error") { t.Fatalf("should show an error page since transaction log format is invalid.") } @@ -66,26 +63,22 @@ func testHandler(req *http.Request, t *testing.T) { txConn.txProps.EndTime = txConn.txProps.StartTime response = httptest.NewRecorder() tabletenv.TxLogger.Send(txConn) - txlogzHandler(response, req) + txlogzHandler(response, req, false) testNotRedacted(t, response) txConn.txProps.EndTime = txConn.txProps.StartTime.Add(time.Duration(2) * time.Second) response = httptest.NewRecorder() tabletenv.TxLogger.Send(txConn) - txlogzHandler(response, req) + txlogzHandler(response, req, false) testNotRedacted(t, response) txConn.txProps.EndTime = txConn.txProps.StartTime.Add(time.Duration(500) * time.Millisecond) response = httptest.NewRecorder() tabletenv.TxLogger.Send(txConn) - txlogzHandler(response, req) + txlogzHandler(response, req, false) testNotRedacted(t, response) // Test with redactions on - streamlog.SetRedactDebugUIQueries(true) - txlogzHandler(response, req) + txlogzHandler(response, req, true) testRedacted(t, response) - - // Reset to default redaction state - streamlog.SetRedactDebugUIQueries(false) } func TestTxlogzHandler(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go index 10428ed67c7..d8962f1c2b8 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer.go @@ -19,14 +19,13 @@ limitations under the License. package txserializer import ( + "context" "fmt" "net/http" "strings" "sync" "time" - "context" - "vitess.io/vitess/go/acl" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/streamlog" @@ -87,9 +86,10 @@ type TxSerializer struct { logQueueExceededDryRun *logutil.ThrottledLogger logGlobalQueueExceededDryRun *logutil.ThrottledLogger - mu sync.Mutex - queues map[string]*queue - globalSize int + mu sync.Mutex + queues map[string]*queue + globalSize int + redactUIQuery bool } // New returns a TxSerializer object. @@ -130,6 +130,7 @@ func New(env tabletenv.Env) *TxSerializer { logQueueExceededDryRun: logutil.NewThrottledLogger("HotRowProtection QueueExceeded DryRun", 5*time.Second), logGlobalQueueExceededDryRun: logutil.NewThrottledLogger("HotRowProtection GlobalQueueExceeded DryRun", 5*time.Second), queues: make(map[string]*queue), + redactUIQuery: streamlog.NewQueryLogConfigForTest().RedactDebugUIQueries, } } @@ -328,7 +329,7 @@ func (txs *TxSerializer) Pending(key string) int { // ServeHTTP lists the most recent, cached queries and their count. func (txs *TxSerializer) ServeHTTP(response http.ResponseWriter, request *http.Request) { - if streamlog.GetRedactDebugUIQueries() { + if txs.redactUIQuery { response.Write([]byte(` diff --git a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go index e1b4b5a7612..d53c434d9b6 100644 --- a/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go +++ b/go/vt/vttablet/tabletserver/txserializer/tx_serializer_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -71,17 +70,13 @@ func TestTxSerializer_NoHotRow(t *testing.T) { } func TestTxSerializerRedactDebugUI(t *testing.T) { - streamlog.SetRedactDebugUIQueries(true) - defer func() { - streamlog.SetRedactDebugUIQueries(false) - }() - cfg := tabletenv.NewDefaultConfig() cfg.HotRowProtection.MaxQueueSize = 1 cfg.HotRowProtection.MaxGlobalQueueSize = 1 cfg.HotRowProtection.MaxConcurrency = 5 txs := New(tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TxSerializerTest")) resetVariables(txs) + txs.redactUIQuery = true done, waited, err := txs.Wait(context.Background(), "t1 where1", "t1") if err != nil {