Skip to content

Commit

Permalink
better routing selection
Browse files Browse the repository at this point in the history
  • Loading branch information
nickjfree committed Nov 23, 2022
1 parent 3acfeea commit 58e25c1
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 30 deletions.
63 changes: 50 additions & 13 deletions pkg/routing/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (
statusConnected = 1
statusConnecting = 2
statusFailed = 3

// rtt stats
rttAlphaMean = 0.15
rttAlphaVariance = 0.15
)

// Connector interface
Expand Down Expand Up @@ -59,14 +63,22 @@ type BaseConnector struct {
router *Router
}

// rtt stats of port
type rttStats struct {
// start
start time.Time
// mean rtt in ms
mean float32
// variance
variance float32
}

// port is a connect session with a node
type Port struct {
// wire
w wire.Wire
// rtt
rtt int
// last announce
lastAnnounce time.Time
rttStats rttStats
// router
router *Router
// output queue
Expand Down Expand Up @@ -138,9 +150,9 @@ func (c *BaseConnector) setFailed(endpoint string) error {
if ok && state.status == statusConnecting || state.status == statusConnected {
state.status = statusFailed
state.failed += 1
// remove endpoint failed to many times
// remove endpoint failed too many times
if state.failed >= connMaxRetries {
logger.Printf("endpoint %s failed to many times, remove it", endpoint)
logger.Printf("endpoint %s failed too many times, remove it", endpoint)
delete(c.epStats, endpoint)
} else {
c.epStats[endpoint] = state
Expand Down Expand Up @@ -196,14 +208,15 @@ func (c *BaseConnector) newPort(w wire.Wire, reconnect bool) *Port {
}

p := &Port{
w: w,
router: c.router,
output: make(chan message.Packet, portBufferSize),
announce: make(chan message.Routing),
closeFunc: closeFunc,
ctx: ctx,
rtt: 0,
lastAnnounce: time.Now(),
w: w,
router: c.router,
output: make(chan message.Packet, portBufferSize),
announce: make(chan message.Routing),
closeFunc: closeFunc,
ctx: ctx,
rttStats: rttStats{
start: time.Now(),
},
}
go func() {
logger.Printf("handle port(%s) output: %+v", p, p.handleOutput())
Expand Down Expand Up @@ -376,6 +389,30 @@ func (p *Port) String() string {
return fmt.Sprintf("%s@%s", p.w.Endpoint(), p.w.Address().String())
}

func (p *Port) BeginRttTiming() {
p.rttStats.start = time.Now()
}

func (p *Port) EndRttTiming() {
rtt := float32(time.Now().Sub(p.rttStats.start).Milliseconds())
rttVariance := (rtt - p.rttStats.mean) * (rtt - p.rttStats.mean)
p.rttStats.mean = p.rttStats.mean*(1-rttAlphaMean) + rtt*rttAlphaMean
p.rttStats.variance = rttVariance*(1-rttAlphaVariance) + rttVariance*rttAlphaVariance
}

// return true, if port has smaller rtt. law of large number
func (p *Port) Faster(base int) bool {
delta := float32(base) - p.rttStats.mean
if delta > 0 && delta*delta > 9*p.rttStats.variance {
return true
}
return false
}

func (p *Port) Rtt() int {
return int(p.rttStats.mean)
}

// close port
func (p *Port) handleOutput() error {
// close wire when done
Expand Down
43 changes: 26 additions & 17 deletions pkg/routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,34 @@ func (r *Router) RegisterPort(p *Port) error {
return nil
}

// update single routing entry
func (r *Router) updateEntry(myEntry, peerEntry *routingEntry) {
// update routings for entries with smaller metric
if myEntry.metric > peerEntry.metric {
myEntry.port = peerEntry.port
myEntry.metric = peerEntry.metric
myEntry.rtt = peerEntry.rtt
myEntry.updatedAt = time.Now()
return
}
// same metric, select the one with smaller rtt
if myEntry.metric == peerEntry.metric {
if myEntry.port != peerEntry.port && peerEntry.port.Faster(myEntry.rtt-peerEntry.port.Rtt()) {
myEntry.port = peerEntry.port
myEntry.metric = peerEntry.metric
}
myEntry.rtt = peerEntry.rtt
myEntry.updatedAt = time.Now()
}
// TODO: metric + rtt
}

// update routing tables for this port
func (r *Router) UpdateRouting(p *Port, routing message.Routing) error {

// handle routing ack. to get the peer rtt
if routing.Type == message.RoutingRegisterAck {
p.rtt = int(float64(p.rtt)*0.75 + float64(time.Now().Sub(p.lastAnnounce).Milliseconds())*0.25)
p.EndRttTiming()
return nil
}
// log the peer provided networks
Expand All @@ -142,7 +164,7 @@ func (r *Router) UpdateRouting(p *Port, routing message.Routing) error {
port: p,
// inc distance
metric: entry.Metric + 1,
rtt: p.rtt + entry.Rtt,
rtt: p.Rtt() + entry.Rtt,
updatedAt: time.Now(),
}
// routings reach max hops
Expand All @@ -162,20 +184,7 @@ func (r *Router) UpdateRouting(p *Port, routing message.Routing) error {
if n.String() == peerEntry.network.String() {
matched = true
// only update routings for entries with smaller metric
if myEntry.metric > peerEntry.metric {
myEntry.port = peerEntry.port
myEntry.metric = peerEntry.metric
myEntry.rtt = peerEntry.rtt
myEntry.updatedAt = time.Now()
} else if myEntry.metric == peerEntry.metric && myEntry.port != peerEntry.port && myEntry.rtt > peerEntry.rtt {
myEntry.port = peerEntry.port
myEntry.metric = peerEntry.metric
myEntry.rtt = peerEntry.rtt
myEntry.updatedAt = time.Now()
} else if myEntry.metric == peerEntry.metric && myEntry.port == peerEntry.port {
myEntry.rtt = peerEntry.rtt
myEntry.updatedAt = time.Now()
}
r.updateEntry(myEntry, &peerEntry)
break
}
}
Expand Down Expand Up @@ -342,7 +351,7 @@ func (r *Router) handleRouting(p *Port) error {
}
msg := message.Routing{Routings: routings}
// send routings
p.lastAnnounce = time.Now()
p.BeginRttTiming()
if err := p.AnnouceRouting(&msg); err != nil {
return err
}
Expand Down

0 comments on commit 58e25c1

Please sign in to comment.