Skip to content

Commit

Permalink
Even more logs
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 10, 2025
1 parent 7587ffd commit 6a76093
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
2 changes: 2 additions & 0 deletions go/vt/proto/binlogdata/binlogdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 20 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func newVCopierCopyTask(args *vcopierCopyTaskArgs) *vcopierCopyTask {
}

func newVCopierCopyTaskArgs(rows []*querypb.Row, lastpk *querypb.Row) *vcopierCopyTaskArgs {
dbg("vcopierCopyTaskArgs", rows)
return &vcopierCopyTaskArgs{
rows: rows,
lastpk: lastpk,
Expand Down Expand Up @@ -752,6 +753,7 @@ func (vcq *vcopierCopyWorkQueue) close() {
// executed in a separate goroutine. Otherwise the task will be executed in the
// calling goroutine.
func (vcq *vcopierCopyWorkQueue) enqueue(ctx context.Context, currT *vcopierCopyTask) error {
dbg("vcopierCopyWorkQueue.enqueue", currT.args.rows)
if !vcq.isOpen {
return fmt.Errorf("work queue is not open")
}
Expand All @@ -768,16 +770,21 @@ func (vcq *vcopierCopyWorkQueue) enqueue(ctx context.Context, currT *vcopierCopy
}

execute := func(task *vcopierCopyTask) {
dbg("vcopierCopyWorkQueue.enqueue.preexecute 1", task.args.rows)
currW.execute(ctx, task)
vcq.workerPool.Put(poolH)
}

// If the work queue is configured to work concurrently, execute the task
// in a separate goroutine. Otherwise execute the task in the calling
// goroutine.

if vcq.concurrent {
log.Infof("Executing task in a separate goroutine")
dbg("vcopierCopyWorkQueue.enqueue.preexecute 2", currT.args.rows)
go execute(currT)
} else {
dbg("vcopierCopyWorkQueue.enqueue.preexecute 2", currT.args.rows)
execute(currT)
}

Expand Down Expand Up @@ -841,6 +848,15 @@ func (vtl *vcopierCopyTaskLifecycle) before(state vcopierCopyTaskState) *vcopier
return vtl.hooks[key]
}

func dbg(msg string, rows []*querypb.Row) {
if len(rows) == 0 {
log.Infof("%s: rows is empty", msg)
} else {
log.Infof("%s: rows has %d elements, first row >%v<(%p)", msg, len(rows), rows[0], rows[0])
}
log.Flush()
}

// onResult returns a vcopierCopyTaskResultHooks that can be used to register
// callbacks to be triggered when a task reaches a "done" state (= canceled,
// completed, failed).
Expand Down Expand Up @@ -874,7 +890,7 @@ func (vtl *vcopierCopyTaskLifecycle) tryAdvance(
) (vcopierCopyTaskState, error) {
var err error
newState := nextState

dbg("vcopierCopyTaskLifecycle.tryAdvance", args.rows)
if err = vtl.before(nextState).notify(ctx, args); err != nil {
goto END
}
Expand Down Expand Up @@ -1037,6 +1053,7 @@ func (vbc *vcopierCopyWorker) Expired(time.Duration) bool {
// execute advances a task through each state until it is done (= canceled,
// completed, failed).
func (vbc *vcopierCopyWorker) execute(ctx context.Context, task *vcopierCopyTask) *vcopierCopyTaskResult {
dbg("vcopierCopyWorker.execute", task.args.rows)
startedAt := time.Now()
state := vcopierCopyTaskPending

Expand Down Expand Up @@ -1067,8 +1084,9 @@ func (vbc *vcopierCopyWorker) execute(ctx context.Context, task *vcopierCopyTask
case vcopierCopyTaskInsertRows:
advanceFn = func(ctx context.Context, args *vcopierCopyTaskArgs) error {
for i, row := range args.rows {

if row == nil || len(row.Lengths) == 0 {
log.Infof("In worker.execute, row %d is nil or has zero length %v", i, row)
log.Infof("In worker.execute, row %d is nil or has zero length >%+v< (%p)", i, row, row)
}
}
if _, err := vbc.insertRows(ctx, args.rows); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
serr := vc.vr.sourceVStreamer.VStreamTables(ctx, func(resp *binlogdatapb.VStreamTablesResponse) error {
defer vc.vr.stats.PhaseTimings.Record("copy", time.Now())
defer vc.vr.stats.CopyLoopCount.Add(1)
log.Infof("VStreamTablesResponse: received table %s, #fields %d, #rows %d, gtid %s, lastpk %+v",
resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk)
log.Infof("VStreamTablesResponse: received table %s, #fields %d, #rows %d (%p), gtid %s, lastpk %+v",
resp.TableName, len(resp.Fields), len(resp.Rows), resp.Rows, resp.Gtid, resp.Lastpk)
tableName := resp.TableName
gtid = resp.Gtid

Expand Down

0 comments on commit 6a76093

Please sign in to comment.