Skip to content

Commit

Permalink
vtorc: use golang.org/x/sync/semaphore, add flag for db concurrency`
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Feb 21, 2025
1 parent 2118bc3 commit 0edc405
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Flags:
--audit-purge-duration duration Duration for which audit logs are held before being purged. Should be in multiples of days (default 168h0m0s)
--audit-to-backend Whether to store the audit log in the VTOrc database
--audit-to-syslog Whether to store the audit log in the syslog
--backend-read-concurrency int Maximum concurrency for reads to the backend (default 32)
--backend-write-concurrency int Maximum concurrency for writes to the backend (default 24)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtorc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ var (
},
)

backendReadConcurrency = viperutil.Configure(
"backend-read-concurrency",
viperutil.Options[int64]{
FlagName: "backend-read-concurrency",
Default: 32,
Dynamic: true,
},
)

backendWriteConcurrency = viperutil.Configure(
"backend-write-concurrency",
viperutil.Options[int64]{
FlagName: "backend-write-concurrency",
Default: 24,
Dynamic: true,
},
)

waitReplicasTimeout = viperutil.Configure(
"wait-replicas-timeout",
viperutil.Options[time.Duration]{
Expand Down Expand Up @@ -199,6 +217,8 @@ func registerFlags(fs *pflag.FlagSet) {
fs.Bool("audit-to-backend", auditToBackend.Default(), "Whether to store the audit log in the VTOrc database")
fs.Bool("audit-to-syslog", auditToSyslog.Default(), "Whether to store the audit log in the syslog")
fs.Duration("audit-purge-duration", auditPurgeDuration.Default(), "Duration for which audit logs are held before being purged. Should be in multiples of days")
fs.Int64("backend-read-concurrency", backendReadConcurrency.Default(), "Maximum concurrency for reads to the backend")
fs.Int64("backend-write-concurrency", backendWriteConcurrency.Default(), "Maximum concurrency for writes to the backend")
fs.Bool("prevent-cross-cell-failover", preventCrossCellFailover.Default(), "Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover")
fs.Duration("wait-replicas-timeout", waitReplicasTimeout.Default(), "Duration for which to wait for replica's to respond when issuing RPCs")
fs.Duration("tolerable-replication-lag", tolerableReplicationLag.Default(), "Amount of replication lag that is considered acceptable for a tablet to be eligible for promotion when Vitess makes the choice of a new primary in PRS")
Expand All @@ -218,6 +238,8 @@ func registerFlags(fs *pflag.FlagSet) {
auditToBackend,
auditToSyslog,
auditPurgeDuration,
backendReadConcurrency,
backendWriteConcurrency,
waitReplicasTimeout,
tolerableReplicationLag,
topoInformationRefreshDuration,
Expand Down Expand Up @@ -303,6 +325,16 @@ func SetAuditPurgeDays(days int64) {
auditPurgeDuration.Set(time.Duration(days) * 24 * time.Hour)
}

// GetBackendReadConcurrency returns the max backend read concurrency.
func GetBackendReadConcurrency() int64 {
return backendReadConcurrency.Get()
}

// GetBackendWriteConcurrency returns the max backend write concurrency.
func GetBackendWriteConcurrency() int64 {
return backendWriteConcurrency.Get()
}

// GetWaitReplicasTimeout is a getter function.
func GetWaitReplicasTimeout() time.Duration {
return waitReplicasTimeout.Get()
Expand Down
33 changes: 22 additions & 11 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package inst

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -30,6 +31,7 @@ import (

"github.com/patrickmn/go-cache"
"github.com/sjmudd/stopwatch"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"
Expand All @@ -47,13 +49,11 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
backendDBConcurrency = 20
)
const maxBackendOpTime = time.Second * 5

var (
instanceReadChan = make(chan bool, backendDBConcurrency)
instanceWriteChan = make(chan bool, backendDBConcurrency)
instanceReadSem = semaphore.NewWeighted(config.GetBackendReadConcurrency())
instanceWriteSem = semaphore.NewWeighted(config.GetBackendWriteConcurrency())
)

var forgetAliases *cache.Cache
Expand Down Expand Up @@ -88,7 +88,12 @@ func initializeInstanceDao() {
func ExecDBWriteFunc(f func() error) error {
m := query.NewMetric()

instanceWriteChan <- true
ctx, cancel := context.WithTimeout(context.Background(), maxBackendOpTime)
defer cancel()

if err := instanceWriteSem.Acquire(ctx, 1); err != nil {
return err
}
m.WaitLatency = time.Since(m.Timestamp)

// catch the exec time and error if there is one
Expand All @@ -106,7 +111,7 @@ func ExecDBWriteFunc(f func() error) error {
}
m.ExecuteLatency = time.Since(m.Timestamp.Add(m.WaitLatency))
_ = backendWrites.Append(m)
<-instanceWriteChan // assume this takes no time
instanceWriteSem.Release(1)
}()
res := f()
return res
Expand Down Expand Up @@ -646,10 +651,16 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In
}
return instances, err
}
instanceReadChan <- true
instances, err := readFunc()
<-instanceReadChan
return instances, err

ctx, cancel := context.WithTimeout(context.Background(), maxBackendOpTime)
defer cancel()

if err := instanceReadSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer instanceReadSem.Release(1)

return readFunc()
}

// ReadInstance reads an instance from the vtorc backend database
Expand Down

0 comments on commit 0edc405

Please sign in to comment.