diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index dfd740ed2e0..4e34d57c341 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -498,7 +498,7 @@ func run(o *Options) error { } var l7Reconciler *l7engine.Reconciler if l7NetworkPolicyEnabled || l7FlowExporterEnabled { - l7Reconciler = l7engine.NewReconciler() + l7Reconciler = l7engine.NewReconciler(ofClient) } networkPolicyController, err := networkpolicy.NewNetworkPolicyController( antreaClientProvider, diff --git a/pkg/agent/controller/l7flowexporter/l7_flow_export_controller.go b/pkg/agent/controller/l7flowexporter/l7_flow_export_controller.go index 16f30eb75b4..997bad8b708 100644 --- a/pkg/agent/controller/l7flowexporter/l7_flow_export_controller.go +++ b/pkg/agent/controller/l7flowexporter/l7_flow_export_controller.go @@ -69,6 +69,8 @@ type L7FlowExporterController struct { targetPort uint32 + once sync.Once + queue workqueue.TypedRateLimitingInterface[string] } @@ -324,7 +326,9 @@ func (l7c *L7FlowExporterController) syncPod(podNN string) error { sourceOfPort := []uint32{uint32(podInterfaces[0].OFPort)} // Start Suricata before starting traffic control mark flows - l7c.l7Reconciler.StartSuricataOnce() + l7c.once.Do(func() { + l7c.l7Reconciler.StartSuricata() + }) oldDirection, exists := l7c.getMirroredDirection(podNN) if exists { diff --git a/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go b/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go index ef88e7baadd..7cfc0501ab1 100644 --- a/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go +++ b/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go @@ -108,7 +108,7 @@ func newFakeControllerAndWatcher(t *testing.T, objects []runtime.Object, interfa ifaceStore.AddInterface(itf) } - l7Reconciler := l7engine.NewReconciler() + l7Reconciler := l7engine.NewReconciler(nil) l7w := NewL7FlowExporterController(mockOFClient, ifaceStore, localPodInformer, nsInformer, l7Reconciler) return &fakeController{ diff --git a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go index 40ff2ffa854..b84f1449915 100644 --- a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go +++ b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go @@ -32,6 +32,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/openflow" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/logdir" ) @@ -153,10 +154,12 @@ type Reconciler struct { suricataTenantCache *threadSafeSet[uint32] suricataTenantHandlerCache *threadSafeSet[uint32] + ofClient openflow.Client + once sync.Once } -func NewReconciler() *Reconciler { +func NewReconciler(ofClient openflow.Client) *Reconciler { return &Reconciler{ suricataScFn: suricataSc, startSuricataFn: startSuricata, @@ -166,6 +169,7 @@ func NewReconciler() *Reconciler { suricataTenantHandlerCache: &threadSafeSet[uint32]{ cached: sets.New[uint32](), }, + ofClient: ofClient, } } @@ -260,9 +264,15 @@ func convertProtocolTLS(tls *v1beta.TLSProtocol) string { return strings.Join(keywords, " ") } -func (r *Reconciler) StartSuricataOnce() { +func (r *Reconciler) ensureL7FlowsAndStartSuricataOnce() { r.once.Do(func() { - r.startSuricata() + if r.ofClient != nil { + if err := r.ofClient.InstallL7NetworkPolicyFlows(); err != nil { + klog.ErrorS(err, "Failed to install l7 NetworkPolicy flows") + } + } + + r.StartSuricata() }) } @@ -272,7 +282,7 @@ func (r *Reconciler) AddRule(ruleID, policyName string, vlanID uint32, l7Protoco klog.V(5).Infof("AddRule took %v", time.Since(start)) }() - r.StartSuricataOnce() + r.ensureL7FlowsAndStartSuricataOnce() // Generate the keyword part used in Suricata rules. protoKeywords := make(map[string]sets.Set[string]) @@ -461,7 +471,7 @@ func (r *Reconciler) unregisterSuricataTenantHandler(tenantID, vlanID uint32) (* return r.suricataScFn(scCmd) } -func (r *Reconciler) startSuricata() { +func (r *Reconciler) StartSuricata() { f, err := defaultFS.Create(antreaSuricataConfigPath) if err != nil { klog.ErrorS(err, "Failed to create Suricata config file", "FilePath", antreaSuricataConfigPath) diff --git a/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go b/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go index 39a1745fd80..a710498642a 100644 --- a/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go @@ -124,12 +124,12 @@ func TestStartSuricata(t *testing.T) { _, err := defaultFS.Create(defaultSuricataConfigPath) assert.NoError(t, err) - fe := NewReconciler() + fe := NewReconciler(nil) fs := newFakeSuricata() fe.suricataScFn = fs.suricataScFunc fe.startSuricataFn = fs.startSuricataFn - fe.startSuricata() + fe.StartSuricata() ok, err := afero.FileContainsBytes(defaultFS, antreaSuricataConfigPath, []byte(suricataAntreaConfigData)) assert.NoError(t, err) @@ -183,7 +183,7 @@ func TestRuleLifecycle(t *testing.T) { _, err := defaultFS.Create(defaultSuricataConfigPath) assert.NoError(t, err) - fe := NewReconciler() + fe := NewReconciler(nil) fs := newFakeSuricata() fe.suricataScFn = fs.suricataScFunc fe.startSuricataFn = fs.startSuricataFn diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index 64eb5c97972..d5ed6ddca64 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -78,7 +78,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} fs := afero.NewMemMapFs() - l7reconciler := l7engine.NewReconciler() + l7reconciler := l7engine.NewReconciler(nil) controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 4cd1134a457..364d6fb7140 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -411,6 +411,9 @@ type Client interface { // SubscribeOFPortStatusMessage registers a channel to listen the OpenFlow PortStatus message. SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) + + // InstallL7NetworkPolicyFlows will be only called when one L7 NetworkPolicy is provisioned. + InstallL7NetworkPolicyFlows() error } // GetFlowTableStatus returns an array of flow table status. @@ -1704,3 +1707,13 @@ func (c *client) getMeterStats() { func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { c.bridge.SubscribePortStatusConsumer(statusCh) } + +// InstallL7NetworkPolicyFlows will be only called when one L7 NetworkPolicy is provisioned. +func (c *client) InstallL7NetworkPolicyFlows() error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + cacheKey := "l7_np_provision_flows" + flows := c.featureNetworkPolicy.l7NPTrafficControlFlows() + return c.addFlows(c.featureNetworkPolicy.cachedFlows, cacheKey, flows) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index ab5bd20dcca..47cef4c312c 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2719,7 +2719,7 @@ func Test_client_ReplayFlows(t *testing.T) { expectedFlows := append(pipelineDefaultFlows(true /* egressTrafficShapingEnabled */, false /* externalNodeEnabled */, true /* isEncap */, true /* isIPv4 */), egressInitFlows(true)...) expectedFlows = append(expectedFlows, multicastInitFlows(true)...) - expectedFlows = append(expectedFlows, networkPolicyInitFlows(true, false, false)...) + expectedFlows = append(expectedFlows, networkPolicyInitFlows(true, false)...) expectedFlows = append(expectedFlows, podConnectivityInitFlows(config.TrafficEncapModeEncap, config.TrafficEncryptionModeNone, false, true, true, true)...) expectedFlows = append(expectedFlows, serviceInitFlows(true, true, false, false)...) @@ -2917,3 +2917,30 @@ func TestSubscribeOFPortStatusMessage(t *testing.T) { bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1) c.SubscribeOFPortStatusMessage(ch) } + +func Test_client_InstallL7NetworkPolicyFlows(t *testing.T) { + ctrl := gomock.NewController(t) + m := opstest.NewMockOFEntryOperations(ctrl) + + fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, enableL7NetworkPolicy) + defer resetPipelines() + + expectedFlows := []string{ + "cookie=0x1020000000000, table=Classifier, priority=200,in_port=11,vlan_tci=0x1000/0x1000 actions=pop_vlan,set_field:0x6/0xf->reg0,goto_table:UnSNAT", + "cookie=0x1020000000000, table=ConntrackZone, priority=212,ip,reg0=0x0/0x800000 actions=set_field:0x800000/0x800000->reg0,ct(table=ConntrackZone,zone=65520)", + "cookie=0x1020000000000, table=ConntrackZone, priority=210,ct_state=+rpl+trk,ct_mark=0x80/0x80,ip actions=goto_table:Output", + "cookie=0x1020000000000, table=ConntrackZone, priority=211,ct_state=+rpl+trk,ip,reg0=0x6/0xf actions=ct(table=L3Forwarding,zone=65520,nat)", + "cookie=0x1020000000000, table=ConntrackZone, priority=211,ct_state=-rpl+trk,ip,reg0=0x6/0xf actions=goto_table:L3Forwarding", + "cookie=0x1020000000000, table=ConntrackZone, priority=210,ct_state=-rpl+trk,ct_mark=0x80/0x80,ip actions=ct(table=ConntrackState,zone=65520,nat)", + "cookie=0x1020000000000, table=TrafficControl, priority=210,reg0=0x6/0xf actions=goto_table:Output", + "cookie=0x1020000000000, table=Output, priority=213,reg0=0x6/0xf actions=output:NXM_NX_REG1[]", + "cookie=0x1020000000000, table=Output, priority=212,ct_mark=0x80/0x80 actions=push_vlan:0x8100,move:NXM_NX_CT_LABEL[64..75]->OXM_OF_VLAN_VID[0..11],output:10", + } + + m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) + cacheKey := "l7_np_provision_flows" + assert.NoError(t, fc.InstallL7NetworkPolicyFlows()) + fCacheI, ok := fc.featureNetworkPolicy.cachedFlows.Load(cacheKey) + require.True(t, ok) + assert.ElementsMatch(t, expectedFlows, getFlowStrings(fCacheI)) +} diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index 78f0845a143..d97ef8c37d4 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -85,6 +85,9 @@ var ( OutputRegField = binding.NewRegField(0, 21, 22) OutputToOFPortRegMark = binding.NewRegMark(OutputRegField, outputToPortVal) OutputToControllerRegMark = binding.NewRegMark(OutputRegField, outputToControllerVal) + // reg0[23]: + CtStateNotRestoredRegMark = binding.NewOneBitZeroRegMark(0, 23) + CtStateRestoredRegMark = binding.NewOneBitRegMark(0, 23) // reg0[25..32]: Field to indicate Antrea-native policy packetIn operations PacketInOperationField = binding.NewRegField(0, 25, 32) diff --git a/pkg/agent/openflow/framework_test.go b/pkg/agent/openflow/framework_test.go index 2c69c85ffa5..25a075ff091 100644 --- a/pkg/agent/openflow/framework_test.go +++ b/pkg/agent/openflow/framework_test.go @@ -284,49 +284,6 @@ func TestBuildPipeline(t *testing.T) { }, }, }, - { - name: "K8s Node, IPv4 only, with L7NetworkPolicy enabled", - ipStack: ipv6Only, - features: []feature{ - newTestFeaturePodConnectivity(ipStackMap[ipv4Only]), - newTestFeatureNetworkPolicy(config.K8sNode, enableL7NetworkPolicy), - newTestFeatureService(), - newTestFeatureEgress(), - }, - expectedTables: map[binding.PipelineID][]*Table{ - pipelineRoot: { - PipelineRootClassifierTable, - }, - pipelineIP: { - ClassifierTable, - SpoofGuardTable, - UnSNATTable, - ConntrackTable, - ConntrackStateTable, - PreRoutingClassifierTable, - SessionAffinityTable, - ServiceLBTable, - EndpointDNATTable, - AntreaPolicyEgressRuleTable, - EgressRuleTable, - EgressDefaultTable, - EgressMetricTable, - L3ForwardingTable, - EgressMarkTable, - L3DecTTLTable, - SNATMarkTable, - SNATTable, - L2ForwardingCalcTable, - TrafficControlTable, - AntreaPolicyIngressRuleTable, - IngressRuleTable, - IngressDefaultTable, - IngressMetricTable, - ConntrackCommitTable, - OutputTable, - }, - }, - }, { name: "K8s Node, IPv4 only, with AntreaPolicy disabled", ipStack: ipv6Only, diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index babc9e83971..c141dabdee7 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -1653,6 +1653,9 @@ func (f *featureNetworkPolicy) replayFlows() []*openflow15.FlowMod { for _, ctx := range f.globalConjMatchFlowCache { addMatchFlows(ctx) } + + flows = append(flows, getCachedFlowMessages(f.cachedFlows)...) + return flows } @@ -2081,8 +2084,10 @@ type featureNetworkPolicy struct { // egressTables map records all IDs of tables related to egress rules. egressTables map[uint8]struct{} + cachedFlows *flowCategoryCache + // loggingGroupCache is a storage for the logging groups, each one includes 2 buckets: one is to send the packet - // to antrea-agent using the packetIn mechanism, the other is to force the packet to continue forwardingin the + // to antrea-agent using the packetIn mechanism, the other is to force the packet to continue forwarding the // OVS pipeline. The key is the next table used in the second bucket, and the value is the Openflow group. loggingGroupCache sync.Map groupAllocator GroupAllocator @@ -2137,6 +2142,7 @@ func newFeatureNetworkPolicy( category: cookie.NetworkPolicy, ctZoneSrcField: getZoneSrcField(connectUplinkToBridge), loggingGroupCache: sync.Map{}, + cachedFlows: newFlowCategoryCache(), groupAllocator: grpAllocator, } } @@ -2152,9 +2158,6 @@ func (f *featureNetworkPolicy) initFlows() []*openflow15.FlowMod { var flows []binding.Flow if f.nodeType == config.K8sNode { flows = append(flows, f.ingressClassifierFlows()...) - if f.enableL7NetworkPolicy { - flows = append(flows, f.l7NPTrafficControlFlows()...) - } } flows = append(flows, f.skipPolicyRuleCheckFlows()...) flows = append(flows, f.initLoggingFlows()...) @@ -2213,37 +2216,99 @@ func (f *featureNetworkPolicy) skipPolicyRuleCheckFlows() []binding.Flow { func (f *featureNetworkPolicy) l7NPTrafficControlFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() vlanMask := uint16(openflow15.OFPVID_PRESENT) - return []binding.Flow{ + flows := []binding.Flow{ + // This generates the flow to output the packets returned from an application-aware engine or a TrafficControl + // return port to their original target ofPort. It has the highest priority to prevent these packets from being + // matched by other flows in this table that redirect packets to an application-aware engine or TrafficControl + // target ofPort again. + OutputTable.ofTable.BuildFlow(priorityHigh + 3). + Cookie(cookieID). + MatchRegMark(FromTCReturnRegMark). + Action().OutputToRegField(TargetOFPortField). + Done(), // This generates the flow to output the packets marked with L7NPRedirectCTMark to an application-aware engine // via the target ofPort. Note that, before outputting the packets, VLAN ID stored on field L7NPRuleVlanIDCTMarkField // will be copied to VLAN ID register (OXM_OF_VLAN_VID) to set VLAN ID of the packets. OutputTable.ofTable.BuildFlow(priorityHigh+2). Cookie(cookieID). - MatchRegMark(OutputToOFPortRegMark). MatchCTMark(L7NPRedirectCTMark). Action().PushVLAN(EtherTypeDot1q). Action().MoveRange(binding.NxmFieldCtLabel, binding.OxmFieldVLANVID, *L7NPRuleVlanIDCTLabel.GetRange(), *binding.VLANVIDRange). Action().Output(f.l7NetworkPolicyConfig.TargetOFPort). Done(), // This generates the flow to mark the packets from an application-aware engine via the return ofPort and forward - // the packets to stageRouting directly. Note that, for the packets which are originally to be output to a tunnel - // port, value of NXM_NX_TUN_IPV4_DST needs to be loaded in L3ForwardingTable of stageRouting. + // the packets to stageConntrackState directly. Note that, for the packets which are originally to be output to a + // tunnel port, value of NXM_NX_TUN_IPV4_DST needs to be loaded in L3ForwardingTable of stageRouting. ClassifierTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchInPort(f.l7NetworkPolicyConfig.ReturnOFPort). MatchVLAN(false, 0, &vlanMask). Action().PopVLAN(). Action().LoadRegMark(FromTCReturnRegMark). - Action().GotoStage(stageRouting). + Action().GotoStage(stageConntrackState). Done(), // This generates the flow to forward the returned packets (with FromTCReturnRegMark) to stageOutput directly // after loading output port number to reg1 in L2ForwardingCalcTable. TrafficControlTable.ofTable.BuildFlow(priorityHigh). Cookie(cookieID). - MatchRegMark(OutputToOFPortRegMark, FromTCReturnRegMark). + MatchRegMark(FromTCReturnRegMark). Action().GotoStage(stageOutput). Done(), } + for _, ipProtocol := range f.ipProtocols { + ctZone := CtZone + if ipProtocol == binding.ProtocolIPv6 { + ctZone = CtZoneV6 + } + flows = append(flows, + ConntrackTable.ofTable.BuildFlow(priorityHigh+2). + MatchProtocol(ipProtocol). + MatchRegMark(CtStateNotRestoredRegMark). + Action().LoadRegMark(CtStateRestoredRegMark). + Action().CT(false, ConntrackTable.GetID(), ctZone, f.ctZoneSrcField). + CTDone(). + Cookie(cookieID). + Done(), + ConntrackTable.ofTable.BuildFlow(priorityHigh+1). + MatchProtocol(ipProtocol). + MatchRegMark(FromTCReturnRegMark). + MatchCTStateRpl(true). + MatchCTStateTrk(true). + Action().CT(false, L3ForwardingTable.GetID(), ctZone, f.ctZoneSrcField). + NAT(). + CTDone(). + Cookie(cookieID). + Done(), + ConntrackTable.ofTable.BuildFlow(priorityHigh+1). + MatchProtocol(ipProtocol). + MatchRegMark(FromTCReturnRegMark). + MatchCTStateRpl(false). + MatchCTStateTrk(true). + Action().GotoStage(stageRouting). + Cookie(cookieID). + Done(), + ConntrackTable.ofTable.BuildFlow(priorityHigh). + MatchProtocol(ipProtocol). + MatchCTStateRpl(true). + MatchCTStateTrk(true). + MatchCTMark(L7NPRedirectCTMark). + Action().GotoStage(stageOutput). + Cookie(cookieID). + Done(), + ConntrackTable.ofTable.BuildFlow(priorityHigh). + MatchProtocol(ipProtocol). + MatchCTStateRpl(false). + MatchCTStateTrk(true). + MatchCTMark(L7NPRedirectCTMark). + Action().CT(false, ConntrackTable.GetNext(), ctZone, f.ctZoneSrcField). + NAT(). + CTDone(). + Cookie(cookieID). + Done(), + ) + } + + return flows } func (f *featureNetworkPolicy) loggingNPPacketFlowWithOperations(cookieID uint64, operations uint8) binding.Flow { diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 3951e1b0afd..d3a77026876 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -1360,7 +1360,7 @@ func TestClient_GetPolicyInfoFromConjunction(t *testing.T) { } } -func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPolicyEnabled bool) []string { +func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled bool) []string { loggingFlows := []string{ "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=controller(id=32776,reason=no_match,userdata=01.01,max_len=65535)", "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=controller(id=32776,reason=no_match,userdata=01.02,max_len=65535)", @@ -1399,13 +1399,6 @@ func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPol "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64990,ct_state=-new+est,ip actions=goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64990,ct_state=-new+rel,ip actions=goto_table:IngressMetric", ) - if l7NetworkPolicyEnabled { - initFlows = append(initFlows, - "cookie=0x1020000000000, table=Classifier, priority=200,in_port=11,vlan_tci=0x1000/0x1000 actions=pop_vlan,set_field:0x6/0xf->reg0,goto_table:L3Forwarding", - "cookie=0x1020000000000, table=TrafficControl, priority=210,reg0=0x200006/0x60000f actions=goto_table:Output", - "cookie=0x1020000000000, table=Output, priority=212,ct_mark=0x80/0x80,reg0=0x200000/0x600000 actions=push_vlan:0x8100,move:NXM_NX_CT_LABEL[64..75]->OXM_OF_VLAN_VID[0..11],output:10", - ) - } return initFlows } @@ -1417,22 +1410,16 @@ func Test_featureNetworkPolicy_initFlows(t *testing.T) { clientOptions []clientOptionsFn expectedFlows []string }{ - { - name: "K8s Node with Multicast and L7NetworkPolicy", - nodeType: config.K8sNode, - clientOptions: []clientOptionsFn{enableMulticast, enableL7NetworkPolicy}, - expectedFlows: networkPolicyInitFlows(ovsMetersSupported, false, true), - }, { name: "K8s Node with Multicast", nodeType: config.K8sNode, clientOptions: []clientOptionsFn{enableMulticast}, - expectedFlows: networkPolicyInitFlows(ovsMetersSupported, false, false), + expectedFlows: networkPolicyInitFlows(ovsMetersSupported, false), }, { name: "External Node", nodeType: config.ExternalNode, - expectedFlows: networkPolicyInitFlows(ovsMetersSupported, true, false), + expectedFlows: networkPolicyInitFlows(ovsMetersSupported, true), }, } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index eafaa90f344..4053f569285 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -296,6 +296,20 @@ func (mr *MockClientMockRecorder) InstallEndpointFlows(protocol, endpoints any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).InstallEndpointFlows), protocol, endpoints) } +// InstallL7NetworkPolicyFlows mocks base method. +func (m *MockClient) InstallL7NetworkPolicyFlows() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallL7NetworkPolicyFlows") + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallL7NetworkPolicyFlows indicates an expected call of InstallL7NetworkPolicyFlows. +func (mr *MockClientMockRecorder) InstallL7NetworkPolicyFlows() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallL7NetworkPolicyFlows", reflect.TypeOf((*MockClient)(nil).InstallL7NetworkPolicyFlows)) +} + // InstallMulticastFlexibleIPAMFlows mocks base method. func (m *MockClient) InstallMulticastFlexibleIPAMFlows() error { m.ctrl.T.Helper() diff --git a/test/e2e/l7networkpolicy_test.go b/test/e2e/l7networkpolicy_test.go index be0cf549c10..bfc9ffe1073 100644 --- a/test/e2e/l7networkpolicy_test.go +++ b/test/e2e/l7networkpolicy_test.go @@ -29,11 +29,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" - agentconfig "antrea.io/antrea/pkg/config/agent" "antrea.io/antrea/pkg/features" . "antrea.io/antrea/test/e2e/utils" ) @@ -48,17 +48,6 @@ func TestL7NetworkPolicy(t *testing.T) { } defer teardownTest(t, data) - ac := func(config *agentconfig.AgentConfig) { config.DisableTXChecksumOffload = true } - if err = data.mutateAntreaConfigMap(nil, ac, false, true); err != nil { - t.Fatalf("Failed to enable option DisableTXChecksumOffload: %v", err) - } - defer func() { - ac = func(config *agentconfig.AgentConfig) { config.DisableTXChecksumOffload = false } - if err = data.mutateAntreaConfigMap(nil, ac, false, true); err != nil { - t.Fatalf("Failed to disable option DisableTXChecksumOffload: %v", err) - } - }() - t.Run("HTTP", func(t *testing.T) { testL7NetworkPolicyHTTP(t, data) }) @@ -132,8 +121,8 @@ func createL7NetworkPolicy(t *testing.T, assert.NoError(t, err) } -func probeL7NetworkPolicyHTTP(t *testing.T, data *TestData, serverPodName, clientPodName string, serverIPs []*net.IP, allowHTTPPathHostname, allowHTTPPathClientIP bool) { - for _, ip := range serverIPs { +func probeL7NetworkPolicyHTTP(t *testing.T, data *TestData, serverPodName, clientPodName string, targetIPs []*net.IP, allowHTTPPathHostname, allowHTTPPathClientIP bool) { + for _, ip := range targetIPs { baseURL := net.JoinHostPort(ip.String(), "8080") // Verify that access to path /clientip is as expected. @@ -210,13 +199,33 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { _, err := data.podWaitForIPs(defaultTimeout, clientPodName, data.testNamespace) require.NoError(t, err, "Expected IP for Pod '%s'", clientPodName) + // Create a backend Pod which will be selected by test NetworkPolices. serverPodName := "test-l7-http-server" serverPodLabels := map[string]string{"test-l7-http-e2e": "server"} cmd := []string{"/agnhost", "netexec", "--http-port=8080"} require.NoError(t, NewPodBuilder(serverPodName, data.testNamespace, agnhostImage).OnNode(nodeName(0)).WithCommand(cmd).WithLabels(serverPodLabels).Create(data)) podIPs, err := data.podWaitForIPs(defaultTimeout, serverPodName, data.testNamespace) require.NoError(t, err, "Expected IP for Pod '%s'", serverPodName) - serverIPs := podIPs.AsSlice() + targetPodIPs := podIPs.AsSlice() + + // Create a Service whose backend is the above backend Pod. + var ipFamilies []corev1.IPFamily + if len(clusterInfo.podV4NetworkCIDR) != 0 { + ipFamilies = append(ipFamilies, corev1.IPv4Protocol) + } + if len(clusterInfo.podV6NetworkCIDR) != 0 { + ipFamilies = append(ipFamilies, corev1.IPv6Protocol) + } + mutator := func(service *corev1.Service) { + service.Spec.IPFamilies = ipFamilies + } + svc, err := data.CreateServiceWithAnnotations("svc-agnhost", data.testNamespace, p8080, p8080, corev1.ProtocolTCP, serverPodLabels, false, false, corev1.ServiceTypeClusterIP, nil, nil, mutator) + require.NoError(t, err) + var targetServiceIPs []*net.IP + for _, clusterIP := range svc.Spec.ClusterIPs { + serviceIP := net.ParseIP(clusterIP) + targetServiceIPs = append(targetServiceIPs, &serviceIP) + } l7ProtocolAllowsPathHostname := []crdv1beta1.L7Protocol{ { @@ -250,7 +259,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // the first L7 NetworkPolicy has higher priority, matched packets will be only matched by the first L7 NetworkPolicy. // As a result, only HTTP path 'hostname' is allowed by the first L7 NetworkPolicy, other HTTP path like 'clientip' // will be rejected. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetPodIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetServiceIPs, true, false) // Delete the first L7 NetworkPolicy that only allows HTTP path 'hostname'. data.crdClient.CrdV1beta1().NetworkPolicies(data.testNamespace).Delete(context.TODO(), policyAllowPathHostname, metav1.DeleteOptions{}) @@ -258,7 +268,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // Since the fist L7 NetworkPolicy has been deleted, corresponding packets will be matched by the second L7 NetworkPolicy, // and the second L7 NetworkPolicy allows any HTTP path, then both path 'hostname' and 'clientip' are allowed. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetPodIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetServiceIPs, true, true) data.crdClient.CrdV1beta1().NetworkPolicies(data.testNamespace).Delete(context.TODO(), policyAllowAnyPath, metav1.DeleteOptions{}) }) @@ -277,7 +288,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // the first L7 NetworkPolicy has higher priority, matched packets will be only matched by the first L7 NetworkPolicy. // As a result, only HTTP path 'hostname' is allowed by the first L7 NetworkPolicy, other HTTP path like 'clientip' // will be rejected. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetPodIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetServiceIPs, true, false) // Delete the first L7 NetworkPolicy that only allows HTTP path 'hostname'. data.crdClient.CrdV1beta1().NetworkPolicies(data.testNamespace).Delete(context.TODO(), policyAllowPathHostname, metav1.DeleteOptions{}) @@ -285,7 +297,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // Since the fist L7 NetworkPolicy has been deleted, corresponding packets will be matched by the second L7 NetworkPolicy, // and the second L7 NetworkPolicy allows any HTTP path, then both path 'hostname' and 'clientip' are allowed. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetPodIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, targetServiceIPs, true, true) }) }