Skip to content

Commit

Permalink
Add support for Spanner as a data repository
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinbarbour committed Mar 12, 2024
1 parent 8722d44 commit c622974
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 6 deletions.
7 changes: 5 additions & 2 deletions assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package achgateway

import "embed"

//go:embed migrations/*.sql
var MigrationFS embed.FS
//go:embed migrations/*mysql*.sql
var MySqlMigrationFS embed.FS

//go:embed migrations/*spanner*.sql
var SpannerMigrationFS embed.FS

//go:embed configs/config.default.yml
var ConfigFS embed.FS
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module github.com/moov-io/achgateway
go 1.22.0

require (
cloud.google.com/go/spanner v1.55.0
github.com/PagerDuty/go-pagerduty v1.7.0
github.com/ProtonMail/go-crypto v1.0.0
github.com/Shopify/sarama v1.38.1
Expand Down Expand Up @@ -45,7 +46,6 @@ require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.5 // indirect
cloud.google.com/go/longrunning v0.5.4 // indirect
cloud.google.com/go/spanner v1.55.0 // indirect
cloud.google.com/go/storage v1.35.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect
Expand Down
64 changes: 61 additions & 3 deletions internal/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/moov-io/base/stime"
"github.com/moov-io/base/telemetry"

"cloud.google.com/go/spanner"
"github.com/gorilla/mux"
"gocloud.dev/pubsub"
)
Expand All @@ -52,6 +53,7 @@ type Environment struct {
Config *service.Config
TimeService stime.TimeService
DB *sql.DB
SpannerClient *spanner.Client
InternalClient *http.Client
Events events.Emitter
Telemetry telemetry.Config
Expand Down Expand Up @@ -125,6 +127,25 @@ func NewEnvironment(env *Environment) (*Environment, error) {
}
}

// spanner setup
if env.DB == nil && env.SpannerClient == nil && env.Config.Database.Spanner != nil {
env.Logger.Info().Log("connecting to spanner database")
client, close, err := initializeSpannerDatabase(env.Logger, env.Config.Database)
if err != nil {
close()
return env, fmt.Errorf("setting up spanner database failed: %w", err)
}
env.SpannerClient = client

// Add DB closing to the Shutdown call for the Environment
prev := env.Shutdown
env.Shutdown = func() {
prev()
cancelFunc()
close()
}
}

if env.InternalClient == nil {
env.InternalClient = service.NewInternalClient(env.Logger, env.Config.Clients, "internal-client")
}
Expand Down Expand Up @@ -161,8 +182,18 @@ func NewEnvironment(env *Environment) (*Environment, error) {
return env, fmt.Errorf("unable to create http files subscription: %v", err)
}

fileRepository := files.NewRepository(env.DB)
shardRepository := shards.NewRepository(env.DB, env.Config.Sharding.Mappings)
var fileRepository files.Repository
var shardRepository shards.Repository

if env.DB != nil {
fileRepository = files.NewRepository(env.DB)
shardRepository = shards.NewRepository(env.DB, env.Config.Sharding.Mappings)
}

if env.SpannerClient != nil {
fileRepository = files.NewSpannerRepository(env.SpannerClient)
shardRepository = shards.NewSpannerRepository(env.SpannerClient, env.Config.Sharding.Mappings)
}

fileReceiver, err := pipeline.Start(ctx, env.Logger, env.Config, shardRepository, fileRepository, httpSub)
if err != nil {
Expand Down Expand Up @@ -235,6 +266,33 @@ func LoadConfig(logger log.Logger) (*service.Config, error) {
return cfg, nil
}

func initializeSpannerDatabase(logger log.Logger, config database.DatabaseConfig) (*spanner.Client, func(), error) {
ctx, cancelFunc := context.WithCancel(context.Background())

logger.Info().Log("connecting to spanner database")

// Create the spanner client
dbString := "projects/" + config.Spanner.Project + "/instances/" + config.Spanner.Instance + "/databases/" + config.DatabaseName
client, err := spanner.NewClient(ctx, dbString)
if err != nil {
return nil, cancelFunc, logger.Fatal().LogErrorf("Error creating spanner client: %w", err).Err()
}

shutdown := func() {
logger.Info().Log("Shutting down the spanner client")
cancelFunc()
client.Close()
}

if err := database.RunMigrations(logger, config, database.WithEmbeddedMigrations(achgateway.SpannerMigrationFS)); err != nil {
return nil, shutdown, logger.Fatal().LogErrorf("Error running migrations: %w", err).Err()
}

logger.Info().Log("Finished initializing spanner client")

return client, shutdown, err
}

func initializeDatabase(logger log.Logger, config database.DatabaseConfig) (*sql.DB, func(), error) {
ctx, cancelFunc := context.WithCancel(context.Background())

Expand All @@ -258,7 +316,7 @@ func initializeDatabase(logger log.Logger, config database.DatabaseConfig) (*sql
}

// Run the migrations
if err := database.RunMigrations(logger, config, database.WithEmbeddedMigrations(achgateway.MigrationFS)); err != nil {
if err := database.RunMigrations(logger, config, database.WithEmbeddedMigrations(achgateway.MySqlMigrationFS)); err != nil {
return nil, shutdown, logger.Fatal().LogErrorf("Error running migrations: %w", err).Err()
}

Expand Down
46 changes: 46 additions & 0 deletions internal/files/repo_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"cloud.google.com/go/spanner"
"github.com/moov-io/base/telemetry"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -24,6 +25,13 @@ func NewRepository(db *sql.DB) Repository {
return &sqlRepository{db: db}
}

func NewSpannerRepository(client *spanner.Client) Repository {
if client == nil {
return &MockRepository{}
}
return &spannerRepository{client: client}
}

type sqlRepository struct {
db *sql.DB
}
Expand Down Expand Up @@ -65,3 +73,41 @@ func (r *sqlRepository) Cancel(ctx context.Context, fileID string) error {
}
return nil
}

type spannerRepository struct {
client *spanner.Client
}

func (r *spannerRepository) Record(ctx context.Context, file AcceptedFile) error {
ctx, span := telemetry.StartSpan(ctx, "files-record", trace.WithAttributes(
attribute.String("achgateway.file_id", file.FileID),
))
defer span.End()

m := spanner.Insert("files",
[]string{"file_id", "shard_key", "hostname", "accepted_at"},
[]interface{}{file.FileID, file.ShardKey, file.Hostname, file.AcceptedAt},
)
_, err := r.client.Apply(ctx, []*spanner.Mutation{m})
if err != nil {
return fmt.Errorf("recording file failed: %w", err)
}
return nil
}

func (r *spannerRepository) Cancel(ctx context.Context, fileID string) error {
ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes(
attribute.String("achgateway.file_id", fileID),
))
defer span.End()

m := spanner.Update("files",
[]string{"file_id", "canceled_at"},
[]interface{}{fileID, time.Now().In(time.UTC)},
)
_, err := r.client.Apply(ctx, []*spanner.Mutation{m})
if err != nil {
return fmt.Errorf("saving file cancellation failed: %w", err)
}
return nil
}
41 changes: 41 additions & 0 deletions internal/shards/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package shards

import (
"context"
"database/sql"
"fmt"

"cloud.google.com/go/spanner"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/database"

Expand All @@ -40,6 +42,13 @@ func NewRepository(db *sql.DB, static []service.ShardMapping) Repository {
return &sqlRepository{db: db}
}

func NewSpannerRepository(client *spanner.Client, static []service.ShardMapping) Repository {
if client == nil {
return &InMemoryRepository{Shards: static}
}
return &spannerRepository{client: client}
}

type sqlRepository struct {
db *sql.DB
}
Expand Down Expand Up @@ -168,3 +177,35 @@ func (r *sqlRepository) write(shardKey, shardName string) error {
}
return err
}

type spannerRepository struct {
client *spanner.Client
}

func (r *spannerRepository) Lookup(shardKey string) (string, error) {
row, err := r.client.Single().ReadRow(context.Background(), "shard_mappings", spanner.Key{shardKey}, []string{"shard_name"})
if err != nil {
return "", err
}

return row.String(), nil
}

func (r *spannerRepository) List() ([]service.ShardMapping, error) {
return nil, errors.Errorf("spannerRepository.List not implemented")
}

func (r *spannerRepository) Add(create service.ShardMapping, run database.RunInTx) error {
ctx := context.Background()

m := spanner.Insert("shard_mappings",
[]string{"shard_key", "shard_name"},
[]interface{}{create.ShardKey, create.ShardName},
)

_, err := r.client.Apply(ctx, []*spanner.Mutation{m})
if err != nil {
return fmt.Errorf("recording file failed: %w", err)
}
return nil
}
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions migrations/001_spanner_shard_mappings.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE shard_mappings (
shard_key STRING(50) NOT NULL,
shard_name STRING(50) NOT NULL,
) PRIMARY KEY (shard_key);
File renamed without changes.
7 changes: 7 additions & 0 deletions migrations/002_spanner_files.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE files (
file_id STRING(MAX) NOT NULL,
shard_key STRING(MAX) NOT NULL,
hostname STRING(MAX) NOT NULL,
accepted_at TIMESTAMP NOT NULL,
canceled_at TIMESTAMP,
) PRIMARY KEY (file_id);

0 comments on commit c622974

Please sign in to comment.