Skip to content

Commit

Permalink
rewrite ConnectionMaster, fix a bug with connection being unable to r…
Browse files Browse the repository at this point in the history
…ecover from initial connect failure
  • Loading branch information
norwnd committed Feb 8, 2025
1 parent a70a3c6 commit ed25bd4
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 92 deletions.
12 changes: 9 additions & 3 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ func (conn *wsConn) setConnectionStatus(status ConnectionStatus) {

// connect attempts to establish a websocket connection.
func (conn *wsConn) connect(ctx context.Context) error {
connStatus := Disconnected
initializeConnStatus := func() {
conn.setConnectionStatus(connStatus)
}
defer initializeConnStatus()

dialer := &websocket.Dialer{
HandshakeTimeout: DefaultResponseTimeout,
TLSClientConfig: conn.tlsCfg,
Expand All @@ -256,13 +262,12 @@ func (conn *wsConn) connect(ctx context.Context) error {
ws, _, err := dialer.DialContext(ctx, conn.url(), conn.cfg.ConnectHeaders)
if err != nil {
if isErrorInvalidCert(err) {
conn.setConnectionStatus(InvalidCert)
connStatus = InvalidCert
if len(conn.cfg.Cert) == 0 {
return dex.NewError(ErrCertRequired, err.Error())
}
return dex.NewError(ErrInvalidCert, err.Error())
}
conn.setConnectionStatus(Disconnected)
return err
}

Expand Down Expand Up @@ -311,7 +316,7 @@ func (conn *wsConn) connect(ctx context.Context) error {
conn.ws = ws
conn.wsMtx.Unlock()

conn.setConnectionStatus(Connected)
connStatus = Connected
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
Expand Down Expand Up @@ -538,6 +543,7 @@ func (conn *wsConn) Connect(ctx context.Context) (*sync.WaitGroup, error) {
}()
}

// take care of graceful shutdown
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
Expand Down
10 changes: 4 additions & 6 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ const (
// or a config response field if it should be considered variable.
preimageReqTimeout = 20 * time.Second

// wsMaxAnomalyCount is the maximum websocket connection anomaly after which
// a client receives a notification to check their connectivity.
// wsMaxAnomalyCount is the number of connection anomalies tolerated before
// the user is notified to check that their internet connection is stable.
wsMaxAnomalyCount = 3
// If a client's websocket connection to a server disconnects before
// wsAnomalyDuration since last connect time, the client's websocket
Expand Down Expand Up @@ -5228,8 +5228,6 @@ func (c *Core) initializeDEXConnection(dc *dexConnection, crypter encrypt.Crypte
if dc.IsDown() {
c.log.Warnf("Connection to %v not available for authorization. "+
"It will automatically authorize when it connects.", dc.acct.host)
subject, details := c.formatDetails(TopicDEXDisconnected, dc.acct.host)
c.notify(newConnEventNote(TopicDEXDisconnected, subject, dc.acct.host, comms.Disconnected, details, db.ErrorLevel))
return
}

Expand Down Expand Up @@ -8705,7 +8703,7 @@ func (c *Core) handleConnectEvent(dc *dexConnection, status comms.ConnectionStat
// Increase anomalies count for this connection.
count := atomic.AddUint32(&dc.anomaliesCount, 1)
if count%wsMaxAnomalyCount == 0 {
// Send notification to check connectivity.
// Ask the user to check that their internet connection is stable.
subject, details := c.formatDetails(TopicDexConnectivity, dc.acct.host)
c.notify(newConnEventNote(TopicDexConnectivity, subject, dc.acct.host, dc.status(), details, db.Poke))
}
Expand Down Expand Up @@ -8733,7 +8731,7 @@ func (c *Core) handleConnectEvent(dc *dexConnection, status comms.ConnectionStat

if dc.broadcastingConnect() {
subject, details := c.formatDetails(topic, dc.acct.host)
dc.notify(newConnEventNote(topic, subject, dc.acct.host, status, details, db.Poke))
c.notify(newConnEventNote(topic, subject, dc.acct.host, status, details, db.Poke))
}
}

Expand Down
4 changes: 2 additions & 2 deletions client/core/locale_ntfn.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ var originLocale = map[Topic]*translation{
template: intl.Translation{T: "%s", Notes: "args: [host]"},
},
TopicDEXDisconnected: {
subject: intl.Translation{T: "Server disconnect"},
subject: intl.Translation{T: "Server disconnected"},
template: intl.Translation{T: "%s", Notes: "args: [host]"},
},
TopicDexConnectivity: {
subject: intl.Translation{T: "Internet Connectivity"},
template: intl.Translation{T: "Your internet connection to %s is unstable, check your internet connection", Notes: "args: [host]"},
template: intl.Translation{T: "Your connection to %s is unstable, check your internet connection", Notes: "args: [host]"},
},
TopicPenalized: {
subject: intl.Translation{T: "Server has penalized you"},
Expand Down
7 changes: 5 additions & 2 deletions client/webserver/site/src/js/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1293,8 +1293,11 @@ export default class Application {
}, 6000)
}
// Success and higher severity go to the bell dropdown.
if (note.severity === ntfn.POKE) this.prependPokeElement(note)
else this.prependNoteElement(note)
if (note.severity === ntfn.POKE) {
this.prependPokeElement(note)
} else {
this.prependNoteElement(note)
}

// show desktop notification
ntfn.desktopNotify(note)
Expand Down
200 changes: 121 additions & 79 deletions dex/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package dex

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -89,114 +88,157 @@ type Connector interface {

// ConnectionMaster manages a Connector.
type ConnectionMaster struct {
// connector is the actual connection ConnectionMaster wraps/manages
connector Connector
cancel context.CancelFunc
done atomic.Value // chan struct{}
}

// NewConnectionMaster creates a new ConnectionMaster. The Connect method should
// be used before Disconnect. The On, Done, and Wait methods may be used at any
// time. However, prior to Connect, Wait and Done immediately return and signal
// completion, respectively.
// wg is the WaitGroup associated with the Connector; it lets ConnectionMaster tell
// when the Connector is done. It is stored atomically so that ConnectionMaster can be
// used concurrently.
// ctxCancel cancels the context passed down to the wrapped Connector; it is used to
// forcefully terminate the wrapped Connector if necessary. It is stored atomically so
// that ConnectionMaster can be used concurrently.
ctxCancel atomic.Value
// connectInitiated indicates whether connect attempt has been initiated at least once,
// used for sanity-checks only
connectInitiated atomic.Bool
// connectCompleted indicates whether connect attempt has been completed (note, it doesn't
// necessarily have to succeed since Connector might be doing retries on its side - see
// Connect / ConnectOnce methods for details)
connectCompleted atomic.Bool
// on indicates if the Connector is running (meaning ConnectionMaster instance is still
// relevant to keep around and use)
on atomic.Bool
}

// NewConnectionMaster returns a ConnectionMaster that wraps the given Connector.
// Only the connector field is populated here; every other field keeps its zero
// value.
func NewConnectionMaster(c Connector) *ConnectionMaster {
	master := new(ConnectionMaster)
	master.connector = c
	return master
}

// Connect connects the Connector, and returns any initial connection error. Use
// Disconnect to shut down the Connector. Even if Connect returns a non-nil
// error, On may report true until Disconnect is called. You would use Connect
// if the wrapped Connector has a reconnect loop to continually attempt to
// establish a connection even if the initial attempt fails. Use ConnectOnce if
// the Connector should be given one chance to connect before being considered
// not to be "on". If the ConnectionMaster is discarded on error, it is not
// important which method is used.
func (c *ConnectionMaster) Connect(ctx context.Context) (err error) {
if c.On() { // probably a bug in the consumer
return errors.New("already running")
// Connect connects the Connector, and returns any initial connection error. Disconnect
// method may be called to shut down the Connector and stop it from automatically retrying
// to connect further (as well as release associated resources).
// Even if Connect returns a non-nil error, On may report true until Disconnect is called.
//
// Connect (or ConnectOnce) must be called at most 1 time per ConnectionMaster instance.
//
// You would use Connect if the wrapped Connector has a reconnect loop to continually
// attempt to establish a connection even if the initial attempt fails. Use ConnectOnce
// if the Connector should be given only one chance to connect.
func (c *ConnectionMaster) Connect(ctx context.Context) error {
// trying to call Connect (or ConnectOnce) multiple times is probably a bug on the
// caller side, sanity check we aren't doing that (panic to spot this sooner)
if c.connectInitiated.Load() {
panic("ConnectionMaster.Connect spotted multiple connect attempts on single ConnectionMaster instance")
}
c.connectInitiated.Store(true)

// Attempt to start the Connector.
defer c.connectCompleted.Store(true)

// prepare dedicated context for wrapped Connector
ctx, cancel := context.WithCancel(ctx)
c.ctxCancel.Store(cancel)

// make an initial attempt to start wrapped Connector, a non-nil error does not indicate
// that wrapped Connector is not running, only that the initial connection attempt has
// failed, hence we have to set c.on to true regardless to signal we didn't give up on
// this connection just yet
wg, err := c.connector.Connect(ctx)
if wg != nil {
// non-nil WaitGroup means Connector can communicate back to us to tell us when it is
// done with this connection (and hence when this ConnectionMaster instance is no
// longer relevant, no longer considered to be on)
go func() {
wg.Wait()
c.on.Store(false) // ConnectionMaster instance is no longer relevant
// release context resources only after wrapped Connector has finished, we have to
// cancel context ourselves here because we cannot rely on ConnectionMaster caller
// to always call Disconnect method just so resources are freed
cancel()
}()
} else {
// we have to initialize c.wg on our own so ConnectionMaster caller(s) can use
// Done/Wait/Disconnect methods regardless of whether connect succeeded or not
wg = &sync.WaitGroup{}
wg.Add(1)
go func() {
<-ctx.Done() // expecting corresponding cancel call to be issued through Disconnect
wg.Done()
}()
}
// note: c.wg must be initialized before c.on is set to true for ConnectionMaster.Done
// method to work properly
c.wg.Store(wg)
c.on.Store(true) // error or not, ConnectionMaster instance is regarded as relevant
if err != nil {
// TODO - work-around for annoying bug that Bison wallet doesn't recover from
//cancel() // no context leak
return fmt.Errorf("connect failure: %w", err)
}
// NOTE: A non-nil error currently does not indicate that the Connector is
// not running, only that the initial connection attempt has failed. As long
// as the WaitGroup is non-nil we need to wait on it. We return the error so
// that the caller may decide to stop it or wait (see ConnectOnce).

// It's running, enable Disconnect.
c.cancel = cancel // caller should synchronize Connect/Disconnect calls

// Done and On may be checked at any time.
done := make(chan struct{})
c.done.Store(done)

go func() { // capture the local variables
wg.Wait()
cancel() // if the Connector just died on its own, don't leak the context
close(done)
}()
return nil
}

return err
}

// ConnectOnce is like Connect, but on error the internal status is updated so
// that the On method returns false. This method may be used if an error from
// the Connector is terminal. The caller may also use Connect if they cancel the
// parent context or call Disconnect.
func (c *ConnectionMaster) ConnectOnce(ctx context.Context) (err error) {
if err = c.Connect(ctx); err != nil {
// If still "On", disconnect.
// c.Disconnect() // no-op if not "On"
if c.cancel != nil {
c.cancel()
<-c.done.Load().(chan struct{}) // wait for Connector
}
// ConnectOnce is similar to Connect but is designed to be used with a Connector that is
// meant to try connecting only once (meaning the Connector cannot recover from an
// unsuccessful connect attempt on its own).
// If ConnectOnce returns a nil error, the Disconnect method must be called to clean up
// associated resources.
func (c *ConnectionMaster) ConnectOnce(ctx context.Context) error {
err := c.Connect(ctx)
if err != nil {
// disconnect here explicitly to prevent any retries wrapped Connector might want to
// do as well as clean up associated resources automatically (it's not customary to
// expect the caller do a cleanup on error cases)
c.Disconnect()
return err
}
return err
return nil
}

// On reports the current value of the on flag. Within this file the flag is
// set to true by Connect and reset to false by Disconnect, or once the
// Connector's WaitGroup has finished.
func (c *ConnectionMaster) On() bool {
	running := c.on.Load()
	return running
}

// Done returns a channel that is closed when the Connector's WaitGroup is done.
// If called before Connect, a closed channel is returned.
// If called before Connect method has finished, a closed channel is returned.
func (c *ConnectionMaster) Done() <-chan struct{} {
done, ok := c.done.Load().(chan struct{})
if ok {
return done
if !c.on.Load() {
closedChan := make(chan struct{})
close(closedChan)
return closedChan
}
done = make(chan struct{})
close(done)
return done
}

// On indicates if the Connector is running. This returns false if never
// connected, or if the Connector has completed shut down.
func (c *ConnectionMaster) On() bool {
select {
case <-c.Done():
return false
default:
return true
}
done := make(chan struct{})
go func() {
wg := c.wg.Load().(*sync.WaitGroup)
wg.Wait()
close(done)
}()

return done
}

// Wait waits for the Connector to shut down. It returns immediately if
// Connect has not been called yet.
// Connect method hasn't finished yet.
func (c *ConnectionMaster) Wait() {
<-c.Done() // let the anon goroutine from Connect return
<-c.Done()
}

// Disconnect closes the connection and waits for shutdown. This must not be
// used before or concurrently with Connect.
// Disconnect closes the connection and waits for shutdown. This method is meant to be used
// only AFTER a connection attempt has completed (through the Connect or ConnectOnce method).
// It is the caller's responsibility to call it only AFTER the Connect method has finished.
func (c *ConnectionMaster) Disconnect() {
if !c.On() {
return
// trying to call Disconnect before finishing connection establishing is probably
// a bug on the caller side, sanity check we aren't doing that (panic to spot this sooner)
if !c.connectCompleted.Load() {
panic("ConnectionMaster.Disconnect called BEFORE connection has completed")
}
c.cancel()

c.on.Store(false) // ConnectionMaster instance is no longer relevant

cancel := c.ctxCancel.Load().(context.CancelFunc)
cancel()
c.Wait()
}

0 comments on commit ed25bd4

Please sign in to comment.