Skip to content

Commit

Permalink
Merge branch 'main' into websocket-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
equalsgibson authored Mar 12, 2024
2 parents a162ab5 + 1138759 commit 93bcbe4
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 37 deletions.
128 changes: 93 additions & 35 deletions five9/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package five9

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -44,7 +43,39 @@ func (a *authenticationState) requestWithAuthentication(request *http.Request, t
request.URL.Path = strings.ReplaceAll(request.URL.Path, ":userID", string(login.UserID))
request.URL.Path = strings.ReplaceAll(request.URL.Path, ":organizationID", string(login.OrgID))

return a.client.request(request, target)
var latestAttemptErr error
tries := 0
for tries < 3 {
tries++
latestAttemptErr = a.client.request(request, target)
if latestAttemptErr != nil {
if five9Error, ok := latestAttemptErr.(*Error); ok {
if five9Error.StatusCode == http.StatusUnauthorized {
// The login is not registered by other endpoints for a short time.
// I think this has to do with Five9 propagating the session across their data centers.
// We login using the app.five9.com domain but then make subsequent calls to the data center specific domain
time.Sleep(time.Second * 2)

continue
}

// Five9 reply with Status 435 if a service has been migrated. This is not an official status code, so check directly.
if five9Error.StatusCode == int(435) {
// Clear out the login state
a.loginMutex.Lock()
defer a.loginMutex.Unlock()

a.loginResponse = nil

return latestAttemptErr
}
}
}

return nil
}

return latestAttemptErr
}

func (a *authenticationState) getLogin(
Expand Down Expand Up @@ -101,6 +132,23 @@ func (a *authenticationState) getLogin(
if err := a.handleMaintenanceNotices(ctx); err != nil {
return nil, err
}

case five9types.UserLoginStateRelogin: // Can occur if the service has been migrated
if err := a.endpointRestartSession(ctx); err != nil {
return nil, err
}

// Check the login state after restarting the session
newLoginState, err := a.endpointGetLoginState(ctx)
if err != nil {
return nil, err
}

if newLoginState == five9types.UserLoginStateAcceptNotice {
if err := a.handleMaintenanceNotices(ctx); err != nil {
return nil, err
}
}
}

return a.loginResponse, nil
Expand Down Expand Up @@ -140,42 +188,25 @@ func (a *authenticationState) endpointGetLoginState(ctx context.Context) (five9t

var target five9types.UserLoginState

tries := 0
for tries < 3 {
tries++

request, err := http.NewRequestWithContext(
ctx,
http.MethodGet,
fmt.Sprintf(
"/%s/%s/:userID/login_state",
a.apiContextPath,
path,
),
http.NoBody,
)
if err != nil {
return "", err
}

if err := a.requestWithAuthentication(request, &target); err != nil {
five9Error, ok := err.(*Error)
if ok && five9Error.StatusCode == http.StatusUnauthorized {
// The login is not registered by other endpoints for a short time.
// I think this has to do with Five9 propagating the session across their data centers.
// We login using the app.five9.com domain but then make subsequent calls to the data center specific domain
time.Sleep(time.Second * 2)

continue
}

return "", err
}
request, err := http.NewRequestWithContext(
ctx,
http.MethodGet,
fmt.Sprintf(
"/%s/%s/:userID/login_state",
a.apiContextPath,
path,
),
http.NoBody,
)
if err != nil {
return "", err
}

return target, nil
if err := a.requestWithAuthentication(request, &target); err != nil {
return "", err
}

return "", errors.New("Five9 login timeout")
return target, nil
}

func (a *authenticationState) endpointStartSession(ctx context.Context) error {
Expand Down Expand Up @@ -208,6 +239,33 @@ func (a *authenticationState) endpointStartSession(ctx context.Context) error {
return nil
}

func (a *authenticationState) endpointRestartSession(ctx context.Context) error {
path := agentAPIPath
if a.apiContextPath == supervisorAPIContextPath {
path = supervisorAPIPath
}

request, err := http.NewRequestWithContext(
ctx,
http.MethodPut,
fmt.Sprintf(
"/%s/%s/:userID/session_restart",
a.apiContextPath,
path,
),
http.NoBody,
)
if err != nil {
return err
}

if err := a.requestWithAuthentication(request, nil); err != nil {
return err
}

return nil
}

func (a *authenticationState) handleMaintenanceNotices(ctx context.Context) error {
notices, err := a.endpointGetMaintenanceNotices(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion five9/five9types/UserLoginState.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type WebSocketIncrementalACDStateData struct {
DataSource DataSource `json:"dataSource"`
Added []ACDState `json:"added"`
Updated []ACDState `json:"updated"`
Removed []UserID `json:"removed"`
Removed []QueueID `json:"removed"`
}

type WebSocketStatisticsAgentStateData struct {
Expand Down
1 change: 1 addition & 0 deletions five9/five9types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
UserLoginStateWorking UserLoginState = "WORKING"
UserLoginStateSelectStation UserLoginState = "SELECT_STATION"
UserLoginStateAcceptNotice UserLoginState = "ACCEPT_NOTICE"
UserLoginStateRelogin UserLoginState = "RELOGIN"
)

const (
Expand Down
33 changes: 32 additions & 1 deletion five9/supervisor_websocket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func (s *SupervisorService) handlerIncrementalStatsUpdate(payload any) error {
return err
}

if dataSource == five9types.DataSourceAgentState {
switch dataSource {
// ** //
case five9types.DataSourceAgentState:
eventTarget := five9types.WebSocketIncrementalAgentStateData{}
if err := json.Unmarshal(payloadItemBytes, &eventTarget); err != nil {
return websocketFrameProcessingError{
Expand All @@ -105,6 +107,19 @@ func (s *SupervisorService) handlerIncrementalStatsUpdate(payload any) error {
if err := s.handleAgentStateUpdate(eventTarget); err != nil {
return err
}
// ** //
case five9types.DataSourceACDStatus:
eventTarget := five9types.WebSocketIncrementalACDStateData{}
if err := json.Unmarshal(payloadItemBytes, &eventTarget); err != nil {
return websocketFrameProcessingError{
OriginalError: err,
MessageBytes: payloadItemBytes,
}
}

if err := s.handleACDStateUpdate(eventTarget); err != nil {
return err
}
}
}

Expand Down Expand Up @@ -212,3 +227,19 @@ func (s *SupervisorService) handleAgentStateUpdate(eventData five9types.WebSocke

return nil
}

func (s *SupervisorService) handleACDStateUpdate(eventData five9types.WebSocketIncrementalACDStateData) error {
for _, addedData := range eventData.Added {
s.webSocketCache.acdState.Update(addedData.ID, addedData)
}

for _, updatedData := range eventData.Updated {
s.webSocketCache.acdState.Update(updatedData.ID, updatedData)
}

for _, removedID := range eventData.Removed {
s.webSocketCache.acdState.Delete(removedID)
}

return nil
}

0 comments on commit 93bcbe4

Please sign in to comment.