Skip to content

Commit

Permalink
Improve arrangement
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 15, 2025
1 parent 5f19059 commit a80d7f4
Showing 1 changed file with 43 additions and 43 deletions.
86 changes: 43 additions & 43 deletions go/vt/vtctl/workflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,6 @@ func (wf *workflowFetcher) scanWorkflow(
pos = mpos.String()
}

cells := strings.Split(res.Cells, ",")
for i := range cells {
cells[i] = strings.TrimSpace(cells[i])
}

stream := &vtctldatapb.Workflow_Stream{
Id: int64(rstream.Id),
Shard: tablet.Shard,
Expand All @@ -390,37 +385,66 @@ func (wf *workflowFetcher) scanWorkflow(
TimeThrottled: rstream.TimeThrottled,
}
}
stream.State = getStreamState(stream, rstream)
shardStream.Streams = append(shardStream.Streams, stream)
meta.sourceKeyspace = rstream.Bls.Keyspace
meta.targetKeyspace = tablet.Keyspace
if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace {
return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace)
}

if verbosity > vtctldatapb.VerbosityLevel_LOW {
stream.BinlogSource = rstream.Bls
stream.Position = pos
stream.StopPosition = rstream.StopPos
stream.RowsCopied = rstream.RowsCopied
meta.sourceShards.Insert(stream.BinlogSource.Shard)
meta.targetShards.Insert(tablet.Shard)
if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace {
return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace)
}
if rstream.TimeUpdated == nil {
rstream.TimeUpdated = &vttimepb.Time{}
}
// Merge in copy states, which we've already fetched.
if verbosity > vtctldatapb.VerbosityLevel_MINIMAL {
// Merge in copy state related info, if there is any.
shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id)
if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok {
stream.CopyStates = copyStates
stream.RowsCopied = rstream.RowsCopied
}
// Calculate and add processing status and lag related info.
stream.Position = pos
if rstream.TimeUpdated == nil {
rstream.TimeUpdated = &vttimepb.Time{}
}
if stream.TimeUpdated == nil {
stream.TimeUpdated = &vttimepb.Time{}
}
stream.TimeUpdated = rstream.TimeUpdated
stream.TransactionTimestamp = rstream.TransactionTimestamp
timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0)
vreplicationLag := time.Since(timeUpdated)
// MaxVReplicationLag represents the time since we last processed any event
// in the workflow.
if vreplicationLag.Seconds() > meta.maxVReplicationLag {
meta.maxVReplicationLag = vreplicationLag.Seconds()
}
// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}
}

if verbosity > vtctldatapb.VerbosityLevel_LOW {
// Add information about the binlog sources and shards.
stream.BinlogSource = rstream.Bls
if rstream.StopPos != "" {
stream.StopPosition = rstream.StopPos
}
meta.sourceShards.Insert(stream.BinlogSource.Shard)
meta.targetShards.Insert(tablet.Shard)
}
if verbosity > vtctldatapb.VerbosityLevel_MEDIUM {
// Add configuration and metadata info.
cells := strings.Split(res.Cells, ",")
for i := range cells {
cells[i] = strings.TrimSpace(cells[i])
}
stream.Cells = cells
stream.DbName = tablet.DbName()
stream.TabletTypes = res.TabletTypes
stream.TabletSelectionPreference = res.TabletSelectionPreference
stream.Tags = strings.Split(res.Tags, ",")
workflow.WorkflowSubType = res.WorkflowSubType.String()
workflow.DeferSecondaryKeys = res.DeferSecondaryKeys
if options := res.Options; options != "" {
Expand All @@ -431,30 +455,6 @@ func (wf *workflowFetcher) scanWorkflow(
}
}
}
if verbosity > vtctldatapb.VerbosityLevel_MEDIUM {
stream.StopPosition = rstream.StopPos
stream.DbName = tablet.DbName()
stream.TabletTypes = res.TabletTypes
stream.TabletSelectionPreference = res.TabletSelectionPreference
stream.Cells = cells
stream.TransactionTimestamp = rstream.TransactionTimestamp
stream.TimeUpdated = rstream.TimeUpdated
stream.Tags = strings.Split(res.Tags, ",")
}

// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}

stream.State = getStreamState(stream, rstream)
shardStream.Streams = append(shardStream.Streams, stream)

if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace {
return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace)
}
}

return nil
Expand Down

0 comments on commit a80d7f4

Please sign in to comment.