Skip to content

Commit

Permalink
Schema hint
Browse files Browse the repository at this point in the history
  • Loading branch information
m-mizutani committed Nov 19, 2024
1 parent f690a58 commit d61d88a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 28 deletions.
24 changes: 15 additions & 9 deletions destination/gcs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,22 @@ func (c *Client) Prefix() string { return c.prefix }
// Gzip returns the value of the client's gzip field.
func (c *Client) Gzip() bool {
	return c.gzip
}

// ObjNameArgs carries the values handed to an ObjNameFunc to build an object
// name: a prefix, a timestamp, a sequence number, a file extension and a
// schema hint.
// NOTE(review): this span is a rendered diff hunk, so the first four fields
// are listed twice (before and after SchemaHint was added).
type ObjNameArgs struct {
Prefix string
Timestamp time.Time
Seq int
Ext string
Prefix string
Timestamp time.Time
Seq int
Ext string
SchemaHint string
}

// ObjNameFunc is the signature of a function that builds an object name
// string from ObjNameArgs.
type ObjNameFunc func(args ObjNameArgs) string

// DefaultObjectName builds an object name from args. The time key is
// args.Timestamp formatted with the layout "2006/01/02/15/20060102T150405";
// it is followed by "_", the Seq zero-padded to four digits, "." and Ext, and
// the whole name starts with Prefix.
// NOTE(review): this span is a rendered diff hunk that contains both the
// earlier return line (no schema hint) and the later lines, in which a
// non-empty SchemaHint followed by "/" is inserted between Prefix and the
// time key.
func DefaultObjectName(args ObjNameArgs) string {
timeKey := args.Timestamp.Format("2006/01/02/15/20060102T150405")
return fmt.Sprintf("%s%s_%04d.%s", args.Prefix, timeKey, args.Seq, args.Ext)
schema := args.SchemaHint
// Only add the separator when a hint is present, so an empty hint leaves
// the name without an extra "/".
if schema != "" {
schema += "/"
}
return fmt.Sprintf("%s%s%s_%04d.%s", args.Prefix, schema, timeKey, args.Seq, args.Ext)
}

type gzipWriter struct {
Expand Down Expand Up @@ -77,10 +82,11 @@ func New(bucket string, options ...Option) hatchery.Destination {
}

args := ObjNameArgs{
Prefix: c.prefix,
Timestamp: md.Timestamp(),
Seq: md.Seq(),
Ext: md.Format().Ext(),
Prefix: c.prefix,
Timestamp: md.Timestamp(),
Seq: md.Seq(),
Ext: md.Format().Ext(),
SchemaHint: md.SchemaHint(),
}
if c.gzip {
args.Ext += ".gz"
Expand Down
14 changes: 11 additions & 3 deletions pkg/metadata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

// MetaData holds a timestamp, a sequence number, a data format and a schema
// hint; the fields are unexported and read through the accessor methods.
// NOTE(review): this span is a rendered diff hunk, so the first three fields
// are listed twice (before and after schemaHint was added).
type MetaData struct {
timestamp *time.Time
seq int
format types.DataFormat
timestamp *time.Time
seq int
format types.DataFormat
schemaHint string
}

func (m MetaData) Timestamp() time.Time {
Expand All @@ -21,6 +22,7 @@ func (m MetaData) Timestamp() time.Time {

// Seq returns the sequence number stored in the metadata.
func (m MetaData) Seq() int {
	return m.seq
}
// Format returns the data format stored in the metadata.
func (m MetaData) Format() types.DataFormat {
	return m.format
}
// SchemaHint returns the schema hint stored in the metadata; it is the empty
// string when no hint was set.
func (m MetaData) SchemaHint() string {
	return m.schemaHint
}

func New(options ...Option) MetaData {
var md MetaData
Expand Down Expand Up @@ -51,3 +53,9 @@ func WithFormat(f types.DataFormat) Option {
md.format = f
}
}

// WithSchemaHint returns an Option that stores hint as the schema hint of the
// MetaData it is applied to.
func WithSchemaHint(hint string) Option {
	setHint := func(target *MetaData) {
		target.schemaHint = hint
	}
	return setHint
}
43 changes: 27 additions & 16 deletions source/falcon_data_replicator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -50,18 +51,11 @@ type client struct {
NewSQS func(cfg aws.Config, optFns ...func(*sqs.Options)) interfaces.SQS
NewS3 func(cfg aws.Config, optFns ...func(*s3.Options)) interfaces.S3

MaxMessages int32
MaxPull int
MaxPull int
}

// Option is a function that modifies a *client; the With* constructors in
// this file return Options.
type Option func(*client)

// WithMaxMessages returns an Option that stores n in the client's MaxMessages
// field.
func WithMaxMessages(n int32) Option {
	apply := func(target *client) {
		target.MaxMessages = n
	}
	return apply
}

func WithMaxPull(n int) Option {
return func(x *client) {
x.MaxPull = n
Expand Down Expand Up @@ -89,8 +83,7 @@ func New(awsRegion, awsAccessKeyId string, awsSecretAccessKey secret.String, sqs
return s3.NewFromConfig(cfg, optFns...)
},

MaxMessages: 10,
MaxPull: 0,
MaxPull: 0,
}

for _, opt := range opts {
Expand All @@ -107,7 +100,9 @@ func New(awsRegion, awsAccessKeyId string, awsSecretAccessKey secret.String, sqs
)
}
return func(ctx context.Context, p *hatchery.Pipe) error {
logging.FromCtx(ctx).Info("Run Falcon Data Replicator source", "config", x)
logger := logging.FromCtx(ctx).With("source", "falcon_data_replicator")
logger.Info("New source (Falcon Data Replicator)", "config", x)
ctx = logging.InjectCtx(ctx, logger)

cfg, err := config.LoadDefaultConfig(ctx, awsOpts...)
if err != nil {
Expand All @@ -122,9 +117,6 @@ func New(awsRegion, awsAccessKeyId string, awsSecretAccessKey secret.String, sqs
input := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(x.AWS.SqsURL),
}
if x.MaxMessages > 0 {
input.MaxNumberOfMessages = x.MaxMessages
}

for i := 0; x.MaxPull == 0 || i < x.MaxPull; i++ {
c := &fdrClients{sqs: sqsClient, s3: s3Client}
Expand All @@ -150,6 +142,7 @@ var (
)

func copy(ctx context.Context, clients *fdrClients, input *sqs.ReceiveMessageInput, p *hatchery.Pipe) error {
logger := logging.FromCtx(ctx)
result, err := clients.sqs.ReceiveMessage(ctx, input)
if err != nil {
return goerr.Wrap(err, "failed to receive messages from SQS").With("input", input)
Expand All @@ -161,7 +154,7 @@ func copy(ctx context.Context, clients *fdrClients, input *sqs.ReceiveMessageInp
// Iterate over received messages
for _, message := range result.Messages {
if message.Body == nil {
logging.FromCtx(ctx).Warn("Received message with no body", "message", message)
logger.Warn("Received message with no body", "message", message)
continue
}

Expand All @@ -171,10 +164,11 @@ func copy(ctx context.Context, clients *fdrClients, input *sqs.ReceiveMessageInp
return goerr.Wrap(err, "failed to unmarshal message").With("message", *message.Body)
}

logging.FromCtx(ctx).Info("Received message", "msg", msg, "body", *message.Body)
logger.Debug("Received SQS message", "msg", msg)

for _, file := range msg.Files {
// Download the object from S3
logger.Info("downloading object from S3", "bucket", msg.Bucket, "path", file.Path)
s3Input := &s3.GetObjectInput{
Bucket: aws.String(msg.Bucket),
Key: aws.String(file.Path),
Expand All @@ -185,8 +179,25 @@ func copy(ctx context.Context, clients *fdrClients, input *sqs.ReceiveMessageInp
}
defer safe.CloseReader(ctx, s3Obj.Body)

// Derive the schema hint from the object key. The second "/"-separated
// segment selects the schema: "data" maps to "data", and "fdrv2" maps to
// "fdrv2_" plus the third segment. Anything else is reported and tagged
// "unknown".
parts := strings.Split(file.Path, "/")
var schemaHint string
if len(parts) > 1 {
	switch parts[1] {
	case "data":
		schemaHint = "data"
	case "fdrv2":
		// A key such as "x/fdrv2" has only two segments; indexing
		// parts[2] without this guard panics with index out of range.
		// Such keys fall through to the "unknown" handling below.
		if len(parts) > 2 {
			schemaHint = "fdrv2_" + parts[2]
		}
	}
}
if schemaHint == "" {
	logger.Warn("failed to parse schema hint", "path", file.Path)
	schemaHint = "unknown"
}

md := metadata.New(
metadata.WithTimestamp(time.Unix(msg.Timestamp/1000, 0)),
metadata.WithSchemaHint(schemaHint),
)

r, err := gzip.NewReader(s3Obj.Body)
Expand Down

0 comments on commit d61d88a

Please sign in to comment.