Skip to content

Commit

Permalink
enhance session pool for http2 (#340)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarrisChu authored May 22, 2024
1 parent 5d9b457 commit 3e403b8
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 29 deletions.
119 changes: 90 additions & 29 deletions session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package nebula_go
import (
"container/list"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -53,7 +52,8 @@ type pureSession struct {
sessPool *SessionPool
returnedAt time.Time // the timestamp that the session was created or returned.
timezoneInfo
spaceName string
spaceName string
enableHttp bool
}

// NewSessionPool creates a new session pool with the given configs.
Expand Down Expand Up @@ -143,8 +143,13 @@ func (pool *SessionPool) ExecuteWithParameter(stmt string, params map[string]int

rs, err := pool.executeWithRetry(session, execFunc, pool.conf.retryGetSessionTimes)
if err != nil {
session.close()
pool.removeSessionFromActive(session)
if !pool.enableHttp() {
session.close()
pool.removeSessionFromActive(session)
}
// for http connections like client->LB->Server.
// if LB->Server is broken, suppose client->LB is always valid
// so we do not need to close the session
return nil, err
}

Expand Down Expand Up @@ -367,44 +372,93 @@ func (pool *SessionPool) getSessionFromIdle() (*pureSession, error) {
}

// retryGetSession tries to create a new session when:
// 1. the current session is invalid.
// 2. connection is invalid.
// and then change the original session to the new one.
func (pool *SessionPool) executeWithRetry(
session *pureSession,
f func(*pureSession) (*ResultSet, error),
retry int) (*ResultSet, error) {
// if do not enable http:
// 1. the current session is invalid.
// 2. connection is invalid.
// and then change the original session to the new one.
//
// else:
// 1. the current session is invalid, and then get idle session from pool
// 2. the connection is invalid, and do not logout, just retry execution
func (pool *SessionPool) executeWithRetry(session *pureSession,
f func(*pureSession) (*ResultSet, error), retry int) (*ResultSet, error) {
return pool.executeWithRetryLimit(session, f, 0, retry)
}

func (pool *SessionPool) executeWithRetryLimit(session *pureSession,
f func(*pureSession) (*ResultSet, error), retryTimes, retryLimit int) (*ResultSet, error) {
rs, err := f(session)
if retryTimes >= retryLimit {
return rs, err
}
if err == nil {
if rs.GetErrorCode() == ErrorCode_SUCCEEDED {
if rs.GetErrorCode() != ErrorCode_E_SESSION_INVALID {
return rs, nil
} else if rs.GetErrorCode() != ErrorCode_E_SESSION_INVALID { // only retry when the session is invalid
return rs, err
} else {
if err := pool.retryStrategySessionInvalid(session); err != nil {
pool.log.Error(fmt.Sprintf("cannot retry when session is invalid, error: %s", err.Error()))
return nil, err
}
}
} else {
if err := pool.retryStrategyErr(session); err != nil {
pool.log.Error(fmt.Sprintf("cannot retry when error, error: %s", err.Error()))
return nil, err
}
}
pool.log.Info(fmt.Sprintf("retry to execute the query %d times", retryTimes+1))
return pool.executeWithRetryLimit(session, f, retryTimes+1, retryLimit)
}

// If the session is invalid, close it first
session.close()
// get a new session
for i := 0; i < retry; i++ {
pool.log.Info("retry to get sessions")
func (pool *SessionPool) retryStrategySessionInvalid(session *pureSession) error {
if !pool.enableHttp() {
session.close()
pool.log.Info("retry to get new session")
newSession, err := pool.newSession()
if err != nil {
return nil, err
return err
}

pingErr := newSession.ping()
if pingErr != nil {
pool.log.Error("failed to ping the session, error: " + pingErr.Error())
continue
if err := newSession.ping(); err != nil {
return err
}
pool.log.Info("retry to get session successfully")
*session = *newSession
} else {
pool.log.Info("retry to get session from idle list")
newSession, err := pool.getSessionFromIdle()
if err != nil {
return err
}
if newSession == nil {
pool.log.Info("retry to get new session")
newSession, err = pool.newSession()
if err != nil {
return err
}
pool.log.Info("retry to get session successfully")
}
pool.log.Info("retry to get sessions successfully")
*session = *newSession
}
return nil

}

return f(session)
func (pool *SessionPool) retryStrategyErr(session *pureSession) error {
if !pool.enableHttp() {
session.close()
pool.log.Info("retry to get new session")
newSession, err := pool.newSession()
if err != nil {
return err
}
if err := newSession.ping(); err != nil {
return err
}
pool.log.Info("retry to get session successfully")
*session = *newSession
}
pool.log.Error(fmt.Sprintf("failed to get session after " + strconv.Itoa(retry) + " retries"))
return nil, fmt.Errorf("failed to get session after %d retries", retry)
// http do nothing for connection error
return nil
}

// startCleaner starts sessionCleaner if idleTime > 0.
Expand Down Expand Up @@ -560,6 +614,13 @@ func (pool *SessionPool) setSessionSpaceToDefault(session *pureSession) error {
rs.GetErrorCode(), rs.GetErrorMsg())
}

// enableHttp returns whether the pool is using HTTP2
// If the pool is using HTTP2, the session will be created with
// an HTTP2 connection in L7 load balancing.
func (pool *SessionPool) enableHttp() bool {
return pool.conf.useHTTP2
}

func (session *pureSession) execute(stmt string) (*ResultSet, error) {
return session.executeWithParameter(stmt, nil)
}
Expand Down
111 changes: 111 additions & 0 deletions session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,117 @@ func TestSessionPoolRetry(t *testing.T) {
}
}

type retryFn struct {
fn func(*pureSession) (*ResultSet, error)
retryTimes int
}

func (r *retryFn) retry(s *pureSession) (*ResultSet, error) {
r.retryTimes++
return r.fn(s)
}

func newRetryFn(rs *ResultSet, err error) *retryFn {
return &retryFn{
fn: func(*pureSession) (*ResultSet, error) {
return rs, err
},
}
}

func TestSessionPoolRetryHttp(t *testing.T) {
t.Skip("Skipping test because it is not supported in CI environment")
err := prepareSpace("client_test")
if err != nil {
t.Fatal(err)
}
defer dropSpace("client_test")

hostAddress := HostAddress{Host: address, Port: port}
config, err := NewSessionPoolConf(
"root",
"nebula",
[]HostAddress{hostAddress},
"client_test",
WithHTTP2(true),
)
if err != nil {
t.Errorf("failed to create session pool config, %s", err.Error())
}
config.minSize = 2
config.maxSize = 2
config.retryGetSessionTimes = 1

// create session pool
sessionPool, err := NewSessionPool(*config, DefaultLogger{})
if err != nil {
t.Fatal(err)
}
defer sessionPool.Close()

testcaes := []struct {
name string
retryFn *retryFn
retry bool
newSession bool
}{
{
name: "success",
retryFn: newRetryFn(&ResultSet{
resp: &graph.ExecutionResponse{
ErrorCode: nebula.ErrorCode_SUCCEEDED,
}}, nil),
retry: false,
},
{
name: "error",
retryFn: newRetryFn(nil, fmt.Errorf("error")),
retry: true,
newSession: false,
},
{
name: "invalid session error code",
retryFn: newRetryFn(&ResultSet{
resp: &graph.ExecutionResponse{
ErrorCode: nebula.ErrorCode_E_SESSION_INVALID,
}}, nil),
retry: true,
newSession: true,
},
{
name: "execution error code",
retryFn: newRetryFn(&ResultSet{
resp: &graph.ExecutionResponse{
ErrorCode: nebula.ErrorCode_E_EXECUTION_ERROR,
}}, nil),
retry: false,
},
}
for _, tc := range testcaes {
session, err := sessionPool.newSession()
if err != nil {
t.Fatal(err)
}
original := session.sessionID
conn := session.connection
_, _ = sessionPool.executeWithRetry(session, tc.retryFn.retry, 1)
if tc.retry {
if tc.newSession {
assert.NotEqual(t, original, session.sessionID, fmt.Sprintf("test case: %s", tc.name))
assert.NotEqual(t, conn, session.connection, fmt.Sprintf("test case: %s", tc.name))
} else {
assert.Equal(t, original, session.sessionID, fmt.Sprintf("test case: %s", tc.name))
assert.Equal(t, conn, session.connection, fmt.Sprintf("test case: %s", tc.name))
}
assert.Equal(t, 2, tc.retryFn.retryTimes, fmt.Sprintf("test case: %s", tc.name))
} else {
assert.Equal(t, original, session.sessionID, fmt.Sprintf("test case: %s", tc.name))
assert.Equal(t, conn, session.connection, fmt.Sprintf("test case: %s", tc.name))
assert.Equal(t, 1, tc.retryFn.retryTimes, fmt.Sprintf("test case: %s", tc.name))
}
}
}

func TestSessionPoolClose(t *testing.T) {
err := prepareSpace("client_test")
if err != nil {
Expand Down

0 comments on commit 3e403b8

Please sign in to comment.