Skip to content

Commit

Permalink
Merge pull request nats-io#626 from ripienaar/mon_consistency
Browse files Browse the repository at this point in the history
Improve api consistency and use some server structs
  • Loading branch information
ripienaar authored Feb 7, 2025
2 parents a685220 + 30dabca commit 3173c77
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 179 deletions.
2 changes: 1 addition & 1 deletion audit/jetstream_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func checkStreamMetadataMonitoring(_ *Check, r *archive.Reader, examples *Exampl
}

opts.StreamName = streamName
monitor.StreamInfoHealthCheck(&streamDetails, check, *opts, log)
monitor.CheckStreamInfoHealth(&streamDetails, check, *opts, log)

for _, warning := range check.Warnings {
examples.Add("WARNING: stream %s in %s: %s", streamName, accountName, warning)
Expand Down
6 changes: 3 additions & 3 deletions monitor/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/nats-io/nats.go"
)

// ConnectionCheckOptions configures the NATS Connection check
type ConnectionCheckOptions struct {
// CheckConnectionOptions configures the NATS Connection check
type CheckConnectionOptions struct {
// ConnectTimeWarning warning threshold for time to establish the connection (seconds)
ConnectTimeWarning float64 `json:"connect_time_warning" yaml:"connect_time_warning"`
// ConnectTimeCritical critical threshold for time to establish the connection (seconds)
Expand All @@ -36,7 +36,7 @@ type ConnectionCheckOptions struct {
RequestRttCritical float64 `json:"request_rtt_critical" yaml:"request_rtt_critical"`
}

func CheckConnection(server string, nopts []nats.Option, timeout time.Duration, check *Result, opts ConnectionCheckOptions) error {
func CheckConnection(server string, nopts []nats.Option, timeout time.Duration, check *Result, opts CheckConnectionOptions) error {
connStart := time.Now()
nc, err := nats.Connect(server, nopts...)
if check.CriticalIfErr(err, "connection failed: %v", err) {
Expand Down
6 changes: 3 additions & 3 deletions monitor/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/nats-io/nkeys"
)

// CredentialCheckOptions configures the credentials check
type CredentialCheckOptions struct {
// CheckCredentialOptions configures the credentials check
type CheckCredentialOptions struct {
// File is the file holding the credential
File string `json:"file" yaml:"file"`
// ValidityWarning is the warning threshold for credential validity (seconds)
Expand All @@ -33,7 +33,7 @@ type CredentialCheckOptions struct {
RequiresExpiry bool `json:"requires_expiry" yaml:"requires_expiry"`
}

func CheckCredential(check *Result, opts CredentialCheckOptions) error {
func CheckCredential(check *Result, opts CheckCredentialOptions) error {
ok, err := fileAccessible(opts.File)
if err != nil {
check.Critical("credential not accessible: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions monitor/credentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI
}

t.Run("no expiry", func(t *testing.T) {
opts := monitor.CredentialCheckOptions{
opts := monitor.CheckCredentialOptions{
File: writeCred(t, noExpiry),
RequiresExpiry: true,
}
Expand All @@ -80,7 +80,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI

t.Run("critical", func(t *testing.T) {
check := &monitor.Result{}
assertNoError(t, monitor.CheckCredential(check, monitor.CredentialCheckOptions{
assertNoError(t, monitor.CheckCredential(check, monitor.CheckCredentialOptions{
File: writeCred(t, noExpiry),
ValidityCritical: 100 * 24 * 365 * 60 * 60,
}))
Expand All @@ -91,7 +91,7 @@ SUAKYITMHPMSYUGPNQBLLPGOPFQN44XNCGXHNSHLJJVMD3IKYGBOLAI7TI

t.Run("warning", func(t *testing.T) {
check := &monitor.Result{}
assertNoError(t, monitor.CheckCredential(check, monitor.CredentialCheckOptions{
assertNoError(t, monitor.CheckCredential(check, monitor.CheckCredentialOptions{
File: writeCred(t, noExpiry),
ValidityWarning: 100 * 24 * 365 * 60 * 60,
}))
Expand Down
8 changes: 4 additions & 4 deletions monitor/js_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/nats-io/nats.go"
)

type JetStreamAccountOptions struct {
type CheckJetStreamAccountOptions struct {
MemoryWarning int `json:"memory_warning" yaml:"memory_warning"`
MemoryCritical int `json:"memory_critical" yaml:"memory_critical"`
FileWarning int `json:"file_warning" yaml:"file_warning"`
Expand All @@ -37,7 +37,7 @@ type JetStreamAccountOptions struct {
Resolver func() *api.JetStreamAccountStats `json:"-" yaml:"-"`
}

func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts JetStreamAccountOptions) error {
func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, opts CheckJetStreamAccountOptions) error {
var mgr *jsm.Manager
var err error

Expand Down Expand Up @@ -82,7 +82,7 @@ func CheckJetStreamAccount(server string, nopts []nats.Option, check *Result, op
return nil
}

func checkStreamClusterHealth(check *Result, opts *JetStreamAccountOptions, info []*jsm.Stream) error {
func checkStreamClusterHealth(check *Result, opts *CheckJetStreamAccountOptions, info []*jsm.Stream) error {
var okCnt, noLeaderCnt, notEnoughReplicasCnt, critCnt, lagCritCnt, seenCritCnt int

for _, s := range info {
Expand Down Expand Up @@ -177,7 +177,7 @@ func checkStreamClusterHealth(check *Result, opts *JetStreamAccountOptions, info
return nil
}

func checkJSAccountInfo(check *Result, opts *JetStreamAccountOptions, info *api.JetStreamAccountStats) error {
func checkJSAccountInfo(check *Result, opts *CheckJetStreamAccountOptions, info *api.JetStreamAccountStats) error {
if info == nil {
return fmt.Errorf("invalid account status")
}
Expand Down
4 changes: 2 additions & 2 deletions monitor/js_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func TestCheckAccountInfo(t *testing.T) {
setDefaults := func() (*monitor.JetStreamAccountOptions, *api.JetStreamAccountStats, *monitor.Result) {
setDefaults := func() (*monitor.CheckJetStreamAccountOptions, *api.JetStreamAccountStats, *monitor.Result) {
info := &api.JetStreamAccountStats{
JetStreamTier: api.JetStreamTier{
Memory: 128,
Expand All @@ -38,7 +38,7 @@ func TestCheckAccountInfo(t *testing.T) {
}

// cli defaults
cmd := &monitor.JetStreamAccountOptions{
cmd := &monitor.CheckJetStreamAccountOptions{
ConsumersCritical: -1,
ConsumersWarning: -1,
StreamCritical: -1,
Expand Down
6 changes: 3 additions & 3 deletions monitor/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/nats-io/nats.go"
)

// KVCheckOptions configures the KV check
type KVCheckOptions struct {
// CheckKVBucketAndKeyOptions configures the KV check
type CheckKVBucketAndKeyOptions struct {
// Bucket is the bucket to check
Bucket string `json:"bucket" yaml:"bucket"`
// Key requires a key to have a non delete/purge value set
Expand All @@ -31,7 +31,7 @@ type KVCheckOptions struct {
ValuesCritical int64 `json:"values_critical" yaml:"values_critical"`
}

func CheckKVBucketAndKey(server string, nopts []nats.Option, check *Result, opts KVCheckOptions) error {
func CheckKVBucketAndKey(server string, nopts []nats.Option, check *Result, opts CheckKVBucketAndKeyOptions) error {
nc, err := nats.Connect(server, nopts...)
if check.CriticalIfErr(err, "connection failed: %v", err) {
return nil
Expand Down
8 changes: 4 additions & 4 deletions monitor/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
t.Run("Bucket", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}
err := monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.KVCheckOptions{
err := monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
})
checkErr(t, err, "check failed: %v", err)
Expand All @@ -121,7 +121,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
checkErr(t, err, "kv create failed")

check = &monitor.Result{}
err = monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.KVCheckOptions{
err = monitor.CheckKVBucketAndKey(srv.ClientURL(), nil, check, monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
ValuesCritical: -1,
ValuesWarning: -1,
Expand All @@ -142,7 +142,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
bucket, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
checkErr(t, err, "kv create failed: %v", err)

opts := monitor.KVCheckOptions{
opts := monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
ValuesWarning: 1,
ValuesCritical: 2,
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestCheckKVBucketAndKey(t *testing.T) {
bucket, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
checkErr(t, err, "kv create failed")

opts := monitor.KVCheckOptions{
opts := monitor.CheckKVBucketAndKeyOptions{
Bucket: "TEST",
Key: "KEY",
ValuesWarning: -1,
Expand Down
30 changes: 19 additions & 11 deletions monitor/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,25 @@ package monitor

import (
"encoding/json"
"fmt"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)

type CheckMetaOptions struct {
type CheckJetstreamMetaOptions struct {
// ExpectServers the expected number of known servers in the meta cluster
ExpectServers int `json:"expect_servers" yaml:"expect_servers"`
// LagCritical the critical threshold for how many operations behind a peer may be
LagCritical uint64 `json:"lag_critical" yaml:"lag_critical"`
// SeenCritical the critical threshold for how long ago a peer was seen (seconds)
SeenCritical float64 `json:"seen_critical" yaml:"seen_critical"`

Resolver func(*nats.Conn) (*JSZResponse, error) `json:"-" yaml:"-"`
Resolver func(*nats.Conn) (*server.ServerAPIJszResponse, error) `json:"-" yaml:"-"`
}

type JSZResponse struct {
Data server.JSInfo `json:"data"`
Server server.ServerInfo `json:"server"`
}

func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts CheckMetaOptions) error {
func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts CheckJetstreamMetaOptions) error {
var nc *nats.Conn
var err error

Expand All @@ -47,8 +43,8 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts
return nil
}

opts.Resolver = func(conn *nats.Conn) (*JSZResponse, error) {
jszresp := &JSZResponse{}
opts.Resolver = func(conn *nats.Conn) (*server.ServerAPIJszResponse, error) {
jszresp := &server.ServerAPIJszResponse{}
jreq, err := json.Marshal(&server.JSzOptions{LeaderOnly: true})
if check.CriticalIfErr(err, "request failed: %v", err) {
return nil, err
Expand All @@ -60,7 +56,14 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts
}

err = json.Unmarshal(res.Data, jszresp)
check.CriticalIfErr(err, "invalid result received: %s", err)
if check.CriticalIfErr(err, "invalid result received: %s", err) {
return nil, err
}

// A populated Error field means the decoded JSZ reply carries an error
// instead of usable data: record it as critical and return it to the caller.
// The check must be != nil; testing == nil would call Error() on a nil value
// for every successful reply and let real error replies pass through.
if jszresp.Error != nil {
	check.Critical("invalid result received: %s", jszresp.Error.Error())
	return nil, fmt.Errorf("invalid result received: %s", jszresp.Error.Error())
}

return jszresp, nil
}
Expand All @@ -71,6 +74,11 @@ func CheckJetstreamMeta(servers string, nopts []nats.Option, check *Result, opts
return nil
}

if jszresp.Data == nil {
check.Critical("no JSZ response received")
return nil
}

ci := jszresp.Data.Meta
if ci == nil {
check.Critical("no cluster information")
Expand Down
Loading

0 comments on commit 3173c77

Please sign in to comment.