From 5672467fca398d920d04765bb2a21596beda00fa Mon Sep 17 00:00:00 2001 From: Thibault NORMAND Date: Mon, 9 Dec 2024 16:07:42 +0100 Subject: [PATCH] feat(graphdb): split deletion transactions. (#303) * feat(graphdb): split deletion transactions. * feat(graphdb): use a const for deletion batch size. --- .../storage/graphdb/janusgraph_provider.go | 111 +++++++++++++----- 1 file changed, 84 insertions(+), 27 deletions(-) diff --git a/pkg/kubehound/storage/graphdb/janusgraph_provider.go b/pkg/kubehound/storage/graphdb/janusgraph_provider.go index c2a2673c..643def25 100644 --- a/pkg/kubehound/storage/graphdb/janusgraph_provider.go +++ b/pkg/kubehound/storage/graphdb/janusgraph_provider.go @@ -19,6 +19,7 @@ import ( const ( channelSizeBatchFactor = 4 // TODO maybe move that into a config file? StorageProviderName = "janusgraph" + deleteBatchSize = 10000 ) var ( @@ -70,19 +71,47 @@ func (jgp *JanusGraphProvider) Prepare(ctx context.Context) error { tx := g.Tx() defer tx.Close() - gtx, err := tx.Begin() - if err != nil { - return err - } - - err = <-gtx.V().Drop().Iterate() - if err != nil { - return err - } - - err = tx.Commit() - if err != nil { - return err + for { + // Begin a new transaction. + gtx, err := tx.Begin() + if err != nil { + return err + } + + // Retrieve the number of vertices in the graph. + page, err := gtx.V().Count().Next() + if err != nil { + return err + } + + // Decode the number of vertices from the page. + count, err := page.GetInt() + if err != nil { + return err + } + + // If there are no more vertices to delete, break the loop. + if count == 0 { + break + } + + // Delete the vertices in the graph. + err = <-gtx.V().Limit(deleteBatchSize).Drop().Iterate() + if err != nil { + return err + } + + // Commit the transaction. + if err := tx.Commit(); err != nil { + return err + } + + // Check context for cancellation. + select { + case <-ctx.Done(): + return ctx.Err() + default: + } } return nil @@ -154,7 +183,7 @@ func (jgp *JanusGraphProvider) Close(ctx context.Context) error { return nil } -// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries. +// Clean removes all vertices in the graph for the given cluster. func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error { var err error span, ctx := span.SpanRunFromContext(ctx, span.IngestorClean) @@ -165,19 +194,47 @@ func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error tx := g.Tx() defer tx.Close() - gtx, err := tx.Begin() - if err != nil { - return err - } - - err = <-gtx.V().Has("cluster", cluster).Drop().Iterate() - if err != nil { - return err - } - - err = tx.Commit() - if err != nil { - return err + for { + // Begin a new transaction. + gtx, err := tx.Begin() + if err != nil { + return err + } + + // Retrieve the number of vertices in the graph for the given cluster. + page, err := gtx.V().Has("cluster", cluster).Count().Next() + if err != nil { + return err + } + + // Decode the number of vertices from the page. + count, err := page.GetInt() + if err != nil { + return err + } + + // If there are no more vertices to delete, break the loop. + if count == 0 { + break + } + + // Delete the vertices in the graph for the given cluster. + err = <-gtx.V().Has("cluster", cluster).Limit(deleteBatchSize).Drop().Iterate() + if err != nil { + return err + } + + // Commit the transaction. + if err := tx.Commit(); err != nil { + return err + } + + // Check context for cancellation. + select { + case <-ctx.Done(): + return ctx.Err() + default: + } } return nil