Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket changes (concur package) #35

Merged
merged 4 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 44 additions & 29 deletions example/webSocket/getAgentState.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,31 @@ package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"os"
"runtime"
"runtime/pprof"
"time"

"github.com/equalsgibson/five9-go/five9"
"github.com/equalsgibson/five9-go/five9/five9types"
)

func startConnection(ctx context.Context, c *five9.Service) error {
if err := c.Supervisor().StartWebsocket(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
return fmt.Errorf("websocket exiting, restarting. Here is the error message: %s", err.Error())
}
}

return nil
}

func main() {
// Set up a new Five9 service
ctx := context.Background()

c := five9.NewService(
five9types.PasswordCredentials{
Username: os.Getenv("FIVE9USERNAME"),
Expand All @@ -27,44 +40,46 @@ func main() {
}),
)

apiUserInfo, err := c.Supervisor().GetOwnUserInfo(ctx)
if err != nil {
log.Fatal(err)
}
// Run a function every 5 seconds to obtain some information from the
// supervisor websocket connection.

// Start a websocket connection
go func() {
if err := c.Supervisor().StartWebsocket(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
log.Fatalf("Websocket exiting, restarting. Here is the error message: %s", err.Error())
for range time.NewTicker(time.Second * 5).C {
agents, err := c.Supervisor().WSAgentState(ctx)
if err != nil {
log.Printf("Err: %s", err)

continue
}

log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+")
log.Printf("Found %d total agents", len(agents))
log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+")
}
}()

// Run a function every 5 seconds to obtain some information from the
// supervisor websocket connection.
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for range ticker.C {
agents, err := c.Supervisor().WSAgentState(ctx)
if err != nil {
log.Printf("Err: %s", err)

continue
count := runtime.NumGoroutine()
defer func() {
time.Sleep(time.Second)
diff := runtime.NumGoroutine() - count
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
if diff > 0 {
log.Printf("Too many goruoutes: %d extra (%d now and started with %d)", diff, runtime.NumGoroutine(), count)
} else {
log.Printf("Looking good")
}
}()

myUserState, ok := agents[apiUserInfo.UserName]
if !ok {
log.Print("could not find API User ID in agent state map")
errCount := 0
for {
if err := startConnection(ctx, c); err != nil {
errCount++
log.Print(err)
time.Sleep(time.Second * 1)
}

if errCount > 3 {
return
}

log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+")
log.Printf("Found %d total agents", len(agents))
log.Printf("Found API User ID and UserName: %s -> %s", apiUserInfo.ID, apiUserInfo.UserName)
log.Printf("Found the API Users Current State: %+v", myUserState.Presence.CurrentState)
log.Println("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+")
}
}
2 changes: 1 addition & 1 deletion five9/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (a *authenticationState) endpointLogin(ctx context.Context) (five9types.Log
payload := five9types.LoginPayload{
PasswordCredentials: a.client.credentials,
AppKey: "web-ui",
Policy: five9types.PolicyAttachExisting,
Policy: five9types.PolicyForceIn,
}

request, err := http.NewRequestWithContext(
Expand Down
119 changes: 59 additions & 60 deletions five9/supervisor_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/equalsgibson/concur/concur"
"github.com/equalsgibson/five9-go/five9/five9types"
"github.com/equalsgibson/five9-go/five9/internal/utils"
"github.com/google/uuid"
Expand Down Expand Up @@ -35,13 +36,11 @@ func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error {
s.resetCache()

// If we encounter an error on the WebsocketErr channel, cancel the context, thus cancelling all other goroutines.
ctx, cancel := context.WithCancel(parentCtx)
ctx, cancel := context.WithCancelCause(parentCtx)

defer func() {
// Clear the cache when closing the connection
s.resetCache()
s.webSocketHandler.Close()
cancel()
}()

login, err := s.authState.getLogin(ctx)
Expand All @@ -54,20 +53,42 @@ func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error {
if err := s.webSocketHandler.Connect(ctx, connectionURL, s.authState.client.httpClient); err != nil {
return err
}
defer s.webSocketHandler.Close()

websocketError := make(chan error)
asyncReader := concur.NewAsyncReader(s.webSocketHandler.Read)
go asyncReader.Loop(ctx)
defer asyncReader.Close()

// Ping intervals
pingTicker := time.NewTicker(time.Second * 5)
defer pingTicker.Stop()
go func() {
if err := s.ping(ctx); err != nil {
websocketError <- err
for {
select {
case <-pingTicker.C:
if err := s.ping(ctx); err != nil {
cancel(err)
return
}
case <-ctx.Done():
return
}
}
}()

// Pong monitoring
pongMonitorTicker := time.NewTicker(time.Second * 5)
defer pongMonitorTicker.Stop()
go func() {
if err := s.pong(ctx); err != nil {
websocketError <- err
for {
select {
case <-pingTicker.C:
if err := s.pong(ctx); err != nil {
cancel(err)
return
// asyncReader.Close()
}
case <-ctx.Done():
return
}
}
}()

Expand All @@ -76,18 +97,25 @@ func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error {
// When starting a new session, this is called by Five9. Account for rejoining an existing session by also
// calling this.
if err := s.requestWebSocketFullStatistics(ctx); err != nil {
websocketError <- err
cancel(err)
// asyncReader.Close()
}
}()

// Forever read the bytes
go func() {
if err := s.read(ctx); err != nil {
websocketError <- err
}
}()
for {
select {
case update := <-asyncReader.Updates():
if update.Err != nil {
return update.Err
}

return <-websocketError
if err := s.handleWebsocketMessage(update.Item); err != nil {
return err
}
case <-ctx.Done():
return context.Cause(ctx)
}
}
}

func (s *SupervisorService) WSAgentState(ctx context.Context) (map[five9types.UserName]five9types.AgentState, error) {
Expand Down Expand Up @@ -190,60 +218,31 @@ func (s *SupervisorService) WSACDState(ctx context.Context) (map[string]five9typ
}

func (s *SupervisorService) ping(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

if err := s.webSocketHandler.Write(ctx, []byte("ping")); err != nil {
return err
}

for {
select {
case <-ticker.C:
if err := s.webSocketHandler.Write(ctx, []byte("ping")); err != nil {
return err
}
case <-ctx.Done():
return nil
}
}
return nil
}

func (s *SupervisorService) pong(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
lastPongReceived, ok := s.webSocketCache.timers.Get(five9types.EventIDPongReceived)
if !ok {
return errors.New("could not obtain last pong time from cache")
}

if time.Since(*lastPongReceived) > time.Second*45 {
return errors.New("last valid ping response from WS is older than 45 seconds, closing connection")
}
case <-ctx.Done():
return nil
}
func (s *SupervisorService) pong(_ context.Context) error {
lastPongReceived, ok := s.webSocketCache.timers.Get(five9types.EventIDPongReceived)
if !ok {
return errors.New("could not obtain last pong time from cache")
}
}

func (s *SupervisorService) read(ctx context.Context) error {
for {
messageBytes, err := s.webSocketHandler.Read(ctx)
if err != nil {
return err
}

if err := s.handleWebsocketMessage(messageBytes); err != nil {
return err
}
if time.Since(*lastPongReceived) > time.Second*45 {
return errors.New("last valid ping response from WS is older than 45 seconds, closing connection")
}

return nil
}

func (s *SupervisorService) resetCache() {
s.authState.loginMutex.Lock()
s.authState.loginResponse = nil
s.authState.loginMutex.Unlock()

s.webSocketCache.acdState.Reset()
s.webSocketCache.agentState.Reset()
s.webSocketCache.agentStatistics.Reset()
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module github.com/equalsgibson/five9-go

go 1.21
go 1.21.1

require (
github.com/equalsgibson/concur v0.0.1
github.com/google/uuid v1.3.1
nhooyr.io/websocket v1.8.7
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/equalsgibson/concur v0.0.1 h1:9Tw0YTfEtHwdTl85bJFP8QG0yc49XLHmghQnHeIrh9w=
github.com/equalsgibson/concur v0.0.1/go.mod h1:03SVNgaFh3J0VAJs9swtEy9DclBhZPIem/eKB4jymWg=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
Expand Down
Loading