diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index bfc55064148..e63ffcc3547 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -11,6 +11,7 @@
   - **[VTGate Config File Changes](#vtgate-config-file-changes)**
   - **[Support for More Efficient JSON Replication](#efficient-json-replication)**
   - **[Support for LAST_INSERT_ID(x)](#last-insert-id)**
+  - **[Support for Maximum Idle Connections in the Pool](#max-idle-connections)**
 - **[Minor Changes](#minor-changes)**
   - **[VTTablet Flags](#flags-vttablet)**
   - **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)**
@@ -88,6 +89,17 @@ In [#17408](https://github.com/vitessio/vitess/pull/17408) and [#17409](https://
 **Limitations**:
 - When using `LAST_INSERT_ID(x)` in ordered queries (e.g., `SELECT last_insert_id(col) FROM table ORDER BY foo`), MySQL sets the session’s last-insert-id value according to the *last row returned*. Vitess does not guarantee the same behavior.
 
+### Support for Maximum Idle Connections in the Pool
+
+In [#17443](https://github.com/vitessio/vitess/pull/17443) we introduced a configurable `max-idle-count` parameter for connection pools. It caps the number of idle connections each connection pool retains, letting you tune the trade-off between performance and resource usage.
+
+You can control idle connection retention for the query server’s query pool, stream pool, and transaction pool with the following flags (a short illustrative sketch follows the vtcombo flag listing below):
+- `--queryserver-config-query-pool-max-idle-count`: maximum number of idle connections retained in the query pool.
+- `--queryserver-config-stream-pool-max-idle-count`: maximum number of idle connections retained in the stream pool.
+- `--queryserver-config-txpool-max-idle-count`: maximum number of idle connections retained in the transaction pool.
+
+During traffic spikes this keeps idle connections available for faster responses, while in low-traffic periods it limits how many idle connections are retained, reducing overhead and cost.
+
 ## Minor Changes
 
 #### VTTablet Flags
 
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index 1c57dd0c08e..052c19ecaae 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -282,11 +282,13 @@ Flags:
       --queryserver-config-pool-conn-max-lifetime duration   query server connection max lifetime, vttablet manages various mysql connection pools. This config means if a connection has lived at least this long, it connection will be removed from pool upon the next time it is returned to the pool.
       --queryserver-config-pool-size int   query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16)
       --queryserver-config-query-cache-memory int   query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
+      --queryserver-config-query-pool-max-idle-count int   query server query pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods.
--queryserver-config-query-pool-timeout duration query server query pool timeout, it is how long vttablet waits for a connection from the query pool. If set to 0 (default) then the overall query timeout is used instead. --queryserver-config-query-timeout duration query server query timeout, this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed. (default 30s) --queryserver-config-schema-change-signal query server schema signal, will signal connected vtgates that schema has changed whenever this is detected. VTGates will need to have -schema_change_signal enabled for this to work (default true) --queryserver-config-schema-reload-time duration query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time. (default 30m0s) --queryserver-config-stream-buffer-size int query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size. (default 32768) + --queryserver-config-stream-pool-max-idle-count int query server stream pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods. --queryserver-config-stream-pool-size int query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion (default 200) --queryserver-config-stream-pool-timeout duration query server stream pool timeout, it is how long vttablet waits for a connection from the stream pool. If set to 0 (default) then there is no timeout. --queryserver-config-strict-table-acl only allow queries that pass table acl checks @@ -294,6 +296,7 @@ Flags: --queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20) --queryserver-config-transaction-timeout duration query server transaction timeout, a transaction will be killed if it takes longer than this value (default 30s) --queryserver-config-truncate-error-len int truncate errors sent to client if they are longer than this value (0 means do not truncate) + --queryserver-config-txpool-max-idle-count int query server transaction pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods. --queryserver-config-txpool-timeout duration query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1s) --queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this --queryserver-enable-views Enable views support in vttablet. 
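For context, here is a simplified, standalone sketch of the behavior the new flags configure, modeled on the `setIdleCount` and `closeOnIdleLimitReached` logic added to `go/pools/smartconnpool` later in this diff. The helper names below are made up for illustration and are not part of the pool's actual API.

```go
package main

import "fmt"

// effectiveIdleLimit mirrors the clamping in ConnPool.setIdleCount from this
// diff: a max-idle-count of zero (the flag default for an int flag) or any
// value above the pool capacity means "retain up to capacity" idle connections.
func effectiveIdleLimit(capacity, maxIdleCount int64) int64 {
	if maxIdleCount == 0 || maxIdleCount > capacity {
		return capacity
	}
	return maxIdleCount
}

// shouldCloseOnReturn mirrors the check in closeOnIdleLimitReached: a
// connection being returned is closed instead of pooled when the number of
// idle connections (open minus borrowed) already exceeds the idle limit.
func shouldCloseOnReturn(open, borrowed, idleLimit int64) bool {
	return open-borrowed > idleLimit
}

func main() {
	// e.g. --queryserver-config-query-pool-max-idle-count=2 with a pool capacity of 5.
	limit := effectiveIdleLimit(5, 2)
	fmt.Println(limit)                            // 2
	fmt.Println(shouldCloseOnReturn(5, 0, limit)) // true: 5 idle > 2, close the returned conn
	fmt.Println(shouldCloseOnReturn(3, 2, limit)) // false: 1 idle <= 2, keep it in the pool
}
```

Note the design choice visible in the pool changes below: the limit is enforced lazily when a connection is returned via `put`, rather than by a background reaper, and connections closed this way are counted in the `idleClosed` metric.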
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index bc647fb5347..e2b0c30db7f 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -274,11 +274,13 @@ Flags: --queryserver-config-pool-conn-max-lifetime duration query server connection max lifetime, vttablet manages various mysql connection pools. This config means if a connection has lived at least this long, it connection will be removed from pool upon the next time it is returned to the pool. --queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16) --queryserver-config-query-cache-memory int query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432) + --queryserver-config-query-pool-max-idle-count int query server query pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods. --queryserver-config-query-pool-timeout duration query server query pool timeout, it is how long vttablet waits for a connection from the query pool. If set to 0 (default) then the overall query timeout is used instead. --queryserver-config-query-timeout duration query server query timeout, this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed. (default 30s) --queryserver-config-schema-change-signal query server schema signal, will signal connected vtgates that schema has changed whenever this is detected. VTGates will need to have -schema_change_signal enabled for this to work (default true) --queryserver-config-schema-reload-time duration query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time. (default 30m0s) --queryserver-config-stream-buffer-size int query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size. (default 32768) + --queryserver-config-stream-pool-max-idle-count int query server stream pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods. --queryserver-config-stream-pool-size int query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion (default 200) --queryserver-config-stream-pool-timeout duration query server stream pool timeout, it is how long vttablet waits for a connection from the stream pool. If set to 0 (default) then there is no timeout. --queryserver-config-strict-table-acl only allow queries that pass table acl checks @@ -286,6 +288,7 @@ Flags: --queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. 
by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20) --queryserver-config-transaction-timeout duration query server transaction timeout, a transaction will be killed if it takes longer than this value (default 30s) --queryserver-config-truncate-error-len int truncate errors sent to client if they are longer than this value (0 means do not truncate) + --queryserver-config-txpool-max-idle-count int query server transaction pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods. --queryserver-config-txpool-timeout duration query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1s) --queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this --queryserver-enable-views Enable views support in vttablet. diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index 72c228b594e..90c2d7b9668 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -48,7 +48,7 @@ type BinlogEvent interface { IsValid() bool // General protocol events. - NextPosition() uint32 + NextPosition() uint64 // IsFormatDescription returns true if this is a // FORMAT_DESCRIPTION_EVENT. Do not call StripChecksum before diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index 548875c44f7..0bd9d401eaa 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -47,7 +47,7 @@ import ( // +----------------------------+ // | extra_headers 19 : x-19 | // +============================+ -// http://dev.mysql.com/doc/internals/en/event-header-fields.html +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header type binlogEvent []byte const ( @@ -119,8 +119,9 @@ func (ev binlogEvent) Length() uint32 { } // NextPosition returns the nextPosition field from the header -func (ev binlogEvent) NextPosition() uint32 { - return binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4]) +func (ev binlogEvent) NextPosition() uint64 { + // Only 4 bytes are used for the next_position field in the header. + return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17])) } // IsFormatDescription implements BinlogEvent.IsFormatDescription(). diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index b7e6ed9e0f2..8c60956faf1 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -75,12 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte // nextPosition returns the next file position of the binlog. // If no information is available, it returns 0. -func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 { +func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 { if f.HeaderLength <= 13 { // Dead code. This is just a failsafe. return 0 } - return binary.LittleEndian.Uint32(ev.Bytes()[13:17]) + // The header only uses 4 bytes for the next_position. + return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17])) } // rotate implements BinlogEvent.Rotate(). 
@@ -139,7 +140,7 @@ type filePosFakeEvent struct { timestamp uint32 } -func (ev filePosFakeEvent) NextPosition() uint32 { +func (ev filePosFakeEvent) NextPosition() uint64 { return 0 } @@ -283,7 +284,7 @@ type filePosGTIDEvent struct { gtid replication.FilePosGTID } -func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent { +func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent { return filePosGTIDEvent{ filePosFakeEvent: filePosFakeEvent{ timestamp: timestamp, diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 84c65842c7e..4c5a0c9523e 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -18,6 +18,7 @@ package mysql import ( "fmt" + "math" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -32,9 +33,14 @@ const ( // This file contains the methods related to replication. // WriteComBinlogDump writes a ComBinlogDump command. -// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax. // Returns a SQLError. -func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error { +// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html +func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error { + // The binary log file position is a uint64, but the protocol command + // only uses 4 bytes for the file position. + if binlogPos > math.MaxUint32 { + return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos) + } c.sequence = 0 length := 1 + // ComBinlogDump 4 + // binlog-pos @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog len(binlogFilename) // binlog-filename data, pos := c.startEphemeralPacketWithHeader(length) pos = writeByte(data, pos, ComBinlogDump) - pos = writeUint32(data, pos, binlogPos) + pos = writeUint32(data, pos, uint32(binlogPos)) pos = writeUint16(data, pos, flags) pos = writeUint32(data, pos, serverID) _ = writeEOFString(data, pos, binlogFilename) diff --git a/go/mysql/replication/filepos_gtid.go b/go/mysql/replication/filepos_gtid.go index 850fb421915..95c7efcd3b1 100644 --- a/go/mysql/replication/filepos_gtid.go +++ b/go/mysql/replication/filepos_gtid.go @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s) } - pos, err := strconv.ParseUint(parts[1], 0, 32) + pos, err := strconv.ParseUint(parts[1], 0, 64) if err != nil { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s) } return FilePosGTID{ File: parts[0], - Pos: uint32(pos), + Pos: pos, }, nil } @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) { // FilePosGTID implements GTID. type FilePosGTID struct { File string - Pos uint32 + Pos uint64 } // String implements GTID.String(). 
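The replication changes above widen FilePos GTID and binlog-event positions from `uint32` to `uint64`, while `WriteComBinlogDump` keeps the protocol's 4-byte position field and now rejects values that do not fit into 32 bits. The sketch below is a minimal standalone illustration of that split; `parseFilePos` is a made-up stand-in for the real parsing in `replication.ParseFilePosGTIDSet` shown above.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// parseFilePos parses a "file:pos" string, reading the position as a full
// 64-bit value so positions beyond 4GiB no longer overflow.
func parseFilePos(s string) (file string, pos uint64, err error) {
	parts := strings.SplitN(s, ":", 2)
	if len(parts) != 2 {
		return "", 0, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s)
	}
	pos, err = strconv.ParseUint(parts[1], 0, 64)
	if err != nil {
		return "", 0, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s)
	}
	return parts[0], pos, nil
}

func main() {
	// A position above math.MaxUint32, as in the new test case in this diff.
	file, pos, err := parseFilePos("vt-1448040107-bin.003222:4663881395")
	fmt.Println(file, pos, err)

	// COM_BINLOG_DUMP only carries 4 bytes for the position on the wire, which
	// is why WriteComBinlogDump now returns an error for larger values.
	if pos > math.MaxUint32 {
		fmt.Println("position does not fit into the 4-byte COM_BINLOG_DUMP position field")
	}
}
```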
diff --git a/go/mysql/replication/filepos_gtid_test.go b/go/mysql/replication/filepos_gtid_test.go index 174aed6ccf9..6cef4756af2 100644 --- a/go/mysql/replication/filepos_gtid_test.go +++ b/go/mysql/replication/filepos_gtid_test.go @@ -23,7 +23,7 @@ import ( func Test_filePosGTID_String(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } tests := []struct { name string @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) { fields{file: "mysql-bin.166031", pos: 192394}, "mysql-bin.166031:192394", }, + { + "handles large position correctly", + fields{file: "vt-1448040107-bin.003222", pos: 4663881395}, + "vt-1448040107-bin.003222:4663881395", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) { func Test_filePosGTID_ContainsGTID(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } type args struct { other GTID diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index c9a54485497..680cb9e68dc 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -17,6 +17,7 @@ limitations under the License. package mysql import ( + "math" "reflect" "testing" @@ -37,6 +38,10 @@ func TestComBinlogDump(t *testing.T) { cConn.Close() }() + // Try to write a ComBinlogDump packet with a position greater than 4 bytes. + err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e) + require.Error(t, err) + // Write ComBinlogDump packet, read it, compare. if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil { t.Fatalf("WriteComBinlogDump failed: %v", err) diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index d49032f34a1..b024cc656df 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -92,6 +92,7 @@ type RefreshCheck func() (bool, error) type Config[C Connection] struct { Capacity int64 + MaxIdleCount int64 IdleTimeout time.Duration MaxLifetime time.Duration RefreshInterval time.Duration @@ -123,6 +124,8 @@ type ConnPool[C Connection] struct { active atomic.Int64 // capacity is the maximum number of connections that this pool can open capacity atomic.Int64 + // maxIdleCount is the maximum idle connections in the pool + idleCount atomic.Int64 // workers is a waitgroup for all the currently running worker goroutines workers sync.WaitGroup @@ -138,6 +141,8 @@ type ConnPool[C Connection] struct { // maxCapacity is the maximum value to which capacity can be set; when the pool // is re-opened, it defaults to this capacity maxCapacity int64 + // maxIdleCount is the maximum idle connections in the pool + maxIdleCount int64 // maxLifetime is the maximum time a connection can be open maxLifetime atomic.Int64 // idleTimeout is the maximum time a connection can remain idle @@ -158,6 +163,7 @@ func NewPool[C Connection](config *Config[C]) *ConnPool[C] { pool := &ConnPool[C]{} pool.freshSettingsStack.Store(-1) pool.config.maxCapacity = config.Capacity + pool.config.maxIdleCount = config.MaxIdleCount pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds()) pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds()) pool.config.refreshInterval.Store(config.RefreshInterval.Nanoseconds()) @@ -192,6 +198,7 @@ func (pool *ConnPool[C]) runWorker(close <-chan struct{}, interval time.Duration func (pool *ConnPool[C]) open() { pool.close = make(chan struct{}) pool.capacity.Store(pool.config.maxCapacity) + pool.setIdleCount() // The expire worker 
takes care of removing from the waiter list any clients whose // context has been cancelled. @@ -315,6 +322,16 @@ func (pool *ConnPool[C]) MaxCapacity() int64 { return pool.config.maxCapacity } +func (pool *ConnPool[C]) setIdleCount() { + capacity := pool.Capacity() + maxIdleCount := pool.config.maxIdleCount + if maxIdleCount == 0 || maxIdleCount > capacity { + pool.idleCount.Store(capacity) + } else { + pool.idleCount.Store(maxIdleCount) + } +} + // InUse returns the number of connections that the pool has lent out to clients and that // haven't been returned yet. func (pool *ConnPool[C]) InUse() int64 { @@ -340,6 +357,10 @@ func (pool *ConnPool[C]) SetIdleTimeout(duration time.Duration) { pool.config.idleTimeout.Store(duration.Nanoseconds()) } +func (pool *ConnPool[D]) IdleCount() int64 { + return pool.idleCount.Load() +} + func (pool *ConnPool[D]) RefreshInterval() time.Duration { return time.Duration(pool.config.refreshInterval.Load()) } @@ -396,6 +417,10 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) { } if !pool.wait.tryReturnConn(conn) { + if pool.closeOnIdleLimitReached(conn) { + return + } + connSetting := conn.Conn.Setting() if connSetting == nil { pool.clean.Push(conn) @@ -407,6 +432,23 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) { } } +// closeOnIdleLimitReached closes a connection if the number of idle connections (active - inuse) in the pool +// exceeds the idleCount limit. It returns true if the connection is closed, false otherwise. +func (pool *ConnPool[C]) closeOnIdleLimitReached(conn *Pooled[C]) bool { + for { + open := pool.active.Load() + idle := open - pool.borrowed.Load() + if idle <= pool.idleCount.Load() { + return false + } + if pool.active.CompareAndSwap(open, open-1) { + pool.Metrics.idleClosed.Add(1) + conn.Close() + return true + } + } +} + func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration { maxLifetime := pool.config.maxLifetime.Load() if maxLifetime == 0 { @@ -629,6 +671,9 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error { if oldcap == newcap { return nil } + // update the idle count to match the new capacity if necessary + // wait for connections to be returned to the pool if we're reducing the capacity. + defer pool.setIdleCount() const delay = 10 * time.Millisecond @@ -732,6 +777,9 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) { // the smartconnpool doesn't have a maximum capacity return pool.Capacity() }) + stats.NewGaugeFunc(name+"IdleAllowed", "Tablet server conn pool idle allowed limit", func() int64 { + return pool.IdleCount() + }) stats.NewCounterFunc(name+"WaitCount", "Tablet server conn pool wait count", func() int64 { return pool.Metrics.WaitCount() }) diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 701327005ad..44bd431d189 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -746,6 +746,51 @@ func TestExtendedLifetimeTimeout(t *testing.T) { } } +// TestMaxIdleCount tests the MaxIdleCount setting, to check if the pool closes +// the idle connections when the number of idle connections exceeds the limit. 
+func TestMaxIdleCount(t *testing.T) { + testMaxIdleCount := func(t *testing.T, setting *Setting, maxIdleCount int64, expClosedConn int) { + var state TestState + + ctx := context.Background() + p := NewPool(&Config[*TestConn]{ + Capacity: 5, + MaxIdleCount: maxIdleCount, + LogWait: state.LogWait, + }).Open(newConnector(&state), nil) + + defer p.Close() + + var conns []*Pooled[*TestConn] + for i := 0; i < 5; i++ { + r, err := p.Get(ctx, setting) + require.NoError(t, err) + assert.EqualValues(t, i+1, state.open.Load()) + assert.EqualValues(t, 0, p.Metrics.IdleClosed()) + + conns = append(conns, r) + } + + for _, conn := range conns { + p.put(conn) + } + + closedConn := 0 + for _, conn := range conns { + if conn.Conn.IsClosed() { + closedConn++ + } + } + assert.EqualValues(t, expClosedConn, closedConn) + assert.EqualValues(t, expClosedConn, p.Metrics.IdleClosed()) + } + + t.Run("WithoutSettings", func(t *testing.T) { testMaxIdleCount(t, nil, 2, 3) }) + t.Run("WithSettings", func(t *testing.T) { testMaxIdleCount(t, sFoo, 2, 3) }) + t.Run("WithoutSettings-MaxIdleCount-Zero", func(t *testing.T) { testMaxIdleCount(t, nil, 0, 0) }) + t.Run("WithSettings-MaxIdleCount-Zero", func(t *testing.T) { testMaxIdleCount(t, sFoo, 0, 0) }) +} + func TestCreateFail(t *testing.T) { var state TestState state.chaos.failConnect = true diff --git a/go/test/endtoend/migration/migration_test.go b/go/test/endtoend/migration/migration_test.go index eca112e388d..54afd2cb3ee 100644 --- a/go/test/endtoend/migration/migration_test.go +++ b/go/test/endtoend/migration/migration_test.go @@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi streams from the same source. The main difference between an external source vs a vitess source is that the source proto contains an "external_mysql" field instead of keyspace and shard. That field is the key into the externalConnections section of the input yaml. 
- -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter: > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter: > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter: > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') */ func TestMigration(t *testing.T) { yamlFile := startCluster(t) @@ -155,7 +151,7 @@ func TestMigration(t *testing.T) { migrate(t, "customer", "commerce", []string{"customer"}) migrate(t, "customer", "commerce", []string{"orders"}) vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess - waitForVReplicationToCatchup(t, vttablet, 1*time.Second) + waitForVReplicationToCatchup(t, vttablet, 30*time.Second) testcases := []struct { query string @@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) { var sqlEscaped bytes.Buffer val.EncodeSQL(&sqlEscaped) query := fmt.Sprintf("insert into _vt.vreplication "+ - "(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+ - "('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String()) - fmt.Printf("VReplicationExec: %s\n", query) - vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess - err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query) + "(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+ + "('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String()) + fmt.Printf("VReplication insert: %s\n", query) + vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias + err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query) require.NoError(t, err) } diff --git a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go index a13077ef87b..e052762cd13 100644 --- a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go +++ b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go @@ -259,6 +259,11 @@ func testRevertible(t *testing.T) { toSchema: `id int primary key, i2 int default null`, removedUniqueKeyNames: `i1_uidx`, }, + { + name: "removed expression unique key, skipped", + fromSchema: `id int primary key, i1 int default null, unique key idx1 ((id + 1))`, + toSchema: `id int primary key, i2 int default null`, + }, { name: "expanding unique key removes unique constraint", fromSchema: `id int primary key, i1 int default null, unique key i1_uidx(i1)`, diff --git a/go/test/endtoend/preparestmt/benchmark_test.go b/go/test/endtoend/preparestmt/benchmark_test.go new file mode 100644 index 00000000000..fcf10a6948a --- /dev/null +++ b/go/test/endtoend/preparestmt/benchmark_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2024 The Vitess Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package preparestmt
+
+import (
+	"math/rand/v2"
+	"testing"
+
+	"github.com/icrowley/fake"
+)
+
+/*
+export ver=v1 p=~/benchmark && go test \
+-run '^$' -bench '^BenchmarkPreparedStmt' \
+-benchtime 2s -count 6 -cpu 4 \
+| tee $p/${ver}.txt
+*/
+func BenchmarkPreparedStmt(b *testing.B) {
+	dbo := Connect(b)
+	defer dbo.Close()
+
+	// prepare statement
+	insertStmt := `insert into sks.t1 (name, age, email, created_at, is_active) values(?, ?, ?, current_timestamp, ?)`
+	selectStmt := `select id, name, age, email from sks.t1 where age between ? and ? and is_active = ? limit ?`
+	updateStmt := `update sks.t1 set is_active = ? where id = ?`
+	deleteStmt := `delete from sks.t1 where is_active = ? and age = ?`
+
+	joinStmt := `SELECT
+    user.id AS user_id
+FROM
+    sks.t1 AS user
+LEFT JOIN
+    sks.t1 AS parent ON user.id = parent.id AND parent.age = ?
+LEFT JOIN
+    sks.t1 AS manager ON user.id = manager.id AND manager.is_active = ?
+LEFT JOIN
+    sks.t1 AS child ON user.id = child.id
+WHERE
+    user.is_active = ?
+    AND user.id = ?
+    AND parent.id = ?
+    AND manager.id = ?`
+
+	iStmt, err := dbo.Prepare(insertStmt)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer iStmt.Close()
+
+	b.Run("Insert", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_, err := iStmt.Exec(fake.FirstName(), rand.IntN(100), fake.EmailAddress(), rand.IntN(2))
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+
+	sStmt, err := dbo.Prepare(selectStmt)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer sStmt.Close()
+
+	b.Run("Select", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			age := rand.IntN(80)
+			r, err := sStmt.Query(age, age+20, rand.IntN(2), rand.IntN(10))
+			if err != nil {
+				b.Fatal(err)
+			}
+			r.Close()
+		}
+	})
+
+	jStmt, err := dbo.Prepare(joinStmt)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer jStmt.Close()
+
+	b.Run("Join Select:Simple Route", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			age := rand.IntN(80)
+			active := rand.IntN(2)
+			id := rand.IntN(2000)
+			r, err := jStmt.Query(age, active, active, id, id, id)
+			if err != nil {
+				b.Fatal(err)
+			}
+			r.Close()
+		}
+	})
+
+	uStmt, err := dbo.Prepare(updateStmt)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer uStmt.Close()
+
+	b.Run("Update", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_, err = uStmt.Exec(rand.IntN(2), rand.IntN(2000))
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+
+	dStmt, err := dbo.Prepare(deleteStmt)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer dStmt.Close()
+
+	b.Run("Delete", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_, err = dStmt.Exec(rand.IntN(2), rand.IntN(100))
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+
+}
diff --git a/go/test/endtoend/preparestmt/main_test.go b/go/test/endtoend/preparestmt/main_test.go
index 0e067062c94..dd0094fe825 100644
--- a/go/test/endtoend/preparestmt/main_test.go
+++ b/go/test/endtoend/preparestmt/main_test.go
@@ -18,6 +18,7 @@ package preparestmt
 
 import (
 	"database/sql"
+	_ "embed"
"flag" "fmt" "os" @@ -51,7 +52,7 @@ type DBInfo struct { } func init() { - dbInfo.KeyspaceName = keyspaceName + dbInfo.KeyspaceName = uks dbInfo.Username = "testuser1" dbInfo.Password = "testpassword1" dbInfo.Params = []string{ @@ -65,9 +66,9 @@ var ( clusterInstance *cluster.LocalProcessCluster dbInfo DBInfo hostname = "localhost" - keyspaceName = "test_keyspace" + uks = "uks" + sks = "sks" testingID = 1 - tableName = "vt_prepare_stmt_test" cell = "zone1" mysqlAuthServerStatic = "mysql_auth_server_static.json" jsonExample = `{ @@ -108,57 +109,18 @@ var ( } } }` - sqlSchema = `create table ` + tableName + ` ( - id bigint auto_increment, - msg varchar(64), - keyspace_id bigint(20) unsigned NOT NULL, - tinyint_unsigned TINYINT, - bool_signed BOOL, - smallint_unsigned SMALLINT, - mediumint_unsigned MEDIUMINT, - int_unsigned INT, - float_unsigned FLOAT(10,2), - double_unsigned DOUBLE(16,2), - decimal_unsigned DECIMAL, - t_date DATE, - t_datetime DATETIME, - t_datetime_micros DATETIME(6), - t_time TIME, - t_timestamp TIMESTAMP, - c8 bit(8) DEFAULT NULL, - c16 bit(16) DEFAULT NULL, - c24 bit(24) DEFAULT NULL, - c32 bit(32) DEFAULT NULL, - c40 bit(40) DEFAULT NULL, - c48 bit(48) DEFAULT NULL, - c56 bit(56) DEFAULT NULL, - c63 bit(63) DEFAULT NULL, - c64 bit(64) DEFAULT NULL, - json_col JSON, - text_col TEXT, - data longblob, - tinyint_min TINYINT, - tinyint_max TINYINT, - tinyint_pos TINYINT, - tinyint_neg TINYINT, - smallint_min SMALLINT, - smallint_max SMALLINT, - smallint_pos SMALLINT, - smallint_neg SMALLINT, - medint_min MEDIUMINT, - medint_max MEDIUMINT, - medint_pos MEDIUMINT, - medint_neg MEDIUMINT, - int_min INT, - int_max INT, - int_pos INT, - int_neg INT, - bigint_min BIGINT, - bigint_max BIGINT, - bigint_pos BIGINT, - bigint_neg BIGINT, - primary key (id) - ) Engine=InnoDB` + + //go:embed uSchema.sql + uSQLSchema string + + //go:embed uVschema.json + uVschema string + + //go:embed sSchema.sql + sSQLSchema string + + //go:embed sVschema.json + sVschema string ) func TestMain(m *testing.M) { @@ -185,15 +147,13 @@ func TestMain(m *testing.M) { } // Start keyspace - keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: sqlSchema, - } - uks := &cluster.Keyspace{Name: "uks"} - if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { + ks := cluster.Keyspace{Name: uks, SchemaSQL: uSQLSchema, VSchema: uVschema} + if err := clusterInstance.StartUnshardedKeyspace(ks, 1, false); err != nil { return 1, err } - if err := clusterInstance.StartUnshardedKeyspace(*uks, 0, false); err != nil { + + ks = cluster.Keyspace{Name: sks, SchemaSQL: sSQLSchema, VSchema: sVschema} + if err := clusterInstance.StartKeyspace(ks, []string{"-"}, 0, false); err != nil { return 1, err } @@ -203,7 +163,8 @@ func TestMain(m *testing.M) { vtgateInstance.ExtraArgs = []string{ "--mysql_server_query_timeout", "1s", "--mysql_auth_server_static_file", clusterInstance.TmpDirectory + "/" + mysqlAuthServerStatic, - "--mysql_server_version", "8.0.16-7", + "--pprof-http", + "--schema_change_signal=false", } // Start vtgate @@ -251,7 +212,7 @@ func createConfig(name, data string) error { } // Connect will connect the vtgate through mysql protocol. 
-func Connect(t *testing.T, params ...string) *sql.DB { +func Connect(t testing.TB, params ...string) *sql.DB { dbo, err := sql.Open("mysql", dbInfo.ConnectionString(params...)) require.Nil(t, err) return dbo @@ -285,7 +246,7 @@ func execErr(dbo *sql.DB, stmt string, params ...any) *mysql.MySQLError { func selectWhere(t *testing.T, dbo *sql.DB, where string, params ...any) []tableData { var out []tableData // prepare query - qry := "SELECT msg, data, text_col, t_datetime, t_datetime_micros FROM " + tableName + qry := "SELECT msg, data, text_col, t_datetime, t_datetime_micros FROM vt_prepare_stmt_test" if where != "" { qry += " WHERE (" + where + ")" } @@ -307,7 +268,7 @@ func selectWhere(t *testing.T, dbo *sql.DB, where string, params ...any) []table func selectWhereWithTx(t *testing.T, tx *sql.Tx, where string, params ...any) []tableData { var out []tableData // prepare query - qry := "SELECT msg, data, text_col, t_datetime, t_datetime_micros FROM " + tableName + qry := "SELECT msg, data, text_col, t_datetime, t_datetime_micros FROM vt_prepare_stmt_test" if where != "" { qry += " WHERE (" + where + ")" } diff --git a/go/test/endtoend/preparestmt/sSchema.sql b/go/test/endtoend/preparestmt/sSchema.sql new file mode 100644 index 00000000000..7da6b759a9a --- /dev/null +++ b/go/test/endtoend/preparestmt/sSchema.sql @@ -0,0 +1,10 @@ +CREATE TABLE t1 +( + id BIGINT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + age INT, + email VARCHAR(100), + created_at DATETIME, + is_active BOOLEAN, + INDEX age_idx (age) +); \ No newline at end of file diff --git a/go/test/endtoend/preparestmt/sVschema.json b/go/test/endtoend/preparestmt/sVschema.json new file mode 100644 index 00000000000..77801827d42 --- /dev/null +++ b/go/test/endtoend/preparestmt/sVschema.json @@ -0,0 +1,22 @@ +{ + "sharded": true, + "vindexes": { + "xxhash": { + "type": "xxhash" + } + }, + "tables": { + "t1": { + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ], + "auto_increment":{ + "column" : "id", + "sequence" : "uks.t1_seq" + } + } + } +} diff --git a/go/test/endtoend/preparestmt/stmt_methods_test.go b/go/test/endtoend/preparestmt/stmt_methods_test.go index 5768c6eec7a..e8e001fdca7 100644 --- a/go/test/endtoend/preparestmt/stmt_methods_test.go +++ b/go/test/endtoend/preparestmt/stmt_methods_test.go @@ -48,7 +48,7 @@ func TestSelectDatabase(t *testing.T) { require.True(t, rows.Next(), "no rows found") err = rows.Scan(&resultBytes) require.NoError(t, err) - assert.Equal(t, string(resultBytes), "test_keyspace") + assert.Equal(t, string(resultBytes), "uks") } // TestInsertUpdateDelete validates all insert, update and @@ -57,7 +57,7 @@ func TestInsertUpdateDelete(t *testing.T) { dbo := Connect(t) defer dbo.Close() // prepare insert statement - insertStmt := `insert into ` + tableName + ` values( ?, ?, ?, ?, ?, ?, ?, ?, + insertStmt := `insert into vt_prepare_stmt_test values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);` @@ -129,7 +129,7 @@ func testReplica(t *testing.T) { // testcount validates inserted rows count with expected count. 
func testcount(t *testing.T, dbo *sql.DB, except int) { - r, err := dbo.Query("SELECT count(1) FROM " + tableName) + r, err := dbo.Query("SELECT count(1) FROM vt_prepare_stmt_test") require.Nil(t, err) r.Next() @@ -145,7 +145,7 @@ func TestAutoIncColumns(t *testing.T) { dbo := Connect(t) defer dbo.Close() // insert a row without id - insertStmt := "INSERT INTO " + tableName + ` ( + insertStmt := "INSERT INTO vt_prepare_stmt_test" + ` ( msg,keyspace_id,tinyint_unsigned,bool_signed,smallint_unsigned, mediumint_unsigned,int_unsigned,float_unsigned,double_unsigned, decimal_unsigned,t_date,t_datetime,t_datetime_micros,t_time,t_timestamp,c8,c16,c24, @@ -180,7 +180,7 @@ func TestAutoIncColumns(t *testing.T) { // deleteRecord test deletion operation corresponds to the testingID. func deleteRecord(t *testing.T, dbo *sql.DB) { // delete the record with id 1 - exec(t, dbo, "DELETE FROM "+tableName+" WHERE id = ?;", testingID) + exec(t, dbo, "DELETE FROM vt_prepare_stmt_test WHERE id = ?;", testingID) data := selectWhere(t, dbo, "id = ?", testingID) assert.Equal(t, 0, len(data)) @@ -192,7 +192,7 @@ func updateRecord(t *testing.T, dbo *sql.DB) { // update the record with id 1 updateData := "new data value" updateTextCol := "new text col value" - updateQuery := "update " + tableName + " set data = ? , text_col = ? where id = ?;" + updateQuery := "update vt_prepare_stmt_test set data = ? , text_col = ? where id = ?;" exec(t, dbo, updateQuery, updateData, updateTextCol, testingID) @@ -226,7 +226,7 @@ func TestColumnParameter(t *testing.T) { id := 1000 parameter1 := "param1" message := "TestColumnParameter" - insertStmt := "INSERT INTO " + tableName + " (id, msg, keyspace_id) VALUES (?, ?, ?);" + insertStmt := "INSERT INTO vt_prepare_stmt_test (id, msg, keyspace_id) VALUES (?, ?, ?);" values := []any{ id, message, @@ -237,7 +237,7 @@ func TestColumnParameter(t *testing.T) { var param, msg string var recID int - selectStmt := "SELECT COALESCE(?, id), msg FROM " + tableName + " WHERE msg = ? LIMIT ?" + selectStmt := "SELECT COALESCE(?, id), msg FROM vt_prepare_stmt_test WHERE msg = ? LIMIT ?" results1, err := dbo.Query(selectStmt, parameter1, message, 1) require.Nil(t, err) @@ -313,10 +313,7 @@ func TestSelectDBA(t *testing.T) { dbo := Connect(t) defer dbo.Close() - _, err := dbo.Exec("use uks") - require.NoError(t, err) - - _, err = dbo.Exec("CREATE TABLE `a` (`one` int NOT NULL,`two` int NOT NULL,PRIMARY KEY(`one`, `two`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4") + _, err := dbo.Exec("CREATE TABLE `a` (`one` int NOT NULL,`two` int NOT NULL,PRIMARY KEY(`one`, `two`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4") require.NoError(t, err) prepare, err := dbo.Prepare(`SELECT @@ -332,10 +329,10 @@ func TestSelectDBA(t *testing.T) { extra extra, table_name table_name FROM information_schema.columns - WHERE table_schema = ? + WHERE table_schema = ? and table_name = ? ORDER BY ordinal_position`) require.NoError(t, err) - rows, err := prepare.Query("uks") + rows, err := prepare.Query("uks", "a") require.NoError(t, err) defer rows.Close() var rec columns @@ -441,6 +438,6 @@ func TestBinaryColumn(t *testing.T) { AND column_info.table_schema = ? -- Exclude views. 
AND table_info.table_type = 'BASE TABLE' - ORDER BY BINARY table_info.table_name`, keyspaceName, keyspaceName) + ORDER BY BINARY table_info.table_name`, uks, uks) require.NoError(t, err) } diff --git a/go/test/endtoend/preparestmt/uSchema.sql b/go/test/endtoend/preparestmt/uSchema.sql new file mode 100644 index 00000000000..5284cacded6 --- /dev/null +++ b/go/test/endtoend/preparestmt/uSchema.sql @@ -0,0 +1,56 @@ +create table vt_prepare_stmt_test +( + id bigint auto_increment, + msg varchar(64), + keyspace_id bigint(20) unsigned NOT NULL, + tinyint_unsigned TINYINT, + bool_signed BOOL, + smallint_unsigned SMALLINT, + mediumint_unsigned MEDIUMINT, + int_unsigned INT, + float_unsigned FLOAT(10, 2), + double_unsigned DOUBLE(16, 2), + decimal_unsigned DECIMAL, + t_date DATE, + t_datetime DATETIME, + t_datetime_micros DATETIME(6), + t_time TIME, + t_timestamp TIMESTAMP, + c8 bit(8) DEFAULT NULL, + c16 bit(16) DEFAULT NULL, + c24 bit(24) DEFAULT NULL, + c32 bit(32) DEFAULT NULL, + c40 bit(40) DEFAULT NULL, + c48 bit(48) DEFAULT NULL, + c56 bit(56) DEFAULT NULL, + c63 bit(63) DEFAULT NULL, + c64 bit(64) DEFAULT NULL, + json_col JSON, + text_col TEXT, + data longblob, + tinyint_min TINYINT, + tinyint_max TINYINT, + tinyint_pos TINYINT, + tinyint_neg TINYINT, + smallint_min SMALLINT, + smallint_max SMALLINT, + smallint_pos SMALLINT, + smallint_neg SMALLINT, + medint_min MEDIUMINT, + medint_max MEDIUMINT, + medint_pos MEDIUMINT, + medint_neg MEDIUMINT, + int_min INT, + int_max INT, + int_pos INT, + int_neg INT, + bigint_min BIGINT, + bigint_max BIGINT, + bigint_pos BIGINT, + bigint_neg BIGINT, + primary key (id) +) Engine = InnoDB; + +CREATE TABLE t1_seq ( id INT, next_id BIGINT, cache BIGINT, PRIMARY KEY(id)) comment 'vitess_sequence'; + +INSERT INTO t1_seq (id, next_id, cache) values (0, 1, 500000); \ No newline at end of file diff --git a/go/test/endtoend/preparestmt/uVschema.json b/go/test/endtoend/preparestmt/uVschema.json new file mode 100644 index 00000000000..1f4de22e018 --- /dev/null +++ b/go/test/endtoend/preparestmt/uVschema.json @@ -0,0 +1,8 @@ +{ + "sharded": false, + "tables": { + "sequence_test_seq": { + "type": "sequence" + } + } +} diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 0102b9b5e2d..6bfce05119a 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -368,29 +368,33 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa require.NoError(t, err, output) done = true state := "" - streams := gjson.Get(output, "workflows.0.shard_streams.*.streams") - streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream - info := stream.Map() - // We need to wait for all streams to have the desired state. - state = info["state"].String() - if state == wantState { - for i := 0; i < len(fieldEqualityChecks); i++ { - if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { - key := kvparts[0] - val := kvparts[1] - res := info[key].String() - if !strings.EqualFold(res, val) { - done = false + shardStreams := gjson.Get(output, "workflows.0.shard_streams") + // We need to wait for all streams in all shard streams to have the desired state. 
+ shardStreams.ForEach(func(shardStreamId, shardStream gjson.Result) bool { + streams := shardStream.Get("*") + streams.ForEach(func(streamId, stream gjson.Result) bool { + info := stream.Map() + state = info["state"].String() + if state == wantState { + for i := 0; i < len(fieldEqualityChecks); i++ { + if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 { + key := kvparts[0] + val := kvparts[1] + res := info[key].String() + if !strings.EqualFold(res, val) { + done = false + } } } - } - if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && - (info["position"].Exists() && info["position"].String() == "") { + if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && + (info["position"].Exists() && info["position"].String() == "") { + done = false + } + } else { done = false } - } else { - done = false - } + return true + }) return true }) if done { diff --git a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go index 419a2e843c3..c58e8e9bb45 100644 --- a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go +++ b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go @@ -86,7 +86,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.Equal(t, "ON", primaryInstance.GTIDMode) assert.Equal(t, "FULL", primaryInstance.BinlogRowImage) assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID)) - assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0)) assert.True(t, primaryInstance.SemiSyncPrimaryEnabled) assert.True(t, primaryInstance.SemiSyncReplicaEnabled) assert.True(t, primaryInstance.SemiSyncPrimaryStatus) @@ -138,7 +138,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.Equal(t, utils.Hostname, replicaInstance.SourceHost) assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort) assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID)) - assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0)) assert.False(t, replicaInstance.SemiSyncPrimaryEnabled) assert.True(t, replicaInstance.SemiSyncReplicaEnabled) assert.False(t, replicaInstance.SemiSyncPrimaryStatus) @@ -156,11 +156,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.True(t, replicaInstance.ReplicationIOThreadRuning) assert.True(t, replicaInstance.ReplicationSQLThreadRuning) assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile) - assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0)) assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile) - assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0)) assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID)) - assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0)) assert.Empty(t, replicaInstance.LastIOError) assert.Empty(t, replicaInstance.LastSQLError) 
 assert.EqualValues(t, 0, replicaInstance.SQLDelay)
diff --git a/go/test/vschemawrapper/vschema_wrapper.go b/go/test/vschemawrapper/vschema_wrapper.go
index 3f9f072afc6..27219c6a255 100644
--- a/go/test/vschemawrapper/vschema_wrapper.go
+++ b/go/test/vschemawrapper/vschema_wrapper.go
@@ -18,7 +18,9 @@ package vschemawrapper
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"sort"
 	"strings"
 
 	"vitess.io/vitess/go/mysql/collations"
@@ -275,17 +277,6 @@ func (vw *VSchemaWrapper) FindTableOrVindex(tab sqlparser.TableName) (*vindexes.
 	return vw.Vcursor.FindTableOrVindex(tab)
 }
 
-func (vw *VSchemaWrapper) getfirstKeyspace() (ks *vindexes.Keyspace) {
-	var f string
-	for name, schema := range vw.V.Keyspaces {
-		if f == "" || f > name {
-			f = name
-			ks = schema.Keyspace
-		}
-	}
-	return
-}
-
 func (vw *VSchemaWrapper) getActualKeyspace() string {
 	if vw.Keyspace == nil {
 		return ""
@@ -301,15 +292,32 @@ func (vw *VSchemaWrapper) getActualKeyspace() string {
 }
 
 func (vw *VSchemaWrapper) SelectedKeyspace() (*vindexes.Keyspace, error) {
-	return vw.V.Keyspaces["main"].Keyspace, nil
+	return vw.AnyKeyspace()
 }
 
 func (vw *VSchemaWrapper) AnyKeyspace() (*vindexes.Keyspace, error) {
-	return vw.SelectedKeyspace()
+	ks, found := vw.V.Keyspaces["main"]
+	if found {
+		return ks.Keyspace, nil
+	}
+
+	size := len(vw.V.Keyspaces)
+	if size == 0 {
+		return nil, errors.New("no keyspace found in vschema")
+	}
+
+	// Find the first keyspace in the map alphabetically to get deterministic results
+	keys := make([]string, 0, size)
+	for key := range vw.V.Keyspaces {
+		keys = append(keys, key)
+	}
+	sort.Strings(keys)
+
+	return vw.V.Keyspaces[keys[0]].Keyspace, nil
 }
 
 func (vw *VSchemaWrapper) FirstSortedKeyspace() (*vindexes.Keyspace, error) {
-	return vw.V.Keyspaces["main"].Keyspace, nil
+	return vw.AnyKeyspace()
 }
 
 func (vw *VSchemaWrapper) TargetString() string {
@@ -344,12 +352,12 @@ func (vw *VSchemaWrapper) IsViewsEnabled() bool {
 
 // FindMirrorRule finds the mirror rule for the requested keyspace, table
 // name, and the tablet type in the VSchema.
-func (vs *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) { +func (vw *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) { destKeyspace, destTabletType, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY) if err != nil { return nil, err } - mirrorRule, err := vs.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType) + mirrorRule, err := vw.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType) if err != nil { return nil, err } diff --git a/go/timer/randticker_test.go b/go/timer/randticker_test.go index 59bcd1d89ea..7acdbf3156a 100644 --- a/go/timer/randticker_test.go +++ b/go/timer/randticker_test.go @@ -43,21 +43,3 @@ func TestTick(t *testing.T) { t.Error("Channel was not closed") } } - -func TestTickSkip(t *testing.T) { - tkr := NewRandTicker(10*time.Millisecond, 1*time.Millisecond) - time.Sleep(35 * time.Millisecond) - end := <-tkr.C - diff := time.Since(end) - if diff < 20*time.Millisecond { - t.Errorf("diff: %v, want >20ms", diff) - } - - // This tick should be up-to-date - end = <-tkr.C - diff = time.Since(end) - if diff > 1*time.Millisecond { - t.Errorf("diff: %v, want <1ms", diff) - } - tkr.Stop() -} diff --git a/go/vt/schemadiff/onlineddl_test.go b/go/vt/schemadiff/onlineddl_test.go index f5309b4f943..fb8710f8498 100644 --- a/go/vt/schemadiff/onlineddl_test.go +++ b/go/vt/schemadiff/onlineddl_test.go @@ -937,6 +937,11 @@ func TestRevertible(t *testing.T) { fromSchema: "id int, primary key (id), key idx1 ((id + 1))", toSchema: "id int, primary key (id), key idx2 ((id + 2))", }, + { + name: "remove unique index with expression, add another, skip both", + fromSchema: "id int, i int, primary key (id), unique key idx1 ((id + 1))", + toSchema: "id int, i int, primary key (id), unique key idx2 ((i + 2))", + }, } var ( diff --git a/go/vt/schemadiff/table_test.go b/go/vt/schemadiff/table_test.go index ac871dbd4af..c1d8d8c241e 100644 --- a/go/vt/schemadiff/table_test.go +++ b/go/vt/schemadiff/table_test.go @@ -2582,6 +2582,24 @@ func TestValidate(t *testing.T) { alter: "alter table t add key idx2 ((id + 2))", to: "create table t (id int, primary key (id), key idx1 ((id + 1)), key idx2 ((id + 2)))", }, + { + name: "key with multicolumn expression", + from: "create table t (id int, i int, primary key (id), key idx1 ((id + 1), (i + 2)))", + alter: "alter table t add key idx2 ((id + 2))", + to: "create table t (id int, i int, primary key (id), key idx1 ((id + 1), (i + 2)), key idx2 ((id + 2)))", + }, + { + name: "key with expression and unknown columns", + from: "create table t (id int, i int, primary key (id), key idx1 ((id + 1), (i + 2)))", + alter: "alter table t add key idx2 ((i2 + 2))", + expectErr: &InvalidColumnInKeyError{Table: "t", Column: "i2", Key: "idx2"}, + }, + { + name: "drop column used in expression", + from: "create table t (id int, i int, primary key (id), key idx1 ((id + 1), (i + 2)))", + alter: "alter table t drop column i", + expectErr: &InvalidColumnInKeyError{Table: "t", Column: "i", Key: "idx1"}, + }, // partitions { name: "drop column used by partitions", diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 9bc1d51d9ed..9e48ec58407 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -29,8 +29,18 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +// testStatsConnReadSem is a semaphore for unit tests. 
+// It intentionally has a concurrency limit of '1' to +// allow semaphore contention in tests. var testStatsConnReadSem = semaphore.NewWeighted(1) +// testStatsConnStatsReset resets StatsConn-based stats. +func testStatsConnStatsReset() { + topoStatsConnErrors.ResetAll() + topoStatsConnReadWaitTimings.Reset() + topoStatsConnTimings.Reset() +} + // The fakeConn is a wrapper for a Conn that emits stats for every operation type fakeConn struct { v Version @@ -185,238 +195,216 @@ func (st *fakeConn) IsReadOnly() bool { } // createTestReadSemaphoreContention simulates semaphore contention on the test read semaphore. -func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration) { +func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration, semAcquiredChan chan struct{}) { if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil { panic(err) } defer testStatsConnReadSem.Release(1) + semAcquiredChan <- struct{}{} time.Sleep(duration) } // TestStatsConnTopoListDir emits stats on ListDir func TestStatsConnTopoListDir(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() - go createTestReadSemaphoreContention(ctx, 100*time.Millisecond) + semAcquiredChan := make(chan struct{}) + go createTestReadSemaphoreContention(ctx, 100*time.Millisecond, semAcquiredChan) + <-semAcquiredChan statsConn.ListDir(ctx, "", true) - timingCounts := topoStatsConnTimings.Counts()["ListDir.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["ListDir.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) - waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["ListDir.global"] - if got := waitTimingsCounts; got != 1 { - t.Errorf("stats were not properly recorded: got = %d, want = 1", got) - } + require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["ListDir.global"]) + require.NotZero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["ListDir.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["ListDir.global"]) statsConn.ListDir(ctx, "error", true) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["ListDir.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["ListDir.global"]) } // TestStatsConnTopoCreate emits stats on Create func TestStatsConnTopoCreate(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Create(ctx, "", []byte{}) - timingCounts := topoStatsConnTimings.Counts()["Create.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Create.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before 
getting an error - errorCount := topoStatsConnErrors.Counts()["Create.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Create.global"]) statsConn.Create(ctx, "error", []byte{}) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Create.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Create.global"]) } // TestStatsConnTopoUpdate emits stats on Update func TestStatsConnTopoUpdate(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Update(ctx, "", []byte{}, conn.v) - timingCounts := topoStatsConnTimings.Counts()["Update.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Update.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Update.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Update.global"]) statsConn.Update(ctx, "error", []byte{}, conn.v) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Update.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Update.global"]) } // TestStatsConnTopoGet emits stats on Get func TestStatsConnTopoGet(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() - go createTestReadSemaphoreContention(ctx, time.Millisecond*100) + semAcquiredChan := make(chan struct{}) + go createTestReadSemaphoreContention(ctx, time.Millisecond*100, semAcquiredChan) + <-semAcquiredChan statsConn.Get(ctx, "") - timingCounts := topoStatsConnTimings.Counts()["Get.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Get.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) - waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["Get.global"] - if got := waitTimingsCounts; got != 1 { - t.Errorf("stats were not properly recorded: got = %d, want = 1", got) - } + require.Equal(t, int64(1), topoStatsConnReadWaitTimings.Counts()["Get.global"]) + require.NotZero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Get.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Get.global"]) statsConn.Get(ctx, "error") // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Get.global"] - if 
got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Get.global"]) } // TestStatsConnTopoDelete emits stats on Delete func TestStatsConnTopoDelete(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Delete(ctx, "", conn.v) - timingCounts := topoStatsConnTimings.Counts()["Delete.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Delete.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["Delete.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["Delete.global"]) statsConn.Delete(ctx, "error", conn.v) // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["Delete.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Delete.global"]) } // TestStatsConnTopoLock emits stats on Lock func TestStatsConnTopoLock(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Lock(ctx, "", "") - timingCounts := topoStatsConnTimings.Counts()["Lock.global"] - require.Equal(t, timingCounts, int64(1)) + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Lock.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) statsConn.LockWithTTL(ctx, "", "", time.Second) - timingCounts = topoStatsConnTimings.Counts()["LockWithTTL.global"] - require.Equal(t, timingCounts, int64(1)) + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["LockWithTTL.global"]) statsConn.LockName(ctx, "", "") - timingCounts = topoStatsConnTimings.Counts()["LockName.global"] - require.Equal(t, timingCounts, int64(1)) + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["LockName.global"]) // Error is zero before getting an error. - errorCount := topoStatsConnErrors.Counts()["Lock.global"] - require.Equal(t, errorCount, int64(0)) + require.Zero(t, topoStatsConnErrors.Counts()["Lock.global"]) statsConn.Lock(ctx, "error", "") // Error stats gets emitted. 
- errorCount = topoStatsConnErrors.Counts()["Lock.global"] - require.Equal(t, errorCount, int64(1)) + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["Lock.global"]) } // TestStatsConnTopoWatch emits stats on Watch func TestStatsConnTopoWatch(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) ctx := context.Background() statsConn.Watch(ctx, "") - timingCounts := topoStatsConnTimings.Counts()["Watch.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } - + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Watch.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) } // TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation func TestStatsConnTopoNewLeaderParticipation(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) _, _ = statsConn.NewLeaderParticipation("", "") - timingCounts := topoStatsConnTimings.Counts()["NewLeaderParticipation.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["NewLeaderParticipation.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) // error is zero before getting an error - errorCount := topoStatsConnErrors.Counts()["NewLeaderParticipation.global"] - if got, want := errorCount, int64(0); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Zero(t, topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]) _, _ = statsConn.NewLeaderParticipation("error", "") // error stats gets emitted - errorCount = topoStatsConnErrors.Counts()["NewLeaderParticipation.global"] - if got, want := errorCount, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnErrors.Counts()["NewLeaderParticipation.global"]) } // TestStatsConnTopoClose emits stats on Close func TestStatsConnTopoClose(t *testing.T) { + testStatsConnStatsReset() + defer testStatsConnStatsReset() + conn := &fakeConn{} statsConn := NewStatsConn("global", conn, testStatsConnReadSem) statsConn.Close() - timingCounts := topoStatsConnTimings.Counts()["Close.global"] - if got, want := timingCounts, int64(1); got != want { - t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) - } + require.Equal(t, int64(1), topoStatsConnTimings.Counts()["Close.global"]) + require.NotZero(t, topoStatsConnTimings.Time()) + require.Zero(t, topoStatsConnReadWaitTimings.Time()) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8123416eb41..f27851275b6 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "math" "slices" "sort" "strings" @@ -28,7 +27,6 @@ import ( "text/template" "time" - "github.com/google/uuid" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc/codes" @@ -41,7 +39,6 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" - 
"vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -1249,287 +1246,6 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea }) } -// VDiffCreate is part of the vtctlservicepb.VtctldServer interface. -// It passes on the request to the target primary tablets that are -// participating in the given workflow and VDiff. -func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("source_cells", req.SourceCells) - span.Annotate("target_cells", req.TargetCells) - span.Annotate("tablet_types", req.TabletTypes) - span.Annotate("tables", req.Tables) - span.Annotate("auto_retry", req.AutoRetry) - span.Annotate("max_diff_duration", req.MaxDiffDuration) - if req.AutoStart != nil { - span.Annotate("auto_start", req.GetAutoStart()) - } - - var err error - req.Uuid = strings.TrimSpace(req.Uuid) - if req.Uuid == "" { // Generate a UUID - req.Uuid = uuid.New().String() - } else { // Validate UUID if provided - if err = uuid.Validate(req.Uuid); err != nil { - return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid) - } - } - - tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) - - if req.Limit == 0 { // This would produce no useful results - req.Limit = math.MaxInt64 - } - // This is a pointer so there's no ZeroValue in the message - // and an older v18 client will not provide it. - if req.MaxDiffDuration == nil { - req.MaxDiffDuration = &vttimepb.Duration{} - } - // The other vttime.Duration vars should not be nil as the - // client should always provide them, but we check anyway to - // be safe. - if req.FilteredReplicationWaitTime == nil { - // A value of 0 is not valid as the vdiff will never succeed. 
- req.FilteredReplicationWaitTime = &vttimepb.Duration{ - Seconds: int64(DefaultTimeout.Seconds()), - } - } - if req.WaitUpdateInterval == nil { - req.WaitUpdateInterval = &vttimepb.Duration{} - } - - autoStart := true - if req.AutoStart != nil { - autoStart = req.GetAutoStart() - } - - options := &tabletmanagerdatapb.VDiffOptions{ - PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ - TabletTypes: tabletTypesStr, - SourceCell: strings.Join(req.SourceCells, ","), - TargetCell: strings.Join(req.TargetCells, ","), - }, - CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ - Tables: strings.Join(req.Tables, ","), - AutoRetry: req.AutoRetry, - MaxRows: req.Limit, - TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, - MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, - UpdateTableStats: req.UpdateTableStats, - MaxDiffSeconds: req.MaxDiffDuration.Seconds, - AutoStart: &autoStart, - }, - ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ - OnlyPks: req.OnlyPKs, - DebugQuery: req.DebugQuery, - MaxSampleRows: req.MaxReportSampleRows, - RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt, - }, - } - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.CreateAction), - Options: options, - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - if ts.frozen { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s", - req.TargetKeyspace, req.Workflow) - } - - workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { - s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, - "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff create action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffCreateResponse{ - UUID: req.Uuid, - }, nil -} - -// VDiffDelete is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("argument", req.Arg) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.DeleteAction), - ActionArg: req.Arg, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff delete action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffDeleteResponse{}, nil -} - -// VDiffResume is part of the vtctlservicepb.VtctldServer interface. 
-func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume") - defer span.Finish() - - targetShards := req.GetTargetShards() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("target_shards", targetShards) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.ResumeAction), - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - if len(targetShards) > 0 { - if err := applyTargetShards(ts, targetShards); err != nil { - return nil, err - } - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff resume action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffResumeResponse{}, nil -} - -// VDiffShow is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("argument", req.Arg) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.ShowAction), - ActionArg: req.Arg, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - output := &vdiffOutput{ - responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)), - err: nil, - } - output.err = ts.ForAllTargets(func(target *MigrationTarget) error { - resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - output.mu.Lock() - defer output.mu.Unlock() - output.responses[target.GetShard().ShardName()] = resp - return err - }) - if output.err != nil { - s.Logger().Errorf("Error executing vdiff show action: %v", output.err) - return nil, output.err - } - return &vtctldatapb.VDiffShowResponse{ - TabletResponses: output.responses, - }, nil -} - -// VDiffStop is part of the vtctlservicepb.VtctldServer interface. 
-func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop") - defer span.Finish() - - targetShards := req.GetTargetShards() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("target_shards", targetShards) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.StopAction), - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - if len(targetShards) > 0 { - if err := applyTargetShards(ts, targetShards); err != nil { - return nil, err - } - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff stop action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffStopResponse{}, nil -} - // WorkflowDelete is part of the vtctlservicepb.VtctldServer interface. // It passes on the request to the target primary tablets that are // participating in the given workflow. diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 26d722f1de0..8bb4a06f23a 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -27,7 +27,6 @@ import ( "testing" "time" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/encoding/prototext" @@ -39,7 +38,6 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -191,257 +189,6 @@ func TestCheckReshardingJournalExistsOnTablet(t *testing.T) { } } -// TestVDiffCreate performs some basic tests of the VDiffCreate function -// to ensure that it behaves as expected given a specific request. -func TestVDiffCreate(t *testing.T) { - ctx := context.Background() - workflowName := "wf1" - sourceKeyspace := &testKeyspace{ - KeyspaceName: "source", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "target", - ShardNames: []string{"-80", "80-"}, - } - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - tests := []struct { - name string - req *vtctldatapb.VDiffCreateRequest - wantErr string - }{ - { - name: "no values", - req: &vtctldatapb.VDiffCreateRequest{}, - // We did not provide any keyspace or shard. 
- wantErr: "FindAllShardsInKeyspace() invalid keyspace name: UnescapeID err: invalid input identifier ''", - }, - { - name: "generated UUID", - req: &vtctldatapb.VDiffCreateRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflowName, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.wantErr == "" { - env.tmc.expectVRQueryResultOnKeyspaceTablets(targetKeyspace.KeyspaceName, &queryResult{ - query: "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", - result: &querypb.QueryResult{}, - }) - } - got, err := env.ws.VDiffCreate(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - return - } - require.NoError(t, err) - require.NotNil(t, got) - // Ensure that we always use a valid UUID. - err = uuid.Validate(got.UUID) - require.NoError(t, err) - }) - } -} - -func TestVDiffResume(t *testing.T) { - ctx := context.Background() - sourceKeyspace := &testKeyspace{ - KeyspaceName: "sourceks", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, - } - workflow := "testwf" - uuid := uuid.New().String() - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - env.tmc.strict = true - action := string(vdiff.ResumeAction) - - tests := []struct { - name string - req *vtctldatapb.VDiffResumeRequest // vtctld requests - expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests - wantErr string - }{ - { - name: "basic resume", // Both target shards - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "resume on first shard", - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: targetKeyspace.ShardNames[:1], - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "resume on invalid shard", - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: []string{"0"}, - Workflow: workflow, - Uuid: uuid, - }, - wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for tab, vdr := range tt.expectedVDiffRequests { - env.tmc.expectVDiffRequest(tab, vdr) - } - got, err := env.ws.VDiffResume(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - } else { - require.NoError(t, err) - require.NotNil(t, got) - } - 
env.tmc.confirmVDiffRequests(t) - }) - } -} - -func TestVDiffStop(t *testing.T) { - ctx := context.Background() - sourceKeyspace := &testKeyspace{ - KeyspaceName: "sourceks", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, - } - workflow := "testwf" - uuid := uuid.New().String() - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - env.tmc.strict = true - action := string(vdiff.StopAction) - - tests := []struct { - name string - req *vtctldatapb.VDiffStopRequest // vtctld requests - expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests - wantErr string - }{ - { - name: "basic stop", // Both target shards - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "stop on first shard", - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: targetKeyspace.ShardNames[:1], - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "stop on invalid shard", - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: []string{"0"}, - Workflow: workflow, - Uuid: uuid, - }, - wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for tab, vdr := range tt.expectedVDiffRequests { - env.tmc.expectVDiffRequest(tab, vdr) - } - got, err := env.ws.VDiffStop(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - } else { - require.NoError(t, err) - require.NotNil(t, got) - } - env.tmc.confirmVDiffRequests(t) - }) - } -} - func TestMoveTablesComplete(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() diff --git a/go/vt/vtctl/workflow/vdiff.go b/go/vt/vtctl/workflow/vdiff.go index 6be5fe3c3b5..30953868b0b 100644 --- a/go/vt/vtctl/workflow/vdiff.go +++ b/go/vt/vtctl/workflow/vdiff.go @@ -17,16 +17,26 @@ limitations under the License. 
package workflow import ( + "context" "encoding/json" "math" "sort" "strings" "time" + "github.com/google/uuid" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" ) // TableSummary aggregates the current state of the table diff from all shards. @@ -278,3 +288,284 @@ func BuildProgressReport(rowsCompared int64, rowsToCompare int64, startedAt stri } return report } + +// VDiffCreate is part of the vtctlservicepb.VtctldServer interface. +// It passes on the request to the target primary tablets that are +// participating in the given workflow and VDiff. +func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("source_cells", req.SourceCells) + span.Annotate("target_cells", req.TargetCells) + span.Annotate("tablet_types", req.TabletTypes) + span.Annotate("tables", req.Tables) + span.Annotate("auto_retry", req.AutoRetry) + span.Annotate("max_diff_duration", req.MaxDiffDuration) + if req.AutoStart != nil { + span.Annotate("auto_start", req.GetAutoStart()) + } + + var err error + req.Uuid = strings.TrimSpace(req.Uuid) + if req.Uuid == "" { // Generate a UUID + req.Uuid = uuid.New().String() + } else { // Validate UUID if provided + if err = uuid.Validate(req.Uuid); err != nil { + return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid) + } + } + + tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) + + if req.Limit == 0 { // This would produce no useful results + req.Limit = math.MaxInt64 + } + // This is a pointer so there's no ZeroValue in the message + // and an older v18 client will not provide it. + if req.MaxDiffDuration == nil { + req.MaxDiffDuration = &vttimepb.Duration{} + } + // The other vttime.Duration vars should not be nil as the + // client should always provide them, but we check anyway to + // be safe. + if req.FilteredReplicationWaitTime == nil { + // A value of 0 is not valid as the vdiff will never succeed. 
+ req.FilteredReplicationWaitTime = &vttimepb.Duration{ + Seconds: int64(DefaultTimeout.Seconds()), + } + if req.WaitUpdateInterval == nil { + req.WaitUpdateInterval = &vttimepb.Duration{} + } + + autoStart := true + if req.AutoStart != nil { + autoStart = req.GetAutoStart() + } + + options := &tabletmanagerdatapb.VDiffOptions{ + PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ + TabletTypes: tabletTypesStr, + SourceCell: strings.Join(req.SourceCells, ","), + TargetCell: strings.Join(req.TargetCells, ","), + }, + CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ + Tables: strings.Join(req.Tables, ","), + AutoRetry: req.AutoRetry, + MaxRows: req.Limit, + TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, + MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, + UpdateTableStats: req.UpdateTableStats, + MaxDiffSeconds: req.MaxDiffDuration.Seconds, + AutoStart: &autoStart, + }, + ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ + OnlyPks: req.OnlyPKs, + DebugQuery: req.DebugQuery, + MaxSampleRows: req.MaxReportSampleRows, + RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt, + }, + } + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.CreateAction), + Options: options, + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if ts.frozen { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have already been switched for workflow %s.%s", + req.TargetKeyspace, req.Workflow) + } + + workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { + s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff create action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffCreateResponse{ + UUID: req.Uuid, + }, nil +} + +// VDiffDelete is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("argument", req.Arg) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.DeleteAction), + ActionArg: req.Arg, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff delete action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffDeleteResponse{}, nil +} + +// VDiffResume is part of the vtctlservicepb.VtctldServer interface. 
+func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume") + defer span.Finish() + + targetShards := req.GetTargetShards() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("target_shards", targetShards) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.ResumeAction), + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + if len(targetShards) > 0 { + if err := applyTargetShards(ts, targetShards); err != nil { + return nil, err + } + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff resume action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffResumeResponse{}, nil +} + +// VDiffShow is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("argument", req.Arg) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.ShowAction), + ActionArg: req.Arg, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + output := &vdiffOutput{ + responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)), + err: nil, + } + output.err = ts.ForAllTargets(func(target *MigrationTarget) error { + resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + output.mu.Lock() + defer output.mu.Unlock() + output.responses[target.GetShard().ShardName()] = resp + return err + }) + if output.err != nil { + s.Logger().Errorf("Error executing vdiff show action: %v", output.err) + return nil, output.err + } + return &vtctldatapb.VDiffShowResponse{ + TabletResponses: output.responses, + }, nil +} + +// VDiffStop is part of the vtctlservicepb.VtctldServer interface. 
+func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop") + defer span.Finish() + + targetShards := req.GetTargetShards() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("target_shards", targetShards) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.StopAction), + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + if len(targetShards) > 0 { + if err := applyTargetShards(ts, targetShards); err != nil { + return nil, err + } + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff stop action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffStopResponse{}, nil +} diff --git a/go/vt/vtctl/workflow/vdiff_test.go b/go/vt/vtctl/workflow/vdiff_test.go index e5578afc170..0da4a3ef480 100644 --- a/go/vt/vtctl/workflow/vdiff_test.go +++ b/go/vt/vtctl/workflow/vdiff_test.go @@ -17,13 +17,21 @@ limitations under the License. package workflow import ( + "context" + "fmt" "math" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) func TestBuildProgressReport(t *testing.T) { @@ -134,3 +142,350 @@ func TestBuildProgressReport(t *testing.T) { }) } } + +// TestVDiffCreate performs some basic tests of the VDiffCreate function +// to ensure that it behaves as expected given a specific request. +func TestVDiffCreate(t *testing.T) { + ctx := context.Background() + workflowName := "wf1" + sourceKeyspace := &testKeyspace{ + KeyspaceName: "source", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "target", + ShardNames: []string{"-80", "80-"}, + } + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + tests := []struct { + name string + req *vtctldatapb.VDiffCreateRequest + wantErr string + }{ + { + name: "no values", + req: &vtctldatapb.VDiffCreateRequest{}, + // We did not provide any keyspace or shard. 
+ wantErr: "FindAllShardsInKeyspace() invalid keyspace name: UnescapeID err: invalid input identifier ''", + }, + { + name: "generated UUID", + req: &vtctldatapb.VDiffCreateRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflowName, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.wantErr == "" { + env.tmc.expectVRQueryResultOnKeyspaceTablets(targetKeyspace.KeyspaceName, &queryResult{ + query: "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", + result: &querypb.QueryResult{}, + }) + } + got, err := env.ws.VDiffCreate(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + // Ensure that we always use a valid UUID. + err = uuid.Validate(got.UUID) + require.NoError(t, err) + }) + } +} + +func TestVDiffResume(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.ResumeAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffResumeRequest // vtctld requests + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests + wantErr string + }{ + { + name: "basic resume", // Both target shards + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "resume on first shard", + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: targetKeyspace.ShardNames[:1], + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "resume on invalid shard", + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: []string{"0"}, + Workflow: workflow, + Uuid: uuid, + }, + wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffResume(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + 
env.tmc.confirmVDiffRequests(t) + }) + } +} + +func TestVDiffStop(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.StopAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffStopRequest // vtctld requests + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests + wantErr string + }{ + { + name: "basic stop", // Both target shards + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "stop on first shard", + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: targetKeyspace.ShardNames[:1], + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "stop on invalid shard", + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: []string{"0"}, + Workflow: workflow, + Uuid: uuid, + }, + wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffStop(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +} + +func TestVDiffDelete(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.DeleteAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffDeleteRequest + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse + wantErr string + }{ + { + name: "basic delete", + req: &vtctldatapb.VDiffDeleteRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Arg: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + 
env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + }, + }, + { + name: "invalid delete", + req: &vtctldatapb.VDiffDeleteRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Arg: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + err: fmt.Errorf("error on invalid delete"), + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + }, + wantErr: "error on invalid delete", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffDelete(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +} diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index fc91c28b021..7837955c541 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -311,7 +311,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias) a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{ LogFile: m.GetString("binary_log_file"), - LogPos: m.GetUint32("binary_log_pos"), + LogPos: m.GetUint64("binary_log_pos"), Type: BinaryLog, } isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates") diff --git a/go/vt/vtorc/inst/binlog.go b/go/vt/vtorc/inst/binlog.go index 9c115e4e457..b4abf34ac7e 100644 --- a/go/vt/vtorc/inst/binlog.go +++ b/go/vt/vtorc/inst/binlog.go @@ -40,22 +40,23 @@ const ( // BinlogCoordinates described binary log coordinates in the form of log file & log position. type BinlogCoordinates struct { LogFile string - LogPos uint32 + LogPos uint64 Type BinlogType } -// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 +// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345" +// into a BinlogCoordinates struct. func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { tokens := strings.SplitN(logFileLogPos, ":", 2) if len(tokens) != 2 { return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. 
Expected format is file:pos", logFileLogPos) } - logPos, err := strconv.ParseUint(tokens[1], 10, 32) + logPos, err := strconv.ParseUint(tokens[1], 10, 64) if err != nil { return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1]) } - return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil + return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil } // DisplayString returns a user-friendly string representation of these coordinates @@ -177,6 +178,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta } detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1] logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32) - detachedCoordinates.LogPos = uint32(logPos) + detachedCoordinates.LogPos = logPos return true, detachedCoordinates } diff --git a/go/vt/vtorc/inst/binlog_test.go b/go/vt/vtorc/inst/binlog_test.go index bc0110e981c..1f73f3c0029 100644 --- a/go/vt/vtorc/inst/binlog_test.go +++ b/go/vt/vtorc/inst/binlog_test.go @@ -41,7 +41,7 @@ func TestPreviousFileCoordinates(t *testing.T) { require.NoError(t, err) require.Equal(t, previous.LogFile, "mysql-bin.000009") - require.Equal(t, previous.LogPos, uint32(0)) + require.Equal(t, previous.LogPos, uint64(0)) } func TestNextFileCoordinates(t *testing.T) { @@ -49,7 +49,7 @@ func TestNextFileCoordinates(t *testing.T) { require.NoError(t, err) require.Equal(t, next.LogFile, "mysql-bin.000011") - require.Equal(t, next.LogPos, uint32(0)) + require.Equal(t, next.LogPos, uint64(0)) } func TestBinlogCoordinates(t *testing.T) { diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 66aef7c8a78..9198514d6ed 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -548,14 +548,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.GtidPurged = m.GetString("gtid_purged") instance.GtidErrant = m.GetString("gtid_errant") instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file") - instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos") + instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos") instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file") - instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos") + instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos") instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file") - instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos") + instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos") instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file") - instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos") + instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos") instance.RelaylogCoordinates.Type = RelayLog instance.LastSQLError = m.GetString("last_sql_error") instance.LastIOError = m.GetString("last_io_error") diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index f8b5cfd9b8d..ce81a2dd516 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -968,7 +968,9 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh defer preparationConnRestoreLockWaitTimeout() if needsShadowTableAnalysis { - // Run `ANALYZE TABLE` on the vreplication table so that it has up-to-date statistics at 
cut-over + // Run `ANALYZE TABLE` on the vreplication table so that it has up-to-date statistics at cut-over. + // The statement will be replicated, so that in case there's a PRS/ERS shortly after cut-over, the + // promoted replica will have good statistics. parsed := sqlparser.BuildParsedQuery(sqlAnalyzeTable, vreplTable) if _, err := preparationsConn.Conn.Exec(ctx, parsed.Query, -1, false); err != nil { // Best effort only. Do not fail the mgiration if this fails. diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 943a3b1df07..6ba0217519c 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -499,7 +499,8 @@ const ( sqlDropTable = "DROP TABLE `%a`" sqlDropTableIfExists = "DROP TABLE IF EXISTS `%a`" sqlShowTableStatus = "SHOW TABLE STATUS LIKE '%a'" - sqlAnalyzeTable = "ANALYZE NO_WRITE_TO_BINLOG TABLE `%a`" + sqlAnalyzeTableLocal = "ANALYZE NO_WRITE_TO_BINLOG TABLE `%a`" + sqlAnalyzeTable = "ANALYZE TABLE `%a`" sqlShowCreateTable = "SHOW CREATE TABLE `%a`" sqlShowVariablesLikePreserveForeignKey = "show global variables like 'rename_table_preserve_foreign_key'" sqlShowVariablesLikeFastAnalyzeTable = "show global variables like 'fast_analyze_table'" diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 2761c27c801..b00bcc40894 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -208,7 +208,7 @@ func (v *VRepl) executeAnalyzeTable(ctx context.Context, conn *dbconnpool.DBConn defer conn.ExecuteFetch(sqlDisableFastAnalyzeTable, 1, false) } - parsed := sqlparser.BuildParsedQuery(sqlAnalyzeTable, tableName) + parsed := sqlparser.BuildParsedQuery(sqlAnalyzeTableLocal, tableName) if _, err := conn.ExecuteFetch(parsed.Query, 1, false); err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/connpool/pool.go b/go/vt/vttablet/tabletserver/connpool/pool.go index 14fcc6d0f2e..141d8257062 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool.go +++ b/go/vt/vttablet/tabletserver/connpool/pool.go @@ -69,6 +69,7 @@ func NewPool(env tabletenv.Env, name string, cfg tabletenv.ConnPoolConfig) *Pool config := smartconnpool.Config[*Conn]{ Capacity: int64(cfg.Size), IdleTimeout: cfg.IdleTimeout, + MaxIdleCount: int64(cfg.MaxIdleCount), MaxLifetime: cfg.MaxLifetime, RefreshInterval: mysqlctl.PoolDynamicHostnameResolution, } diff --git a/go/vt/vttablet/tabletserver/connpool/pool_test.go b/go/vt/vttablet/tabletserver/connpool/pool_test.go index 8cf27cbb327..c305c61b7b4 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool_test.go +++ b/go/vt/vttablet/tabletserver/connpool/pool_test.go @@ -55,10 +55,10 @@ func TestConnPoolTimeout(t *testing.T) { defer db.Close() cfg := tabletenv.ConnPoolConfig{ - Size: 1, + Size: 1, + Timeout: time.Second, + IdleTimeout: 10 * time.Second, } - cfg.Timeout = time.Second - cfg.IdleTimeout = 10 * time.Second connPool := NewPool(tabletenv.NewEnv(vtenv.NewTestEnv(), nil, "PoolTest"), "TestPool", cfg) params := dbconfigs.New(db.ConnParams()) connPool.Open(params, params, params) @@ -135,6 +135,58 @@ func TestConnPoolSetCapacity(t *testing.T) { } } +// TestConnPoolMaxIdleCount tests the max idle count for the pool. +// The pool should close the idle connections if the idle count is more than the allowed idle count. +// Changing the pool capacity will affect the idle count allowed for that pool. 
+func TestConnPoolMaxIdleCount(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + + cfg := tabletenv.ConnPoolConfig{ + Size: 5, + MaxIdleCount: 2, + } + connPool := NewPool(tabletenv.NewEnv(vtenv.NewTestEnv(), nil, "PoolTest"), "TestPool", cfg) + params := dbconfigs.New(db.ConnParams()) + connPool.Open(params, params, params) + defer connPool.Close() + + assert.EqualValues(t, 5, connPool.Capacity(), "pool capacity should be 5") + assert.EqualValues(t, 2, connPool.IdleCount(), "pool idle count should be 2") + + var conns []*PooledConn + for i := 0; i < 3; i++ { + conn, err := connPool.Get(context.Background(), nil) + require.NoError(t, err) + conns = append(conns, conn) + } + + // after recycle - 1 idle connection + conns[0].Recycle() + assert.Zero(t, connPool.Metrics.IdleClosed(), "pool idle closed should be 0") + + // after recycle - 2 idle connection + conns[1].Recycle() + assert.Zero(t, connPool.Metrics.IdleClosed(), "pool idle closed should be 0") + + // after recycle - 3 idle connection, 1 will be closed + conns[2].Recycle() + assert.EqualValues(t, 1, connPool.Metrics.IdleClosed(), "pool idle closed should be 1") + + // Changing the pool capacity also changes the idle count allowed for that pool, + // e.g. when the capacity is set to a value lower than the max idle count. + + err := connPool.SetCapacity(context.Background(), 4) + require.NoError(t, err) + assert.EqualValues(t, 4, connPool.Capacity(), "pool capacity should be 4") + assert.EqualValues(t, 2, connPool.IdleCount(), "pool idle count should be 2") + + err = connPool.SetCapacity(context.Background(), 1) + require.NoError(t, err) + assert.EqualValues(t, 1, connPool.Capacity(), "pool capacity should be 1") + assert.EqualValues(t, 1, connPool.IdleCount(), "pool idle count should be changed to 1") +} + func TestConnPoolStatJSON(t *testing.T) { db := fakesqldb.New(t) defer db.Close() diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 994999f2368..42cc300f92d 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -143,6 +143,9 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.DurationVar(&currentConfig.OltpReadPool.Timeout, "queryserver-config-query-pool-timeout", defaultConfig.OltpReadPool.Timeout, "query server query pool timeout, it is how long vttablet waits for a connection from the query pool. If set to 0 (default) then the overall query timeout is used instead.") fs.DurationVar(&currentConfig.OlapReadPool.Timeout, "queryserver-config-stream-pool-timeout", defaultConfig.OlapReadPool.Timeout, "query server stream pool timeout, it is how long vttablet waits for a connection from the stream pool. If set to 0 (default) then there is no timeout.") fs.DurationVar(&currentConfig.TxPool.Timeout, "queryserver-config-txpool-timeout", defaultConfig.TxPool.Timeout, "query server transaction pool timeout, it is how long vttablet waits if tx pool is full") + fs.IntVar(&currentConfig.OltpReadPool.MaxIdleCount, "queryserver-config-query-pool-max-idle-count", defaultConfig.OltpReadPool.MaxIdleCount, "query server query pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods.") + fs.IntVar(&currentConfig.OlapReadPool.MaxIdleCount, "queryserver-config-stream-pool-max-idle-count", defaultConfig.OlapReadPool.MaxIdleCount, "query server stream pool - maximum number of idle connections to retain in the pool. 
Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods.") + fs.IntVar(&currentConfig.TxPool.MaxIdleCount, "queryserver-config-txpool-max-idle-count", defaultConfig.TxPool.MaxIdleCount, "query server transaction pool - maximum number of idle connections to retain in the pool. Use this to balance between faster response times during traffic bursts and resource efficiency during low-traffic periods.") fs.DurationVar(&currentConfig.OltpReadPool.IdleTimeout, "queryserver-config-idle-timeout", defaultConfig.OltpReadPool.IdleTimeout, "query server idle timeout, vttablet manages various mysql connection pools. This config means if a connection has not been used in given idle timeout, this connection will be removed from pool. This effectively manages number of connection objects and optimize the pool performance.") fs.DurationVar(&currentConfig.OltpReadPool.MaxLifetime, "queryserver-config-pool-conn-max-lifetime", defaultConfig.OltpReadPool.MaxLifetime, "query server connection max lifetime, vttablet manages various mysql connection pools. This config means if a connection has lived at least this long, it connection will be removed from pool upon the next time it is returned to the pool.") @@ -424,6 +427,7 @@ type ConnPoolConfig struct { Size int `json:"size,omitempty"` Timeout time.Duration `json:"timeoutSeconds,omitempty"` IdleTimeout time.Duration `json:"idleTimeoutSeconds,omitempty"` + MaxIdleCount int `json:"maxIdleCount,omitempty"` MaxLifetime time.Duration `json:"maxLifetimeSeconds,omitempty"` PrefillParallelism int `json:"prefillParallelism,omitempty"` } @@ -433,9 +437,10 @@ func (cfg *ConnPoolConfig) MarshalJSON() ([]byte, error) { tmp := struct { Proxy - Timeout string `json:"timeoutSeconds,omitempty"` - IdleTimeout string `json:"idleTimeoutSeconds,omitempty"` - MaxLifetime string `json:"maxLifetimeSeconds,omitempty"` + Timeout string `json:"timeoutSeconds,omitempty"` + IdleTimeout string `json:"idleTimeoutSeconds,omitempty"` + MaxIdleCount int `json:"maxIdleCount,omitempty"` + MaxLifetime string `json:"maxLifetimeSeconds,omitempty"` }{ Proxy: Proxy(*cfg), } @@ -460,6 +465,7 @@ func (cfg *ConnPoolConfig) UnmarshalJSON(data []byte) (err error) { Size int `json:"size,omitempty"` Timeout string `json:"timeoutSeconds,omitempty"` IdleTimeout string `json:"idleTimeoutSeconds,omitempty"` + MaxIdleCount int `json:"maxIdleCount,omitempty"` MaxLifetime string `json:"maxLifetimeSeconds,omitempty"` PrefillParallelism int `json:"prefillParallelism,omitempty"` } @@ -490,6 +496,7 @@ func (cfg *ConnPoolConfig) UnmarshalJSON(data []byte) (err error) { } cfg.Size = tmp.Size + cfg.MaxIdleCount = tmp.MaxIdleCount cfg.PrefillParallelism = tmp.PrefillParallelism return nil diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index d16b6276964..9ae653bafb9 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -28,13 +28,12 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/mysqlctl" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/yaml2" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) func TestConfigParse(t *testing.T) { @@ -49,10 +48,11 
@@ func TestConfigParse(t *testing.T) { }, }, OltpReadPool: ConnPoolConfig{ - Size: 16, - Timeout: 10 * time.Second, - IdleTimeout: 20 * time.Second, - MaxLifetime: 50 * time.Second, + Size: 16, + Timeout: 10 * time.Second, + IdleTimeout: 20 * time.Second, + MaxLifetime: 50 * time.Second, + MaxIdleCount: 8, }, RowStreamer: RowStreamerConfig{ MaxInnoDBTrxHistLen: 1000, @@ -113,6 +113,7 @@ txPool: {} oltpReadPool: size: 16 idleTimeoutSeconds: 20s + maxIdleCount: 8 maxLifetimeSeconds: 50s `) gotCfg := cfg diff --git a/go/vt/vttest/vtprocess.go b/go/vt/vttest/vtprocess.go index 6371811a60e..1719fafd8a7 100644 --- a/go/vt/vttest/vtprocess.go +++ b/go/vt/vttest/vtprocess.go @@ -205,7 +205,7 @@ func VtcomboProcess(environment Environment, args *Config, mysql MySQLManager) ( if args.VtComboBindAddress != "" { vtcomboBindAddress = args.VtComboBindAddress } - grpcBindAddress := "127.0.0.1" + grpcBindAddress := "" if servenv.GRPCBindAddress() != "" { grpcBindAddress = servenv.GRPCBindAddress() } diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 984ff93095e..3251c7c8e3d 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -133,7 +133,8 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") + goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits of a uint64 + require.NoError(t, err) goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: goodReplica1RelayLogPos, } @@ -182,7 +183,7 @@ func TestEmergencyReparentShard(t *testing.T) { // run EmergencyReparentShard waitReplicaTimeout := time.Second * 2 - err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, + err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) require.NoError(t, err) // check what was run
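For illustration only, and not part of the patch above: the following is a minimal, self-contained Go sketch of the max-idle-count semantics that TestConnPoolMaxIdleCount exercises. The toy pool and all its names are hypothetical (this is not the smartconnpool implementation), and it assumes a max idle count of zero means the idle limit defaults to the pool capacity.

package main

import "fmt"

// toyPool is a hypothetical stand-in for a connection pool with a max idle count.
type toyPool struct {
	capacity     int
	maxIdleCount int
	idle         int // number of idle connections currently retained
	idleClosed   int // connections closed because the idle limit was reached
}

// idleLimit returns the effective idle limit: the configured max idle count,
// capped at the pool capacity (and defaulting to the capacity when unset).
func (p *toyPool) idleLimit() int {
	if p.maxIdleCount <= 0 || p.maxIdleCount > p.capacity {
		return p.capacity
	}
	return p.maxIdleCount
}

// recycle returns a connection to the pool; if the idle limit has already been
// reached, the connection is closed instead of being retained.
func (p *toyPool) recycle() {
	if p.idle >= p.idleLimit() {
		p.idleClosed++
		return
	}
	p.idle++
}

func main() {
	p := &toyPool{capacity: 5, maxIdleCount: 2}
	for i := 0; i < 3; i++ {
		p.recycle()
	}
	// With capacity 5 and max idle count 2, the third recycled connection is closed.
	fmt.Println(p.idle, p.idleClosed) // 2 1

	// Lowering the capacity below the max idle count also lowers the effective idle limit.
	p.capacity = 1
	fmt.Println(p.idleLimit()) // 1
}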