diff --git a/README.md b/README.md index 3745ff43..5d4b9b26 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ that makes it easy to tail and summarize structured data in Kafka. Either: -1. Run `GO111MODULE="on" go get github.com/segmentio/topicctl/cmd/topicctl` +1. Run `go install github.com/segmentio/topicctl/cmd/topicctl@latest` 2. Clone this repo and run `make install` in the repo root 3. Use the Docker image: `docker pull segment/topicctl` @@ -228,7 +228,12 @@ independently of an `apply` workflow. ### Version compatibility We've tested `topicctl` on Kafka clusters with versions between `0.10.1` and `2.7.1`, inclusive. -If you run into any compatibility issues, please file a bug. + +Note, however, that clusters at versions prior to `2.4.0` cannot use broker APIs for applying and +thus also require ZooKeeper API access for full functionality. See the +[cluster access details](#cluster-access-details) section below for more details. + +If you run into any unexpected compatibility issues, please file a bug. ## Config formats @@ -254,13 +259,15 @@ meta: spec: bootstrapAddrs: # One or more broker bootstrap addresses - my-cluster.example.com:9092 - clusterID: abc-123-xyz # Expected cluster ID for cluster (optional, used as safety check only) + clusterID: abc-123-xyz # Expected cluster ID for cluster (optional, + # used as safety check only) # ZooKeeper access settings (only required for pre-v2 clusters; leave off to force exclusive use # of broker APIs) zkAddrs: # One or more cluster zookeeper addresses; if these are - - zk.example.com:2181 # omitted, then the cluster will only be accessed via broker APIs; - # see the section below on cluster access for more details. + - zk.example.com:2181 # omitted, then the cluster will only be accessed via + # broker APIs; see the section below on cluster access for + # more details. zkPrefix: my-cluster # Prefix for zookeeper nodes if using zookeeper access zkLockPath: /topicctl/locks # Path used for apply locks (optional) @@ -274,9 +281,10 @@ spec: # SASL settings (optional, not supported if using ZooKeeper) sasl: enabled: true # Whether SASL is enabled - mechanism: SCRAM-SHA-512 # Mechanism to use; choices are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 - username: my-username # Username; can also be set via TOPICCTL_SASL_USERNAME environment variable - password: my-password # Password; can also be set via TOPICCTL_SASL_PASSWORD environment variable + mechanism: SCRAM-SHA-512 # Mechanism to use; + # choices are PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 + username: my-username # SASL username + password: my-password # SASL password ``` Note that the `name`, `environment`, `region`, and `description` fields are used @@ -284,6 +292,11 @@ for description/identification only, and don't appear in any API calls. They can be set arbitrarily, provided that they match up with the values set in the associated topic configs. +If the tool is run with the `--expand-env` option, then the cluster config will be prepreocessed +using [`os.ExpandEnv`](https://pkg.go.dev/os#ExpandEnv) at load time. The latter will replace +references of the form `$ENV_VAR_NAME` or `${ENV_VAR_NAME}` with the associated values from the +environment. + ### Topics Each topic is configured in a YAML file. The following is an diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 2e5c2670..d2bb8b0e 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -180,7 +180,7 @@ func applyTopic( return err } - clusterConfig, err := config.LoadClusterFile(clusterConfigPath) + clusterConfig, err := config.LoadClusterFile(clusterConfigPath, applyConfig.shared.expandEnv) if err != nil { return err } diff --git a/cmd/topicctl/subcmd/bootstrap.go b/cmd/topicctl/subcmd/bootstrap.go index 6110d5ae..14502aca 100644 --- a/cmd/topicctl/subcmd/bootstrap.go +++ b/cmd/topicctl/subcmd/bootstrap.go @@ -62,7 +62,10 @@ func bootstrapRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clusterConfig, err := config.LoadClusterFile(bootstrapConfig.shared.clusterConfig) + clusterConfig, err := config.LoadClusterFile( + bootstrapConfig.shared.clusterConfig, + bootstrapConfig.shared.expandEnv, + ) if err != nil { return err } diff --git a/cmd/topicctl/subcmd/check.go b/cmd/topicctl/subcmd/check.go index 33f2aaf1..aef1bc30 100644 --- a/cmd/topicctl/subcmd/check.go +++ b/cmd/topicctl/subcmd/check.go @@ -116,7 +116,7 @@ func checkTopicFile( return false, err } - clusterConfig, err := config.LoadClusterFile(clusterConfigPath) + clusterConfig, err := config.LoadClusterFile(clusterConfigPath, checkConfig.shared.expandEnv) if err != nil { return false, err } diff --git a/cmd/topicctl/subcmd/shared.go b/cmd/topicctl/subcmd/shared.go index 004c7456..0ecfaf80 100644 --- a/cmd/topicctl/subcmd/shared.go +++ b/cmd/topicctl/subcmd/shared.go @@ -16,6 +16,7 @@ import ( type sharedOptions struct { brokerAddr string clusterConfig string + expandEnv bool saslMechanism string saslPassword string saslUsername string @@ -47,10 +48,14 @@ func (s sharedOptions) validate() error { } if s.clusterConfig != "" && (s.zkAddr != "" || s.zkPrefix != "" || s.brokerAddr != "" || s.tlsCACert != "" || - s.tlsCert != "" || s.tlsKey != "" || s.saslMechanism != "") { + s.tlsCert != "" || s.tlsKey != "" || s.tlsServerName != "" || s.saslMechanism != "") { log.Warn("Broker and zk flags are ignored when using cluster-config") } + if s.clusterConfig != "" { + return err + } + useTLS := s.tlsEnabled || s.tlsCACert != "" || s.tlsCert != "" || s.tlsKey != "" useSASL := s.saslMechanism != "" || s.saslPassword != "" || s.saslUsername != "" @@ -76,14 +81,14 @@ func (s sharedOptions) getAdminClient( readOnly bool, ) (admin.Client, error) { if s.clusterConfig != "" { - clusterConfig, err := config.LoadClusterFile(s.clusterConfig) + clusterConfig, err := config.LoadClusterFile(s.clusterConfig, s.expandEnv) if err != nil { return nil, err } return clusterConfig.NewAdminClient( ctx, sess, - true, + readOnly, s.saslUsername, s.saslPassword, ) @@ -125,8 +130,6 @@ func (s sharedOptions) getAdminClient( ZKAddrs: []string{s.zkAddr}, ZKPrefix: s.zkPrefix, Sess: sess, - // Run in read-only mode to ensure that tailing doesn't make any changes - // in the cluster ReadOnly: readOnly, }, ) @@ -141,6 +144,13 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) { "", "Broker address", ) + cmd.Flags().BoolVarP( + &options.expandEnv, + "expand-env", + "", + false, + "Expand environment in cluster config", + ) cmd.Flags().StringVar( &options.clusterConfig, "cluster-config", @@ -223,6 +233,13 @@ func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) { os.Getenv("TOPICCTL_CLUSTER_CONFIG"), "Cluster config", ) + cmd.Flags().BoolVarP( + &options.expandEnv, + "expand-env", + "", + false, + "Expand environment in cluster config", + ) cmd.Flags().StringVar( &options.saslPassword, "sasl-password", diff --git a/go.mod b/go.mod index 924124e7..1caf4714 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/segmentio/topicctl -go 1.16 +go 1.17 // Use something like this to use a local kafka-go (useful for testing purposes). // replace github.com/segmentio/kafka-go => /Users/benjamin.yolken/dev/src/github.com/segmentio/kafka-go @@ -12,12 +12,7 @@ require ( github.com/fatih/color v1.9.0 github.com/ghodss/yaml v1.0.0 github.com/hashicorp/go-multierror v1.1.0 - github.com/mattn/go-colorable v0.1.6 // indirect - github.com/mattn/go-tty v0.0.3 // indirect - github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect github.com/olekukonko/tablewriter v0.0.4 - github.com/onsi/gomega v1.5.0 // indirect - github.com/pkg/term v0.0.0-20200520122047-c3ffed290a03 // indirect github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da github.com/segmentio/kafka-go v0.4.21-0.20211001205616-c03923d67699 github.com/sirupsen/logrus v1.2.0 @@ -25,7 +20,32 @@ require ( github.com/stretchr/testify v1.6.1 github.com/x-cray/logrus-prefixed-formatter v0.5.2 golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect + github.com/klauspost/compress v1.9.8 // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect + github.com/mattn/go-colorable v0.1.6 // indirect + github.com/mattn/go-isatty v0.0.12 // indirect + github.com/mattn/go-runewidth v0.0.7 // indirect + github.com/mattn/go-tty v0.0.3 // indirect + github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect + github.com/onsi/ginkgo v1.6.0 // indirect + github.com/onsi/gomega v1.5.0 // indirect + github.com/pierrec/lz4 v2.6.0+incompatible // indirect + github.com/pkg/term v0.0.0-20200520122047-c3ffed290a03 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/spf13/pflag v1.0.3 // indirect + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect + github.com/xdg/stringprep v1.0.0 // indirect golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect + golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae // indirect + golang.org/x/text v0.3.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v2 v2.2.8 // indirect gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c // indirect diff --git a/go.sum b/go.sum index c061eca3..d9b37d7b 100644 --- a/go.sum +++ b/go.sum @@ -131,8 +131,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da h1:p3Vo3i64TCLY7gIfzeQaUJ+kppEO5WQG3cL8iE8tGHU= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/segmentio/kafka-go v0.4.21-0.20211001180856-4d75f822c8b8 h1:H45Dpqb99IyjZoq/+WCuVz0UWe2JOrajBVmg6QsYyaQ= -github.com/segmentio/kafka-go v0.4.21-0.20211001180856-4d75f822c8b8/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/segmentio/kafka-go v0.4.21-0.20211001205616-c03923d67699 h1:DM1XDA47wY0myfsik7hUz+pkmI2uhkfKsa6ogOTNLxw= github.com/segmentio/kafka-go v0.4.21-0.20211001205616-c03923d67699/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= diff --git a/pkg/admin/throttles_test.go b/pkg/admin/throttles_test.go index 8838af2b..dd35fc7f 100644 --- a/pkg/admin/throttles_test.go +++ b/pkg/admin/throttles_test.go @@ -328,7 +328,7 @@ func TestParseBrokerThrottles(t *testing.T) { }, } _, _, err = ParseBrokerThrottles(badBrokers) - assert.NotNil(t, err) + assert.Error(t, err) } func TestParsePartitionThrottles(t *testing.T) { @@ -385,5 +385,5 @@ func TestParsePartitionThrottles(t *testing.T) { }, } _, _, err = ParsePartitionThrottles(badTopic) - assert.NotNil(t, err) + assert.Error(t, err) } diff --git a/pkg/admin/types_test.go b/pkg/admin/types_test.go index 7f124773..79f0a4f4 100644 --- a/pkg/admin/types_test.go +++ b/pkg/admin/types_test.go @@ -155,16 +155,16 @@ func TestTopicRackHelpers(t *testing.T) { brokerRacks := BrokerRacks(testBrokers) minRacks, maxRacks, err := testTopic.RackCounts(brokerRacks) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, minRacks, 1) assert.Equal(t, maxRacks, 3) numRacks, err := testTopic.Partitions[0].NumRacks(brokerRacks) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 3, numRacks) racks, err := testTopic.Partitions[0].Racks(brokerRacks) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, []string{"rack1", "rack2", "rack3"}, racks) } @@ -343,7 +343,7 @@ func TestPartitionAssignmentHelpers(t *testing.T) { result, ) replicas, err := AssignmentsToReplicas(result) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal( t, diff --git a/pkg/admin/zkclient_test.go b/pkg/admin/zkclient_test.go index f80c381f..729ba36c 100644 --- a/pkg/admin/zkclient_test.go +++ b/pkg/admin/zkclient_test.go @@ -154,7 +154,7 @@ func TestZkClientGetBrokers(t *testing.T) { defer adminClient.Close() brokers, err := adminClient.GetBrokers(ctx, nil) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 2, len(brokers)) assert.Equal( @@ -322,7 +322,7 @@ func TestZkClientGetTopics(t *testing.T) { defer adminClient.Close() topics, err := adminClient.GetTopics(ctx, nil, true) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 2, len(topics)) assert.Equal( t, @@ -382,7 +382,7 @@ func TestZkClientGetTopics(t *testing.T) { ) topic1, err := adminClient.GetTopic(ctx, "topic1", true) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal( t, TopicInfo{ @@ -418,7 +418,7 @@ func TestZkClientGetTopics(t *testing.T) { ) _, err = adminClient.GetTopic(ctx, "non-existent-topic", true) - assert.NotNil(t, err) + assert.Error(t, err) } func TestZkClientUpdateTopicConfig(t *testing.T) { @@ -541,7 +541,7 @@ func TestZkClientUpdateTopicConfig(t *testing.T) { ctx, fmt.Sprintf("/%s/config/topics/topic1", clusterName), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.JSONEq( t, `{"config":{"key1":"value1","key2":"value2-updated","key3":"value3","key5":"new-value"},"version":1}`, @@ -552,14 +552,14 @@ func TestZkClientUpdateTopicConfig(t *testing.T) { ctx, fmt.Sprintf("/%s/config/changes", clusterName), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.Greater(t, len(changes), 0) change, _, err := adminClient.zkClient.Get( ctx, fmt.Sprintf("/%s/config/changes/%s", clusterName, changes[len(changes)-1]), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.JSONEq( t, `{"entity_path":"topics/topic1","version":2}`, @@ -668,7 +668,7 @@ func TestZkClientUpdateBrokerConfig(t *testing.T) { ctx, fmt.Sprintf("/%s/config/brokers/1", clusterName), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.JSONEq( t, `{"config":{"key2":"value2-updated","key3":"value3","key5":"new-value"},"version":1}`, @@ -679,14 +679,14 @@ func TestZkClientUpdateBrokerConfig(t *testing.T) { ctx, fmt.Sprintf("/%s/config/changes", clusterName), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.Greater(t, len(changes), 0) change, _, err := adminClient.zkClient.Get( ctx, fmt.Sprintf("/%s/config/changes/%s", clusterName, changes[len(changes)-1]), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.JSONEq( t, `{"entity_path":"brokers/1","version":2}`, @@ -764,7 +764,7 @@ func TestZkClientUpdateAssignments(t *testing.T) { defer adminClient.Close() exists, err := adminClient.assignmentInProgress(ctx) - assert.Nil(t, err) + assert.NoError(t, err) assert.False(t, exists) err = adminClient.AssignPartitions( @@ -801,7 +801,7 @@ func TestZkClientUpdateAssignments(t *testing.T) { ) exists, err = adminClient.assignmentInProgress(ctx) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, exists) } @@ -872,7 +872,7 @@ func TestZkClientAddPartitions(t *testing.T) { defer adminClient.Close() exists, err := adminClient.assignmentInProgress(ctx) - assert.Nil(t, err) + assert.NoError(t, err) assert.False(t, exists) err = adminClient.AddPartitions( @@ -977,7 +977,7 @@ func TestZkClientRunLeaderElection(t *testing.T) { defer adminClient.Close() exists, err := adminClient.electionInProgress(ctx) - assert.Nil(t, err) + assert.NoError(t, err) assert.False(t, exists) err = adminClient.RunLeaderElection( @@ -1006,7 +1006,7 @@ func TestZkClientRunLeaderElection(t *testing.T) { ) exists, err = adminClient.electionInProgress(ctx) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, exists) } @@ -1024,19 +1024,19 @@ func TestZkClientLocking(t *testing.T) { lockPath := fmt.Sprintf("/locks/%s", util.RandomString("", 8)) held, err := adminClient.LockHeld(ctx, lockPath) - assert.Nil(t, err) + assert.NoError(t, err) assert.False(t, held) lock, err := adminClient.AcquireLock(ctx, lockPath) require.NoError(t, err) held, err = adminClient.LockHeld(ctx, lockPath) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, held) lock.Unlock() held, err = adminClient.LockHeld(ctx, lockPath) - assert.Nil(t, err) + assert.NoError(t, err) assert.False(t, held) } diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go index 3851bfcc..5f8e0b88 100644 --- a/pkg/apply/apply_test.go +++ b/pkg/apply/apply_test.go @@ -736,7 +736,7 @@ func TestApplyThrottles(t *testing.T) { assert.Equal(t, 0, len(throttledBrokers)) err = applier.removeThottles(ctx, throttledTopic, throttledBrokers) - assert.Nil(t, err) + assert.NoError(t, err) _, err = applier.adminClient.UpdateBrokerConfig( ctx, diff --git a/pkg/apply/assigners/evaluate_test.go b/pkg/apply/assigners/evaluate_test.go index e86cf02e..35091210 100644 --- a/pkg/apply/assigners/evaluate_test.go +++ b/pkg/apply/assigners/evaluate_test.go @@ -156,9 +156,9 @@ func TestEvaluateAssignmentsNonStatic(t *testing.T) { }, ) if testCase.expectedErr[strategy] { - assert.NotNil(t, err) + assert.Error(t, err) } else { - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal( t, expectedResult, diff --git a/pkg/apply/assigners/testing.go b/pkg/apply/assigners/testing.go index 2337bc64..7c57bc74 100644 --- a/pkg/apply/assigners/testing.go +++ b/pkg/apply/assigners/testing.go @@ -31,7 +31,7 @@ func (a assignerTestCase) evaluate(t *testing.T, assigner Assigner) { replicas, err := admin.AssignmentsToReplicas(desired) require.NoError(t, err) - assert.Nil(t, admin.CheckAssignments(desired), a.description) + assert.NoError(t, admin.CheckAssignments(desired), a.description) assert.Equal( t, a.expected, diff --git a/pkg/apply/extenders/testing.go b/pkg/apply/extenders/testing.go index e2b2d4f5..31a213c3 100644 --- a/pkg/apply/extenders/testing.go +++ b/pkg/apply/extenders/testing.go @@ -26,13 +26,13 @@ func (e extenderTestCase) evaluate(t *testing.T, extender Extender) { e.extraPartitions, ) if e.err != nil { - assert.NotNil(t, err, e.description) + assert.Error(t, err, e.description) } else { replicas, err := admin.AssignmentsToReplicas(desired) require.Nil(t, err, e.description) - assert.Nil(t, err, e.description) - assert.Nil(t, admin.CheckAssignments(desired), e.description) + assert.NoError(t, err, e.description) + assert.NoError(t, admin.CheckAssignments(desired), e.description) assert.Equal( t, e.expected, diff --git a/pkg/apply/pickers/testing.go b/pkg/apply/pickers/testing.go index 9ec115e3..2d14e7b7 100644 --- a/pkg/apply/pickers/testing.go +++ b/pkg/apply/pickers/testing.go @@ -37,7 +37,7 @@ func (p pickNewTestCase) evaluate(t *testing.T, picker Picker) { p.index, ) if p.expectedErr { - assert.NotNil(t, err, p.description) + assert.Error(t, err, p.description) } else { require.Nil(t, err, p.description) @@ -81,7 +81,7 @@ func (s sortRemovalsTestCase) evaluate(t *testing.T, picker Picker) { s.index, ) if s.expectedErr { - assert.NotNil(t, err, s.description) + assert.Error(t, err, s.description) } else { require.Nil(t, err, s.description) diff --git a/pkg/apply/rebalancers/testing.go b/pkg/apply/rebalancers/testing.go index 270a1673..ebe927dc 100644 --- a/pkg/apply/rebalancers/testing.go +++ b/pkg/apply/rebalancers/testing.go @@ -32,7 +32,7 @@ func (r rebalancerTestCase) evaluate(t *testing.T, rebalancer Rebalancer) { replicas, err := admin.AssignmentsToReplicas(desired) require.NoError(t, err) - assert.Nil(t, admin.CheckAssignments(desired), r.description) + assert.NoError(t, admin.CheckAssignments(desired), r.description) assert.Equal( t, r.expected, diff --git a/pkg/config/load.go b/pkg/config/load.go index 22aff08c..158e6f33 100644 --- a/pkg/config/load.go +++ b/pkg/config/load.go @@ -15,13 +15,15 @@ import ( var sep = regexp.MustCompile("(?:^|\\s*\n)---\\s*") // LoadClusterFile loads a ClusterConfig from a path to a YAML file. -func LoadClusterFile(path string) (ClusterConfig, error) { +func LoadClusterFile(path string, expandEnv bool) (ClusterConfig, error) { contents, err := ioutil.ReadFile(path) if err != nil { return ClusterConfig{}, err } - contents = []byte(os.ExpandEnv(string(contents))) + if expandEnv { + contents = []byte(os.ExpandEnv(string(contents))) + } absPath, err := filepath.Abs(path) if err != nil { diff --git a/pkg/config/load_test.go b/pkg/config/load_test.go index 233d51b9..bdd84ed0 100644 --- a/pkg/config/load_test.go +++ b/pkg/config/load_test.go @@ -1,6 +1,7 @@ package config import ( + "os" "testing" "github.com/stretchr/testify/assert" @@ -8,8 +9,11 @@ import ( ) func TestLoadCluster(t *testing.T) { - clusterConfig, err := LoadClusterFile("testdata/test-cluster/cluster.yaml") - assert.Nil(t, err) + os.Setenv("K2_TEST_ENV_VAR", "test-region") + defer os.Unsetenv("K2_TEST_ENV_VAR") + + clusterConfig, err := LoadClusterFile("testdata/test-cluster/cluster.yaml", true) + assert.NoError(t, err) // Empty RootDir since this will vary based on where test is run. clusterConfig.RootDir = "" @@ -36,11 +40,11 @@ func TestLoadCluster(t *testing.T) { }, clusterConfig, ) - assert.Nil(t, clusterConfig.Validate()) + assert.NoError(t, clusterConfig.Validate()) - clusterConfig, err = LoadClusterFile("testdata/test-cluster/cluster-invalid.yaml") - assert.Nil(t, err) - assert.NotNil(t, clusterConfig.Validate()) + clusterConfig, err = LoadClusterFile("testdata/test-cluster/cluster-invalid.yaml", true) + assert.NoError(t, err) + assert.Error(t, clusterConfig.Validate()) } func TestLoadTopicsFile(t *testing.T) { @@ -83,13 +87,13 @@ func TestLoadTopicsFile(t *testing.T) { }, topicConfig, ) - assert.Nil(t, topicConfig.Validate(3)) + assert.NoError(t, topicConfig.Validate(3)) topicConfigs, err = LoadTopicsFile("testdata/test-cluster/topics/topic-test-invalid.yaml") assert.Equal(t, 1, len(topicConfigs)) topicConfig = topicConfigs[0] require.NoError(t, err) - assert.NotNil(t, topicConfig.Validate(3)) + assert.Error(t, topicConfig.Validate(3)) topicConfigs, err = LoadTopicsFile("testdata/test-cluster/topics/topic-test-multi.yaml") assert.Equal(t, 2, len(topicConfigs)) @@ -98,16 +102,19 @@ func TestLoadTopicsFile(t *testing.T) { } func TestCheckConsistency(t *testing.T) { - clusterConfig, err := LoadClusterFile("testdata/test-cluster/cluster.yaml") - assert.Nil(t, err) - assert.Nil(t, clusterConfig.Validate()) + os.Setenv("K2_TEST_ENV_VAR", "test-region") + defer os.Unsetenv("K2_TEST_ENV_VAR") + + clusterConfig, err := LoadClusterFile("testdata/test-cluster/cluster.yaml", true) + assert.NoError(t, err) + assert.NoError(t, clusterConfig.Validate()) topicConfigs, err := LoadTopicsFile("testdata/test-cluster/topics/topic-test.yaml") assert.Equal(t, 1, len(topicConfigs)) topicConfig := topicConfigs[0] topicConfig.SetDefaults() - assert.Nil(t, err) - assert.Nil(t, topicConfig.Validate(3)) + assert.NoError(t, err) + assert.NoError(t, topicConfig.Validate(3)) topicConfigNoMatchs, err := LoadTopicsFile( "testdata/test-cluster/topics/topic-test-no-match.yaml", @@ -115,9 +122,9 @@ func TestCheckConsistency(t *testing.T) { assert.Equal(t, 1, len(topicConfigNoMatchs)) topicConfigNoMatch := topicConfigNoMatchs[0] topicConfigNoMatch.SetDefaults() - assert.Nil(t, err) - assert.Nil(t, topicConfig.Validate(3)) + assert.NoError(t, err) + assert.NoError(t, topicConfig.Validate(3)) - assert.Nil(t, CheckConsistency(topicConfig, clusterConfig)) - assert.NotNil(t, CheckConsistency(topicConfigNoMatch, clusterConfig)) + assert.NoError(t, CheckConsistency(topicConfig, clusterConfig)) + assert.Error(t, CheckConsistency(topicConfigNoMatch, clusterConfig)) } diff --git a/pkg/config/settings_test.go b/pkg/config/settings_test.go index 00d52d43..4d7bcdba 100644 --- a/pkg/config/settings_test.go +++ b/pkg/config/settings_test.go @@ -119,7 +119,7 @@ func TestSettingsToConfigEntries(t *testing.T) { } configEntries, err := settings.ToConfigEntries(nil) - assert.Nil(t, err) + assert.NoError(t, err) assert.ElementsMatch( t, []kafka.ConfigEntry{ @@ -154,7 +154,7 @@ func TestSettingsToConfigEntries(t *testing.T) { configEntries, err = settings.ToConfigEntries( []string{"cleanup.policy", "retention.ms"}, ) - assert.Nil(t, err) + assert.NoError(t, err) assert.ElementsMatch( t, configEntries, @@ -173,7 +173,7 @@ func TestSettingsToConfigEntries(t *testing.T) { _, err = settings.ToConfigEntries( []string{"cleanup.policy", "invalid-key"}, ) - assert.NotNil(t, err) + assert.Error(t, err) badSettings := TopicSettings{ "key": map[string]int{ @@ -181,7 +181,7 @@ func TestSettingsToConfigEntries(t *testing.T) { }, } _, err = badSettings.ToConfigEntries(nil) - assert.NotNil(t, err) + assert.Error(t, err) } func TestConfigMapDiffs(t *testing.T) { diff --git a/pkg/config/testdata/test-cluster/cluster.yaml b/pkg/config/testdata/test-cluster/cluster.yaml index c615369a..50e67deb 100644 --- a/pkg/config/testdata/test-cluster/cluster.yaml +++ b/pkg/config/testdata/test-cluster/cluster.yaml @@ -1,7 +1,7 @@ meta: name: test-cluster environment: test-env - region: test-region + region: $K2_TEST_ENV_VAR description: | Test cluster diff --git a/pkg/messages/bounds_test.go b/pkg/messages/bounds_test.go index ed768903..38bfeac3 100644 --- a/pkg/messages/bounds_test.go +++ b/pkg/messages/bounds_test.go @@ -62,7 +62,7 @@ func TestGetAllPartitionBounds(t *testing.T) { require.NoError(t, err) bounds, err := GetAllPartitionBounds(ctx, connector, topicName, nil) - assert.Nil(t, err) + assert.NoError(t, err) // The first partition gets 3 messages assert.Equal(t, 4, len(bounds)) @@ -83,7 +83,7 @@ func TestGetAllPartitionBounds(t *testing.T) { 0: 1, }, ) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 4, len(boundsWithOffsets)) diff --git a/pkg/messages/tail_test.go b/pkg/messages/tail_test.go index 8011d982..3004804b 100644 --- a/pkg/messages/tail_test.go +++ b/pkg/messages/tail_test.go @@ -83,7 +83,7 @@ outerLoop: for { select { case message := <-messagesChan: - assert.Nil(t, message.Err) + assert.NoError(t, message.Err) seenKeys[string(message.Message.Key)] = struct{}{} messageCount++ diff --git a/pkg/version/version.go b/pkg/version/version.go index a02f14f7..fb6f8b31 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -1,4 +1,4 @@ package version // Version is the current topicctl version. -const Version = "1.0.0" +const Version = "1.2.0" diff --git a/pkg/zk/client_test.go b/pkg/zk/client_test.go index 3436a018..1dca9769 100644 --- a/pkg/zk/client_test.go +++ b/pkg/zk/client_test.go @@ -142,19 +142,19 @@ func TestPooledClientRead(t *testing.T) { ctx, fmt.Sprintf("/%s/parent1/parent2/child1/subchild1", prefix), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.True(t, exists) exists, _, err = pooledClient.Exists( ctx, fmt.Sprintf("/%s/parent1/parent2/child1/non-existent-path", prefix), ) - assert.Nil(t, err) + assert.NoError(t, err) assert.False(t, exists) // Writes not allowed err = pooledClient.Create(ctx, fmt.Sprintf("/%s/parent4", prefix), []byte("test"), true) - assert.NotNil(t, err) + assert.Error(t, err) } func TestPooledClientWrites(t *testing.T) { @@ -201,7 +201,7 @@ func TestPooledClientWrites(t *testing.T) { var testStr string _, err = pooledClient.GetJSON(ctx, testPath, &testStr) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, "hello", testStr) stats, err := pooledClient.SetJSON( @@ -217,7 +217,7 @@ func TestPooledClientWrites(t *testing.T) { testObj := map[string]string{} _, err = pooledClient.GetJSON(ctx, testPath, &testObj) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal( t, map[string]string{ @@ -271,7 +271,7 @@ func TestPooledClientSequentialWrites(t *testing.T) { } children, _, err := pooledClient.Children(ctx, fmt.Sprintf("/%s", prefix)) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 5, len(children)) } @@ -313,13 +313,13 @@ func TestPooledClientLocks(t *testing.T) { require.NotNil(t, lock) children, _, err := pooledClient.Children(ctx, lockPath) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 1, len(children)) require.Nil(t, lock.Unlock()) children, _, err = pooledClient.Children(ctx, lockPath) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, 0, len(children)) }