Skip to content

Commit

Permalink
fix: direct channel and peer monitor (#48)
Browse files Browse the repository at this point in the history
Signed-off-by: Guillaume Louvigny <glouvigny@users.noreply.github.com>
  • Loading branch information
glouvigny authored Jul 9, 2020
1 parent 5a52900 commit ab1bcab
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
6 changes: 4 additions & 2 deletions baseorbitdb/orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,8 +888,10 @@ func (o *orbitDB) pubSubChanListener(ctx context.Context, ps pubsub.Subscription
o.logger.Debug(fmt.Sprintf("Error while syncing heads for %s:", addr))
}
case *peermonitor.EventPeerJoin:
o.onNewPeerJoined(ctx, evt.Peer, addr)
o.logger.Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
go func() {
o.onNewPeerJoined(ctx, evt.Peer, addr)
o.logger.Debug(fmt.Sprintf("peer %s joined from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
}()

case *peermonitor.EventPeerLeave:
o.logger.Debug(fmt.Sprintf("peer %s left from %s self is %s", evt.Peer.String(), addr, o.PeerID()))
Expand Down
25 changes: 15 additions & 10 deletions pubsub/directchannel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"strings"
"sync"
"time"

p2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/host"
Expand All @@ -30,15 +31,16 @@ type channel struct {
receiverID p2pcore.PeerID
logger *zap.Logger
holder *channelHolder
stream network.Stream
stream io.Writer
muStream sync.Mutex
}

func (c *channel) Send(ctx context.Context, bytes []byte) error {
c.muStream.Lock()
defer c.muStream.Unlock()
stream := c.stream
c.muStream.Unlock()

if c.stream == nil {
if stream == nil {
return fmt.Errorf("stream is not opened")
}

Expand All @@ -49,19 +51,18 @@ func (c *channel) Send(ctx context.Context, bytes []byte) error {
b := make([]byte, 2)
binary.LittleEndian.PutUint16(b, uint16(len(bytes)))

if _, err := c.stream.Write(b); err != nil {
if _, err := stream.Write(b); err != nil {
return err
}

_, err := c.stream.Write(bytes)
_, err := stream.Write(bytes)
return err
}

func (c *channel) Close() error {
c.muStream.Lock()
defer c.muStream.Unlock()

c.stream = nil
c.muStream.Unlock()

return nil
}
Expand All @@ -74,6 +75,7 @@ func (c *channel) Connect(ctx context.Context) error {
)

if strings.Compare(c.holder.host.ID().String(), c.receiverID.String()) < 0 {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
id = c.holder.hostProtocolID(c.receiverID)
c.holder.muExpected.Lock()
streamChan := c.holder.expectedPID[id]
Expand All @@ -83,8 +85,11 @@ func (c *channel) Connect(ctx context.Context) error {
case stream = <-streamChan:
// nothing else to do, stream is acquired
case <-ctx.Done():
cancel()
return fmt.Errorf("unable to create stream, err: %w", ctx.Err())
}

cancel()
} else {
id = c.holder.hostProtocolID(c.holder.host.ID())
stream, err = c.holder.host.NewStream(ctx, c.receiverID, id)
Expand Down Expand Up @@ -169,9 +174,8 @@ func (h *channelHolder) NewChannel(ctx context.Context, receiver p2pcore.PeerID,
go func() {
<-ctx.Done()
h.muExpected.Lock()
defer h.muExpected.Unlock()

delete(h.expectedPID, id)
h.muExpected.Unlock()
}()
}

Expand All @@ -192,13 +196,14 @@ func (h *channelHolder) checkExpectedStream(s string) bool {

func (h *channelHolder) incomingConnHandler(stream network.Stream) {
h.muExpected.Lock()
defer h.muExpected.Unlock()

ch, ok := h.expectedPID[stream.Protocol()]
if !ok {
return
}

h.muExpected.Unlock()

ch <- stream
}

Expand Down

0 comments on commit ab1bcab

Please sign in to comment.