diff --git a/CHANGELOG.md b/CHANGELOG.md index f50093ef..3cc1160d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# Release 0.21.0 + +- New `ListenOptions` field: `WaitForNEstablishedListeners`. Allows specifying that you want at least N listeners to be established before the `Listen` method returns. Defaults to 0. + # Release 0.20.145 - New `Context` API method: `RefreshService`, which allows refreshing a single service, when that's all that's needed. diff --git a/version b/version index 9f4eca25..5320adc1 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.20 +0.21 diff --git a/ziti/edge/conn.go b/ziti/edge/conn.go index 7ebe4bef..99a04a78 100644 --- a/ziti/edge/conn.go +++ b/ziti/edge/conn.go @@ -48,6 +48,7 @@ type RouterConn interface { IsClosed() bool Key() string GetRouterName() string + GetBoolHeader(key int32) bool } type Identifiable interface { @@ -211,6 +212,12 @@ func (d DialOptions) GetConnectTimeout() time.Duration { return d.ConnectTimeout } +func NewListenOptions() *ListenOptions { + return &ListenOptions{ + eventC: make(chan *ListenerEvent, 3), + } +} + type ListenOptions struct { Cost uint16 Precedence Precedence @@ -222,6 +229,11 @@ type ListenOptions struct { ManualStart bool ListenerId string KeyPair *kx.KeyPair + eventC chan *ListenerEvent +} + +func (options *ListenOptions) GetEventChannel() chan *ListenerEvent { + return options.eventC } func (options *ListenOptions) GetConnectTimeout() time.Duration { @@ -231,3 +243,13 @@ func (options *ListenOptions) GetConnectTimeout() time.Duration { func (options *ListenOptions) String() string { return fmt.Sprintf("[ListenOptions cost=%v, max-connections=%v]", options.Cost, options.MaxConnections) } + +type ListenerEventType int + +const ( + ListenerEstablished ListenerEventType = 1 +) + +type ListenerEvent struct { + EventType ListenerEventType +} diff --git a/ziti/edge/messages.go b/ziti/edge/messages.go index 5b263c59..22b9f9c7 100644 --- a/ziti/edge/messages.go +++ b/ziti/edge/messages.go @@ -42,6 +42,7 @@ const ( ContentTypeTraceRouteResponse = 60797 ContentTypeConnInspectRequest = 60798 ContentTypeConnInspectResponse = 60799 + ContentTypeBindSuccess = 60800 ConnIdHeader = 1000 SeqHeader = 1001 @@ -67,6 +68,7 @@ const ( ListenerId = 1021 ConnTypeHeader = 1022 SupportsInspectHeader = 1023 + SupportsBindSuccessHeader = 1024 ErrorCodeInternal = 1 ErrorCodeInvalidApiSession = 2 @@ -208,6 +210,7 @@ func NewDialMsg(connId uint32, token string, callerId string) *channel.Message { func NewBindMsg(connId uint32, token string, pubKey []byte, options *ListenOptions) *channel.Message { msg := newMsg(ContentTypeBind, connId, 0, []byte(token)) msg.PutBoolHeader(SupportsInspectHeader, true) + msg.PutBoolHeader(SupportsBindSuccessHeader, true) if pubKey != nil { msg.Headers[PublicKeyHeader] = pubKey diff --git a/ziti/edge/network/conn.go b/ziti/edge/network/conn.go index ab42eab8..1cffcb9c 100644 --- a/ziti/edge/network/conn.go +++ b/ziti/edge/network/conn.go @@ -190,6 +190,18 @@ func (conn *edgeConn) Accept(msg *channel.Message) { go conn.newChildConnection(msg) } else if msg.ContentType == edge.ContentTypeStateClosed { conn.close(true) + } else if msg.ContentType == edge.ContentTypeBindSuccess { + for entry := range conn.hosting.IterBuffered() { + entry.Val.established.Store(true) + event := &edge.ListenerEvent{ + EventType: edge.ListenerEstablished, + } + select { + case entry.Val.eventC <- event: + default: + logrus.WithFields(edge.GetLoggerFields(msg)).Warn("unable to send listener established event") + } + } } default: logrus.WithFields(edge.GetLoggerFields(msg)).Errorf("invalid connection type: %v", conn.connType) @@ -341,7 +353,7 @@ func (conn *edgeConn) establishServerCrypto(keypair *kx.KeyPair, peerKey []byte, return txHeader, nil } -func (conn *edgeConn) Listen(session *rest_model.SessionDetail, service *rest_model.ServiceDetail, options *edge.ListenOptions) (edge.Listener, error) { +func (conn *edgeConn) listen(session *rest_model.SessionDetail, service *rest_model.ServiceDetail, options *edge.ListenOptions) (*edgeListener, error) { logger := pfxlog.ContextLogger(conn.Channel.Label()). WithField("connId", conn.Id()). WithField("serviceName", *service.Name). @@ -356,6 +368,7 @@ func (conn *edgeConn) Listen(session *rest_model.SessionDetail, service *rest_mo token: *session.Token, edgeChan: conn, manualStart: options.ManualStart, + eventC: options.GetEventChannel(), } logger.Debug("adding listener for session") conn.hosting.Set(*session.Token, listener) diff --git a/ziti/edge/network/factory.go b/ziti/edge/network/factory.go index 28c79244..0c6b513a 100644 --- a/ziti/edge/network/factory.go +++ b/ziti/edge/network/factory.go @@ -37,6 +37,11 @@ type routerConn struct { owner RouterConnOwner } +func (conn *routerConn) GetBoolHeader(key int32) bool { + val := conn.ch.Underlay().Headers()[key] + return len(val) == 1 && val[0] == 1 +} + func (conn *routerConn) Key() string { return conn.key } @@ -69,6 +74,7 @@ func (conn *routerConn) BindChannel(binding channel.Binding) error { binding.AddReceiveHandlerF(edge.ContentTypeStateClosed, conn.msgMux.HandleReceive) binding.AddReceiveHandlerF(edge.ContentTypeTraceRoute, conn.msgMux.HandleReceive) binding.AddReceiveHandlerF(edge.ContentTypeConnInspectRequest, conn.msgMux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeBindSuccess, conn.msgMux.HandleReceive) // Since data is the common message type, it gets to be dispatched directly binding.AddTypedReceiveHandler(conn.msgMux) @@ -151,7 +157,7 @@ func (conn *routerConn) Listen(service *rest_model.ServiceDetail, session *rest_ WithField("serviceId", *service.ID). WithField("serviceName", *service.Name) - listener, err := ec.Listen(session, service, options) + listener, err := ec.listen(session, service, options) if err != nil { log.WithError(err).Error("failed to establish listener") @@ -160,6 +166,9 @@ func (conn *routerConn) Listen(service *rest_model.ServiceDetail, session *rest_ Error("failed to cleanup listener for service after failed bind") } } else { + if !conn.GetBoolHeader(edge.SupportsBindSuccessHeader) { + listener.established.Store(true) + } log.Debug("established listener") } return listener, err diff --git a/ziti/edge/network/listener.go b/ziti/edge/network/listener.go index 75f2baaf..98e35c27 100644 --- a/ziti/edge/network/listener.go +++ b/ziti/edge/network/listener.go @@ -91,6 +91,8 @@ type edgeListener struct { token string edgeChan *edgeConn manualStart bool + established atomic.Bool + eventC chan *edge.ListenerEvent } func (listener *edgeListener) UpdateCost(cost uint16) error { @@ -171,6 +173,7 @@ type MultiListener interface { GetServiceName() string GetService() *rest_model.ServiceDetail CloseWithError(err error) + GetEstablishedCount() uint } func NewMultiListener(service *rest_model.ServiceDetail, getSessionF func() *rest_model.SessionDetail) MultiListener { @@ -180,120 +183,134 @@ func NewMultiListener(service *rest_model.ServiceDetail, getSessionF func() *res acceptC: make(chan edge.Conn), errorC: make(chan error), }, - listeners: map[edge.Listener]struct{}{}, - getSessionF: getSessionF, + listeners: map[*edgeListener]struct{}{}, + getSessionF: getSessionF, + listenerEventC: make(chan *edge.ListenerEvent, 3), } } type multiListener struct { baseListener - listeners map[edge.Listener]struct{} + listeners map[*edgeListener]struct{} listenerLock sync.Mutex getSessionF func() *rest_model.SessionDetail listenerEventHandler atomic.Value errorEventHandler atomic.Value + listenerEventC chan *edge.ListenerEvent } -func (listener *multiListener) SetConnectionChangeHandler(handler func([]edge.Listener)) { - listener.listenerEventHandler.Store(handler) +func (self *multiListener) GetEstablishedCount() uint { + var count uint + self.listenerLock.Lock() + defer self.listenerLock.Unlock() + for v := range self.listeners { + if v.established.Load() { + count++ + } + } + return count +} - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() - listener.notifyOfConnectionChange() +func (self *multiListener) SetConnectionChangeHandler(handler func([]edge.Listener)) { + self.listenerEventHandler.Store(handler) + + self.listenerLock.Lock() + defer self.listenerLock.Unlock() + self.notifyOfConnectionChange() } -func (listener *multiListener) GetConnectionChangeHandler() func([]edge.Listener) { - val := listener.listenerEventHandler.Load() +func (self *multiListener) GetConnectionChangeHandler() func([]edge.Listener) { + val := self.listenerEventHandler.Load() if val == nil { return nil } return val.(func([]edge.Listener)) } -func (listener *multiListener) SetErrorEventHandler(handler func(error)) { - listener.errorEventHandler.Store(handler) +func (self *multiListener) SetErrorEventHandler(handler func(error)) { + self.errorEventHandler.Store(handler) } -func (listener *multiListener) GetErrorEventHandler() func(error) { - val := listener.errorEventHandler.Load() +func (self *multiListener) GetErrorEventHandler() func(error) { + val := self.errorEventHandler.Load() if val == nil { return nil } return val.(func(error)) } -func (listener *multiListener) NotifyOfChildError(err error) { +func (self *multiListener) NotifyOfChildError(err error) { pfxlog.Logger().Infof("notify error handler of error: %v", err) - if handler := listener.GetErrorEventHandler(); handler != nil { + if handler := self.GetErrorEventHandler(); handler != nil { handler(err) } } -func (listener *multiListener) notifyOfConnectionChange() { - if handler := listener.GetConnectionChangeHandler(); handler != nil { +func (self *multiListener) notifyOfConnectionChange() { + if handler := self.GetConnectionChangeHandler(); handler != nil { var list []edge.Listener - for k := range listener.listeners { + for k := range self.listeners { list = append(list, k) } go handler(list) } } -func (listener *multiListener) GetCurrentSession() *rest_model.SessionDetail { - return listener.getSessionF() +func (self *multiListener) GetCurrentSession() *rest_model.SessionDetail { + return self.getSessionF() } -func (listener *multiListener) UpdateCost(cost uint16) error { - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() +func (self *multiListener) UpdateCost(cost uint16) error { + self.listenerLock.Lock() + defer self.listenerLock.Unlock() var resultErrors []error - for child := range listener.listeners { + for child := range self.listeners { if err := child.UpdateCost(cost); err != nil { resultErrors = append(resultErrors, err) } } - return listener.condenseErrors(resultErrors) + return self.condenseErrors(resultErrors) } -func (listener *multiListener) UpdatePrecedence(precedence edge.Precedence) error { - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() +func (self *multiListener) UpdatePrecedence(precedence edge.Precedence) error { + self.listenerLock.Lock() + defer self.listenerLock.Unlock() var resultErrors []error - for child := range listener.listeners { + for child := range self.listeners { if err := child.UpdatePrecedence(precedence); err != nil { resultErrors = append(resultErrors, err) } } - return listener.condenseErrors(resultErrors) + return self.condenseErrors(resultErrors) } -func (listener *multiListener) UpdateCostAndPrecedence(cost uint16, precedence edge.Precedence) error { - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() +func (self *multiListener) UpdateCostAndPrecedence(cost uint16, precedence edge.Precedence) error { + self.listenerLock.Lock() + defer self.listenerLock.Unlock() var resultErrors []error - for child := range listener.listeners { + for child := range self.listeners { if err := child.UpdateCostAndPrecedence(cost, precedence); err != nil { resultErrors = append(resultErrors, err) } } - return listener.condenseErrors(resultErrors) + return self.condenseErrors(resultErrors) } -func (listener *multiListener) SendHealthEvent(pass bool) error { - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() +func (self *multiListener) SendHealthEvent(pass bool) error { + self.listenerLock.Lock() + defer self.listenerLock.Unlock() // only send to first child, otherwise we get duplicate event reporting - for child := range listener.listeners { + for child := range self.listeners { return child.SendHealthEvent(pass) } return nil } -func (listener *multiListener) condenseErrors(errors []error) error { +func (self *multiListener) condenseErrors(errors []error) error { if len(errors) == 0 { return nil } @@ -303,44 +320,44 @@ func (listener *multiListener) condenseErrors(errors []error) error { return MultipleErrors(errors) } -func (listener *multiListener) GetServiceName() string { - return *listener.service.Name +func (self *multiListener) GetServiceName() string { + return *self.service.Name } -func (listener *multiListener) GetService() *rest_model.ServiceDetail { - return listener.service +func (self *multiListener) GetService() *rest_model.ServiceDetail { + return self.service } -func (listener *multiListener) AddListener(netListener edge.Listener, closeHandler func()) { - if listener.closed.Load() { +func (self *multiListener) AddListener(netListener edge.Listener, closeHandler func()) { + if self.closed.Load() { return } edgeListener, ok := netListener.(*edgeListener) if !ok { - pfxlog.Logger().Errorf("multi-listener expects only listeners created by the SDK, not %v", reflect.TypeOf(listener)) + pfxlog.Logger().Errorf("multi-listener expects only listeners created by the SDK, not %v", reflect.TypeOf(self)) return } - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() - listener.listeners[edgeListener] = struct{}{} + self.listenerLock.Lock() + defer self.listenerLock.Unlock() + self.listeners[edgeListener] = struct{}{} closer := func() { - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() - delete(listener.listeners, edgeListener) + self.listenerLock.Lock() + defer self.listenerLock.Unlock() + delete(self.listeners, edgeListener) - listener.notifyOfConnectionChange() + self.notifyOfConnectionChange() go closeHandler() } - listener.notifyOfConnectionChange() + self.notifyOfConnectionChange() - go listener.forward(edgeListener, closer) + go self.forward(edgeListener, closer) } -func (listener *multiListener) forward(edgeListener *edgeListener, closeHandler func()) { +func (self *multiListener) forward(edgeListener *edgeListener, closeHandler func()) { defer func() { if err := edgeListener.Close(); err != nil { pfxlog.Logger().Errorf("failure closing edge listener: (%v)", err) @@ -351,24 +368,24 @@ func (listener *multiListener) forward(edgeListener *edgeListener, closeHandler ticker := time.NewTicker(250 * time.Millisecond) defer ticker.Stop() - for !listener.closed.Load() && !edgeListener.closed.Load() { + for !self.closed.Load() && !edgeListener.closed.Load() { select { case conn, ok := <-edgeListener.acceptC: if !ok || conn == nil { // closed, returning return } - listener.accept(conn, ticker) + self.accept(conn, ticker) case <-ticker.C: // lets us check if the listener is closed, and exit if it has } } } -func (listener *multiListener) accept(conn edge.Conn, ticker *time.Ticker) { - for !listener.closed.Load() { +func (self *multiListener) accept(conn edge.Conn, ticker *time.Ticker) { + for !self.closed.Load() { select { - case listener.acceptC <- conn: + case self.acceptC <- conn: return case <-ticker.C: // lets us check if the listener is closed, and exit if it has @@ -376,38 +393,38 @@ func (listener *multiListener) accept(conn edge.Conn, ticker *time.Ticker) { } } -func (listener *multiListener) Close() error { - listener.closed.Store(true) +func (self *multiListener) Close() error { + self.closed.Store(true) - listener.listenerLock.Lock() - defer listener.listenerLock.Unlock() + self.listenerLock.Lock() + defer self.listenerLock.Unlock() var resultErrors []error - for child := range listener.listeners { + for child := range self.listeners { if err := child.Close(); err != nil { resultErrors = append(resultErrors, err) } } - listener.listeners = nil + self.listeners = nil select { - case listener.acceptC <- nil: + case self.acceptC <- nil: default: // If the queue is full, bail out, we're just popping a nil on the // accept queue to let it return from accept more quickly } - return listener.condenseErrors(resultErrors) + return self.condenseErrors(resultErrors) } -func (listener *multiListener) CloseWithError(err error) { +func (self *multiListener) CloseWithError(err error) { select { - case listener.errorC <- err: + case self.errorC <- err: default: } - listener.closed.Store(true) + self.closed.Store(true) } type MultipleErrors []error diff --git a/ziti/options.go b/ziti/options.go index a749c515..5c2d003e 100644 --- a/ziti/options.go +++ b/ziti/options.go @@ -49,13 +49,14 @@ func (d DialOptions) GetConnectTimeout() time.Duration { } type ListenOptions struct { - Cost uint16 - Precedence Precedence - ConnectTimeout time.Duration - MaxConnections int - Identity string - BindUsingEdgeIdentity bool - ManualStart bool + Cost uint16 + Precedence Precedence + ConnectTimeout time.Duration + MaxConnections int + Identity string + BindUsingEdgeIdentity bool + ManualStart bool + WaitForNEstablishedListeners uint } func DefaultListenOptions() *ListenOptions { diff --git a/ziti/ziti.go b/ziti/ziti.go index cab7b2f1..3e16c5a9 100644 --- a/ziti/ziti.go +++ b/ziti/ziti.go @@ -25,6 +25,8 @@ import ( "github.com/openziti/edge-api/rest_client_api_client/authentication" "github.com/openziti/edge-api/rest_client_api_client/service" rest_session "github.com/openziti/edge-api/rest_client_api_client/session" + "github.com/openziti/foundation/v2/concurrenz" + "github.com/openziti/foundation/v2/errorz" apis "github.com/openziti/sdk-golang/edge-apis" "github.com/openziti/secretstream/kx" "math" @@ -1077,19 +1079,18 @@ func (context *ContextImpl) ListenWithOptions(serviceName string, options *Liste if s, ok := context.GetService(serviceName); ok { return context.listenSession(s, options) } - return nil, errors.Errorf("service '%s' not found in ZT", serviceName) + return nil, errors.Errorf("service '%s' not found in ziti network", serviceName) } func (context *ContextImpl) listenSession(service *rest_model.ServiceDetail, options *ListenOptions) (edge.Listener, error) { - edgeListenOptions := &edge.ListenOptions{ - Cost: options.Cost, - Precedence: edge.Precedence(options.Precedence), - ConnectTimeout: options.ConnectTimeout, - MaxConnections: options.MaxConnections, - Identity: options.Identity, - BindUsingEdgeIdentity: options.BindUsingEdgeIdentity, - ManualStart: options.ManualStart, - } + edgeListenOptions := edge.NewListenOptions() + edgeListenOptions.Cost = options.Cost + edgeListenOptions.Precedence = edge.Precedence(options.Precedence) + edgeListenOptions.ConnectTimeout = options.ConnectTimeout + edgeListenOptions.MaxConnections = options.MaxConnections + edgeListenOptions.Identity = options.Identity + edgeListenOptions.BindUsingEdgeIdentity = options.BindUsingEdgeIdentity + edgeListenOptions.ManualStart = options.ManualStart if edgeListenOptions.ConnectTimeout == 0 { edgeListenOptions.ConnectTimeout = time.Minute @@ -1099,7 +1100,7 @@ func (context *ContextImpl) listenSession(service *rest_model.ServiceDetail, opt edgeListenOptions.MaxConnections = 1 } - if listenerMgr, err := newListenerManager(service, context, edgeListenOptions); err != nil { + if listenerMgr, err := newListenerManager(service, context, edgeListenOptions, options.WaitForNEstablishedListeners); err != nil { return nil, err } else { return listenerMgr.listener, nil @@ -1495,7 +1496,32 @@ func (context *ContextImpl) RemoveZitiMfa(code string) error { return context.CtrlClt.RemoveMfa(code) } -func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl, options *edge.ListenOptions) (*listenerManager, error) { +type waitForNHelper struct { + count uint + mgr *listenerManager + notify chan struct{} + closed atomic.Bool +} + +func (self *waitForNHelper) Notify(eventType ListenEventType) { + if eventType == ListenerEstablished && self.mgr.listener.GetEstablishedCount() >= self.count { + if self.closed.CompareAndSwap(false, true) { + close(self.notify) + } + } +} + +func (self *waitForNHelper) WaitForN(timeout time.Duration) error { + select { + case <-time.After(timeout): + return fmt.Errorf("timed out waiting for %v listeners to be established, only had %v", self.count, self.mgr.listener.GetEstablishedCount()) + case <-self.notify: + + } + return nil +} + +func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl, options *edge.ListenOptions, waitForN uint) (*listenerManager, error) { now := time.Now() var keyPair *kx.KeyPair @@ -1523,8 +1549,30 @@ func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl, listenerMgr.listener = network.NewMultiListener(service, listenerMgr.GetCurrentSession) + var helper *waitForNHelper + if waitForN > 0 { + helper = &waitForNHelper{ + count: waitForN, + mgr: listenerMgr, + notify: make(chan struct{}), + } + listenerMgr.AddObserver(helper) + defer listenerMgr.RemoveObserver(helper) + } + go listenerMgr.run() + if helper != nil { + if err := helper.WaitForN(options.ConnectTimeout); err != nil { + result := errorz.MultipleErrors{} + result = append(result, err) + if closeErr := listenerMgr.listener.Close(); closeErr != nil { + result = append(result, closeErr) + } + return nil, result.ToError() + } + } + return listenerMgr, nil } @@ -1540,6 +1588,21 @@ type listenerManager struct { eventChan chan listenerEvent sessionRefreshTime time.Time disconnectedTime *time.Time + observers concurrenz.CopyOnWriteSlice[ListenEventObserver] +} + +func (mgr *listenerManager) AddObserver(observer ListenEventObserver) { + mgr.observers.Append(observer) +} + +func (mgr *listenerManager) RemoveObserver(observer ListenEventObserver) { + mgr.observers.Delete(observer) +} + +func (mgr *listenerManager) notify(eventType ListenEventType) { + for _, observer := range mgr.observers.Value() { + go observer.Notify(eventType) + } } func (mgr *listenerManager) run() { @@ -1610,6 +1673,8 @@ func (mgr *listenerManager) run() { mgr.refreshSession() case <-ticker.C: mgr.makeMoreListeners() + case <-mgr.options.GetEventChannel(): + mgr.notify(ListenerEstablished) case <-mgr.context.closeNotify: mgr.listener.CloseWithError(errors.New("context closed")) } @@ -1657,6 +1722,12 @@ func (mgr *listenerManager) createListener(routerConnection edge.RouterConn, ses } }) mgr.eventChan <- listenSuccessEvent{} + if !routerConnection.GetBoolHeader(edge.SupportsBindSuccessHeader) { + select { + case mgr.options.GetEventChannel() <- &edge.ListenerEvent{EventType: edge.ListenerEstablished}: + default: + } + } } else { logger.Errorf("creating listener failed after %vms: %v", elapsed.Milliseconds(), err) mgr.listener.NotifyOfChildError(err) @@ -1806,6 +1877,7 @@ func (event *routerConnectionListenFailedEvent) handle(mgr *listenerManager) { if len(mgr.routerConnections) == 0 { mgr.disconnectedTime = &now } + mgr.notify(ListenerRemoved) mgr.makeMoreListeners() } @@ -1819,6 +1891,7 @@ type listenSuccessEvent struct{} func (event listenSuccessEvent) handle(mgr *listenerManager) { mgr.disconnectedTime = nil + mgr.notify(ListenerAdded) } type getSessionEvent struct { @@ -1830,3 +1903,15 @@ func (event *getSessionEvent) handle(mgr *listenerManager) { defer close(event.doneC) event.session = mgr.session } + +type ListenEventType int + +const ( + ListenerAdded ListenEventType = 1 + ListenerEstablished ListenEventType = 2 + ListenerRemoved ListenEventType = 3 +) + +type ListenEventObserver interface { + Notify(eventType ListenEventType) +}