Skip to content

Commit

Permalink
Merge pull request #602 from ripienaar/gather_raw
Browse files Browse the repository at this point in the history
Store raw server responses when gathering
  • Loading branch information
ripienaar authored Jan 3, 2025
2 parents 0b2c431 + 57244f1 commit d9197c0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 33 deletions.
2 changes: 1 addition & 1 deletion audit/archive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (w *Writer) SetTime(t time.Time) {

// AddRaw adds the given artifact to the archive similarly to Add.
// The artifact is assumed to be already serialized and is copied as-is byte for byte.
func (w *Writer) AddRaw(reader *bytes.Reader, extension string, tags ...*Tag) error {
func (w *Writer) AddRaw(reader io.Reader, extension string, tags ...*Tag) error {
if w.zipWriter == nil {
return fmt.Errorf("attempting to write into a closed writer")
}
Expand Down
48 changes: 16 additions & 32 deletions audit/gather/gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"os"
"os/user"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -44,9 +43,8 @@ type EndpointSelection struct {

// EndpointCaptureConfig configuration for capturing and tagging server and account endpoints
type EndpointCaptureConfig struct {
ApiSuffix string
ResponseValue any
TypeTag *archive.Tag
ApiSuffix string
TypeTag *archive.Tag
}

type Configuration struct {
Expand Down Expand Up @@ -75,74 +73,60 @@ func NewCaptureConfiguration() *Configuration {
ServerEndpointConfigs: []EndpointCaptureConfig{
{
"VARZ",
server.Varz{},
archive.TagServerVars(),
},
{
"CONNZ",
server.Connz{},
archive.TagServerConnections(),
},
{
"ROUTEZ",
server.Routez{},
archive.TagServerRoutes(),
},
{
"GATEWAYZ",
server.Gatewayz{},
archive.TagServerGateways(),
},
{
"LEAFZ",
server.Leafz{},
archive.TagServerLeafs(),
},
{
"SUBSZ",
server.Subsz{},
archive.TagServerSubs(),
},
{
"JSZ",
server.JSInfo{},
archive.TagServerJetStream(),
},
{
"ACCOUNTZ",
server.Accountz{},
archive.TagServerAccounts(),
},
{
"HEALTHZ",
server.HealthStatus{},
archive.TagServerHealth(),
},
},
AccountEndpointConfigs: []EndpointCaptureConfig{
{
"CONNZ",
server.Connz{},
archive.TagAccountConnections(),
},
{
"LEAFZ",
server.Leafz{},
archive.TagAccountLeafs(),
},
{
"SUBSZ",
server.Subsz{},
archive.TagAccountSubs(),
},
{
"INFO",
server.AccountInfo{},
archive.TagAccountInfo(),
},
{
"JSZ",
server.JetStreamStats{},
archive.TagAccountJetStream(),
},
},
Expand Down Expand Up @@ -399,7 +383,7 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server
for accountId, serversCount := range accountIdsToServersCountMap {
for _, endpoint := range g.cfg.AccountEndpointConfigs {
subject := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.%s", accountId, endpoint.ApiSuffix)
endpointResponses := make(map[Responder]any, serversCount)
endpointResponses := make(map[Responder]io.Reader, serversCount)

err := g.doReqAsync(context.TODO(), nil, subject, serversCount, func(b []byte) {
var apiResponse ServerAPIResponseNoData
Expand All @@ -419,10 +403,10 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server
return
}

endpointResponse := reflect.New(reflect.TypeOf(endpoint.ResponseValue)).Interface()
err = json.Unmarshal(apiResponse.Data, endpointResponse)
buff := bytes.NewBuffer([]byte{})
err = json.Indent(buff, apiResponse.Data, "", " ")
if err != nil {
g.log.Errorf("Failed to deserialize %s response for account %s: %s", endpoint.ApiSuffix, accountId, err)
g.log.Errorf("Failed to indent %s response for account %s: %s", endpoint.ApiSuffix, accountId, err)
return
}

Expand All @@ -436,7 +420,7 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server
return
}

endpointResponses[responder] = endpointResponse
endpointResponses[responder] = buff
})
if err != nil {
g.log.Errorf("Failed to request %s for account %s: %s", endpoint.ApiSuffix, accountId, err)
Expand All @@ -457,7 +441,7 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server
endpoint.TypeTag,
}

err = g.aw.Add(endpointResponse, tags...)
err = g.aw.AddRaw(endpointResponse, "json", tags...)
if err != nil {
return fmt.Errorf("failed to add response to %s to archive: %w", subject, err)
}
Expand Down Expand Up @@ -550,15 +534,17 @@ func (g *gather) captureServerProfiles(serverInfoMap map[string]*server.ServerIn

// Capture configured endpoints for each known server
func (g *gather) captureServerEndpoints(serverInfoMap map[string]*server.ServerInfo) error {
if g.aw == nil {
return fmt.Errorf("no archive writer supplied")
}

g.log.Infof("Querying %d endpoints on %d known servers...", len(g.cfg.ServerEndpointConfigs), len(serverInfoMap))
capturedCount := 0
for serverId, serverInfo := range serverInfoMap {
serverName := serverInfo.Name
for _, endpoint := range g.cfg.ServerEndpointConfigs {
subject := fmt.Sprintf("$SYS.REQ.SERVER.%s.%s", serverId, endpoint.ApiSuffix)

endpointResponse := reflect.New(reflect.TypeOf(endpoint.ResponseValue)).Interface()

responses, err := g.doReq(context.TODO(), nil, subject, 1)
if err != nil {
g.log.Errorf("Failed to request %s from server %s: %s", endpoint.ApiSuffix, serverName, err)
Expand All @@ -578,9 +564,10 @@ func (g *gather) captureServerEndpoints(serverInfoMap map[string]*server.ServerI
continue
}

err = json.Unmarshal(apiResponse.Data, endpointResponse)
buff := bytes.NewBuffer([]byte{})
err = json.Indent(buff, apiResponse.Data, "", " ")
if err != nil {
g.log.Errorf("Failed to deserialize %s response data from server %s: %s", endpoint.ApiSuffix, serverName, err)
g.log.Errorf("Failed to indent %s response data from server %s: %s", endpoint.ApiSuffix, serverName, err)
continue
}

Expand All @@ -595,10 +582,7 @@ func (g *gather) captureServerEndpoints(serverInfoMap map[string]*server.ServerI
tags = append(tags, archive.TagNoCluster())
}

if g.aw == nil {
panic("no aw")
}
err = g.aw.Add(endpointResponse, tags...)
err = g.aw.AddRaw(buff, "json", tags...)
if err != nil {
return fmt.Errorf("failed to add endpoint %s response to archive: %w", subject, err)
}
Expand Down

0 comments on commit d9197c0

Please sign in to comment.