Skip to content

Commit

Permalink
scripts/list-tasks: Flag to list tasks without setting
Browse files Browse the repository at this point in the history
I'm trying to modify about 700 tasks in production and it would
be really convenient when trying to make sure every last one has
been updated to just throw everything away and start over with
just he tasks that didn't get successfully published.
  • Loading branch information
willdonnelly committed Feb 20, 2025
1 parent a2dc942 commit ab523aa
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions scripts/list-tasks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"os/exec"
"path"
"slices"
"sort"
"strings"

Expand All @@ -44,9 +45,10 @@ use-case and workflow.
var (
logLevel = flag.String("log_level", "info", "The log level to print at.")

taskType = flag.String("type", "", "The type of catalog spec to list (typically 'capture' or 'materialization'). If unspecified the task listing will not be filtered by type.")
imageName = flag.String("connector", "", "The connector image name to filter on. Can be a full URL like 'ghcr.io/estuary/source-mysql' or a short name like 'source-mysql', and in the latter case the name will be expanded into a full URL including all variants. If unspecified the task list will not be filtered by connector.")
namePrefix = flag.String("prefix", "", "The task name prefix to filter on. If unspecified the task listing will not be filtered by name.")
taskType = flag.String("type", "", "The type of catalog spec to list (typically 'capture' or 'materialization'). If unspecified the task listing will not be filtered by type.")
imageName = flag.String("connector", "", "The connector image name to filter on. Can be a full URL like 'ghcr.io/estuary/source-mysql' or a short name like 'source-mysql', and in the latter case the name will be expanded into a full URL including all variants. If unspecified the task list will not be filtered by connector.")
namePrefix = flag.String("prefix", "", "The task name prefix to filter on. If unspecified the task listing will not be filtered by name.")
missingFlags = flag.String("missing", "", "A comma-separated list of feature flag settings. If specified only tasks missing one or more flag settings will be listed/pulled.")

addToDraft = flag.Bool("draft", false, "When true, all listed tasks will be added to the active flowctl draft.")

Expand Down Expand Up @@ -92,6 +94,16 @@ func performListing(ctx context.Context) error {
}
sort.Slice(tasks, func(i, j int) bool { return strings.Compare(tasks[i].CatalogName, tasks[j].CatalogName) < 0 })
for _, task := range tasks {
if *missingFlags != "" {
// Check whether the task spec already has all of the specified settings, and if so skip this task.
var flagSettings = strings.Split(*missingFlags, ",")
if hasAllFlags, err := checkForMissingFlags(task.Spec, flagSettings); err != nil {
return fmt.Errorf("error checking flag settings for task %q: %w", task.CatalogName, err)
} else if hasAllFlags {
log.WithField("task", task.CatalogName).Debug("task already has all specified flags")
continue
}
}
if *addToDraft {
if err := addTaskToDraft(ctx, task.CatalogName); err != nil {
return fmt.Errorf("error adding task %q to flowctl draft: %w", task.CatalogName, err)
Expand All @@ -118,7 +130,7 @@ type taskSpec struct {
}

func listTasks(ctx context.Context, taskType string, imageNames []string, namePrefix string) ([]*taskSpec, error) {
var command = []string{"flowctl", "raw", "get", "--table=live_specs", "--query", "select=catalog_name,connector_image_name"}
var command = []string{"flowctl", "raw", "get", "--table=live_specs", "--query", "select=catalog_name,connector_image_name,spec"}

if taskType != "" {
command = append(command, "--query", fmt.Sprintf("spec_type=eq.%s", taskType))
Expand Down Expand Up @@ -203,3 +215,34 @@ func flowctl(ctx context.Context, args ...string) error {
var _, err = exec.CommandContext(ctx, "flowctl", args...).Output()
return err
}

func checkForMissingFlags(spec json.RawMessage, flagSettings []string) (hasAllFlags bool, err error) {
// Extract the 'endpoint.connector.config.advanced.feature_flags' string property from the task spec,
// split into individual flag settings, and check if all of the specified settings are present.
var taskFlagsProperty = extractStringProperty(spec, "endpoint", "connector", "config", "advanced", "feature_flags")
var taskFlags = strings.Split(taskFlagsProperty, ",")
for _, setting := range flagSettings {
if !slices.Contains(taskFlags, setting) {
return false, nil
}
}
return true, nil
}

func extractStringProperty(obj json.RawMessage, pathComponents ...string) string {
for _, component := range pathComponents {
var m map[string]json.RawMessage
if err := json.Unmarshal(obj, &m); err != nil {
return ""
} else if next, ok := m[component]; !ok {
return ""
} else {
obj = next
}
}
var s string
if err := json.Unmarshal(obj, &s); err != nil {
return ""
}
return s
}

0 comments on commit ab523aa

Please sign in to comment.