diff --git a/README.md b/README.md index 3e966c9..a26f831 100644 --- a/README.md +++ b/README.md @@ -175,3 +175,10 @@ This can be used by the `Manager` to validate all API access. ```go mgr, _ := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator))) ``` + +## Build tag + +This library provide a `noexprlang` build tag that disables expression matching +for Streams and Consumers listing. The purpose of this build tag is to disable +the use of the `github.com/expr-lang/expr` module that disables go compiler's dead +code elimination because it uses some types and functions of the `reflect` package. diff --git a/consumer_query.go b/consumer_query.go index 03b3abb..33b0e13 100644 --- a/consumer_query.go +++ b/consumer_query.go @@ -19,9 +19,7 @@ import ( "strconv" "time" - "github.com/expr-lang/expr" "github.com/nats-io/jsm.go/api" - "gopkg.in/yaml.v3" ) type consumerMatcher func([]*Consumer) ([]*Consumer, error) @@ -355,50 +353,3 @@ func (q *consumerQuery) matchApiLevel(consumers []*Consumer) ([]*Consumer, error return (!q.invert && requiredLevel >= q.apiLevel) || (q.invert && requiredLevel < q.apiLevel) }) } - -func (q *consumerQuery) matchExpression(consumers []*Consumer) ([]*Consumer, error) { - if q.expression == "" { - return consumers, nil - } - - var matched []*Consumer - - for _, consumer := range consumers { - cfg := map[string]any{} - state := map[string]any{} - - cfgBytes, _ := yaml.Marshal(consumer.Configuration()) - yaml.Unmarshal(cfgBytes, &cfg) - nfo, _ := consumer.LatestState() - stateBytes, _ := yaml.Marshal(nfo) - yaml.Unmarshal(stateBytes, &state) - - env := map[string]any{ - "config": cfg, - "state": state, - "info": state, - "Info": nfo, - } - - program, err := expr.Compile(q.expression, expr.Env(env), expr.AsBool()) - if err != nil { - return nil, err - } - - out, err := expr.Run(program, env) - if err != nil { - return nil, err - } - - should, ok := out.(bool) - if !ok { - return nil, fmt.Errorf("expression did not return a boolean") - } - - if should { - matched = append(matched, consumer) - } - } - - return matched, nil -} diff --git a/match.go b/match.go new file mode 100644 index 0000000..a93452b --- /dev/null +++ b/match.go @@ -0,0 +1,107 @@ +//go:build !noexprlang + +package jsm + +import ( + "fmt" + + "github.com/expr-lang/expr" + "gopkg.in/yaml.v3" +) + +func (q *streamQuery) matchExpression(streams []*Stream) ([]*Stream, error) { + if q.expression == "" { + return streams, nil + } + + var matched []*Stream + + for _, stream := range streams { + cfg := map[string]any{} + state := map[string]any{} + info := map[string]any{} + + cfgBytes, _ := yaml.Marshal(stream.Configuration()) + yaml.Unmarshal(cfgBytes, &cfg) + nfo, _ := stream.LatestInformation() + nfoBytes, _ := yaml.Marshal(nfo) + yaml.Unmarshal(nfoBytes, &info) + stateBytes, _ := yaml.Marshal(nfo.State) + yaml.Unmarshal(stateBytes, &state) + + env := map[string]any{ + "config": cfg, + "state": state, + "info": info, + "Info": nfo, + } + + program, err := expr.Compile(q.expression, expr.Env(env), expr.AsBool()) + if err != nil { + return nil, err + } + + out, err := expr.Run(program, env) + if err != nil { + return nil, err + } + + should, ok := out.(bool) + if !ok { + return nil, fmt.Errorf("expression did not return a boolean") + } + + if should { + matched = append(matched, stream) + } + } + + return matched, nil +} + +func (q *consumerQuery) matchExpression(consumers []*Consumer) ([]*Consumer, error) { + if q.expression == "" { + return consumers, nil + } + + var matched []*Consumer + + for _, consumer := range consumers { + cfg := map[string]any{} + state := map[string]any{} + + cfgBytes, _ := yaml.Marshal(consumer.Configuration()) + yaml.Unmarshal(cfgBytes, &cfg) + nfo, _ := consumer.LatestState() + stateBytes, _ := yaml.Marshal(nfo) + yaml.Unmarshal(stateBytes, &state) + + env := map[string]any{ + "config": cfg, + "state": state, + "info": state, + "Info": nfo, + } + + program, err := expr.Compile(q.expression, expr.Env(env), expr.AsBool()) + if err != nil { + return nil, err + } + + out, err := expr.Run(program, env) + if err != nil { + return nil, err + } + + should, ok := out.(bool) + if !ok { + return nil, fmt.Errorf("expression did not return a boolean") + } + + if should { + matched = append(matched, consumer) + } + } + + return matched, nil +} diff --git a/match_noexpr.go b/match_noexpr.go new file mode 100644 index 0000000..469261e --- /dev/null +++ b/match_noexpr.go @@ -0,0 +1,17 @@ +//go:build noexprlang + +package jsm + +import "fmt" + +// ErrNoExprLangBuild warns that expression matching is disabled when compiling +// a go binary with the `noexprlang` build tag. +var ErrNoExprLangBuild = fmt.Errorf("binary has been built with `noexprlang` build tag and thus does not support expression matching") + +func (q *streamQuery) matchExpression(streams []*Stream) ([]*Stream, error) { + return nil, ErrNoExprLangBuild +} + +func (q *consumerQuery) matchExpression(consumers []*Consumer) ([]*Consumer, error) { + return nil, ErrNoExprLangBuild +} diff --git a/monitor/consumer.go b/monitor/consumer.go index 0a722ec..4fcc167 100644 --- a/monitor/consumer.go +++ b/monitor/consumer.go @@ -134,7 +134,7 @@ func ConsumerInfoHealthCheck(nfo *api.ConsumerInfo, check *Result, opts Consumer consumerCheckPinned(nfo, check, opts, log) } -func ConsumerHealthCheck(server string, nopts []nats.Option, check *Result, opts ConsumerHealthCheckOptions, log api.Logger) error { +func ConsumerHealthCheck(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts ConsumerHealthCheckOptions, log api.Logger) error { if opts.StreamName == "" { check.Critical("stream name is required") return nil @@ -149,7 +149,7 @@ func ConsumerHealthCheck(server string, nopts []nats.Option, check *Result, opts return nil } - mgr, err := jsm.New(nc) + mgr, err := jsm.New(nc, jsmOpts...) if check.CriticalIfErr(err, "could not load info: %v", err) { return nil } diff --git a/monitor/js_account.go b/monitor/js_account.go index ba8413a..fbea05f 100644 --- a/monitor/js_account.go +++ b/monitor/js_account.go @@ -37,7 +37,7 @@ type CheckJetStreamAccountOptions struct { Resolver func() *api.JetStreamAccountStats `json:"-" yaml:"-"` } -func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts CheckJetStreamAccountOptions) error { +func CheckJetStreamAccount(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts CheckJetStreamAccountOptions) error { var mgr *jsm.Manager var err error @@ -47,7 +47,7 @@ func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, op return nil } - mgr, err = jsm.New(nc) + mgr, err = jsm.New(nc, jsmOpts...) if check.CriticalIfErr(err, "setup failed: %v", err) { return nil } diff --git a/monitor/js_account_test.go b/monitor/js_account_test.go index 03bc95d..0d93b78 100644 --- a/monitor/js_account_test.go +++ b/monitor/js_account_test.go @@ -58,7 +58,7 @@ func TestCheckAccountInfo(t *testing.T) { t.Run("No limits, default thresholds", func(t *testing.T) { opts, info, check := setDefaults() info.Limits = api.JetStreamAccountLimits{} - assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts)) + assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts)) assertListIsEmpty(t, check.Criticals) assertListIsEmpty(t, check.Warnings) assertHasPDItem(t, check, "memory=128B memory_pct=0%;75;90 storage=1024B storage_pct=0%;75;90 streams=10 streams_pct=0% consumers=100 consumers_pct=0%") @@ -66,7 +66,7 @@ func TestCheckAccountInfo(t *testing.T) { t.Run("Limits, default thresholds", func(t *testing.T) { opts, _, check := setDefaults() - assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts)) + assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts)) assertListIsEmpty(t, check.Criticals) assertListIsEmpty(t, check.Warnings) assertHasPDItem(t, check, "memory=128B memory_pct=12%;75;90 storage=1024B storage_pct=5%;75;90 streams=10 streams_pct=5% consumers=100 consumers_pct=10%") @@ -76,7 +76,7 @@ func TestCheckAccountInfo(t *testing.T) { t.Run("Usage exceeds max", func(t *testing.T) { opts, info, check := setDefaults() info.Streams = 300 - assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts)) + assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts)) assertListEquals(t, check.Criticals, "streams: exceed server limits") assertListIsEmpty(t, check.Warnings) assertHasPDItem(t, check, "memory=128B memory_pct=12%;75;90 storage=1024B storage_pct=5%;75;90 streams=300 streams_pct=150% consumers=100 consumers_pct=10%") @@ -86,14 +86,14 @@ func TestCheckAccountInfo(t *testing.T) { opts, _, check := setDefaults() opts.MemoryWarning = 90 opts.MemoryCritical = 80 - assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts)) + assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts)) assertListEquals(t, check.Criticals, "memory: invalid thresholds") }) t.Run("Exceeds warning threshold", func(t *testing.T) { opts, info, check := setDefaults() info.Memory = 800 - assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts)) + assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts)) assertHasPDItem(t, check, "memory=800B memory_pct=78%;75;90 storage=1024B storage_pct=5%;75;90 streams=10 streams_pct=5% consumers=100 consumers_pct=10%") assertListIsEmpty(t, check.Criticals) assertListEquals(t, check.Warnings, "78% memory") @@ -103,7 +103,7 @@ func TestCheckAccountInfo(t *testing.T) { opts, info, check := setDefaults() info.Memory = 960 - assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts)) + assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts)) assertHasPDItem(t, check, "memory=960B memory_pct=93%;75;90 storage=1024B storage_pct=5%;75;90 streams=10 streams_pct=5% consumers=100 consumers_pct=10%") assertListEquals(t, check.Criticals, "93% memory") assertListIsEmpty(t, check.Warnings) diff --git a/monitor/stream.go b/monitor/stream.go index 8cf4646..df067ce 100644 --- a/monitor/stream.go +++ b/monitor/stream.go @@ -162,7 +162,7 @@ func CheckStreamInfoHealth(nfo *api.StreamInfo, check *Result, opts CheckStreamH streamCheckMirror(nfo, check, opts, log) } -func CheckStreamHealth(server string, nopts []nats.Option, check *Result, opts CheckStreamHealthOptions, log api.Logger) error { +func CheckStreamHealth(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts CheckStreamHealthOptions, log api.Logger) error { if opts.StreamName == "" { check.Critical("stream name is required") return nil @@ -173,7 +173,7 @@ func CheckStreamHealth(server string, nopts []nats.Option, check *Result, opts C return nil } - mgr, err := jsm.New(nc) + mgr, err := jsm.New(nc, jsmOpts...) if check.CriticalIfErr(err, "could not load info: %v", err) { return nil } diff --git a/monitor/stream_msg.go b/monitor/stream_msg.go index 526610c..3e5eb19 100644 --- a/monitor/stream_msg.go +++ b/monitor/stream_msg.go @@ -40,13 +40,13 @@ type CheckStreamMessageOptions struct { BodyAsTimestamp bool `json:"body_as_timestamp" yaml:"body_as_timestamp"` } -func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts CheckStreamMessageOptions) error { +func CheckStreamMessage(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts CheckStreamMessageOptions) error { nc, err := nats.Connect(server, nopts...) if check.CriticalIfErr(err, "could not load info: %v", err) { return nil } - mgr, err := jsm.New(nc) + mgr, err := jsm.New(nc, jsmOpts...) if check.CriticalIfErr(err, "could not load info: %v", err) { return nil } diff --git a/monitor/stream_msg_test.go b/monitor/stream_msg_test.go index 5ecd59c..61b2536 100644 --- a/monitor/stream_msg_test.go +++ b/monitor/stream_msg_test.go @@ -42,7 +42,7 @@ func TestCheckMessage(t *testing.T) { AgeWarning: 1, BodyAsTimestamp: true, } - assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts)) + assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts)) assertListIsEmpty(t, check.Warnings) assertListIsEmpty(t, check.OKs) assertListEquals(t, check.Criticals, "no message found") @@ -52,7 +52,7 @@ func TestCheckMessage(t *testing.T) { checkErr(t, err, "publish failed: %v", err) check = &monitor.Result{} - assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts)) + assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts)) assertListIsEmpty(t, check.Warnings) assertListIsEmpty(t, check.Criticals) assertListEquals(t, check.OKs, "Valid message on TEST > TEST") @@ -62,7 +62,7 @@ func TestCheckMessage(t *testing.T) { checkErr(t, err, "publish failed: %v", err) check = &monitor.Result{} - assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts)) + assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts)) assertListIsEmpty(t, check.Criticals) if len(check.Warnings) != 1 { t.Fatalf("expected 1 warning got: %v", check.Warnings) @@ -73,7 +73,7 @@ func TestCheckMessage(t *testing.T) { checkErr(t, err, "publish failed: %v", err) check = &monitor.Result{} - assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts)) + assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts)) assertListIsEmpty(t, check.Warnings) if len(check.Criticals) != 1 { t.Fatalf("expected 1 critical got: %v", check.Criticals) @@ -81,7 +81,7 @@ func TestCheckMessage(t *testing.T) { opts.BodyAsTimestamp = false check = &monitor.Result{} - assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts)) + assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts)) assertListIsEmpty(t, check.Warnings) assertListIsEmpty(t, check.Criticals) assertListEquals(t, check.OKs, "Valid message on TEST > TEST") diff --git a/stream_query.go b/stream_query.go index df2a796..45bc5fb 100644 --- a/stream_query.go +++ b/stream_query.go @@ -14,15 +14,12 @@ package jsm import ( - "fmt" "regexp" "strconv" "strings" "time" - "github.com/expr-lang/expr" "github.com/nats-io/jsm.go/api" - "gopkg.in/yaml.v3" ) type streamMatcher func([]*Stream) ([]*Stream, error) @@ -231,56 +228,6 @@ func (q *streamQuery) Filter(streams []*Stream) ([]*Stream, error) { return matched, nil } -func (q *streamQuery) matchExpression(streams []*Stream) ([]*Stream, error) { - if q.expression == "" { - return streams, nil - } - - var matched []*Stream - - for _, stream := range streams { - cfg := map[string]any{} - state := map[string]any{} - info := map[string]any{} - - cfgBytes, _ := yaml.Marshal(stream.Configuration()) - yaml.Unmarshal(cfgBytes, &cfg) - nfo, _ := stream.LatestInformation() - nfoBytes, _ := yaml.Marshal(nfo) - yaml.Unmarshal(nfoBytes, &info) - stateBytes, _ := yaml.Marshal(nfo.State) - yaml.Unmarshal(stateBytes, &state) - - env := map[string]any{ - "config": cfg, - "state": state, - "info": info, - "Info": nfo, - } - - program, err := expr.Compile(q.expression, expr.Env(env), expr.AsBool()) - if err != nil { - return nil, err - } - - out, err := expr.Run(program, env) - if err != nil { - return nil, err - } - - should, ok := out.(bool) - if !ok { - return nil, fmt.Errorf("expression did not return a boolean") - } - - if should { - matched = append(matched, stream) - } - } - - return matched, nil -} - func (q *streamQuery) matchLeaderServer(streams []*Stream) ([]*Stream, error) { if q.leader == "" { return streams, nil