Skip to content

Commit

Permalink
Drop ActionState enum
Browse files Browse the repository at this point in the history
It was mostly a 1:1 of enum State anyway.

Signed-off-by: Manuel Mendez <manny@packet.com>
  • Loading branch information
mmlb committed Oct 27, 2020
1 parent 72c1c6c commit 08dd2a2
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 433 deletions.
6 changes: 3 additions & 3 deletions cmd/tink-cli/cmd/workflow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ var stateCmd = &cobra.Command{
},
}

func calWorkflowProgress(cur int64, total int64, state workflow.ActionState) string {
if total == 0 || (cur == 0 && state != workflow.ActionState_ACTION_STATE_SUCCESS) {
func calWorkflowProgress(cur int64, total int64, state workflow.State) string {
if total == 0 || (cur == 0 && state != workflow.State_STATE_SUCCESS) {
return "0%"
}
var taskCompleted int64
if state == workflow.ActionState_ACTION_STATE_SUCCESS {
if state == workflow.State_STATE_SUCCESS {
taskCompleted = cur + 1
} else {
taskCompleted = cur
Expand Down
22 changes: 11 additions & 11 deletions cmd/tink-worker/internal/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func startContainer(ctx context.Context, l log.Logger, cli *client.Client, id st
return errors.Wrap(cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START")
}

func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.ActionState, error) {
func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.State, error) {
// Inspect whether the container is in running state
if _, err := cli.ContainerInspect(ctx, id); err != nil {
return pb.ActionState_ACTION_STATE_FAILED, nil
return pb.State_STATE_FAILED, nil
}

// send API call to wait for the container completion
Expand All @@ -63,32 +63,32 @@ func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.Actio
select {
case status := <-wait:
if status.StatusCode == 0 {
return pb.ActionState_ACTION_STATE_SUCCESS, nil
return pb.State_STATE_SUCCESS, nil
}
return pb.ActionState_ACTION_STATE_FAILED, nil
return pb.State_STATE_FAILED, nil
case err := <-errC:
return pb.ActionState_ACTION_STATE_FAILED, err
return pb.State_STATE_FAILED, err
case <-ctx.Done():
return pb.ActionState_ACTION_STATE_TIMEOUT, ctx.Err()
return pb.State_STATE_TIMEOUT, ctx.Err()
}
}

func waitFailedContainer(ctx context.Context, l log.Logger, cli *client.Client, id string, failedActionStatus chan pb.ActionState) {
func waitFailedContainer(ctx context.Context, l log.Logger, cli *client.Client, id string, failedActionStatus chan pb.State) {
// send API call to wait for the container completion
wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning)

select {
case status := <-wait:
if status.StatusCode == 0 {
failedActionStatus <- pb.ActionState_ACTION_STATE_SUCCESS
failedActionStatus <- pb.State_STATE_SUCCESS
}
failedActionStatus <- pb.ActionState_ACTION_STATE_FAILED
failedActionStatus <- pb.State_STATE_FAILED
case err := <-errC:
l.Error(err)
failedActionStatus <- pb.ActionState_ACTION_STATE_FAILED
failedActionStatus <- pb.State_STATE_FAILED
case <-ctx.Done():
l.Error(ctx.Err())
failedActionStatus <- pb.ActionState_ACTION_STATE_TIMEOUT
failedActionStatus <- pb.State_STATE_TIMEOUT
}
}

Expand Down
36 changes: 18 additions & 18 deletions cmd/tink-worker/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ func (w *Worker) captureLogs(ctx context.Context, id string) {
}
}

func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.ActionState, error) {
func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.State, error) {
l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage())

cli := w.registryClient
if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil {
return pb.ActionState_ACTION_STATE_IN_PROGRESS, errors.Wrap(err, "DOCKER PULL")
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER PULL")
}
id, err := w.createContainer(ctx, action.Command, wfID, action)
if err != nil {
return pb.ActionState_ACTION_STATE_IN_PROGRESS, errors.Wrap(err, "DOCKER CREATE")
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER CREATE")
}
l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created")

Expand All @@ -119,10 +119,10 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc

err = startContainer(timeCtx, l, cli, id)
if err != nil {
return pb.ActionState_ACTION_STATE_IN_PROGRESS, errors.Wrap(err, "DOCKER RUN")
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER RUN")
}

failedActionStatus := make(chan pb.ActionState)
failedActionStatus := make(chan pb.State)

// capturing logs of action container in a go-routine
go w.captureLogs(ctx, id)
Expand All @@ -139,14 +139,14 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
}

l.With("status", status.String()).Info("container removed")
if status != pb.ActionState_ACTION_STATE_SUCCESS {
if status == pb.ActionState_ACTION_STATE_TIMEOUT && action.OnTimeout != nil {
if status != pb.State_STATE_SUCCESS {
if status == pb.State_STATE_TIMEOUT && action.OnTimeout != nil {
id, err = w.createContainer(ctx, action.OnTimeout, wfID, action)
if err != nil {
l.Error(errors.Wrap(err, errCreateContainer))
}
l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created")
failedActionStatus := make(chan pb.ActionState)
failedActionStatus := make(chan pb.State)
go w.captureLogs(ctx, id)
go waitFailedContainer(ctx, l, cli, id, failedActionStatus)
err = startContainer(ctx, l, cli, id)
Expand Down Expand Up @@ -208,15 +208,15 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
}
} else {
switch wfContext.GetCurrentActionState() {
case pb.ActionState_ACTION_STATE_SUCCESS:
case pb.State_STATE_SUCCESS:
if isLastAction(wfContext, actions) {
continue
}
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()+1]
actionIndex = int(wfContext.GetCurrentActionIndex()) + 1
case pb.ActionState_ACTION_STATE_FAILED:
case pb.State_STATE_FAILED:
continue
case pb.ActionState_ACTION_STATE_TIMEOUT:
case pb.State_STATE_TIMEOUT:
continue
default:
nextAction = actions.GetActionList()[wfContext.GetCurrentActionIndex()]
Expand Down Expand Up @@ -269,12 +269,12 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
l := l.With("actionName", action.GetName(),
"taskName", action.GetTaskName(),
)
if wfContext.GetCurrentActionState() != pb.ActionState_ACTION_STATE_IN_PROGRESS {
if wfContext.GetCurrentActionState() != pb.State_STATE_RUNNING {
actionStatus := &pb.WorkflowActionStatus{
WorkflowId: wfID,
TaskName: action.GetTaskName(),
ActionName: action.GetName(),
ActionStatus: pb.ActionState_ACTION_STATE_IN_PROGRESS,
ActionStatus: pb.State_STATE_RUNNING,
Seconds: 0,
Message: "Started execution",
WorkerId: action.GetWorkerId(),
Expand Down Expand Up @@ -303,11 +303,11 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
WorkerId: action.GetWorkerId(),
}

if err != nil || status != pb.ActionState_ACTION_STATE_SUCCESS {
if status == pb.ActionState_ACTION_STATE_TIMEOUT {
actionStatus.ActionStatus = pb.ActionState_ACTION_STATE_TIMEOUT
if err != nil || status != pb.State_STATE_SUCCESS {
if status == pb.State_STATE_TIMEOUT {
actionStatus.ActionStatus = pb.State_STATE_TIMEOUT
} else {
actionStatus.ActionStatus = pb.ActionState_ACTION_STATE_FAILED
actionStatus.ActionStatus = pb.State_STATE_FAILED
}
l.With("actionStatus", actionStatus.ActionStatus.String())
l.Error(err)
Expand All @@ -318,7 +318,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er
return err
}

actionStatus.ActionStatus = pb.ActionState_ACTION_STATE_SUCCESS
actionStatus.ActionStatus = pb.State_STATE_SUCCESS
actionStatus.Message = "finished execution successfully"

err = w.reportActionStatus(ctx, actionStatus)
Expand Down
4 changes: 2 additions & 2 deletions db/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func (d TinkDB) GetWorkflowContexts(ctx context.Context, wfID string) (*pb.Workf
row := d.instance.QueryRowContext(ctx, query, wfID)
var cw, ct, ca string
var cai, tact int64
var cas pb.ActionState
var cas pb.State
err := row.Scan(&cw, &ct, &ca, &cai, &cas, &tact)
if err == nil {
return &pb.WorkflowContext{
Expand Down Expand Up @@ -635,7 +635,7 @@ func (d TinkDB) ShowWorkflowEvents(wfID string, fn func(wfs *pb.WorkflowActionSt
ActionName: aName,
Seconds: secs,
Message: msg,
ActionStatus: pb.ActionState(status),
ActionStatus: pb.State(status),
CreatedAt: createdAt,
}
err = fn(wfs)
Expand Down
8 changes: 4 additions & 4 deletions grpc-server/tinkerbell.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *server) ReportActionStatus(context context.Context, req *pb.WorkflowAct
}

actionIndex := wfContext.GetCurrentActionIndex()
if req.GetActionStatus() == pb.ActionState_ACTION_STATE_IN_PROGRESS {
if req.GetActionStatus() == pb.State_STATE_RUNNING {
if wfContext.GetCurrentAction() != "" {
actionIndex = actionIndex + 1
}
Expand Down Expand Up @@ -217,15 +217,15 @@ func getWorkflowActions(context context.Context, db db.Database, wfID string) (*
// isApplicableToSend checks if a particular workflow context is applicable or if it is needed to
// be sent to a worker based on the state of the current action and the targeted workerID
func isApplicableToSend(context context.Context, wfContext *pb.WorkflowContext, workerID string, db db.Database) bool {
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_STATE_FAILED ||
wfContext.GetCurrentActionState() == pb.ActionState_ACTION_STATE_TIMEOUT {
if wfContext.GetCurrentActionState() == pb.State_STATE_FAILED ||
wfContext.GetCurrentActionState() == pb.State_STATE_TIMEOUT {
return false
}
actions, err := getWorkflowActions(context, db, wfContext.GetWorkflowId())
if err != nil {
return false
}
if wfContext.GetCurrentActionState() == pb.ActionState_ACTION_STATE_SUCCESS {
if wfContext.GetCurrentActionState() == pb.State_STATE_SUCCESS {
if isLastAction(wfContext, actions) {
return false
}
Expand Down
Loading

0 comments on commit 08dd2a2

Please sign in to comment.