@@ -34,7 +34,6 @@ import (
34
34
"os/user"
35
35
"strconv"
36
36
"strings"
37
- "sync"
38
37
"testing"
39
38
"time"
40
39
@@ -2217,13 +2216,17 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) {
2217
2216
require .NoError (t , err )
2218
2217
participantStdoutR , participantStdoutW , err := os .Pipe ()
2219
2218
require .NoError (t , err )
2220
- streamsMu := & sync.Mutex {}
2221
- streams := make ([]* client.KubeSession , 0 , 3 )
2222
- observerCaptures := make ([]* bytes.Buffer , 0 , 2 )
2219
+
2220
+ observerCaptures := make ([]* bytes.Buffer , 2 )
2223
2221
albProxy := helpers .MustStartMockALBProxy (t , teleport .Config .Proxy .WebAddr .Addr )
2224
2222
2225
2223
// join peer by KubeProxyAddr
2226
2224
group .Go (func () error {
2225
+ defer func () {
2226
+ // close participant stdout so that we can read it after till EOF
2227
+ participantStdoutW .Close ()
2228
+ }()
2229
+
2227
2230
tc , err := teleport .NewClient (helpers.ClientConfig {
2228
2231
Login : hostUsername ,
2229
2232
Cluster : helpers .Site ,
@@ -2245,43 +2248,52 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) {
2245
2248
if err != nil {
2246
2249
return trace .Wrap (err )
2247
2250
}
2248
- streamsMu .Lock ()
2249
- streams = append (streams , stream )
2250
- streamsMu .Unlock ()
2251
+
2251
2252
stream .Wait ()
2252
- // close participant stdout so that we can read it after till EOF
2253
- participantStdoutW .Close ()
2253
+
2254
+ if err := stream .Detach (); err != nil && ! terminal .IsOKWebsocketCloseError (err ) {
2255
+ return trace .Wrap (err )
2256
+ }
2257
+
2254
2258
return nil
2255
2259
})
2256
2260
2257
2261
// join observer by WebProxyAddr
2258
2262
group .Go (func () error {
2259
2263
stream , capture := kubeJoinByWebAddr (t , teleport , participantUsername , kubeUsers , kubeGroups )
2260
- streamsMu .Lock ()
2261
- streams = append (streams , stream )
2262
- observerCaptures = append (observerCaptures , capture )
2263
- streamsMu .Unlock ()
2264
+ observerCaptures [0 ] = capture
2264
2265
stream .Wait ()
2266
+
2267
+ if err := stream .Detach (); err != nil && ! terminal .IsOKWebsocketCloseError (err ) {
2268
+ return trace .Wrap (err )
2269
+ }
2270
+
2265
2271
return nil
2266
2272
})
2267
2273
2268
2274
// join observer with ALPN conn upgrade
2269
2275
group .Go (func () error {
2270
2276
stream , capture := kubeJoinByALBAddr (t , teleport , participantUsername , kubeUsers , kubeGroups , albProxy .Addr ().String ())
2271
- streamsMu .Lock ()
2272
- streams = append (streams , stream )
2273
- observerCaptures = append (observerCaptures , capture )
2274
- streamsMu .Unlock ()
2277
+ observerCaptures [1 ] = capture
2275
2278
stream .Wait ()
2279
+
2280
+ if err := stream .Detach (); err != nil && ! terminal .IsOKWebsocketCloseError (err ) {
2281
+ return trace .Wrap (err )
2282
+ }
2276
2283
return nil
2277
2284
})
2278
2285
2279
2286
// We wait again for the second user to finish joining the session.
2280
- // We allow a bit of time to pass here to give the session manager time to recognize the
2281
- // new IO streams of the second client.
2282
- time .Sleep (time .Second * 5 )
2287
+ require .EventuallyWithT (t , func (t * assert.CollectT ) {
2288
+ session , err := teleport .Process .GetAuthServer ().GetSessionTracker (context .Background (), session .GetName ())
2289
+ if ! assert .NoError (t , err ) {
2290
+ return
2291
+ }
2292
+
2293
+ assert .Len (t , session .GetParticipants (), 2 )
2294
+ }, 15 * time .Second , 500 * time .Millisecond )
2283
2295
2284
- // sent a test message from the participant
2296
+ // send a test message from the participant
2285
2297
participantStdinW .Write ([]byte ("\a hi from peer\n \r " ))
2286
2298
2287
2299
// lets type "echo hi" followed by "enter" and then "exit" + "enter":
0 commit comments