@@ -20,11 +20,19 @@ import (
20
20
"net"
21
21
"strings"
22
22
"sync"
23
+ "time"
23
24
24
25
current "github.com/containernetworking/cni/pkg/types/100"
25
26
"github.com/containernetworking/cni/pkg/version"
26
27
corev1 "k8s.io/api/core/v1"
27
28
"k8s.io/apimachinery/pkg/util/sets"
29
+ k8swait "k8s.io/apimachinery/pkg/util/wait"
30
+ clientset "k8s.io/client-go/kubernetes"
31
+ "k8s.io/client-go/kubernetes/scheme"
32
+ typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
33
+ v1 "k8s.io/client-go/listers/core/v1"
34
+ "k8s.io/client-go/tools/record"
35
+ "k8s.io/client-go/util/workqueue"
28
36
"k8s.io/klog/v2"
29
37
30
38
"antrea.io/antrea/pkg/agent/cniserver/ipam"
@@ -61,6 +69,10 @@ const (
61
69
62
70
var (
63
71
getNSPath = util .GetNSPath
72
+ // retryInterval is the interval to re-install Pod OpenFlow entries if any error happened.
73
+ // Note, using a variable rather than constant for retryInterval because we may use a shorter time in the
74
+ // test code.
75
+ retryInterval = 5 * time .Second
64
76
)
65
77
66
78
type podConfigurator struct {
@@ -76,6 +88,13 @@ type podConfigurator struct {
76
88
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
77
89
// Pod secondary network interfaces.
78
90
isSecondaryNetwork bool
91
+ podIfMonitor * podIfaceMonitor
92
+
93
+ eventBroadcaster record.EventBroadcaster
94
+ record record.EventRecorder
95
+ podLister v1.PodLister
96
+ kubeClient clientset.Interface
97
+ queue workqueue.TypedDelayingInterface [string ]
79
98
}
80
99
81
100
func newPodConfigurator (
@@ -88,11 +107,23 @@ func newPodConfigurator(
88
107
isOvsHardwareOffloadEnabled bool ,
89
108
disableTXChecksumOffload bool ,
90
109
podUpdateNotifier channel.Notifier ,
110
+ podLister v1.PodLister ,
91
111
) (* podConfigurator , error ) {
92
112
ifConfigurator , err := newInterfaceConfigurator (ovsDatapathType , isOvsHardwareOffloadEnabled , disableTXChecksumOffload )
93
113
if err != nil {
94
114
return nil , err
95
115
}
116
+ queue := workqueue.NewTypedDelayingQueueWithConfig [string ](
117
+ workqueue.TypedDelayingQueueConfig [string ]{
118
+ Name : "podMonitor" ,
119
+ },
120
+ )
121
+ eventBroadcaster := record .NewBroadcaster ()
122
+ recorder := eventBroadcaster .NewRecorder (
123
+ scheme .Scheme ,
124
+ corev1.EventSource {Component : "AntreaAgentPodConfigurator" },
125
+ )
126
+ ifMonitor := newPodInterfaceMonitor (ofClient , ifaceStore , queue )
96
127
return & podConfigurator {
97
128
ovsBridgeClient : ovsBridgeClient ,
98
129
ofClient : ofClient ,
@@ -101,6 +132,10 @@ func newPodConfigurator(
101
132
gatewayMAC : gatewayMAC ,
102
133
ifConfigurator : ifConfigurator ,
103
134
podUpdateNotifier : podUpdateNotifier ,
135
+ podIfMonitor : ifMonitor ,
136
+ record : recorder ,
137
+ podLister : podLister ,
138
+ queue : queue ,
104
139
}, nil
105
140
}
106
141
@@ -166,13 +201,13 @@ func getContainerIPsString(ips []net.IP) string {
166
201
// not created for a Pod interface.
167
202
func ParseOVSPortInterfaceConfig (portData * ovsconfig.OVSPortData , portConfig * interfacestore.OVSPortConfig ) * interfacestore.InterfaceConfig {
168
203
if portData .ExternalIDs == nil {
169
- klog .V (2 ).Infof ("OVS port %s has no external_ids" , portData .Name )
204
+ klog .V (2 ).InfoS ("OVS port has no external_ids" , "port " , portData .Name )
170
205
return nil
171
206
}
172
207
173
208
containerID , found := portData .ExternalIDs [ovsExternalIDContainerID ]
174
209
if ! found {
175
- klog .V (2 ).Infof ("OVS port %s has no %s in external_ids" , portData .Name , ovsExternalIDContainerID )
210
+ klog .V (2 ).InfoS ("OVS port has no containerID in external_ids" , "port" , portData .Name )
176
211
return nil
177
212
}
178
213
@@ -187,8 +222,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in
187
222
188
223
containerMAC , err := net .ParseMAC (portData .ExternalIDs [ovsExternalIDMAC ])
189
224
if err != nil {
190
- klog .Errorf ("Failed to parse MAC address from OVS external config %s: %v" ,
191
- portData .ExternalIDs [ovsExternalIDMAC ], err )
225
+ klog .ErrorS (err , "Failed to parse MAC address from OVS external config" )
192
226
}
193
227
podName , _ := portData .ExternalIDs [ovsExternalIDPodName ]
194
228
podNamespace , _ := portData .ExternalIDs [ovsExternalIDPodNamespace ]
@@ -279,7 +313,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s
279
313
func (pc * podConfigurator ) removeInterfaces (containerID string ) error {
280
314
containerConfig , found := pc .ifaceStore .GetContainerInterface (containerID )
281
315
if ! found {
282
- klog .V (2 ).Infof ("Did not find the port for container %s in local cache" , containerID )
316
+ klog .V (2 ).InfoS ("Did not find the port for container in local cache" , "container " , containerID )
283
317
return nil
284
318
}
285
319
@@ -498,7 +532,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
498
532
// disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int.
499
533
func (pc * podConfigurator ) disconnectInterfaceFromOVS (containerConfig * interfacestore.InterfaceConfig ) error {
500
534
containerID := containerConfig .ContainerID
501
- klog .V (2 ).Infof ("Deleting Openflow entries for container %s " , containerID )
535
+ klog .V (2 ).InfoS ("Deleting Openflow entries for container" , "container " , containerID )
502
536
if ! pc .isSecondaryNetwork {
503
537
if err := pc .ofClient .UninstallPodFlows (containerConfig .InterfaceName ); err != nil {
504
538
return fmt .Errorf ("failed to delete Openflow entries for container %s: %v" , containerID , err )
@@ -513,6 +547,12 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
513
547
if err := pc .ovsBridgeClient .DeletePort (containerConfig .PortUUID ); err != nil {
514
548
return fmt .Errorf ("failed to delete OVS port for container %s interface %s: %v" , containerID , containerConfig .InterfaceName , err )
515
549
}
550
+
551
+ // Remove unready Pod info from local cache when Pod is deleted. This is called only on Windows.
552
+ if pc .podIfMonitor != nil {
553
+ pc .podIfMonitor .deleteUnreadyPod (containerConfig .InterfaceName )
554
+ }
555
+
516
556
// Remove container configuration from cache.
517
557
pc .ifaceStore .DeleteInterface (containerConfig )
518
558
if ! pc .isSecondaryNetwork {
@@ -558,7 +598,7 @@ func (pc *podConfigurator) connectInterceptedInterface(
558
598
func (pc * podConfigurator ) disconnectInterceptedInterface (podName , podNamespace , containerID string ) error {
559
599
containerConfig , found := pc .ifaceStore .GetContainerInterface (containerID )
560
600
if ! found {
561
- klog .V (2 ).Infof ("Did not find the port for container %s in local cache" , containerID )
601
+ klog .V (2 ).InfoS ("Did not find the port for container in local cache" , "container " , containerID )
562
602
return nil
563
603
}
564
604
for _ , ip := range containerConfig .IPs {
@@ -570,3 +610,98 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace,
570
610
return pc .disconnectInterfaceFromOVS (containerConfig )
571
611
// TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy??
572
612
}
613
+
614
+ func (pc * podConfigurator ) Run (stopCh <- chan struct {}) {
615
+ pc .eventBroadcaster .StartStructuredLogging (0 )
616
+ pc .eventBroadcaster .StartRecordingToSink (& typedv1.EventSinkImpl {
617
+ Interface : pc .kubeClient .CoreV1 ().Events ("" ),
618
+ })
619
+ defer pc .eventBroadcaster .Shutdown ()
620
+ go k8swait .Until (pc .worker , time .Second , stopCh )
621
+ pc .podIfMonitor .Run (stopCh )
622
+ }
623
+
624
+ func (pc * podConfigurator ) processNextWorkItem () bool {
625
+ key , quit := pc .queue .Get ()
626
+ if quit {
627
+ return false
628
+ }
629
+ defer pc .queue .Done (key )
630
+
631
+ if err := pc .updateUnreadyPod (key ); err != nil {
632
+ // Put the item back on the workqueue to handle any transient errors.
633
+ pc .queue .AddAfter (key , retryInterval )
634
+ }
635
+ return true
636
+ }
637
+
638
+ // worker is a long-running function that will continually call the processNextWorkItem function in
639
+ // order to read and process a message on the workqueue.
640
+ func (pc * podConfigurator ) worker () {
641
+ for pc .processNextWorkItem () {
642
+ }
643
+ }
644
+
645
+ func (pc * podConfigurator ) updateUnreadyPod (ovsPort string ) error {
646
+ if ! pc .podIfMonitor .unreadyInterfaceExists (ovsPort ) {
647
+ klog .InfoS ("Interface does not exist in un-ready state" , "name" , ovsPort )
648
+ return nil
649
+ }
650
+
651
+ ifConfig , found := pc .ifaceStore .GetInterfaceByName (ovsPort )
652
+ if ! found {
653
+ klog .InfoS ("Interface config is not found in local cache, remove from unready cache" , "name" , ovsPort )
654
+ pc .podIfMonitor .deleteUnreadyPod (ovsPort )
655
+ return nil
656
+ }
657
+
658
+ if ifConfig .OFPort == 0 {
659
+ // Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated.
660
+ // Returns error so that we can have a retry after 5s.
661
+ _ = pc .processPodEvents (ifConfig , false )
662
+ return fmt .Errorf ("pod's OpenFlow port is not ready yet" )
663
+ }
664
+
665
+ if ! pc .podIfMonitor .interfaceFlowsInstalled (ovsPort ) {
666
+ // Install OpenFlow entries for the Pod.
667
+ klog .V (2 ).InfoS ("Setting up Openflow entries for OVS port" , "port" , ovsPort )
668
+ if err := pc .ofClient .InstallPodFlows (ovsPort , ifConfig .IPs , ifConfig .MAC , uint32 (ifConfig .OFPort ), ifConfig .VLANID , nil ); err != nil {
669
+ // Add Pod not-ready event if the pod flows installation are failed.
670
+ // Returns error so that we can have a retry after 5s.
671
+ _ = pc .processPodEvents (ifConfig , false )
672
+ return fmt .Errorf ("failed to add Openflow entries for OVS port %s: %v" , ovsPort , err )
673
+ }
674
+
675
+ // Notify the Pod update event to required components.
676
+ event := agenttypes.PodUpdate {
677
+ PodName : ifConfig .PodName ,
678
+ PodNamespace : ifConfig .PodNamespace ,
679
+ IsAdd : true ,
680
+ ContainerID : ifConfig .ContainerID ,
681
+ }
682
+ pc .podUpdateNotifier .Notify (event )
683
+ }
684
+
685
+ if err := pc .processPodEvents (ifConfig , true ); err != nil {
686
+ return err
687
+ }
688
+ pc .podIfMonitor .deleteUnreadyPod (ovsPort )
689
+ return nil
690
+ }
691
+
692
+ func (pc * podConfigurator ) processPodEvents (ifConfig * interfacestore.InterfaceConfig , installed bool ) error {
693
+ pod , err := pc .podLister .Pods (ifConfig .PodNamespace ).Get (ifConfig .PodName )
694
+ if err != nil {
695
+ klog .ErrorS (err , "Failed to get Pod, retrying" , "Pod" , klog .KRef (ifConfig .PodNamespace , ifConfig .PodName ))
696
+ return err
697
+ }
698
+
699
+ if installed {
700
+ // Add normal event to record Pod network is ready.
701
+ pc .record .Eventf (pod , corev1 .EventTypeNormal , "NetworkIsReady" , "Installed Pod '%s/%s' network forwarding rules" , ifConfig .PodNamespace , ifConfig .PodName )
702
+ return nil
703
+ }
704
+
705
+ pc .record .Eventf (pod , corev1 .EventTypeWarning , "NetworkNotReady" , "Pod '%s/%s' network forwarding rules not installed" , ifConfig .PodNamespace , ifConfig .PodName )
706
+ return nil
707
+ }
0 commit comments