Skip to content

Commit

Permalink
exposing functionality in armadactl, adding tests, modifying preemptO…
Browse files Browse the repository at this point in the history
…nQueue behaviour, fixing bugs
  • Loading branch information
mustafai-gr committed Nov 9, 2024
1 parent 4edccef commit afce23a
Show file tree
Hide file tree
Showing 24 changed files with 709 additions and 522 deletions.
88 changes: 88 additions & 0 deletions cmd/armadactl/cmd/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/spf13/cobra"

"github.com/armadaproject/armada/cmd/armadactl/cmd/utils"
"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"
)

func cancelCmd() *cobra.Command {
Expand All @@ -19,6 +21,7 @@ func cancelCmd() *cobra.Command {
cancelJobCmd(),
cancelJobSetCmd(),
cancelExecutorCmd(),
cancelQueueCmd(),
)
return cmd
}
Expand Down Expand Up @@ -101,3 +104,88 @@ func cancelExecutorCmd() *cobra.Command {
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Cancel jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}

func cancelQueueCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "queues <queue_1> <queue_2> <queue_3> ...",
Short: "Cancels jobs on queues.",
Long: `Cancels jobs on queues with provided name, priority classes and job states. Allows selecting of queues by label or name, one of which must be provided. All flags with multiple values must be comma separated.`,
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("job-states"); err != nil {
return err
}
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return err
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}

onlyCordoned, err := cmd.Flags().GetBool("only-cordoned")
if err != nil {
return fmt.Errorf("error reading only-cordoned flag: %s", err)
}

inverse, err := cmd.Flags().GetBool("inverse")
if err != nil {
return fmt.Errorf("error reading inverse flag: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("selector")
if err != nil {
return fmt.Errorf("error reading queue label selector: %s", err)
}

jobStates, err := cmd.Flags().GetStringSlice("job-states")
if err != nil {
return fmt.Errorf("error reading job-states flag: %s", err)
}

var activeJobStates []utils.ActiveJobState
for _, state := range jobStates {
activeState, err := utils.ActiveJobStateFromString(state)
if err != nil {
return fmt.Errorf("error determining active job state corresponding to %s: %s", state, err)
}
activeJobStates = append(activeJobStates, activeState)
}

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-classes flag: %s", err)
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
return fmt.Errorf("error reading dry-run flag: %s", err)
}

if len(queues) > 0 && len(labels) > 0 {
return fmt.Errorf("you can select either with a set of queue names or a set of queue labels, but not both")
} else if len(queues) == 0 && len(labels) == 0 {
// This check makes accidentally cancelling all jobs far less likely
return fmt.Errorf("you must narrow down queue selection with either queue names or labels")
}

return a.CancelOnQueues(&armadactl.QueueQueryArgs{
InQueueNames: queues,
ContainsAllLabels: labels,
InvertResult: inverse,
OnlyCordoned: onlyCordoned,
}, priorityClasses, activeJobStates, dryRun)
},
}
cmd.Flags().StringSliceP("job-states", "s", []string{}, "Jobs in the provided job states will be cancelled. Allowed values: queued,leased,pending,running.")
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Jobs matching the provided priority classes will be cancelled.")
cmd.Flags().StringSliceP("selector", "l", []string{}, "Select queues by label.")
cmd.Flags().Bool("inverse", false, "Inverts result to cancel all queues that don't match the specified criteria. Defaults to false.")
cmd.Flags().Bool("only-cordoned", false, "Only cancels queues that are cordoned. Defaults to false.")
cmd.Flags().Bool("dry-run", false, "Prints out queues on which jobs will be cancelled. Defaults to false.")

return cmd
}
9 changes: 5 additions & 4 deletions cmd/armadactl/cmd/cordon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cmd
import (
"fmt"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/cmd/armadactl/cmd/utils"
"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"

"github.com/spf13/cobra"
)

func cordon() *cobra.Command {
Expand Down Expand Up @@ -43,7 +44,7 @@ func cordonQueues(a *armadactl.App) *cobra.Command {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}
Expand Down Expand Up @@ -94,7 +95,7 @@ func uncordonQueues(a *armadactl.App) *cobra.Command {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.QueueAPI.Update = cq.Update(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Cordon = cq.Cordon(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Uncordon = cq.Uncordon(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Preempt = cq.Preempt(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Cancel = cq.Cancel(client.ExtractCommandlineArmadaApiConnectionDetails)

params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
Expand Down
70 changes: 70 additions & 0 deletions cmd/armadactl/cmd/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/spf13/cobra"

"github.com/armadaproject/armada/cmd/armadactl/cmd/utils"
"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"
)

func preemptCmd() *cobra.Command {
Expand All @@ -17,6 +19,7 @@ func preemptCmd() *cobra.Command {
cmd.AddCommand(
preemptJobCmd(),
preemptExecutorCmd(),
preemptQueuesCmd(),
)
return cmd
}
Expand Down Expand Up @@ -79,3 +82,70 @@ func preemptExecutorCmd() *cobra.Command {
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Preempt jobs on executor matching the specified priority classes. Provided priority classes should be comma separated, as in the following example: armada-default,armada-preemptible.")
return cmd
}

func preemptQueuesCmd() *cobra.Command {
a := armadactl.New()
cmd := &cobra.Command{
Use: "queues <queue_1> <queue_2> <queue_3> ...",
Short: "Preempts jobs on queues.",
Long: `Preempts jobs on selected queues in specified priority classes. Allows selecting of queues by label or name, one of which must be provided. All flags with multiple values must be comma separated.`,
PreRunE: func(cmd *cobra.Command, args []string) error {
if err := cmd.MarkFlagRequired("priority-classes"); err != nil {
return err
}
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}

onlyCordoned, err := cmd.Flags().GetBool("only-cordoned")
if err != nil {
return fmt.Errorf("error reading only-cordoned flag: %s", err)
}

inverse, err := cmd.Flags().GetBool("inverse")
if err != nil {
return fmt.Errorf("error reading inverse flag: %s", err)
}

labels, err := cmd.Flags().GetStringSlice("selector")
if err != nil {
return fmt.Errorf("error reading queue label selector: %s", err)
}

priorityClasses, err := cmd.Flags().GetStringSlice("priority-classes")
if err != nil {
return fmt.Errorf("error reading priority-classes flag: %s", err)
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
return fmt.Errorf("error reading dry-run flag: %s", err)
}

if len(queues) > 0 && len(labels) > 0 {
return fmt.Errorf("you can select either with a set of queue names or a set of queue labels, but not both")
} else if len(queues) == 0 && len(labels) == 0 {
// This check makes accidentally preempting all jobs far less likely
return fmt.Errorf("you must narrow down queue selection with either queue names or labels")
}

return a.PreemptOnQueues(&armadactl.QueueQueryArgs{
InQueueNames: queues,
ContainsAllLabels: labels,
InvertResult: inverse,
OnlyCordoned: onlyCordoned,
}, priorityClasses, dryRun)
},
}
cmd.Flags().StringSliceP("priority-classes", "p", []string{}, "Jobs matching the provided priority classes will be preempted.")
cmd.Flags().StringSliceP("selector", "l", []string{}, "Select queues to preempt by label.")
cmd.Flags().Bool("inverse", false, "Inverts result to preempt all queues that don't match the specified criteria. Defaults to false.")
cmd.Flags().Bool("only-cordoned", false, "Only preempts queues that are cordoned. Defaults to false.")
cmd.Flags().Bool("dry-run", false, "Prints out queues on which jobs will be preempted. Defaults to false.")

return cmd
}
10 changes: 5 additions & 5 deletions cmd/armadactl/cmd/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package cmd
import (
"fmt"

"github.com/armadaproject/armada/internal/common/slices"

"github.com/spf13/cobra"

"github.com/armadaproject/armada/cmd/armadactl/cmd/utils"
"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client/queue"
)
Expand Down Expand Up @@ -58,7 +58,7 @@ Job priority is evaluated inside queue, queue has its own priority. Any labels
return fmt.Errorf("error reading queue labels: %s", err)
}

labelsAsMap, err := labelSliceAsMap(labels)
labelsAsMap, err := utils.LabelSliceAsMap(labels)
if err != nil {
return fmt.Errorf("error converting queue labels to map: %s", err)
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func queuesGetCmdWithApp(a *armadactl.App) *cobra.Command {
return initParams(cmd, a.Params)
},
RunE: func(cmd *cobra.Command, queues []string) error {
errs := slices.Filter(slices.Map(queues, queueNameValidation), func(err error) bool { return err != nil })
errs := slices.Filter(slices.Map(queues, utils.QueueNameValidation), func(err error) bool { return err != nil })
if len(errs) > 0 {
return fmt.Errorf("provided queue name invalid: %s", errs[0])
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func queueUpdateCmdWithApp(a *armadactl.App) *cobra.Command {
return fmt.Errorf("error reading queue labels: %s", err)
}

labelsAsMap, err := labelSliceAsMap(labels)
labelsAsMap, err := utils.LabelSliceAsMap(labels)
if err != nil {
return fmt.Errorf("error converting queue labels to map: %s", err)
}
Expand Down
25 changes: 0 additions & 25 deletions cmd/armadactl/cmd/utils.go

This file was deleted.

88 changes: 88 additions & 0 deletions cmd/armadactl/cmd/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package utils

import (
"errors"
"fmt"
"strings"

"github.com/armadaproject/armada/pkg/api"
)

func QueueNameValidation(queueName string) error {
if queueName == "" {
return fmt.Errorf("cannot provide empty queue name")
}
return nil
}

func LabelSliceAsMap(labels []string) (map[string]string, error) {
mapToReturn := make(map[string]string)
for _, label := range labels {
splitLabel := strings.Split(label, "=")
if len(splitLabel) != 2 {
return nil, fmt.Errorf("invalid label: %s", label)
}
mapToReturn[splitLabel[0]] = splitLabel[1]
}
return mapToReturn, nil
}

type ActiveJobState string

const (
UNKNOWN ActiveJobState = "unknown"
QUEUED ActiveJobState = "queued"
LEASED ActiveJobState = "leased"
PENDING ActiveJobState = "pending"
RUNNING ActiveJobState = "running"
)

func ActiveJobStateFromString(v string) (ActiveJobState, error) {
switch v {
case "queued":
return QUEUED, nil
case "leased":
return LEASED, nil
case "pending":
return PENDING, nil
case "running":
return RUNNING, nil
default:
return UNKNOWN, errors.New(`must be one of "queued", "leased", "pending", "running"`)
}
}

func ApiJobStateFromActiveJobState(s ActiveJobState) api.JobState {
switch s {
case QUEUED:
return api.JobState_QUEUED
case LEASED:
return api.JobState_LEASED
case PENDING:
return api.JobState_PENDING
case RUNNING:
return api.JobState_RUNNING
case UNKNOWN:
return api.JobState_UNKNOWN
default:
return api.JobState_UNKNOWN
}
}

func (s *ActiveJobState) String() string {
return string(*s)
}

func (s *ActiveJobState) Set(v string) error {
switch v {
case "queued", "leased", "pending", "running":
*s = ActiveJobState(v)
return nil
default:
return errors.New(`must be one of "queued", "leased", "pending", "running"`)
}
}

func (s *ActiveJobState) Type() string {
return "ActiveJobState"
}
Loading

0 comments on commit afce23a

Please sign in to comment.