diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index c45092b811a..e36c35924b1 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -107,6 +107,7 @@ Flags: --topo_global_root string the path of the global topology data in the global topology server --topo_global_server_address string the address of the global topology server --topo_implementation string the topology implementation to use + --topo_read_concurrency int Concurrency of topo reads. (default 32) --topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be :, e.g., digest:user:pass --topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s) --topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64) diff --git a/go/vt/external/golib/sqlutils/dialect.go b/go/vt/external/golib/sqlutils/dialect.go deleted file mode 100644 index 8dabe57ccaf..00000000000 --- a/go/vt/external/golib/sqlutils/dialect.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -package sqlutils - -import ( - "regexp" - "strings" -) - -type regexpMap struct { - r *regexp.Regexp - replacement string -} - -func (this *regexpMap) process(text string) (result string) { - return this.r.ReplaceAllString(text, this.replacement) -} - -func rmap(regexpExpression string, replacement string) regexpMap { - return regexpMap{ - r: regexp.MustCompile(regexpSpaces(regexpExpression)), - replacement: replacement, - } -} - -func regexpSpaces(statement string) string { - return strings.Replace(statement, " ", `[\s]+`, -1) -} - -func applyConversions(statement string, conversions []regexpMap) string { - for _, rmap := range conversions { - statement = rmap.process(statement) - } - return statement -} diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect.go b/go/vt/external/golib/sqlutils/sqlite_dialect.go deleted file mode 100644 index c76f920d14a..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect.go +++ /dev/null @@ -1,161 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -// What's this about? -// This is a brute-force regular-expression based conversion from MySQL syntax to sqlite3 syntax. 
-// It is NOT meant to be a general purpose solution and is only expected & confirmed to run on -// queries issued by orchestrator. There are known limitations to this design. -// It's not even pretty. -// In fact... -// Well, it gets the job done at this time. Call it debt. - -package sqlutils - -import ( - "regexp" -) - -var sqlite3CreateTableConversions = []regexpMap{ - rmap(`(?i) (character set|charset) [\S]+`, ``), - rmap(`(?i)int unsigned`, `int`), - rmap(`(?i)int[\s]*[(][\s]*([0-9]+)[\s]*[)] unsigned`, `int`), - rmap(`(?i)engine[\s]*=[\s]*(innodb|myisam|ndb|memory|tokudb)`, ``), - rmap(`(?i)DEFAULT CHARSET[\s]*=[\s]*[\S]+`, ``), - rmap(`(?i)[\S]*int( not null|) auto_increment`, `integer`), - rmap(`(?i)comment '[^']*'`, ``), - rmap(`(?i)after [\S]+`, ``), - rmap(`(?i)alter table ([\S]+) add (index|key) ([\S]+) (.+)`, `create index ${3}_${1} on $1 $4`), - rmap(`(?i)alter table ([\S]+) add unique (index|key) ([\S]+) (.+)`, `create unique index ${3}_${1} on $1 $4`), - rmap(`(?i)([\S]+) enum[\s]*([(].*?[)])`, `$1 text check($1 in $2)`), - rmap(`(?i)([\s\S]+[/][*] sqlite3-skip [*][/][\s\S]+)`, ``), - rmap(`(?i)timestamp default current_timestamp`, `timestamp default ('')`), - rmap(`(?i)timestamp not null default current_timestamp`, `timestamp not null default ('')`), - - rmap(`(?i)add column (.*int) not null[\s]*$`, `add column $1 not null default 0`), - rmap(`(?i)add column (.* text) not null[\s]*$`, `add column $1 not null default ''`), - rmap(`(?i)add column (.* varchar.*) not null[\s]*$`, `add column $1 not null default ''`), -} - -var sqlite3InsertConversions = []regexpMap{ - rmap(`(?i)insert ignore ([\s\S]+) on duplicate key update [\s\S]+`, `insert or ignore $1`), - rmap(`(?i)insert ignore`, `insert or ignore`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)insert into ([\s\S]+) on duplicate key update [\s\S]+`, `replace into $1`), -} - -var sqlite3GeneralConversions = []regexpMap{ - rmap(`(?i)now[(][)][\s]*[-][\s]*interval [?] ([\w]+)`, `datetime('now', printf('-%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval [?] ([\w]+)`, `datetime('now', printf('+%d $1', ?))`), - rmap(`(?i)now[(][)][\s]*[-][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '-${1} $2')`), - rmap(`(?i)now[(][)][\s]*[+][\s]*interval ([0-9.]+) ([\w]+)`, `datetime('now', '+${1} $2')`), - - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[-][\s]*interval [?] ([\w]+)`, ` datetime($1, printf('-%d $2', ?))`), - rmap(`(?i)[=<>\s]([\S]+[.][\S]+)[\s]*[+][\s]*interval [?] 
([\w]+)`, ` datetime($1, printf('+%d $2', ?))`), - - rmap(`(?i)unix_timestamp[(][)]`, `strftime('%s', 'now')`), - rmap(`(?i)unix_timestamp[(]([^)]+)[)]`, `strftime('%s', $1)`), - rmap(`(?i)now[(][)]`, `datetime('now')`), - rmap(`(?i)cast[(][\s]*([\S]+) as signed[\s]*[)]`, `cast($1 as integer)`), - - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2)`), - rmap(`(?i)\bconcat[(][\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*,[\s]*([^,)]+)[\s]*[)]`, `($1 || $2 || $3)`), - - rmap(`(?i) rlike `, ` like `), -} - -var sqlite3CreateIndexConversions = []regexpMap{ - rmap(`(?i)create index([\s\S]+)[(][\s]*[0-9]+[\s]*[)]([\s\S]+)`, `create index ${1}${2}`), -} - -var sqlite3DropIndexConversions = []regexpMap{ - rmap(`(?i)drop index ([\S]+) on ([\S]+)`, `drop index if exists $1`), -} - -var ( - sqlite3IdentifyCreateTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create table`)) - sqlite3IdentifyCreateIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*create( unique|) index`)) - sqlite3IdentifyDropIndexStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*drop index`)) - sqlite3IdentifyAlterTableStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*alter table`)) - sqlite3IdentifyInsertStatement = regexp.MustCompile(regexpSpaces(`(?i)^[\s]*(insert|replace)`)) -) - -func IsInsert(statement string) bool { - return sqlite3IdentifyInsertStatement.MatchString(statement) -} - -func IsCreateTable(statement string) bool { - return sqlite3IdentifyCreateTableStatement.MatchString(statement) -} - -func IsCreateIndex(statement string) bool { - return sqlite3IdentifyCreateIndexStatement.MatchString(statement) -} - -func IsDropIndex(statement string) bool { - return sqlite3IdentifyDropIndexStatement.MatchString(statement) -} - -func IsAlterTable(statement string) bool { - return sqlite3IdentifyAlterTableStatement.MatchString(statement) -} - -func ToSqlite3CreateTable(statement string) string { - return applyConversions(statement, sqlite3CreateTableConversions) -} - -func ToSqlite3CreateIndex(statement string) string { - return applyConversions(statement, sqlite3CreateIndexConversions) -} - -func ToSqlite3DropIndex(statement string) string { - return applyConversions(statement, sqlite3DropIndexConversions) -} - -func ToSqlite3Insert(statement string) string { - statement = applyConversions(statement, sqlite3GeneralConversions) - return applyConversions(statement, sqlite3InsertConversions) -} - -// ToSqlite3Dialect converts a statement to sqlite3 dialect. The statement -// is checked in this order: -// 1. If a query, return the statement with sqlite3GeneralConversions applied. -// 2. If an insert/replace, convert with ToSqlite3Insert. -// 3. If a create index, convert with IsCreateIndex. -// 4. If an drop table, convert with IsDropIndex. -// 5. If a create table, convert with IsCreateTable. -// 6. If an alter table, convert with IsAlterTable. -// 7. As fallback, return the statement with sqlite3GeneralConversions applied. 
-func ToSqlite3Dialect(statement string, potentiallyDMLOrDDL bool) (translated string) { - if potentiallyDMLOrDDL { - switch { - case IsInsert(statement): - return ToSqlite3Insert(statement) - case IsCreateIndex(statement): - return ToSqlite3CreateIndex(statement) - case IsDropIndex(statement): - return ToSqlite3DropIndex(statement) - case IsCreateTable(statement): - return ToSqlite3CreateTable(statement) - case IsAlterTable(statement): - return ToSqlite3CreateTable(statement) - } - } - return applyConversions(statement, sqlite3GeneralConversions) -} diff --git a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go b/go/vt/external/golib/sqlutils/sqlite_dialect_test.go deleted file mode 100644 index cd6d1477d04..00000000000 --- a/go/vt/external/golib/sqlutils/sqlite_dialect_test.go +++ /dev/null @@ -1,352 +0,0 @@ -/* - Copyright 2017 GitHub Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -/* - This file has been copied over from VTOrc package -*/ - -package sqlutils - -import ( - "fmt" - "regexp" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var spacesRegexp = regexp.MustCompile(`[\s]+`) - -func init() { -} - -func stripSpaces(statement string) string { - statement = strings.TrimSpace(statement) - statement = spacesRegexp.ReplaceAllString(statement, " ") - return statement -} - -func TestIsCreateTable(t *testing.T) { - require.True(t, IsCreateTable("create table t(id int)")) - require.True(t, IsCreateTable(" create table t(id int)")) - require.True(t, IsCreateTable("CREATE TABLE t(id int)")) - require.True(t, IsCreateTable(` - create table t(id int) - `)) - require.False(t, IsCreateTable("where create table t(id int)")) - require.False(t, IsCreateTable("insert")) -} - -func TestToSqlite3CreateTable(t *testing.T) { - { - statement := "create table t(id int)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, statement) - } - { - statement := "create table t(id int, v varchar(123) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar(123) NOT NULL default '')") - } - { - statement := "create table t(id int, v varchar ( 123 ) CHARACTER SET ascii NOT NULL default '')" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(id int, v varchar ( 123 ) NOT NULL default '')") - } - { - statement := "create table t(i smallint unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint(5) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } - { - statement := "create table t(i smallint ( 5 ) unsigned)" - result := ToSqlite3CreateTable(statement) - require.Equal(t, result, "create table t(i smallint)") - } -} - -func TestToSqlite3AlterTable(t *testing.T) { - { - statement := ` - ALTER TABLE - database_instance - ADD COLUMN sql_delay INT 
UNSIGNED NOT NULL AFTER replica_lag_seconds - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - ALTER TABLE - database_instance - add column sql_delay int not null default 0 - `)) - } - { - statement := ` - ALTER TABLE - database_instance - ADD INDEX source_host_port_idx (source_host, source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } - { - statement := ` - ALTER TABLE - topology_recovery - ADD KEY last_detection_idx (last_detection_id) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - create index - last_detection_idx_topology_recovery - on topology_recovery (last_detection_id) - `)) - } - -} - -func TestCreateIndex(t *testing.T) { - { - statement := ` - create index - source_host_port_idx_database_instance - on database_instance (source_host(128), source_port) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - create index - source_host_port_idx_database_instance - on database_instance (source_host, source_port) - `)) - } -} - -func TestIsInsert(t *testing.T) { - require.True(t, IsInsert("insert into t")) - require.True(t, IsInsert("insert ignore into t")) - require.True(t, IsInsert(` - insert ignore into t - `)) - require.False(t, IsInsert("where create table t(id int)")) - require.False(t, IsInsert("create table t(id int)")) - require.True(t, IsInsert(` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - `)) -} - -func TestToSqlite3Insert(t *testing.T) { - { - statement := ` - insert into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - on duplicate key update - domain_name=values(domain_name), - last_registered=values(last_registered) - ` - result := stripSpaces(ToSqlite3Dialect(statement, true)) - require.Equal(t, result, stripSpaces(` - replace into - cluster_domain_name (cluster_name, domain_name, last_registered) - values - (?, ?, datetime('now')) - `)) - } -} - -func TestToSqlite3GeneralConversions(t *testing.T) { - { - statement := "select now()" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now')") - } - { - statement := "select now() - interval ? second" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now', printf('-%d second', ?))") - } - { - statement := "select now() + interval ? minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now', printf('+%d minute', ?))") - } - { - statement := "select now() + interval 5 minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime('now', '+5 minute')") - } - { - statement := "select some_table.some_column + interval ? minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select datetime(some_table.some_column, printf('+%d minute', ?))") - } - { - statement := "AND primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? 
minute" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "AND primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d minute', ?))") - } - { - statement := "select concat(primary_instance.port, '') as port" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select (primary_instance.port || '') as port") - } - { - statement := "select concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select ('abc' || 'def') as s") - } - { - statement := "select concat( 'abc' , 'def', last.col) as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select ('abc' || 'def' || last.col) as s") - } - { - statement := "select concat(myself.only) as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select concat(myself.only) as s") - } - { - statement := "select concat(1, '2', 3, '4') as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select concat(1, '2', 3, '4') as s") - } - { - statement := "select group_concat( 'abc' , 'def') as s" - result := ToSqlite3Dialect(statement, false) - require.Equal(t, result, "select group_concat( 'abc' , 'def') as s") - } -} - -func TestIsCreateIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"create index my_index on my_table(column);", true}, - {"CREATE INDEX my_index ON my_table(column);", true}, - {"create unique index my_index on my_table(column);", true}, - {"CREATE UNIQUE INDEX my_index ON my_table(column);", true}, - {"create index my_index on my_table(column) where condition;", true}, - {"create unique index my_index on my_table(column) where condition;", true}, - {"create table my_table(column);", false}, - {"drop index my_index on my_table;", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsCreateIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestIsDropIndex(t *testing.T) { - tests := []struct { - input string - expected bool - }{ - {"drop index my_index on my_table;", true}, - {"DROP INDEX my_index ON my_table;", true}, - {"drop index if exists my_index on my_table;", true}, - {"DROP INDEX IF EXISTS my_index ON my_table;", true}, - {"drop table my_table;", false}, - {"create index my_index on my_table(column);", false}, - {"alter table my_table add index my_index (column);", false}, - {"", false}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := IsDropIndex(test.input) - assert.Equal(t, test.expected, result) - }) - } -} - -func TestToSqlite3Dialect(t *testing.T) { - tests := []struct { - input string - expected string - }{ - {"create table my_table(id int);", "create table my_table(id int);"}, - {"alter table my_table add column new_col int;", "alter table my_table add column new_col int;"}, - {"insert into my_table values (1);", "insert into my_table values (1);"}, - {"", ""}, - } - - for _, test := range tests { - t.Run(test.input, func(t *testing.T) { - result := ToSqlite3Dialect(test.input, true) - assert.Equal(t, test.expected, result) - }) - } -} - -func buildToSqlite3Dialect_Insert(instances int) string { - var rows []string - for i := 0; i < instances; i++ { - rows = append(rows, `(?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())`) - } - - return fmt.Sprintf(`INSERT ignore INTO database_instance - (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, - version, major_version, version_comment, binlog_server, read_only, binlog_format, - binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, - replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, - source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) - VALUES - %s - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_net_timeout=VALUES(replica_net_timeout), heartbeat_interval=VALUES(heartbeat_interval), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), 
has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), - semi_sync_enforced=VALUES(semi_sync_enforced), semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) - `, strings.Join(rows, "\n\t\t\t\t")) -} - -func BenchmarkToSqlite3Dialect_Insert1000(b *testing.B) { - for i := 0; i < b.N; i++ { - b.StopTimer() - statement := buildToSqlite3Dialect_Insert(1000) - b.StartTimer() - ToSqlite3Dialect(statement, true) - } -} - -func BenchmarkToSqlite3Dialect_Select(b *testing.B) { - for i := 0; i < b.N; i++ { - statement := "select now() - interval ? second" - ToSqlite3Dialect(statement, false) - } -} diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index dced769ca78..08b1c59f4cc 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -57,6 +57,7 @@ func init() { servenv.OnParseFor("vtcombo", registerFlags) servenv.OnParseFor("vtctld", registerFlags) servenv.OnParseFor("vtgate", registerFlags) + servenv.OnParseFor("vtorc", registerFlags) } // KeyspaceInfo is a meta struct that contains metadata to give the diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 7343db2c0ab..7a264356b95 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -27,20 +27,20 @@ import ( "sync" "time" - "vitess.io/vitess/go/constants/sidecar" - "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" + "golang.org/x/sync/errgroup" + "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/event" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/events" "vitess.io/vitess/go/vt/topo/topoproto" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vterrors" ) const ( @@ -644,6 +644,71 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac return result, err } +// GetTabletsByShard returns the tablets in the given shard using all cells. +// It can return ErrPartialResult if it couldn't read all the cells, or all +// the individual tablets, in which case the result is valid, but partial. +func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) { + return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil) +} + +// GetTabletsByShardCell returns the tablets in the given shard. It can return +// ErrPartialResult if it couldn't read all the cells, or all the individual +// tablets, in which case the result is valid, but partial. 
+func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) { + span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell") + span.Annotate("keyspace", keyspace) + span.Annotate("shard", shard) + span.Annotate("num_cells", len(cells)) + defer span.Finish() + ctx = trace.NewContext(ctx, span) + var err error + + if len(cells) == 0 { + cells, err = ts.GetCellInfoNames(ctx) + if err != nil { + return nil, err + } + if len(cells) == 0 { // Nothing to do + return nil, nil + } + } + + // divide the concurrency limit by the number of cells. if there are more + // cells than the limit, default to concurrency of 1. + cellConcurrency := 1 + if len(cells) < DefaultConcurrency { + cellConcurrency = DefaultConcurrency / len(cells) + } + + mu := sync.Mutex{} + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(DefaultConcurrency) + + tablets := make([]*TabletInfo, 0, len(cells)*3) + for _, cell := range cells { + eg.Go(func() error { + t, err := ts.GetTabletsByCell(ctx, cell, &GetTabletsByCellOptions{ + Concurrency: cellConcurrency, + Keyspace: keyspace, + Shard: shard, + }) + if err != nil { + return vterrors.Wrap(err, fmt.Sprintf("GetTabletsByCell for %v failed.", cell)) + } + mu.Lock() + tablets = append(tablets, t...) + mu.Unlock() + return nil + }) + } + if err := eg.Wait(); err != nil { + log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err) + return tablets, NewError(PartialResult, shard) + } + + return tablets, nil +} + // GetTabletMapForShard returns the tablets for a shard. It can return // ErrPartialResult if it couldn't read all the cells, or all // the individual tablets, in which case the map is valid, but partial. diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index a9e4c5962fa..62723043629 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -289,6 +289,10 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t type GetTabletsByCellOptions struct { // Concurrency controls the maximum number of concurrent calls to GetTablet. Concurrency int + // Keyspace is the optional keyspace tablets must match. + Keyspace string + // Shard is the optional shard tablets must match. + Shard string } // GetTabletsByCell returns all the tablets in the cell. @@ -316,13 +320,21 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G return nil, err } - tablets := make([]*TabletInfo, len(listResults)) + tablets := make([]*TabletInfo, 0) for n := range listResults { tablet := &topodatapb.Tablet{} if err := tablet.UnmarshalVT(listResults[n].Value); err != nil { return nil, err } - tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version} + if opt != nil && opt.Keyspace != "" { + if opt.Keyspace != tablet.Keyspace { + continue + } + if opt.Shard != "" && opt.Shard != tablet.Shard { + continue + } + } + tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version}) } return tablets, nil diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go index 3a0153a11b5..5359eac09c3 100644 --- a/go/vt/topo/tablet_test.go +++ b/go/vt/topo/tablet_test.go @@ -34,43 +34,63 @@ import ( // GetTabletsByCell first tries to get all the tablets using List. // If the response is too large, we will get an error, and fall back to one tablet at a time. 
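As a rough illustration of the topo additions above (GetTabletsByShardCell and the new Keyspace/Shard filters on GetTabletsByCellOptions), the sketch below shows how a caller outside this diff might fetch a shard's tablets across all cells while tolerating a partial result. The package name, helper name, and fmt-based logging are illustrative only and are not part of the change.

package example // hypothetical caller, not part of this diff

import (
	"context"
	"fmt"

	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/topo/topoproto"
)

// listShardTablets is an illustrative helper showing one way to use the new
// GetTabletsByShardCell API. Passing nil for cells makes the topo server fan
// out to every known cell; on a PartialResult error the returned slice is
// still valid, just incomplete.
func listShardTablets(ctx context.Context, ts *topo.Server, keyspace, shard string) ([]*topo.TabletInfo, error) {
	tablets, err := ts.GetTabletsByShardCell(ctx, keyspace, shard, nil /* all cells */)
	switch {
	case err == nil:
		// Full result: every cell and every tablet was read successfully.
	case topo.IsErrType(err, topo.PartialResult):
		// Some cells or tablets could not be read; keep what we got.
		fmt.Printf("partial result for %s/%s: %v\n", keyspace, shard, err)
	default:
		return nil, err
	}
	for _, ti := range tablets {
		fmt.Printf("%s %s/%s on %s\n", topoproto.TabletAliasString(ti.Alias), ti.Keyspace, ti.Shard, ti.Hostname)
	}
	// The same keyspace/shard filter can also be requested for a single cell, e.g.:
	//   ts.GetTabletsByCell(ctx, "zone1", &topo.GetTabletsByCellOptions{Keyspace: keyspace, Shard: shard})
	return tablets, nil
}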
func TestServerGetTabletsByCell(t *testing.T) { + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + tests := []struct { - name string - tablets int - opt *topo.GetTabletsByCellOptions - listError error + name string + createShardTablets int + opt *topo.GetTabletsByCellOptions + listError error + keyspaceShards map[string]string }{ { - name: "negative concurrency", - tablets: 1, + name: "negative concurrency", + keyspaceShards: map[string]string{keyspace: shard}, + createShardTablets: 1, // Ensure this doesn't panic. opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, }, { - name: "single", - tablets: 1, + name: "single", + keyspaceShards: map[string]string{keyspace: shard}, + createShardTablets: 1, // Make sure the defaults apply as expected. opt: nil, }, { - name: "multiple", + name: "multiple", + keyspaceShards: map[string]string{keyspace: shard}, // should work with more than 1 tablet - tablets: 32, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + createShardTablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, }, { - name: "multiple with list error", + name: "multiple with list error", + keyspaceShards: map[string]string{keyspace: shard}, // should work with more than 1 tablet when List returns an error - tablets: 32, - opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, - listError: topo.NewError(topo.ResourceExhausted, ""), + createShardTablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + listError: topo.NewError(topo.ResourceExhausted, ""), + }, + { + name: "filtered by keyspace and shard", + keyspaceShards: map[string]string{ + keyspace: shard, + "filtered": "-", + }, + // should create 2 tablets in 2 different shards (4 total) + // but only a single shard is returned + createShardTablets: 2, + opt: &topo.GetTabletsByCellOptions{ + Concurrency: 1, + Keyspace: keyspace, + Shard: shard, + }, }, } - const cell = "zone1" - const keyspace = "keyspace" - const shard = "shard" - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -84,38 +104,49 @@ func TestServerGetTabletsByCell(t *testing.T) { // Create an ephemeral keyspace and generate shard records within // the keyspace to fetch later. 
- require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablets := make([]*topo.TabletInfo, tt.tablets) - - for i := 0; i < tt.tablets; i++ { - tablet := &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: cell, - Uid: uint32(i), - }, - Hostname: "host1", - PortMap: map[string]int32{ - "vt": int32(i), - }, - Keyspace: keyspace, - Shard: shard, + for k, s := range tt.keyspaceShards { + require.NoError(t, ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, k, s)) + } + + tablets := make([]*topo.TabletInfo, tt.createShardTablets) + + var uid uint32 = 1 + for k, s := range tt.keyspaceShards { + for i := 0; i < tt.createShardTablets; i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uid, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(uid), + }, + Keyspace: k, + Shard: s, + } + tInfo := &topo.TabletInfo{Tablet: tablet} + tablets[i] = tInfo + require.NoError(t, ts.CreateTablet(ctx, tablet)) + uid++ } - tInfo := &topo.TabletInfo{Tablet: tablet} - tablets[i] = tInfo - require.NoError(t, ts.CreateTablet(ctx, tablet)) } // Verify that we return a complete list of tablets and that each // tablet matches what we expect. out, err := ts.GetTabletsByCell(ctx, cell, tt.opt) require.NoError(t, err) - require.Len(t, out, tt.tablets) + require.Len(t, out, tt.createShardTablets) for i, tab := range tablets { require.Equal(t, tab.Tablet, tablets[i].Tablet) } + + for _, tablet := range out { + require.Equal(t, keyspace, tablet.Keyspace) + require.Equal(t, shard, tablet.Shard) + } }) } } diff --git a/go/vt/vtorc/db/db.go b/go/vt/vtorc/db/db.go index 097d3732797..688cc2039b8 100644 --- a/go/vt/vtorc/db/db.go +++ b/go/vt/vtorc/db/db.go @@ -18,7 +18,6 @@ package db import ( "database/sql" - "strings" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" @@ -68,21 +67,13 @@ func OpenVTOrc() (db *sql.DB, err error) { return db, err } -func translateStatement(statement string) string { - return sqlutils.ToSqlite3Dialect(statement, true /* potentiallyDMLOrDDL */) -} - -func translateQueryStatement(statement string) string { - return sqlutils.ToSqlite3Dialect(statement, false /* potentiallyDMLOrDDL */) -} - // registerVTOrcDeployment updates the vtorc_db_deployments table upon successful deployment func registerVTOrcDeployment(db *sql.DB) error { query := ` replace into vtorc_db_deployments ( deployed_version, deployed_timestamp ) values ( - ?, NOW() + ?, datetime('now') ) ` if _, err := execInternal(db, query, ""); err != nil { @@ -97,25 +88,12 @@ func deployStatements(db *sql.DB, queries []string) error { tx, err := db.Begin() if err != nil { log.Fatal(err.Error()) + return err } for _, query := range queries { - query = translateStatement(query) if _, err := tx.Exec(query); err != nil { - if strings.Contains(err.Error(), "syntax error") { - log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) - return err - } - if !sqlutils.IsAlterTable(query) && !sqlutils.IsCreateIndex(query) && !sqlutils.IsDropIndex(query) { - log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) - return err - } - if !strings.Contains(err.Error(), "duplicate column name") && - !strings.Contains(err.Error(), "Duplicate column name") && - !strings.Contains(err.Error(), "check that column/key exists") && - !strings.Contains(err.Error(), "already exists") && - !strings.Contains(err.Error(), "Duplicate key 
name") { - log.Errorf("Error initiating vtorc: %+v; query=%+v", err, query) - } + log.Fatalf("Cannot initiate vtorc: %+v; query=%+v", err, query) + return err } } if err := tx.Commit(); err != nil { @@ -150,7 +128,6 @@ func initVTOrcDB(db *sql.DB) error { // execInternal func execInternal(db *sql.DB, query string, args ...any) (sql.Result, error) { var err error - query = translateStatement(query) res, err := sqlutils.ExecNoPrepare(db, query, args...) return res, err } @@ -166,7 +143,6 @@ func ExecVTOrc(query string, args ...any) (sql.Result, error) { // QueryVTOrcRowsMap func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { - query = translateQueryStatement(query) db, err := OpenVTOrc() if err != nil { return err @@ -177,7 +153,6 @@ func QueryVTOrcRowsMap(query string, onRow func(sqlutils.RowMap) error) error { // QueryVTOrc func QueryVTOrc(query string, argsArray []any, onRow func(sqlutils.RowMap) error) error { - query = translateQueryStatement(query) db, err := OpenVTOrc() if err != nil { return err diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index ecb47804306..1ef9c5e2aec 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -94,13 +94,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna IFNULL( primary_instance.binary_log_file = database_instance_stale_binlog_coordinates.binary_log_file AND primary_instance.binary_log_pos = database_instance_stale_binlog_coordinates.binary_log_pos - AND database_instance_stale_binlog_coordinates.first_seen < NOW() - interval ? second, + AND database_instance_stale_binlog_coordinates.first_seen < datetime('now', printf('-%d second', ?)), 0 ) ) AS is_stale_binlog_coordinates, MIN( primary_instance.last_checked <= primary_instance.last_seen - and primary_instance.last_attempted_check <= primary_instance.last_seen + interval ? second + and primary_instance.last_attempted_check <= datetime(primary_instance.last_seen, printf('+%d second', ?)) ) = 1 AS is_last_check_valid, /* To be considered a primary, traditional async replication must not be present/valid AND the host should either */ /* not be a replication group member OR be the primary of the replication group */ @@ -684,7 +684,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC sqlResult, err := db.ExecVTOrc(` update database_instance_last_analysis set analysis = ?, - analysis_timestamp = now() + analysis_timestamp = datetime('now') where alias = ? and analysis != ? @@ -709,10 +709,10 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC if !lastAnalysisChanged { // The insert only returns more than 1 row changed if this is the first insertion. sqlResult, err := db.ExecVTOrc(` - insert ignore into database_instance_last_analysis ( + insert or ignore into database_instance_last_analysis ( alias, analysis_timestamp, analysis ) values ( - ?, now(), ? + ?, datetime('now'), ? ) `, tabletAlias, string(analysisCode), @@ -738,7 +738,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC insert into database_instance_analysis_changelog ( alias, analysis_timestamp, analysis ) values ( - ?, now(), ? + ?, datetime('now'), ? ) `, tabletAlias, string(analysisCode), @@ -757,7 +757,7 @@ func ExpireInstanceAnalysisChangelog() error { delete from database_instance_analysis_changelog where - analysis_timestamp < now() - interval ? 
hour + analysis_timestamp < datetime('now', printf('-%d hour', ?)) `, config.UnseenInstanceForgetHours, ) diff --git a/go/vt/vtorc/inst/audit_dao.go b/go/vt/vtorc/inst/audit_dao.go index 47435be82a0..60ef08f24b7 100644 --- a/go/vt/vtorc/inst/audit_dao.go +++ b/go/vt/vtorc/inst/audit_dao.go @@ -60,7 +60,7 @@ func AuditOperation(auditType string, tabletAlias string, message string) error into audit ( audit_timestamp, audit_type, alias, keyspace, shard, message ) VALUES ( - NOW(), ?, ?, ?, ?, ? + datetime('now'), ?, ?, ?, ?, ? ) `, auditType, diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index d0a08f5afb7..273d4704b45 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -111,9 +111,11 @@ func ExecDBWriteFunc(f func() error) error { } func ExpireTableData(tableName string, timestampColumn string) error { - query := fmt.Sprintf("delete from %s where %s < NOW() - INTERVAL ? DAY", tableName, timestampColumn) writeFunc := func() error { - _, err := db.ExecVTOrc(query, config.Config.AuditPurgeDays) + _, err := db.ExecVTOrc( + fmt.Sprintf("delete from %s where %s < datetime('now', printf('-%%d DAY', ?))", tableName, timestampColumn), + config.Config.AuditPurgeDays, + ) return err } return ExecDBWriteFunc(writeFunc) @@ -574,9 +576,9 @@ func readInstancesByCondition(condition string, args []any, sort string) ([](*In query := fmt.Sprintf(` select *, - unix_timestamp() - unix_timestamp(last_checked) as seconds_since_last_checked, + strftime('%%s', 'now') - strftime('%%s', last_checked) as seconds_since_last_checked, ifnull(last_checked <= last_seen, 0) as is_last_check_valid, - unix_timestamp() - unix_timestamp(last_seen) as seconds_since_last_seen + strftime('%%s', 'now') - strftime('%%s', last_seen) as seconds_since_last_seen from vitess_tablet left join database_instance using (alias, hostname, port) @@ -657,11 +659,11 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) and ( (last_seen < last_checked) - or (unix_timestamp() - unix_timestamp(last_checked) > ?) + or (strftime('%%s', 'now') - strftime('%%s', last_checked) > ?) or (replication_sql_thread_state not in (-1 ,1)) or (replication_io_thread_state not in (-1 ,1)) - or (abs(cast(replication_lag_seconds as signed) - cast(sql_delay as signed)) > ?) - or (abs(cast(replica_lag_seconds as signed) - cast(sql_delay as signed)) > ?) + or (abs(cast(replication_lag_seconds as integer) - cast(sql_delay as integer)) > ?) + or (abs(cast(replica_lag_seconds as integer) - cast(sql_delay as integer)) > ?) or (gtid_errant != '') ) ` @@ -723,8 +725,8 @@ func ReadOutdatedInstanceKeys() ([]string, error) { WHERE CASE WHEN last_attempted_check <= last_checked - THEN last_checked < now() - interval ? second - ELSE last_checked < now() - interval ? 
second + THEN last_checked < datetime('now', printf('-%d second', ?)) + ELSE last_checked < datetime('now', printf('-%d second', ?)) END UNION SELECT @@ -753,7 +755,7 @@ func ReadOutdatedInstanceKeys() ([]string, error) { return res, err } -func mkInsertOdku(table string, columns []string, values []string, nrRows int, insertIgnore bool) (string, error) { +func mkInsert(table string, columns []string, values []string, nrRows int, insertIgnore bool) (string, error) { if len(columns) == 0 { return "", errors.New("Column list cannot be empty") } @@ -765,9 +767,9 @@ func mkInsertOdku(table string, columns []string, values []string, nrRows int, i } var q strings.Builder - var ignore string + insertStr := "REPLACE INTO" if insertIgnore { - ignore = "ignore" + insertStr = "INSERT OR IGNORE INTO" } valRow := fmt.Sprintf("(%s)", strings.Join(values, ", ")) var val strings.Builder @@ -778,26 +780,17 @@ func mkInsertOdku(table string, columns []string, values []string, nrRows int, i } col := strings.Join(columns, ", ") - var odku strings.Builder - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", columns[0], columns[0])) - for _, c := range columns[1:] { - odku.WriteString(", ") - odku.WriteString(fmt.Sprintf("%s=VALUES(%s)", c, c)) - } - - q.WriteString(fmt.Sprintf(`INSERT %s INTO %s + q.WriteString(fmt.Sprintf(`%s %s (%s) VALUES %s - ON DUPLICATE KEY UPDATE - %s `, - ignore, table, col, val.String(), odku.String())) + insertStr, table, col, val.String())) return q.String(), nil } -func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bool, updateLastSeen bool) (string, []any, error) { +func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, updateLastSeen bool) (string, []any, error) { if len(instances) == 0 { return "", nil, nil } @@ -876,19 +869,19 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo for i := range columns { values[i] = "?" 
} - values[3] = "NOW()" // last_checked - values[4] = "NOW()" // last_attempted_check - values[5] = "1" // last_check_partial_success + values[3] = "datetime('now')" // last_checked + values[4] = "datetime('now')" // last_attempted_check + values[5] = "1" // last_check_partial_success if updateLastSeen { columns = append(columns, "last_seen") - values = append(values, "NOW()") + values = append(values, "datetime('now')") } var args []any for _, instance := range instances { // number of columns minus 2 as last_checked and last_attempted_check - // updated with NOW() + // updated with datetime('now') args = append(args, instance.InstanceAlias) args = append(args, instance.Hostname) args = append(args, instance.Port) @@ -951,7 +944,7 @@ func mkInsertOdkuForInstances(instances []*Instance, instanceWasActuallyFound bo args = append(args, instance.LastDiscoveryLatency.Nanoseconds()) } - sql, err := mkInsertOdku("database_instance", columns, values, len(instances), insertIgnore) + sql, err := mkInsert("database_instance", columns, values, len(instances), insertIgnore) if err != nil { errMsg := fmt.Sprintf("Failed to build query: %v", err) log.Errorf(errMsg) @@ -973,7 +966,7 @@ func writeManyInstances(instances []*Instance, instanceWasActuallyFound bool, up if len(writeInstances) == 0 { return nil // nothing to write } - sql, args, err := mkInsertOdkuForInstances(writeInstances, instanceWasActuallyFound, updateLastSeen) + sql, args, err := mkInsertForInstances(writeInstances, instanceWasActuallyFound, updateLastSeen) if err != nil { return err } @@ -1000,7 +993,7 @@ func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { update database_instance set - last_checked = NOW(), + last_checked = datetime('now'), last_check_partial_success = ? where alias = ?`, @@ -1029,7 +1022,7 @@ func UpdateInstanceLastAttemptedCheck(tabletAlias string) error { update database_instance set - last_attempted_check = NOW() + last_attempted_check = datetime('now') where alias = ?`, tabletAlias, @@ -1104,7 +1097,7 @@ func ForgetLongUnseenInstances() error { delete from database_instance where - last_seen < NOW() - interval ? hour`, + last_seen < datetime('now', printf('-%d hour', ?))`, config.UnseenInstanceForgetHours, ) if err != nil { @@ -1126,11 +1119,11 @@ func ForgetLongUnseenInstances() error { func SnapshotTopologies() error { writeFunc := func() error { _, err := db.ExecVTOrc(` - insert ignore into + insert or ignore into database_instance_topology_history (snapshot_unix_timestamp, alias, hostname, port, source_host, source_port, keyspace, shard, version) select - UNIX_TIMESTAMP(NOW()), + strftime('%s', 'now'), vitess_tablet.alias, vitess_tablet.hostname, vitess_tablet.port, database_instance.source_host, database_instance.source_port, vitess_tablet.keyspace, vitess_tablet.shard, database_instance.version @@ -1176,7 +1169,7 @@ func RecordStaleInstanceBinlogCoordinates(tabletAlias string, binlogCoordinates alias, binary_log_file, binary_log_pos, first_seen ) values ( - ?, ?, ?, NOW() + ?, ?, ?, datetime('now') )`, args...) if err != nil { @@ -1193,7 +1186,7 @@ func ExpireStaleInstanceBinlogCoordinates() error { writeFunc := func() error { _, err := db.ExecVTOrc(` delete from database_instance_stale_binlog_coordinates - where first_seen < NOW() - INTERVAL ? 
SECOND + where first_seen < datetime('now', printf('-%d second', ?)) `, expireSeconds, ) if err != nil { diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 549389f91fe..14b1bdf36d5 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -49,57 +49,44 @@ func mkTestInstances() []*Instance { return instances } -func TestMkInsertOdkuSingle(t *testing.T) { +func TestMkInsertSingle(t *testing.T) { instances := mkTestInstances() - sql, args, err := mkInsertOdkuForInstances(nil, true, true) + sql, args, err := mkInsertForInstances(nil, true, true) require.NoError(t, err) require.Equal(t, sql, "") require.Equal(t, len(args), 0) // one instance - s1 := `INSERT ignore INTO database_instance + s1 := `INSERT OR IGNORE INTO database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) - VALUES - (?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) - ON DUPLICATE KEY UPDATE - alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), 
mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region), physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), - semi_sync_enforced=VALUES(semi_sync_enforced), semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status), - last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen) + VALUES (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ` a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,` - sql1, args1, err := mkInsertOdkuForInstances(instances[:1], false, true) + sql1, args1, err := mkInsertForInstances(instances[:1], false, true) require.NoError(t, err) require.Equal(t, normalizeQuery(sql1), normalizeQuery(s1)) require.Equal(t, stripSpaces(fmtArgs(args1)), stripSpaces(a1)) } -func TestMkInsertOdkuThree(t *testing.T) { +func TestMkInsertThree(t *testing.T) { instances := mkTestInstances() // three instances - s3 := `INSERT INTO database_instance + s3 := `REPLACE INTO database_instance (alias, hostname, port, last_checked, last_attempted_check, last_check_partial_success, server_id, server_uuid, version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, mariadb_gtid, pseudo_gtid, source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, 
semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen)
-	VALUES
-		(?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()),
-		(?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()),
-		(?, ?, ?, NOW(), NOW(), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
-	ON DUPLICATE KEY UPDATE
-		alias=VALUES(alias), hostname=VALUES(hostname), port=VALUES(port), last_checked=VALUES(last_checked), last_attempted_check=VALUES(last_attempted_check), last_check_partial_success=VALUES(last_check_partial_success), server_id=VALUES(server_id), server_uuid=VALUES(server_uuid), version=VALUES(version), major_version=VALUES(major_version), version_comment=VALUES(version_comment), binlog_server=VALUES(binlog_server), read_only=VALUES(read_only), binlog_format=VALUES(binlog_format), binlog_row_image=VALUES(binlog_row_image), log_bin=VALUES(log_bin), log_replica_updates=VALUES(log_replica_updates), binary_log_file=VALUES(binary_log_file), binary_log_pos=VALUES(binary_log_pos), source_host=VALUES(source_host), source_port=VALUES(source_port), replica_sql_running=VALUES(replica_sql_running), replica_io_running=VALUES(replica_io_running), replication_sql_thread_state=VALUES(replication_sql_thread_state), replication_io_thread_state=VALUES(replication_io_thread_state), has_replication_filters=VALUES(has_replication_filters), supports_oracle_gtid=VALUES(supports_oracle_gtid), oracle_gtid=VALUES(oracle_gtid), source_uuid=VALUES(source_uuid), ancestry_uuid=VALUES(ancestry_uuid), executed_gtid_set=VALUES(executed_gtid_set), gtid_mode=VALUES(gtid_mode), gtid_purged=VALUES(gtid_purged), gtid_errant=VALUES(gtid_errant), mariadb_gtid=VALUES(mariadb_gtid), pseudo_gtid=VALUES(pseudo_gtid), source_log_file=VALUES(source_log_file), read_source_log_pos=VALUES(read_source_log_pos), relay_source_log_file=VALUES(relay_source_log_file), exec_source_log_pos=VALUES(exec_source_log_pos), relay_log_file=VALUES(relay_log_file), relay_log_pos=VALUES(relay_log_pos), last_sql_error=VALUES(last_sql_error), last_io_error=VALUES(last_io_error), replication_lag_seconds=VALUES(replication_lag_seconds), replica_lag_seconds=VALUES(replica_lag_seconds), sql_delay=VALUES(sql_delay), data_center=VALUES(data_center), region=VALUES(region),
-		physical_environment=VALUES(physical_environment), replication_depth=VALUES(replication_depth), is_co_primary=VALUES(is_co_primary), has_replication_credentials=VALUES(has_replication_credentials), allow_tls=VALUES(allow_tls), semi_sync_enforced=VALUES(semi_sync_enforced),
-		semi_sync_primary_enabled=VALUES(semi_sync_primary_enabled), semi_sync_primary_timeout=VALUES(semi_sync_primary_timeout), semi_sync_primary_wait_for_replica_count=VALUES(semi_sync_primary_wait_for_replica_count), semi_sync_replica_enabled=VALUES(semi_sync_replica_enabled), semi_sync_primary_status=VALUES(semi_sync_primary_status), semi_sync_primary_clients=VALUES(semi_sync_primary_clients), semi_sync_replica_status=VALUES(semi_sync_replica_status),
-		last_discovery_latency=VALUES(last_discovery_latency), last_seen=VALUES(last_seen)
+	VALUES (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')), (?, ?, ?, datetime('now'), datetime('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
 	`
 	a3 := `
 		zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
@@ -107,7 +94,7 @@ func TestMkInsertOdkuThree(t *testing.T) {
 		zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, false, false, 0, 0, false, false, false, , , , , , , false, false, , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,
 	`
 
-	sql3, args3, err := mkInsertOdkuForInstances(instances[:3], true, true)
+	sql3, args3, err := mkInsertForInstances(instances[:3], true, true)
 	require.NoError(t, err)
 	require.Equal(t, normalizeQuery(sql3), normalizeQuery(s3))
 	require.Equal(t, stripSpaces(fmtArgs(args3)), stripSpaces(a3))
@@ -474,27 +461,27 @@ func TestReadOutdatedInstanceKeys(t *testing.T) {
 	}{
 		{
 			name: "No problems",
-			sql:  []string{"update database_instance set last_checked = now()"},
+			sql:  []string{"update database_instance set last_checked = datetime('now')"},
 			instancesRequired: nil,
 		}, {
 			name: "One instance is outdated",
 			sql: []string{
-				"update database_instance set last_checked = now()",
-				"update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'",
+				"update database_instance set last_checked = datetime('now')",
+				"update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'",
 			},
 			instancesRequired: []string{"zone1-0000000100"},
 		}, {
 			name: "One instance doesn't have myql data",
 			sql: []string{
-				"update database_instance set last_checked = now()",
+				"update database_instance set last_checked = datetime('now')",
 				`INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`,
 			},
 			instancesRequired: []string{"zone1-0000000103"},
 		}, {
 			name: "One instance doesn't have myql data and one is outdated",
 			sql: []string{
-				"update database_instance set last_checked = now()",
-				"update database_instance set last_checked = datetime(now(), '-1 hour') where alias = 'zone1-0000000100'",
+				"update database_instance set last_checked = datetime('now')",
+				"update database_instance set last_checked = datetime('now', '-1 hour') where alias = 'zone1-0000000100'",
 				`INSERT INTO vitess_tablet VALUES('zone1-0000000103','localhost',7706,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00','');`,
 			},
 			instancesRequired: []string{"zone1-0000000103", "zone1-0000000100"},
@@ -531,10 +518,10 @@ func TestReadOutdatedInstanceKeys(t *testing.T) {
 			errInDataCollection := db.QueryVTOrcRowsMap(`select alias,
 last_checked,
 last_attempted_check,
-ROUND((JULIANDAY(now()) - JULIANDAY(last_checked)) * 86400) AS difference,
+ROUND((JULIANDAY(datetime('now')) - JULIANDAY(last_checked)) * 86400) AS difference,
 last_attempted_check <= last_checked as use1,
-last_checked < now() - interval 1500 second as is_outdated1,
-last_checked < now() - interval 3000 second as is_outdated2
+last_checked < datetime('now', '-1500 second') as is_outdated1,
+last_checked < datetime('now', '-3000 second') as is_outdated2
 from database_instance`, func(rowMap sqlutils.RowMap) error {
 				log.Errorf("Row in database_instance - %+v", rowMap)
 				return nil
@@ -558,12 +545,12 @@ func TestUpdateInstanceLastChecked(t *testing.T) {
 			name: "Verify updated last checked",
 			tabletAlias: "zone1-0000000100",
 			partialSuccess: false,
-			conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = false",
+			conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = false",
 		}, {
 			name: "Verify partial success",
 			tabletAlias: "zone1-0000000100",
 			partialSuccess: true,
-			conditionToCheck: "last_checked >= now() - interval 30 second and last_check_partial_success = true",
+			conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true",
 		}, {
 			name: "Verify no error on unknown tablet",
 			tabletAlias: "unknown tablet",
@@ -609,7 +596,7 @@ func TestUpdateInstanceLastAttemptedCheck(t *testing.T) {
 		{
 			name: "Verify updated last checked",
 			tabletAlias: "zone1-0000000100",
-			conditionToCheck: "last_attempted_check >= now() - interval 30 second",
+			conditionToCheck: "last_attempted_check >= datetime('now', '-30 second')",
 		}, {
 			name: "Verify no error on unknown tablet",
 			tabletAlias: "unknown tablet",
diff --git a/go/vt/vtorc/logic/disable_recovery.go b/go/vt/vtorc/logic/disable_recovery.go
index 4a3766055d2..44e4f5a66ff 100644
--- a/go/vt/vtorc/logic/disable_recovery.go
+++ b/go/vt/vtorc/logic/disable_recovery.go
@@ -63,7 +63,7 @@ func IsRecoveryDisabled() (disabled bool, err error) {
 // DisableRecovery ensures recoveries are disabled globally
 func DisableRecovery() error {
 	_, err := db.ExecVTOrc(`
-		INSERT IGNORE INTO global_recovery_disable
+		INSERT OR IGNORE INTO global_recovery_disable
 			(disable_recovery)
 		VALUES (1)
 	`,
diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go
index cd112481edc..73cd61676ca 100644
--- a/go/vt/vtorc/logic/tablet_discovery.go
+++ b/go/vt/vtorc/logic/tablet_discovery.go
@@ -37,7 +37,6 @@ import (
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/topoproto"
-	"vitess.io/vitess/go/vt/topotools"
 	"vitess.io/vitess/go/vt/vtctl/reparentutil"
 	"vitess.io/vitess/go/vt/vtorc/config"
 	"vitess.io/vitess/go/vt/vtorc/db"
@@ -203,7 +202,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
 }
 
 func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
-	tablets, err := topotools.GetTabletMapForCell(ctx, ts, cell)
+	tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency})
 	if err != nil {
 		log.Errorf("Error fetching topo info for cell %v: %v", cell, err)
 		return
@@ -235,7 +234,7 @@ func refreshTabletInfoOfShard(ctx context.Context, keyspace, shard string) {
 }
 
 func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
-	tablets, err := ts.GetTabletMapForShard(ctx, keyspace, shard)
+	tablets, err := ts.GetTabletsByShard(ctx, keyspace, shard)
 	if err != nil {
 		log.Errorf("Error fetching tablets for keyspace/shard %v/%v: %v", keyspace, shard, err)
 		return
@@ -245,7 +244,7 @@ func refreshTabletsInKeyspaceShard(ctx context.Context, keyspace, shard string,
 	refreshTablets(tablets, query, args, loader, forceRefresh, tabletsToIgnore)
 }
 
-func refreshTablets(tablets map[string]*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
+func refreshTablets(tablets []*topo.TabletInfo, query string, args []any, loader func(tabletAlias string), forceRefresh bool, tabletsToIgnore []string) {
 	// Discover new tablets.
 	latestInstances := make(map[string]bool)
 	var wg sync.WaitGroup
diff --git a/go/vt/vtorc/logic/topology_recovery_dao.go b/go/vt/vtorc/logic/topology_recovery_dao.go
index 4a7a6c77ef1..3d87c8f506f 100644
--- a/go/vt/vtorc/logic/topology_recovery_dao.go
+++ b/go/vt/vtorc/logic/topology_recovery_dao.go
@@ -41,14 +41,14 @@ func AttemptFailureDetectionRegistration(analysisEntry *inst.ReplicationAnalysis
 		analysisEntry.CountReplicas,
 		analysisEntry.IsActionableRecovery,
 	)
-	startActivePeriodHint := "now()"
+	startActivePeriodHint := "datetime('now')"
 	if analysisEntry.StartActivePeriod != "" {
 		startActivePeriodHint = "?"
 		args = append(args, analysisEntry.StartActivePeriod)
 	}
 
 	query := fmt.Sprintf(`
-			insert ignore
+			insert or ignore
 				into topology_failure_detection (
 					alias,
 					in_active_period,
@@ -95,10 +95,10 @@ func ClearActiveFailureDetections() error {
 	_, err := db.ExecVTOrc(`
 			update topology_failure_detection set
 				in_active_period = 0,
-				end_active_period_unixtime = UNIX_TIMESTAMP()
+				end_active_period_unixtime = strftime('%s', 'now')
 			where
 				in_active_period = 1
-				AND start_active_period < NOW() - INTERVAL ? MINUTE
+				AND start_active_period < datetime('now', printf('-%d MINUTE', ?))
 			`,
 		config.FailureDetectionPeriodBlockMinutes,
 	)
@@ -111,7 +111,7 @@ func ClearActiveFailureDetections() error {
 func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecovery, error) {
 	analysisEntry := topologyRecovery.AnalysisEntry
 	sqlResult, err := db.ExecVTOrc(`
-			insert ignore
+			insert or ignore
 				into topology_recovery (
 					recovery_id,
 					uid,
@@ -131,7 +131,7 @@ func writeTopologyRecovery(topologyRecovery *TopologyRecovery) (*TopologyRecover
 				?,
 				?,
 				1,
-				NOW(),
+				datetime('now'),
 				0,
 				?,
 				?,
@@ -225,10 +225,10 @@ func ClearActiveRecoveries() error {
 	_, err := db.ExecVTOrc(`
 			update topology_recovery set
 				in_active_period = 0,
-				end_active_period_unixtime = UNIX_TIMESTAMP()
+				end_active_period_unixtime = strftime('%s', 'now')
 			where
 				in_active_period = 1
-				AND start_active_period < NOW() - INTERVAL ? SECOND
+				AND start_active_period < datetime('now', printf('-%d SECOND', ?))
 			`,
 		config.Config.RecoveryPeriodBlockSeconds,
 	)
@@ -243,7 +243,7 @@ func ClearActiveRecoveries() error {
 func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blockingRecoveries []*TopologyRecovery) error {
 	for _, recovery := range blockingRecoveries {
 		_, err := db.ExecVTOrc(`
-			insert
+			insert
 				into blocked_topology_recovery (
 					alias,
 					keyspace,
@@ -256,15 +256,15 @@ func RegisterBlockedRecoveries(analysisEntry *inst.ReplicationAnalysis, blocking
 				?,
 				?,
 				?,
-				NOW(),
+				datetime('now'),
 				?
 			)
-			on duplicate key update
-				keyspace=values(keyspace),
-				shard=values(shard),
-				analysis=values(analysis),
-				last_blocked_timestamp=values(last_blocked_timestamp),
-				blocking_recovery_id=values(blocking_recovery_id)
+			on conflict(alias) do update set
+				keyspace=keyspace,
+				shard=shard,
+				analysis=analysis,
+				last_blocked_timestamp=last_blocked_timestamp,
+				blocking_recovery_id=blocking_recovery_id
 			`, analysisEntry.AnalyzedInstanceAlias,
 			analysisEntry.ClusterDetails.Keyspace,
 			analysisEntry.ClusterDetails.Shard,
@@ -325,7 +325,7 @@ func ExpireBlockedRecoveries() error {
 			delete
 				from blocked_topology_recovery
 				where
-					last_blocked_timestamp < NOW() - interval ? second
+					last_blocked_timestamp < datetime('now', printf('-%d second', ?))
 			`, config.Config.RecoveryPollSeconds*2,
 	)
 	if err != nil {
@@ -339,16 +339,16 @@ func acknowledgeRecoveries(owner string, comment string, markEndRecovery bool, w
 	additionalSet := ``
 	if markEndRecovery {
 		additionalSet = `
-				end_recovery=IFNULL(end_recovery, NOW()),
+				end_recovery=IFNULL(end_recovery, datetime('now')),
 			`
 	}
 	query := fmt.Sprintf(`
 			update topology_recovery set
 				in_active_period = 0,
-				end_active_period_unixtime = case when end_active_period_unixtime = 0 then UNIX_TIMESTAMP() else end_active_period_unixtime end,
+				end_active_period_unixtime = case when end_active_period_unixtime = 0 then strftime('%%s', 'now') else end_active_period_unixtime end,
 				%s
 				acknowledged = 1,
-				acknowledged_at = NOW(),
+				acknowledged_at = datetime('now'),
 				acknowledged_by = ?,
 				acknowledge_comment = ?
 			where
@@ -399,7 +399,7 @@ func writeResolveRecovery(topologyRecovery *TopologyRecovery) error {
 			is_successful = ?,
 			successor_alias = ?,
 			all_errors = ?,
-			end_recovery = NOW()
+			end_recovery = datetime('now')
 		where
 			uid = ?
 		`, topologyRecovery.IsSuccessful,
@@ -531,10 +531,10 @@ func ReadRecentRecoveries(unacknowledgedOnly bool, page int) ([]*TopologyRecover
 // writeTopologyRecoveryStep writes down a single step in a recovery process
 func writeTopologyRecoveryStep(topologyRecoveryStep *TopologyRecoveryStep) error {
 	sqlResult, err := db.ExecVTOrc(`
-			insert ignore
+			insert or ignore
 				into topology_recovery_steps (
 					recovery_step_id, recovery_uid, audit_at, message
-				) values (?, ?, now(), ?)
+				) values (?, ?, datetime('now'), ?)
 			`, sqlutils.NilIfZero(topologyRecoveryStep.ID), topologyRecoveryStep.RecoveryUID, topologyRecoveryStep.Message,
 	)
 	if err != nil {
diff --git a/go/vt/vtorc/process/health_dao.go b/go/vt/vtorc/process/health_dao.go
index 59ea557223d..dcfed83457a 100644
--- a/go/vt/vtorc/process/health_dao.go
+++ b/go/vt/vtorc/process/health_dao.go
@@ -40,7 +40,7 @@ func WriteRegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) {
 			insert ignore into node_health_history
 				(hostname, token, first_seen_active, extra_info, command, app_version)
 			values
-				(?, ?, NOW(), ?, ?, ?)
+				(?, ?, datetime('now'), ?, ?, ?)
 			`,
 		nodeHealth.Hostname, nodeHealth.Token, nodeHealth.ExtraInfo, nodeHealth.Command, nodeHealth.AppVersion,