Skip to content

Commit 27123b2

Browse files
committed
perf: check token status
1 parent b53c461 commit 27123b2

File tree

13 files changed

+1799
-445
lines changed

13 files changed

+1799
-445
lines changed

cmd/common/beat_service.go

+76-12
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@ import (
1515

1616
func NewBeatService(apiClient *service.JMService) *BeatService {
1717
return &BeatService{
18-
sessMap: make(map[string]struct{}),
18+
sessMap: make(map[string]*SessionToken),
1919
apiClient: apiClient,
2020
taskChan: make(chan *model.TerminalTask, 5),
2121
}
2222
}
2323

24+
type SessionToken struct {
25+
model.Session
26+
TokenId string
27+
invalid bool
28+
}
29+
2430
type BeatService struct {
25-
sessMap map[string]struct{}
31+
sessMap map[string]*SessionToken
2632

2733
apiClient *service.JMService
2834

@@ -88,18 +94,14 @@ func (b *BeatService) receiveWsTask(ws *websocket.Conn, done chan struct{}) {
8894
}
8995
if len(tasks) != 0 {
9096
for i := range tasks {
91-
select {
92-
case b.taskChan <- &tasks[i]:
93-
default:
94-
logger.Infof("Discard task %v", tasks[i])
95-
}
97+
b.sendTask(&tasks[i])
9698
}
9799
}
98100
}
99101
}
100102

101103
func (b *BeatService) GetStatusData() interface{} {
102-
sessions := b.getSessions()
104+
sessions := b.getSessionIds()
103105
payload := model.HeartbeatData{
104106
SessionOnlineIds: sessions,
105107
CpuUsed: common.CpuLoad1Usage(),
@@ -113,7 +115,7 @@ func (b *BeatService) GetStatusData() interface{} {
113115
}
114116
}
115117

116-
func (b *BeatService) getSessions() []string {
118+
func (b *BeatService) getSessionIds() []string {
117119
b.Lock()
118120
defer b.Unlock()
119121
sids := make([]string, 0, len(b.sessMap))
@@ -123,12 +125,20 @@ func (b *BeatService) getSessions() []string {
123125
return sids
124126
}
125127

126-
var empty = struct{}{}
128+
func (b *BeatService) StoreSessionId(sess *SessionToken) {
129+
b.Lock()
130+
defer b.Unlock()
131+
b.sessMap[sess.ID] = sess
132+
}
127133

128-
func (b *BeatService) StoreSessionId(sid string) {
134+
func (b *BeatService) GetSessions() []*SessionToken {
129135
b.Lock()
130136
defer b.Unlock()
131-
b.sessMap[sid] = empty
137+
sids := make([]*SessionToken, 0, len(b.sessMap))
138+
for sid := range b.sessMap {
139+
sids = append(sids, b.sessMap[sid])
140+
}
141+
return sids
132142
}
133143

134144
func (b *BeatService) RemoveSessionId(sid string) {
@@ -144,3 +154,57 @@ func (b *BeatService) GetTerminalTaskChan() <-chan *model.TerminalTask {
144154
func (b *BeatService) FinishTask(taskId string) error {
145155
return b.apiClient.FinishTask(taskId)
146156
}
157+
158+
func (b *BeatService) KeepCheckTokens() {
159+
for {
160+
time.Sleep(5 * time.Minute)
161+
sessions := b.GetSessions()
162+
tokens := make(map[string]model.TokenCheckStatus, len(sessions))
163+
for _, s := range sessions {
164+
ret, ok := tokens[s.TokenId]
165+
if ok {
166+
b.handleTokenCheck(s, &ret)
167+
continue
168+
}
169+
ret, err := b.apiClient.CheckTokenStatus(s.TokenId)
170+
if err != nil && ret.Code == "" {
171+
logger.Errorf("Check token status failed: %s", err)
172+
continue
173+
}
174+
tokens[s.TokenId] = ret
175+
b.handleTokenCheck(s, &ret)
176+
}
177+
}
178+
}
179+
180+
func (b *BeatService) sendTask(task *model.TerminalTask) {
181+
select {
182+
case b.taskChan <- task:
183+
default:
184+
logger.Errorf("Discard task %v", task)
185+
}
186+
}
187+
188+
func (b *BeatService) handleTokenCheck(session *SessionToken, tokenStatus *model.TokenCheckStatus) {
189+
var action string
190+
switch tokenStatus.Code {
191+
case model.CodePermOk:
192+
action = model.TaskPermValid
193+
if !session.invalid {
194+
return
195+
}
196+
session.invalid = false
197+
default:
198+
if session.invalid {
199+
return
200+
}
201+
session.invalid = true
202+
action = model.TaskPermExpired
203+
}
204+
task := model.TerminalTask{
205+
Name: action,
206+
Args: session.ID,
207+
TokenStatus: *tokenStatus,
208+
}
209+
b.sendTask(&task)
210+
}

cmd/impl/convert_model.go

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func ConvertToSession(sees *pb.Session) model.Session {
3232
AssetID: sees.AssetId,
3333
AccountID: sees.AccountId,
3434
Type: model.NORMALType,
35+
TokenId: sees.TokenId,
3536
}
3637
}
3738

cmd/impl/jms.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,12 @@ func (j *JMServer) CreateSession(ctx context.Context, req *pb.SessionCreateReque
9999
return &pb.SessionCreateResponse{Status: &status}, nil
100100
}
101101
status.Ok = true
102-
j.beat.StoreSessionId(apiResp.ID)
103-
logger.Debugf("Creat session %s", apiResp.ID)
102+
sessionToken := common.SessionToken{
103+
Session: apiResp,
104+
TokenId: req.Data.TokenId,
105+
}
106+
j.beat.StoreSessionId(&sessionToken)
107+
logger.Debugf("Creat session %s", apiSess.ID)
104108
return &pb.SessionCreateResponse{Status: &status,
105109
Data: ConvertToProtobufSession(apiResp)}, nil
106110
}
@@ -199,6 +203,17 @@ func (j *JMServer) sendStreamTask(ctx context.Context, stream pb.Service_Dispatc
199203
case model.TaskUnlockSession:
200204
pbTask.Action = pb.TaskAction_UnlockSession
201205
pbTask.CreatedBy = task.Kwargs.CreatedByUser
206+
case model.TaskPermExpired:
207+
pbTask.Action = pb.TaskAction_TokenPermExpired
208+
pbTask.TokenStatus = &pb.TokenStatus{
209+
Code: "",
210+
Detail: "",
211+
IsExpired: false,
212+
}
213+
214+
case model.TaskPermValid:
215+
pbTask.Action = pb.TaskAction_TokenPermValid
216+
202217
default:
203218
logger.Errorf("Unknown task name %s", task.Name)
204219
continue

cmd/root.go

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ var rootCmd = &cobra.Command{
5555
beat := common.NewBeatService(apiClient)
5656
{
5757
go beat.KeepHeartBeat()
58+
go beat.KeepCheckTokens()
5859
}
5960
ctx := common.GetSignalCtx()
6061
grpcImplSrv := impl.NewJMServer(apiClient, uploader, beat)

pkg/common/random.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import (
77

88
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
99

10+
var localRand = rand.New(rand.NewSource(time.Now().UnixNano()))
11+
1012
func RandomStr(length int) string {
11-
rand.Seed(time.Now().UnixNano())
13+
localRand.Seed(time.Now().UnixNano())
1214
b := make([]byte, length)
1315
for i := range b {
1416
b[i] = letters[rand.Intn(len(letters))]

pkg/jms-sdk-go/model/session.go

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type Session struct {
5050
AssetID string `json:"asset_id"`
5151
AccountID string `json:"account_id"`
5252
Type LabelFiled `json:"type"`
53+
TokenId string `json:"token_id"`
5354
}
5455

5556
type ReplayVersion string

pkg/jms-sdk-go/model/terminal.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,23 @@ type Terminal struct {
3333
}
3434

3535
type TerminalTask struct {
36-
ID string `json:"id"`
37-
Name string `json:"name"`
38-
Args string `json:"args"`
39-
Kwargs TaskKwargs `json:"kwargs"`
40-
IsFinished bool
36+
ID string `json:"id"`
37+
Name string `json:"name"`
38+
Args string `json:"args"`
39+
Kwargs TaskKwargs `json:"kwargs"`
40+
41+
TokenStatus TokenCheckStatus `json:"-"`
4142
}
4243

4344
const (
4445
TaskKillSession = "kill_session"
4546
TaskLockSession = "lock_session"
4647
TaskUnlockSession = "unlock_session"
48+
49+
// TaskPermExpired TaskPermValid 非 api 数据,仅用于内部处理
50+
51+
TaskPermExpired = "perm_expired"
52+
TaskPermValid = "perm_valid"
4753
)
4854

4955
type TaskKwargs struct {

pkg/jms-sdk-go/model/token.go

+14
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,17 @@ type ConnectTokenInfo struct {
3737
AccountName string `json:"account_name"`
3838
Protocol string `json:"protocol"`
3939
}
40+
41+
// token 授权和过期状态
42+
43+
type TokenCheckStatus struct {
44+
Detail string `json:"detail"`
45+
Code string `json:"code"`
46+
Expired bool `json:"expired"`
47+
}
48+
49+
const (
50+
CodePermOk = "perm_ok"
51+
CodePermAccountInvalid = "perm_account_invalid"
52+
CodePermExpired = "perm_expired"
53+
)

pkg/jms-sdk-go/service/jms_token.go

+6
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,9 @@ type TokenRenewalResponse struct {
3232
Ok bool `json:"ok"`
3333
Msg string `json:"msg"`
3434
}
35+
36+
func (s *JMService) CheckTokenStatus(tokenId string) (res model.TokenCheckStatus, err error) {
37+
reqURL := fmt.Sprintf(SuperConnectTokenCheckURL, tokenId)
38+
_, err = s.authClient.Get(reqURL, &res)
39+
return
40+
}

pkg/jms-sdk-go/service/url.go

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ const (
7676
SuperConnectTokenSecretURL = "/api/v1/authentication/super-connection-token/secret/"
7777
SuperConnectTokenInfoURL = "/api/v1/authentication/super-connection-token/"
7878
SuperTokenRenewalURL = "/api/v1/authentication/super-connection-token/renewal/"
79+
SuperConnectTokenCheckURL = "/api/v1/authentication/super-connection-token/%s/check/"
7980

8081
UserPermsAssetsURL = "/api/v1/perms/users/%s/assets/"
8182

0 commit comments

Comments
 (0)