From 31bbde9bd3f87d1e6012694b6640f885998d572a Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Thu, 13 Feb 2025 14:54:32 -0800 Subject: [PATCH] Upgrade go-ipfix to 0.13.0 As part of this upgrade, we make the following changes: * Switch to buffered IPFIX exporter in the Flow Aggregator. This exporter has better performance for UDP IPFIX messages, by ensuring that multiple data records can be batched together in a single message. * Provide Path MTU (PMTU) when creating the IPFIX exporter in the Flow Aggregator. The value is used by the new buffered exporter to determine how many IPFIX records can fit in a single message while avoiding IP fragmentation. In our case, we "approximate" the Path MTU by looking up the MTU of the Flow Aggregator Pod's eth0 interface. * Add a MaxMsgSize configuration parameter to the Flow Aggregator as a way to override the default behavior, which is to use the MTU (minus header overhead) when the UDP protocol is used. * Add periodic flushing when exporting IPFIX records, which is necessary after switching to the buffered exporter. In Aggregation mode, flushing happens after processing a given batch of expired records. In Proxy mode, flushing happens every second. * Use updated reference IPFIX collector in e2e tests. The updated collector handles the case where multiple data records are included in the same IPFIX message more gracefully, which leads to some simplification in the test code. Signed-off-by: Antonin Bas --- build/charts/flow-aggregator/README.md | 1 + .../flow-aggregator/conf/flow-aggregator.conf | 4 + build/charts/flow-aggregator/values.yaml | 3 + build/yamls/flow-aggregator.yml | 6 +- ci/kind/test-e2e-kind.sh | 2 +- go.mod | 6 +- go.sum | 12 +- hack/update-codegen-dockerized.sh | 2 +- pkg/config/flowaggregator/config.go | 3 + pkg/flowaggregator/exporter/clickhouse.go | 4 + pkg/flowaggregator/exporter/interface.go | 3 + pkg/flowaggregator/exporter/ipfix.go | 120 +++++++++++------- pkg/flowaggregator/exporter/ipfix_test.go | 64 +++------- pkg/flowaggregator/exporter/log.go | 5 + pkg/flowaggregator/exporter/s3.go | 4 + .../exporter/testing/mock_exporter.go | 16 ++- pkg/flowaggregator/flowaggregator.go | 20 +++ .../{ipfix_process.go => ipfix_exporter.go} | 8 +- pkg/ipfix/testing/mock_ipfix.go | 58 ++++++++- test/e2e/flowaggregator_test.go | 14 +- test/e2e/framework.go | 2 +- 21 files changed, 242 insertions(+), 115 deletions(-) rename pkg/ipfix/{ipfix_process.go => ipfix_exporter.go} (77%) diff --git a/build/charts/flow-aggregator/README.md b/build/charts/flow-aggregator/README.md index 776cd974eb3..978493c76fb 100644 --- a/build/charts/flow-aggregator/README.md +++ b/build/charts/flow-aggregator/README.md @@ -34,6 +34,7 @@ Kubernetes: `>= 1.19.0-0` | flowAggregatorAddress | string | `""` | Provide an extra DNS name or IP address of flow aggregator for generating TLS certificate. | | flowCollector.address | string | `""` | Provide the flow collector address as string with format :[:], where proto is tcp or udp. If no L4 transport proto is given, we consider tcp as default. | | flowCollector.enable | bool | `false` | Determine whether to enable exporting flow records to external flow collector. | +| flowCollector.maxIPFIXMsgSize | int | `0` | Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default value will be used based on the protocol (tcp or udp) used to connect to the collector. | | flowCollector.observationDomainID | string | `""` | Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated from the persistent cluster UUID generated by Antrea. | | flowCollector.recordFormat | string | `"IPFIX"` | Provide format for records sent to the configured flow collector. Supported formats are IPFIX and JSON. | | flowCollector.templateRefreshTimeout | string | `"600s"` | Template retransmission interval when using the udp protocol to export records. The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". | diff --git a/build/charts/flow-aggregator/conf/flow-aggregator.conf b/build/charts/flow-aggregator/conf/flow-aggregator.conf index 1c96b9c13ae..3a99156abb6 100644 --- a/build/charts/flow-aggregator/conf/flow-aggregator.conf +++ b/build/charts/flow-aggregator/conf/flow-aggregator.conf @@ -75,6 +75,10 @@ flowCollector: # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". templateRefreshTimeout: {{ .Values.flowCollector.templateRefreshTimeout | quote }} + # Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default + # value will be used based on the protocol (tcp or udp) used to connect to the collector. + maxIPFIXMsgSize: {{ .Values.flowCollector.maxIPFIXMsgSize }} + # clickHouse contains ClickHouse related configuration options. clickHouse: # Enable is the switch to enable exporting flow records to ClickHouse. diff --git a/build/charts/flow-aggregator/values.yaml b/build/charts/flow-aggregator/values.yaml index 5159cff6ee4..ea2ca991366 100644 --- a/build/charts/flow-aggregator/values.yaml +++ b/build/charts/flow-aggregator/values.yaml @@ -52,6 +52,9 @@ flowCollector: # -- Template retransmission interval when using the udp protocol to export records. # The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". templateRefreshTimeout: "600s" + # -- Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable + # default value will be used based on the protocol (tcp or udp) used to connect to the collector. + maxIPFIXMsgSize: 0 # clickHouse contains ClickHouse related configuration options. clickHouse: # -- Determine whether to enable exporting flow records to ClickHouse. diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index caec1584de2..9a0bc698447 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -227,6 +227,10 @@ data: # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". templateRefreshTimeout: "600s" + # Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default + # value will be used based on the protocol (tcp or udp) used to connect to the collector. + maxIPFIXMsgSize: 0 + # clickHouse contains ClickHouse related configuration options. clickHouse: # Enable is the switch to enable exporting flow records to ClickHouse. @@ -401,7 +405,7 @@ spec: template: metadata: annotations: - checksum/config: 5ba1a6d1b9d3b40e2ea26e37aa2bea38fda2558c20564873936472136651de37 + checksum/config: 96acfb574fbfb758e6388d677cbc8359c0375031fd68875d4ec2d03f34d2e49c labels: app: flow-aggregator spec: diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index eabc9952416..c2f8bc7c6d7 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -261,7 +261,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.40" \ "antrea/nginx:1.21.6-alpine" \ "antrea/toolbox:1.5-1") -FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.12.0" \ +FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.13.0" \ "antrea/clickhouse-operator:0.21.0" \ "antrea/metrics-exporter:0.21.0" \ "antrea/clickhouse-server:23.4") diff --git a/go.mod b/go.mod index 4ef3eecae6a..3ba98299843 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/ti-mo/conntrack v0.5.1 github.com/vishvananda/netlink v1.3.0 - github.com/vmware/go-ipfix v0.12.0 + github.com/vmware/go-ipfix v0.13.0 go.uber.org/mock v0.5.0 golang.org/x/crypto v0.33.0 golang.org/x/mod v0.23.0 @@ -168,7 +168,7 @@ require ( github.com/josharian/native v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/k-sone/critbitgo v1.4.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect @@ -192,7 +192,7 @@ require ( github.com/paulmach/orb v0.8.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/transport/v2 v2.2.10 // indirect diff --git a/go.sum b/go.sum index 2c35ff741df..95df7912cf6 100644 --- a/go.sum +++ b/go.sum @@ -485,8 +485,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= @@ -622,8 +622,8 @@ github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -775,8 +775,8 @@ github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= -github.com/vmware/go-ipfix v0.12.0 h1:a4YXeCWTa251aZO7u7e9dKDOoU2eHJID45SPlq9j+HI= -github.com/vmware/go-ipfix v0.12.0/go.mod h1:9PiutVWLhQQ6WHncRrGkH0i2Rx82DEOKhu80VSd9jds= +github.com/vmware/go-ipfix v0.13.0 h1:v3paBzd7oq7LEU1SzDwD5RGoYcGROLQycYyN3EzLvDk= +github.com/vmware/go-ipfix v0.13.0/go.mod h1:UTIR38AuEePzrWYjQOvnORCYRG33xZJ56E0K75mSosM= github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index c0e552be17a..91d712120b2 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -90,7 +90,7 @@ MOCKGEN_TARGETS=( "pkg/controller/networkpolicy EndpointQuerier,PolicyRuleQuerier testing" "pkg/controller/querier ControllerQuerier testing" "pkg/flowaggregator/exporter Interface testing" - "pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing" + "pkg/ipfix IPFIXExportingProcess,IPFIXBufferedExporter,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing" "pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder,Group,BucketBuilder,PacketOutBuilder,Meter,MeterBandBuilder testing" "pkg/ovs/ovsconfig OVSBridgeClient testing" "pkg/ovs/ovsctl OVSCtlClient testing" diff --git a/pkg/config/flowaggregator/config.go b/pkg/config/flowaggregator/config.go index 0952da0ac74..aeccbf90cf3 100644 --- a/pkg/config/flowaggregator/config.go +++ b/pkg/config/flowaggregator/config.go @@ -105,6 +105,9 @@ type FlowCollectorConfig struct { // The value must be provided as a duration string. Defaults to 600s. // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". TemplateRefreshTimeout string `yaml:"templateRefreshTimeout,omitempty"` + // Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable + // default value will be used based on the protocol (tcp or udp) used to connect to the collector. + MaxIPFIXMsgSize int32 `yaml:"maxIPFIXMsgSize,omitempty"` } type ClickHouseConfig struct { diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go index bc06d1aee7e..1fd766e1681 100644 --- a/pkg/flowaggregator/exporter/clickhouse.go +++ b/pkg/flowaggregator/exporter/clickhouse.go @@ -122,3 +122,7 @@ func (e *ClickHouseExporter) UpdateOptions(opt *options.Options) { } klog.InfoS("New ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval) } + +func (e *ClickHouseExporter) Flush() error { + return nil +} diff --git a/pkg/flowaggregator/exporter/interface.go b/pkg/flowaggregator/exporter/interface.go index af3ccd8c18f..2c1601ab19c 100644 --- a/pkg/flowaggregator/exporter/interface.go +++ b/pkg/flowaggregator/exporter/interface.go @@ -28,4 +28,7 @@ type Interface interface { Stop() AddRecord(record ipfixentities.Record, isRecordIPv6 bool) error UpdateOptions(opt *options.Options) + // Some exporters may be buffered, in which case the FlowAggregator + // should call this method periodically. + Flush() error } diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index f18915a407f..392b279a31a 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -17,6 +17,7 @@ package exporter import ( "fmt" "hash/fnv" + "net" "reflect" "time" @@ -30,6 +31,7 @@ import ( "antrea.io/antrea/pkg/flowaggregator/infoelements" "antrea.io/antrea/pkg/flowaggregator/options" "antrea.io/antrea/pkg/ipfix" + "antrea.io/antrea/pkg/util/env" ) // this is used for unit testing @@ -44,6 +46,7 @@ type IPFIXExporter struct { externalFlowCollectorAddr string externalFlowCollectorProto string exportingProcess ipfix.IPFIXExportingProcess + bufferedExporter ipfix.IPFIXBufferedExporter sendJSONRecord bool aggregatorMode flowaggregatorconfig.AggregatorMode observationDomainID uint32 @@ -51,8 +54,8 @@ type IPFIXExporter struct { templateIDv4 uint16 templateIDv6 uint16 registry ipfix.IPFIXRegistry - set ipfixentities.Set clusterUUID uuid.UUID + maxIPFIXMsgSize int } // genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the @@ -93,8 +96,8 @@ func NewIPFIXExporter( observationDomainID: observationDomainID, templateRefreshTimeout: opt.TemplateRefreshTimeout, registry: registry, - set: ipfixentities.NewSet(false), clusterUUID: clusterUUID, + maxIPFIXMsgSize: int(opt.Config.FlowCollector.MaxIPFIXMsgSize), } return exporter @@ -107,6 +110,9 @@ func (e *IPFIXExporter) Start() { func (e *IPFIXExporter) Stop() { if e.exportingProcess != nil { + if err := e.bufferedExporter.Flush(); err != nil { + klog.ErrorS(err, "Error when flushing buffered IPFIX exporter") + } e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil } @@ -144,7 +150,8 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) { e.observationDomainID = genObservationDomainID(e.clusterUUID) } e.templateRefreshTimeout = opt.TemplateRefreshTimeout - klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "templateRefreshTimeout", e.templateRefreshTimeout) + e.maxIPFIXMsgSize = int(opt.Config.FlowCollector.MaxIPFIXMsgSize) + klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "templateRefreshTimeout", e.templateRefreshTimeout, "maxIPFIXMsgSize", e.maxIPFIXMsgSize) if e.exportingProcess != nil { e.exportingProcess.CloseConnToCollector() @@ -159,33 +166,28 @@ func (e *IPFIXExporter) sendRecord(record ipfixentities.Record, isRecordIPv6 boo return fmt.Errorf("error when initializing IPFIX exporting process: %v", err) } } - - templateID := e.templateIDv4 - if isRecordIPv6 { - templateID = e.templateIDv6 - } - - // TODO: more records per data set will be supported when go-ipfix supports size check when adding records - e.set.ResetSet() - if err := e.set.PrepareSet(ipfixentities.Data, templateID); err != nil { - return err - } - if err := e.set.AddRecordV2(record.GetOrderedElementList(), templateID); err != nil { - return err - } - sentBytes, err := e.exportingProcess.SendSet(e.set) - if err != nil { + if err := e.bufferedExporter.AddRecord(record); err != nil { return err } if klog.V(7).Enabled() { - klog.InfoS("Data set sent successfully", "bytes sent", sentBytes) + klog.InfoS("Data record added successfully") } return nil } +func inPod() bool { + return env.GetPodNamespace() != "" +} + +func getMTU(ifaceName string) (int, error) { + iface, err := net.InterfaceByName(ifaceName) + if err != nil { + return 0, err + } + return iface.MTU, nil +} + func (e *IPFIXExporter) initExportingProcess() error { - // TODO: This code can be further simplified by changing the go-ipfix API to accept - // externalFlowCollectorAddr and externalFlowCollectorProto instead of net.Addr input. var expInput exporter.ExporterInput if e.externalFlowCollectorProto == "tcp" { expInput = exporter.ExporterInput{ @@ -206,12 +208,32 @@ func (e *IPFIXExporter) initExportingProcess() error { TLSClientConfig: nil, SendJSONRecord: e.sendJSONRecord, } + if inPod() { + // In a Pod, the primary network interface is always "eth0", and we assume + // this is the interface used to connect to the IPFIX collector. + // The FlowAggregator is not meant to be run in the host network. + mtu, err := getMTU("eth0") + if err != nil { + klog.ErrorS(err, "Failed to determine uplink MTU") + } else { + // In practice the only guarantee we have is that PMTU <= + // MTU. However, this is a reasonable approximation for most + // scenarios. Note that MaxMessageSize is an available override in + // the config. + expInput.PathMTU = mtu + } + } else { + klog.InfoS("Not running as Pod, cannot determine interface MTU") + } } + expInput.MaxMsgSize = e.maxIPFIXMsgSize + ep, err := exporter.InitExportingProcess(expInput) if err != nil { - return fmt.Errorf("got error when initializing IPFIX exporting process: %v", err) + return fmt.Errorf("got error when initializing IPFIX exporting process: %w", err) } e.exportingProcess = ep + e.bufferedExporter = exporter.NewBufferedIPFIXExporter(ep) // Currently, we send two templates for IPv4 and IPv6 regardless of the IP families supported by cluster if err = e.createAndSendTemplate(false); err != nil { return err @@ -234,18 +256,16 @@ func (e *IPFIXExporter) createAndSendTemplate(isRecordIPv6 bool) error { } else { e.templateIDv4 = templateID } - bytesSent, err := e.sendTemplateSet(isRecordIPv6) - if err != nil { + if err := e.sendTemplateSet(isRecordIPv6); err != nil { e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil - e.set.ResetSet() return fmt.Errorf("sending %s template set failed, err: %v", recordIPFamily, err) } - klog.V(2).InfoS("Exporting process initialized", "bytesSent", bytesSent, "templateSetIPFamily", recordIPFamily) + klog.V(2).InfoS("Exporting process initialized", "templateSetIPFamily", recordIPFamily) return nil } -func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { +func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) error { elements := make([]ipfixentities.InfoElementWithValue, 0) ianaInfoElements := infoelements.IANAInfoElementsIPv4 antreaInfoElements := infoelements.AntreaInfoElementsIPv4 @@ -258,21 +278,21 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { for _, ieName := range ianaInfoElements { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } for _, ieName := range infoelements.IANAReverseInfoElements { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAReversedEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } for _, ieName := range antreaInfoElements { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } @@ -284,21 +304,21 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { ieName := infoelements.AntreaSourceStatsElementList[i] ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) // Add Antrea destination stats fields ieName = infoelements.AntreaDestinationStatsElementList[i] ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } for _, ieName := range infoelements.AntreaFlowEndSecondsElementList { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } @@ -307,21 +327,21 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { ieName := infoelements.AntreaThroughputElementList[i] ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) // Add source node specific throughput fields ieName = infoelements.AntreaSourceThroughputElementList[i] ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) // Add destination node specific throughput fields ieName = infoelements.AntreaDestinationThroughputElementList[i] ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } @@ -329,33 +349,28 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { for _, ieName := range infoelements.AntreaLabelsElementList { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) if e.aggregatorMode == flowaggregatorconfig.AggregatorModeProxy { for _, ieName := range infoelements.IANAProxyModeElementList { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } } - e.set.ResetSet() - if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil { - return 0, err - } - if err := e.set.AddRecordV2(elements, templateID); err != nil { - return 0, fmt.Errorf("error when adding record to set, error: %v", err) - } - bytesSent, err := e.exportingProcess.SendSet(e.set) - return bytesSent, err + record := ipfixentities.NewTemplateRecordFromElements(templateID, elements, false) + // Ideally we would not have to do it explicitly, it would be taken care of by the go-ipfix library. + record.PrepareRecord() + return e.bufferedExporter.AddRecord(record) } func (e *IPFIXExporter) createInfoElementForTemplateSet(ieName string, enterpriseID uint32) (ipfixentities.InfoElementWithValue, error) { @@ -369,3 +384,10 @@ func (e *IPFIXExporter) createInfoElementForTemplateSet(ieName string, enterpris } return ie, nil } + +func (e *IPFIXExporter) Flush() error { + if e.exportingProcess == nil { + return nil + } + return e.bufferedExporter.Flush() +} diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index e9749807573..b57992ff469 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -17,6 +17,7 @@ package exporter import ( "fmt" "net" + "reflect" "testing" "time" @@ -55,18 +56,16 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { runTest := func(t *testing.T, isIPv6 bool) { ctrl := gomock.NewController(t) - mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - mockTempSet := ipfixentitiestesting.NewMockSet(ctrl) exporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", - exportingProcess: mockIPFIXExpProc, + bufferedExporter: mockIPFIXBufferedExp, templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, registry: mockIPFIXRegistry, - set: mockTempSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } @@ -75,15 +74,11 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { if isIPv6 { testTemplateID = exporter.templateIDv6 } - mockTempSet.EXPECT().ResetSet() - mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateID).Return(nil) - mockTempSet.EXPECT().AddRecordV2(elemList, testTemplateID).Return(nil) - // Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes - // above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements. - mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil) - - _, err := exporter.sendTemplateSet(isIPv6) - assert.NoErrorf(t, err, "Error when sending template record") + mockIPFIXBufferedExp.EXPECT().AddRecord(gomock.Cond(func(record ipfixentities.Record) bool { + return record.GetTemplateID() == testTemplateID && reflect.DeepEqual(record.GetOrderedElementList(), elemList) + })).Return(nil) + + assert.NoErrorf(t, exporter.sendTemplateSet(isIPv6), "Error when sending template record") } t.Run("IPv4", func(t *testing.T) { runTest(t, false) }) @@ -94,17 +89,18 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { ctrl := gomock.NewController(t) mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) // we override the initIPFIXExportingProcess var function: it will - // simply set the exportingProcess member field of the ipfixExporter to - // our mock instance. + // simply set the exportingProcess and bufferedExporter member fields of + // the ipfixExporter to our mocks. // note that even though we "update" the external flow collector address - // as part of the test, we still use the same mock for simplicity's sake. + // as part of the test, we still use the same mocks for simplicity's sake. initIPFIXExportingProcessSaved := initIPFIXExportingProcess initIPFIXExportingProcess = func(exporter *IPFIXExporter) error { exporter.exportingProcess = mockIPFIXExpProc + exporter.bufferedExporter = mockIPFIXBufferedExp return nil } defer func() { @@ -125,20 +121,14 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { externalFlowCollectorProto: "", templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, - set: mockSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } - testTemplateID := testTemplateIDv4 setCount := 0 - mockSet.EXPECT().ResetSet().Times(2) - mockSet.EXPECT().PrepareSet(gomock.Any(), testTemplateID).Return(nil).Times(2) - mockRecord.EXPECT().GetOrderedElementList().Return(nil).Times(2) - mockSet.EXPECT().AddRecordV2(gomock.Any(), testTemplateID).Return(nil).Times(2) - mockIPFIXExpProc.EXPECT().SendSet(mockSet).Do(func(set interface{}) { + mockIPFIXBufferedExp.EXPECT().AddRecord(mockRecord).Do(func(record ipfixentities.Record) { setCount += 1 - }).Return(0, nil).Times(2) + }).Return(nil).Times(2) // connection will be closed when updating the external flow collector address mockIPFIXExpProc.EXPECT().CloseConnToCollector() @@ -171,13 +161,12 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { func TestIPFIXExporter_AddRecord(t *testing.T) { ctrl := gomock.NewController(t) - mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) initIPFIXExportingProcessSaved := initIPFIXExportingProcess initIPFIXExportingProcess = func(exporter *IPFIXExporter) error { - exporter.exportingProcess = mockIPFIXExpProc + exporter.bufferedExporter = mockIPFIXBufferedExp return nil } defer func() { @@ -189,17 +178,11 @@ func TestIPFIXExporter_AddRecord(t *testing.T) { externalFlowCollectorProto: "", templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, - set: mockSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } - testTemplateID := testTemplateIDv4 - mockSet.EXPECT().ResetSet() - mockSet.EXPECT().PrepareSet(gomock.Any(), testTemplateID).Return(nil) - mockRecord.EXPECT().GetOrderedElementList().Return(nil) - mockSet.EXPECT().AddRecordV2(gomock.Any(), testTemplateID).Return(nil) - mockIPFIXExpProc.EXPECT().SendSet(mockSet).Return(0, nil) + mockIPFIXBufferedExp.EXPECT().AddRecord(mockRecord).Return(nil) assert.NoError(t, ipfixExporter.AddRecord(mockRecord, false)) } @@ -232,26 +215,21 @@ func TestIPFIXExporter_sendRecord_Error(t *testing.T) { ctrl := gomock.NewController(t) mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) ipfixExporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", exportingProcess: mockIPFIXExpProc, + bufferedExporter: mockIPFIXBufferedExp, templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, - set: mockSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } - testTemplateID := testTemplateIDv4 - mockSet.EXPECT().ResetSet() - mockSet.EXPECT().PrepareSet(gomock.Any(), testTemplateID).Return(nil) - mockRecord.EXPECT().GetOrderedElementList().Return(nil) - mockSet.EXPECT().AddRecordV2(gomock.Any(), testTemplateID).Return(nil) - mockIPFIXExpProc.EXPECT().SendSet(mockSet).Return(0, fmt.Errorf("send error")) + mockIPFIXBufferedExp.EXPECT().AddRecord(mockRecord).Return(fmt.Errorf("send error")) mockIPFIXExpProc.EXPECT().CloseConnToCollector() assert.Error(t, ipfixExporter.AddRecord(mockRecord, false)) diff --git a/pkg/flowaggregator/exporter/log.go b/pkg/flowaggregator/exporter/log.go index f7a5fa8eaff..ed6dba0449b 100644 --- a/pkg/flowaggregator/exporter/log.go +++ b/pkg/flowaggregator/exporter/log.go @@ -159,3 +159,8 @@ func (e *LogExporter) UpdateOptions(opt *options.Options) { e.buildFilters() e.start() } + +func (e *LogExporter) Flush() error { + // TODO: replace FlushLoop in flowlogger.FlowLogger? + return nil +} diff --git a/pkg/flowaggregator/exporter/s3.go b/pkg/flowaggregator/exporter/s3.go index d4d61c9c56f..a1e48e5b364 100644 --- a/pkg/flowaggregator/exporter/s3.go +++ b/pkg/flowaggregator/exporter/s3.go @@ -85,3 +85,7 @@ func (e *S3Exporter) UpdateOptions(opt *options.Options) { } klog.InfoS("New S3Uploader configuration", "bucketName", s3Input.Config.BucketName, "bucketPrefix", s3Input.Config.BucketPrefix, "region", s3Input.Config.Region, "recordFormat", s3Input.Config.RecordFormat, "compress", *s3Input.Config.Compress, "maxRecordsPerFile", s3Input.Config.MaxRecordsPerFile, "uploadInterval", s3Input.Config.UploadInterval) } + +func (e *S3Exporter) Flush() error { + return nil +} diff --git a/pkg/flowaggregator/exporter/testing/mock_exporter.go b/pkg/flowaggregator/exporter/testing/mock_exporter.go index e07c1fc9acb..dcf156f1212 100644 --- a/pkg/flowaggregator/exporter/testing/mock_exporter.go +++ b/pkg/flowaggregator/exporter/testing/mock_exporter.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -70,6 +70,20 @@ func (mr *MockInterfaceMockRecorder) AddRecord(record, isRecordIPv6 any) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRecord", reflect.TypeOf((*MockInterface)(nil).AddRecord), record, isRecordIPv6) } +// Flush mocks base method. +func (m *MockInterface) Flush() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Flush") + ret0, _ := ret[0].(error) + return ret0 +} + +// Flush indicates an expected call of Flush. +func (mr *MockInterfaceMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockInterface)(nil).Flush)) +} + // Start mocks base method. func (m *MockInterface) Start() { m.ctrl.T.Helper() diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 313d0d8f70c..fb9b2985275 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -560,6 +560,9 @@ func (fa *flowAggregator) proxyRecord(record ipfixentities.Record, obsDomainID u func (fa *flowAggregator) flowExportLoopProxy(stopCh <-chan struct{}) { logTicker := time.NewTicker(fa.logTickerDuration) defer logTicker.Stop() + const flushTickerDuration = 1 * time.Second + flushTicker := time.NewTicker(flushTickerDuration) + defer flushTicker.Stop() msgCh := fa.preprocessorOutCh proxyRecords := func(msg *ipfixentities.Message) { @@ -587,6 +590,10 @@ func (fa *flowAggregator) flowExportLoopProxy(stopCh <-chan struct{}) { break } proxyRecords(msg) + case <-flushTicker.C: + if err := fa.flushExporters(); err != nil { + klog.ErrorS(err, "Error when flushing exporters") + } case <-logTicker.C: // Add visibility of processing stats of Flow Aggregator klog.V(4).InfoS("Total number of records received", "count", fa.collectingProcess.GetNumRecordsReceived()) @@ -627,6 +634,9 @@ func (fa *flowAggregator) flowExportLoopAggregate(stopCh <-chan struct{}) { } // Get the new expiry and reset the timer. expireTimer.Reset(fa.aggregationProcess.GetExpiryFromExpirePriorityQueue()) + if err := fa.flushExporters(); err != nil { + klog.ErrorS(err, "Error when flushing exporters") + } case <-logTicker.C: // Add visibility of processing stats of Flow Aggregator klog.V(4).InfoS("Total number of records received", "count", fa.collectingProcess.GetNumRecordsReceived()) @@ -672,6 +682,16 @@ func (fa *flowAggregator) sendRecord(record ipfixentities.Record, isRecordIPv6 b return nil } +func (fa *flowAggregator) flushExporters() error { + if fa.ipfixExporter != nil { + if err := fa.ipfixExporter.Flush(); err != nil { + return err + } + } + // Other exporters don't leverage Flush for now, so we skip them. + return nil +} + func (fa *flowAggregator) sendAggregatedRecord(key ipfixintermediate.FlowKey, record *ipfixintermediate.AggregationFlowRecord) error { isRecordIPv4 := fa.aggregationProcess.IsAggregatedRecordIPv4(*record) startTime, err := fa.getRecordStartTime(record.Record) diff --git a/pkg/ipfix/ipfix_process.go b/pkg/ipfix/ipfix_exporter.go similarity index 77% rename from pkg/ipfix/ipfix_process.go rename to pkg/ipfix/ipfix_exporter.go index 6a185a1bb8b..c0708658697 100644 --- a/pkg/ipfix/ipfix_process.go +++ b/pkg/ipfix/ipfix_exporter.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,3 +24,9 @@ type IPFIXExportingProcess interface { SendSet(set ipfixentities.Set) (int, error) CloseConnToCollector() } + +// IPFIXBufferedExporter interface is added to facilitate unit testing without involving the code from go-ipfix library. +type IPFIXBufferedExporter interface { + AddRecord(record ipfixentities.Record) error + Flush() error +} diff --git a/pkg/ipfix/testing/mock_ipfix.go b/pkg/ipfix/testing/mock_ipfix.go index 930102779cc..fe6509c5dd7 100644 --- a/pkg/ipfix/testing/mock_ipfix.go +++ b/pkg/ipfix/testing/mock_ipfix.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,11 +14,11 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: antrea.io/antrea/pkg/ipfix (interfaces: IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess) +// Source: antrea.io/antrea/pkg/ipfix (interfaces: IPFIXExportingProcess,IPFIXBufferedExporter,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess) // // Generated by this command: // -// mockgen -copyright_file hack/boilerplate/license_header.raw.txt -destination pkg/ipfix/testing/mock_ipfix.go -package testing antrea.io/antrea/pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess +// mockgen -copyright_file hack/boilerplate/license_header.raw.txt -destination pkg/ipfix/testing/mock_ipfix.go -package testing antrea.io/antrea/pkg/ipfix IPFIXExportingProcess,IPFIXBufferedExporter,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess // // Package testing is a generated GoMock package. @@ -98,6 +98,58 @@ func (mr *MockIPFIXExportingProcessMockRecorder) SendSet(set any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendSet", reflect.TypeOf((*MockIPFIXExportingProcess)(nil).SendSet), set) } +// MockIPFIXBufferedExporter is a mock of IPFIXBufferedExporter interface. +type MockIPFIXBufferedExporter struct { + ctrl *gomock.Controller + recorder *MockIPFIXBufferedExporterMockRecorder + isgomock struct{} +} + +// MockIPFIXBufferedExporterMockRecorder is the mock recorder for MockIPFIXBufferedExporter. +type MockIPFIXBufferedExporterMockRecorder struct { + mock *MockIPFIXBufferedExporter +} + +// NewMockIPFIXBufferedExporter creates a new mock instance. +func NewMockIPFIXBufferedExporter(ctrl *gomock.Controller) *MockIPFIXBufferedExporter { + mock := &MockIPFIXBufferedExporter{ctrl: ctrl} + mock.recorder = &MockIPFIXBufferedExporterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockIPFIXBufferedExporter) EXPECT() *MockIPFIXBufferedExporterMockRecorder { + return m.recorder +} + +// AddRecord mocks base method. +func (m *MockIPFIXBufferedExporter) AddRecord(record entities.Record) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddRecord", record) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddRecord indicates an expected call of AddRecord. +func (mr *MockIPFIXBufferedExporterMockRecorder) AddRecord(record any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRecord", reflect.TypeOf((*MockIPFIXBufferedExporter)(nil).AddRecord), record) +} + +// Flush mocks base method. +func (m *MockIPFIXBufferedExporter) Flush() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Flush") + ret0, _ := ret[0].(error) + return ret0 +} + +// Flush indicates an expected call of Flush. +func (mr *MockIPFIXBufferedExporterMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockIPFIXBufferedExporter)(nil).Flush)) +} + // MockIPFIXRegistry is a mock of IPFIXRegistry interface. type MockIPFIXRegistry struct { ctrl *gomock.Controller diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 135a5218c68..e01e586f9ce 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -182,8 +182,12 @@ type testFlow struct { checkDstSvc bool } +type flowRecord struct { + Data string `json:"data"` +} + type IPFIXCollectorResponse struct { - FlowRecords []string `json:"flowRecords"` + FlowRecords []flowRecord `json:"flowRecords"` } func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (*TestData, bool, bool) { @@ -1485,9 +1489,6 @@ func checkL7FlowExporterDataClickHouse(t *testing.T, record *ClickHouseFullRow, } func getUint64FieldFromRecord(t require.TestingT, record string, field string) uint64 { - if strings.Contains(record, "TEMPLATE SET") { - return 0 - } splitLines := strings.Split(record, "\n") for _, line := range splitLines { if strings.Contains(line, field) { @@ -1529,7 +1530,10 @@ func getCollectorOutput(t require.TestingT, srcIP, dstIP, srcPort string, isDstS if err := json.Unmarshal([]byte(collectorOutput), &response); err != nil { return false, fmt.Errorf("error when unmarshalling output from IPFIX collector Pod: %w", err) } - allRecords = response.FlowRecords + allRecords := make([]string, len(response.FlowRecords)) + for idx := range response.FlowRecords { + allRecords[idx] = response.FlowRecords[idx].Data + } records = filterCollectorRecords(allRecords, labelFilter, src, dst, srcPort) if lookForFlowEnd { for _, record := range records { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 8064723c2f2..0f2abedd4d8 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -129,7 +129,7 @@ const ( mcjoinImage = "antrea/mcjoin:v2.9" nginxImage = "antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" - ipfixCollectorImage = "antrea/ipfix-collector:v0.12.0" + ipfixCollectorImage = "antrea/ipfix-collector:v0.13.0" nginxLBService = "nginx-loadbalancer"