Skip to content

Commit

Permalink
feat: use low level libp2p pubsub, old peermonitor refactor (#50)
Browse files Browse the repository at this point in the history
Signed-off-by: Guillaume Louvigny <glouvigny@users.noreply.github.com>
  • Loading branch information
glouvigny authored Jul 15, 2020
1 parent d7b1088 commit f126128
Show file tree
Hide file tree
Showing 19 changed files with 636 additions and 850 deletions.
163 changes: 71 additions & 92 deletions baseorbitdb/orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path"
"strings"
"sync"
"time"

ipfslog "berty.tech/go-ipfs-log"
"berty.tech/go-ipfs-log/entry"
Expand All @@ -21,6 +22,7 @@ import (
coreapi "github.com/ipfs/interface-go-ipfs-core"
p2pcore "github.com/libp2p/go-libp2p-core"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/api/trace"
"go.uber.org/zap"

"berty.tech/go-orbit-db/accesscontroller"
Expand All @@ -30,12 +32,10 @@ import (
"berty.tech/go-orbit-db/cache/cacheleveldown"
"berty.tech/go-orbit-db/iface"
_ "berty.tech/go-orbit-db/internal/buildconstraints" // fail for bad go version
"berty.tech/go-orbit-db/pubsub"
"berty.tech/go-orbit-db/pubsub/oneonone"
"berty.tech/go-orbit-db/pubsub/peermonitor"
"berty.tech/go-orbit-db/pubsub/pubsubcoreapi"
"berty.tech/go-orbit-db/stores"
"berty.tech/go-orbit-db/utils"
"go.opentelemetry.io/otel/api/trace"
)

// OrbitDB An alias of the type defined in the iface package
Expand Down Expand Up @@ -95,6 +95,7 @@ type NewOrbitDBOptions struct {
Logger *zap.Logger
Tracer trace.Tracer
DirectChannelFactory iface.DirectChannelFactory
PubSub iface.PubSubInterface
}

type orbitDB struct {
Expand All @@ -103,7 +104,7 @@ type orbitDB struct {
ipfs coreapi.CoreAPI
identity *idp.Identity
id p2pcore.PeerID
pubsub pubsub.Interface
pubsub iface.PubSubInterface
keystore keystore.Interface
closeKeystore func() error
stores map[string]Store
Expand All @@ -118,7 +119,6 @@ type orbitDB struct {
muStores sync.RWMutex
muIdentity sync.RWMutex
muID sync.RWMutex
muPubSub sync.RWMutex
muIPFS sync.RWMutex
muKeyStore sync.RWMutex
muCaches sync.RWMutex
Expand Down Expand Up @@ -155,13 +155,6 @@ func (o *orbitDB) PeerID() p2pcore.PeerID {
return o.id
}

// PubSub returns the pubsub service currently attached to this OrbitDB
// instance. The read is performed under the shared lock so it is safe
// against a concurrent closePubSub.
func (o *orbitDB) PubSub() pubsub.Interface {
	o.muPubSub.RLock()
	ps := o.pubsub
	o.muPubSub.RUnlock()

	return ps
}

func (o *orbitDB) KeyStore() keystore.Interface {
// TODO: check why o.keystore is never set
o.muKeyStore.RLock()
Expand Down Expand Up @@ -214,19 +207,6 @@ func (o *orbitDB) closeAllStores() {
o.stores = map[string]Store{}
}

// closePubSub detaches the pubsub service from the instance and closes it.
// Close errors are logged rather than propagated; the field is always reset
// to nil, and calling this when pubsub is already nil is a no-op.
func (o *orbitDB) closePubSub() {
	o.muPubSub.Lock()
	defer o.muPubSub.Unlock()

	ps := o.pubsub
	o.pubsub = nil

	if ps == nil {
		return
	}

	if err := ps.Close(); err != nil {
		o.logger.Error("unable to close pubsub", zap.Error(err))
	}
}

func (o *orbitDB) closeCache() {
o.muCaches.Lock()
defer o.muCaches.Unlock()
Expand Down Expand Up @@ -389,19 +369,15 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
return nil, err
}

ps, err := pubsub.NewPubSub(is, k.ID(), &pubsub.Options{
Logger: options.Logger,
Tracer: options.Tracer,
})
if err != nil {
return nil, err
}

if options.PeerID == nil {
id := k.ID()
options.PeerID = &id
}

if options.PubSub == nil {
options.PubSub = pubsubcoreapi.NewPubSub(is, k.ID(), time.Second, options.Logger, options.Tracer)
}

if options.Cache == nil {
options.Cache = cacheleveldown.New(&cache.Options{Logger: options.Logger})
}
Expand All @@ -414,7 +390,7 @@ func newOrbitDB(ctx context.Context, is coreapi.CoreAPI, identity *idp.Identity,
ipfs: is,
identity: identity,
id: *options.PeerID,
pubsub: ps,
pubsub: options.PubSub,
cache: options.Cache,
directory: *options.Directory,
stores: map[string]Store{},
Expand Down Expand Up @@ -498,7 +474,6 @@ func NewOrbitDB(ctx context.Context, ipfs coreapi.CoreAPI, options *NewOrbitDBOp
func (o *orbitDB) Close() error {
o.closeAllStores()
o.closeDirectConnections()
o.closePubSub()
o.closeCache()
o.closeKeyStore()

Expand Down Expand Up @@ -756,7 +731,7 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd
options.Replicate = boolPtr(true)
}

//options.AccessController = accessController
// options.AccessController = accessController
options.Keystore = o.KeyStore()
options.Cache = c

Expand All @@ -783,38 +758,33 @@ func (o *orbitDB) createStore(ctx context.Context, storeType string, parsedDBAdd
return nil, errors.Wrap(err, "unable to instantiate store")
}

o.storeListener(ctx, store)
topic, err := o.pubsub.TopicSubscribe(ctx, parsedDBAddress.String())
if err != nil {
return nil, errors.Wrap(err, "unable to subscribe to pubsub")
}

o.storeListener(ctx, store, topic)
o.setStore(parsedDBAddress.String(), store)

// Subscribe to pubsub to get updates from peers,
// this is what hooks us into the message propagation layer
// and the p2p network
if ps := o.PubSub(); *options.Replicate && ps != nil {
sub, err := ps.Subscribe(ctx, parsedDBAddress.String())
if err != nil {
return nil, errors.Wrap(err, "unable to subscribe to pubsub")
if *options.Replicate {
if err := o.pubSubChanListener(ctx, store, topic, parsedDBAddress); err != nil {
return nil, err
}

o.pubSubChanListener(ctx, sub, parsedDBAddress)
}

return store, nil
}

func (o *orbitDB) onClose(ctx context.Context, addr cid.Cid) error {
// Unsubscribe from pubsub
if ps := o.PubSub(); ps != nil {
if err := ps.Unsubscribe(addr.String()); err != nil {
return errors.Wrap(err, "unable to unsubscribe from pubsub")
}
}

// onClose drops the store registered under addr from the instance's store
// map. It always returns nil; the error result is kept for symmetry with
// the other close hooks.
func (o *orbitDB) onClose(addr cid.Cid) error {
	key := addr.String()
	o.deleteStore(key)

	return nil
}

func (o *orbitDB) storeListener(ctx context.Context, store Store) {
func (o *orbitDB) storeListener(ctx context.Context, store Store, topic iface.PubSubTopic) {
go func() {
for evt := range store.Subscribe(ctx) {
switch e := evt.(type) {
Expand All @@ -825,14 +795,14 @@ func (o *orbitDB) storeListener(ctx context.Context, store Store) {
continue
}

if ps := o.PubSub(); ps != nil {
if topic != nil {
headsBytes, err := json.Marshal(e.Heads)
if err != nil {
o.logger.Debug(fmt.Sprintf("unable to serialize heads %v", err))
continue
}

err = ps.Publish(ctx, e.Address.String(), headsBytes)
err = topic.Publish(ctx, headsBytes)
if err != nil {
o.logger.Debug(fmt.Sprintf("unable to publish message on pubsub %v", err))
continue
Expand All @@ -845,62 +815,71 @@ func (o *orbitDB) storeListener(ctx context.Context, store Store) {

o.logger.Debug("received stores.close event")

if err := o.onClose(ctx, store.Address().GetRoot()); err != nil {
if err := o.onClose(store.Address().GetRoot()); err != nil {
o.logger.Debug(fmt.Sprintf("unable to perform onClose %v", err))
}
}()
}

func (o *orbitDB) pubSubChanListener(ctx context.Context, ps pubsub.Subscription, addr address.Address) {
go func() {
for e := range ps.Subscribe(ctx) {
o.logger.Debug("Got pub sub message")
switch evt := e.(type) {
case *pubsub.MessageEvent:
addr := evt.Topic
store, ok := o.getStore(addr)

if !ok {
o.logger.Error(fmt.Sprintf("unable to find store for address %s", addr))
continue
}

headsEntriesBytes := evt.Content
var headsEntries []*entry.Entry

err := json.Unmarshal(headsEntriesBytes, &headsEntries)
if err != nil {
o.logger.Error("unable to unmarshal head entries")
}

if len(headsEntries) == 0 {
o.logger.Debug(fmt.Sprintf("Nothing to synchronize for %s:", addr))
}

o.logger.Debug(fmt.Sprintf("Received %d heads for %s:", len(headsEntries), addr))
func (o *orbitDB) pubSubChanListener(ctx context.Context, store Store, topic iface.PubSubTopic, addr address.Address) error {
chPeers, err := topic.WatchPeers(ctx)
if err != nil {
return err
}

entries := make([]ipfslog.Entry, len(headsEntries))
for i := range headsEntries {
entries[i] = headsEntries[i]
}
chMessages, err := topic.WatchMessages(ctx)
if err != nil {
return err
}

if err := store.Sync(ctx, entries); err != nil {
o.logger.Debug(fmt.Sprintf("Error while syncing heads for %s:", addr))
}
case *peermonitor.EventPeerJoin:
go func() {
for e := range chPeers {
switch evt := e.(type) {
case *iface.EventPubSubJoin:
go func() {
o.onNewPeerJoined(ctx, evt.Peer, addr)
o.logger.Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
}()

case *peermonitor.EventPeerLeave:
case *iface.EventPubSubLeave:
o.logger.Debug(fmt.Sprintf("peer %s left from %s self is %s", evt.Peer.String(), addr, o.PeerID()))

default:
o.logger.Debug("unhandled event, can't match type")
}
}
}()

go func() {
for evt := range chMessages {
o.logger.Debug("Got pub sub message")

headsEntriesBytes := evt.Content
var headsEntries []*entry.Entry

err := json.Unmarshal(headsEntriesBytes, &headsEntries)
if err != nil {
o.logger.Error("unable to unmarshal head entries")
}

if len(headsEntries) == 0 {
o.logger.Debug(fmt.Sprintf("Nothing to synchronize for %s:", addr))
}

o.logger.Debug(fmt.Sprintf("Received %d heads for %s:", len(headsEntries), addr))

entries := make([]ipfslog.Entry, len(headsEntries))
for i := range headsEntries {
entries[i] = headsEntries[i]
}

if err := store.Sync(ctx, entries); err != nil {
o.logger.Debug(fmt.Sprintf("Error while syncing heads for %s:", addr))
}
}
}()

return nil
}

func (o *orbitDB) onNewPeerJoined(ctx context.Context, p p2pcore.PeerID, addr address.Address) {
Expand Down Expand Up @@ -984,7 +963,7 @@ func (o *orbitDB) watchOneOnOneMessage(ctx context.Context, channel iface.Direct
o.logger.Debug("received one on one message")

switch e := evt.(type) {
case *pubsub.EventPayload:
case *iface.EventPubSubPayload:
heads := &exchangedHeads{}
err := json.Unmarshal(e.Payload, &heads)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/ipfs/interface-go-ipfs-core v0.3.0
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-pubsub v0.3.1
github.com/pkg/errors v0.9.1
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945
Expand Down
Loading

0 comments on commit f126128

Please sign in to comment.