Skip to content

Commit

Permalink
Send create connection after disconnect from management plane (#967)
Browse files Browse the repository at this point in the history
  • Loading branch information
aphralG authored and sean-breen committed Jan 28, 2025
1 parent e909d03 commit 0100645
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 23 deletions.
55 changes: 39 additions & 16 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ package command
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"sync/atomic"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cenkalti/backoff/v4"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
Expand Down Expand Up @@ -40,11 +44,11 @@ type (
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
instances []*mpi.Instance
resource *mpi.Resource
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
configApplyRequestQueueMutex sync.Mutex
instancesMutex sync.Mutex
resourceMutex sync.Mutex
}
)

Expand All @@ -63,7 +67,7 @@ func NewCommandService(
isConnected: isConnected,
subscribeChannel: subscribeChannel,
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
instances: []*mpi.Instance{},
resource: &mpi.Resource{},
}

var subscribeCtx context.Context
Expand Down Expand Up @@ -130,9 +134,9 @@ func (cs *CommandService) UpdateDataPlaneStatus(
}
slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()
cs.resourceMutex.Lock()
defer cs.resourceMutex.Unlock()
cs.resource = resource

return err
}
Expand Down Expand Up @@ -259,9 +263,9 @@ func (cs *CommandService) CreateConnection(

cs.isConnected.Store(true)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()
cs.resourceMutex.Lock()
defer cs.resourceMutex.Unlock()
cs.resource = resource

return response, nil
}
Expand Down Expand Up @@ -405,14 +409,15 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
var err error
cs.subscribeClient, err = cs.commandServiceClient.Subscribe(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to create subscribe client", "error", err)
subscribeErr := cs.handleSubscribeError(ctx, err, "create subscribe client")
cs.subscribeClientMutex.Unlock()

return err
return subscribeErr
}

if cs.subscribeClient == nil {
cs.subscribeClientMutex.Unlock()

return errors.New("subscribe service client not initialized yet")
}
}
Expand All @@ -421,10 +426,9 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {

request, recvError := cs.subscribeClient.Recv()
if recvError != nil {
slog.ErrorContext(ctx, "Failed to receive message from subscribe stream", "error", recvError)
cs.subscribeClient = nil

return recvError
return cs.handleSubscribeError(ctx, recvError, "receive message from subscribe stream")
}

if cs.isValidRequest(ctx, request) {
Expand All @@ -440,6 +444,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
}
}

// handleSubscribeError logs a failure that occurred while working with the subscribe
// stream and decides what the caller should propagate.
//
// errorMsg is the action that failed, phrased so that it reads correctly after
// "Failed to " (for example "create subscribe client").
//
// If err carries the gRPC status code Unavailable, a CreateConnection call is made with
// the most recently stored resource and nil is returned, regardless of whether that call
// succeeds; a CreateConnection failure is only logged. For any other error, err is logged
// and returned unchanged.
func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, errorMsg string) error {
	codeError, ok := status.FromError(err)
	if !ok || codeError.Code() != codes.Unavailable {
		slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s", errorMsg), "error", err)

		return err
	}

	slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s, rpc unavailable. "+
		"Trying create connection rpc", errorMsg), "error", err)

	// Snapshot the resource under its mutex, and release the lock before calling
	// CreateConnection, which acquires resourceMutex itself when it stores the resource.
	cs.resourceMutex.Lock()
	resource := cs.resource
	cs.resourceMutex.Unlock()

	if _, connectionErr := cs.CreateConnection(ctx, resource); connectionErr != nil {
		// Report the CreateConnection failure itself, not the original stream error.
		slog.ErrorContext(ctx, "Unable to create connection", "error", connectionErr)
	}

	return nil
}

func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request *mpi.ManagementPlaneRequest) {
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()
Expand Down Expand Up @@ -484,13 +507,13 @@ func (cs *CommandService) checkIfInstanceExists(
) bool {
instanceFound := false

cs.instancesMutex.Lock()
for _, instance := range cs.instances {
cs.resourceMutex.Lock()
for _, instance := range cs.resource.GetInstances() {
if instance.GetInstanceMeta().GetInstanceId() == requestInstanceID {
instanceFound = true
}
}
cs.instancesMutex.Unlock()
cs.resourceMutex.Unlock()

if !instanceFound {
slog.WarnContext(
Expand Down
12 changes: 6 additions & 6 deletions internal/command/command_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
)

nginxInstance := protos.GetNginxOssInstance([]string{})
commandService.instancesMutex.Lock()
commandService.instances = append(commandService.instances, nginxInstance)
commandService.instancesMutex.Unlock()
commandService.resourceMutex.Lock()
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
commandService.resourceMutex.Unlock()

defer commandService.CancelSubscription(ctx)

Expand Down Expand Up @@ -414,9 +414,9 @@ func TestCommandService_isValidRequest(t *testing.T) {

nginxInstance := protos.GetNginxOssInstance([]string{})

commandService.instancesMutex.Lock()
commandService.instances = append(commandService.instances, nginxInstance)
commandService.instancesMutex.Unlock()
commandService.resourceMutex.Lock()
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
commandService.resourceMutex.Unlock()

testCases := []struct {
req *mpi.ManagementPlaneRequest
Expand Down
4 changes: 3 additions & 1 deletion internal/resource/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc

added, updated, deleted, updateError := plusClient.UpdateHTTPServers(ctx, upstream, servers)

slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
if updateError != nil {
slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
}

return added, updated, deleted, createPlusAPIError(updateError)
}
Expand Down
28 changes: 28 additions & 0 deletions test/integration/grpc_management_plane_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,34 @@ func setupLocalEnvironment(tb testing.TB) {
}(tb)
}

// TestGrpc_Reconnection stops and restarts the mock management plane container and
// checks that the value returned by verifyConnection is the same before and after the
// restart.
func TestGrpc_Reconnection(t *testing.T) {
	ctx := context.Background()
	teardownTest := setupConnectionTest(t, false, false)
	defer teardownTest(t)

	timeout := 15 * time.Second

	originalID := verifyConnection(t, 2)

	stopErr := mockManagementPlaneGrpcContainer.Stop(ctx, &timeout)
	require.NoError(t, stopErr)

	startErr := mockManagementPlaneGrpcContainer.Start(ctx)
	require.NoError(t, startErr)

	// The API address is recomputed from the restarted container's host and ports.
	ipAddress, err := mockManagementPlaneGrpcContainer.Host(ctx)
	require.NoError(t, err)
	ports, err := mockManagementPlaneGrpcContainer.Ports(ctx)
	require.NoError(t, err)

	// Fail the test with a clear message instead of panicking on an out-of-range index
	// when the API port has no host binding.
	apiPortBindings := ports["9093/tcp"]
	require.NotEmpty(t, apiPortBindings, "no host port bound for 9093/tcp")
	mockManagementPlaneAPIAddress = net.JoinHostPort(ipAddress, apiPortBindings[0].HostPort)

	// NOTE(review): fixed wait, presumably to give the agent time to reconnect — confirm
	// whether polling would be more reliable.
	time.Sleep(5 * time.Second)

	currentID := verifyConnection(t, 2)
	assert.Equal(t, originalID, currentID)
}

// Verify that the agent sends a connection request and an update data plane status request
func TestGrpc_StartUp(t *testing.T) {
teardownTest := setupConnectionTest(t, true, false)
Expand Down

0 comments on commit 0100645

Please sign in to comment.