Skip to content

Commit

Permalink
Add support for verbosity level in Workflow Show commands
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 14, 2025
1 parent 70114ad commit a620160
Show file tree
Hide file tree
Showing 8 changed files with 1,363 additions and 1,278 deletions.
22 changes: 16 additions & 6 deletions go/cmd/vtctldclient/command/vreplication/common/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

var ShowOptions = struct {
IncludeLogs bool
Shards []string
IncludeLogs bool
Shards []string
VerbosityLevel uint32
}{}

func GetShowCommand(opts *SubCommandsOpts) *cobra.Command {
Expand All @@ -42,23 +43,32 @@ func GetShowCommand(opts *SubCommandsOpts) *cobra.Command {
RunE: commandShow,
}
cmd.Flags().BoolVar(&ShowOptions.IncludeLogs, "include-logs", true, "Include recent logs for the workflow.")
cmd.Flags().Uint32Var(&ShowOptions.VerbosityLevel, "verbosity-level", 0, "How much detail to include in the results.")
return cmd
}

func commandShow(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.GetWorkflowsRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
IncludeLogs: ShowOptions.IncludeLogs,
Shards: ShowOptions.Shards,
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
IncludeLogs: ShowOptions.IncludeLogs,
Shards: ShowOptions.Shards,
VerbosityLevel: ShowOptions.VerbosityLevel,
}
resp, err := GetClient().GetWorkflows(GetCommandCtx(), req)
if err != nil {
return err
}

// We always use compact format with SHOW to reduce the overall
// size and noise.
cli.DefaultMarshalOptions.EmitUnpopulated = false
if len(resp.Workflows) == 0 {
return fmt.Errorf("workflow %s not found in the %s keyspace",
BaseOptions.Workflow, BaseOptions.TargetKeyspace)
}
data, err := cli.MarshalJSONPretty(resp)
if err != nil {
return err
Expand Down
2,419 changes: 1,215 additions & 1,204 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,7 +1562,9 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
}
ts.Id = int32(st.Id)
ts.Tablet = st.Tablet
ts.SourceShard = fmt.Sprintf("%s/%s", st.BinlogSource.Keyspace, st.BinlogSource.Shard)
if st.BinlogSource != nil {
ts.SourceShard = fmt.Sprintf("%s/%s", st.BinlogSource.Keyspace, st.BinlogSource.Shard)
}
ts.Position = st.Position
ts.Status = st.State
ts.Info = strings.Join(info, "; ")
Expand Down
138 changes: 71 additions & 67 deletions go/vt/vtctl/workflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (wf *workflowFetcher) buildWorkflows(
}

metadata := workflowMetadataMap[workflowName]
err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace)
err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace, req.VerbosityLevel)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -332,7 +332,9 @@ func (wf *workflowFetcher) scanWorkflow(
meta *workflowMetadata,
copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState,
keyspace string,
verbosity uint32,
) error {

shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString())
shardStream, ok := workflow.ShardStreams[shardStreamKey]
if !ok {
Expand All @@ -345,9 +347,11 @@ func (wf *workflowFetcher) scanWorkflow(
}

shardStream = &vtctldatapb.Workflow_ShardStream{
Streams: nil,
TabletControls: si.TabletControls,
IsPrimaryServing: si.IsPrimaryServing,
Streams: nil,
}
if verbosity > 0 {
shardStream.TabletControls = si.TabletControls
shardStream.IsPrimaryServing = si.IsPrimaryServing
}

workflow.ShardStreams[shardStreamKey] = shardStream
Expand All @@ -371,87 +375,87 @@ func (wf *workflowFetcher) scanWorkflow(
for i := range cells {
cells[i] = strings.TrimSpace(cells[i])
}
options := res.Options
if options != "" {
if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil {
return err

if options := res.Options; options != "" && verbosity > 0 {
if options != "" {
if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil {
return err
}
}
}

stream := &vtctldatapb.Workflow_Stream{
Id: int64(rstream.Id),
Shard: tablet.Shard,
Tablet: tablet.Alias,
BinlogSource: rstream.Bls,
Position: pos,
StopPosition: rstream.StopPos,
State: rstream.State.String(),
DbName: tablet.DbName(),
TabletTypes: res.TabletTypes,
TabletSelectionPreference: res.TabletSelectionPreference,
Cells: cells,
TransactionTimestamp: rstream.TransactionTimestamp,
TimeUpdated: rstream.TimeUpdated,
Message: rstream.Message,
Tags: strings.Split(res.Tags, ","),
RowsCopied: rstream.RowsCopied,
ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{
Id: int64(rstream.Id),
Shard: tablet.Shard,
Tablet: tablet.Alias,
State: rstream.State.String(),
Message: rstream.Message,
}
if rstream.ComponentThrottled != "" {
stream.ThrottlerStatus = &vtctldatapb.Workflow_Stream_ThrottlerStatus{
ComponentThrottled: rstream.ComponentThrottled,
TimeThrottled: rstream.TimeThrottled,
},
}

// Merge in copy states, which we've already fetched.
shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id)
if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok {
stream.CopyStates = copyStates
}

if rstream.TimeUpdated == nil {
rstream.TimeUpdated = &vttimepb.Time{}
}

stream.State = getStreamState(stream, rstream)

shardStream.Streams = append(shardStream.Streams, stream)

meta.sourceShards.Insert(stream.BinlogSource.Shard)
meta.targetShards.Insert(tablet.Shard)

if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace {
return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace)
}

meta.sourceKeyspace = stream.BinlogSource.Keyspace

if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace {
return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace)
}
}

meta.sourceKeyspace = rstream.Bls.Keyspace
meta.targetKeyspace = tablet.Keyspace

if stream.TimeUpdated == nil {
stream.TimeUpdated = &vttimepb.Time{}
if verbosity > 0 {
stream.BinlogSource = rstream.Bls
stream.Position = pos
stream.StopPosition = rstream.StopPos
stream.RowsCopied = rstream.RowsCopied
meta.sourceShards.Insert(stream.BinlogSource.Shard)
meta.targetShards.Insert(tablet.Shard)
if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace {
return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace)
}
if rstream.TimeUpdated == nil {
rstream.TimeUpdated = &vttimepb.Time{}
}
// Merge in copy states, which we've already fetched.
shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id)
if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok {
stream.CopyStates = copyStates
}
if stream.TimeUpdated == nil {
stream.TimeUpdated = &vttimepb.Time{}
}
timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0)
vreplicationLag := time.Since(timeUpdated)
// MaxVReplicationLag represents the time since we last processed any event
// in the workflow.
if vreplicationLag.Seconds() > meta.maxVReplicationLag {
meta.maxVReplicationLag = vreplicationLag.Seconds()
}
workflow.WorkflowType = res.WorkflowType.String()
workflow.WorkflowSubType = res.WorkflowSubType.String()
workflow.DeferSecondaryKeys = res.DeferSecondaryKeys
}
timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0)
vreplicationLag := time.Since(timeUpdated)

// MaxVReplicationLag represents the time since we last processed any event
// in the workflow.
if vreplicationLag.Seconds() > meta.maxVReplicationLag {
meta.maxVReplicationLag = vreplicationLag.Seconds()
if verbosity > 1 {
stream.StopPosition = rstream.StopPos
stream.DbName = tablet.DbName()
stream.TabletTypes = res.TabletTypes
stream.TabletSelectionPreference = res.TabletSelectionPreference
stream.Cells = cells
stream.TransactionTimestamp = rstream.TransactionTimestamp
stream.TimeUpdated = rstream.TimeUpdated
stream.Tags = strings.Split(res.Tags, ",")
}

workflow.WorkflowType = res.WorkflowType.String()
workflow.WorkflowSubType = res.WorkflowSubType.String()
workflow.DeferSecondaryKeys = res.DeferSecondaryKeys

// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}

stream.State = getStreamState(stream, rstream)
shardStream.Streams = append(shardStream.Streams, stream)

if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace {
return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace)
}
}

return nil
Expand Down
1 change: 1 addition & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,7 @@ message GetWorkflowsRequest {
string workflow = 4;
bool include_logs = 5;
repeated string shards = 6;
uint32 verbosity_level = 7;
}

message GetWorkflowsResponse {
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a620160

Please sign in to comment.