Skip to content

Commit

Permalink
fixes bug in orchestrate
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-hanna committed Oct 8, 2019
1 parent 1c5e4bf commit ae81699
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 59 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ Beijing, China (2007) 301 – 310
```

The metrics computed are:
1. TotalNanoTime - the time (in nano seconds) for the message to propogate the network
2. LastDeliveryHop - the hop count of the last message that is delivered by a gossip protocol or, in other words, is the maximum number of hops that a message must be forwarded in the overlay before it is delivered.
3. RelativeMessageRedundancy - RelativeMessageRedundancy (RMR) this metric measures the messages overhead in a gossip protocol. It is defined as: (m / (n - 1)) - 1. where m is the total number of payload messages exchanged during the broadcast procedure and n is the total number of nodes that received that broadcast. This metric is only applicable when at least 2 nodes receive the message. A RMR value of zero means that there is exactly one payload message exchange for each node in the system, which is clearly the optimal value. By opposition, high values of RMR are indicative of a broadcast strategy that promotes a poor network usage. Note that it is possible to achieve a very low RMR by failing to be reliable. Thus the aim is to combine low RMR values with high reliability. Furthermore, RMR values are only comparable for protocols that exhibit similar reliability. Finally, note that in pure gossip approaches, RMR is closely related with the protocol fanout, as it tends to fanout−1.
1. **TotalNanoTime** - the time (in nano seconds) for the message to propogate the network
2. **LastDeliveryHop** - the hop count of the last message that is delivered by a gossip protocol or, in other words, is the maximum number of hops that a message must be forwarded in the overlay before it is delivered.
3. **RelativeMessageRedundancy** - RelativeMessageRedundancy (RMR) this metric measures the messages overhead in a gossip protocol. It is defined as: (m / (n - 1)) - 1. where m is the total number of payload messages exchanged during the broadcast procedure and n is the total number of nodes that received that broadcast. This metric is only applicable when at least 2 nodes receive the message. A RMR value of zero means that there is exactly one payload message exchange for each node in the system, which is clearly the optimal value. By opposition, high values of RMR are indicative of a broadcast strategy that promotes a poor network usage. Note that it is possible to achieve a very low RMR by failing to be reliable. Thus the aim is to combine low RMR values with high reliability. Furthermore, RMR values are only comparable for protocols that exhibit similar reliability. Finally, note that in pure gossip approaches, RMR is closely related with the protocol fanout, as it tends to fanout−1.

The commands availabe are:

Expand Down
81 changes: 46 additions & 35 deletions cmd/orchestrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func setup() *cobra.Command {
defer cancel()

var hostAddrs []string
eChan := make(chan error)
started := make(chan struct{})
if !conf.Orchestra.OmitSubnet {
sConf := config.BuildSubnetConfig(conf)

Expand All @@ -92,13 +94,20 @@ func setup() *cobra.Command {

// start the subnet
if !conf.Orchestra.OmitSubnet {
if err = snet.Start(); err != nil {
logger.Errorf("err starting subnet\n%v", err)
return err
}
go func(s *subnet.Subnet, e chan error) {
if err = s.Start(started); err != nil {
logger.Errorf("err starting subnet\n%v", err)
e <- err
}
}(snet, eChan)
}

select {
case <-started:
logger.Info("subnet started")
}

hostAddrs = snet.Addresses()
hostAddrs = snet.RPCAddresses()
}

if conf.Orchestra.OmitSubnet {
Expand All @@ -113,37 +122,39 @@ func setup() *cobra.Command {
ticker := time.NewTicker(time.Duration(conf.Orchestra.MessageNanoSecondInterval) * time.Nanosecond)
defer ticker.Stop()

eChan := make(chan error)
select {
case <-stop:
// note: I don't like '^C' showing up on the same line as the next logged line...
fmt.Println("")
logger.Info("Received stop signal from os. Shutting down...")

case <-ticker.C:
go func() {
id, err := uuid.NewRandom()
if err != nil {
logger.Errorf("err generating uuid:\n%v", err)
eChan <- err
}

peerIdx := randBetween(0, len(hostAddrs)-1)
peer := hostAddrs[peerIdx]

logger.Infof("sending message to %s for gossip", peer)
if err := client.Gossip(id[:], conf.Orchestra.MessageLocation, peer, conf.Orchestra.MessageByteSize, conf.Orchestra.ClientTimeoutSeconds); err != nil {
logger.Fatalf("err sending messages\n%v", err)
eChan <- err
}
}()

case err := <-eChan:
logger.Errorf("received err on channel:\n%v", err)
return err
logger.Warnf("peers: %v\ninterval: %d", hostAddrs, conf.Orchestra.MessageNanoSecondInterval)

for {
select {
case <-stop:
// note: I don't like '^C' showing up on the same line as the next logged line...
fmt.Println("")
logger.Info("Received stop signal from os. Shutting down...")
return nil

case <-ticker.C:
go func(peers []string, c config.Config, e chan error) {
id, err := uuid.NewRandom()
if err != nil {
logger.Errorf("err generating uuid:\n%v", err)
eChan <- err
}

peerIdx := randBetween(0, len(peers)-1)
peer := peers[peerIdx]

logger.Infof("sending message to %s for gossip", peer)
if err := client.Gossip([]byte(id.String()), c.Orchestra.MessageLocation, peer, c.Orchestra.MessageByteSize, c.Orchestra.ClientTimeoutSeconds); err != nil {
logger.Fatalf("err sending messages\n%v", err)
e <- err
}
}(hostAddrs, conf, eChan)

case err := <-eChan:
logger.Errorf("received err on channel:\n%v", err)
return err
}
}

return nil
},
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/grpc/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/agencyenterprise/gossip-host/pkg/logger"
pb "github.com/agencyenterprise/gossip-host/pkg/pb/publisher"

"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/ptypes/empty"
ipfsaddr "github.com/ipfs/go-ipfs-addr"
peerstore "github.com/libp2p/go-libp2p-peerstore"
Expand Down Expand Up @@ -46,7 +45,7 @@ func (h *Host) Listen(ctx context.Context, addr string) error {
// PublishMessage implements
func (h *Host) PublishMessage(ctx context.Context, in *pb.Message) (*pb.PublishReply, error) {
logger.Info("received rpc message; will now publish to subscribers")
spew.Dump(in)
//spew.Dump(in)

bs, err := in.XXX_Marshal(nil, true)
if err != nil {
Expand Down
35 changes: 19 additions & 16 deletions pkg/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func (h *Host) IPFSAddresses() []string {
return addresses
}

// RPCAddress returns the host rpc address
func (h *Host) RPCAddress() string {
return h.conf.Host.RPCAddress
}

// Connect connects the host to the list of peers
// note: it expects the peers to be in IPFS form
func (h *Host) Connect(peers []string) error {
Expand Down Expand Up @@ -209,22 +214,20 @@ func (h *Host) BuildPubSub() (*pubsub.PubSub, error) {
// BuildRPC returns an rpc service
func (h *Host) BuildRPC(ch chan error, ps *pubsub.PubSub) error {
// Start the RPC server
if !h.conf.Host.OmitRPCServer {
rHost := rpcHost.New(&rpcHost.Props{
Host: h.host,
CH: ch,
PS: ps,
PubsubTopic: pubsubTopic,
CTX: h.ctx,
Shutdown: h.shtDwn,
})
go func(rh *rpcHost.Host, c chan error) {
if err := rh.Listen(h.ctx, h.conf.Host.RPCAddress); err != nil {
logger.Errorf("err listening on rpc:\n%v", err)
c <- err
}
}(rHost, ch)
}
rHost := rpcHost.New(&rpcHost.Props{
Host: h.host,
CH: ch,
PS: ps,
PubsubTopic: pubsubTopic,
CTX: h.ctx,
Shutdown: h.shtDwn,
})
go func(rh *rpcHost.Host, c chan error) {
if err := rh.Listen(h.ctx, h.conf.Host.RPCAddress); err != nil {
logger.Errorf("err listening on rpc:\n%v", err)
c <- err
}
}(rHost, ch)

return nil
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/host/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/agencyenterprise/gossip-host/pkg/logger"
"github.com/davecgh/go-spew/spew"

pb "github.com/agencyenterprise/gossip-host/pkg/pb/publisher"
peer "github.com/libp2p/go-libp2p-peer"
Expand All @@ -27,7 +26,7 @@ func pubsubHandler(ctx context.Context, hostID peer.ID, sub *pubsub.Subscription
logger.Errorf("err unmarshaling next message:\n%v", err)
continue
}
spew.Dump(msg)
//spew.Dump(msg)

// TODO: how to increment sequence before sending out?
logger.Infof("Pubsub message received: %v,%v,%v,%v,%d,%d", hostID, nxt.GetFrom(), msg.GetId(), binary.BigEndian.Uint64(nxt.GetSeqno()), time.Now().UnixNano(), msg.GetSequence())
Expand Down
14 changes: 13 additions & 1 deletion pkg/subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func New(props *Props) (*Subnet, error) {
}

// Start begins the subnet
func (s *Subnet) Start() error {
func (s *Subnet) Start(started chan struct{}) error {
// parse pubsub cidr
pubsubIP, pubsubNet, err := net.ParseCIDR(s.props.Conf.Subnet.PubsubCIDR)
if err != nil {
Expand Down Expand Up @@ -74,6 +74,7 @@ func (s *Subnet) Start() error {
}(ch, stop, h)
}

started <- struct{}{}
select {
case <-stop:
// note: I don't like '^C' showing up on the same line as the next logged line...
Expand Down Expand Up @@ -104,3 +105,14 @@ func (s *Subnet) Addresses() []string {

return addresses
}

// RPCAddresses returns the host rpc addresses
func (s *Subnet) RPCAddresses() []string {
var addresses []string

for _, host := range s.hosts {
addresses = append(addresses, host.RPCAddress())
}

return addresses
}

0 comments on commit ae81699

Please sign in to comment.