Skip to content

Commit

Permalink
Merge pull request nats-io#627 from ripienaar/jsm_options
Browse files Browse the repository at this point in the history
Expect jsm options on JS related monitors
  • Loading branch information
ripienaar authored and sylr committed Feb 13, 2025
2 parents 3173c77 + dc0f141 commit 88e2f94
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 121 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,10 @@ This can be used by the `Manager` to validate all API access.
```go
mgr, _ := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator)))
```

## Build tag

This library provide a `noexprlang` build tag that disables expression matching
for Streams and Consumers listing. The purpose of this build tag is to disable
the use of the `github.com/expr-lang/expr` module that disables go compiler's dead
code elimination because it uses some types and functions of the `reflect` package.
49 changes: 0 additions & 49 deletions consumer_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ import (
"strconv"
"time"

"github.com/expr-lang/expr"
"github.com/nats-io/jsm.go/api"
"gopkg.in/yaml.v3"
)

type consumerMatcher func([]*Consumer) ([]*Consumer, error)
Expand Down Expand Up @@ -355,50 +353,3 @@ func (q *consumerQuery) matchApiLevel(consumers []*Consumer) ([]*Consumer, error
return (!q.invert && requiredLevel >= q.apiLevel) || (q.invert && requiredLevel < q.apiLevel)
})
}

func (q *consumerQuery) matchExpression(consumers []*Consumer) ([]*Consumer, error) {
if q.expression == "" {
return consumers, nil
}

var matched []*Consumer

for _, consumer := range consumers {
cfg := map[string]any{}
state := map[string]any{}

cfgBytes, _ := yaml.Marshal(consumer.Configuration())
yaml.Unmarshal(cfgBytes, &cfg)
nfo, _ := consumer.LatestState()
stateBytes, _ := yaml.Marshal(nfo)
yaml.Unmarshal(stateBytes, &state)

env := map[string]any{
"config": cfg,
"state": state,
"info": state,
"Info": nfo,
}

program, err := expr.Compile(q.expression, expr.Env(env), expr.AsBool())
if err != nil {
return nil, err
}

out, err := expr.Run(program, env)
if err != nil {
return nil, err
}

should, ok := out.(bool)
if !ok {
return nil, fmt.Errorf("expression did not return a boolean")
}

if should {
matched = append(matched, consumer)
}
}

return matched, nil
}
107 changes: 107 additions & 0 deletions match.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//go:build !noexprlang

package jsm

import (
"fmt"

"github.com/expr-lang/expr"
"gopkg.in/yaml.v3"
)

func (q *streamQuery) matchExpression(streams []*Stream) ([]*Stream, error) {
if q.expression == "" {
return streams, nil
}

var matched []*Stream

for _, stream := range streams {
cfg := map[string]any{}
state := map[string]any{}
info := map[string]any{}

cfgBytes, _ := yaml.Marshal(stream.Configuration())
yaml.Unmarshal(cfgBytes, &cfg)
nfo, _ := stream.LatestInformation()
nfoBytes, _ := yaml.Marshal(nfo)
yaml.Unmarshal(nfoBytes, &info)
stateBytes, _ := yaml.Marshal(nfo.State)
yaml.Unmarshal(stateBytes, &state)

env := map[string]any{
"config": cfg,
"state": state,
"info": info,
"Info": nfo,
}

program, err := expr.Compile(q.expression, expr.Env(env), expr.AsBool())
if err != nil {
return nil, err
}

out, err := expr.Run(program, env)
if err != nil {
return nil, err
}

should, ok := out.(bool)
if !ok {
return nil, fmt.Errorf("expression did not return a boolean")
}

if should {
matched = append(matched, stream)
}
}

return matched, nil
}

func (q *consumerQuery) matchExpression(consumers []*Consumer) ([]*Consumer, error) {
if q.expression == "" {
return consumers, nil
}

var matched []*Consumer

for _, consumer := range consumers {
cfg := map[string]any{}
state := map[string]any{}

cfgBytes, _ := yaml.Marshal(consumer.Configuration())
yaml.Unmarshal(cfgBytes, &cfg)
nfo, _ := consumer.LatestState()
stateBytes, _ := yaml.Marshal(nfo)
yaml.Unmarshal(stateBytes, &state)

env := map[string]any{
"config": cfg,
"state": state,
"info": state,
"Info": nfo,
}

program, err := expr.Compile(q.expression, expr.Env(env), expr.AsBool())
if err != nil {
return nil, err
}

out, err := expr.Run(program, env)
if err != nil {
return nil, err
}

should, ok := out.(bool)
if !ok {
return nil, fmt.Errorf("expression did not return a boolean")
}

if should {
matched = append(matched, consumer)
}
}

return matched, nil
}
17 changes: 17 additions & 0 deletions match_noexpr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build noexprlang

package jsm

import "fmt"

// ErrNoExprLangBuild warns that expression matching is disabled when compiling
// a go binary with the `noexprlang` build tag.
var ErrNoExprLangBuild = fmt.Errorf("binary has been built with `noexprlang` build tag and thus does not support expression matching")

func (q *streamQuery) matchExpression(streams []*Stream) ([]*Stream, error) {
return nil, ErrNoExprLangBuild
}

func (q *consumerQuery) matchExpression(consumers []*Consumer) ([]*Consumer, error) {
return nil, ErrNoExprLangBuild
}
4 changes: 2 additions & 2 deletions monitor/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func ConsumerInfoHealthCheck(nfo *api.ConsumerInfo, check *Result, opts Consumer
consumerCheckPinned(nfo, check, opts, log)
}

func ConsumerHealthCheck(server string, nopts []nats.Option, check *Result, opts ConsumerHealthCheckOptions, log api.Logger) error {
func ConsumerHealthCheck(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts ConsumerHealthCheckOptions, log api.Logger) error {
if opts.StreamName == "" {
check.Critical("stream name is required")
return nil
Expand All @@ -149,7 +149,7 @@ func ConsumerHealthCheck(server string, nopts []nats.Option, check *Result, opts
return nil
}

mgr, err := jsm.New(nc)
mgr, err := jsm.New(nc, jsmOpts...)
if check.CriticalIfErr(err, "could not load info: %v", err) {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions monitor/js_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type CheckJetStreamAccountOptions struct {
Resolver func() *api.JetStreamAccountStats `json:"-" yaml:"-"`
}

func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts CheckJetStreamAccountOptions) error {
func CheckJetStreamAccount(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts CheckJetStreamAccountOptions) error {
var mgr *jsm.Manager
var err error

Expand All @@ -47,7 +47,7 @@ func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, op
return nil
}

mgr, err = jsm.New(nc)
mgr, err = jsm.New(nc, jsmOpts...)
if check.CriticalIfErr(err, "setup failed: %v", err) {
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions monitor/js_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ func TestCheckAccountInfo(t *testing.T) {
t.Run("No limits, default thresholds", func(t *testing.T) {
opts, info, check := setDefaults()
info.Limits = api.JetStreamAccountLimits{}
assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts))
assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.Warnings)
assertHasPDItem(t, check, "memory=128B memory_pct=0%;75;90 storage=1024B storage_pct=0%;75;90 streams=10 streams_pct=0% consumers=100 consumers_pct=0%")
})

t.Run("Limits, default thresholds", func(t *testing.T) {
opts, _, check := setDefaults()
assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts))
assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.Warnings)
assertHasPDItem(t, check, "memory=128B memory_pct=12%;75;90 storage=1024B storage_pct=5%;75;90 streams=10 streams_pct=5% consumers=100 consumers_pct=10%")
Expand All @@ -76,7 +76,7 @@ func TestCheckAccountInfo(t *testing.T) {
t.Run("Usage exceeds max", func(t *testing.T) {
opts, info, check := setDefaults()
info.Streams = 300
assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts))
assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts))
assertListEquals(t, check.Criticals, "streams: exceed server limits")
assertListIsEmpty(t, check.Warnings)
assertHasPDItem(t, check, "memory=128B memory_pct=12%;75;90 storage=1024B storage_pct=5%;75;90 streams=300 streams_pct=150% consumers=100 consumers_pct=10%")
Expand All @@ -86,14 +86,14 @@ func TestCheckAccountInfo(t *testing.T) {
opts, _, check := setDefaults()
opts.MemoryWarning = 90
opts.MemoryCritical = 80
assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts))
assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts))
assertListEquals(t, check.Criticals, "memory: invalid thresholds")
})

t.Run("Exceeds warning threshold", func(t *testing.T) {
opts, info, check := setDefaults()
info.Memory = 800
assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts))
assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts))
assertHasPDItem(t, check, "memory=800B memory_pct=78%;75;90 storage=1024B storage_pct=5%;75;90 streams=10 streams_pct=5% consumers=100 consumers_pct=10%")
assertListIsEmpty(t, check.Criticals)
assertListEquals(t, check.Warnings, "78% memory")
Expand All @@ -103,7 +103,7 @@ func TestCheckAccountInfo(t *testing.T) {
opts, info, check := setDefaults()

info.Memory = 960
assertNoError(t, monitor.CheckJetStreamAccount("", nil, check, *opts))
assertNoError(t, monitor.CheckJetStreamAccount("", nil, nil, check, *opts))
assertHasPDItem(t, check, "memory=960B memory_pct=93%;75;90 storage=1024B storage_pct=5%;75;90 streams=10 streams_pct=5% consumers=100 consumers_pct=10%")
assertListEquals(t, check.Criticals, "93% memory")
assertListIsEmpty(t, check.Warnings)
Expand Down
4 changes: 2 additions & 2 deletions monitor/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func CheckStreamInfoHealth(nfo *api.StreamInfo, check *Result, opts CheckStreamH
streamCheckMirror(nfo, check, opts, log)
}

func CheckStreamHealth(server string, nopts []nats.Option, check *Result, opts CheckStreamHealthOptions, log api.Logger) error {
func CheckStreamHealth(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts CheckStreamHealthOptions, log api.Logger) error {
if opts.StreamName == "" {
check.Critical("stream name is required")
return nil
Expand All @@ -173,7 +173,7 @@ func CheckStreamHealth(server string, nopts []nats.Option, check *Result, opts C
return nil
}

mgr, err := jsm.New(nc)
mgr, err := jsm.New(nc, jsmOpts...)
if check.CriticalIfErr(err, "could not load info: %v", err) {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions monitor/stream_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ type CheckStreamMessageOptions struct {
BodyAsTimestamp bool `json:"body_as_timestamp" yaml:"body_as_timestamp"`
}

func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts CheckStreamMessageOptions) error {
func CheckStreamMessage(server string, nopts []nats.Option, jsmOpts []jsm.Option, check *Result, opts CheckStreamMessageOptions) error {
nc, err := nats.Connect(server, nopts...)
if check.CriticalIfErr(err, "could not load info: %v", err) {
return nil
}

mgr, err := jsm.New(nc)
mgr, err := jsm.New(nc, jsmOpts...)
if check.CriticalIfErr(err, "could not load info: %v", err) {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions monitor/stream_msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestCheckMessage(t *testing.T) {
AgeWarning: 1,
BodyAsTimestamp: true,
}
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts))
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts))
assertListIsEmpty(t, check.Warnings)
assertListIsEmpty(t, check.OKs)
assertListEquals(t, check.Criticals, "no message found")
Expand All @@ -52,7 +52,7 @@ func TestCheckMessage(t *testing.T) {
checkErr(t, err, "publish failed: %v", err)

check = &monitor.Result{}
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts))
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts))
assertListIsEmpty(t, check.Warnings)
assertListIsEmpty(t, check.Criticals)
assertListEquals(t, check.OKs, "Valid message on TEST > TEST")
Expand All @@ -62,7 +62,7 @@ func TestCheckMessage(t *testing.T) {
checkErr(t, err, "publish failed: %v", err)

check = &monitor.Result{}
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts))
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts))
assertListIsEmpty(t, check.Criticals)
if len(check.Warnings) != 1 {
t.Fatalf("expected 1 warning got: %v", check.Warnings)
Expand All @@ -73,15 +73,15 @@ func TestCheckMessage(t *testing.T) {
checkErr(t, err, "publish failed: %v", err)

check = &monitor.Result{}
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts))
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts))
assertListIsEmpty(t, check.Warnings)
if len(check.Criticals) != 1 {
t.Fatalf("expected 1 critical got: %v", check.Criticals)
}

opts.BodyAsTimestamp = false
check = &monitor.Result{}
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, check, opts))
assertNoError(t, monitor.CheckStreamMessage(srv.ClientURL(), nil, nil, check, opts))
assertListIsEmpty(t, check.Warnings)
assertListIsEmpty(t, check.Criticals)
assertListEquals(t, check.OKs, "Valid message on TEST > TEST")
Expand Down
Loading

0 comments on commit 88e2f94

Please sign in to comment.