-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsession.go
267 lines (237 loc) · 6.22 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
package k8s_exec_pod
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/Shanghai-Lunara/pkg/zaplogger"
"github.com/gorilla/websocket"
"io"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"sync"
"time"
)
// PtyHandler is what remotecommand expects from a pty
type PtyHandler interface {
io.Reader
io.Writer
remotecommand.TerminalSizeQueue
}
// Session implements PtyHandler (using a websocket connection)
type Session interface {
Read(p []byte) (int, error)
Write(p []byte) (int, error)
Next() *remotecommand.TerminalSize
Id() string
Wait()
HandleLog(p Proxy)
HandleSSH(p Proxy)
Option() *ExecOptions
Close(reason string)
Ctx() context.Context
ReadCloser(rc io.ReadCloser)
}
const (
ReasonProcessExited = "process exited"
ReasonStreamStopped = "stream stopped"
ReasonConnTimeout = "conn wait timeout"
ReasonContextCancel = "ctx cancel"
)
// NewSession returns a new Session Interface
func NewSession(ctx context.Context, connTimeout int64, k8sClient kubernetes.Interface, cfg *rest.Config, option *ExecOptions) (Session, error) {
sessionId, err := genTerminalSessionId()
if err != nil {
return nil, err
}
subCtx, cancel := context.WithCancel(ctx)
s := &session{
sessionId: sessionId,
connTimeout: connTimeout,
option: option,
startChan: make(chan proxyChan, 1),
sizeChan: make(chan remotecommand.TerminalSize),
k8sClient: k8sClient,
cfg: cfg,
context: subCtx,
cancel: cancel,
}
go s.Wait()
return s, nil
}
type handleType string
const (
handleSSH handleType = "ssh"
handleLog handleType = "log"
)
type proxyChan struct {
t handleType
p Proxy
}
type session struct {
sessionId string
creatTm time.Time
connTimeout int64
expireTime time.Time
option *ExecOptions
sizeChan chan remotecommand.TerminalSize
readCloser io.ReadCloser
startChan chan proxyChan
websocketProxy Proxy
k8sClient kubernetes.Interface
cfg *rest.Config
context context.Context
cancel context.CancelFunc
once sync.Once
}
func (s *session) Id() string {
return s.sessionId
}
func (s *session) Wait() {
select {
case <-time.After(time.Second * time.Duration(s.connTimeout)):
s.Close(ReasonConnTimeout)
return
case proxyChan := <-s.startChan:
s.websocketProxy = proxyChan.p
switch proxyChan.t {
case handleSSH:
Terminal(s.k8sClient, s.cfg, s)
case handleLog:
go func() {
for {
var buf []byte
if _, err := s.Read(buf); err != nil {
zaplogger.Sugar().Error(err)
if s.readCloser != nil {
zaplogger.Sugar().Info("readCloser was not set")
return
}
go func() {
defer func() {
if r := recover(); r != nil {
zaplogger.Sugar().Info("s.readCloser.Close Recovered in: ", r)
}
}()
if err = s.readCloser.Close(); err != nil {
zaplogger.Sugar().Error(err)
}
}()
return
}
}
}()
if err := LogTransmit(s.k8sClient, s); err != nil {
zaplogger.Sugar().Error(err)
}
}
case <-s.context.Done():
return
}
}
func (s *session) HandleLog(p Proxy) {
select {
case s.startChan <- proxyChan{t: handleLog, p: p}:
return
case <-time.After(time.Second * 1):
return
}
}
func (s *session) HandleSSH(p Proxy) {
select {
case s.startChan <- proxyChan{t: handleSSH, p: p}:
return
case <-time.After(time.Second * 1):
return
}
}
const EndOfTransmission = "\u0004"
// Read handles pty->process messages (stdin, resize)
// Called in a loop from remotecommand as long as the process is running
func (s *session) Read(p []byte) (int, error) {
//zaplogger.Sugar().Infow("TerminalSession", "Read", string(p))
if n, err := s.websocketProxy.LoadBuffers(p); err != nil {
return 0, err
} else {
if n > 0 {
return n, nil
}
}
var wsMsg *message
var err error
if wsMsg, err = s.websocketProxy.Recv(); err != nil {
zaplogger.Sugar().Error(err)
return copy(p, EndOfTransmission), err
}
var msg TermMsg
if err := json.Unmarshal(wsMsg.data, &msg); err != nil {
zaplogger.Sugar().Error(err)
return copy(p, EndOfTransmission), err
}
switch msg.MsgType {
case TermResize:
s.sizeChan <- remotecommand.TerminalSize{Width: msg.Cols, Height: msg.Rows}
return 0, nil
case TermInput:
return s.websocketProxy.HandleInput(p, []byte(msg.Input))
case TermPing:
s.websocketProxy.HandlePing()
return 0, nil
default:
return copy(p, EndOfTransmission), fmt.Errorf("unknown message type '%s'", msg.MsgType)
}
}
// Write handles process->pty stdout
// Called from remotecommand whenever there is any output
// If the TermMsg.MsgType was TermPing, then it would handle Proxy.HandlePing
func (s *session) Write(p []byte) (int, error) {
//zaplogger.Sugar().Infow("TerminalSession", "Write", string(p))
data := make([]byte, len(p))
copy(data, p)
if err := s.websocketProxy.Send(websocket.BinaryMessage, data); err != nil {
zaplogger.Sugar().Error(err)
return 0, err
}
return len(p), nil
}
// Next handles pty->process resize events
// Called in a loop from remotecommand as long as the process is running
func (s *session) Next() *remotecommand.TerminalSize {
select {
case size := <-s.sizeChan:
return &size
case <-s.context.Done():
return nil
}
}
func (s *session) Option() *ExecOptions {
return s.option
}
func (s *session) Close(reason string) {
zaplogger.Sugar().Infow("TerminalSession trigger close", "sessionId", s.Id(), "reason", reason)
s.once.Do(func() {
zaplogger.Sugar().Infow("TerminalSession successfully close", "sessionId", s.Id(), "reason", reason)
s.cancel()
})
}
func (s *session) Ctx() context.Context {
return s.context
}
func (s *session) ReadCloser(rc io.ReadCloser) {
s.readCloser = rc
}
// genTerminalSessionId generates a random session ID string. The format is not really interesting.
// This ID is used to identify the session when the client opens the Websocket connection.
// Not the same as the Websocket session id! We can't use that as that is generated
// on the client side and we don't have it yet at this point.
func genTerminalSessionId() (string, error) {
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
id := make([]byte, hex.EncodedLen(len(bytes)))
hex.Encode(id, bytes)
return string(id), nil
}