Skip to content

Commit

Permalink
bunch update
Browse files Browse the repository at this point in the history
  • Loading branch information
m-mizutani authored Sep 15, 2024
1 parent bacff54 commit f5f5996
Show file tree
Hide file tree
Showing 19 changed files with 1,011 additions and 39 deletions.
143 changes: 143 additions & 0 deletions destination/s3/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package s3

import (
"context"
"fmt"
"io"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"

"github.com/m-mizutani/goerr"
"github.com/secmon-as-code/hatchery"
"github.com/secmon-as-code/hatchery/pkg/logging"
"github.com/secmon-as-code/hatchery/pkg/metadata"
)

type client struct {
region string
bucket string
prefix string
cred aws.CredentialsProvider
objNameFunc ObjNameFunc
}

type Option func(*client)

func WithPrefix(prefix string) Option {
return func(c *client) {
c.prefix = prefix
}
}

type ObjNameArgs struct {
Prefix string
Timestamp time.Time
Seq int
Ext string
}

type ObjNameFunc func(args ObjNameArgs) string

func DefaultObjectName(args ObjNameArgs) string {
timeKey := args.Timestamp.Format("2006/01/02/15/20060102T150405")
return fmt.Sprintf("%s%s_%04d.%s", args.Prefix, timeKey, args.Seq, args.Ext)
}

type pipeWrier struct {
w io.WriteCloser
errCh chan error
}

func (x *pipeWrier) Write(p []byte) (n int, err error) {
return x.w.Write(p)
}

func (x *pipeWrier) Close() error {
if err := x.w.Close(); err != nil {
return goerr.Wrap(err, "failed to close write buffer")
}

if err := <-x.errCh; err != nil {
return goerr.Wrap(err, "failed to write buffer")
}

return nil
}

func New(region, bucket string, options ...Option) hatchery.Destination {
client := &client{
bucket: bucket,
region: region,
objNameFunc: DefaultObjectName,
}

for _, opt := range options {
opt(client)
}

awsOpts := []func(*config.LoadOptions) error{
config.WithRegion(client.region),
}

if client.cred != nil {
awsOpts = append(awsOpts,
config.WithCredentialsProvider(client.cred),
)
}

return func(ctx context.Context, md metadata.MetaData) (io.WriteCloser, error) {
cfg, err := config.LoadDefaultConfig(ctx, awsOpts...)
if err != nil {
return nil, goerr.Wrap(err, "failed to create AWS session")
}

// Create AWS service clients
s3Client := s3.NewFromConfig(cfg)

args := ObjNameArgs{
Prefix: client.prefix,
Timestamp: md.Timestamp(),
Seq: md.Seq(),
Ext: md.Format().Ext(),
}
objName := client.objNameFunc(args)

errCh := make(chan error, 1)
r, w := io.Pipe()
pipe := &pipeWrier{
w: w,
errCh: errCh,
}

go func() {
defer close(errCh)

uploader := manager.NewUploader(s3Client, func(u *manager.Uploader) {
u.PartSize = 10 * 1024 * 1024
})

input := &s3.PutObjectInput{
Bucket: aws.String(client.bucket),
Key: aws.String(objName),
Body: r,
}

logging.FromCtx(ctx).Info("Start to put object", "bucket", client.bucket, "key", objName)
if _, err := uploader.Upload(ctx, input); err != nil {
errCh <- goerr.Wrap(err, "failed to put object")
return
}

if err := r.Close(); err != nil {
errCh <- goerr.Wrap(err, "failed to close read buffer")
return
}
}()

return pipe, nil
}
}
53 changes: 53 additions & 0 deletions destination/s3/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package s3_test

import (
"context"
"io"
"os"
"testing"
"time"

"github.com/m-mizutani/gt"
"github.com/secmon-as-code/hatchery/destination/s3"
"github.com/secmon-as-code/hatchery/pkg/metadata"

"github.com/aws/aws-sdk-go-v2/config"
aws_s3 "github.com/aws/aws-sdk-go-v2/service/s3"
)

func TestIntegration(t *testing.T) {
bucketName, ok := os.LookupEnv("TEST_S3_BUCKET_NAME")
if !ok {
t.Skip("TEST_S3_BUCKET_NAME is not set")
}
dst := s3.New("ap-northeast-1", bucketName)

ctx := context.Background()
ts := time.Now()
md := metadata.New(
metadata.WithTimestamp(ts),
)
w, err := dst(ctx, md)
gt.NoError(t, err)

gt.R1(w.Write([]byte("Hello, world"))).NoError(t)
gt.NoError(t, w.Close()).Must()

awsOpts := []func(*config.LoadOptions) error{
config.WithRegion("ap-northeast-1"),
}

cfg := gt.R1(config.LoadDefaultConfig(ctx, awsOpts...)).NoError(t)

client := aws_s3.NewFromConfig(cfg)

expectedKey := ts.Format("2006/01/02/15/20060102T150405_0000.log")
out, err := client.GetObject(ctx, &aws_s3.GetObjectInput{
Bucket: &bucketName,
Key: &expectedKey,
})
gt.NoError(t, err).Must()

buf := gt.R1(io.ReadAll(out.Body)).NoError(t)
gt.Equal(t, string(buf), "Hello, world")
}
5 changes: 3 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hatchery
import "errors"

var (
ErrStreamConflicted = errors.New("pipeline id conflicted")
ErrStreamNotFound = errors.New("pipeline not found")
ErrStreamConflicted = errors.New("stream id conflicted")
ErrStreamNotFound = errors.New("stream not found")
ErrInvalidStream = errors.New("invalid stream")
)
51 changes: 51 additions & 0 deletions example/cli/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module github.com/secmon-as-code/hatchery/example/cli

Check notice

Code scanning / Trivy

Private tokens could appear in logs if context containing gRPC metadata is logged in github.com/grpc/grpc-go Low

Package: google.golang.org/grpc\nInstalled Version: 1.64.0\nVulnerability GHSA-xr7q-jx4m-x55m\nSeverity: LOW\nFixed Version: 1.64.1\nLink: GHSA-xr7q-jx4m-x55m

go 1.23.0

replace github.com/secmon-as-code/hatchery => ../..

require github.com/secmon-as-code/hatchery v0.0.0-00010101000000-000000000000

require (
cloud.google.com/go v0.115.0 // indirect
cloud.google.com/go/auth v0.6.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
cloud.google.com/go/storage v1.43.0 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/m-mizutani/goerr v0.1.14 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/urfave/cli/v2 v2.27.4 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/api v0.187.0 // indirect
google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
Loading

0 comments on commit f5f5996

Please sign in to comment.