From 1a1880f57362725cfb1c81cf75ac5437f40786ac Mon Sep 17 00:00:00 2001 From: Aaron Ellington Date: Mon, 11 Mar 2024 11:05:05 -0400 Subject: [PATCH 1/3] wip --- five9/authentication.go | 2 +- five9/supervisor_websocket.go | 119 +++++++++++++++++----------------- go.mod | 3 +- go.sum | 2 + 4 files changed, 64 insertions(+), 62 deletions(-) diff --git a/five9/authentication.go b/five9/authentication.go index 173f110..8421ed4 100644 --- a/five9/authentication.go +++ b/five9/authentication.go @@ -110,7 +110,7 @@ func (a *authenticationState) endpointLogin(ctx context.Context) (five9types.Log payload := five9types.LoginPayload{ PasswordCredentials: a.client.credentials, AppKey: "web-ui", - Policy: five9types.PolicyAttachExisting, + Policy: five9types.PolicyForceIn, } request, err := http.NewRequestWithContext( diff --git a/five9/supervisor_websocket.go b/five9/supervisor_websocket.go index 18ad5ec..5971d5a 100644 --- a/five9/supervisor_websocket.go +++ b/five9/supervisor_websocket.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/equalsgibson/concur/concur" "github.com/equalsgibson/five9-go/five9/five9types" "github.com/equalsgibson/five9-go/five9/internal/utils" "github.com/google/uuid" @@ -35,13 +36,11 @@ func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error { s.resetCache() // If we encounter an error on the WebsocketErr channel, cancel the context, thus cancelling all other goroutines. - ctx, cancel := context.WithCancel(parentCtx) + ctx, cancel := context.WithCancelCause(parentCtx) defer func() { // Clear the cache when closing the connection s.resetCache() - s.webSocketHandler.Close() - cancel() }() login, err := s.authState.getLogin(ctx) @@ -54,20 +53,42 @@ func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error { if err := s.webSocketHandler.Connect(ctx, connectionURL, s.authState.client.httpClient); err != nil { return err } + defer s.webSocketHandler.Close() - websocketError := make(chan error) + asyncReader := concur.NewAsyncReader(s.webSocketHandler.Read) + go asyncReader.Loop(ctx) + defer asyncReader.Close() - // Ping intervals + pingTicker := time.NewTicker(time.Second * 5) + defer pingTicker.Stop() go func() { - if err := s.ping(ctx); err != nil { - websocketError <- err + for { + select { + case <-pingTicker.C: + if err := s.ping(ctx); err != nil { + cancel(err) + return + } + case <-ctx.Done(): + return + } } }() - // Pong monitoring + pongMonitorTicker := time.NewTicker(time.Second * 5) + defer pongMonitorTicker.Stop() go func() { - if err := s.pong(ctx); err != nil { - websocketError <- err + for { + select { + case <-pingTicker.C: + if err := s.pong(ctx); err != nil { + cancel(err) + return + // asyncReader.Close() + } + case <-ctx.Done(): + return + } } }() @@ -76,18 +97,25 @@ func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error { // When starting a new session, this is called by Five9. Account for rejoining an existing session by also // calling this. if err := s.requestWebSocketFullStatistics(ctx); err != nil { - websocketError <- err + cancel(err) + // asyncReader.Close() } }() - // Forever read the bytes - go func() { - if err := s.read(ctx); err != nil { - websocketError <- err - } - }() + for { + select { + case update := <-asyncReader.Updates(): + if update.Err != nil { + return update.Err + } - return <-websocketError + if err := s.handleWebsocketMessage(update.Item); err != nil { + return err + } + case <-ctx.Done(): + return context.Cause(ctx) + } + } } func (s *SupervisorService) WSAgentState(ctx context.Context) (map[five9types.UserName]five9types.AgentState, error) { @@ -190,60 +218,31 @@ func (s *SupervisorService) WSACDState(ctx context.Context) (map[string]five9typ } func (s *SupervisorService) ping(ctx context.Context) error { - ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() - if err := s.webSocketHandler.Write(ctx, []byte("ping")); err != nil { return err } - for { - select { - case <-ticker.C: - if err := s.webSocketHandler.Write(ctx, []byte("ping")); err != nil { - return err - } - case <-ctx.Done(): - return nil - } - } + return nil } -func (s *SupervisorService) pong(ctx context.Context) error { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - lastPongReceived, ok := s.webSocketCache.timers.Get(five9types.EventIDPongReceived) - if !ok { - return errors.New("could not obtain last pong time from cache") - } - - if time.Since(*lastPongReceived) > time.Second*45 { - return errors.New("last valid ping response from WS is older than 45 seconds, closing connection") - } - case <-ctx.Done(): - return nil - } +func (s *SupervisorService) pong(_ context.Context) error { + lastPongReceived, ok := s.webSocketCache.timers.Get(five9types.EventIDPongReceived) + if !ok { + return errors.New("could not obtain last pong time from cache") } -} -func (s *SupervisorService) read(ctx context.Context) error { - for { - messageBytes, err := s.webSocketHandler.Read(ctx) - if err != nil { - return err - } - - if err := s.handleWebsocketMessage(messageBytes); err != nil { - return err - } + if time.Since(*lastPongReceived) > time.Second*45 { + return errors.New("last valid ping response from WS is older than 45 seconds, closing connection") } + + return nil } func (s *SupervisorService) resetCache() { + s.authState.loginMutex.Lock() + s.authState.loginResponse = nil + s.authState.loginMutex.Unlock() + s.webSocketCache.acdState.Reset() s.webSocketCache.agentState.Reset() s.webSocketCache.agentStatistics.Reset() diff --git a/go.mod b/go.mod index 7c194fa..f961b9e 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module github.com/equalsgibson/five9-go -go 1.21 +go 1.22 require ( + github.com/equalsgibson/concur v0.0.1 github.com/google/uuid v1.3.1 nhooyr.io/websocket v1.8.7 ) diff --git a/go.sum b/go.sum index 20f9961..671da6a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/equalsgibson/concur v0.0.1 h1:9Tw0YTfEtHwdTl85bJFP8QG0yc49XLHmghQnHeIrh9w= +github.com/equalsgibson/concur v0.0.1/go.mod h1:03SVNgaFh3J0VAJs9swtEy9DclBhZPIem/eKB4jymWg= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= From 5ddd72797d7278f562107d3e47015fa3006147c0 Mon Sep 17 00:00:00 2001 From: Aaron Ellington Date: Mon, 11 Mar 2024 11:10:07 -0400 Subject: [PATCH 2/3] wip --- example/webSocket/getAgentState.go | 73 ++++++++++++++++++------------ 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/example/webSocket/getAgentState.go b/example/webSocket/getAgentState.go index 692f5c4..e10a06a 100644 --- a/example/webSocket/getAgentState.go +++ b/example/webSocket/getAgentState.go @@ -3,18 +3,31 @@ package main import ( "context" "errors" + "fmt" "log" "net/http" "os" + "runtime" + "runtime/pprof" "time" "github.com/equalsgibson/five9-go/five9" "github.com/equalsgibson/five9-go/five9/five9types" ) +func startConnection(ctx context.Context, c *five9.Service) error { + if err := c.Supervisor().StartWebsocket(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + return fmt.Errorf("websocket exiting, restarting. Here is the error message: %s", err.Error()) + } + } + + return nil +} + func main() { - // Set up a new Five9 service ctx := context.Background() + c := five9.NewService( five9types.PasswordCredentials{ Username: os.Getenv("FIVE9USERNAME"), @@ -27,44 +40,46 @@ func main() { }), ) - apiUserInfo, err := c.Supervisor().GetOwnUserInfo(ctx) - if err != nil { - log.Fatal(err) - } + // Run a function every 5 seconds to obtain some information from the + // supervisor websocket connection. - // Start a websocket connection go func() { - if err := c.Supervisor().StartWebsocket(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - log.Fatalf("Websocket exiting, restarting. Here is the error message: %s", err.Error()) + for range time.NewTicker(time.Second * 5).C { + agents, err := c.Supervisor().WSAgentState(ctx) + if err != nil { + log.Printf("Err: %s", err) + + continue } + + log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+") + log.Printf("Found %d total agents", len(agents)) + log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+") } }() - // Run a function every 5 seconds to obtain some information from the - // supervisor websocket connection. - ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() - - for range ticker.C { - agents, err := c.Supervisor().WSAgentState(ctx) - if err != nil { - log.Printf("Err: %s", err) - - continue + count := runtime.NumGoroutine() + defer func() { + time.Sleep(time.Second) + diff := runtime.NumGoroutine() - count + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + if diff > 0 { + log.Printf("Too many goruoutes: %d extra (%d now and started with %d)", diff, runtime.NumGoroutine(), count) + } else { + log.Printf("Looking good") } + }() - myUserState, ok := agents[apiUserInfo.UserName] - if !ok { - log.Print("could not find API User ID in agent state map") + errCount := 0 + for { + if err := startConnection(ctx, c); err != nil { + errCount++ + log.Print(err) + time.Sleep(time.Second * 1) + } + if errCount > 3 { return } - - log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+") - log.Printf("Found %d total agents", len(agents)) - log.Printf("Found API User ID and UserName: %s -> %s", apiUserInfo.ID, apiUserInfo.UserName) - log.Printf("Found the API Users Current State: %+v", myUserState.Presence.CurrentState) - log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+") } } From a162ab5ba975fc93143d7870164b20287301fed6 Mon Sep 17 00:00:00 2001 From: Chris Gibson Date: Mon, 11 Mar 2024 16:52:02 +0000 Subject: [PATCH 3/3] Fix go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index f961b9e..7175bae 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/equalsgibson/five9-go -go 1.22 +go 1.21.1 require ( github.com/equalsgibson/concur v0.0.1