From 57244f1d92d64114dbc67b9479dec23e7d7acb97 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Fri, 3 Jan 2025 12:40:47 +0100 Subject: [PATCH] Store raw server responses when gathering Previously the responses would be ran through data types and then saved again for no good reason, this had the side effect of losing data for a gather process using older structs than the server generating the data Now we store the data as received only indenting it for readability Signed-off-by: R.I.Pienaar --- audit/archive/writer.go | 2 +- audit/gather/gather.go | 48 ++++++++++++++--------------------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/audit/archive/writer.go b/audit/archive/writer.go index ab1207b4..f5196d38 100644 --- a/audit/archive/writer.go +++ b/audit/archive/writer.go @@ -102,7 +102,7 @@ func (w *Writer) SetTime(t time.Time) { // AddRaw adds the given artifact to the archive similarly to Add. // The artifact is assumed to be already serialized and is copied as-is byte for byte. -func (w *Writer) AddRaw(reader *bytes.Reader, extension string, tags ...*Tag) error { +func (w *Writer) AddRaw(reader io.Reader, extension string, tags ...*Tag) error { if w.zipWriter == nil { return fmt.Errorf("attempting to write into a closed writer") } diff --git a/audit/gather/gather.go b/audit/gather/gather.go index 2c2c6461..974b59e4 100644 --- a/audit/gather/gather.go +++ b/audit/gather/gather.go @@ -22,7 +22,6 @@ import ( "os" "os/user" "path/filepath" - "reflect" "strings" "sync" "time" @@ -44,9 +43,8 @@ type EndpointSelection struct { // EndpointCaptureConfig configuration for capturing and tagging server and account endpoints type EndpointCaptureConfig struct { - ApiSuffix string - ResponseValue any - TypeTag *archive.Tag + ApiSuffix string + TypeTag *archive.Tag } type Configuration struct { @@ -75,74 +73,60 @@ func NewCaptureConfiguration() *Configuration { ServerEndpointConfigs: []EndpointCaptureConfig{ { "VARZ", - server.Varz{}, archive.TagServerVars(), }, { "CONNZ", - server.Connz{}, archive.TagServerConnections(), }, { "ROUTEZ", - server.Routez{}, archive.TagServerRoutes(), }, { "GATEWAYZ", - server.Gatewayz{}, archive.TagServerGateways(), }, { "LEAFZ", - server.Leafz{}, archive.TagServerLeafs(), }, { "SUBSZ", - server.Subsz{}, archive.TagServerSubs(), }, { "JSZ", - server.JSInfo{}, archive.TagServerJetStream(), }, { "ACCOUNTZ", - server.Accountz{}, archive.TagServerAccounts(), }, { "HEALTHZ", - server.HealthStatus{}, archive.TagServerHealth(), }, }, AccountEndpointConfigs: []EndpointCaptureConfig{ { "CONNZ", - server.Connz{}, archive.TagAccountConnections(), }, { "LEAFZ", - server.Leafz{}, archive.TagAccountLeafs(), }, { "SUBSZ", - server.Subsz{}, archive.TagAccountSubs(), }, { "INFO", - server.AccountInfo{}, archive.TagAccountInfo(), }, { "JSZ", - server.JetStreamStats{}, archive.TagAccountJetStream(), }, }, @@ -399,7 +383,7 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server for accountId, serversCount := range accountIdsToServersCountMap { for _, endpoint := range g.cfg.AccountEndpointConfigs { subject := fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.%s", accountId, endpoint.ApiSuffix) - endpointResponses := make(map[Responder]any, serversCount) + endpointResponses := make(map[Responder]io.Reader, serversCount) err := g.doReqAsync(context.TODO(), nil, subject, serversCount, func(b []byte) { var apiResponse ServerAPIResponseNoData @@ -419,10 +403,10 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server return } - endpointResponse := reflect.New(reflect.TypeOf(endpoint.ResponseValue)).Interface() - err = json.Unmarshal(apiResponse.Data, endpointResponse) + buff := bytes.NewBuffer([]byte{}) + err = json.Indent(buff, apiResponse.Data, "", " ") if err != nil { - g.log.Errorf("Failed to deserialize %s response for account %s: %s", endpoint.ApiSuffix, accountId, err) + g.log.Errorf("Failed to indent %s response for account %s: %s", endpoint.ApiSuffix, accountId, err) return } @@ -436,7 +420,7 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server return } - endpointResponses[responder] = endpointResponse + endpointResponses[responder] = buff }) if err != nil { g.log.Errorf("Failed to request %s for account %s: %s", endpoint.ApiSuffix, accountId, err) @@ -457,7 +441,7 @@ func (g *gather) captureAccountEndpoints(serverInfoMap map[string]*server.Server endpoint.TypeTag, } - err = g.aw.Add(endpointResponse, tags...) + err = g.aw.AddRaw(endpointResponse, "json", tags...) if err != nil { return fmt.Errorf("failed to add response to %s to archive: %w", subject, err) } @@ -550,6 +534,10 @@ func (g *gather) captureServerProfiles(serverInfoMap map[string]*server.ServerIn // Capture configured endpoints for each known server func (g *gather) captureServerEndpoints(serverInfoMap map[string]*server.ServerInfo) error { + if g.aw == nil { + return fmt.Errorf("no archive writer supplied") + } + g.log.Infof("Querying %d endpoints on %d known servers...", len(g.cfg.ServerEndpointConfigs), len(serverInfoMap)) capturedCount := 0 for serverId, serverInfo := range serverInfoMap { @@ -557,8 +545,6 @@ func (g *gather) captureServerEndpoints(serverInfoMap map[string]*server.ServerI for _, endpoint := range g.cfg.ServerEndpointConfigs { subject := fmt.Sprintf("$SYS.REQ.SERVER.%s.%s", serverId, endpoint.ApiSuffix) - endpointResponse := reflect.New(reflect.TypeOf(endpoint.ResponseValue)).Interface() - responses, err := g.doReq(context.TODO(), nil, subject, 1) if err != nil { g.log.Errorf("Failed to request %s from server %s: %s", endpoint.ApiSuffix, serverName, err) @@ -578,9 +564,10 @@ func (g *gather) captureServerEndpoints(serverInfoMap map[string]*server.ServerI continue } - err = json.Unmarshal(apiResponse.Data, endpointResponse) + buff := bytes.NewBuffer([]byte{}) + err = json.Indent(buff, apiResponse.Data, "", " ") if err != nil { - g.log.Errorf("Failed to deserialize %s response data from server %s: %s", endpoint.ApiSuffix, serverName, err) + g.log.Errorf("Failed to indent %s response data from server %s: %s", endpoint.ApiSuffix, serverName, err) continue } @@ -595,10 +582,7 @@ func (g *gather) captureServerEndpoints(serverInfoMap map[string]*server.ServerI tags = append(tags, archive.TagNoCluster()) } - if g.aw == nil { - panic("no aw") - } - err = g.aw.Add(endpointResponse, tags...) + err = g.aw.AddRaw(buff, "json", tags...) if err != nil { return fmt.Errorf("failed to add endpoint %s response to archive: %w", subject, err) }