Skip to content

Commit

Permalink
Merge branch 'slack-19.0' into slack-19.0-vstream-patch
Browse files Browse the repository at this point in the history
  • Loading branch information
makinje16 authored Feb 6, 2025
2 parents ab70165 + a76e257 commit fa29d46
Show file tree
Hide file tree
Showing 55 changed files with 7,346 additions and 6,881 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ require (
github.com/kr/text v0.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249
github.com/slackhq/vitess-addons v0.19.7
github.com/slackhq/vitess-addons v0.19.8
github.com/slok/noglog v0.2.0
github.com/spf13/afero v1.11.0
github.com/spf13/jwalterweatherman v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sjmudd/stopwatch v0.1.1 h1:x45OvxFB5OtCkjvYtzRF5fWB857Jzjjk84Oyd5C5ebw=
github.com/sjmudd/stopwatch v0.1.1/go.mod h1:BLw0oIQJ1YLXBO/q9ufK/SgnKBVIkC2qrm6uy78Zw6U=
github.com/slackhq/vitess-addons v0.19.7 h1:3rP5jIjTMAJSInl92ePn6BevACvVVT4DV3oynwuQKRo=
github.com/slackhq/vitess-addons v0.19.7/go.mod h1:slG5BxqN541wVV5Y5tuHE3z1CwCXj9GVRmLX5wkI/zw=
github.com/slackhq/vitess-addons v0.19.8 h1:AjAbXkrvdciDeiJ4a3CG57rKcg1MAHm5JPsfwryu3XU=
github.com/slackhq/vitess-addons v0.19.8/go.mod h1:slG5BxqN541wVV5Y5tuHE3z1CwCXj9GVRmLX5wkI/zw=
github.com/slok/noglog v0.2.0 h1:1czu4l2EoJ8L92UwdSXXa1Y+c5TIjFAFm2P+mjej95E=
github.com/slok/noglog v0.2.0/go.mod h1:TfKxwpEZPT+UA83bQ6RME146k0MM4e8mwHLf6bhcGDI=
github.com/smartystreets/assertions v0.0.0-20190116191733-b6c0e53d7304/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/keyranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config string config file name
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
Expand Down Expand Up @@ -107,7 +107,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func TestHighNumberOfParams(t *testing.T) {
}

func TestPrepareStatements(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 17, "vtgate")
mcmp, closer := start(t)
defer closer()

Expand Down
24 changes: 21 additions & 3 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func TestAPIEndpoints(t *testing.T) {
status, resp := utils.MakeAPICallRetry(t, vtorc, "/debug/health", func(code int, response string) bool {
return code == 0
})
// When VTOrc is up and hasn't run the topo-refresh, is should be healthy but HasDiscovered should be false.
assert.Equal(t, 500, status)
// When VTOrc starts it runs OpenTabletDiscovery(), which triggers a topo-refresh. VTOrc should be healthy and HasDiscovered should be true.
assert.Equal(t, 200, status)
assert.Contains(t, resp, `"Healthy": true,`)
assert.Contains(t, resp, `"DiscoveredOnce": false`)
assert.Contains(t, resp, `"DiscoveredOnce": true`)

// find primary from topo
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
Expand Down Expand Up @@ -96,6 +96,24 @@ func TestAPIEndpoints(t *testing.T) {
return response != "null"
})

t.Run("Database State", func(t *testing.T) {
// Get database state
status, resp, err := utils.MakeAPICall(t, vtorc, "/api/database-state")
require.NoError(t, err)
assert.Equal(t, 200, status)
assert.Contains(t, resp, `"alias": "zone1-0000000101"`)
assert.Contains(t, resp, `{
"TableName": "vitess_keyspace",
"Rows": [
{
"durability_policy": "none",
"keyspace": "ks",
"keyspace_type": "0"
}
]
},`)
})

t.Run("Disable Recoveries API", func(t *testing.T) {
// Disable recoveries of VTOrc
status, resp, err := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries")
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets))
}

hc.topoWatchers = topoWatchers
Expand Down
29 changes: 12 additions & 17 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/topo/topoproto"

"vitess.io/vitess/go/vt/key"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"

"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
)

const (
Expand All @@ -56,7 +53,7 @@ var (
// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
tablet *topodata.Tablet
tablet *topodatapb.Tablet
}

// TopologyWatcher polls the topology periodically for changes to
Expand All @@ -70,7 +67,6 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
concurrency int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -92,15 +88,14 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
tabletFilter: f,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
concurrency: topoReadConcurrency,
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
Expand All @@ -112,7 +107,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}

func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
}

// Start starts the topology watcher.
Expand Down Expand Up @@ -271,14 +266,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
IsIncluded(tablet *topodata.Tablet) bool
IsIncluded(tablet *topodatapb.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
Expand All @@ -299,7 +294,7 @@ type FilterByShard struct {
type filterShard struct {
keyspace string
shard string
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange
}

// NewFilterByShard creates a new FilterByShard for use by a
Expand Down Expand Up @@ -344,7 +339,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}

// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err)
Expand Down Expand Up @@ -384,7 +379,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}

// IsIncluded returns true if the tablet's keyspace matches what we have.
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}
Expand All @@ -403,7 +398,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool {
if fbtg.tags == nil {
return true
}
Expand Down
10 changes: 5 additions & 5 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) {
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true)

done := make(chan bool, 3)
result := make(chan bool, 1)
Expand Down Expand Up @@ -127,7 +127,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestFilterByKeyspace(t *testing.T) {
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
ts := memorytopo.NewServer(ctx, testCell)
defer ts.Close()
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5)
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true)

for _, test := range testFilterByKeyspace {
// Add a new tablet to the topology.
Expand Down Expand Up @@ -502,7 +502,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
Expand Down Expand Up @@ -639,7 +639,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true)
defer tw.Stop()

// Force fallback to getting tablets individually.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/external/golib/sqlutils/sqlutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type RowMap map[string]CellData
// CellData is the result of a single (atomic) column in a single row
type CellData sql.NullString

func (this *CellData) MarshalJSON() ([]byte, error) {
func (this CellData) MarshalJSON() ([]byte, error) {
if this.Valid {
return json.Marshal(this.String)
} else {
Expand Down
13 changes: 13 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ func Empty(id []byte) bool {
// KeyRange helper methods
//

// Make a Key Range
func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
return &topodatapb.KeyRange{Start: start, End: end}
}

// NewCompleteKeyRange returns a complete key range.
func NewCompleteKeyRange() *topodatapb.KeyRange {
return &topodatapb.KeyRange{
Start: nil,
End: nil,
}
}

// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
Expand Down
Loading

0 comments on commit fa29d46

Please sign in to comment.