From 281313c5be96734e50b3e17e89ed4e5179b0d9bf Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 25 Apr 2024 18:35:29 -0700 Subject: [PATCH] Fixed session expiry with multiple session in a shard (#454) When multiple session are presents on a shards, depending on the key ordering, it was possible that during expiration the index cleanup was also removing index from other session. That resulted in the ephemeral keys for other session not getting removed when those session expired too. --- server/session.go | 22 ++++++- server/session_manager_test.go | 103 +++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 3 deletions(-) diff --git a/server/session.go b/server/session.go index a2573d41..d4f43426 100644 --- a/server/session.go +++ b/server/session.go @@ -16,11 +16,14 @@ package server import ( "context" + "fmt" "log/slog" "net/url" "sync" "time" + "github.com/streamnative/oxia/common" + "github.com/streamnative/oxia/proto" ) @@ -58,7 +61,15 @@ func startSession(sessionId SessionId, sessionMetadata *proto.SessionMetadata, s sm.sessions[sessionId] = s s.ctx, s.cancel = context.WithCancel(context.Background()) - go s.waitForHeartbeats() + + go common.DoWithLabels(s.ctx, map[string]string{ + "oxia": "session", + "client-identity": sessionMetadata.Identity, + "session-id": fmt.Sprintf("%d", sessionId), + "namespace": sm.namespace, + "shard": fmt.Sprintf("%d", sm.shardId), + }, s.waitForHeartbeats) + s.log.Info("Session started", slog.Duration("session-timeout", s.timeout)) return s @@ -112,14 +123,19 @@ func (s *session) delete() error { }) } } + + // Delete the base session metadata + deletes = append(deletes, &proto.DeleteRequest{ + Key: sessionKey, + }) _, err = s.sm.leaderController.Write(context.Background(), &proto.WriteRequest{ ShardId: &s.shardId, Puts: nil, Deletes: deletes, - // Delete the index and the session keys + // Delete the whole index of ephemeral keys for the session DeleteRanges: []*proto.DeleteRangeRequest{ { - StartInclusive: sessionKey, + StartInclusive: sessionKey + "/", EndExclusive: sessionKey + "//", }, }, diff --git a/server/session_manager_test.go b/server/session_manager_test.go index 3da30b1d..ec050868 100644 --- a/server/session_manager_test.go +++ b/server/session_manager_test.go @@ -322,6 +322,109 @@ func TestSessionManager(t *testing.T) { assert.NoError(t, walf.Close()) } +func TestMultipleSessionsExpiry(t *testing.T) { + shardId := int64(1) + // Invalid session timeout + kvf, walf, sManager, lc := createSessionManager(t) + + // Create 2 sessions + createResp1, err := sManager.createSession(&proto.CreateSessionRequest{ + ShardId: shardId, + SessionTimeoutMs: uint32(3000), + ClientIdentity: "session-1", + }, 0) + assert.NoError(t, err) + sessionId1 := createResp1.SessionId + + createResp2, err := sManager.createSession(&proto.CreateSessionRequest{ + ShardId: shardId, + SessionTimeoutMs: uint32(50), + ClientIdentity: "session-2", + }, 0) + assert.NoError(t, err) + sessionId2 := createResp2.SessionId + + _, err = lc.Write(context.Background(), &proto.WriteRequest{ + ShardId: &shardId, + Puts: []*proto.PutRequest{{ + Key: "/ephemeral-1", + Value: []byte("hello"), + SessionId: &sessionId1, + }}, + }) + assert.NoError(t, err) + + _, err = lc.Write(context.Background(), &proto.WriteRequest{ + ShardId: &shardId, + Puts: []*proto.PutRequest{{ + Key: "/ephemeral-2", + Value: []byte("hello"), + SessionId: &sessionId2, + }}, + }) + assert.NoError(t, err) + + // Let session-2 expire and verify its key was deleted + assert.Eventually(t, func() bool { + return getSessionMetadata(t, lc, sessionId2) == nil + }, 10*time.Second, 30*time.Millisecond) + + readCh := lc.Read(context.Background(), &proto.ReadRequest{ + ShardId: &shardId, + Gets: []*proto.GetRequest{{ + Key: "/ephemeral-1", + IncludeValue: true, + }, { + Key: "/ephemeral-2", + IncludeValue: true, + }}, + }) + + // ephemeral-1 + rr, ok := <-readCh + assert.True(t, ok) + assert.NoError(t, rr.Err) + assert.Equal(t, proto.Status_OK, rr.Response.Status) + + // ephemeral-2 + rr, ok = <-readCh + assert.True(t, ok) + assert.NoError(t, rr.Err) + assert.Equal(t, proto.Status_KEY_NOT_FOUND, rr.Response.Status) + + // Now Let session-1 expire and verify its key was deleted + assert.Eventually(t, func() bool { + return getSessionMetadata(t, lc, sessionId1) == nil + }, 10*time.Second, 30*time.Millisecond) + + readCh = lc.Read(context.Background(), &proto.ReadRequest{ + ShardId: &shardId, + Gets: []*proto.GetRequest{{ + Key: "/ephemeral-1", + IncludeValue: true, + }, { + Key: "/ephemeral-2", + IncludeValue: true, + }}, + }) + + // ephemeral-1 + rr, ok = <-readCh + assert.True(t, ok) + assert.NoError(t, rr.Err) + assert.Equal(t, proto.Status_KEY_NOT_FOUND, rr.Response.Status) + + // ephemeral-2 + rr, ok = <-readCh + assert.True(t, ok) + assert.NoError(t, rr.Err) + assert.Equal(t, proto.Status_KEY_NOT_FOUND, rr.Response.Status) + + assert.NoError(t, lc.Close()) + assert.NoError(t, kvf.Close()) + assert.NoError(t, walf.Close()) +} + func TestSessionManagerReopening(t *testing.T) { shardId := int64(1) // Invalid session timeout