Add support for multiple S3 storage classes in metrics (#24)
tropnikovvl authored Feb 9, 2025
1 parent d61700b commit 4e1f715
Showing 7 changed files with 206 additions and 139 deletions.
117 changes: 78 additions & 39 deletions controllers/s3talker.go
@@ -14,24 +14,28 @@ import (
log "github.com/sirupsen/logrus"
)

+// StorageClassMetrics - size and object count for a single storage class
+type StorageClassMetrics struct {
+Size float64 `json:"size"`
+ObjectNumber float64 `json:"objectNumber"`
+}

// Bucket - information per bucket
type Bucket struct {
-BucketName string `json:"bucketName"`
-BucketSize float64 `json:"bucketSize"`
-BucketObjectNumber float64 `json:"bucketObjectNumber"`
-ListDuration time.Duration `json:"listDuration"`
+BucketName string `json:"bucketName"`
+StorageClasses map[string]StorageClassMetrics `json:"storageClasses"`
+ListDuration time.Duration `json:"listDuration"`
}

// Buckets - list of Bucket objects
type Buckets []Bucket

// S3Summary - one JSON struct to rule them all
type S3Summary struct {
-S3Status bool `json:"s3Status"`
-S3Size float64 `json:"s3Size"`
-S3ObjectNumber float64 `json:"s3ObjectNumber"`
-S3Buckets Buckets `json:"s3Buckets"`
-TotalListDuration time.Duration `json:"totalListDuration"`
+S3Status bool `json:"s3Status"`
+StorageClasses map[string]StorageClassMetrics `json:"storageClasses"`
+S3Buckets Buckets `json:"s3Buckets"`
+TotalListDuration time.Duration `json:"totalListDuration"`
}
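The struct change above is the heart of the commit: the flat per-bucket and per-account totals (BucketSize, BucketObjectNumber, S3Size, S3ObjectNumber) are replaced by a storageClasses map keyed by S3 storage class name. A minimal standalone sketch (not part of the commit; the bucket name and values are illustrative) of how the new shape serializes:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of the structs above, with ListDuration omitted for brevity.
type StorageClassMetrics struct {
	Size         float64 `json:"size"`
	ObjectNumber float64 `json:"objectNumber"`
}

type Bucket struct {
	BucketName     string                         `json:"bucketName"`
	StorageClasses map[string]StorageClassMetrics `json:"storageClasses"`
}

func main() {
	b := Bucket{
		BucketName: "logs",
		StorageClasses: map[string]StorageClassMetrics{
			"STANDARD": {Size: 3072, ObjectNumber: 2},
			"GLACIER":  {Size: 4096, ObjectNumber: 1},
		},
	}
	out, _ := json.Marshal(b)
	// encoding/json sorts map keys, so GLACIER prints before STANDARD:
	// {"bucketName":"logs","storageClasses":{"GLACIER":{"size":4096,"objectNumber":1},"STANDARD":{"size":3072,"objectNumber":2}}}
	fmt.Println(string(out))
}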

// S3Conn struct - keeps information about remote S3
@@ -85,6 +89,24 @@ func getS3Client(cfg aws.Config, s3Conn S3Conn) S3ClientInterface {
return s3.NewFromConfig(cfg, options)
}

+// distinct - removes duplicates from a slice of strings
+func distinct(input []string) []string {
+seen := make(map[string]struct{})
+result := []string{}
+
+for _, val := range input {
+val = strings.TrimSpace(val)
+if val != "" {
+if _, exists := seen[val]; !exists {
+seen[val] = struct{}{}
+result = append(result, val)
+}
+}
+}
+
+return result
+}

// S3UsageInfo - gets S3 connection details and returns S3Summary
func S3UsageInfo(s3Conn S3Conn, s3BucketNames string) (S3Summary, error) {
summary := S3Summary{S3Status: false}
Expand All @@ -106,7 +128,7 @@ func fetchBucketData(s3BucketNames string, s3Client S3ClientInterface, s3Region

if s3BucketNames != "" {
// If specific buckets are provided, use them
-bucketNames = strings.Split(s3BucketNames, ",")
+bucketNames = distinct(strings.Split(s3BucketNames, ","))
} else {
// Otherwise, fetch all buckets
result, err := s3Client.ListBuckets(context.TODO(), &s3.ListBucketsInput{BucketRegion: aws.String(s3Region)})
@@ -122,10 +144,30 @@

log.Debugf("List of buckets in %s region: %v", s3Region, bucketNames)

-resultsChan := make(chan Bucket, len(bucketNames))
-errorChan := make(chan error, len(bucketNames))

var wg sync.WaitGroup
+var summaryMutex sync.Mutex

+summaryMutex.Lock()
+summary.StorageClasses = make(map[string]StorageClassMetrics)
+summary.S3Buckets = make(Buckets, 0, len(bucketNames))
+summaryMutex.Unlock()

+processBucketResult := func(bucket Bucket) {
+summaryMutex.Lock()
+defer summaryMutex.Unlock()
+
+summary.S3Buckets = append(summary.S3Buckets, bucket)
+for storageClass, metrics := range bucket.StorageClasses {
+summaryMetrics := summary.StorageClasses[storageClass]
+summaryMetrics.Size += metrics.Size
+summaryMetrics.ObjectNumber += metrics.ObjectNumber
+summary.StorageClasses[storageClass] = summaryMetrics
+}
+log.Debugf("Bucket size and objects count: %v", bucket)
+}

+var errs []error
+var errMutex sync.Mutex

for _, bucketName := range bucketNames {
bucketName := strings.TrimSpace(bucketName)
@@ -137,41 +179,31 @@ func fetchBucketData(s3BucketNames string, s3Client S3ClientInterface, s3Region
go func(bucketName string) {
defer wg.Done()

-size, count, duration, err := calculateBucketMetrics(bucketName, s3Client)
+storageClasses, duration, err := calculateBucketMetrics(bucketName, s3Client)
if err != nil {
-errorChan <- err
+errMutex.Lock()
+errs = append(errs, err)
+errMutex.Unlock()
return
}

-resultsChan <- Bucket{
-BucketName: bucketName,
-BucketSize: size,
-BucketObjectNumber: count,
-ListDuration: duration,
+bucket := Bucket{
+BucketName: bucketName,
+StorageClasses: storageClasses,
+ListDuration: duration,
}

+processBucketResult(bucket)
log.Debugf("Finish bucket %s processing", bucketName)
}(bucketName)
}

wg.Wait()
-close(resultsChan)
-close(errorChan)

-var errs []error
-for err := range errorChan {
-errs = append(errs, err)
-}
if len(errs) > 0 {
log.Errorf("Encountered errors while processing buckets: %v", errs)
}

-for bucket := range resultsChan {
-summary.S3Buckets = append(summary.S3Buckets, bucket)
-summary.S3Size += bucket.BucketSize
-summary.S3ObjectNumber += bucket.BucketObjectNumber
-log.Debugf("Bucket size and objects count: %v", bucket)
-}

if len(summary.S3Buckets) > 0 {
summary.S3Status = true
}
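The refactor above also changes the aggregation strategy: the buffered resultsChan/errorChan pair is gone, each goroutine folds its result into the shared summary through the mutex-guarded processBucketResult closure, and errors are collected in a mutex-guarded slice so one failing bucket no longer blocks reporting on the others. A hedged alternative sketch, not what the commit does: the same fan-out shape with golang.org/x/sync/errgroup. Note the semantic difference: errgroup.Wait returns only the first error, so it fits only when first-error reporting is acceptable.

// Sketch assuming the surrounding fetchBucketData scope plus an extra
// import "golang.org/x/sync/errgroup"; calculateBucketMetrics and
// processBucketResult are the functions from the diff above.
g := new(errgroup.Group)
for _, bucketName := range bucketNames {
	bucketName := strings.TrimSpace(bucketName) // fresh variable per iteration, safe to capture
	g.Go(func() error {
		storageClasses, duration, err := calculateBucketMetrics(bucketName, s3Client)
		if err != nil {
			return err
		}
		processBucketResult(Bucket{
			BucketName:     bucketName,
			StorageClasses: storageClasses,
			ListDuration:   duration,
		})
		return nil
	})
}
if err := g.Wait(); err != nil {
	log.Errorf("Bucket processing failed: %v", err)
}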
@@ -181,9 +213,9 @@ func fetchBucketData(s3BucketNames string, s3Client S3ClientInterface, s3Region
}

// calculateBucketMetrics - computes per-storage-class size and object counts for a bucket
-func calculateBucketMetrics(bucketName string, s3Client S3ClientInterface) (float64, float64, time.Duration, error) {
-var totalSize, objectCount float64
+func calculateBucketMetrics(bucketName string, s3Client S3ClientInterface) (map[string]StorageClassMetrics, time.Duration, error) {
var continuationToken *string
+storageClasses := make(map[string]StorageClassMetrics)

start := time.Now()

@@ -194,12 +226,19 @@ func calculateBucketMetrics(bucketName string, s3Client S3ClientInterface) (floa
})
if err != nil {
log.Errorf("Failed to list objects for bucket %s: %v", bucketName, err)
-return 0, 0, 0, err
+return nil, 0, err
}

for _, obj := range page.Contents {
-totalSize += float64(*obj.Size)
-objectCount++
+storageClass := string(obj.StorageClass)
+if storageClass == "" {
+storageClass = "STANDARD"
+}
+
+metrics := storageClasses[storageClass]
+metrics.Size += float64(*obj.Size)
+metrics.ObjectNumber++
+storageClasses[storageClass] = metrics
}

if page.IsTruncated != nil && !*page.IsTruncated {
@@ -209,5 +248,5 @@ func calculateBucketMetrics(bucketName string, s3Client S3ClientInterface) (floa
}

duration := time.Since(start)
-return totalSize, objectCount, duration, nil
+return storageClasses, duration, nil
}
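calculateBucketMetrics drives ListObjectsV2 with a manual ContinuationToken/IsTruncated loop, which is what lets the single-page mocks in the test file below work unchanged. Against a concrete client, aws-sdk-go-v2 also ships a paginator that removes that bookkeeping; a hedged sketch (it assumes a real *s3.Client rather than the repo's S3ClientInterface test seam, and "my-bucket" is illustrative):

package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// listAllObjects fetches every object in a bucket, page by page,
// letting the paginator track the continuation token.
func listAllObjects(ctx context.Context, client *s3.Client, bucket string) ([]types.Object, error) {
	var objects []types.Object
	p := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
	})
	for p.HasMorePages() {
		page, err := p.NextPage(ctx)
		if err != nil {
			return nil, err
		}
		objects = append(objects, page.Contents...)
	}
	return objects, nil
}

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		panic(err)
	}
	objs, err := listAllObjects(context.TODO(), s3.NewFromConfig(cfg), "my-bucket")
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d objects\n", len(objs))
}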
34 changes: 18 additions & 16 deletions controllers/s3talker_test.go
@@ -40,8 +40,8 @@ func TestS3UsageInfo_SingleBucket(t *testing.T) {

mockClient.On("ListObjectsV2", mock.Anything, mock.Anything, mock.Anything).Return(&s3.ListObjectsV2Output{
Contents: []types.Object{
-{Size: aws.Int64(1024)},
-{Size: aws.Int64(2048)},
+{Size: aws.Int64(1024), StorageClass: "STANDARD"},
+{Size: aws.Int64(2048), StorageClass: "STANDARD"},
},
IsTruncated: aws.Bool(false),
}, nil)
@@ -50,8 +50,8 @@ func TestS3UsageInfo_SingleBucket(t *testing.T) {

assert.NoError(t, err)
assert.True(t, summary.S3Status)
-assert.Equal(t, float64(3072), summary.S3Size)
-assert.Equal(t, float64(2), summary.S3ObjectNumber)
+assert.Equal(t, float64(3072), summary.StorageClasses["STANDARD"].Size)
+assert.Equal(t, float64(2), summary.StorageClasses["STANDARD"].ObjectNumber)
assert.Len(t, summary.S3Buckets, 1)
}

@@ -78,8 +78,8 @@ func TestS3UsageInfo_MultipleBuckets(t *testing.T) {

assert.NoError(t, err)
assert.True(t, summary.S3Status)
-assert.Equal(t, float64(6144), summary.S3Size)
-assert.Equal(t, float64(4), summary.S3ObjectNumber)
+assert.Equal(t, float64(6144), summary.StorageClasses["STANDARD"].Size)
+assert.Equal(t, float64(4), summary.StorageClasses["STANDARD"].ObjectNumber)
assert.Len(t, summary.S3Buckets, 2)
}

@@ -114,8 +114,8 @@ func TestS3UsageInfo_EmptyBucketList(t *testing.T) {

assert.NoError(t, err)
assert.True(t, summary.S3Status)
-assert.Equal(t, float64(9216), summary.S3Size)
-assert.Equal(t, float64(6), summary.S3ObjectNumber)
+assert.Equal(t, float64(9216), summary.StorageClasses["STANDARD"].Size)
+assert.Equal(t, float64(6), summary.StorageClasses["STANDARD"].ObjectNumber)
assert.Len(t, summary.S3Buckets, 3)
}

@@ -124,18 +124,20 @@ func TestCalculateBucketMetrics(t *testing.T) {

mockClient.On("ListObjectsV2", mock.Anything, mock.Anything, mock.Anything).Return(&s3.ListObjectsV2Output{
Contents: []types.Object{
-{Size: aws.Int64(1024)},
-{Size: aws.Int64(2048)},
-{Size: aws.Int64(4096)},
+{Size: aws.Int64(1024), StorageClass: "STANDARD"},
+{Size: aws.Int64(2048), StorageClass: "STANDARD"},
+{Size: aws.Int64(4096), StorageClass: "GLACIER"},
},
IsTruncated: aws.Bool(false),
}, nil)

-size, count, duration, err := calculateBucketMetrics("bucket1", mockClient)
+storageClasses, duration, err := calculateBucketMetrics("bucket1", mockClient)

assert.NoError(t, err)
-assert.Equal(t, float64(7168), size)
-assert.Equal(t, float64(3), count)
+assert.Equal(t, float64(3072), storageClasses["STANDARD"].Size)
+assert.Equal(t, float64(2), storageClasses["STANDARD"].ObjectNumber)
+assert.Equal(t, float64(4096), storageClasses["GLACIER"].Size)
+assert.Equal(t, float64(1), storageClasses["GLACIER"].ObjectNumber)
assert.Greater(t, duration, time.Duration(0))
}

@@ -167,7 +169,7 @@ func TestS3UsageInfo_WithIAMRole(t *testing.T) {

assert.NoError(t, err)
assert.True(t, summary.S3Status)
-assert.Equal(t, float64(100), summary.S3Size)
-assert.Equal(t, float64(1), summary.S3ObjectNumber)
+assert.Equal(t, float64(100), summary.StorageClasses["STANDARD"].Size)
+assert.Equal(t, float64(1), summary.StorageClasses["STANDARD"].ObjectNumber)
assert.Len(t, summary.S3Buckets, 1)
}
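The tests above cover the new storage-class plumbing end to end, but this excerpt shows no dedicated test for the distinct() helper introduced in s3talker.go. A hypothetical table-driven sketch in the same testify style (it assumes the file's existing testing and assert imports):

func TestDistinct(t *testing.T) {
	cases := []struct {
		name string
		in   []string
		want []string
	}{
		{"trims whitespace and drops duplicates", []string{" a", "b", "a", "", "b "}, []string{"a", "b"}},
		{"preserves first-seen order", []string{"c", "a", "c"}, []string{"c", "a"}},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.want, distinct(tc.in))
		})
	}
}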
35 changes: 23 additions & 12 deletions e2e/tests/test_s3_bucket_exporter.py
@@ -85,25 +85,30 @@ def parse_metrics(self, metrics_text):
try:
if 's3_bucket_object_number' in line:
bucket = line.split('bucketName="')[1].split('"')[0]
+storage_class = line.split('storageClass="')[1].split('"')[0]
count = float(line.split()[-1])
-parsed_metrics.setdefault(bucket, {})["object_count"] = count
+parsed_metrics.setdefault(bucket, {}).setdefault("storage_classes", {}).setdefault(storage_class, {})["object_count"] = count

elif 's3_bucket_size' in line:
bucket = line.split('bucketName="')[1].split('"')[0]
+storage_class = line.split('storageClass="')[1].split('"')[0]
size = float(line.split()[-1])
-parsed_metrics.setdefault(bucket, {})["total_size"] = size
+parsed_metrics.setdefault(bucket, {}).setdefault("storage_classes", {}).setdefault(storage_class, {})["total_size"] = size

elif 's3_endpoint_up' in line:
endpoint_up = float(line.split()[-1])
parsed_metrics["endpoint_up"] = endpoint_up

elif 's3_total_object_number' in line:
+storage_class = line.split('storageClass="')[1].split('"')[0]
total_objects = float(line.split()[-1])
parsed_metrics["total_object_number"] = total_objects
parsed_metrics.setdefault("total", {}).setdefault("storage_classes", {}).setdefault(storage_class, {})["object_count"] = total_objects

elif 's3_total_size' in line:
+storage_class = line.split('storageClass="')[1].split('"')[0]
total_size = float(line.split()[-1])
parsed_metrics["total_size"] = total_size
parsed_metrics.setdefault("total", {}).setdefault("storage_classes", {}).setdefault(storage_class, {})["total_size"] = total_size

except (IndexError, ValueError) as e:
logger.warning(f"Error parsing metrics line: {line}. Error: {e}")
return parsed_metrics
@@ -113,15 +118,18 @@ def verify_bucket_metrics(self, bucket, metadata, bucket_metrics):
if bucket not in bucket_metrics:
raise AssertionError(f"Metrics for bucket '{bucket}' are missing")

storage_class = "STANDARD" # Assuming STANDARD storage class for test
metrics = bucket_metrics[bucket]["storage_classes"][storage_class]

# Verify object count
-actual_count = bucket_metrics[bucket].get("object_count")
+actual_count = metrics["object_count"]
expected_count = len(metadata["files"])
assert actual_count == expected_count, (
f"Bucket '{bucket}' object count mismatch. Expected: {expected_count}, Got: {actual_count}"
)

# Verify total size
-actual_size = bucket_metrics[bucket].get("total_size")
+actual_size = metrics["total_size"]
expected_size = metadata["total_size"]
assert abs(actual_size - expected_size) < 10, (
f"Bucket '{bucket}' size mismatch. Expected: {expected_size}, Got: {actual_size}"
@@ -131,24 +139,27 @@ def verify_bucket_metrics(self, bucket, metadata, bucket_metrics):

def verify_global_metrics(self, bucket_metadata, parsed_metrics):
"""Verify global metrics (total object count, total size, and endpoint status)."""
storage_class = "STANDARD" # Assuming STANDARD storage class for test
total_metrics = parsed_metrics["total"]["storage_classes"][storage_class]

total_objects_expected = sum(len(metadata["files"]) for metadata in bucket_metadata.values())
total_size_expected = sum(metadata["total_size"] for metadata in bucket_metadata.values())

# Verify total object count
assert parsed_metrics.get("total_object_number") == total_objects_expected, (
assert total_metrics["object_count"] == total_objects_expected, (
f"Total object count mismatch. Expected: {total_objects_expected}, "
f"Got: {parsed_metrics.get('total_object_number')}"
f"Got: {total_metrics['object_count']}"
)

# Verify total size
assert parsed_metrics.get("total_size") == total_size_expected, (
assert total_metrics["total_size"] == total_size_expected, (
f"Total size mismatch. Expected: {total_size_expected}, "
f"Got: {parsed_metrics.get('total_size')}"
f"Got: {total_metrics['total_size']}"
)

# Verify endpoint status
assert parsed_metrics.get("endpoint_up") == 1, (
f"Endpoint status mismatch. Expected: 1, Got: {parsed_metrics.get('endpoint_up')}"
assert parsed_metrics["endpoint_up"] == 1, (
f"Endpoint status mismatch. Expected: 1, Got: {parsed_metrics['endpoint_up']}"
)

logger.info("Global metrics verified successfully")
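The e2e assertions above pin down the exporter's new metric surface: s3_bucket_size and s3_bucket_object_number carry a storageClass label alongside bucketName, and the s3_total_* series gain the same label. The collector that maps S3Summary onto those series lives in one of the changed files not shown in this excerpt; a hedged client_golang sketch of what that mapping could look like (the help strings and function shape are assumptions, not the repo's actual code):

import "github.com/prometheus/client_golang/prometheus"

// publishSummary registers per-storage-class gauges and fills them from an
// S3Summary; a sketch only, assuming the controllers types from the diff above.
func publishSummary(summary S3Summary, reg prometheus.Registerer) {
	labels := []string{"bucketName", "storageClass"}
	bucketSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "s3_bucket_size", Help: "Bucket size in bytes, per storage class"}, labels)
	bucketObjects := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "s3_bucket_object_number", Help: "Objects in the bucket, per storage class"}, labels)
	totalSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "s3_total_size", Help: "Total size in bytes, per storage class"}, []string{"storageClass"})
	totalObjects := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "s3_total_object_number", Help: "Total objects, per storage class"}, []string{"storageClass"})
	reg.MustRegister(bucketSize, bucketObjects, totalSize, totalObjects)

	for _, b := range summary.S3Buckets {
		for class, m := range b.StorageClasses {
			bucketSize.WithLabelValues(b.BucketName, class).Set(m.Size)
			bucketObjects.WithLabelValues(b.BucketName, class).Set(m.ObjectNumber)
		}
	}
	for class, m := range summary.StorageClasses {
		totalSize.WithLabelValues(class).Set(m.Size)
		totalObjects.WithLabelValues(class).Set(m.ObjectNumber)
	}
}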