From f126128a0444b8faa4b08ff9cee0c40d7f20fc8c Mon Sep 17 00:00:00 2001 From: Guillaume Louvigny Date: Wed, 15 Jul 2020 12:22:36 +0200 Subject: [PATCH] feat: use low level libp2p pubsub, old peermonitor refactor (#50) Signed-off-by: Guillaume Louvigny --- baseorbitdb/orbitdb.go | 163 +++++++++----------- go.mod | 2 +- iface/interface.go | 60 ++++++- pubsub/directchannel/channel_test.go | 3 +- pubsub/event.go | 39 +++-- pubsub/interface.go | 28 ---- pubsub/oneonone/events.go | 1 - pubsub/peermonitor/doc.go | 2 - pubsub/peermonitor/event.go | 30 ---- pubsub/peermonitor/interface.go | 28 ---- pubsub/peermonitor/peermonitor.go | 195 ----------------------- pubsub/pubsub.go | 118 -------------- pubsub/pubsubcoreapi/doc.go | 2 + pubsub/pubsubcoreapi/pubsub.go | 185 ++++++++++++++++++++++ pubsub/pubsubraw/doc.go | 2 + pubsub/pubsubraw/pubsub.go | 146 ++++++++++++++++++ pubsub/subscription.go | 136 ---------------- tests/directchannel_test.go | 123 --------------- tests/replicate_test.go | 223 ++++++++++++++++++--------- 19 files changed, 636 insertions(+), 850 deletions(-) delete mode 100644 pubsub/interface.go delete mode 100644 pubsub/oneonone/events.go delete mode 100644 pubsub/peermonitor/doc.go delete mode 100644 pubsub/peermonitor/event.go delete mode 100644 pubsub/peermonitor/interface.go delete mode 100644 pubsub/peermonitor/peermonitor.go delete mode 100644 pubsub/pubsub.go create mode 100644 pubsub/pubsubcoreapi/doc.go create mode 100644 pubsub/pubsubcoreapi/pubsub.go create mode 100644 pubsub/pubsubraw/doc.go create mode 100644 pubsub/pubsubraw/pubsub.go delete mode 100644 pubsub/subscription.go delete mode 100644 tests/directchannel_test.go diff --git a/baseorbitdb/orbitdb.go b/baseorbitdb/orbitdb.go index e540e69a..31117f47 100644 --- a/baseorbitdb/orbitdb.go +++ b/baseorbitdb/orbitdb.go @@ -7,6 +7,7 @@ import ( "path" "strings" "sync" + "time" ipfslog "berty.tech/go-ipfs-log" "berty.tech/go-ipfs-log/entry" @@ -21,6 +22,7 @@ import ( coreapi "github.com/ipfs/interface-go-ipfs-core" p2pcore "github.com/libp2p/go-libp2p-core" "github.com/pkg/errors" + "go.opentelemetry.io/otel/api/trace" "go.uber.org/zap" "berty.tech/go-orbit-db/accesscontroller" @@ -30,12 +32,10 @@ import ( "berty.tech/go-orbit-db/cache/cacheleveldown" "berty.tech/go-orbit-db/iface" _ "berty.tech/go-orbit-db/internal/buildconstraints" // fail for bad go version - "berty.tech/go-orbit-db/pubsub" "berty.tech/go-orbit-db/pubsub/oneonone" - "berty.tech/go-orbit-db/pubsub/peermonitor" + "berty.tech/go-orbit-db/pubsub/pubsubcoreapi" "berty.tech/go-orbit-db/stores" "berty.tech/go-orbit-db/utils" - "go.opentelemetry.io/otel/api/trace" ) // OrbitDB An alias of the type defined in the iface package @@ -95,6 +95,7 @@ type NewOrbitDBOptions struct { Logger *zap.Logger Tracer trace.Tracer DirectChannelFactory iface.DirectChannelFactory + PubSub iface.PubSubInterface } type orbitDB struct { @@ -103,7 +104,7 @@ type orbitDB struct { ipfs coreapi.CoreAPI identity *idp.Identity id p2pcore.PeerID - pubsub pubsub.Interface + pubsub iface.PubSubInterface keystore keystore.Interface closeKeystore func() error stores map[string]Store @@ -118,7 +119,6 @@ type orbitDB struct { muStores sync.RWMutex muIdentity sync.RWMutex muID sync.RWMutex - muPubSub sync.RWMutex muIPFS sync.RWMutex muKeyStore sync.RWMutex muCaches sync.RWMutex @@ -155,13 +155,6 @@ func (o *orbitDB) PeerID() p2pcore.PeerID { return o.id } -func (o *orbitDB) PubSub() pubsub.Interface { - o.muPubSub.RLock() - defer o.muPubSub.RUnlock() - - return o.pubsub -} - func (o *orbitDB) KeyStore() keystore.Interface { // TODO: check why o.keystore is never set o.muKeyStore.RLock() @@ -214,19 +207,6 @@ func (o *orbitDB) closeAllStores() { o.stores = map[string]Store{} } -func (o *orbitDB) closePubSub() { - o.muPubSub.Lock() - defer o.muPubSub.Unlock() - - if o.pubsub != nil { - if err := o.pubsub.Close(); err != nil { - o.logger.Error("unable to close pubsub", zap.Error(err)) - } - } - - o.pubsub = nil -} - func (o *orbitDB) closeCache() { o.muCaches.Lock() defer o.muCaches.Unlock() @@ -389,19 +369,15 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity, return nil, err } - ps, err := pubsub.NewPubSub(is, k.ID(), &pubsub.Options{ - Logger: options.Logger, - Tracer: options.Tracer, - }) - if err != nil { - return nil, err - } - if options.PeerID == nil { id := k.ID() options.PeerID = &id } + if options.PubSub == nil { + options.PubSub = pubsubcoreapi.NewPubSub(is, k.ID(), time.Second, options.Logger, options.Tracer) + } + if options.Cache == nil { options.Cache = cacheleveldown.New(&cache.Options{Logger: options.Logger}) } @@ -414,7 +390,7 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity, ipfs: is, identity: identity, id: *options.PeerID, - pubsub: ps, + pubsub: options.PubSub, cache: options.Cache, directory: *options.Directory, stores: map[string]Store{}, @@ -498,7 +474,6 @@ func NewOrbitDB(ctx context.Context, ipfs coreapi.CoreAPI, options *NewOrbitDBOp func (o *orbitDB) Close() error { o.closeAllStores() o.closeDirectConnections() - o.closePubSub() o.closeCache() o.closeKeyStore() @@ -756,7 +731,7 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd options.Replicate = boolPtr(true) } - //options.AccessController = accessController + // options.AccessController = accessController options.Keystore = o.KeyStore() options.Cache = c @@ -783,38 +758,33 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd return nil, errors.Wrap(err, "unable to instantiate store") } - o.storeListener(ctx, store) + topic, err := o.pubsub.TopicSubscribe(ctx, parsedDBAddress.String()) + if err != nil { + return nil, errors.Wrap(err, "unable to subscribe to pubsub") + } + + o.storeListener(ctx, store, topic) o.setStore(parsedDBAddress.String(), store) // Subscribe to pubsub to get updates from peers, // this is what hooks us into the message propagation layer // and the p2p network - if ps := o.PubSub(); *options.Replicate && ps != nil { - sub, err := ps.Subscribe(ctx, parsedDBAddress.String()) - if err != nil { - return nil, errors.Wrap(err, "unable to subscribe to pubsub") + if *options.Replicate { + if err := o.pubSubChanListener(ctx, store, topic, parsedDBAddress); err != nil { + return nil, err } - - o.pubSubChanListener(ctx, sub, parsedDBAddress) } return store, nil } -func (o *orbitDB) onClose(ctx context.Context, addr cid.Cid) error { - // Unsubscribe from pubsub - if ps := o.PubSub(); ps != nil { - if err := ps.Unsubscribe(addr.String()); err != nil { - return errors.Wrap(err, "unable to unsubscribe from pubsub") - } - } - +func (o *orbitDB) onClose(addr cid.Cid) error { o.deleteStore(addr.String()) return nil } -func (o *orbitDB) storeListener(ctx context.Context, store Store) { +func (o *orbitDB) storeListener(ctx context.Context, store Store, topic iface.PubSubTopic) { go func() { for evt := range store.Subscribe(ctx) { switch e := evt.(type) { @@ -825,14 +795,14 @@ func (o *orbitDB) storeListener(ctx context.Context, store Store) { continue } - if ps := o.PubSub(); ps != nil { + if topic != nil { headsBytes, err := json.Marshal(e.Heads) if err != nil { o.logger.Debug(fmt.Sprintf("unable to serialize heads %v", err)) continue } - err = ps.Publish(ctx, e.Address.String(), headsBytes) + err = topic.Publish(ctx, headsBytes) if err != nil { o.logger.Debug(fmt.Sprintf("unable to publish message on pubsub %v", err)) continue @@ -845,55 +815,33 @@ func (o *orbitDB) storeListener(ctx context.Context, store Store) { o.logger.Debug("received stores.close event") - if err := o.onClose(ctx, store.Address().GetRoot()); err != nil { + if err := o.onClose(store.Address().GetRoot()); err != nil { o.logger.Debug(fmt.Sprintf("unable to perform onClose %v", err)) } }() } -func (o *orbitDB) pubSubChanListener(ctx context.Context, ps pubsub.Subscription, addr address.Address) { - go func() { - for e := range ps.Subscribe(ctx) { - o.logger.Debug("Got pub sub message") - switch evt := e.(type) { - case *pubsub.MessageEvent: - addr := evt.Topic - store, ok := o.getStore(addr) - - if !ok { - o.logger.Error(fmt.Sprintf("unable to find store for address %s", addr)) - continue - } - - headsEntriesBytes := evt.Content - var headsEntries []*entry.Entry - - err := json.Unmarshal(headsEntriesBytes, &headsEntries) - if err != nil { - o.logger.Error("unable to unmarshal head entries") - } - - if len(headsEntries) == 0 { - o.logger.Debug(fmt.Sprintf("Nothing to synchronize for %s:", addr)) - } - - o.logger.Debug(fmt.Sprintf("Received %d heads for %s:", len(headsEntries), addr)) +func (o *orbitDB) pubSubChanListener(ctx context.Context, store Store, topic iface.PubSubTopic, addr address.Address) error { + chPeers, err := topic.WatchPeers(ctx) + if err != nil { + return err + } - entries := make([]ipfslog.Entry, len(headsEntries)) - for i := range headsEntries { - entries[i] = headsEntries[i] - } + chMessages, err := topic.WatchMessages(ctx) + if err != nil { + return err + } - if err := store.Sync(ctx, entries); err != nil { - o.logger.Debug(fmt.Sprintf("Error while syncing heads for %s:", addr)) - } - case *peermonitor.EventPeerJoin: + go func() { + for e := range chPeers { + switch evt := e.(type) { + case *iface.EventPubSubJoin: go func() { o.onNewPeerJoined(ctx, evt.Peer, addr) o.logger.Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID())) }() - case *peermonitor.EventPeerLeave: + case *iface.EventPubSubLeave: o.logger.Debug(fmt.Sprintf("peer %s left from %s self is %s", evt.Peer.String(), addr, o.PeerID())) default: @@ -901,6 +849,37 @@ func (o *orbitDB) pubSubChanListener(ctx context.Context, ps pubsub.Subscription } } }() + + go func() { + for evt := range chMessages { + o.logger.Debug("Got pub sub message") + + headsEntriesBytes := evt.Content + var headsEntries []*entry.Entry + + err := json.Unmarshal(headsEntriesBytes, &headsEntries) + if err != nil { + o.logger.Error("unable to unmarshal head entries") + } + + if len(headsEntries) == 0 { + o.logger.Debug(fmt.Sprintf("Nothing to synchronize for %s:", addr)) + } + + o.logger.Debug(fmt.Sprintf("Received %d heads for %s:", len(headsEntries), addr)) + + entries := make([]ipfslog.Entry, len(headsEntries)) + for i := range headsEntries { + entries[i] = headsEntries[i] + } + + if err := store.Sync(ctx, entries); err != nil { + o.logger.Debug(fmt.Sprintf("Error while syncing heads for %s:", addr)) + } + } + }() + + return nil } func (o *orbitDB) onNewPeerJoined(ctx context.Context, p p2pcore.PeerID, addr address.Address) { @@ -984,7 +963,7 @@ func (o *orbitDB) watchOneOnOneMessage(ctx context.Context, channel iface.Direct o.logger.Debug("received one on one message") switch e := evt.(type) { - case *pubsub.EventPayload: + case *iface.EventPubSubPayload: heads := &exchangedHeads{} err := json.Unmarshal(e.Payload, &heads) if err != nil { diff --git a/go.mod b/go.mod index 5e494736..f5c9f025 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/ipfs/interface-go-ipfs-core v0.3.0 github.com/libp2p/go-libp2p v0.10.0 github.com/libp2p/go-libp2p-core v0.6.0 - github.com/libp2p/go-libp2p-peerstore v0.2.6 + github.com/libp2p/go-libp2p-pubsub v0.3.1 github.com/pkg/errors v0.9.1 github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 diff --git a/iface/interface.go b/iface/interface.go index 8a22a5b0..269f9970 100644 --- a/iface/interface.go +++ b/iface/interface.go @@ -6,17 +6,19 @@ import ( ipfslog "berty.tech/go-ipfs-log" "berty.tech/go-ipfs-log/identityprovider" "berty.tech/go-ipfs-log/keystore" - "berty.tech/go-orbit-db/accesscontroller" - "berty.tech/go-orbit-db/address" - "berty.tech/go-orbit-db/events" - "berty.tech/go-orbit-db/stores/operation" - "berty.tech/go-orbit-db/stores/replicator" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" coreapi "github.com/ipfs/interface-go-ipfs-core" p2pcore "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/peer" "go.opentelemetry.io/otel/api/trace" "go.uber.org/zap" + + "berty.tech/go-orbit-db/accesscontroller" + "berty.tech/go-orbit-db/address" + "berty.tech/go-orbit-db/events" + "berty.tech/go-orbit-db/stores/operation" + "berty.tech/go-orbit-db/stores/replicator" ) // CreateDBOptions lists the arguments to create a store @@ -293,3 +295,51 @@ type OnWritePrototype func(ctx context.Context, addr cid.Cid, entry ipfslog.Entr // AccessControllerConstructor Required prototype for custom controllers constructors type AccessControllerConstructor func(context.Context, BaseOrbitDB, accesscontroller.ManifestParams, ...accesscontroller.Option) (accesscontroller.Interface, error) + +// PubSubTopic is a pub sub subscription to a topic +type PubSubTopic interface { + // Publish Posts a new message on a topic + Publish(ctx context.Context, message []byte) error + + // Peers Lists peers connected to the topic + Peers(ctx context.Context) ([]peer.ID, error) + + // WatchPeers subscribes to peers joining or leaving the topic + WatchPeers(ctx context.Context) (<-chan events.Event, error) + + // WatchMessages + WatchMessages(ctx context.Context) (<-chan *EventPubSubMessage, error) + + // Returns the topic name + Topic() string +} + +type PubSubInterface interface { + // Subscribe Subscribes to a topic + TopicSubscribe(ctx context.Context, topic string) (PubSubTopic, error) +} + +type PubSubSubscriptionOptions struct { + Logger *zap.Logger + Tracer trace.Tracer +} + +// EventPubSubMessage Indicates a new message posted on a pubsub topic +type EventPubSubMessage struct { + Content []byte +} + +// EventPubSubPayload An event received on new messages +type EventPubSubPayload struct { + Payload []byte +} + +// EventPubSubJoin Is an event triggered when a peer joins the channel +type EventPubSubJoin struct { + Peer peer.ID +} + +// EventPubSubLeave Is an event triggered when a peer leave the channel +type EventPubSubLeave struct { + Peer peer.ID +} diff --git a/pubsub/directchannel/channel_test.go b/pubsub/directchannel/channel_test.go index 61492ad0..8c22aae7 100644 --- a/pubsub/directchannel/channel_test.go +++ b/pubsub/directchannel/channel_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "berty.tech/go-orbit-db/iface" - "berty.tech/go-orbit-db/pubsub" ) func TestInitDirectChannelFactory(t *testing.T) { @@ -97,7 +96,7 @@ func TestInitDirectChannelFactory(t *testing.T) { subWg.Done() for evt := range sub { - if e, ok := evt.(*pubsub.EventPayload); ok { + if e, ok := evt.(*iface.EventPubSubPayload); ok { if bytes.Equal(e.Payload, expectedMessage) { count := atomic.AddUint32(&receivedMessages, 1) t.Log(fmt.Sprintf("successfully received message from %d for %d (%d)", i, j, count)) diff --git a/pubsub/event.go b/pubsub/event.go index 37d6f7f7..1ff0d0ee 100644 --- a/pubsub/event.go +++ b/pubsub/event.go @@ -1,29 +1,36 @@ package pubsub -import "berty.tech/go-orbit-db/events" +import ( + "github.com/libp2p/go-libp2p-core/peer" -// MessageEvent Indicates a new message posted on a pubsub topic -type MessageEvent struct { - Topic string - Content []byte -} + "berty.tech/go-orbit-db/events" + "berty.tech/go-orbit-db/iface" +) // Creates a new Message event -func NewMessageEvent(topic string, content []byte) events.Event { - return &MessageEvent{ - Topic: topic, +func NewEventMessage(content []byte) *iface.EventPubSubMessage { + return &iface.EventPubSubMessage{ Content: content, } } -// EventPayload An event received on new messages -type EventPayload struct { - Payload []byte -} - // NewEventPayload Creates a new Message event -func NewEventPayload(payload []byte) *EventPayload { - return &EventPayload{ +func NewEventPayload(payload []byte) *iface.EventPubSubPayload { + return &iface.EventPubSubPayload{ Payload: payload, } } + +// NewEventPeerJoin creates a new EventPubSubJoin event +func NewEventPeerJoin(p peer.ID) events.Event { + return &iface.EventPubSubJoin{ + Peer: p, + } +} + +// NewEventPeerLeave creates a new EventPubSubLeave event +func NewEventPeerLeave(p peer.ID) events.Event { + return &iface.EventPubSubLeave{ + Peer: p, + } +} diff --git a/pubsub/interface.go b/pubsub/interface.go deleted file mode 100644 index 5e72f0fa..00000000 --- a/pubsub/interface.go +++ /dev/null @@ -1,28 +0,0 @@ -package pubsub - -import ( - "context" - "io" - - "berty.tech/go-orbit-db/events" -) - -// Subscription is a pub sub subscription to a topic -type Subscription interface { - events.EmitterInterface - io.Closer -} - -type Interface interface { - // Subscribe Subscribes to a topic - Subscribe(ctx context.Context, topic string) (Subscription, error) - - // Unsubscribe Unsubscribe from a topic - Unsubscribe(topic string) error - - // Close Unsubscribe from all topics - Close() error - - // Publish Posts a new message on a topic - Publish(ctx context.Context, topic string, message []byte) error -} diff --git a/pubsub/oneonone/events.go b/pubsub/oneonone/events.go deleted file mode 100644 index c1c96b37..00000000 --- a/pubsub/oneonone/events.go +++ /dev/null @@ -1 +0,0 @@ -package oneonone diff --git a/pubsub/peermonitor/doc.go b/pubsub/peermonitor/doc.go deleted file mode 100644 index 5eab96f6..00000000 --- a/pubsub/peermonitor/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// peermonitor is a package for watching peers on a pub sub channel -package peermonitor // import "berty.tech/go-orbit-db/pubsub/peermonitor" diff --git a/pubsub/peermonitor/event.go b/pubsub/peermonitor/event.go deleted file mode 100644 index 80216adf..00000000 --- a/pubsub/peermonitor/event.go +++ /dev/null @@ -1,30 +0,0 @@ -package peermonitor - -import ( - "berty.tech/go-orbit-db/events" - "github.com/libp2p/go-libp2p-core/peer" -) - -// EventPeerJoin Is an event triggered when a peer joins the channel -type EventPeerJoin struct { - Peer peer.ID -} - -// EventPeerLeave Is an event triggered when a peer leave the channel -type EventPeerLeave struct { - Peer peer.ID -} - -// NewEventPeerJoin creates a new EventPeerJoin event -func NewEventPeerJoin(p peer.ID) events.Event { - return &EventPeerJoin{ - Peer: p, - } -} - -// NewEventPeerLeave creates a new EventPeerLeave event -func NewEventPeerLeave(p peer.ID) events.Event { - return &EventPeerLeave{ - Peer: p, - } -} diff --git a/pubsub/peermonitor/interface.go b/pubsub/peermonitor/interface.go deleted file mode 100644 index 351cd169..00000000 --- a/pubsub/peermonitor/interface.go +++ /dev/null @@ -1,28 +0,0 @@ -package peermonitor - -import ( - "context" - - "berty.tech/go-orbit-db/events" - "github.com/libp2p/go-libp2p-core/peer" -) - -// Interface Watches for peers on the pub sub, emits messages on join/leave -type Interface interface { - events.EmitterInterface - - // Start Starts watching the topic for new joins or leaves - Start(ctx context.Context) func() - - // Stop Stops the watcher - Stop() - - // GetPeers Lists peers currently present on the topic - GetPeers() []peer.ID - - // HasPeer Checks if a peer is present on the topic - HasPeer(id peer.ID) bool - - // Started Returns whether the peer monitor has been started - Started() bool -} diff --git a/pubsub/peermonitor/peermonitor.go b/pubsub/peermonitor/peermonitor.go deleted file mode 100644 index 42a3f6d2..00000000 --- a/pubsub/peermonitor/peermonitor.go +++ /dev/null @@ -1,195 +0,0 @@ -package peermonitor - -import ( - "context" - "sync" - "time" - - "berty.tech/go-orbit-db/events" - coreapi "github.com/ipfs/interface-go-ipfs-core" - "github.com/ipfs/interface-go-ipfs-core/options" - "github.com/libp2p/go-libp2p-core/peer" - "go.uber.org/zap" -) - -// NewPeerMonitorOptions Options for creating a new PeerMonitor instance -type NewPeerMonitorOptions struct { - Start *bool - PollInterval *time.Duration - Logger *zap.Logger -} - -func durationPtr(duration time.Duration) *time.Duration { - return &duration -} - -func boolPtr(val bool) *bool { - return &val -} - -var defaultPeerMonitorOptions = &NewPeerMonitorOptions{ - Start: boolPtr(true), - PollInterval: durationPtr(time.Second), -} - -type peerMonitor struct { - events.EventEmitter - cancelFunc func() - ipfs coreapi.CoreAPI - topic string - started bool - pollInterval time.Duration - peers map[peer.ID]struct{} - - muPeers sync.RWMutex - muStarted sync.RWMutex - logger *zap.Logger -} - -func (p *peerMonitor) Start(ctx context.Context) func() { - if p.Started() { - p.Stop() - } - - ctx, cancelFunc := context.WithCancel(ctx) - - p.muStarted.Lock() - p.started = true - p.cancelFunc = cancelFunc - p.muStarted.Unlock() - - go func() { - for { - select { - case <-ctx.Done(): - p.muStarted.Lock() - p.cancelFunc = nil - p.started = false - p.muStarted.Unlock() - return - - case <-time.After(p.pollInterval): - err := p.pollPeers(ctx) - if err != nil { - p.logger.Error("error while polling peers", zap.Error(err)) - } - - break - } - } - }() - - return cancelFunc -} - -func (p *peerMonitor) Stop() { - p.muStarted.RLock() - cancelFunc := p.cancelFunc - p.muStarted.RUnlock() - - if cancelFunc != nil { - cancelFunc() - } -} - -func (p *peerMonitor) GetPeers() []peer.ID { - p.muPeers.RLock() - defer p.muPeers.RUnlock() - - peerIDs := make([]peer.ID, len(p.peers)) - i := 0 - - for p := range p.peers { - peerIDs[i] = p - i++ - } - - return peerIDs -} - -func (p *peerMonitor) HasPeer(id peer.ID) bool { - p.muPeers.RLock() - defer p.muPeers.RUnlock() - - _, ok := p.peers[id] - - return ok -} - -func (p *peerMonitor) pollPeers(ctx context.Context) error { - p.muPeers.Lock() - defer p.muPeers.Unlock() - - peerIDs, err := p.ipfs.PubSub().Peers(ctx, options.PubSub.Topic(p.topic)) - - currentlyKnownPeers := map[peer.ID]struct{}{} - allPeers := map[peer.ID]struct{}{} - - for peerID := range p.peers { - currentlyKnownPeers[peerID] = struct{}{} - } - - if err != nil { - return err - } - - for _, peerID := range peerIDs { - allPeers[peerID] = struct{}{} - - if _, ok := currentlyKnownPeers[peerID]; ok { - delete(currentlyKnownPeers, peerID) - } else { - p.Emit(ctx, NewEventPeerJoin(peerID)) - } - } - - for peerID := range currentlyKnownPeers { - p.Emit(ctx, NewEventPeerLeave(peerID)) - } - - p.peers = allPeers - - return nil -} - -func (p *peerMonitor) Started() bool { - p.muStarted.RLock() - defer p.muStarted.RUnlock() - - return p.started -} - -// NewPeerMonitor Creates a new PeerMonitor instance -func NewPeerMonitor(ctx context.Context, ipfs coreapi.CoreAPI, topic string, options *NewPeerMonitorOptions) Interface { - if options == nil { - options = defaultPeerMonitorOptions - } - - if options.PollInterval == nil { - options.PollInterval = defaultPeerMonitorOptions.PollInterval - } - - if options.Start == nil { - options.Start = defaultPeerMonitorOptions.Start - } - - logger := options.Logger - if logger == nil { - logger = zap.NewNop() - } - - monitor := &peerMonitor{ - ipfs: ipfs, - topic: topic, - pollInterval: *options.PollInterval, - logger: logger, - } - - if *options.Start { - monitor.Start(ctx) - } - - return monitor -} - -var _ Interface = &peerMonitor{} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go deleted file mode 100644 index 8447467f..00000000 --- a/pubsub/pubsub.go +++ /dev/null @@ -1,118 +0,0 @@ -package pubsub - -import ( - "context" - "fmt" - "sync" - - coreapi "github.com/ipfs/interface-go-ipfs-core" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/pkg/errors" - "go.opentelemetry.io/otel/api/kv" - "go.opentelemetry.io/otel/api/trace" - "go.uber.org/zap" -) - -type pubSub struct { - coreAPI coreapi.CoreAPI - id peer.ID - subscriptions map[string]Subscription - muSubscriptions sync.RWMutex - logger *zap.Logger - tracer trace.Tracer -} - -// NewPubSub Creates a new pubsub client -func NewPubSub(coreAPI coreapi.CoreAPI, id peer.ID, opts *Options) (Interface, error) { - if opts == nil { - opts = &Options{} - } - - if opts.Logger == nil { - opts.Logger = zap.NewNop() - } - - if opts.Tracer == nil { - opts.Tracer = trace.NoopTracer{} - } - - if coreAPI == nil { - return nil, errors.New("coreAPI is not defined") - } - - if ps := coreAPI.PubSub(); ps == nil { - return nil, errors.New("pubsub service is not provided by the current ipfs instance") - } - - return &pubSub{ - coreAPI: coreAPI, - id: id, - subscriptions: map[string]Subscription{}, - logger: opts.Logger, - tracer: opts.Tracer, - }, nil -} - -func (p *pubSub) Subscribe(ctx context.Context, topic string) (Subscription, error) { - p.muSubscriptions.Lock() - defer p.muSubscriptions.Unlock() - - if sub, ok := p.subscriptions[topic]; ok { - return sub, nil - } - - p.logger.Debug(fmt.Sprintf("starting pubsub listener for peer %s on topic %s", p.id, topic)) - - s, err := NewSubscription(ctx, p.coreAPI, topic, &Options{ - Logger: p.logger, - Tracer: p.tracer, - }) - if err != nil { - return nil, errors.Wrap(err, "unable to create new pubsub subscription") - } - - p.subscriptions[topic] = s - - return s, nil -} - -func (p *pubSub) Publish(ctx context.Context, topic string, message []byte) error { - p.muSubscriptions.RLock() - if _, ok := p.subscriptions[topic]; !ok { - return errors.New("not subscribed to this topic") - } - p.muSubscriptions.RUnlock() - - ctx, span := p.tracer.Start(ctx, "pubsub-publish", trace.WithAttributes(kv.String("topic", topic), kv.String("peerid", p.id.String()))) - defer span.End() - - return p.coreAPI.PubSub().Publish(ctx, topic, message) -} - -func (p *pubSub) Close() error { - p.muSubscriptions.RLock() - subs := p.subscriptions - p.muSubscriptions.RUnlock() - - for _, sub := range subs { - _ = sub.Close() - } - - return nil -} - -func (p *pubSub) Unsubscribe(topic string) error { - p.muSubscriptions.RLock() - s, ok := p.subscriptions[topic] - p.muSubscriptions.RUnlock() - - if !ok { - return errors.New("no subscription found") - } - - _ = s.Close() - - return nil -} - -var _ Interface = &pubSub{} diff --git a/pubsub/pubsubcoreapi/doc.go b/pubsub/pubsubcoreapi/doc.go new file mode 100644 index 00000000..50be926a --- /dev/null +++ b/pubsub/pubsubcoreapi/doc.go @@ -0,0 +1,2 @@ +// pubsubcoreapi pubsub helpers using IPFS CoreAPI +package pubsubcoreapi diff --git a/pubsub/pubsubcoreapi/pubsub.go b/pubsub/pubsubcoreapi/pubsub.go new file mode 100644 index 00000000..fb469fdb --- /dev/null +++ b/pubsub/pubsubcoreapi/pubsub.go @@ -0,0 +1,185 @@ +package pubsubcoreapi + +import ( + "context" + "io" + "sync" + "time" + + coreapi "github.com/ipfs/interface-go-ipfs-core" + options "github.com/ipfs/interface-go-ipfs-core/options" + p2pcore "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/api/trace" + "go.uber.org/zap" + + "berty.tech/go-orbit-db/events" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/pubsub" +) + +type psTopic struct { + topic string + ps *coreAPIPubSub + members []peer.ID + muMembers sync.RWMutex +} + +func (p *psTopic) Publish(ctx context.Context, message []byte) error { + return p.ps.api.PubSub().Publish(ctx, p.topic, message) +} + +func (p *psTopic) Peers(_ context.Context) ([]p2pcore.PeerID, error) { + p.muMembers.RLock() + members := p.members + p.muMembers.RUnlock() + + return members, nil +} + +func (p *psTopic) peersDiff(ctx context.Context) (joining, leaving []p2pcore.PeerID, err error) { + p.muMembers.RLock() + oldMembers := map[peer.ID]struct{}{} + + for _, m := range p.members { + oldMembers[m] = struct{}{} + } + p.muMembers.RUnlock() + + all, err := p.ps.api.PubSub().Peers(ctx, options.PubSub.Topic(p.topic)) + if err != nil { + return nil, nil, err + } + + for _, m := range all { + if _, ok := oldMembers[m]; !ok { + joining = append(joining, m) + } else { + delete(oldMembers, m) + } + } + + for m := range oldMembers { + leaving = append(leaving, m) + } + + p.muMembers.Lock() + p.members = all + p.muMembers.Unlock() + + return joining, leaving, nil +} + +func (p *psTopic) WatchPeers(ctx context.Context) (<-chan events.Event, error) { + ch := make(chan events.Event) + + go func() { + for { + joining, leaving, err := p.peersDiff(ctx) + if err != nil { + p.ps.logger.Error("", zap.Error(err)) + return + } + + for _, p := range joining { + ch <- pubsub.NewEventPeerJoin(p) + } + + for _, p := range leaving { + ch <- pubsub.NewEventPeerLeave(p) + } + + select { + case <-ctx.Done(): + return + case <-time.After(p.ps.pollInterval): + continue + } + } + }() + + return ch, nil +} + +func (p *psTopic) WatchMessages(ctx context.Context) (<-chan *iface.EventPubSubMessage, error) { + ch := make(chan *iface.EventPubSubMessage) + + sub, err := p.ps.api.PubSub().Subscribe(ctx, p.topic) + if err != nil { + return nil, err + } + + go func() { + for { + msg, err := sub.Next(ctx) + if err != nil { + if err == io.EOF { + return + } + + p.ps.logger.Error("error while retrieving pubsub message", zap.Error(err)) + continue + } + + if msg.From() == p.ps.id { + continue + } + + ch <- pubsub.NewEventMessage(msg.Data()) + } + }() + + return ch, nil +} + +func (p *psTopic) Topic() string { + return p.topic +} + +type coreAPIPubSub struct { + api coreapi.CoreAPI + logger *zap.Logger + id peer.ID + pollInterval time.Duration + tracer trace.Tracer + topics map[string]*psTopic + muTopics sync.Mutex +} + +func (c *coreAPIPubSub) TopicSubscribe(_ context.Context, topic string) (iface.PubSubTopic, error) { + c.muTopics.Lock() + defer c.muTopics.Unlock() + + if t, ok := c.topics[topic]; ok { + return t, nil + } + + c.topics[topic] = &psTopic{ + topic: topic, + ps: c, + } + + return c.topics[topic], nil +} + +func NewPubSub(api coreapi.CoreAPI, id peer.ID, pollInterval time.Duration, logger *zap.Logger, tracer trace.Tracer) iface.PubSubInterface { + if logger == nil { + logger = zap.NewNop() + } + + if tracer == nil { + tracer = trace.NoopTracer{} + } + + return &coreAPIPubSub{ + topics: map[string]*psTopic{}, + api: api, + id: id, + logger: logger, + pollInterval: pollInterval, + tracer: tracer, + } +} + +var _ iface.PubSubInterface = &coreAPIPubSub{} +var _ iface.PubSubTopic = &psTopic{} diff --git a/pubsub/pubsubraw/doc.go b/pubsub/pubsubraw/doc.go new file mode 100644 index 00000000..0e8821d3 --- /dev/null +++ b/pubsub/pubsubraw/doc.go @@ -0,0 +1,2 @@ +// pubsubraw pubsub helpers using IPFS Golang implementation +package pubsubraw diff --git a/pubsub/pubsubraw/pubsub.go b/pubsub/pubsubraw/pubsub.go new file mode 100644 index 00000000..c84a7a41 --- /dev/null +++ b/pubsub/pubsubraw/pubsub.go @@ -0,0 +1,146 @@ +package pubsubraw + +import ( + "context" + "io" + "sync" + + p2pcore "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/peer" + p2ppubsub "github.com/libp2p/go-libp2p-pubsub" + "go.opentelemetry.io/otel/api/trace" + "go.uber.org/zap" + + "berty.tech/go-orbit-db/events" + "berty.tech/go-orbit-db/iface" + "berty.tech/go-orbit-db/pubsub" +) + +type psTopic struct { + topic *p2ppubsub.Topic + ps *rawPubSub + topicName string +} + +func (p *psTopic) Publish(ctx context.Context, message []byte) error { + return p.topic.Publish(ctx, message) +} + +func (p *psTopic) Peers(_ context.Context) ([]p2pcore.PeerID, error) { + return p.topic.ListPeers(), nil +} + +func (p *psTopic) WatchPeers(ctx context.Context) (<-chan events.Event, error) { + ph, err := p.topic.EventHandler() + if err != nil { + return nil, err + } + + ch := make(chan events.Event) + + go func() { + for { + evt, err := ph.NextPeerEvent(ctx) + if err != nil { + p.ps.logger.Error("", zap.Error(err)) + return + } + + switch evt.Type { + case p2ppubsub.PeerJoin: + ch <- pubsub.NewEventPeerJoin(evt.Peer) + case p2ppubsub.PeerLeave: + ch <- pubsub.NewEventPeerLeave(evt.Peer) + } + } + }() + + return ch, nil +} + +func (p *psTopic) WatchMessages(ctx context.Context) (<-chan *iface.EventPubSubMessage, error) { + ch := make(chan *iface.EventPubSubMessage) + + sub, err := p.topic.Subscribe() + if err != nil { + return nil, err + } + + go func() { + for { + msg, err := sub.Next(ctx) + if err != nil { + if err == io.EOF { + return + } + + p.ps.logger.Error("error while retrieving pubsub message", zap.Error(err)) + continue + } + + if msg.ReceivedFrom == p.ps.id { + continue + } + + ch <- pubsub.NewEventMessage(msg.Data) + } + }() + + return ch, nil +} + +func (p *psTopic) Topic() string { + return p.topicName +} + +type rawPubSub struct { + logger *zap.Logger + id peer.ID + tracer trace.Tracer + topics map[string]*psTopic + muTopics sync.Mutex + pubsub *p2ppubsub.PubSub +} + +func (c *rawPubSub) TopicSubscribe(_ context.Context, topic string) (iface.PubSubTopic, error) { + c.muTopics.Lock() + defer c.muTopics.Unlock() + + if t, ok := c.topics[topic]; ok { + return t, nil + } + + joinedTopic, err := c.pubsub.Join(topic) + if err != nil { + return nil, err + } + + c.topics[topic] = &psTopic{ + topicName: topic, + topic: joinedTopic, + ps: c, + } + + return c.topics[topic], nil +} + +func NewPubSub(ps *p2ppubsub.PubSub, id peer.ID, logger *zap.Logger, tracer trace.Tracer) iface.PubSubInterface { + if logger == nil { + logger = zap.NewNop() + } + + if tracer == nil { + tracer = trace.NoopTracer{} + } + + return &rawPubSub{ + pubsub: ps, + topics: map[string]*psTopic{}, + id: id, + logger: logger, + tracer: tracer, + } +} + +var _ iface.PubSubInterface = &rawPubSub{} +var _ iface.PubSubTopic = &psTopic{} diff --git a/pubsub/subscription.go b/pubsub/subscription.go deleted file mode 100644 index 0c3487a4..00000000 --- a/pubsub/subscription.go +++ /dev/null @@ -1,136 +0,0 @@ -package pubsub - -import ( - "context" - "fmt" - - "berty.tech/go-orbit-db/events" - "berty.tech/go-orbit-db/pubsub/peermonitor" - coreapi "github.com/ipfs/interface-go-ipfs-core" - iface "github.com/ipfs/interface-go-ipfs-core" - p2pcore "github.com/libp2p/go-libp2p-core" - "github.com/pkg/errors" - "go.opentelemetry.io/otel/api/kv" - "go.opentelemetry.io/otel/api/trace" - "go.uber.org/zap" -) - -type subscription struct { - events.EventEmitter - pubSubSub iface.PubSubSubscription - ipfs coreapi.CoreAPI - id p2pcore.PeerID - logger *zap.Logger - span trace.Span -} - -type Options struct { - Logger *zap.Logger - Tracer trace.Tracer -} - -// NewSubscription Creates a new pub sub subscription -func NewSubscription(ctx context.Context, ipfs coreapi.CoreAPI, topic string, opts *Options) (Subscription, error) { - if opts == nil { - opts = &Options{} - } - - if opts.Logger == nil { - opts.Logger = zap.NewNop() - } - - if opts.Tracer == nil { - opts.Tracer = trace.NoopTracer{} - } - - ctx, span := opts.Tracer.Start(ctx, "pubsub-subscription", trace.WithAttributes(kv.String("pubsub-topic", topic))) - go func() { - <-ctx.Done() - span.End() - }() - - pubSubSub, err := ipfs.PubSub().Subscribe(ctx, topic) - if err != nil { - return nil, err - } - - id, err := ipfs.Key().Self(ctx) - if err != nil { - return nil, errors.Wrap(err, "unable to get id for user") - } - - s := &subscription{ - ipfs: ipfs, - pubSubSub: pubSubSub, - id: id.ID(), - logger: opts.Logger, - span: span, - } - - go s.listener(ctx, pubSubSub, topic) - go s.topicMonitor(ctx, topic) - - return s, nil -} - -func (s *subscription) Close() error { - if err := s.pubSubSub.Close(); err != nil { - s.logger.Error("error while closing subscription", zap.Error(err)) - } - - return nil -} - -func (s *subscription) topicMonitor(ctx context.Context, topic string) { - pm := peermonitor.NewPeerMonitor(ctx, s.ipfs, topic, &peermonitor.NewPeerMonitorOptions{Logger: s.logger}) - - go func() { - for evt := range pm.Subscribe(ctx) { - switch e := evt.(type) { - case *peermonitor.EventPeerJoin: - s.span.AddEvent(ctx, "pubsub-join", kv.String("topic", topic), kv.String("peerid", e.Peer.String())) - s.logger.Debug(fmt.Sprintf("peer %s joined topic %s", e.Peer, topic)) - - case *peermonitor.EventPeerLeave: - s.span.AddEvent(ctx, "pubsub-leave", kv.String("topic", topic), kv.String("peerid", e.Peer.String())) - s.logger.Debug(fmt.Sprintf("peer %s left topic %s", e.Peer, topic)) - } - - s.Emit(ctx, evt) - } - }() - - pm.Start(ctx) - -} - -func (s *subscription) listener(ctx context.Context, subSubscription iface.PubSubSubscription, topic string) { - for { - msg, err := subSubscription.Next(ctx) - if err != nil { - if ctx.Err() == nil { - s.logger.Error("unable to get pub sub message", zap.Error(err)) - } - - break - } - - if msg.From() == s.id { - continue - } - - msgTopic := msg.Topics()[0] - - if topic != msgTopic { - s.logger.Debug("message is from another topic, ignoring") - continue - } - - s.logger.Debug(fmt.Sprintf("got pub sub message from %s", s.id)) - - s.span.AddEvent(ctx, "pubsub-new-message", kv.String("topic", topic), kv.String("peerid", msg.From().String())) - s.Emit(ctx, NewMessageEvent(topic, msg.Data())) - } -} - -var _ Subscription = &subscription{} diff --git a/tests/directchannel_test.go b/tests/directchannel_test.go deleted file mode 100644 index 04badbf7..00000000 --- a/tests/directchannel_test.go +++ /dev/null @@ -1,123 +0,0 @@ -package tests - -import ( - "context" - "fmt" - "testing" - "time" - - orbitdb "berty.tech/go-orbit-db" - "berty.tech/go-orbit-db/accesscontroller" - "berty.tech/go-orbit-db/pubsub/directchannel" - peerstore "github.com/libp2p/go-libp2p-peerstore" - . "github.com/smartystreets/goconvey/convey" - "go.uber.org/zap" -) - -func TestDirectChannel(t *testing.T) { - Convey("orbit-db - Replication using Direct Channel", t, FailureHalts, func(c C) { - var db1, db2 orbitdb.EventLogStore - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) - defer cancel() - - dbPath1, clean := testingTempDir(t, "db1") - defer clean() - - dbPath2, clean := testingTempDir(t, "db2") - defer clean() - - mocknet := testingMockNet(ctx) - - node1, clean := testingIPFSNode(ctx, t, mocknet) - defer clean() - - node2, clean := testingIPFSNode(ctx, t, mocknet) - defer clean() - - ipfs1 := testingCoreAPI(t, node1) - ipfs2 := testingCoreAPI(t, node2) - - zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node1 is %s", node1.Identity.String())) - zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node2 is %s", node2.Identity.String())) - - _, err := mocknet.LinkPeers(node1.Identity, node2.Identity) - c.So(err, ShouldBeNil) - - peerInfo2 := peerstore.PeerInfo{ID: node2.Identity, Addrs: node2.PeerHost.Addrs()} - err = ipfs1.Swarm().Connect(ctx, peerInfo2) - c.So(err, ShouldBeNil) - - peerInfo1 := peerstore.PeerInfo{ID: node1.Identity, Addrs: node1.PeerHost.Addrs()} - err = ipfs2.Swarm().Connect(ctx, peerInfo1) - c.So(err, ShouldBeNil) - - orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{ - Directory: &dbPath1, - DirectChannelFactory: directchannel.InitDirectChannelFactory(node1.PeerHost), - }) - c.So(err, ShouldBeNil) - - defer orbitdb1.Close() - - orbitdb2, err := orbitdb.NewOrbitDB(ctx, ipfs2, &orbitdb.NewOrbitDBOptions{ - Directory: &dbPath2, - DirectChannelFactory: directchannel.InitDirectChannelFactory(node2.PeerHost), - }) - c.So(err, ShouldBeNil) - - defer orbitdb2.Close() - - access := &accesscontroller.CreateAccessControllerOptions{ - Access: map[string][]string{ - "write": { - orbitdb1.Identity().ID, - orbitdb2.Identity().ID, - }, - }, - } - - c.So(err, ShouldBeNil) - - db1, err = orbitdb1.Log(ctx, "replication-tests", &orbitdb.CreateDBOptions{ - Directory: &dbPath1, - AccessController: access, - }) - c.So(err, ShouldBeNil) - - defer db1.Close() - - for _, amount := range []int{1, 10} { - // TODO: find out why this tests fails for 100 entries on CircleCI while having the `-race` flag on - - c.Convey(fmt.Sprintf("replicates database of %d entries", amount), FailureContinues, func(c C) { - db2, err = orbitdb2.Log(ctx, db1.Address().String(), &orbitdb.CreateDBOptions{ - Directory: &dbPath2, - AccessController: access, - }) - c.So(err, ShouldBeNil) - - defer db2.Close() - - infinity := -1 - - for i := 0; i < amount; i++ { - _, err = db1.Add(ctx, []byte(fmt.Sprintf("hello%d", i))) - c.So(err, ShouldBeNil) - } - - items, err := db1.List(ctx, &orbitdb.StreamOptions{Amount: &infinity}) - c.So(err, ShouldBeNil) - c.So(len(items), ShouldEqual, amount) - - <-time.After(time.Millisecond * 2000) - items, err = db2.List(ctx, &orbitdb.StreamOptions{Amount: &infinity}) - c.So(err, ShouldBeNil) - c.So(len(items), ShouldEqual, amount) - c.So(string(items[0].GetValue()), ShouldEqual, "hello0") - c.So(string(items[len(items)-1].GetValue()), ShouldEqual, fmt.Sprintf("hello%d", amount-1)) - }) - } - }) - -} diff --git a/tests/replicate_test.go b/tests/replicate_test.go index 848cb1e6..128fcd56 100644 --- a/tests/replicate_test.go +++ b/tests/replicate_test.go @@ -6,111 +6,188 @@ import ( "testing" "time" - "berty.tech/go-orbit-db/accesscontroller" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" orbitdb "berty.tech/go-orbit-db" - peerstore "github.com/libp2p/go-libp2p-peerstore" - . "github.com/smartystreets/goconvey/convey" - "go.uber.org/zap" + "berty.tech/go-orbit-db/accesscontroller" + "berty.tech/go-orbit-db/pubsub/directchannel" + "berty.tech/go-orbit-db/pubsub/pubsubraw" ) -func TestReplication(t *testing.T) { - Convey("orbit-db - Replication", t, FailureHalts, func(c C) { - var db1, db2 orbitdb.EventLogStore +func testLogAppendReplicate(t *testing.T, amount int, nodeGen func(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func())) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + defer cancel() - ctx, cancel := context.WithCancel(context.Background()) + dbs := make([]orbitdb.OrbitDB, 2) + dbPaths := make([]string, 2) + mn := testingMockNet(ctx) + + for i := 0; i < 2; i++ { + dbs[i], dbPaths[i], cancel = nodeGen(t, mn, i) defer cancel() + } + + err := mn.LinkAll() + require.NoError(t, err) + + err = mn.ConnectAllButSelf() + require.NoError(t, err) + + access := &accesscontroller.CreateAccessControllerOptions{ + Access: map[string][]string{ + "write": { + dbs[0].Identity().ID, + dbs[1].Identity().ID, + }, + }, + } - dbPath1, clean := testingTempDir(t, "db1") - defer clean() + store0, err := dbs[0].Log(ctx, "replication-tests", &orbitdb.CreateDBOptions{ + Directory: &dbPaths[0], + AccessController: access, + }) + require.NoError(t, err) - dbPath2, clean := testingTempDir(t, "db2") - defer clean() + defer func() { _ = store0.Close() }() - mocknet := testingMockNet(ctx) + store1, err := dbs[1].Log(ctx, store0.Address().String(), &orbitdb.CreateDBOptions{ + Directory: &dbPaths[1], + AccessController: access, + }) + require.NoError(t, err) - node1, clean := testingIPFSNode(ctx, t, mocknet) - defer clean() + defer func() { _ = store1.Close() }() - node2, clean := testingIPFSNode(ctx, t, mocknet) - defer clean() + infinity := -1 - ipfs1 := testingCoreAPI(t, node1) - ipfs2 := testingCoreAPI(t, node2) + for i := 0; i < amount; i++ { + _, err = store0.Add(ctx, []byte(fmt.Sprintf("hello%d", i))) + require.NoError(t, err) + } - zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node1 is %s", node1.Identity.String())) - zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node2 is %s", node2.Identity.String())) + items, err := store0.List(ctx, &orbitdb.StreamOptions{Amount: &infinity}) + require.NoError(t, err) + require.Equal(t, amount, len(items)) - _, err := mocknet.LinkPeers(node1.Identity, node2.Identity) - c.So(err, ShouldBeNil) + <-time.After(time.Millisecond * 2000) + items, err = store1.List(ctx, &orbitdb.StreamOptions{Amount: &infinity}) + require.NoError(t, err) + require.Equal(t, amount, len(items)) + require.Equal(t, "hello0", string(items[0].GetValue())) + require.Equal(t, fmt.Sprintf("hello%d", amount-1), string(items[len(items)-1].GetValue())) +} - peerInfo2 := peerstore.PeerInfo{ID: node2.Identity, Addrs: node2.PeerHost.Addrs()} - err = ipfs1.Swarm().Connect(ctx, peerInfo2) - c.So(err, ShouldBeNil) +func testDirectChannelNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func()) { + var closeOps []func() - peerInfo1 := peerstore.PeerInfo{ID: node1.Identity, Addrs: node1.PeerHost.Addrs()} - err = ipfs2.Swarm().Connect(ctx, peerInfo1) - c.So(err, ShouldBeNil) + performCloseOps := func() { + for i := len(closeOps) - 1; i >= 0; i-- { + closeOps[i]() + } + } - orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{Directory: &dbPath1}) - c.So(err, ShouldBeNil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + closeOps = append(closeOps, cancel) - defer orbitdb1.Close() + dbPath1, clean := testingTempDir(t, fmt.Sprintf("db%d", i)) + closeOps = append(closeOps, clean) - orbitdb2, err := orbitdb.NewOrbitDB(ctx, ipfs2, &orbitdb.NewOrbitDBOptions{Directory: &dbPath2}) - c.So(err, ShouldBeNil) + node1, clean := testingIPFSNode(ctx, t, mn) + closeOps = append(closeOps, clean) - defer orbitdb2.Close() + ipfs1 := testingCoreAPI(t, node1) + zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node%d is %s", i, node1.Identity.String())) - access := &accesscontroller.CreateAccessControllerOptions{ - Access: map[string][]string{ - "write": { - orbitdb1.Identity().ID, - orbitdb2.Identity().ID, - }, - }, + orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{ + Directory: &dbPath1, + DirectChannelFactory: directchannel.InitDirectChannelFactory(node1.PeerHost), + }) + require.NoError(t, err) + + closeOps = append(closeOps, func() { _ = orbitdb1.Close() }) + + return orbitdb1, dbPath1, performCloseOps +} + +func testRawPubSubNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func()) { + var closeOps []func() + + performCloseOps := func() { + for i := len(closeOps) - 1; i >= 0; i-- { + closeOps[i]() } + } - c.So(err, ShouldBeNil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + closeOps = append(closeOps, cancel) - db1, err = orbitdb1.Log(ctx, "replication-tests", &orbitdb.CreateDBOptions{ - Directory: &dbPath1, - AccessController: access, - }) - c.So(err, ShouldBeNil) + dbPath1, clean := testingTempDir(t, fmt.Sprintf("db%d", i)) + closeOps = append(closeOps, clean) - defer db1.Close() + node1, clean := testingIPFSNode(ctx, t, mn) + closeOps = append(closeOps, clean) - for _, amount := range []int{1, 10} { - // TODO: find out why this tests fails for 100 entries on CircleCI while having the `-race` flag on + ipfs1 := testingCoreAPI(t, node1) + zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node%d is %s", i, node1.Identity.String())) + + orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{ + Directory: &dbPath1, + PubSub: pubsubraw.NewPubSub(node1.PubSub, node1.Identity, nil, nil), + }) + require.NoError(t, err) - c.Convey(fmt.Sprintf("replicates database of %d entries", amount), FailureContinues, func(c C) { - db2, err = orbitdb2.Log(ctx, db1.Address().String(), &orbitdb.CreateDBOptions{ - Directory: &dbPath2, - AccessController: access, - }) - c.So(err, ShouldBeNil) + closeOps = append(closeOps, func() { _ = orbitdb1.Close() }) - defer db2.Close() + return orbitdb1, dbPath1, performCloseOps +} + +func testDefaultNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func()) { + var closeOps []func() + + performCloseOps := func() { + for i := len(closeOps) - 1; i >= 0; i-- { + closeOps[i]() + } + } - infinity := -1 + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + closeOps = append(closeOps, cancel) - for i := 0; i < amount; i++ { - _, err = db1.Add(ctx, []byte(fmt.Sprintf("hello%d", i))) - c.So(err, ShouldBeNil) - } + dbPath1, clean := testingTempDir(t, fmt.Sprintf("db%d", i)) + closeOps = append(closeOps, clean) - items, err := db1.List(ctx, &orbitdb.StreamOptions{Amount: &infinity}) - c.So(err, ShouldBeNil) - c.So(len(items), ShouldEqual, amount) + node1, clean := testingIPFSNode(ctx, t, mn) + closeOps = append(closeOps, clean) - <-time.After(time.Millisecond * 2000) - items, err = db2.List(ctx, &orbitdb.StreamOptions{Amount: &infinity}) - c.So(err, ShouldBeNil) - c.So(len(items), ShouldEqual, amount) - c.So(string(items[0].GetValue()), ShouldEqual, "hello0") - c.So(string(items[len(items)-1].GetValue()), ShouldEqual, fmt.Sprintf("hello%d", amount-1)) + ipfs1 := testingCoreAPI(t, node1) + zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node%d is %s", i, node1.Identity.String())) + + orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{ + Directory: &dbPath1, + }) + require.NoError(t, err) + + closeOps = append(closeOps, func() { _ = orbitdb1.Close() }) + + return orbitdb1, dbPath1, performCloseOps +} + +func TestReplication(t *testing.T) { + for _, amount := range []int{ + 1, + 10, + // 100, + } { + for nodeType, nodeGen := range map[string]func(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func()){ + "default": testDefaultNodeGenerator, + "direct-channel": testDirectChannelNodeGenerator, + "raw-pubsub": testRawPubSubNodeGenerator, + } { + t.Run(fmt.Sprintf("replicates database of %d entries with node type %s", amount, nodeType), func(t *testing.T) { + testLogAppendReplicate(t, amount, nodeGen) }) } - }) + } }