From 6b9587d59dbd82390c9db9c63e7c07539563a6cf Mon Sep 17 00:00:00 2001 From: emcgee-work <124061343+emcgee-work@users.noreply.github.com> Date: Wed, 10 Apr 2024 16:29:51 +0100 Subject: [PATCH] updating index to persisted (#131) Co-authored-by: Mike Cohen --- crypto/server/manager.go | 4 +- datastore/datastore.go | 8 +- filestore/s3filestore_test.go | 2 +- foreman/foreman.go | 27 ++- foreman/foreman_test.go | 36 +++- foreman/plan.go | 6 +- ingestion/enrolment.go | 3 +- ingestion/fixtures/TestEnrollment.golden | 3 +- ingestion/flow_stats.go | 2 +- ingestion/hunts.go | 2 +- ingestion/ingestor_test.go | 22 +-- ingestion/ping.go | 3 +- ingestion/services/hunts.go | 2 +- ingestion/vfs.go | 4 +- result_sets/simple/factory.go | 2 +- result_sets/simple/reader.go | 4 +- result_sets/simple/writer.go | 8 +- result_sets/timed/reader.go | 2 +- result_sets/timed/timed.go | 2 +- schema/api/clients.go | 3 +- schema/api/notifications.go | 1 + schema/api/repository.go | 1 + schema/templates/acls.json | 19 --- schema/templates/client_keys.json | 22 --- schema/templates/clients.json | 67 -------- schema/templates/config.json | 22 --- schema/templates/hunts.json | 37 ---- schema/templates/index.json | 25 --- schema/templates/notebooks.json | 43 ----- schema/templates/notifications.json | 22 --- schema/templates/orgs.json | 25 --- schema/templates/persisted.json | 159 ++++++++++++++++++ schema/templates/repository.json | 22 --- schema/templates/tasks.json | 29 ---- .../{results.json => transient.json} | 5 +- schema/templates/user_mru.json | 25 --- schema/templates/user_options.json | 22 --- schema/templates/users.json | 22 --- services/acl_manager/acl_manager.go | 10 +- services/client_info/client_info.go | 5 +- services/client_info/client_info_common.go | 11 +- .../client_monitoring/client_monitoring.go | 14 +- services/client_monitoring/events.go | 8 +- services/hunt_dispatcher/create.go | 9 +- services/hunt_dispatcher/flows.go | 2 +- services/hunt_dispatcher/hunt_dispatcher.go | 28 ++- services/indexing/indexing.go | 16 +- services/indexing/mru.go | 4 +- services/indexing/names.go | 2 +- services/indexing/search.go | 23 ++- services/indexing/search_chan.go | 4 +- services/labeler/labeler.go | 7 +- services/launcher/delete.go | 10 +- services/launcher/flows.go | 2 +- services/launcher/launcher_test.go | 10 +- services/launcher/storage.go | 10 +- services/notebook/shared.go | 5 +- services/notebook/storage.go | 13 +- services/notifier/notifier.go | 5 +- services/orgs/delete.go | 2 +- services/orgs/orgs.go | 32 +++- services/orgs/services.go | 13 +- services/repository/manager.go | 2 +- services/repository/repository.go | 26 ++- services/server_artifacts/server_artifacts.go | 4 +- services/users/users.go | 14 +- services/vfs_service/assembler.go | 4 +- services/vfs_service/directory.go | 2 +- services/vfs_service/vfs_service.go | 2 +- vql/server/clients/delete.go | 5 +- vql/server/clients/delete_test.go | 22 ++- vql/server/hunts/delete.go | 2 +- vql/server/notebook/delete.go | 2 +- vql/uploads/upload_test.go | 2 +- 74 files changed, 474 insertions(+), 571 deletions(-) delete mode 100644 schema/templates/acls.json delete mode 100644 schema/templates/client_keys.json delete mode 100644 schema/templates/clients.json delete mode 100644 schema/templates/config.json delete mode 100644 schema/templates/hunts.json delete mode 100644 schema/templates/index.json delete mode 100644 schema/templates/notebooks.json delete mode 100644 schema/templates/notifications.json delete mode 100644 schema/templates/orgs.json create mode 100644 schema/templates/persisted.json delete mode 100644 schema/templates/repository.json delete mode 100644 schema/templates/tasks.json rename schema/templates/{results.json => transient.json} (95%) delete mode 100644 schema/templates/user_mru.json delete mode 100644 schema/templates/user_options.json delete mode 100644 schema/templates/users.json diff --git a/crypto/server/manager.go b/crypto/server/manager.go index 3c1464e..aae1daa 100644 --- a/crypto/server/manager.go +++ b/crypto/server/manager.go @@ -65,7 +65,7 @@ func (self *serverPublicKeyResolver) GetPublicKey( record, err := cvelo_services.GetElasticRecord( context.Background(), config_obj.OrgId, - "client_keys", client_id) + "persisted", client_id) if err != nil { return nil, false } @@ -93,7 +93,7 @@ func (self *serverPublicKeyResolver) SetPublicKey( EnrollTime: uint64(utils.GetTime().Now().Unix()), } return cvelo_services.SetElasticIndex( - self.ctx, config_obj.OrgId, "client_keys", client_id, pem) + self.ctx, config_obj.OrgId, "persisted", client_id, pem) } func (self *serverPublicKeyResolver) Clear() {} diff --git a/datastore/datastore.go b/datastore/datastore.go index 271f1ae..ccf02fc 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -101,7 +101,7 @@ func (self ElasticDatastore) SetSubject( DocType: "datastore", Timestamp: utils.GetTime().Now().UnixNano(), } - return services.SetElasticIndex(self.ctx, config_obj.OrgId, "results", "", record) + return services.SetElasticIndex(self.ctx, config_obj.OrgId, "transient", "", record) } func (self ElasticDatastore) SetSubjectWithCompletion( @@ -118,7 +118,7 @@ func (self ElasticDatastore) DeleteSubject( id := services.MakeId(urn.AsClientPath()) return services.DeleteDocumentByQuery( - self.ctx, config_obj.OrgId, "results", json.Format(delete_datastore_doc_query, id), services.SyncDelete) + self.ctx, config_obj.OrgId, "transient", json.Format(delete_datastore_doc_query, id), services.SyncDelete) } func (self ElasticDatastore) DeleteSubjectWithCompletion( @@ -126,7 +126,7 @@ func (self ElasticDatastore) DeleteSubjectWithCompletion( urn api.DSPathSpec, completion func()) error { id := services.MakeId(urn.AsClientPath()) return services.DeleteDocumentByQuery( - self.ctx, config_obj.OrgId, "results", json.Format(delete_datastore_doc_query, id), services.AsyncDelete) + self.ctx, config_obj.OrgId, "transient", json.Format(delete_datastore_doc_query, id), services.AsyncDelete) } func (self ElasticDatastore) ListChildren( @@ -135,7 +135,7 @@ func (self ElasticDatastore) ListChildren( dir := urn.AsDatastoreDirectory(config_obj) hits, _, err := services.QueryElasticRaw(self.ctx, config_obj.OrgId, - "results", json.Format(list_children_query, dir)) + "transient", json.Format(list_children_query, dir)) if err != nil { return nil, err } diff --git a/filestore/s3filestore_test.go b/filestore/s3filestore_test.go index 622d62d..7c7ed1f 100644 --- a/filestore/s3filestore_test.go +++ b/filestore/s3filestore_test.go @@ -96,7 +96,7 @@ func (self *S3FilestoreTest) TestS3FileWriting() { func TestS3Filestore(t *testing.T) { suite.Run(t, &S3FilestoreTest{ CloudTestSuite: &testsuite.CloudTestSuite{ - Indexes: []string{"clients"}, + Indexes: []string{"persisted"}, }, }) } diff --git a/foreman/foreman.go b/foreman/foreman.go index 2738fbb..020876a 100644 --- a/foreman/foreman.go +++ b/foreman/foreman.go @@ -71,7 +71,7 @@ func (self Foreman) stopHunt( ` return cvelo_services.UpdateIndex( - ctx, org_config_obj.OrgId, "hunts", hunt.HuntId, stopHuntQuery) + ctx, org_config_obj.OrgId, "persisted", hunt.HuntId, stopHuntQuery) } // Rather than retrieving the entire client record we only get those @@ -120,6 +120,12 @@ const getMinimalClientInfoQuery = `{ } ] } + }, + { + "match": { + "doc_type": "clients" + + } } ] } @@ -140,10 +146,11 @@ func getMinimalClientInfo( result := &api.ClientRecord{ ClientId: client_id, + DocType: "clients", } hits, err := cvelo_services.QueryChan(ctx, config_obj, 1000, config_obj.OrgId, - "clients", query, "client_id") + "persisted", query, "client_id") if err != nil { return nil, err } @@ -426,6 +433,12 @@ const ( "gt": %q } } + }, + { + "match": { + "doc_type": "clients" + + } } ] } @@ -448,6 +461,12 @@ const ( "value": %q } } + }, + { + "match": { + "doc_type": "clients" + + } } ] } @@ -466,7 +485,7 @@ func (self Foreman) getClientHuntMembership( seen := make(map[string]bool) hits, err := cvelo_services.QueryChan(ctx, config_obj, 1000, config_obj.OrgId, - "clients", query, "client_id") + "persisted", query, "client_id") if err != nil { return nil, err } @@ -499,7 +518,7 @@ func (self Foreman) getClientsSeenAfter( hits, err := cvelo_services.QueryChan( ctx, config_obj, 1000, - config_obj.OrgId, "clients", query, "ping") + config_obj.OrgId, "persisted", query, "ping") if err != nil { logger := logging.GetLogger(config_obj, &logging.FrontendComponent) logger.Error("getClientsSeenAfter: %v", err) diff --git a/foreman/foreman_test.go b/foreman/foreman_test.go index edb164f..a08d370 100644 --- a/foreman/foreman_test.go +++ b/foreman/foreman_test.go @@ -120,6 +120,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { Record: api.ClientRecord{ ClientId: "C.ConnectedClient", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.ConnectedClient_ping", }, @@ -128,6 +129,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { Record: api.ClientRecord{ ClientId: "C.WithLabel1", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.WithLabel1_ping", }, @@ -136,6 +138,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { ClientId: "C.WithLabel1", Labels: []string{"Label1"}, LowerLabels: []string{"label1"}, + DocType: "clients", }, Id: "C.WithLabel1_labels", }, @@ -144,6 +147,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { Record: api.ClientRecord{ ClientId: "C.WithLabel2", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.WithLabel2_ping", }, @@ -152,6 +156,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { ClientId: "C.WithLabel2", Labels: []string{"Label2"}, LowerLabels: []string{"label2"}, + DocType: "clients", }, Id: "C.WithLabel2_labels", }, @@ -160,6 +165,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { Record: api.ClientRecord{ ClientId: "C.WithLabel1And2", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.WithLabel1And2_ping", }, @@ -168,6 +174,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { ClientId: "C.WithLabel1And2", Labels: []string{"Label1", "Label2"}, LowerLabels: []string{"label1", "label2"}, + DocType: "clients", }, Id: "C.WithLabel1And2_labels", }, @@ -177,6 +184,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { Record: api.ClientRecord{ ClientId: "C.OfflineClient", Ping: uint64(utils.GetTime().Now().Add(-72 * time.Hour).UnixNano()), + DocType: "clients", }, Id: "C.OfflineClient_ping", }, @@ -185,7 +193,7 @@ func (self *ForemanTestSuite) TestClientMonitoring() { // Add these clients directly into the index. for _, c := range clients { err := cvelo_services.SetElasticIndex( - self.Ctx, config_obj.OrgId, "clients", c.Id, c.Record) + self.Ctx, config_obj.OrgId, "persisted", c.Id, c.Record) assert.NoError(self.T(), err) } @@ -370,6 +378,7 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { Record: api.ClientRecord{ ClientId: "C.ConnectedClient", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.ConnectedClient_ping", }, @@ -380,6 +389,7 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { Record: api.ClientRecord{ ClientId: "C.AlreadyRanAllClients", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.AlreadyRanAllClients_ping", }, @@ -387,6 +397,7 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { Record: api.ClientRecord{ ClientId: "C.AlreadyRanAllClients", AssignedHunts: []string{"H.AllClients"}, + DocType: "clients", }, Id: "C.AlreadyRanAllClients_hunts", }, @@ -394,6 +405,7 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { Record: api.ClientRecord{ ClientId: "C.WithLabelFoo", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.WithLabelFoo_ping", }, @@ -402,6 +414,7 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { ClientId: "C.WithLabelFoo", Labels: []string{"Foo"}, LowerLabels: []string{"foo"}, + DocType: "clients", }, Id: "C.WithLabelFoo_labels", }, @@ -411,6 +424,7 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { Record: api.ClientRecord{ ClientId: "C.OfflineClient", Ping: uint64(utils.GetTime().Now().Add(-72 * time.Hour).UnixNano()), + DocType: "clients", }, Id: "C.OfflineClient_ping", }, @@ -419,7 +433,7 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { // Add these clients directly into the index. for _, c := range clients { err := cvelo_services.SetElasticIndex( - self.Ctx, config_obj.OrgId, "clients", c.Id, c.Record) + self.Ctx, config_obj.OrgId, "persisted", c.Id, c.Record) assert.NoError(self.T(), err) } @@ -514,11 +528,12 @@ func (self *ForemanTestSuite) TestHuntsAllClients() { // Update the ping time - the client will only be scheduled once // it is online. err = cvelo_services.SetElasticIndex(self.Ctx, - config_obj.OrgId, "clients", "C.ConnectedClient_ping", + config_obj.OrgId, "persisted", "C.ConnectedClient_ping", &api.ClientRecord{ ClientId: "C.ConnectedClient", Type: "ping", Ping: uint64(utils.GetTime().Now().Add(time.Second).UnixNano()), + DocType: "clients", }) assert.NoError(self.T(), err) @@ -554,9 +569,10 @@ func (self *ForemanTestSuite) testHuntsExpireInFuture() { AssignedHunts: []string{}, Labels: []string{}, LowerLabels: []string{}, + DocType: "clients", } err := cvelo_services.SetElasticIndex( - self.Ctx, config_obj.OrgId, "clients", c.ClientId, c) + self.Ctx, config_obj.OrgId, "persisted", c.ClientId, c) assert.NoError(self.T(), err) plan, err := NewPlan(config_obj) @@ -663,6 +679,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Windows1", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.Windows1_ping", }, @@ -670,6 +687,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Windows1", System: "windows", + DocType: "clients", }, Id: "C.Windows1", }, @@ -678,6 +696,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Windows2", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.Windows2_ping", }, @@ -685,6 +704,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Windows2", System: "windows", + DocType: "clients", }, Id: "C.Windows2", }, @@ -692,6 +712,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Linux1", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.Linux1_ping", }, @@ -699,6 +720,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Linux1", System: "linux", + DocType: "clients", }, Id: "C.Linux1", }, @@ -706,6 +728,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Linux2", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }, Id: "C.Linux2_ping", }, @@ -713,6 +736,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { Record: api.ClientRecord{ ClientId: "C.Linux2", System: "linux", + DocType: "clients", }, Id: "C.Linux2", }, @@ -721,7 +745,7 @@ func (self *ForemanTestSuite) TestHuntsByOS() { // Add these clients directly into the index. for _, c := range clients { err := cvelo_services.SetElasticIndex( - self.Ctx, config_obj.OrgId, "clients", c.Id, c.Record) + self.Ctx, config_obj.OrgId, "persisted", c.Id, c.Record) assert.NoError(self.T(), err) } @@ -796,7 +820,7 @@ func (self *ForemanTestSuite) checkAssignedHunts(clientId string, expectedHunts func TestForeman(t *testing.T) { suite.Run(t, &ForemanTestSuite{ CloudTestSuite: &testsuite.CloudTestSuite{ - Indexes: []string{"clients", "hunts", "repository", "tasks"}, + Indexes: []string{"persisted"}, }, }) } diff --git a/foreman/plan.go b/foreman/plan.go index 1b3898f..a24171e 100644 --- a/foreman/plan.go +++ b/foreman/plan.go @@ -154,12 +154,13 @@ func (self *Plan) ExecuteClientMonitoringUpdate( // version for _, client_id := range clients { cvelo_services.SetElasticIndexAsync( - org_config_obj.OrgId, "clients", + org_config_obj.OrgId, "persisted", client_id+"_last_event_version", cvelo_services.BulkUpdateIndex, &api.ClientRecord{ ClientId: client_id, LastEventTableVersion: self. current_monitoring_state.Version, + DocType: "clients", }) } @@ -221,10 +222,11 @@ func (self *Plan) ExecuteHuntUpdate( // on this client. for _, client_id := range clients { cvelo_services.SetElasticIndexAsync( - org_config_obj.OrgId, "clients", "", + org_config_obj.OrgId, "persisted", "", cvelo_services.BulkUpdateIndex, &api.ClientRecord{ ClientId: client_id, AssignedHunts: []string{hunt.HuntId}, + DocType: "clients", }) } } diff --git a/ingestion/enrolment.go b/ingestion/enrolment.go index b96b267..dc2f147 100644 --- a/ingestion/enrolment.go +++ b/ingestion/enrolment.go @@ -48,13 +48,14 @@ func (self Ingestor) HandleInterrogation( message *crypto_proto.VeloMessage) error { services.SetElasticIndexAsync( - config_obj.OrgId, "clients", + config_obj.OrgId, "persisted", message.Source+"_interrogate", services.BulkUpdateIndex, &api.ClientRecord{ ClientId: message.Source, Type: "interrogation", LastInterrogate: message.SessionId, + DocType: "clients", }) return nil } diff --git a/ingestion/fixtures/TestEnrollment.golden b/ingestion/fixtures/TestEnrollment.golden index b22b565..69dfe17 100644 --- a/ingestion/fixtures/TestEnrollment.golden +++ b/ingestion/fixtures/TestEnrollment.golden @@ -7,6 +7,7 @@ "client_id": "C.1352adc54e292a23", "hostname": "devbox", "system": "linux", - "type": "main" + "type": "main", + "doc_type": "clients" } } \ No newline at end of file diff --git a/ingestion/flow_stats.go b/ingestion/flow_stats.go index ebbef57..dae964a 100644 --- a/ingestion/flow_stats.go +++ b/ingestion/flow_stats.go @@ -58,7 +58,7 @@ func (self Ingestor) HandleFlowStats( // progress as the collection is received. The bulk data is still // stored asyncronously. err := services.SetElasticIndex(ctx, - config_obj.OrgId, "results", "", stats) + config_obj.OrgId, "transient", "", stats) if err != nil { return err } diff --git a/ingestion/hunts.go b/ingestion/hunts.go index 59a0bec..fd228b6 100644 --- a/ingestion/hunts.go +++ b/ingestion/hunts.go @@ -41,7 +41,7 @@ func (self Ingestor) maybeHandleHuntResponse( DocType: "hunt_flow", } return services.SetElasticIndex(ctx, - config_obj.OrgId, "results", "", hunt_flow_entry) + config_obj.OrgId, "transient", "", hunt_flow_entry) } return nil diff --git a/ingestion/ingestor_test.go b/ingestion/ingestor_test.go index f8183b6..71cb3a7 100644 --- a/ingestion/ingestor_test.go +++ b/ingestion/ingestor_test.go @@ -113,7 +113,7 @@ func (self *IngestionTestSuite) TestEnrollment() { client_id := "C.1352adc54e292a23" record, err := cvelo_services.GetElasticRecord(self.ctx, - "test", "client_keys", client_id+"-test") + "test", "persisted", client_id+"-test") assert.NoError(self.T(), err) self.golden.Set("Enrollment", record) @@ -129,7 +129,7 @@ func (self *IngestionTestSuite) TestEnrollment() { // Record results in monitoring data. records, _, err := cvelo_services.QueryElasticRaw(self.ctx, - "test", "results", getAllItemsQuery) + "test", "transient", getAllItemsQuery) assert.NoError(self.T(), err) assert.Equal(self.T(), 1, len(records)) @@ -147,7 +147,7 @@ func (self *IngestionTestSuite) TestListDirectory() { // Test VFS.ListDirectory special handling. err := cvelo_services.SetElasticIndex(self.ctx, - "test", "results", flow_id, api.ArtifactCollectorRecordFromProto( + "test", "transient", flow_id, api.ArtifactCollectorRecordFromProto( &flows_proto.ArtifactCollectorContext{ ClientId: client_id, SessionId: flow_id, @@ -156,13 +156,13 @@ func (self *IngestionTestSuite) TestListDirectory() { self.ingestGoldenMessages(self.ctx, self.ingestor, "System.VFS.ListDirectory") records, _, err := cvelo_services.QueryElasticRaw(self.ctx, - "test", "results", + "test", "transient", json.Format(getCollectionQuery, client_id, flow_id, "collection")) assert.NoError(self.T(), err) self.golden.Set("System.VFS.ListDirectory", records) records, _, err = cvelo_services.QueryElasticRaw(self.ctx, - "test", "results", getAllItemsQuery) + "test", "transient", getAllItemsQuery) assert.NoError(self.T(), err) sort_records(records) self.golden.Set("System.VFS.ListDirectory Results", records) @@ -171,7 +171,7 @@ func (self *IngestionTestSuite) TestListDirectory() { // no downloads yet but a full directory listing. query := getAllItemsQueryForType records, _, err = cvelo_services.QueryElasticRaw(self.ctx, - "test", "results", query) + "test", "transient", query) assert.NoError(self.T(), err) sort_records(records) self.golden.Set("System.VFS.ListDirectory vfs", records) @@ -202,7 +202,7 @@ func (self *IngestionTestSuite) TestVFSDownload() { // Add a VFS.DownloadFile collection and replay messages. err := cvelo_services.SetElasticIndex(self.ctx, "test", - "results", "", api.ArtifactCollectorRecordFromProto( + "transient", "", api.ArtifactCollectorRecordFromProto( &flows_proto.ArtifactCollectorContext{ ClientId: client_id, SessionId: list_flow_id, @@ -218,7 +218,7 @@ func (self *IngestionTestSuite) TestVFSDownload() { // Test VFS.ListDirectory special handling. err = cvelo_services.SetElasticIndex(self.ctx, - "test", "results", "", + "test", "transient", "", api.ArtifactCollectorRecordFromProto( &flows_proto.ArtifactCollectorContext{ ClientId: client_id, @@ -255,12 +255,12 @@ func (self *IngestionTestSuite) TestClientEventMonitoring() { // Get Client Event Monitoring Clear the results so we get a clean // golden image. err := cvelo_services.DeleteByQuery( - self.ctx, "test", "results", getAllItemsQuery) + self.ctx, "test", "transient", getAllItemsQuery) assert.NoError(self.T(), err) self.ingestGoldenMessages(self.ctx, self.ingestor, "Generic.Client.Stats") records, _, err := cvelo_services.QueryElasticRaw(self.ctx, - "test", "results", getAllItemsQuery) + "test", "transient", getAllItemsQuery) assert.NoError(self.T(), err) sort_records(records) self.golden.Set("Generic.Client.Stats Results", records) @@ -303,7 +303,7 @@ func (self *IngestionTestSuite) TearDownTest() { func TestIngestor(t *testing.T) { suite.Run(t, &IngestionTestSuite{ CloudTestSuite: &testsuite.CloudTestSuite{ - Indexes: []string{"clients", "results", "hunts"}, + Indexes: []string{"transient", "persisted"}, }, }) } diff --git a/ingestion/ping.go b/ingestion/ping.go index a9027ff..fd1e3dd 100644 --- a/ingestion/ping.go +++ b/ingestion/ping.go @@ -17,11 +17,12 @@ func (self Ingestor) HandlePing( message *crypto_proto.VeloMessage) error { err := services.SetElasticIndex(ctx, - config_obj.OrgId, "clients", message.Source+"_ping", + config_obj.OrgId, "persisted", message.Source+"_ping", &api.ClientRecord{ ClientId: message.Source, Type: "ping", Ping: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }) if err == nil || strings.Contains(err.Error(), "document_missing_exception") { diff --git a/ingestion/services/hunts.go b/ingestion/services/hunts.go index 62bc302..375fff1 100644 --- a/ingestion/services/hunts.go +++ b/ingestion/services/hunts.go @@ -127,7 +127,7 @@ func (self *HuntStatsUpdater) Flush(ctx context.Context) error { self.mu.Unlock() return services.UpdateIndex( - ctx, self.config_obj.OrgId, "hunts", hunt_id, query) + ctx, self.config_obj.OrgId, "persisted", hunt_id, query) } func StartHuntStatsUpdater( diff --git a/ingestion/vfs.go b/ingestion/vfs.go index 9188d33..9541d5f 100644 --- a/ingestion/vfs.go +++ b/ingestion/vfs.go @@ -69,7 +69,7 @@ func (self Ingestor) HandleSystemVfsListDirectory( } err = cvelo_services.SetElasticIndexAsync( - config_obj.OrgId, "results", "", + config_obj.OrgId, "transient", "", cvelo_services.BulkUpdateCreate, record) if err != nil { @@ -143,7 +143,7 @@ func (self Ingestor) HandleSystemVfsUpload( } cvelo_services.SetElasticIndexAsync( - config_obj.OrgId, "results", "", + config_obj.OrgId, "transient", "", cvelo_services.BulkUpdateCreate, stats) } } diff --git a/result_sets/simple/factory.go b/result_sets/simple/factory.go index 91ed3ce..b3be2b9 100644 --- a/result_sets/simple/factory.go +++ b/result_sets/simple/factory.go @@ -26,7 +26,7 @@ func (self ResultSetFactory) NewResultSetWriter( base_record := NewSimpleResultSetRecord(log_path) if base_record.VFSPath != "" { err := cvelo_services.DeleteByQuery(context.Background(), org_id, - "results", json.Format(` + "transient", json.Format(` {"query": {"bool": {"must": [ {"match": {"vfs_path": %q}} ]}}}`, base_record.VFSPath)) diff --git a/result_sets/simple/reader.go b/result_sets/simple/reader.go index 380e911..83281c7 100644 --- a/result_sets/simple/reader.go +++ b/result_sets/simple/reader.go @@ -93,7 +93,7 @@ func (self *SimpleResultSetReader) getPacket(row int64) ( } org_id := filestore.GetOrgId(self.file_store_factory) - hits, _, err := cvelo_services.QueryElasticRaw(ctx, org_id, "results", query) + hits, _, err := cvelo_services.QueryElasticRaw(ctx, org_id, "transient", query) if err != nil { return nil, err } @@ -268,7 +268,7 @@ func getLastRecord(org_id string, artifact_clause) } hits, _, err := cvelo_services.QueryElasticRaw(ctx, org_id, - "results", query) + "transient", query) if err != nil { return nil, err } diff --git a/result_sets/simple/writer.go b/result_sets/simple/writer.go index 0490c36..856cf60 100644 --- a/result_sets/simple/writer.go +++ b/result_sets/simple/writer.go @@ -53,10 +53,10 @@ func (self *ElasticSimpleResultSetWriter) WriteJSONL( if self.sync { services.SetElasticIndex( - self.ctx, self.org_id, "results", "", record) + self.ctx, self.org_id, "transient", "", record) } else { services.SetElasticIndexAsync( - self.org_id, "results", "", + self.org_id, "transient", "", cvelo_services.BulkUpdateCreate, record) } } @@ -104,7 +104,7 @@ func (self *ElasticSimpleResultSetWriter) getLastRow() error { ctx := context.Background() query := json.Format(getLargestRowId, self.log_path.AsClientPath()) hits, err := services.QueryElasticAggregations( - ctx, self.org_id, "results", query) + ctx, self.org_id, "transient", query) if err != nil { return err @@ -134,7 +134,7 @@ func (self *ElasticSimpleResultSetWriter) Flush() { self.buffered_rows = 0 // Make sure the results are visible immediately - cvelo_services.FlushIndex(self.ctx, self.org_id, "results") + cvelo_services.FlushIndex(self.ctx, self.org_id, "transient") // No need to find the last start row as we assume we are the only // writers. diff --git a/result_sets/timed/reader.go b/result_sets/timed/reader.go index d699a30..962e063 100644 --- a/result_sets/timed/reader.go +++ b/result_sets/timed/reader.go @@ -98,7 +98,7 @@ func (self *TimedResultSetReader) Rows( hits_chan, err := cvelo_services.QueryChan( subctx, self.config_obj.VeloConf(), 1000, - self.config_obj.OrgId, "results", query, + self.config_obj.OrgId, "transient", query, "timestamp") if err != nil { logger := logging.GetLogger( diff --git a/result_sets/timed/timed.go b/result_sets/timed/timed.go index db90fb5..af5055f 100644 --- a/result_sets/timed/timed.go +++ b/result_sets/timed/timed.go @@ -86,7 +86,7 @@ func (self ElasticTimedResultSetWriter) WriteJSONL( services.SetElasticIndex(self.ctx, filestore.GetOrgId(self.file_store_factory), - "results", "", record) + "transient", "", record) } func (self ElasticTimedResultSetWriter) Write(row *ordereddict.Dict) { diff --git a/schema/api/clients.go b/schema/api/clients.go index 0611818..6732c45 100644 --- a/schema/api/clients.go +++ b/schema/api/clients.go @@ -48,6 +48,7 @@ type ClientRecord struct { LastLabelTimestamp uint64 `json:"labels_timestamp,omitempty"` Labels []string `json:"labels,omitempty"` LowerLabels []string `json:"lower_labels,omitempty"` + DocType string `json:"doc_type"` } func ToClientInfo(record *ClientRecord) *services.ClientInfo { @@ -82,7 +83,7 @@ func GetMultipleClients( } hits, err := cvelo_services.GetMultipleElasticRecords( - ctx, config_obj.OrgId, "clients", terms) + ctx, config_obj.OrgId, "persisted", terms) if err != nil { return nil, err } diff --git a/schema/api/notifications.go b/schema/api/notifications.go index b638ea5..6699e27 100644 --- a/schema/api/notifications.go +++ b/schema/api/notifications.go @@ -3,4 +3,5 @@ package api type NotificationRecord struct { Key string `json:"key,omitempty"` Timestamp int64 `json:"timestamp,omitempty"` + DocType string `json:"doc_type"` } diff --git a/schema/api/repository.go b/schema/api/repository.go index 7408032..b0bb61f 100644 --- a/schema/api/repository.go +++ b/schema/api/repository.go @@ -3,4 +3,5 @@ package api type RepositoryEntry struct { Name string `json:"name,omitempty"` Definition string `json:"definition,omitempty"` + DocType string `json:"doc_type"` } diff --git a/schema/templates/acls.json b/schema/templates/acls.json deleted file mode 100644 index e31fc4a..0000000 --- a/schema/templates/acls.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "index_patterns": [ - "*acls" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "acl": { - "type": "binary" - } - } - } - } -} diff --git a/schema/templates/client_keys.json b/schema/templates/client_keys.json deleted file mode 100644 index 8d5758b..0000000 --- a/schema/templates/client_keys.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "index_patterns": [ - "*client_keys" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "pem": { - "type": "binary" - }, - "enroll_time": { - "type": "long" - } - } - } - } -} diff --git a/schema/templates/clients.json b/schema/templates/clients.json deleted file mode 100644 index 4bc6eae..0000000 --- a/schema/templates/clients.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "index_patterns": [ - "*clients" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "client_id": { - "type": "keyword" - }, - "hostname": { - "type": "keyword" - }, - "release": { - "type": "keyword" - }, - "architecture": { - "type": "keyword" - }, - "build_time": { - "type": "keyword" - }, - "system": { - "type": "keyword" - }, - "ping": { - "type": "long" - }, - "first_seen_at": { - "type": "long" - }, - "labels": { - "type": "keyword" - }, - "lower_labels": { - "type": "keyword" - }, - "last_interrogate": { - "type": "keyword" - }, - "labels_timestamp": { - "type": "long" - }, - "mac_addresses": { - "type": "keyword" - }, - "last_hunt_timestamp": { - "type": "long" - }, - "assigned_hunts": { - "type": "keyword" - }, - "type": { - "type": "keyword" - }, - "last_event_table_version": { - "type": "long" - } - } - } - } -} diff --git a/schema/templates/config.json b/schema/templates/config.json deleted file mode 100644 index 68d1829..0000000 --- a/schema/templates/config.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "index_patterns": [ - "*config" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "type": { - "type": "keyword" - }, - "data": { - "type": "binary" - } - } - } - } -} diff --git a/schema/templates/hunts.json b/schema/templates/hunts.json deleted file mode 100644 index ad7c8c2..0000000 --- a/schema/templates/hunts.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "index_patterns": [ - "*hunts" - ], - "template": { - "settings": { - "number_of_shards": 2, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "hunt_id": { - "type": "keyword" - }, - "timestamp": { - "type": "long" - }, - "scheduled": { - "type": "integer" - }, - "completed": { - "type": "integer" - }, - "errors": { - "type": "integer" - }, - "state": { - "type": "keyword" - }, - "hunt": { - "type": "binary" - } - } - } - } -} diff --git a/schema/templates/index.json b/schema/templates/index.json deleted file mode 100644 index 6850062..0000000 --- a/schema/templates/index.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "index_patterns": [ - "*index" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "term": { - "type": "keyword" - }, - "entity": { - "type": "keyword" - }, - "timestamp": { - "type": "long" - } - } - } - } -} diff --git a/schema/templates/notebooks.json b/schema/templates/notebooks.json deleted file mode 100644 index 18f9a6c..0000000 --- a/schema/templates/notebooks.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "index_patterns": [ - "*notebooks" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "notebook_id": { - "type": "keyword" - }, - "cell_id": { - "type": "keyword" - }, - "notebook": { - "type": "binary" - }, - "notebook_cell": { - "type": "binary" - }, - "public": { - "type": "boolean" - }, - "creator": { - "type": "keyword" - }, - "shared": { - "type": "keyword" - }, - "type": { - "type": "keyword" - }, - "timestamp": { - "type": "long" - } - } - } - } -} diff --git a/schema/templates/notifications.json b/schema/templates/notifications.json deleted file mode 100644 index 159304d..0000000 --- a/schema/templates/notifications.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "index_patterns": [ - "*notifications" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "key": { - "type": "keyword" - }, - "timestamp": { - "type": "long" - } - } - } - } -} diff --git a/schema/templates/orgs.json b/schema/templates/orgs.json deleted file mode 100644 index be6d1bb..0000000 --- a/schema/templates/orgs.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "index_patterns": [ - "*orgs" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "org_id": { - "type": "keyword" - }, - "nonce": { - "type": "keyword" - }, - "name": { - "type": "keyword" - } - } - } - } -} diff --git a/schema/templates/persisted.json b/schema/templates/persisted.json new file mode 100644 index 0000000..59459ac --- /dev/null +++ b/schema/templates/persisted.json @@ -0,0 +1,159 @@ +{ + "index_patterns": [ + "*persisted" + ], + "template": { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 1 + }, + "mappings": { + "dynamic": false, + "properties": { + "hunt_id": { + "type": "keyword" + }, + "timestamp": { + "type": "long" + }, + "scheduled": { + "type": "integer" + }, + "completed": { + "type": "integer" + }, + "errors": { + "type": "integer" + }, + "state": { + "type": "keyword" + }, + "hunt": { + "type": "binary" + }, + "acl": { + "type": "binary" + }, + "notebook_id": { + "type": "keyword" + }, + "cell_id": { + "type": "keyword" + }, + "notebook": { + "type": "binary" + }, + "notebook_cell": { + "type": "binary" + }, + "public": { + "type": "boolean" + }, + "creator": { + "type": "keyword" + }, + "shared": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "doc_type": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "definition": { + "type": "binary" + }, + "username": { + "type": "keyword" + }, + "record": { + "type": "binary" + }, + "gui_options": { + "type": "binary" + }, + "client_id": { + "type": "keyword" + }, + "hostname": { + "type": "keyword" + }, + "release": { + "type": "keyword" + }, + "architecture": { + "type": "keyword" + }, + "build_time": { + "type": "keyword" + }, + "system": { + "type": "keyword" + }, + "ping": { + "type": "long" + }, + "first_seen_at": { + "type": "long" + }, + "labels": { + "type": "keyword" + }, + "lower_labels": { + "type": "keyword" + }, + "last_interrogate": { + "type": "keyword" + }, + "labels_timestamp": { + "type": "long" + }, + "mac_addresses": { + "type": "keyword" + }, + "last_hunt_timestamp": { + "type": "long" + }, + "assigned_hunts": { + "type": "keyword" + }, + "last_event_table_version": { + "type": "long" + }, + "pem": { + "type": "binary" + }, + "enroll_time": { + "type": "long" + }, + "org_id": { + "type": "keyword" + }, + "nonce": { + "type": "keyword" + }, + "flow_id": { + "type": "keyword" + }, + "data": { + "type": "text", + "index": false + }, + "key": { + "type": "keyword" + }, + "term": { + "type": "keyword" + }, + "entity": { + "type": "keyword" + } + } + } + } + } + diff --git a/schema/templates/repository.json b/schema/templates/repository.json deleted file mode 100644 index b6281c7..0000000 --- a/schema/templates/repository.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "index_patterns": [ - "*repository" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "name": { - "type": "keyword" - }, - "definition": { - "type": "binary" - } - } - } - } -} diff --git a/schema/templates/tasks.json b/schema/templates/tasks.json deleted file mode 100644 index 46b39ab..0000000 --- a/schema/templates/tasks.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "index_patterns": [ - "*tasks" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "client_id": { - "type": "keyword" - }, - "flow_id": { - "type": "keyword" - }, - "data": { - "type": "text", - "index": false - }, - "timestamp": { - "type": "long" - } - } - } - } -} diff --git a/schema/templates/results.json b/schema/templates/transient.json similarity index 95% rename from schema/templates/results.json rename to schema/templates/transient.json index cfe3e4b..9d5996c 100644 --- a/schema/templates/results.json +++ b/schema/templates/transient.json @@ -1,6 +1,6 @@ { "index_patterns": [ - "*results" + "*transient" ], "data_stream": { "timestamp_field": { @@ -84,6 +84,9 @@ }, "is_dispatched": { "type": "boolean" + }, + "key": { + "type": "keyword" } } } diff --git a/schema/templates/user_mru.json b/schema/templates/user_mru.json deleted file mode 100644 index a824edc..0000000 --- a/schema/templates/user_mru.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "index_patterns": [ - "*user_mru" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "username": { - "type": "keyword" - }, - "client_id": { - "type": "keyword" - }, - "timestamp": { - "type": "long" - } - } - } - } -} diff --git a/schema/templates/user_options.json b/schema/templates/user_options.json deleted file mode 100644 index 8d6d581..0000000 --- a/schema/templates/user_options.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "index_patterns": [ - "*user_options" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "username": { - "type": "keyword" - }, - "gui_options": { - "type": "binary" - } - } - } - } -} diff --git a/schema/templates/users.json b/schema/templates/users.json deleted file mode 100644 index fd58d74..0000000 --- a/schema/templates/users.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "index_patterns": [ - "*users" - ], - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, - "mappings": { - "dynamic": false, - "properties": { - "username": { - "type": "keyword" - }, - "record": { - "type": "binary" - } - } - } - } -} diff --git a/services/acl_manager/acl_manager.go b/services/acl_manager/acl_manager.go index 812f72a..e228748 100644 --- a/services/acl_manager/acl_manager.go +++ b/services/acl_manager/acl_manager.go @@ -21,7 +21,8 @@ var ( ) type ACLRecord struct { - ACL string `json:"acl"` + ACL string `json:"acl"` + DocType string `json:"doc_type"` } type ACLManager struct { @@ -55,7 +56,7 @@ func (self ACLManager) GetPolicy( hit, err := cvelo_services.GetElasticRecord( context.Background(), config_obj.OrgId, - "acls", principal) + "persisted", principal) if err != nil { return nil, err } @@ -95,8 +96,9 @@ func (self ACLManager) SetPolicy( acl_lru.Set(key, acl_obj) return cvelo_services.SetElasticIndex(self.ctx, config_obj.OrgId, - "acls", principal, &ACLRecord{ - ACL: json.MustMarshalString(acl_obj), + "persisted", principal, &ACLRecord{ + ACL: json.MustMarshalString(acl_obj), + DocType: "acls", }) } diff --git a/services/client_info/client_info.go b/services/client_info/client_info.go index 4be58ce..c9fbcff 100644 --- a/services/client_info/client_info.go +++ b/services/client_info/client_info.go @@ -101,7 +101,7 @@ func (self *ClientInfoBase) Set( return cvelo_services.SetElasticIndex(ctx, self.config_obj.OrgId, - "clients", client_info.ClientId, &api.ClientRecord{ + "persisted", client_info.ClientId, &api.ClientRecord{ ClientId: client_info.ClientId, Hostname: client_info.Hostname, System: client_info.System, @@ -110,13 +110,14 @@ func (self *ClientInfoBase) Set( MacAddresses: client_info.MacAddresses, LastHuntTimestamp: client_info.LastHuntTimestamp, LastEventTableVersion: client_info.LastEventTableVersion, + DocType: "clients", }) } func (self ClientInfoBase) Remove( ctx context.Context, client_id string) { cvelo_services.DeleteDocument(ctx, self.config_obj.OrgId, - "clients", client_id, true) + "persisted", client_id, true) } // Get a single entry from a client id diff --git a/services/client_info/client_info_common.go b/services/client_info/client_info_common.go index 8b0072d..5046a38 100644 --- a/services/client_info/client_info_common.go +++ b/services/client_info/client_info_common.go @@ -19,6 +19,7 @@ var ( "query": { "bool": { "must": [ + {"match": {"doc_type" : "task"}}, {"match": {"client_id" : %q}} ]} } @@ -38,11 +39,12 @@ func (self ClientInfoQueuer) QueueMessageForClient( return cvelo_services.SetElasticIndex(ctx, self.config_obj.OrgId, - "tasks", "", &ClientTask{ + "persisted", "", &ClientTask{ ClientId: client_id, FlowId: req.SessionId, Timestamp: time.Now().UnixNano(), JSONData: string(serialized), + DocType: "task", }) } @@ -51,6 +53,7 @@ type ClientTask struct { FlowId string `json:"flow_id"` Timestamp int64 `json:"timestamp"` JSONData string `json:"data"` + DocType string `json:"doc_type"` } // Get the client's tasks and remove them from the queue. @@ -59,7 +62,7 @@ func (self ClientInfoBase) GetClientTasks( query := json.Format(getClientTasksQuery, client_id) hits, err := cvelo_services.QueryElastic(ctx, self.config_obj.OrgId, - "tasks", query) + "persisted", query) if err != nil { return nil, err } @@ -68,7 +71,7 @@ func (self ClientInfoBase) GetClientTasks( for _, hit := range hits { err = cvelo_services.DeleteDocument(ctx, self.config_obj.OrgId, - "tasks", hit.Id, cvelo_services.NoSync) + "persisted", hit.Id, cvelo_services.NoSync) if err != nil { return nil, err } @@ -95,7 +98,7 @@ func (self ClientInfoBase) PeekClientTasks( query := json.Format(getClientTasksQuery, client_id) hits, err := cvelo_services.QueryElastic(ctx, self.config_obj.OrgId, - "tasks", query) + "persisted", query) if err != nil { return nil, err } diff --git a/services/client_monitoring/client_monitoring.go b/services/client_monitoring/client_monitoring.go index 5b12ef5..92194b5 100644 --- a/services/client_monitoring/client_monitoring.go +++ b/services/client_monitoring/client_monitoring.go @@ -24,8 +24,9 @@ var ( ) type ConfigEntry struct { - Type string `json:"type"` - Data string `json:"data"` + Type string `json:"type"` + Data string `json:"data"` + DocType string `json:"doc_type"` } type ClientMonitoringManager struct { @@ -130,7 +131,7 @@ func (self ClientMonitoringManager) makeDefaultClientMonitoringLabel() *flows_pr func (self ClientMonitoringManager) GetClientMonitoringState() *flows_proto.ClientEventTable { ctx := context.Background() serialized, err := cvelo_services.GetElasticRecord( - ctx, self.config_obj.OrgId, "config", "client_monitoring") + ctx, self.config_obj.OrgId, "persisted", "client_monitoring") if err != nil { table := self.makeDefaultClientMonitoringLabel() self.SetClientMonitoringState(ctx, self.config_obj, "", table) @@ -175,10 +176,11 @@ func (self *ClientMonitoringManager) SetClientMonitoringState( return cvelo_services.SetElasticIndex(ctx, self.config_obj.OrgId, - "config", "client_monitoring", + "persisted", "client_monitoring", &ConfigEntry{ - Type: "client_monitoring", - Data: json.MustMarshalString(state), + Type: "client_monitoring", + Data: json.MustMarshalString(state), + DocType: "config", }) } diff --git a/services/client_monitoring/events.go b/services/client_monitoring/events.go index fcb9f53..fb4edf6 100644 --- a/services/client_monitoring/events.go +++ b/services/client_monitoring/events.go @@ -86,7 +86,7 @@ func listAvailableEventArtifacts( // artifacts runner, it is still possible for server artifacts // to be written by various services (e.g. Audit manager). query = json.Format(getAvailableServerArtifactsQuery, - "server", "results", OPENSEARCH_MAX_BUCKETS) + "server", "transient", OPENSEARCH_MAX_BUCKETS) } else { // Even if client events are not generated there are always @@ -96,7 +96,7 @@ func listAvailableEventArtifacts( } hits, err := cvelo_services.QueryElasticAggregations(ctx, - config_obj.OrgId, "results", + config_obj.OrgId, "transient", query) if err != nil { return nil, err @@ -175,11 +175,11 @@ func listAvailableEventTimestamps( } else { query = json.Format(getAvailableEventTimesQuery, in.ClientId, - "results", in.Artifact, OPENSEARCH_MAX_BUCKETS) + "transient", in.Artifact, OPENSEARCH_MAX_BUCKETS) } hits, err := cvelo_services.QueryElasticAggregations(ctx, - config_obj.OrgId, "results", query) + config_obj.OrgId, "transient", query) if err != nil { return nil, err } diff --git a/services/hunt_dispatcher/create.go b/services/hunt_dispatcher/create.go index 27a463a..18d24fe 100644 --- a/services/hunt_dispatcher/create.go +++ b/services/hunt_dispatcher/create.go @@ -120,18 +120,19 @@ func (self HuntDispatcher) CreateHunt( err = cvelo_services.SetElasticIndex(ctx, self.config_obj.OrgId, - "hunts", hunt_id, &HuntEntry{ + "persisted", hunt_id, &HuntEntry{ HuntId: hunt_id, Timestamp: time.Now().Unix(), Hunt: string(serialized), State: hunt.State.String(), + DocType: "hunts", }) // The actual hunt scheduling is done by the foreman. /* - if hunt.State == api_proto.Hunt_RUNNING { - scheduleClientsForHunt(ctx, config_obj, hunt) - } + if hunt.State == api_proto.Hunt_RUNNING { + scheduleClientsForHunt(ctx, config_obj, hunt) + } */ return hunt_id, nil } diff --git a/services/hunt_dispatcher/flows.go b/services/hunt_dispatcher/flows.go index 822c56b..4883236 100644 --- a/services/hunt_dispatcher/flows.go +++ b/services/hunt_dispatcher/flows.go @@ -52,7 +52,7 @@ func (self HuntDispatcher) GetFlows( query := json.Format(getHuntsFlowsQuery, start, hunt_id) hits, err := cvelo_services.QueryChan( ctx, config_obj, 1000, self.config_obj.OrgId, - "results", query, "timestamp") + "transient", query, "timestamp") if err != nil { scope.Log("GetFlows for hunt %v: %v", hunt_id, err) return diff --git a/services/hunt_dispatcher/hunt_dispatcher.go b/services/hunt_dispatcher/hunt_dispatcher.go index 95d0e18..9b13c12 100644 --- a/services/hunt_dispatcher/hunt_dispatcher.go +++ b/services/hunt_dispatcher/hunt_dispatcher.go @@ -21,6 +21,7 @@ type HuntEntry struct { Errors uint64 `json:"errors"` Hunt string `json:"hunt"` State string `json:"state"` + DocType string `json:"doc_type"` } func (self *HuntEntry) GetHunt() (*api_proto.Hunt, error) { @@ -80,9 +81,10 @@ func (self HuntDispatcher) SetHunt(hunt *api_proto.Hunt) error { } record := &HuntEntry{ - HuntId: hunt_id, - Hunt: string(serialized), - State: hunt.State.String(), + HuntId: hunt_id, + Hunt: string(serialized), + State: hunt.State.String(), + DocType: "hunts", } if hunt.Stats != nil { @@ -93,12 +95,12 @@ func (self HuntDispatcher) SetHunt(hunt *api_proto.Hunt) error { return cvelo_services.SetElasticIndex(self.ctx, self.config_obj.OrgId, - "hunts", hunt.HuntId, record) + "persisted", hunt.HuntId, record) } func (self HuntDispatcher) GetHunt(hunt_id string) (*api_proto.Hunt, bool) { serialized, err := cvelo_services.GetElasticRecord(context.Background(), - self.config_obj.OrgId, "hunts", hunt_id) + self.config_obj.OrgId, "persisted", hunt_id) if err != nil { return nil, false } @@ -135,12 +137,20 @@ func (self HuntDispatcher) Refresh( func (self HuntDispatcher) Close(config_obj *config_proto.Config) {} +// TODO add sort and from/size clause const getAllHuntsQuery = ` -{"query": {"match_all" : {}}, - "sort": [{ +{ + "query": { + "bool": { + "must": [{"match": { + "doc_type": "hunts" + }}] + } + },"sort": [{ "hunt_id": {"order": "desc", "unmapped_type": "keyword"} }], - "from": %q, "size": %q} + "from": %q, "size": %q +} ` func (self HuntDispatcher) ListHunts( @@ -150,7 +160,7 @@ func (self HuntDispatcher) ListHunts( hits, _, err := cvelo_services.QueryElasticRaw( ctx, self.config_obj.OrgId, - "hunts", json.Format(getAllHuntsQuery, in.Offset, in.Count)) + "persisted", json.Format(getAllHuntsQuery, in.Offset, in.Count)) if err != nil { return nil, err } diff --git a/services/indexing/indexing.go b/services/indexing/indexing.go index ddebf7f..0e9f34e 100644 --- a/services/indexing/indexing.go +++ b/services/indexing/indexing.go @@ -19,6 +19,7 @@ import ( type ElasticIndexRecord struct { Term string `json:"term"` ClientId string `json:"client_id"` + DocType string `json:"doc_type"` } type Indexer struct { @@ -29,9 +30,10 @@ type Indexer struct { func (self Indexer) SetIndex(client_id, term string) error { return cvelo_services.SetElasticIndex(self.ctx, self.config_obj.OrgId, - "index", "", &ElasticIndexRecord{ + "persisted", "", &ElasticIndexRecord{ Term: term, ClientId: client_id, + DocType: "index", }) } @@ -45,7 +47,7 @@ func (self Indexer) getIndexRecords( config_obj *config_proto.Config, query string, output_chan chan *api_proto.IndexRecord) { hits, err := cvelo_services.QueryChan(ctx, config_obj, 1000, - config_obj.OrgId, "clients", query, cvelo_services.NoSortField) + config_obj.OrgId, "persisted", query, cvelo_services.NoSortField) if err != nil { logger := logging.GetLogger(config_obj, &logging.FrontendComponent) logger.Error("getIndexRecords: %v", err) @@ -78,7 +80,15 @@ func (self Indexer) SearchIndexWithPrefix( operator, term := splitIntoOperatorAndTerms(prefix) switch operator { case "all": - query := `{"query": {"match_all" : {}}, "_source": {"includes": ["client_id"]}}` + query := ` +{ + "query": {"bool": { + "must": [{"match": { + "doc_type": "clients" + }}] + }}, + "_source": {"includes": ["client_id"]} +}` self.getIndexRecords(ctx, config_obj, query, output_chan) return diff --git a/services/indexing/mru.go b/services/indexing/mru.go index e5d9913..1064042 100644 --- a/services/indexing/mru.go +++ b/services/indexing/mru.go @@ -13,6 +13,7 @@ type MRUItem struct { Username string `json:"username"` ClientId string `json:"client_id"` Timestamp int64 `json:"timestamp"` + DocType string `json:"doc_type"` } func (self Indexer) UpdateMRU( @@ -20,10 +21,11 @@ func (self Indexer) UpdateMRU( user_name string, client_id string) error { return cvelo_services.SetElasticIndex(self.ctx, self.config_obj.OrgId, - "user_mru", user_name+":"+client_id, + "persisted", user_name+":"+client_id, &MRUItem{ Username: user_name, ClientId: client_id, Timestamp: time.Now().Unix(), + DocType: "user_mru", }) } diff --git a/services/indexing/names.go b/services/indexing/names.go index e89deaa..030039a 100644 --- a/services/indexing/names.go +++ b/services/indexing/names.go @@ -80,7 +80,7 @@ func (self *Indexer) searchWithNames( query := json.Format(getAllClientsAgg, field, label, field, offset, limit+1) hits, err := cvelo_services.QueryElasticAggregations( - ctx, config_obj.OrgId, "clients", query) + ctx, config_obj.OrgId, "persisted", query) if err != nil { return nil, err } diff --git a/services/indexing/search.go b/services/indexing/search.go index 4895635..b3ddda7 100644 --- a/services/indexing/search.go +++ b/services/indexing/search.go @@ -63,7 +63,7 @@ func (self *Indexer) searchRecents( []*api.ClientRecord, int, error) { hits, total, err := cvelo_services.QueryElasticRaw( - ctx, config_obj.OrgId, "user_mru", json.Format( + ctx, config_obj.OrgId, "persisted", json.Format( clientsMRUQuery, principal, from, limit)) if err != nil { return nil, 0, err @@ -102,15 +102,26 @@ const ( "client_id": {"order": "asc", "unmapped_type": "keyword"} }], "_source": false, - "query": {"bool": {"must": [%s]}} - %s + "query": { + "bool": { + "must": [ + %s, {"match": { + "doc_type": "clients" + }}] + } +} +%s } ` getAllClientsNamesQuery = ` {"sort": [{ "client_id": {"order": "asc", "unmapped_type": "keyword"} }], - "query": {"bool": {"must": [%s]}} + "query": {"bool": {"must": [%s,{ + "match": { + "doc_type": "clients" + } + }]}} %s } ` @@ -246,7 +257,7 @@ func (self *Indexer) searchWithPrefixedNames( json.Format(limitQueryPart, offset, limit+1)) hits, total, err := cvelo_services.QueryElasticRaw( - ctx, config_obj.OrgId, "clients", query) + ctx, config_obj.OrgId, "persisted", query) if err != nil { return nil, 0, err } @@ -307,7 +318,7 @@ func (self *Indexer) searchWithSortTerms( json.Format(limitQueryPart, offset, limit+1)) hits, total, err := cvelo_services.QueryElasticIds( - ctx, config_obj.OrgId, "clients", query) + ctx, config_obj.OrgId, "persisted", query) if err != nil { return nil, 0, err } diff --git a/services/indexing/search_chan.go b/services/indexing/search_chan.go index 1a22380..92532dd 100644 --- a/services/indexing/search_chan.go +++ b/services/indexing/search_chan.go @@ -78,7 +78,7 @@ func (self *Indexer) searchWithTermsChan( // Page the query in parts. First part specifies the size. part_query := `{"size":1000,` + query[1:] hits, _, err := cvelo_services.QueryElasticIds( - ctx, config_obj.OrgId, "clients", part_query) + ctx, config_obj.OrgId, "persisted", part_query) if err != nil { logger := logging.GetLogger(config_obj, &logging.FrontendComponent) logger.Error("searchWithTermsChan: %v", err) @@ -110,7 +110,7 @@ func (self *Indexer) searchWithTermsChan( // Get the next batch hits, _, err = cvelo_services.QueryElasticIds( - ctx, config_obj.OrgId, "clients", + ctx, config_obj.OrgId, "persisted", json.Format(`{"search_after":[%q],`, last_client_id)+part_query[1:]) if err != nil { logger := logging.GetLogger(config_obj, &logging.FrontendComponent) diff --git a/services/labeler/labeler.go b/services/labeler/labeler.go index 0cf65cf..efca2e1 100644 --- a/services/labeler/labeler.go +++ b/services/labeler/labeler.go @@ -95,7 +95,7 @@ func (self Labeler) SetClientLabel( err := cvelo_services.UpdateIndex(ctx, self.config_obj.OrgId, - "clients", client_id+"_labels", + "persisted", client_id+"_labels", json.Format(label_update_query, all_label_painless, label, time.Now().UnixNano(), strings.ToLower(label))) if err == nil { @@ -107,12 +107,13 @@ func (self Labeler) SetClientLabel( } return cvelo_services.SetElasticIndex(ctx, - self.config_obj.OrgId, "clients", client_id+"_labels", + self.config_obj.OrgId, "persisted", client_id+"_labels", api.ClientRecord{ ClientId: client_id, Labels: []string{label}, LowerLabels: []string{strings.ToLower(label)}, LastLabelTimestamp: uint64(utils.GetTime().Now().UnixNano()), + DocType: "clients", }) } @@ -137,7 +138,7 @@ func (self Labeler) RemoveClientLabel( label = strings.TrimSpace(label) return cvelo_services.UpdateIndex(ctx, self.config_obj.OrgId, - "clients", client_id+"_labels", + "persisted", client_id+"_labels", json.Format(label_update_query, remove_label_painless, label, time.Now().UnixNano(), strings.ToLower(label))) } diff --git a/services/launcher/delete.go b/services/launcher/delete.go index 1ddf0c6..16e4c96 100644 --- a/services/launcher/delete.go +++ b/services/launcher/delete.go @@ -78,7 +78,7 @@ func (self *FlowStorageManager) DeleteFlow( // Order results to facilitate deletion - container deletion // happens after we read its contents. - r.delete_index("UploadMetadata", "results", "vfs_path", + r.delete_index("UploadMetadata", "transient", "vfs_path", upload_metadata_path.AsClientPath()) // Remove all result sets from artifacts. @@ -93,17 +93,17 @@ func (self *FlowStorageManager) DeleteFlow( if err != nil { continue } - r.delete_index("Result", "results", "vfs_path", + r.delete_index("Result", "transient", "vfs_path", result_path.AsClientPath()) } - r.delete_index("Log", "results", "vfs_path", + r.delete_index("Log", "transient", "vfs_path", flow_path_manager.Log().AsClientPath()) - r.delete_index("CollectionContext", "collections", "session_id", flow_id) + r.delete_index("CollectionContext", "transient", "session_id", flow_id) // All notebook and their cells notebook_id := fmt.Sprintf("N.%s-%s", flow_id, client_id) - r.delete_index("Notebook", "notebooks", "notebook_id", notebook_id) + r.delete_index("Notebook", "persisted", "notebook_id", notebook_id) return r.responses, nil } diff --git a/services/launcher/flows.go b/services/launcher/flows.go index a6bcb01..33c91da 100644 --- a/services/launcher/flows.go +++ b/services/launcher/flows.go @@ -77,7 +77,7 @@ func (self Launcher) GetFlows( query := fmt.Sprintf(getCollectionsQuery, client_id) records, _, err := cvelo_services.QueryElasticRaw(ctx, - config_obj.OrgId, "results", query) + config_obj.OrgId, "transient", query) if err != nil { return nil, err } diff --git a/services/launcher/launcher_test.go b/services/launcher/launcher_test.go index b028e87..849e07b 100644 --- a/services/launcher/launcher_test.go +++ b/services/launcher/launcher_test.go @@ -28,13 +28,13 @@ const ( // Query to retrieve all the task queued for a client. getClientTasksQuery = `{ - "sort": [ - { - "timestamp": {"order": "asc"} + "sort": [{ + "timestamp": {"order": "asc", "unmapped_type" : "long"} }], "query": { "bool": { "must": [ + {"match": {"doc_type" : "task"}}, {"match": {"client_id" : %q}} ]} } @@ -181,7 +181,7 @@ sources: func TestLauncher(t *testing.T) { suite.Run(t, &LauncherTestSuite{ CloudTestSuite: &testsuite.CloudTestSuite{ - Indexes: []string{"tasks", "collections", "results"}, + Indexes: []string{"persisted", "transient"}, }, }) } @@ -193,7 +193,7 @@ func PeekClientTasks(ctx context.Context, query := json.Format(getClientTasksQuery, client_id) hits, err := cvelo_services.QueryElastic(ctx, config_obj.OrgId, - "tasks", query) + "persisted", query) if err != nil { return nil, err } diff --git a/services/launcher/storage.go b/services/launcher/storage.go index 1efb090..de66127 100644 --- a/services/launcher/storage.go +++ b/services/launcher/storage.go @@ -42,7 +42,7 @@ func (self *FlowStorageManager) WriteFlow( record.Timestamp = utils.GetTime().Now().UnixNano() return cvelo_services.SetElasticIndex(ctx, - config_obj.OrgId, "results", "", record) + config_obj.OrgId, "transient", "", record) } func (self *FlowStorageManager) WriteTask( @@ -50,7 +50,7 @@ func (self *FlowStorageManager) WriteTask( config_obj *config_proto.Config, client_id string, msg *crypto_proto.VeloMessage) error { - doc_id := api.GetDocumentIdForCollection(client_id, msg.SessionId, "tasks") + doc_id := api.GetDocumentIdForCollection(client_id, msg.SessionId, "task") messages := &api_proto.ApiFlowRequestDetails{ Items: []*crypto_proto.VeloMessage{msg}, } @@ -65,7 +65,7 @@ func (self *FlowStorageManager) WriteTask( ID: doc_id, } return cvelo_services.SetElasticIndex(ctx, - config_obj.OrgId, "results", "", record) + config_obj.OrgId, "transient", "", record) } // Not used - opensearch is handled with Launcher.GetFlows() directly. @@ -123,7 +123,7 @@ func (self *FlowStorageManager) LoadCollectionContext( } hits, _, err := cvelo_services.QueryElasticRaw(ctx, - config_obj.OrgId, "results", + config_obj.OrgId, "transient", json.Format(getFlowDetailsQuery, client_id, flow_id)) if err != nil { return nil, err @@ -171,7 +171,7 @@ func (self *FlowStorageManager) GetFlowRequests( } hits, _, err := cvelo_services.QueryElasticRaw(ctx, - config_obj.OrgId, "results", + config_obj.OrgId, "transient", json.Format(getFlowTasksQuery, client_id, flow_id)) if err != nil { return nil, err diff --git a/services/notebook/shared.go b/services/notebook/shared.go index 92e9eeb..acb3e85 100644 --- a/services/notebook/shared.go +++ b/services/notebook/shared.go @@ -23,7 +23,8 @@ const ( "should": [ {"match": {"creator" : %q}}, {"match": {"shared": %q}}, - {"match": {"public": true}} + {"match": {"public": true}}, + {"match": {"doc_type" : "notebooks"}} ]} }, {"match": {"type": "User"}} @@ -40,7 +41,7 @@ func (self *NotebookManager) GetSharedNotebooks( ctx context.Context, user string, offset, count uint64) ( []*api_proto.NotebookMetadata, error) { hits, _, err := cvelo_services.QueryElasticRaw( - ctx, self.config_obj.OrgId, "notebooks", + ctx, self.config_obj.OrgId, "persisted", json.Format(query, user, user, count, offset)) if err != nil { return nil, err diff --git a/services/notebook/storage.go b/services/notebook/storage.go index 30557cf..a485212 100644 --- a/services/notebook/storage.go +++ b/services/notebook/storage.go @@ -26,6 +26,7 @@ type NotebookRecord struct { SharedWith []string `json:"shared"` Timestamp int64 `json:"timestamp"` Type string `json:"type"` + DocType string `json:"doc_type"` } type NotebookStoreImpl struct { @@ -52,7 +53,7 @@ func getType(notebook_id string) string { func (self *NotebookStoreImpl) SetNotebook(in *api_proto.NotebookMetadata) error { return cvelo_services.SetElasticIndex(self.ctx, self.config_obj.OrgId, - "notebooks", in.NotebookId, &NotebookRecord{ + "persisted", in.NotebookId, &NotebookRecord{ NotebookId: in.NotebookId, Notebook: json.MustMarshalString(in), Creator: in.Creator, @@ -60,7 +61,8 @@ func (self *NotebookStoreImpl) SetNotebook(in *api_proto.NotebookMetadata) error Timestamp: time.Now().Unix(), SharedWith: append([]string{}, in.Collaborators...), - Type: getType(in.NotebookId), + Type: getType(in.NotebookId), + DocType: "notebooks", }) } @@ -68,7 +70,7 @@ func (self *NotebookStoreImpl) GetNotebook(notebook_id string) ( *api_proto.NotebookMetadata, error) { serialized, err := cvelo_services.GetElasticRecord( - self.ctx, self.config_obj.OrgId, "notebooks", notebook_id) + self.ctx, self.config_obj.OrgId, "persisted", notebook_id) if err != nil { return nil, err } @@ -91,11 +93,12 @@ func (self *NotebookStoreImpl) SetNotebookCell( err := cvelo_services.SetElasticIndex(self.ctx, self.config_obj.OrgId, - "notebooks", in.CellId, &NotebookRecord{ + "persisted", in.CellId, &NotebookRecord{ NotebookId: notebook_id, CellId: in.CellId, Timestamp: time.Now().Unix(), NotebookCell: json.MustMarshalString(in), + DocType: "notebooks", }) if err != nil { return err @@ -129,7 +132,7 @@ func (self *NotebookStoreImpl) GetNotebookCell(notebook_id, cell_id string) ( *api_proto.NotebookCell, error) { serialized, err := cvelo_services.GetElasticRecord( - self.ctx, self.config_obj.OrgId, "notebooks", cell_id) + self.ctx, self.config_obj.OrgId, "persisted", cell_id) if err != nil { return nil, err } diff --git a/services/notifier/notifier.go b/services/notifier/notifier.go index 22a9413..a1d656d 100644 --- a/services/notifier/notifier.go +++ b/services/notifier/notifier.go @@ -38,7 +38,7 @@ func (self Nofitier) ListenForNotification(id string) (chan bool, func()) { case <-time.After(self.poll): serialized, err := cvelo_services.GetElasticRecord( - ctx, self.config_obj.OrgId, "notifications", id) + ctx, self.config_obj.OrgId, "persisted", id) if err != nil { continue } @@ -66,10 +66,11 @@ func (self Nofitier) NotifyListener( config_obj *config_proto.Config, id, tag string) error { return cvelo_services.SetElasticIndex( context.Background(), - self.config_obj.OrgId, "notifications", + self.config_obj.OrgId, "persisted", id, &api.NotificationRecord{ Key: id, Timestamp: time.Now().Unix(), + DocType: "notifications", }) } diff --git a/services/orgs/delete.go b/services/orgs/delete.go index bb0e79b..489f141 100644 --- a/services/orgs/delete.go +++ b/services/orgs/delete.go @@ -42,7 +42,7 @@ func (self *OrgManager) DeleteOrg( // Remove the org from the index. err = cvelo_services.DeleteDocument(ctx, - services.ROOT_ORG_ID, "orgs", + services.ROOT_ORG_ID, "persisted", org_id, cvelo_services.SyncDelete) if err != nil { return err diff --git a/services/orgs/orgs.go b/services/orgs/orgs.go index 0d69f06..b86188c 100644 --- a/services/orgs/orgs.go +++ b/services/orgs/orgs.go @@ -25,8 +25,13 @@ import ( "www.velocidex.com/golang/velociraptor/utils" ) +type OrgRecord struct { + *api_proto.OrgRecord + DocType string `json:"doc_type"` +} + type OrgContext struct { - record *api_proto.OrgRecord + record *OrgRecord config_obj *config_proto.Config service services.ServiceContainer } @@ -80,7 +85,7 @@ func (self *OrgManager) GetOrg(org_id string) (*api_proto.OrgRecord, error) { if !pres { return nil, services.NotFoundError } - return result.record, nil + return result.record.OrgRecord, nil } func (self *OrgManager) OrgIdByNonce(nonce string) (string, error) { @@ -118,12 +123,12 @@ func (self *OrgManager) CreateNewOrg(name, id string) ( self.org_id_by_nonce[org_context.record.Nonce] = org_context.record.Id self.mu.Unlock() - org_record := org_context.record - + orgRecordDocType := org_context.record + orgRecord := orgRecordDocType.OrgRecord // Write the org into the index. - return org_record, cvelo_services.SetElasticIndex(self.ctx, + return orgRecord, cvelo_services.SetElasticIndex(self.ctx, services.ROOT_ORG_ID, - "orgs", org_record.Id, org_record) + "persisted", orgRecord.Id, orgRecordDocType) } func (self *OrgManager) makeNewConfigObj( @@ -145,13 +150,24 @@ func (self *OrgManager) makeNewConfigObj( func (self *OrgManager) Scan() error { hits, _, err := cvelo_services.QueryElasticRaw( self.ctx, services.ROOT_ORG_ID, - "orgs", `{"query": {"match_all" : {}}, "size": 10000}`) + "persisted", ` +{ + "query": { + "bool":{ + "must":[{ + "match":{ + "doc_type":"orgs" + } + }] + }}, + "size": 10000 + }`) if err != nil { return err } for _, hit := range hits { - record := &api_proto.OrgRecord{} + record := &OrgRecord{} err = json.Unmarshal(hit, record) if err == nil { // Read existing records for backwards compatibility diff --git a/services/orgs/services.go b/services/orgs/services.go index 2002efe..e4d3acb 100644 --- a/services/orgs/services.go +++ b/services/orgs/services.go @@ -42,6 +42,11 @@ func (self *OrgManager) makeNewOrgContext(org_id, name, nonce string) (*OrgConte Nonce: nonce, } + orgRecord := &OrgRecord{ + record, + "orgs", + } + if utils.IsRootOrg(org_id) { record.OrgId = "root" record.Id = "root" @@ -58,7 +63,7 @@ func (self *OrgManager) makeNewOrgContext(org_id, name, nonce string) (*OrgConte } org_context := &OrgContext{ - record: record, + record: orgRecord, config_obj: org_config, service: service_manager, } @@ -141,6 +146,10 @@ func (self *OrgManager) makeClientOrgContext(org_id, name, nonce string) (*OrgCo Name: name, Nonce: nonce, } + orgRecord := &OrgRecord{ + record, + "orgs", + } if utils.IsRootOrg(org_id) { record.OrgId = "root" @@ -157,7 +166,7 @@ func (self *OrgManager) makeClientOrgContext(org_id, name, nonce string) (*OrgCo } org_context := &OrgContext{ - record: record, + record: orgRecord, config_obj: org_config, service: service_manager, } diff --git a/services/repository/manager.go b/services/repository/manager.go index 96299fa..65ab104 100644 --- a/services/repository/manager.go +++ b/services/repository/manager.go @@ -181,7 +181,7 @@ func (self *RepositoryManager) DeleteArtifactFile( ctx context.Context, config_obj *config_proto.Config, principal, name string) error { err := cvelo_services.DeleteDocument(self.ctx, self.config_obj.OrgId, - "repository", name, cvelo_services.Sync) + "persisted", name, cvelo_services.Sync) if err != nil { return err } diff --git a/services/repository/repository.go b/services/repository/repository.go index 5be37d2..967507f 100644 --- a/services/repository/repository.go +++ b/services/repository/repository.go @@ -41,9 +41,17 @@ const ( // We only need the names of the artifacts for listing. allNamesQuery = ` { - "query" : { - "match_all" : {} - } + "query": { + "bool": { + "must": [ + { + "match": { + "doc_type": "repository" + } + } + ] + } + } } ` ) @@ -90,9 +98,10 @@ func (self *Repository) List( config_obj *config_proto.Config) ([]string, error) { results := ordereddict.NewDict() - + //TODO new index does not like name sortfield hits, err := cvelo_services.QueryChan(ctx, config_obj, 1000, - self.config_obj.OrgId, "repository", allNamesQuery, "name") + self.config_obj.OrgId, "persisted", allNamesQuery, "name") + if err != nil && !errors.Is(err, os.ErrNotExist) { return nil, err } @@ -176,7 +185,7 @@ func (self *Repository) LoadProto( func (self *Repository) Del(name string) { self.lru.Remove(name) cvelo_services.DeleteDocument(self.ctx, self.config_obj.OrgId, - "repository", name, cvelo_services.SyncDelete) + "perrsisted", name, cvelo_services.SyncDelete) } func (self *Repository) Get( @@ -248,7 +257,7 @@ func (self *Repository) getFromBackend( // Nope - get it from the backend. serialized, err := cvelo_services.GetElasticRecord(self.ctx, - self.config_obj.OrgId, "repository", name) + self.config_obj.OrgId, "persisted", name) if err != nil { return nil, false } @@ -282,10 +291,11 @@ func (self *Repository) saveArtifact( // Set the artifact in the elastic index. err := cvelo_services.SetElasticIndex(self.ctx, self.config_obj.OrgId, - "repository", artifact.Name, + "persisted", artifact.Name, &api.RepositoryEntry{ Name: artifact.Name, Definition: json.MustMarshalString(artifact), + DocType: "repository", }) // Set the artifact in the LRU diff --git a/services/server_artifacts/server_artifacts.go b/services/server_artifacts/server_artifacts.go index 1eabfc3..79cfc6d 100644 --- a/services/server_artifacts/server_artifacts.go +++ b/services/server_artifacts/server_artifacts.go @@ -36,7 +36,7 @@ func (self *ServerArtifactRunner) CloudConfig() *config.ElasticConfiguration { func (self *ServerArtifactRunner) Cancel( ctx context.Context, flow_id, principal string) { cvelo_services.SetElasticIndex( - self.ctx, self.config_obj.OrgId, "collections", + self.ctx, self.config_obj.OrgId, "transient", flow_id+"_cancel", &api.ArtifactCollectorRecord{ ClientId: principal, SessionId: flow_id, @@ -78,7 +78,7 @@ func (self *ServerArtifactRunner) LaunchServerArtifact( case <-utils.GetTime().After(10 * time.Second): // If this record appears, we immediately cancel. serialized, err := cvelo_services.GetElasticRecord(sub_ctx, - self.config_obj.OrgId, "collections", session_id+"_cancel") + self.config_obj.OrgId, "transient", session_id+"_cancel") if err == nil { record := &api.ArtifactCollectorRecord{} err = json.Unmarshal(serialized, record) diff --git a/services/users/users.go b/services/users/users.go index 4ea280d..d4cf47a 100644 --- a/services/users/users.go +++ b/services/users/users.go @@ -25,11 +25,13 @@ import ( type UserRecord struct { Username string `json:"username"` Record string `json:"record"` // An encoded api_proto.VelociraptorUser + DocType string `json:"doc_type"` } type UserGUIOptions struct { Username string `json:"username"` GUIOptions string `json:"gui_options"` // An endoded api_proto.SetGUIOptionsRequest + DocType string `json:"doc_type"` } type UserManager struct { @@ -51,9 +53,10 @@ func (self *UserManager) SetUser( return cvelo_services.SetElasticIndex(ctx, services.ROOT_ORG_ID, - "users", user_record.Name, &UserRecord{ + "persisted", user_record.Name, &UserRecord{ Username: user_record.Name, Record: string(serialized), + DocType: "users", }) } @@ -61,7 +64,7 @@ func (self *UserManager) ListUsers(ctx context.Context) ( []*api_proto.VelociraptorUser, error) { hits, _, err := cvelo_services.QueryElasticRaw( self.ctx, services.ROOT_ORG_ID, - "users", `{"query": {"match_all": {}}}`) + "persisted", `{"query": {"bool":{"must":[{"match":{"doc_type":"users"}}]}}}`) if err != nil { return nil, err } @@ -128,7 +131,7 @@ func (self *UserManager) GetUserWithHashes( } serialized, err := cvelo_services.GetElasticRecord(self.ctx, - services.ROOT_ORG_ID, "users", username) + services.ROOT_ORG_ID, "persisted", username) if err != nil { return nil, err } @@ -219,9 +222,10 @@ func (self *UserManager) SetUserOptions( return cvelo_services.SetElasticIndex(ctx, services.ROOT_ORG_ID, - "user_options", username, &UserGUIOptions{ + "persisted", username, &UserGUIOptions{ Username: username, GUIOptions: string(serialized), + DocType: "user_options", }) } @@ -229,7 +233,7 @@ func (self *UserManager) GetUserOptions(ctx context.Context, username string) ( *api_proto.SetGUIOptionsRequest, error) { serialized, err := cvelo_services.GetElasticRecord(self.ctx, - services.ROOT_ORG_ID, "user_options", username) + services.ROOT_ORG_ID, "persisted", username) if err == os.ErrNotExist || len(serialized) == 0 { return &api_proto.SetGUIOptionsRequest{}, nil } diff --git a/services/vfs_service/assembler.go b/services/vfs_service/assembler.go index 985b219..f44d3da 100644 --- a/services/vfs_service/assembler.go +++ b/services/vfs_service/assembler.go @@ -108,7 +108,7 @@ func (self *VFSService) readDirectoryWithDownloads( id := cvelo_services.MakeId(utils.JoinComponents(components, "/")) hits, _, err := cvelo_services.QueryElasticRaw(ctx, - config_obj.OrgId, "results", json.Format(queryAllVFSAttributes, id)) + config_obj.OrgId, "transient", json.Format(queryAllVFSAttributes, id)) if err != nil { return nil, nil, err } @@ -177,7 +177,7 @@ func getLatestVFSListResponse( id := services.MakeId(utils.JoinComponents(components, "/")) record := &VFSRecord{} serialized, err := services.GetElasticRecordByQuery(ctx, - config_obj.OrgId, "results", + config_obj.OrgId, "transient", json.Format(vfsSidePanelRenderQuery, id, id)) if err != nil || len(serialized) == 0 { // Empty responses mean the directory is empty. diff --git a/services/vfs_service/directory.go b/services/vfs_service/directory.go index 0e495c8..d512bff 100644 --- a/services/vfs_service/directory.go +++ b/services/vfs_service/directory.go @@ -18,7 +18,7 @@ import ( ) // This record is mapped to the results index (see -// schema/templates/results.json). +// schema/templates/transient.json). type VFSRecord struct { Id string `json:"id"` ClientId string `json:"client_id"` diff --git a/services/vfs_service/vfs_service.go b/services/vfs_service/vfs_service.go index 40f870c..e8af45e 100644 --- a/services/vfs_service/vfs_service.go +++ b/services/vfs_service/vfs_service.go @@ -131,7 +131,7 @@ func (self *VFSService) WriteDownloadInfo( // Write synchronously so the GUI updates the download file right // away. err := cvelo_services.SetElasticIndex( - ctx, config_obj.OrgId, "results", "", stats) + ctx, config_obj.OrgId, "transient", "", stats) utils.GetTime().Sleep(time.Second) diff --git a/vql/server/clients/delete.go b/vql/server/clients/delete.go index 0bbf899..deb6efe 100644 --- a/vql/server/clients/delete.go +++ b/vql/server/clients/delete.go @@ -19,7 +19,8 @@ const ( {"query": { "bool": { "must": [ - {"match": {"client_id": %q}} + {"match": {"client_id": %q}}, + {"match": {"doc_type": "clients"}} ]} }} ` @@ -61,7 +62,7 @@ func (self DeleteClientPlugin) Call(ctx context.Context, return } - indexes := []string{"collections", "results", "hunts", "clients", "tasks"} + indexes := []string{"transient", "persisted"} for _, index := range indexes { if arg.ReallyDoIt { err = removeClientDocs(ctx, config_obj, index, arg.ClientId) diff --git a/vql/server/clients/delete_test.go b/vql/server/clients/delete_test.go index 5972f29..1f6e7bc 100644 --- a/vql/server/clients/delete_test.go +++ b/vql/server/clients/delete_test.go @@ -2,16 +2,15 @@ package clients import ( "context" - "os" - "testing" - "time" - "github.com/Velocidex/ordereddict" "github.com/alecthomas/assert" "github.com/stretchr/testify/suite" + "testing" + "time" "www.velocidex.com/golang/cloudvelo/schema/api" cvelo_services "www.velocidex.com/golang/cloudvelo/services" "www.velocidex.com/golang/cloudvelo/testsuite" + "www.velocidex.com/golang/velociraptor/json" "www.velocidex.com/golang/velociraptor/logging" _ "www.velocidex.com/golang/velociraptor/result_sets/simple" @@ -42,6 +41,7 @@ func (self *DeleteTestSuite) TestDeleteClient() { AssignedHunts: []string{}, Labels: []string{}, LowerLabels: []string{}, + DocType: "clients", }, { @@ -50,6 +50,7 @@ func (self *DeleteTestSuite) TestDeleteClient() { AssignedHunts: []string{"H.AllClients"}, Labels: []string{}, LowerLabels: []string{}, + DocType: "clients", }, { @@ -58,6 +59,7 @@ func (self *DeleteTestSuite) TestDeleteClient() { Labels: []string{"Foo"}, LowerLabels: []string{"foo"}, AssignedHunts: []string{}, + DocType: "clients", }, // This client has not been seen in a while @@ -67,13 +69,14 @@ func (self *DeleteTestSuite) TestDeleteClient() { AssignedHunts: []string{}, Labels: []string{}, LowerLabels: []string{}, + DocType: "clients", }, } // Add these clients directly into the index. for _, c := range clients { err := cvelo_services.SetElasticIndex( - self.Ctx, config_obj.OrgId, "clients", c.ClientId, c) + self.Ctx, config_obj.OrgId, "persisted", c.ClientId, c) assert.NoError(self.T(), err) } @@ -101,10 +104,11 @@ func (self *DeleteTestSuite) TestDeleteClient() { Set("really_do_it", true). Set("client_id", self.client_id))) - result, err := cvelo_services.GetElasticRecord(context.Background(), - self.ConfigObj.OrgId, "clients", self.client_id) + result, _, err := cvelo_services.QueryElasticRaw( + ctx, self.ConfigObj.OrgId, + "persisted", json.Format(all_client_items, self.client_id)) assert.Nil(self.T(), result) - assert.Equal(self.T(), err, os.ErrNotExist) + assert.Equal(self.T(), err, nil) // TODO - verify that files are deleted // TODO - verify that this client is removed from other indexes @@ -123,7 +127,7 @@ func (self *DeleteTestSuite) getClientRecord(client_id string) *api.ClientRecord func TestDeletePlugin(t *testing.T) { suite.Run(t, &DeleteTestSuite{ CloudTestSuite: &testsuite.CloudTestSuite{ - Indexes: []string{"clients"}, + Indexes: []string{"persisted"}, }, }) } diff --git a/vql/server/hunts/delete.go b/vql/server/hunts/delete.go index 14c2720..81aaa18 100644 --- a/vql/server/hunts/delete.go +++ b/vql/server/hunts/delete.go @@ -76,7 +76,7 @@ func (self DeleteHuntPlugin) Call(ctx context.Context, // is deleted immediately. if arg.ReallyDoIt { err := cvelo_services.UpdateIndex( - ctx, config_obj.OrgId, "hunts", + ctx, config_obj.OrgId, "persisted", arg.HuntId, archiveHuntScript) if err != nil { scope.Log("hunt_delete: %v", err) diff --git a/vql/server/notebook/delete.go b/vql/server/notebook/delete.go index b1b7d9d..4905ffe 100644 --- a/vql/server/notebook/delete.go +++ b/vql/server/notebook/delete.go @@ -61,7 +61,7 @@ func (self *DeleteNotebookPlugin) Call(ctx context.Context, if arg.ReallyDoIt { err := services.DeleteByQuery( - ctx, config_obj.OrgId, "notebooks", + ctx, config_obj.OrgId, "persisted", json.Format(all_notebook_items, arg.NotebookId)) if err != nil { scope.Log("notebook_delete: %v", err) diff --git a/vql/uploads/upload_test.go b/vql/uploads/upload_test.go index 421a91e..c82bc9a 100644 --- a/vql/uploads/upload_test.go +++ b/vql/uploads/upload_test.go @@ -298,7 +298,7 @@ func (self *UploaderTestSuite) checkForKey(filter string) []string { func TestUploader(t *testing.T) { suite.Run(t, &UploaderTestSuite{ CloudTestSuite: &testsuite.CloudTestSuite{ - Indexes: []string{"client_keys"}, + Indexes: []string{"persisted"}, OrgId: "test", }, golden: ordereddict.NewDict(),