diff --git a/go/vt/vtorc/inst/shard_dao.go b/go/vt/vtorc/inst/shard_dao.go index a90eed0f509..5501ba5d7b9 100644 --- a/go/vt/vtorc/inst/shard_dao.go +++ b/go/vt/vtorc/inst/shard_dao.go @@ -29,6 +29,30 @@ import ( // ErrShardNotFound is a fixed error message used when a shard is not found in the database. var ErrShardNotFound = errors.New("shard not found") +// ReadShardNames reads the shard names stored in the vitess_shard table for a single keyspace. +func ReadShardNames(keyspaceName string) (shardNames []string, err error) { + shardNames = make([]string, 0) + query := `select shard from vitess_shard where keyspace = ?` + args := sqlutils.Args(keyspaceName) + err = db.QueryVTOrc(query, args, func(row sqlutils.RowMap) error { + shardNames = append(shardNames, row.GetString("shard")) + return nil + }) + return shardNames, err +} + +// ReadAllShardNames reads every shard name stored in the vitess_shard table, grouped by keyspace name. +func ReadAllShardNames() (shardNames map[string][]string, err error) { + shardNames = make(map[string][]string) + query := `select keyspace, shard from vitess_shard` + err = db.QueryVTOrc(query, nil, func(row sqlutils.RowMap) error { + ks := row.GetString("keyspace") + shardNames[ks] = append(shardNames[ks], row.GetString("shard")) + return nil + }) + return shardNames, err +} + // ReadShardPrimaryInformation reads the vitess shard record and gets the shard primary alias and timestamp. func ReadShardPrimaryInformation(keyspaceName, shardName string) (primaryAlias string, primaryTimestamp string, err error) { if err = topo.ValidateKeyspaceName(keyspaceName); err != nil { @@ -95,3 +119,16 @@ func getShardPrimaryTermStartTimeString(shard *topo.ShardInfo) string { } return protoutil.TimeFromProto(shard.PrimaryTermStartTime).UTC().String() } + +// DeleteShard deletes the vitess_shard row matching the given keyspace and shard name. +func DeleteShard(keyspace, shard string) error { + _, err := db.ExecVTOrc(`DELETE FROM + vitess_shard + WHERE + keyspace = ? 
+ AND shard = ?`, + keyspace, + shard, + ) + return err +} diff --git a/go/vt/vtorc/inst/shard_dao_test.go b/go/vt/vtorc/inst/shard_dao_test.go index 3357bd2ee36..8b096889851 100644 --- a/go/vt/vtorc/inst/shard_dao_test.go +++ b/go/vt/vtorc/inst/shard_dao_test.go @@ -29,7 +29,7 @@ import ( "vitess.io/vitess/go/vt/vtorc/db" ) -func TestSaveAndReadShard(t *testing.T) { +func TestSaveReadAndDeleteShard(t *testing.T) { // Clear the database after the test. The easiest way to do that is to run all the initialization commands again. defer func() { db.ClearVTOrcDatabase() @@ -94,6 +94,7 @@ func TestSaveAndReadShard(t *testing.T) { require.NoError(t, err) } + // ReadShardPrimaryInformation shardPrimaryAlias, primaryTimestamp, err := ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName) if tt.err != "" { require.EqualError(t, err, tt.err) @@ -102,6 +103,16 @@ func TestSaveAndReadShard(t *testing.T) { require.NoError(t, err) require.EqualValues(t, tt.primaryAliasWanted, shardPrimaryAlias) require.EqualValues(t, tt.primaryTimestampWanted, primaryTimestamp) + + // ReadShardNames + shardNames, err := ReadShardNames(tt.keyspaceName) + require.NoError(t, err) + require.Equal(t, []string{tt.shardName}, shardNames) + + // DeleteShard + require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName)) + _, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName) + require.EqualError(t, err, ErrShardNotFound.Error()) }) } } diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 8115e614418..971b6c468f9 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -22,14 +22,43 @@ import ( "golang.org/x/exp/maps" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/inst" ) +var statsShardsWatched = stats.NewGaugesFuncWithMultiLabels("ShardsWatched", 
+ "Keyspace/shards currently watched", + []string{"Keyspace", "Shard"}, + getShardsWatchedStats) + +// getShardsWatchedStats returns a value of 1 for every stored shard, keyed by "keyspace.shard"; on a read error it logs and returns an empty map. +func getShardsWatchedStats() map[string]int64 { + shardsWatched := make(map[string]int64) + allShardNames, err := inst.ReadAllShardNames() + if err != nil { + log.Errorf("Failed to read all shard names: %+v", err) + return shardsWatched + } + for ks, shards := range allShardNames { + for _, shard := range shards { + shardsWatched[ks+"."+shard] = 1 + } + } + return shardsWatched +} + +// refreshAllKeyspacesAndShardsMu ensures RefreshAllKeyspacesAndShards +// is not executed concurrently. +var refreshAllKeyspacesAndShardsMu sync.Mutex + // RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. func RefreshAllKeyspacesAndShards(ctx context.Context) error { + refreshAllKeyspacesAndShardsMu.Lock() + defer refreshAllKeyspacesAndShardsMu.Unlock() + var keyspaces []string if len(shardsToWatch) == 0 { // all known keyspaces ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) @@ -109,6 +138,7 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error { // refreshAllShards refreshes all the shard records in the given keyspace. func refreshAllShards(ctx context.Context, keyspaceName string) error { + // find all shards in the keyspace via ts.FindAllShardsInKeyspace. shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{ // Fetch shard records concurrently to speed up discovery. 
A typical // Vitess cluster will have 1-3 vtorc instances deployed, so there is @@ -119,13 +149,35 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error { log.Error(err) return err } + savedShards := make(map[string]bool) for _, shardInfo := range shardInfos { err = inst.SaveShard(shardInfo) if err != nil { log.Error(err) return err } + savedShards[shardInfo.ShardName()] = true } + + // delete shards that were not returned by ts.FindAllShardsInKeyspace(...), + // indicating they are stale. + shards, err := inst.ReadShardNames(keyspaceName) + if err != nil { + return err + } + for _, shard := range shards { + if savedShards[shard] { + continue + } + shardName := topoproto.KeyspaceShardString(keyspaceName, shard) + log.Infof("Forgetting shard: %s", shardName) + err = inst.DeleteShard(keyspaceName, shard) + if err != nil { + log.Errorf("Failed to delete shard %s: %+v", shardName, err) + return err + } + } + return nil }