Skip to content

Commit

Permalink
Fixed session expiry with multiple session in a shard (#454)
Browse files Browse the repository at this point in the history
When multiple session are presents on a shards, depending on the key
ordering, it was possible that during expiration the index cleanup was
also removing index from other session.

That resulted in the ephemeral keys for other session not getting
removed when those session expired too.
  • Loading branch information
merlimat authored Apr 26, 2024
1 parent e665418 commit 281313c
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 3 deletions.
22 changes: 19 additions & 3 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package server

import (
"context"
"fmt"
"log/slog"
"net/url"
"sync"
"time"

"github.com/streamnative/oxia/common"

"github.com/streamnative/oxia/proto"
)

Expand Down Expand Up @@ -58,7 +61,15 @@ func startSession(sessionId SessionId, sessionMetadata *proto.SessionMetadata, s
sm.sessions[sessionId] = s

s.ctx, s.cancel = context.WithCancel(context.Background())
go s.waitForHeartbeats()

go common.DoWithLabels(s.ctx, map[string]string{
"oxia": "session",
"client-identity": sessionMetadata.Identity,
"session-id": fmt.Sprintf("%d", sessionId),
"namespace": sm.namespace,
"shard": fmt.Sprintf("%d", sm.shardId),
}, s.waitForHeartbeats)

s.log.Info("Session started",
slog.Duration("session-timeout", s.timeout))
return s
Expand Down Expand Up @@ -112,14 +123,19 @@ func (s *session) delete() error {
})
}
}

// Delete the base session metadata
deletes = append(deletes, &proto.DeleteRequest{
Key: sessionKey,
})
_, err = s.sm.leaderController.Write(context.Background(), &proto.WriteRequest{
ShardId: &s.shardId,
Puts: nil,
Deletes: deletes,
// Delete the index and the session keys
// Delete the whole index of ephemeral keys for the session
DeleteRanges: []*proto.DeleteRangeRequest{
{
StartInclusive: sessionKey,
StartInclusive: sessionKey + "/",
EndExclusive: sessionKey + "//",
},
},
Expand Down
103 changes: 103 additions & 0 deletions server/session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,109 @@ func TestSessionManager(t *testing.T) {
assert.NoError(t, walf.Close())
}

func TestMultipleSessionsExpiry(t *testing.T) {
shardId := int64(1)
// Invalid session timeout
kvf, walf, sManager, lc := createSessionManager(t)

// Create 2 sessions
createResp1, err := sManager.createSession(&proto.CreateSessionRequest{
ShardId: shardId,
SessionTimeoutMs: uint32(3000),
ClientIdentity: "session-1",
}, 0)
assert.NoError(t, err)
sessionId1 := createResp1.SessionId

createResp2, err := sManager.createSession(&proto.CreateSessionRequest{
ShardId: shardId,
SessionTimeoutMs: uint32(50),
ClientIdentity: "session-2",
}, 0)
assert.NoError(t, err)
sessionId2 := createResp2.SessionId

_, err = lc.Write(context.Background(), &proto.WriteRequest{
ShardId: &shardId,
Puts: []*proto.PutRequest{{
Key: "/ephemeral-1",
Value: []byte("hello"),
SessionId: &sessionId1,
}},
})
assert.NoError(t, err)

_, err = lc.Write(context.Background(), &proto.WriteRequest{
ShardId: &shardId,
Puts: []*proto.PutRequest{{
Key: "/ephemeral-2",
Value: []byte("hello"),
SessionId: &sessionId2,
}},
})
assert.NoError(t, err)

// Let session-2 expire and verify its key was deleted
assert.Eventually(t, func() bool {
return getSessionMetadata(t, lc, sessionId2) == nil
}, 10*time.Second, 30*time.Millisecond)

readCh := lc.Read(context.Background(), &proto.ReadRequest{
ShardId: &shardId,
Gets: []*proto.GetRequest{{
Key: "/ephemeral-1",
IncludeValue: true,
}, {
Key: "/ephemeral-2",
IncludeValue: true,
}},
})

// ephemeral-1
rr, ok := <-readCh
assert.True(t, ok)
assert.NoError(t, rr.Err)
assert.Equal(t, proto.Status_OK, rr.Response.Status)

// ephemeral-2
rr, ok = <-readCh
assert.True(t, ok)
assert.NoError(t, rr.Err)
assert.Equal(t, proto.Status_KEY_NOT_FOUND, rr.Response.Status)

// Now Let session-1 expire and verify its key was deleted
assert.Eventually(t, func() bool {
return getSessionMetadata(t, lc, sessionId1) == nil
}, 10*time.Second, 30*time.Millisecond)

readCh = lc.Read(context.Background(), &proto.ReadRequest{
ShardId: &shardId,
Gets: []*proto.GetRequest{{
Key: "/ephemeral-1",
IncludeValue: true,
}, {
Key: "/ephemeral-2",
IncludeValue: true,
}},
})

// ephemeral-1
rr, ok = <-readCh
assert.True(t, ok)
assert.NoError(t, rr.Err)
assert.Equal(t, proto.Status_KEY_NOT_FOUND, rr.Response.Status)

// ephemeral-2
rr, ok = <-readCh
assert.True(t, ok)
assert.NoError(t, rr.Err)
assert.Equal(t, proto.Status_KEY_NOT_FOUND, rr.Response.Status)

assert.NoError(t, lc.Close())
assert.NoError(t, kvf.Close())
assert.NoError(t, walf.Close())
}

func TestSessionManagerReopening(t *testing.T) {
shardId := int64(1)
// Invalid session timeout
Expand Down

0 comments on commit 281313c

Please sign in to comment.