From 202f6fb5376a7a9d605654cf29a6b66b4bf301eb Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Sun, 24 Apr 2022 12:15:03 +0200 Subject: [PATCH] wip: add new mongodb module with unfinished EventStore impl --- go.work | 1 + go.work.sum | 3 +- mongodb/event_store.go | 190 +++++++++++++++++++++++++++++++++++++++++ mongodb/go.mod | 18 ++++ mongodb/go.sum | 60 +++++++++++++ mongodb/types.go | 14 +++ 6 files changed, 284 insertions(+), 2 deletions(-) create mode 100644 mongodb/event_store.go create mode 100644 mongodb/go.mod create mode 100644 mongodb/go.sum create mode 100644 mongodb/types.go diff --git a/go.work b/go.work index 2ff3799..ee5ec8a 100644 --- a/go.work +++ b/go.work @@ -2,6 +2,7 @@ go 1.18 use ( ./core + ./mongodb ./oteleventually ./postgres ./serdes diff --git a/go.work.sum b/go.work.sum index 61b2308..b55e8ca 100644 --- a/go.work.sum +++ b/go.work.sum @@ -26,11 +26,10 @@ go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEa go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee h1:WG0RUwxtNT4qqaXX3DPA8zHFNm/D9xaBpxzHt1WcA/E= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec h1:RlWgLqCMMIYYEVcAR5MDsuHlVkaIPDAF+5Dehzg8L5A= diff --git a/mongodb/event_store.go b/mongodb/event_store.go new file mode 100644 index 0000000..251e847 --- /dev/null +++ b/mongodb/event_store.go @@ -0,0 +1,190 @@ +package mongodb + +import ( + "context" + "errors" + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.mongodb.org/mongo-driver/mongo/writeconcern" + + "github.com/get-eventually/go-eventually/core/event" + "github.com/get-eventually/go-eventually/core/message" + "github.com/get-eventually/go-eventually/core/serde" + "github.com/get-eventually/go-eventually/core/version" +) + +var _ event.Store = EventStore{} + +type EventStore struct { + Client *mongo.Client + DatabaseName string + Serde serde.Bytes[message.Message] +} + +func (es EventStore) openSession() (mongo.Session, error) { + return es.Client.StartSession(&options.SessionOptions{ + DefaultReadConcern: readconcern.Majority(), + DefaultReadPreference: readpref.Primary(), + // Prefer that the write operations, being transactions with strong validations, + // are replicated to the majority of the replicas in the replica set. + DefaultWriteConcern: writeconcern.New( + writeconcern.WMajority(), + ), + }) +} + +func (es EventStore) database() *mongo.Database { + return es.Client.Database(es.DatabaseName, &options.DatabaseOptions{ + // If we're using event.Store.Stream, is to re-hydrate the state of + // an Aggregate to perform a write operation through a Command. + ReadConcern: readconcern.Majority(), + ReadPreference: readpref.Primary(), + }) +} + +func (es EventStore) eventsCollection() *mongo.Collection { + return es.database().Collection("events") +} + +func (es EventStore) eventStreamsCollection() *mongo.Collection { + return es.database().Collection("event_streams") +} + +// updateEventStream updates the Event Stream entry in the `event_streams` +// collection and performs optimistic locking checks. +// +// Returns the old version of the Event Stream, before the update. +func (es EventStore) updateEventStream( + ctx mongo.SessionContext, + id event.StreamID, + expected version.Check, + newVersionOffset int, +) (version.Version, error) { + eventStreamsCollection := es.eventStreamsCollection() + + var eventStream bson.M + + err := eventStreamsCollection. + FindOne(ctx, bson.D{{Key: "_id", Value: string(id)}}). + Decode(&eventStream) + + if errors.Is(err, mongo.ErrNoDocuments) { + eventStream = bson.M{"_id": string(id), "version": int64(0)} + } else if err != nil { + return 0, fmt.Errorf("mongodb.EventStore: failed to find event stream, %w", err) + } + + currentVersion := version.Version(eventStream["version"].(int64)) + if v, ok := expected.(version.CheckExact); ok && currentVersion != version.Version(v) { + return 0, version.ConflictError{ + Expected: version.Version(v), + Actual: currentVersion, + } + } + + newVersion := currentVersion + version.Version(newVersionOffset) + eventStream["version"] = newVersion + + panic("implement me!") +} + +func (es EventStore) append( + ctx mongo.SessionContext, + id event.StreamID, + expected version.Check, + events ...event.Envelope, +) (version.Version, error) { + oldEventStreamVersion, err := es.updateEventStream(ctx, id, expected, len(events)) + if err != nil { + return 0, fmt.Errorf("mongodb.EventStore: failed to update event stream version, %w", err) + } + + eventsCollection := es.eventsCollection() + + var documents bson.A + for i, evt := range events { + msg, err := es.Serde.Serialize(evt.Message) + if err != nil { + return 0, fmt.Errorf("mongodb.EventStore: failed to serialize event, %w", err) + } + + documents = append(documents, bson.M{ + "event_stream_id": string(id), + "version": uint64(oldEventStreamVersion) + uint64(i) + 1, + "message": msg, + "metadata": evt.Metadata, + }) + } + + if _, err := eventsCollection.InsertMany(ctx, documents); err != nil { + return 0, fmt.Errorf("mongodb.EventStore: failed to insert new domain events, %w", err) + } + + panic("implement me!") +} + +// Append implements event.Store +func (es EventStore) Append( + ctx context.Context, + id event.StreamID, + expected version.Check, + events ...event.Envelope, +) (version.Version, error) { + sess, err := es.openSession() + if err != nil { + return 0, fmt.Errorf("mongodb.EventStore: failed to open a new session, %w", err) + } + + result, err := sess.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) { + return es.append(sessCtx, id, expected, events...) + }) + + return result.(version.Version), err +} + +// Stream implements event.Store +func (es EventStore) Stream( + ctx context.Context, + stream event.StreamWrite, + id event.StreamID, + selector version.Selector, +) error { + defer close(stream) + + eventsCollection := es.eventsCollection() + cursor, err := eventsCollection.Find(ctx, bson.D{ + { + Key: "event_stream_id", + Value: string(id), + }, + { + Key: "version", + Value: bson.D{ + { + Key: "$gte", + Value: selector.From, + }, + }, + }, + }) + + if err != nil { + return fmt.Errorf("mongodb.EventStore: failed to open event stream cursor, %w", err) + } + + for cursor.Next(ctx) { + // TODO: write the conversion logic from document to event message. + // We can register a common type and use a bsoncodec.Registry for passing struct values directly. + } + + if err := cursor.Err(); err != nil { + return fmt.Errorf("mongodb.EventStore: failed while iterating the event stream query cursor, %w", err) + } + + return nil +} diff --git a/mongodb/go.mod b/mongodb/go.mod new file mode 100644 index 0000000..85ccbb1 --- /dev/null +++ b/mongodb/go.mod @@ -0,0 +1,18 @@ +module github.com/get-eventually/go-eventually/mongodb + +go 1.18 + +require ( + github.com/go-stack/stack v1.8.1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/klauspost/compress v1.15.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.1 // indirect + github.com/xdg-go/stringprep v1.0.3 // indirect + github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect + go.mongodb.org/mongo-driver v1.9.0 // indirect + golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/text v0.3.7 // indirect +) diff --git a/mongodb/go.sum b/mongodb/go.sum new file mode 100644 index 0000000..778c1ad --- /dev/null +++ b/mongodb/go.sum @@ -0,0 +1,60 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= +github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A= +github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= +github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= +github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= +github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= +go.mongodb.org/mongo-driver v1.9.0 h1:f3aLGJvQmBl8d9S40IL+jEyBC6hfLPbJjv9t5hEM9ck= +go.mongodb.org/mongo-driver v1.9.0/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCuAROlKEPY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA= +golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mongodb/types.go b/mongodb/types.go new file mode 100644 index 0000000..d89e066 --- /dev/null +++ b/mongodb/types.go @@ -0,0 +1,14 @@ +package mongodb + +import ( + "go.mongodb.org/mongo-driver/bson/bsoncodec" +) + +var bsonRegistry = bsoncodec.NewRegistryBuilder(). + RegisterCodec(nil, nil). + Build() + +type persistedEvent struct { + id string + version uint64 +}