Skip to content

Commit

Permalink
Make session refresh much less aggressive. Fix some logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Nov 14, 2023
1 parent bb13a84 commit 33d77ad
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
9 changes: 8 additions & 1 deletion ziti/edge/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,13 @@ func (conn *edgeConn) Listen(session *rest_model.SessionDetail, service *rest_mo
if !success {
logger.Debug("removing listener for session")
conn.hosting.Delete(*session.Token)

unbindRequest := edge.NewUnbindMsg(conn.Id(), listener.token)
listener.edgeChan.TraceMsg("close", unbindRequest)
if err := unbindRequest.WithTimeout(5 * time.Second).SendAndWaitForWire(conn.Channel); err != nil {
logger.WithError(err).Error("unable to unbind session for conn")
}

}
}()

Expand Down Expand Up @@ -468,7 +475,7 @@ func (conn *edgeConn) close(closedByRemote bool) {
conn.hosting.Range(func(key, value interface{}) bool {
listener := value.(*edgeListener)
if err := listener.close(closedByRemote); err != nil {
log.WithError(err).WithField("serviceName", listener.service.Name).Error("failed to close listener")
log.WithError(err).WithField("serviceName", *listener.service.Name).Error("failed to close listener")
}
return true
})
Expand Down
4 changes: 3 additions & 1 deletion ziti/edge/network/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (conn *routerConn) Listen(service *rest_model.ServiceDetail, session *rest_
listener, err := ec.Listen(session, service, options)
if err != nil {
if err2 := ec.Close(); err2 != nil {
pfxlog.Logger().Errorf("failed to cleanup listenet for service '%v' (%v)", service.Name, err2)
pfxlog.Logger().WithError(err2).
WithField("serviceName", *service.Name).
Error("failed to cleanup listener for service after failed bind")
}
}
return listener, err
Expand Down
79 changes: 45 additions & 34 deletions ziti/ziti.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ func (context *ContextImpl) connectEdgeRouter(routerName, ingressUrl string, ret
useConn := context.routerConnections.Upsert(ingressUrl, edgeConn,
func(exist bool, oldV edge.RouterConn, newV edge.RouterConn) edge.RouterConn {
if exist { // use the routerConnection already in the map, close new one
pfxlog.Logger().Infof("connection to %s already established, closing duplicate connection", ingressUrl)
go func() {
if err := newV.Close(); err != nil {
pfxlog.Logger().Errorf("unable to close router connection (%v)", err)
Expand Down Expand Up @@ -1293,7 +1294,13 @@ func (context *ContextImpl) getOrCreateSession(serviceId string, sessionType Ses

func (context *ContextImpl) createSessionWithBackoff(service *rest_model.ServiceDetail, sessionType SessionType, options edge.ConnOptions) (*rest_model.SessionDetail, error) {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 50 * time.Millisecond

if sessionType == SessionType(rest_model.DialBindDial) {
expBackoff.InitialInterval = 50 * time.Millisecond
} else {
expBackoff.InitialInterval = time.Second
}

expBackoff.MaxInterval = 10 * time.Second
expBackoff.MaxElapsedTime = options.GetConnectTimeout()

Expand Down Expand Up @@ -1464,7 +1471,11 @@ type listenerManager struct {
}

func (mgr *listenerManager) run() {
mgr.createSessionWithBackoff()
// need to either establish a session, or fail if we can't create one
for mgr.session == nil {
mgr.createSessionWithBackoff()
}

mgr.makeMoreListeners()

if mgr.options.BindUsingEdgeIdentity {
Expand All @@ -1487,18 +1498,38 @@ func (mgr *listenerManager) run() {
}

ticker := time.NewTicker(250 * time.Millisecond)
refreshTicker := time.NewTicker(30 * time.Second)

defer ticker.Stop()
defer refreshTicker.Stop()

refreshSessionTimerInterval := 10 * time.Second

for !mgr.listener.IsClosed() {
var refreshSessionTimer <-chan time.Time
if len(mgr.session.EdgeRouters) == 0 {
refreshSessionTimer = time.After(refreshSessionTimerInterval)
} else if len(mgr.session.EdgeRouters) < mgr.options.MaxConnections {
if refreshSessionTimerInterval < 5*time.Minute {
refreshSessionTimerInterval = 5 * time.Minute
}
refreshSessionTimer = time.After(refreshSessionTimerInterval)
}

if refreshSessionTimer != nil {
refreshSessionTimerInterval *= 2
if refreshSessionTimerInterval > 30*time.Minute {
refreshSessionTimerInterval = 30 * time.Minute
}
} else {
refreshSessionTimerInterval = 10 * time.Second
}

//goland:noinspection GoNilness
select {
case routerConnectionResult := <-mgr.connectChan:
mgr.handleRouterConnectResult(routerConnectionResult)
case event := <-mgr.eventChan:
event.handle(mgr)
case <-refreshTicker.C:
case <-refreshSessionTimer:
mgr.refreshSession()
case <-ticker.C:
mgr.makeMoreListeners()
Expand All @@ -1518,6 +1549,10 @@ func (mgr *listenerManager) handleRouterConnectResult(result *edgeRouterConnResu
if len(mgr.routerConnections) < mgr.options.MaxConnections {
if _, ok := mgr.routerConnections[routerConnection.GetRouterName()]; !ok {
mgr.routerConnections[routerConnection.GetRouterName()] = routerConnection
pfxlog.Logger().
WithField("serviceName", *mgr.service.Name).
WithField("listenerCount", len(mgr.routerConnections)).
Debugf("establishing listener to %s", routerConnection.Key())
go mgr.createListener(routerConnection, mgr.session)
}
} else {
Expand All @@ -1527,7 +1562,7 @@ func (mgr *listenerManager) handleRouterConnectResult(result *edgeRouterConnResu

func (mgr *listenerManager) createListener(routerConnection edge.RouterConn, session *rest_model.SessionDetail) {
start := time.Now()
logger := pfxlog.Logger()
logger := pfxlog.Logger().WithField("serviceName", *mgr.service.Name)
svc := mgr.listener.GetService()
listener, err := routerConnection.Listen(svc, session, mgr.options)
elapsed := time.Since(start)
Expand All @@ -1553,27 +1588,7 @@ func (mgr *listenerManager) createListener(routerConnection edge.RouterConn, ses
}

func (mgr *listenerManager) makeMoreListeners() {
if mgr.listener.IsClosed() {
return
}

// If we don't have any connections and there are no available edge routers, refresh the session more often
if mgr.session == nil || len(mgr.session.EdgeRouters) == 0 && len(mgr.routerConnections) == 0 {
now := time.Now()
if mgr.disconnectedTime.Add(mgr.options.ConnectTimeout).Before(now) {
pfxlog.Logger().Warn("disconnected for longer than configured connect timeout. closing")
err := errors.New("disconnected for longer than connect timeout. closing")
mgr.listener.CloseWithError(err)
return
}

if mgr.sessionRefreshTime.Add(time.Second).Before(now) {
pfxlog.Logger().Warnf("no edge routers available, polling more frequently")
mgr.refreshSession()
}
}

if mgr.session == nil || mgr.listener.IsClosed() || len(mgr.routerConnections) >= mgr.options.MaxConnections || len(mgr.session.EdgeRouters) <= len(mgr.routerConnections) {
if mgr.listener.IsClosed() || len(mgr.routerConnections) >= mgr.options.MaxConnections || len(mgr.session.EdgeRouters) <= len(mgr.routerConnections) {
return
}

Expand All @@ -1600,11 +1615,6 @@ func (mgr *listenerManager) makeMoreListeners() {
}

func (mgr *listenerManager) refreshSession() {
if mgr.session == nil {
mgr.createSessionWithBackoff()
return
}

session, err := mgr.context.refreshSession(*mgr.session.ID)
if err != nil {
var target error = &rest_session.DetailSessionNotFound{}
Expand Down Expand Up @@ -1707,13 +1717,14 @@ type routerConnectionListenFailedEvent struct {
}

func (event *routerConnectionListenFailedEvent) handle(mgr *listenerManager) {
pfxlog.Logger().Debugf("child listener connection closed. parent listener closed: %v", mgr.listener.IsClosed())
delete(mgr.routerConnections, event.router)
pfxlog.Logger().WithField("serviceName", *mgr.service.Name).
WithField("listenerCount", len(mgr.routerConnections)).
Debugf("child listener connection closed. parent listener closed: %v", mgr.listener.IsClosed())
now := time.Now()
if len(mgr.routerConnections) == 0 {
mgr.disconnectedTime = &now
}
mgr.refreshSession()
mgr.makeMoreListeners()
}

Expand Down

0 comments on commit 33d77ad

Please sign in to comment.