Skip to content

Commit

Permalink
Rehydrate last dumps by cluster (#226)
Browse files Browse the repository at this point in the history
* adding rehydrate command

* adding list method in blob/puller

* grpc rehydratelatest method

* rehydrate new proto

* generated grpc

* generated puller mock

* update protobuf

* adding DumpResult to handle all paths

* typo logs

* fixing listFiles function

* cleaning

* adding unit test for the DumpResult

* adding unit test to IsTarGz

* adding unit tests for blob storage (local only)

* adding files for unit tests

* PR comments

* fix unit tests

* fix linter

* fixing unit test

* PR comment

* linter fix

* merging rehydrate to ingest command
  • Loading branch information
jt-dd authored Jul 24, 2024
1 parent 265f490 commit 3e5e8f5
Show file tree
Hide file tree
Showing 34 changed files with 1,609 additions and 111 deletions.
19 changes: 18 additions & 1 deletion cmd/kubehound/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,17 @@ var (
remoteIngestCmd = &cobra.Command{
Use: "remote",
Short: "Ingest data remotely on a KHaaS instance",
Long: `Run an ingestion on KHaaS from a bucket to build the attack path`,
Long: `Run an ingestion on KHaaS from a bucket to build the attack path, by default it will rehydrate the latest snapshot previously dumped on a KHaaS instance from all clusters`,
PreRunE: func(cobraCmd *cobra.Command, args []string) error {
viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck
cobraCmd.MarkFlagRequired("khaas-server") //nolint: errcheck
viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck

if !isIngestRemoteDefault() {
cobraCmd.MarkFlagRequired("run_id") //nolint: errcheck
cobraCmd.MarkFlagRequired("cluster") //nolint: errcheck
}

return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", false, true)
},
RunE: func(cobraCmd *cobra.Command, args []string) error {
Expand All @@ -56,11 +62,22 @@ var (
return fmt.Errorf("get config: %w", err)
}

if isIngestRemoteDefault() {
return core.CoreClientGRPCRehydrateLatest(khCfg.Ingestor)
}

return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID)
},
}
)

// isIngestRemoteDefault reports whether neither a run id nor a cluster name
// has been configured for the ingestor.
func isIngestRemoteDefault() bool {
	if viper.GetString(config.IngestorRunID) != "" {
		return false
	}

	return viper.GetString(config.IngestorClusterName) == ""
}

func init() {

ingestCmd.AddCommand(localIngestCmd)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
gocloud.dev v0.37.0
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8
google.golang.org/grpc v1.64.1
google.golang.org/protobuf v1.34.1
google.golang.org/protobuf v1.34.2
gopkg.in/DataDog/dd-trace-go.v1 v1.64.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/DataDog/dd-trace-go.v1 v1.64.1 h1:HN/zoIV8FvrLKA1ZBkbyo4E1MnPh9hPc2Q0C/ojom3I=
gopkg.in/DataDog/dd-trace-go.v1 v1.64.1/go.mod h1:qzwVu8Qr8CqzQNw2oKEXRdD+fMnjYatjYMGE0tdCVG4=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
Expand Down
9 changes: 2 additions & 7 deletions pkg/cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,16 @@ func InitRemoteDumpCmd(cmd *cobra.Command) {

func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) {

cmd.Flags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)")
cmd.Flags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint")
cmd.PersistentFlags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)")
cmd.PersistentFlags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint")

// IngestorAPIEndpoint
if standalone {
cmd.Flags().String("run_id", "", "KubeHound run id to ingest (e.g.: 01htdgjj34mcmrrksw4bjy2e94)")
viper.BindPFlag(config.IngestorRunID, cmd.Flags().Lookup("run_id")) //nolint: errcheck
cmd.MarkFlagRequired("run_id") //nolint: errcheck

cmd.Flags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)")
viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck
cmd.MarkFlagRequired("cluster") //nolint: errcheck

// Reusing the same flags for the dump cloud and ingest command
cmd.MarkFlagRequired("khaas-server") //nolint: errcheck
}
}

Expand Down
31 changes: 9 additions & 22 deletions pkg/dump/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dump
import (
"context"
"fmt"
"path"

"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/config"
Expand All @@ -15,42 +14,30 @@ import (
)

type DumpIngestor struct {
directoryOutput string
ResultName string
collector collector.CollectorClient
writer writer.DumperWriter
collector collector.CollectorClient
writer writer.DumperWriter
}

const (
OfflineDumpDateFormat = "2006-01-02-15-04-05"
OfflineDumpPrefix = "kubehound_"
)

// ./<clusterName>/kubehound_<clusterName>_<run_id>
func DumpIngestorResultName(clusterName string, runID string) string {
return path.Join(clusterName, fmt.Sprintf("%s%s_%s", OfflineDumpPrefix, clusterName, runID))
}

// func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, compression bool, directoryOutput string) (*DumpIngestor, error) {
func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, compression bool, directoryOutput string, runID *config.RunID) (*DumpIngestor, error) {
// Generate path for the dump
clusterName, err := getClusterName(ctx, collector)
if err != nil {
return nil, err
}

resultName := DumpIngestorResultName(clusterName, runID.String())
dumpResult, err := NewDumpResult(clusterName, runID.String(), compression)
if err != nil {
return nil, fmt.Errorf("create dump result: %w", err)
}

dumpWriter, err := writer.DumperWriterFactory(ctx, compression, directoryOutput, resultName)
dumpWriter, err := writer.DumperWriterFactory(ctx, compression, directoryOutput, dumpResult.GetFullPath())
if err != nil {
return nil, fmt.Errorf("create collector writer: %w", err)
}

return &DumpIngestor{
directoryOutput: directoryOutput,
collector: collector,
writer: dumpWriter,
ResultName: resultName,
collector: collector,
writer: dumpWriter,
}, nil
}

Expand Down
16 changes: 5 additions & 11 deletions pkg/dump/ingestor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ const (
)

func TestNewDumpIngestor(t *testing.T) {
t.Parallel()
ctx := context.Background()

t.Setenv("KUBECONFIG", "./testdata/kube-config")
clientset := fake.NewSimpleClientset()
collectorClient := collector.NewTestK8sAPICollector(ctx, clientset)

Expand All @@ -48,8 +48,6 @@ func TestNewDumpIngestor(t *testing.T) {
runID: config.NewRunID(),
},
want: &DumpIngestor{
directoryOutput: mockDirectoryOutput,

writer: &writer.FileWriter{},
},
wantErr: false,
Expand All @@ -63,16 +61,16 @@ func TestNewDumpIngestor(t *testing.T) {
runID: config.NewRunID(),
},
want: &DumpIngestor{
directoryOutput: mockDirectoryOutput,
writer: &writer.TarWriter{},
writer: &writer.TarWriter{},
},
wantErr: false,
},
}
for _, tt := range tests {
// Can not run parallel tests as the environment variable KUBECONFIG is set
// t.Setenv is not compatible with parallel tests
for _, tt := range tests { //nolint:paralleltest
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got, err := NewDumpIngestor(ctx, tt.args.collectorClient, tt.args.compression, tt.args.directoryOutput, tt.args.runID)
if (err != nil) != tt.wantErr {
t.Errorf("NewDumpIngestorsss() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -83,10 +81,6 @@ func TestNewDumpIngestor(t *testing.T) {
if !assert.Equal(t, reflect.TypeOf(got.writer), reflect.TypeOf(tt.want.writer)) {
t.Errorf("NewDumpIngestor() = %v, want %v", reflect.TypeOf(got.writer), reflect.TypeOf(tt.want.writer))
}

if !assert.Equal(t, got.directoryOutput, tt.want.directoryOutput) {
t.Errorf("NewDumpIngestor() = %v, want %v", got.directoryOutput, tt.want.directoryOutput)
}
})
}
}
Expand Down
113 changes: 113 additions & 0 deletions pkg/dump/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package dump

import (
"fmt"
"path"
"regexp"
)

// DumpResult identifies a single dump by cluster name and run id, and records
// whether it is stored as a directory or as a file carrying an extension.
type DumpResult struct {
	// clusterName is used both as the parent directory and inside the file name.
	clusterName string
	// RunID is the run identifier embedded in the file name.
	RunID string
	// isDir is true when no extension is appended to the file name.
	isDir bool
	// extension is appended (after a dot) to the file name when isDir is false.
	extension string
}

// Building blocks of the dump naming scheme:
// <clusterName>/kubehound_<clusterName>_<run_id>[.<extension>]
const (
	// DumpResultPrefix starts every dump file or directory name.
	DumpResultPrefix = "kubehound_"
	// DumpResultTarWriterExtension is the extension used for compressed dumps.
	DumpResultTarWriterExtension = "tar.gz"

	// DumpResultClusterNameRegex captures a cluster name made of letters,
	// digits, '.', '-' and '_'.
	DumpResultClusterNameRegex = `([A-Za-z0-9\.\-_]+)`
	// DumpResultRunIDRegex captures a run id: 26 lowercase alphanumerics.
	DumpResultRunIDRegex = `([a-z0-9]{26})`
	// DumpResultExtensionRegex captures an optional extension, optionally
	// preceded by a dot.
	DumpResultExtensionRegex = `\.?([a-z0-9\.]+)?`

	// DumpResultFilenameRegex matches kubehound_<clusterName>_<run_id>[.<extension>].
	DumpResultFilenameRegex = DumpResultPrefix + DumpResultClusterNameRegex + "_" + DumpResultRunIDRegex + DumpResultExtensionRegex
	// DumpResultPathRegex matches <clusterName>/<file name>.
	DumpResultPathRegex = DumpResultClusterNameRegex + "/" + DumpResultFilenameRegex
)

// NewDumpResult builds a DumpResult for the given cluster name and run id.
// The result starts as a directory and is switched to its compressed form when
// isCompressed is true. It is validated before being returned; on a validation
// failure the error is returned together with a nil result.
func NewDumpResult(clusterName, runID string, isCompressed bool) (*DumpResult, error) {
	result := &DumpResult{
		clusterName: clusterName,
		RunID:       runID,
		isDir:       true,
	}

	if isCompressed {
		result.Compressed()
	}

	if err := result.Validate(); err != nil {
		return nil, err
	}

	return result, nil
}

func (i *DumpResult) Validate() error {
re := regexp.MustCompile(DumpResultClusterNameRegex)
if !re.MatchString(i.clusterName) {
return fmt.Errorf("Invalid clustername: %q", i.clusterName)
}

matches := re.FindStringSubmatch(i.clusterName)
if len(matches) == 2 && matches[1] != i.clusterName {
return fmt.Errorf("Invalid clustername: %q", i.clusterName)
}

re = regexp.MustCompile(DumpResultRunIDRegex)
if !re.MatchString(i.RunID) {
return fmt.Errorf("Invalid runID: %q", i.RunID)
}

return nil
}

// Compressed switches the result to its compressed form: it is no longer a
// directory and carries the tar writer extension.
func (i *DumpResult) Compressed() {
	i.isDir, i.extension = false, DumpResultTarWriterExtension
}

// GetFullPath returns the slash-separated location of the dump, i.e. the file
// name placed under a directory named after the cluster:
// <clusterName>/kubehound_<clusterName>_<run_id>[.<extension>]
func (i *DumpResult) GetFullPath() string {
	return path.Join(i.clusterName, i.GetFilename())
}

// GetFilename returns kubehound_<clusterName>_<run_id>, followed by a dot and
// the extension when the result is not a directory.
func (i *DumpResult) GetFilename() string {
	base := fmt.Sprintf("%s%s_%s", DumpResultPrefix, i.clusterName, i.RunID)
	if !i.isDir {
		return base + "." + i.extension
	}

	return base
}

// ParsePath extracts a DumpResult from a path containing
// <clusterName>/kubehound_<clusterName>_<run_id>[.<extension>].
//
// The cluster name of the parent directory must be identical to the one in
// the file name. Any non-empty extension marks the result as compressed.
// An error is returned when the path does not match the expected layout, when
// the two cluster names differ, or when NewDumpResult rejects the values.
func ParsePath(path string) (*DumpResult, error) {
	groups := regexp.MustCompile(DumpResultPathRegex).FindStringSubmatch(path)
	if groups == nil {
		return nil, fmt.Errorf("Invalid path provided: %q", path)
	}

	// Capture groups: 1 = parent directory, 2 = cluster name in the file name,
	// 3 = run id, 4 = extension (empty when absent).
	parentDir, fileCluster, runID, extension := groups[1], groups[2], groups[3], groups[4]
	if parentDir != fileCluster {
		return nil, fmt.Errorf("Cluster name does not match in the path provided: %q", path)
	}

	return NewDumpResult(parentDir, runID, extension != "")
}
Loading

0 comments on commit 3e5e8f5

Please sign in to comment.