diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 57eb907cf4d..020470fdfdf 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -21,6 +21,8 @@ Flags: --audit-purge-duration duration Duration for which audit logs are held before being purged. Should be in multiples of days (default 168h0m0s) --audit-to-backend Whether to store the audit log in the VTOrc database --audit-to-syslog Whether to store the audit log in the syslog + --backend-read-concurrency int Maximum concurrency for reads to the backend (default 32) + --backend-write-concurrency int Maximum concurrency for writes to the backend (default 24) --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. --catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified --change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index db367673aeb..a96465acac4 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -121,6 +121,24 @@ var ( }, ) + backendReadConcurrency = viperutil.Configure( + "backend-read-concurrency", + viperutil.Options[int64]{ + FlagName: "backend-read-concurrency", + Default: 32, + Dynamic: true, + }, + ) + + backendWriteConcurrency = viperutil.Configure( + "backend-write-concurrency", + viperutil.Options[int64]{ + FlagName: "backend-write-concurrency", + Default: 24, + Dynamic: true, + }, + ) + waitReplicasTimeout = viperutil.Configure( "wait-replicas-timeout", viperutil.Options[time.Duration]{ @@ -199,6 +217,8 @@ func registerFlags(fs *pflag.FlagSet) { fs.Bool("audit-to-backend", auditToBackend.Default(), "Whether to store the audit log in the VTOrc database") fs.Bool("audit-to-syslog", auditToSyslog.Default(), "Whether to store the audit log in the syslog") fs.Duration("audit-purge-duration", auditPurgeDuration.Default(), "Duration for which audit logs are held before being purged. Should be in multiples of days") + fs.Int64("backend-read-concurrency", backendReadConcurrency.Default(), "Maximum concurrency for reads to the backend") + fs.Int64("backend-write-concurrency", backendWriteConcurrency.Default(), "Maximum concurrency for writes to the backend") fs.Bool("prevent-cross-cell-failover", preventCrossCellFailover.Default(), "Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover") fs.Duration("wait-replicas-timeout", waitReplicasTimeout.Default(), "Duration for which to wait for replica's to respond when issuing RPCs") fs.Duration("tolerable-replication-lag", tolerableReplicationLag.Default(), "Amount of replication lag that is considered acceptable for a tablet to be eligible for promotion when Vitess makes the choice of a new primary in PRS") @@ -218,6 +238,8 @@ func registerFlags(fs *pflag.FlagSet) { auditToBackend, auditToSyslog, auditPurgeDuration, + backendReadConcurrency, + backendWriteConcurrency, waitReplicasTimeout, tolerableReplicationLag, topoInformationRefreshDuration, @@ -303,6 +325,16 @@ func SetAuditPurgeDays(days int64) { auditPurgeDuration.Set(time.Duration(days) * 24 * time.Hour) } +// GetBackendReadConcurrency returns the max backend read concurrency. +func GetBackendReadConcurrency() int64 { + return backendReadConcurrency.Get() +} + +// GetBackendWriteConcurrency returns the max backend write concurrency. +func GetBackendWriteConcurrency() int64 { + return backendWriteConcurrency.Get() +} + // GetWaitReplicasTimeout is a getter function. func GetWaitReplicasTimeout() time.Duration { return waitReplicasTimeout.Get() diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 9e35e6e3e0b..7dab1669643 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -17,6 +17,7 @@ package inst import ( + "context" "encoding/json" "errors" "fmt" @@ -30,6 +31,7 @@ import ( "github.com/patrickmn/go-cache" "github.com/sjmudd/stopwatch" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/stats" @@ -47,13 +49,11 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) -const ( - backendDBConcurrency = 20 -) +const maxBackendOpTime = time.Second * 5 var ( - instanceReadChan = make(chan bool, backendDBConcurrency) - instanceWriteChan = make(chan bool, backendDBConcurrency) + instanceReadSem = semaphore.NewWeighted(config.GetBackendReadConcurrency()) + instanceWriteSem = semaphore.NewWeighted(config.GetBackendWriteConcurrency()) ) var forgetAliases *cache.Cache @@ -88,7 +88,12 @@ func initializeInstanceDao() { func ExecDBWriteFunc(f func() error) error { m := query.NewMetric() - instanceWriteChan <- true + ctx, cancel := context.WithTimeout(context.Background(), maxBackendOpTime) + defer cancel() + + if err := instanceWriteSem.Acquire(ctx, 1); err != nil { + return err + } m.WaitLatency = time.Since(m.Timestamp) // catch the exec time and error if there is one @@ -106,7 +111,7 @@ func ExecDBWriteFunc(f func() error) error { } m.ExecuteLatency = time.Since(m.Timestamp.Add(m.WaitLatency)) _ = backendWrites.Append(m) - <-instanceWriteChan // assume this takes no time + instanceWriteSem.Release(1) }() res := f() return res @@ -646,10 +651,16 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In } return instances, err } - instanceReadChan <- true - instances, err := readFunc() - <-instanceReadChan - return instances, err + + ctx, cancel := context.WithTimeout(context.Background(), maxBackendOpTime) + defer cancel() + + if err := instanceReadSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer instanceReadSem.Release(1) + + return readFunc() } // ReadInstance reads an instance from the vtorc backend database