Skip to content

Commit

Permalink
feat: fix some bugs
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Feb 14, 2025
1 parent 955d0c0 commit 3124339
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions go/vt/vttablet/tabletserver/semisyncmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@ import (
"sync"
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

const (
semiSyncWaitSessionsRead = "SHOW STATUS LIKE 'Rpl_semi_sync_%_wait_sessions'"
semiSyncRecoverWrite = "INSERT INTO semisync_recover (ts) VALUES (NOW())"
semiSyncRecoverClear = "TRUNCATE TABLE semisync_recover"
semiSyncRecoverWrite = "INSERT INTO %s.semisync_recover (ts) VALUES (NOW())"
semiSyncRecoverClear = "TRUNCATE TABLE %s.semisync_recover"
)

// Monitor is a monitor that checks if the primary tablet
Expand Down Expand Up @@ -137,7 +139,9 @@ func (m *Monitor) checkAndFixSemiSyncBlocked() {
if isBlocked {
// If we are blocked, then we want to start the writes.
// That function is re-entrant. If we are already writing, then it will just return.
m.startWrites()
// We start it in a go-routine, because we want to continue to check for when
// we get unblocked.
go m.startWrites()
}
}

Expand Down Expand Up @@ -165,7 +169,7 @@ func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) {
if len(res.Rows) != 1 || len(res.Rows[0]) != 2 {
return false, errors.New("unexpected number of rows received")
}
value, err := res.Rows[0][1].ToInt()
value, err := res.Rows[0][1].ToCastInt64()
return value != 0, err
}

Expand Down Expand Up @@ -252,7 +256,7 @@ func (m *Monitor) write() {
return
}
defer conn.Recycle()
_, err = conn.Conn.ExecuteFetch(semiSyncRecoverWrite, 0, false)
_, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncRecoverWrite), 0, false)
if err != nil {
log.Errorf("SemiSync Monitor: failed to write to semisync_recovery table: %v", err)
}
Expand Down Expand Up @@ -283,7 +287,7 @@ func (m *Monitor) clearAllData() {
return
}
defer conn.Recycle()
_, err = conn.Conn.ExecuteFetch(semiSyncRecoverClear, 0, false)
_, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncRecoverClear), 0, false)
if err != nil {
log.Errorf("SemiSync Monitor: failed to clear semisync_recovery table: %v", err)
}
Expand All @@ -298,3 +302,8 @@ func (m *Monitor) addWaiter() chan any {
m.waiters = append(m.waiters, ch)
return ch
}

// bindSideCarDBName binds the sidecar db name to the query.
func (m *Monitor) bindSideCarDBName(query string) string {
return sqlparser.BuildParsedQuery(query, sidecar.GetIdentifier()).Query
}

0 comments on commit 3124339

Please sign in to comment.