Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

This project needs more error logging #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 46 additions & 45 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ type loadConfig struct {
}

type config struct {
SrcTable string `json:"src_table"`
DstTable string `json:"dst_table"`
SrcRegion string `json:"src_region"`
DstRegion string `json:"dst_region"`
SrcEndpoint string `json:"src_endpoint"`
DstEndpoint string `json:"dst_endpoint"`
SrcEnv string `json:"src_env"`
DstEnv string `json:"dst_env"`
CopyLoad loadConfig `json:"copy_load"`
SrcTable string `json:"src_table"`
DstTable string `json:"dst_table"`
SrcRegion string `json:"src_region"`
DstRegion string `json:"dst_region"`
SrcEndpoint string `json:"src_endpoint"`
DstEndpoint string `json:"dst_endpoint"`
SrcEnv string `json:"src_env"`
DstEnv string `json:"dst_env"`
CopyLoad loadConfig `json:"copy_load"`
}

// Config file is read and dumped into this struct
Expand Down Expand Up @@ -122,42 +122,42 @@ func newStreamListener() *streamListener {
func newDynamodb(region string,
endpoint string,
environment string) (*dynamodb.DynamoDB, *dynamodbstreams.DynamoDBStreams) {
tr := &http.Transport{
MaxIdleConns: 2048,
MaxConnsPerHost: 1024,
}
tr := &http.Transport{
MaxIdleConns: 2048,
MaxConnsPerHost: 1024,
}

httpClient := &http.Client{
Timeout: 8*time.Second,
Transport: tr,
}
httpClient := &http.Client{
Timeout: 8 * time.Second,
Transport: tr,
}

sess := session.Must(
session.NewSession(
aws.NewConfig().
WithRegion(region).
WithEndpoint(endpoint).
WithMaxRetries(maxRetries).
WithHTTPClient(httpClient),
))

// No need to assume role if Pendulum is not run remotely
if environment == "local" {
return dynamodb.New(sess, &aws.Config{}), dynamodbstreams.New(sess, &aws.Config{})
}
sess := session.Must(
session.NewSession(
aws.NewConfig().
WithRegion(region).
WithEndpoint(endpoint).
WithMaxRetries(maxRetries).
WithHTTPClient(httpClient),
))

// No need to assume role if Pendulum is not run remotely
if environment == "local" {
return dynamodb.New(sess, &aws.Config{}), dynamodbstreams.New(sess, &aws.Config{})
}

roleArn := getRoleArn(environment)
if roleArn == "" {
logger.WithFields(logging.Fields{}).Error("Failed to get role ARN. Check config")
return nil, nil
}
creds := stscreds.NewCredentials(sess, roleArn)
logger.WithFields(logging.Fields{"Creds":creds}).Debug()
roleArn := getRoleArn(environment)
if roleArn == "" {
logger.WithFields(logging.Fields{}).Error("Failed to get role ARN. Check config")
return nil, nil
}
creds := stscreds.NewCredentials(sess, roleArn)
logger.WithFields(logging.Fields{"Creds": creds}).Debug()

dynamo := dynamodb.New(sess, &aws.Config{Credentials:creds})
stream := dynamodbstreams.New(sess, &aws.Config{Credentials:creds})
dynamo := dynamodb.New(sess, &aws.Config{Credentials: creds})
stream := dynamodbstreams.New(sess, &aws.Config{Credentials: creds})

return dynamo, stream
return dynamo, stream
}

// syncState Constructor
Expand All @@ -170,7 +170,7 @@ func newSyncState(tableConfig config) *syncState {
tableConfig.SrcRegion, tableConfig.SrcEndpoint,
tableConfig.SrcEnv)
if srcStreamListener.dynamo == nil || srcStreamListener.stream == nil {
logger.WithFields(logging.Fields{"Table":tableConfig.SrcTable}).Error("Failed to get dynamo client")
logger.WithFields(logging.Fields{"Table": tableConfig.SrcTable}).Error("Failed to get dynamo client")
return nil
}
srcStreamListener.streamArn, err = getStreamArn(tableConfig.SrcTable, srcStreamListener.dynamo)
Expand Down Expand Up @@ -200,8 +200,8 @@ func newSyncState(tableConfig config) *syncState {
}

type appConfig struct {
sync []config
verbose bool
sync []config
verbose bool
}

// The primary key of the Checkpoint ddb table, of the srcStream etc
Expand Down Expand Up @@ -263,10 +263,12 @@ func readConfigFile(configFile string, logger logging.Logger) ([]config, error)
}).Debug("Reading config file")
data, err := ioutil.ReadFile(configFile)
if err != nil {
logger.Error(err)
return listStreamConfig, err
}
err = json.Unmarshal(data, &listStreamConfig)
if err != nil {
logger.Error(err)
return listStreamConfig, errors.New("failed to unmarshal config")
}

Expand Down Expand Up @@ -309,7 +311,6 @@ func setDefaults(tableConfig []config) ([]config, error) {
tableConfig[i].CopyLoad.WriteQPS = 500
}


if tableConfig[i].CopyLoad.ReadWorkers == 0 {
tableConfig[i].CopyLoad.ReadWorkers = 5
}
Expand Down Expand Up @@ -344,7 +345,7 @@ func backoff(i int, s string) {
wait = maxExponentialBackoffTime
}
logger.WithFields(logging.Fields{
"Backoff Caller": s,
"Backoff Caller": s,
"Backoff Time(seconds)": wait,
}).Info("Backing off")
time.Sleep(time.Duration(wait) * time.Second)
Expand Down