Skip to content

Commit

Permalink
slack-19.0: Add stats for shards watched by VTOrc (#606)
Browse files Browse the repository at this point in the history
* Add stats for shards watched by VTOrc

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Use len() in make

---------

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt authored Feb 17, 2025
1 parent 07fd0a3 commit 0a34913
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 2 deletions.
37 changes: 37 additions & 0 deletions go/vt/vtorc/inst/shard_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,30 @@ import (
// ErrShardNotFound is a fixed error message used when a shard is not found in the database.
var ErrShardNotFound = errors.New("shard not found")

// ReadShardNames returns the shard names stored in the vitess_shard table
// for the given keyspace. The returned slice is always non-nil; it is empty
// when no rows match. Any error from the underlying query is returned
// alongside whatever names were collected before it occurred.
func ReadShardNames(keyspaceName string) (shardNames []string, err error) {
	shardNames = []string{}
	// collectShard appends the "shard" column of each result row.
	collectShard := func(row sqlutils.RowMap) error {
		shardNames = append(shardNames, row.GetString("shard"))
		return nil
	}
	err = db.QueryVTOrc(
		`select shard from vitess_shard where keyspace = ?`,
		sqlutils.Args(keyspaceName),
		collectShard,
	)
	return shardNames, err
}

// ReadAllShardNames reads the names of all vitess shards by keyspace.
func ReadAllShardNames() (shardNames map[string][]string, err error) {
shardNames = make(map[string][]string)
query := `select keyspace, shard from vitess_shard`
err = db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error {
ks := row.GetString("keyspace")
shardNames[ks] = append(shardNames[ks], row.GetString("shard"))
return nil
})
return shardNames, err
}

// ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp.
func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp string, err error) {
if err = topo.ValidateKeyspaceName(keyspaceName); err != nil {
Expand Down Expand Up @@ -95,3 +119,16 @@ func getShardPrimaryTermStartTimeString(shard *topo.ShardInfo) string {
}
return protoutil.TimeFromProto(shard.PrimaryTermStartTime).UTC().String()
}

// DeleteShard removes the vitess_shard row identified by the given keyspace
// and shard name. It returns the error reported by the delete statement, or
// nil when the statement succeeds.
func DeleteShard(keyspace, shard string) error {
	const deleteShardQuery = `DELETE FROM
vitess_shard
WHERE
keyspace = ?
AND shard = ?`
	if _, err := db.ExecVTOrc(deleteShardQuery, keyspace, shard); err != nil {
		return err
	}
	return nil
}
13 changes: 12 additions & 1 deletion go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"vitess.io/vitess/go/vt/vtorc/db"
)

func TestSaveAndReadShard(t *testing.T) {
func TestSaveReadAndDeleteShard(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
db.ClearVTOrcDatabase()
Expand Down Expand Up @@ -94,6 +94,7 @@ func TestSaveAndReadShard(t *testing.T) {
require.NoError(t, err)
}

// ReadShardPrimaryInformation
shardPrimaryAlias, primaryTimestamp, err := ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
if tt.err != "" {
require.EqualError(t, err, tt.err)
Expand All @@ -102,6 +103,16 @@ func TestSaveAndReadShard(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, tt.primaryAliasWanted, shardPrimaryAlias)
require.EqualValues(t, tt.primaryTimestampWanted, primaryTimestamp)

// ReadShardNames
shardNames, err := ReadShardNames(tt.keyspaceName)
require.NoError(t, err)
require.Equal(t, []string{tt.shardName}, shardNames)

// DeleteShard
require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName))
_, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
require.EqualError(t, err, ErrShardNotFound.Error())
})
}
}
54 changes: 53 additions & 1 deletion go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,43 @@ import (

"golang.org/x/exp/maps"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/inst"
)

// statsShardsWatched is the "ShardsWatched" stat, declared with the labels
// "Keyspace" and "Shard". Its values are supplied by getShardsWatchedStats.
var statsShardsWatched = stats.NewGaugesFuncWithMultiLabels("ShardsWatched",
"Keyspace/shards currently watched",
[]string{"Keyspace", "Shard"},
getShardsWatchedStats)

// getShardsWatchedStats builds the value map for the ShardsWatched stat.
// Each shard read via inst.ReadAllShardNames becomes one entry keyed
// "<keyspace>.<shard>" with the value 1. If the shard names cannot be read,
// the error is logged and an empty map is returned.
func getShardsWatchedStats() map[string]int64 {
	watched := map[string]int64{}
	shardsByKeyspace, err := inst.ReadAllShardNames()
	if err != nil {
		log.Errorf("Failed to read all shard names: %+v", err)
		return watched
	}
	for keyspace, names := range shardsByKeyspace {
		prefix := keyspace + "."
		for i := range names {
			watched[prefix+names[i]] = 1
		}
	}
	return watched
}

// refreshAllKeyspacesAndShardsMu serializes calls to
// RefreshAllKeyspacesAndShards: that function locks it on entry and unlocks
// it on return, so it is not executed concurrently.
var refreshAllKeyspacesAndShardsMu sync.Mutex

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
refreshAllKeyspacesAndShardsMu.Lock()
defer refreshAllKeyspacesAndShardsMu.Unlock()

var keyspaces []string
if len(shardsToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
Expand Down Expand Up @@ -109,6 +138,7 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {

// refreshAllShards refreshes all the shard records in the given keyspace.
func refreshAllShards(ctx context.Context, keyspaceName string) error {
// get all shards for keyspace name.
shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
// Fetch shard records concurrently to speed up discovery. A typical
// Vitess cluster will have 1-3 vtorc instances deployed, so there is
Expand All @@ -119,13 +149,35 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
log.Error(err)
return err
}
savedShards := make(map[string]bool, len(shardInfos))
for _, shardInfo := range shardInfos {
err = inst.SaveShard(shardInfo)
if err != nil {
log.Error(err)
return err
}
savedShards[shardInfo.ShardName()] = true
}

// delete shards that were not returned by ts.FindAllShardsInKeyspace(...),
// indicating they are stale.
shards, err := inst.ReadShardNames(keyspaceName)
if err != nil {
return err
}
for _, shard := range shards {
if savedShards[shard] {
continue
}
shardName := topoproto.KeyspaceShardString(keyspaceName, shard)
log.Infof("Forgetting shard: %s", shardName)
err = inst.DeleteShard(keyspaceName, shard)
if err != nil {
log.Errorf("Failed to delete shard %s: %+v", shardName, err)
return err
}
}

return nil
}

Expand Down

0 comments on commit 0a34913

Please sign in to comment.