diff --git a/build/charts/flow-aggregator/README.md b/build/charts/flow-aggregator/README.md index 776cd974eb3..3f5df866dae 100644 --- a/build/charts/flow-aggregator/README.md +++ b/build/charts/flow-aggregator/README.md @@ -34,6 +34,7 @@ Kubernetes: `>= 1.19.0-0` | flowAggregatorAddress | string | `""` | Provide an extra DNS name or IP address of flow aggregator for generating TLS certificate. | | flowCollector.address | string | `""` | Provide the flow collector address as string with format :[:], where proto is tcp or udp. If no L4 transport proto is given, we consider tcp as default. | | flowCollector.enable | bool | `false` | Determine whether to enable exporting flow records to external flow collector. | +| flowCollector.maxIPFIXMsgSize | int | `0` | Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default value will be used based on the protocol (tcp or udp) used to connect to the collector. Min valid value is 512 and max valid value is 65535. | | flowCollector.observationDomainID | string | `""` | Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated from the persistent cluster UUID generated by Antrea. | | flowCollector.recordFormat | string | `"IPFIX"` | Provide format for records sent to the configured flow collector. Supported formats are IPFIX and JSON. | | flowCollector.templateRefreshTimeout | string | `"600s"` | Template retransmission interval when using the udp protocol to export records. The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". | diff --git a/build/charts/flow-aggregator/conf/flow-aggregator.conf b/build/charts/flow-aggregator/conf/flow-aggregator.conf index 1c96b9c13ae..4aa24ba8efc 100644 --- a/build/charts/flow-aggregator/conf/flow-aggregator.conf +++ b/build/charts/flow-aggregator/conf/flow-aggregator.conf @@ -75,6 +75,11 @@ flowCollector: # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". templateRefreshTimeout: {{ .Values.flowCollector.templateRefreshTimeout | quote }} + # Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default + # value will be used based on the protocol (tcp or udp) used to connect to the collector. + # Min valid value is 512 and max valid value is 65535. + maxIPFIXMsgSize: {{ .Values.flowCollector.maxIPFIXMsgSize }} + # clickHouse contains ClickHouse related configuration options. clickHouse: # Enable is the switch to enable exporting flow records to ClickHouse. diff --git a/build/charts/flow-aggregator/values.yaml b/build/charts/flow-aggregator/values.yaml index 5159cff6ee4..05999b8e26f 100644 --- a/build/charts/flow-aggregator/values.yaml +++ b/build/charts/flow-aggregator/values.yaml @@ -52,6 +52,10 @@ flowCollector: # -- Template retransmission interval when using the udp protocol to export records. # The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". templateRefreshTimeout: "600s" + # -- Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable + # default value will be used based on the protocol (tcp or udp) used to connect to the collector. + # Min valid value is 512 and max valid value is 65535. + maxIPFIXMsgSize: 0 # clickHouse contains ClickHouse related configuration options. clickHouse: # -- Determine whether to enable exporting flow records to ClickHouse. diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index caec1584de2..04ee120131f 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -227,6 +227,11 @@ data: # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". templateRefreshTimeout: "600s" + # Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default + # value will be used based on the protocol (tcp or udp) used to connect to the collector. + # Min valid value is 512 and max valid value is 65535. + maxIPFIXMsgSize: 0 + # clickHouse contains ClickHouse related configuration options. clickHouse: # Enable is the switch to enable exporting flow records to ClickHouse. @@ -401,7 +406,7 @@ spec: template: metadata: annotations: - checksum/config: 5ba1a6d1b9d3b40e2ea26e37aa2bea38fda2558c20564873936472136651de37 + checksum/config: 13cccea100703f4180b6b432d44ec50b97c0508f9bdf06dd1ef4fbad08b4bed6 labels: app: flow-aggregator spec: diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index eabc9952416..c2f8bc7c6d7 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -261,7 +261,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.40" \ "antrea/nginx:1.21.6-alpine" \ "antrea/toolbox:1.5-1") -FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.12.0" \ +FLOW_VISIBILITY_IMAGE_LIST=("antrea/ipfix-collector:v0.13.0" \ "antrea/clickhouse-operator:0.21.0" \ "antrea/metrics-exporter:0.21.0" \ "antrea/clickhouse-server:23.4") diff --git a/go.mod b/go.mod index 4ef3eecae6a..3ba98299843 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/ti-mo/conntrack v0.5.1 github.com/vishvananda/netlink v1.3.0 - github.com/vmware/go-ipfix v0.12.0 + github.com/vmware/go-ipfix v0.13.0 go.uber.org/mock v0.5.0 golang.org/x/crypto v0.33.0 golang.org/x/mod v0.23.0 @@ -168,7 +168,7 @@ require ( github.com/josharian/native v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/k-sone/critbitgo v1.4.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect @@ -192,7 +192,7 @@ require ( github.com/paulmach/orb v0.8.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/transport/v2 v2.2.10 // indirect diff --git a/go.sum b/go.sum index 2c35ff741df..95df7912cf6 100644 --- a/go.sum +++ b/go.sum @@ -485,8 +485,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= @@ -622,8 +622,8 @@ github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -775,8 +775,8 @@ github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= -github.com/vmware/go-ipfix v0.12.0 h1:a4YXeCWTa251aZO7u7e9dKDOoU2eHJID45SPlq9j+HI= -github.com/vmware/go-ipfix v0.12.0/go.mod h1:9PiutVWLhQQ6WHncRrGkH0i2Rx82DEOKhu80VSd9jds= +github.com/vmware/go-ipfix v0.13.0 h1:v3paBzd7oq7LEU1SzDwD5RGoYcGROLQycYyN3EzLvDk= +github.com/vmware/go-ipfix v0.13.0/go.mod h1:UTIR38AuEePzrWYjQOvnORCYRG33xZJ56E0K75mSosM= github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index c0e552be17a..91d712120b2 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -90,7 +90,7 @@ MOCKGEN_TARGETS=( "pkg/controller/networkpolicy EndpointQuerier,PolicyRuleQuerier testing" "pkg/controller/querier ControllerQuerier testing" "pkg/flowaggregator/exporter Interface testing" - "pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing" + "pkg/ipfix IPFIXExportingProcess,IPFIXBufferedExporter,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing" "pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder,Group,BucketBuilder,PacketOutBuilder,Meter,MeterBandBuilder testing" "pkg/ovs/ovsconfig OVSBridgeClient testing" "pkg/ovs/ovsctl OVSCtlClient testing" diff --git a/pkg/config/flowaggregator/config.go b/pkg/config/flowaggregator/config.go index 0952da0ac74..9f27ae7e270 100644 --- a/pkg/config/flowaggregator/config.go +++ b/pkg/config/flowaggregator/config.go @@ -105,6 +105,10 @@ type FlowCollectorConfig struct { // The value must be provided as a duration string. Defaults to 600s. // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". TemplateRefreshTimeout string `yaml:"templateRefreshTimeout,omitempty"` + // Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable + // default value will be used based on the protocol (tcp or udp) used to connect to the collector. + // Min valid value is 512 and max valid value is 65535. + MaxIPFIXMsgSize int32 `yaml:"maxIPFIXMsgSize,omitempty"` } type ClickHouseConfig struct { diff --git a/pkg/config/flowaggregator/default.go b/pkg/config/flowaggregator/default.go index 9a9478577a1..9301869c6b0 100644 --- a/pkg/config/flowaggregator/default.go +++ b/pkg/config/flowaggregator/default.go @@ -30,6 +30,8 @@ const ( DefaultAggregatorTransportProtocol = "TLS" DefaultRecordFormat = "IPFIX" DefaultTemplateRefreshTimeout = "600s" + MinValidIPFIXMsgSize = 512 + MaxValidIPFIXMsgSize = 65535 DefaultClickHouseDatabase = "default" DefaultClickHouseCommitInterval = "8s" diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go index bc06d1aee7e..1fd766e1681 100644 --- a/pkg/flowaggregator/exporter/clickhouse.go +++ b/pkg/flowaggregator/exporter/clickhouse.go @@ -122,3 +122,7 @@ func (e *ClickHouseExporter) UpdateOptions(opt *options.Options) { } klog.InfoS("New ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval) } + +func (e *ClickHouseExporter) Flush() error { + return nil +} diff --git a/pkg/flowaggregator/exporter/interface.go b/pkg/flowaggregator/exporter/interface.go index af3ccd8c18f..2c1601ab19c 100644 --- a/pkg/flowaggregator/exporter/interface.go +++ b/pkg/flowaggregator/exporter/interface.go @@ -28,4 +28,7 @@ type Interface interface { Stop() AddRecord(record ipfixentities.Record, isRecordIPv6 bool) error UpdateOptions(opt *options.Options) + // Some exporters may be buffered, in which case the FlowAggregator + // should call this method periodically. + Flush() error } diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index f18915a407f..a35314fd6d0 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -17,6 +17,7 @@ package exporter import ( "fmt" "hash/fnv" + "net" "reflect" "time" @@ -30,6 +31,7 @@ import ( "antrea.io/antrea/pkg/flowaggregator/infoelements" "antrea.io/antrea/pkg/flowaggregator/options" "antrea.io/antrea/pkg/ipfix" + "antrea.io/antrea/pkg/util/env" ) // this is used for unit testing @@ -44,6 +46,7 @@ type IPFIXExporter struct { externalFlowCollectorAddr string externalFlowCollectorProto string exportingProcess ipfix.IPFIXExportingProcess + bufferedExporter ipfix.IPFIXBufferedExporter sendJSONRecord bool aggregatorMode flowaggregatorconfig.AggregatorMode observationDomainID uint32 @@ -51,8 +54,8 @@ type IPFIXExporter struct { templateIDv4 uint16 templateIDv6 uint16 registry ipfix.IPFIXRegistry - set ipfixentities.Set clusterUUID uuid.UUID + maxIPFIXMsgSize int } // genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the @@ -93,8 +96,8 @@ func NewIPFIXExporter( observationDomainID: observationDomainID, templateRefreshTimeout: opt.TemplateRefreshTimeout, registry: registry, - set: ipfixentities.NewSet(false), clusterUUID: clusterUUID, + maxIPFIXMsgSize: int(opt.Config.FlowCollector.MaxIPFIXMsgSize), } return exporter @@ -107,6 +110,9 @@ func (e *IPFIXExporter) Start() { func (e *IPFIXExporter) Stop() { if e.exportingProcess != nil { + if err := e.bufferedExporter.Flush(); err != nil { + klog.ErrorS(err, "Error when flushing buffered IPFIX exporter") + } e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil } @@ -144,9 +150,13 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) { e.observationDomainID = genObservationDomainID(e.clusterUUID) } e.templateRefreshTimeout = opt.TemplateRefreshTimeout - klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "templateRefreshTimeout", e.templateRefreshTimeout) + e.maxIPFIXMsgSize = int(opt.Config.FlowCollector.MaxIPFIXMsgSize) + klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "templateRefreshTimeout", e.templateRefreshTimeout, "maxIPFIXMsgSize", e.maxIPFIXMsgSize) if e.exportingProcess != nil { + if err := e.bufferedExporter.Flush(); err != nil { + klog.ErrorS(err, "Error when flushing buffered IPFIX exporter") + } e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil } @@ -159,33 +169,26 @@ func (e *IPFIXExporter) sendRecord(record ipfixentities.Record, isRecordIPv6 boo return fmt.Errorf("error when initializing IPFIX exporting process: %v", err) } } - - templateID := e.templateIDv4 - if isRecordIPv6 { - templateID = e.templateIDv6 - } - - // TODO: more records per data set will be supported when go-ipfix supports size check when adding records - e.set.ResetSet() - if err := e.set.PrepareSet(ipfixentities.Data, templateID); err != nil { + if err := e.bufferedExporter.AddRecord(record); err != nil { return err } - if err := e.set.AddRecordV2(record.GetOrderedElementList(), templateID); err != nil { - return err - } - sentBytes, err := e.exportingProcess.SendSet(e.set) + klog.V(7).InfoS("Data record added successfully") + return nil +} + +func inPod() bool { + return env.GetPodNamespace() != "" +} + +func getMTU(ifaceName string) (int, error) { + iface, err := net.InterfaceByName(ifaceName) if err != nil { - return err - } - if klog.V(7).Enabled() { - klog.InfoS("Data set sent successfully", "bytes sent", sentBytes) + return 0, err } - return nil + return iface.MTU, nil } func (e *IPFIXExporter) initExportingProcess() error { - // TODO: This code can be further simplified by changing the go-ipfix API to accept - // externalFlowCollectorAddr and externalFlowCollectorProto instead of net.Addr input. var expInput exporter.ExporterInput if e.externalFlowCollectorProto == "tcp" { expInput = exporter.ExporterInput{ @@ -206,12 +209,32 @@ func (e *IPFIXExporter) initExportingProcess() error { TLSClientConfig: nil, SendJSONRecord: e.sendJSONRecord, } + if inPod() { + // In a Pod, the primary network interface is always "eth0", and we assume + // this is the interface used to connect to the IPFIX collector. + // The FlowAggregator is not meant to be run in the host network. + mtu, err := getMTU("eth0") + if err != nil { + klog.ErrorS(err, "Failed to determine uplink MTU") + } else { + // In practice the only guarantee we have is that PMTU <= + // MTU. However, this is a reasonable approximation for most + // scenarios. Note that MaxMessageSize is an available override in + // the config. + expInput.PathMTU = mtu + } + } else { + klog.InfoS("Not running as Pod, cannot determine interface MTU") + } } + expInput.MaxMsgSize = e.maxIPFIXMsgSize + ep, err := exporter.InitExportingProcess(expInput) if err != nil { - return fmt.Errorf("got error when initializing IPFIX exporting process: %v", err) + return fmt.Errorf("got error when initializing IPFIX exporting process: %w", err) } e.exportingProcess = ep + e.bufferedExporter = exporter.NewBufferedIPFIXExporter(ep) // Currently, we send two templates for IPv4 and IPv6 regardless of the IP families supported by cluster if err = e.createAndSendTemplate(false); err != nil { return err @@ -234,18 +257,17 @@ func (e *IPFIXExporter) createAndSendTemplate(isRecordIPv6 bool) error { } else { e.templateIDv4 = templateID } - bytesSent, err := e.sendTemplateSet(isRecordIPv6) - if err != nil { + if err := e.sendTemplateSet(isRecordIPv6); err != nil { + // No need to flush first, as no data records should have been sent yet. e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil - e.set.ResetSet() return fmt.Errorf("sending %s template set failed, err: %v", recordIPFamily, err) } - klog.V(2).InfoS("Exporting process initialized", "bytesSent", bytesSent, "templateSetIPFamily", recordIPFamily) + klog.V(2).InfoS("Exporting process initialized", "templateSetIPFamily", recordIPFamily) return nil } -func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { +func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) error { elements := make([]ipfixentities.InfoElementWithValue, 0) ianaInfoElements := infoelements.IANAInfoElementsIPv4 antreaInfoElements := infoelements.AntreaInfoElementsIPv4 @@ -258,21 +280,21 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { for _, ieName := range ianaInfoElements { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } for _, ieName := range infoelements.IANAReverseInfoElements { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAReversedEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } for _, ieName := range antreaInfoElements { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } @@ -284,21 +306,21 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { ieName := infoelements.AntreaSourceStatsElementList[i] ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) // Add Antrea destination stats fields ieName = infoelements.AntreaDestinationStatsElementList[i] ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } for _, ieName := range infoelements.AntreaFlowEndSecondsElementList { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } @@ -307,21 +329,21 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { ieName := infoelements.AntreaThroughputElementList[i] ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) // Add source node specific throughput fields ieName = infoelements.AntreaSourceThroughputElementList[i] ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) // Add destination node specific throughput fields ieName = infoelements.AntreaDestinationThroughputElementList[i] ie, err = e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } @@ -329,33 +351,28 @@ func (e *IPFIXExporter) sendTemplateSet(isIPv6 bool) (int, error) { for _, ieName := range infoelements.AntreaLabelsElementList { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } ie, err := e.createInfoElementForTemplateSet("clusterId", ipfixregistry.AntreaEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) if e.aggregatorMode == flowaggregatorconfig.AggregatorModeProxy { for _, ieName := range infoelements.IANAProxyModeElementList { ie, err := e.createInfoElementForTemplateSet(ieName, ipfixregistry.IANAEnterpriseID) if err != nil { - return 0, err + return err } elements = append(elements, ie) } } - e.set.ResetSet() - if err := e.set.PrepareSet(ipfixentities.Template, templateID); err != nil { - return 0, err - } - if err := e.set.AddRecordV2(elements, templateID); err != nil { - return 0, fmt.Errorf("error when adding record to set, error: %v", err) - } - bytesSent, err := e.exportingProcess.SendSet(e.set) - return bytesSent, err + record := ipfixentities.NewTemplateRecordFromElements(templateID, elements, false) + // Ideally we would not have to do it explicitly, it would be taken care of by the go-ipfix library. + record.PrepareRecord() + return e.bufferedExporter.AddRecord(record) } func (e *IPFIXExporter) createInfoElementForTemplateSet(ieName string, enterpriseID uint32) (ipfixentities.InfoElementWithValue, error) { @@ -369,3 +386,10 @@ func (e *IPFIXExporter) createInfoElementForTemplateSet(ieName string, enterpris } return ie, nil } + +func (e *IPFIXExporter) Flush() error { + if e.exportingProcess == nil { + return nil + } + return e.bufferedExporter.Flush() +} diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index e9749807573..9c401944eb5 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -17,6 +17,7 @@ package exporter import ( "fmt" "net" + "reflect" "testing" "time" @@ -55,18 +56,16 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { runTest := func(t *testing.T, isIPv6 bool) { ctrl := gomock.NewController(t) - mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockIPFIXRegistry := ipfixtesting.NewMockIPFIXRegistry(ctrl) - mockTempSet := ipfixentitiestesting.NewMockSet(ctrl) exporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", - exportingProcess: mockIPFIXExpProc, + bufferedExporter: mockIPFIXBufferedExp, templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, registry: mockIPFIXRegistry, - set: mockTempSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } @@ -75,15 +74,11 @@ func TestIPFIXExporter_sendTemplateSet(t *testing.T) { if isIPv6 { testTemplateID = exporter.templateIDv6 } - mockTempSet.EXPECT().ResetSet() - mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateID).Return(nil) - mockTempSet.EXPECT().AddRecordV2(elemList, testTemplateID).Return(nil) - // Passing 0 for sentBytes as it is not used anywhere in the test. If this not a call to mock, the actual sentBytes - // above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements. - mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil) - - _, err := exporter.sendTemplateSet(isIPv6) - assert.NoErrorf(t, err, "Error when sending template record") + mockIPFIXBufferedExp.EXPECT().AddRecord(gomock.Cond(func(record ipfixentities.Record) bool { + return record.GetTemplateID() == testTemplateID && reflect.DeepEqual(record.GetOrderedElementList(), elemList) + })).Return(nil) + + assert.NoErrorf(t, exporter.sendTemplateSet(isIPv6), "Error when sending template record") } t.Run("IPv4", func(t *testing.T) { runTest(t, false) }) @@ -94,17 +89,18 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { ctrl := gomock.NewController(t) mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) // we override the initIPFIXExportingProcess var function: it will - // simply set the exportingProcess member field of the ipfixExporter to - // our mock instance. + // simply set the exportingProcess and bufferedExporter member fields of + // the ipfixExporter to our mocks. // note that even though we "update" the external flow collector address - // as part of the test, we still use the same mock for simplicity's sake. + // as part of the test, we still use the same mocks for simplicity's sake. initIPFIXExportingProcessSaved := initIPFIXExportingProcess initIPFIXExportingProcess = func(exporter *IPFIXExporter) error { exporter.exportingProcess = mockIPFIXExpProc + exporter.bufferedExporter = mockIPFIXBufferedExp return nil } defer func() { @@ -125,21 +121,16 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { externalFlowCollectorProto: "", templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, - set: mockSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } - testTemplateID := testTemplateIDv4 setCount := 0 - mockSet.EXPECT().ResetSet().Times(2) - mockSet.EXPECT().PrepareSet(gomock.Any(), testTemplateID).Return(nil).Times(2) - mockRecord.EXPECT().GetOrderedElementList().Return(nil).Times(2) - mockSet.EXPECT().AddRecordV2(gomock.Any(), testTemplateID).Return(nil).Times(2) - mockIPFIXExpProc.EXPECT().SendSet(mockSet).Do(func(set interface{}) { + mockIPFIXBufferedExp.EXPECT().AddRecord(mockRecord).Do(func(record ipfixentities.Record) { setCount += 1 - }).Return(0, nil).Times(2) + }).Return(nil).Times(2) // connection will be closed when updating the external flow collector address + mockIPFIXBufferedExp.EXPECT().Flush() mockIPFIXExpProc.EXPECT().CloseConnToCollector() require.NoError(t, ipfixExporter.AddRecord(mockRecord, false)) @@ -171,13 +162,12 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { func TestIPFIXExporter_AddRecord(t *testing.T) { ctrl := gomock.NewController(t) - mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) initIPFIXExportingProcessSaved := initIPFIXExportingProcess initIPFIXExportingProcess = func(exporter *IPFIXExporter) error { - exporter.exportingProcess = mockIPFIXExpProc + exporter.bufferedExporter = mockIPFIXBufferedExp return nil } defer func() { @@ -189,17 +179,11 @@ func TestIPFIXExporter_AddRecord(t *testing.T) { externalFlowCollectorProto: "", templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, - set: mockSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } - testTemplateID := testTemplateIDv4 - mockSet.EXPECT().ResetSet() - mockSet.EXPECT().PrepareSet(gomock.Any(), testTemplateID).Return(nil) - mockRecord.EXPECT().GetOrderedElementList().Return(nil) - mockSet.EXPECT().AddRecordV2(gomock.Any(), testTemplateID).Return(nil) - mockIPFIXExpProc.EXPECT().SendSet(mockSet).Return(0, nil) + mockIPFIXBufferedExp.EXPECT().AddRecord(mockRecord).Return(nil) assert.NoError(t, ipfixExporter.AddRecord(mockRecord, false)) } @@ -232,26 +216,21 @@ func TestIPFIXExporter_sendRecord_Error(t *testing.T) { ctrl := gomock.NewController(t) mockIPFIXExpProc := ipfixtesting.NewMockIPFIXExportingProcess(ctrl) - mockSet := ipfixentitiestesting.NewMockSet(ctrl) + mockIPFIXBufferedExp := ipfixtesting.NewMockIPFIXBufferedExporter(ctrl) mockRecord := ipfixentitiestesting.NewMockRecord(ctrl) ipfixExporter := &IPFIXExporter{ externalFlowCollectorAddr: "", externalFlowCollectorProto: "", exportingProcess: mockIPFIXExpProc, + bufferedExporter: mockIPFIXBufferedExp, templateIDv4: testTemplateIDv4, templateIDv6: testTemplateIDv6, - set: mockSet, aggregatorMode: flowaggregatorconfig.AggregatorModeAggregate, observationDomainID: testObservationDomainID, } - testTemplateID := testTemplateIDv4 - mockSet.EXPECT().ResetSet() - mockSet.EXPECT().PrepareSet(gomock.Any(), testTemplateID).Return(nil) - mockRecord.EXPECT().GetOrderedElementList().Return(nil) - mockSet.EXPECT().AddRecordV2(gomock.Any(), testTemplateID).Return(nil) - mockIPFIXExpProc.EXPECT().SendSet(mockSet).Return(0, fmt.Errorf("send error")) + mockIPFIXBufferedExp.EXPECT().AddRecord(mockRecord).Return(fmt.Errorf("send error")) mockIPFIXExpProc.EXPECT().CloseConnToCollector() assert.Error(t, ipfixExporter.AddRecord(mockRecord, false)) diff --git a/pkg/flowaggregator/exporter/log.go b/pkg/flowaggregator/exporter/log.go index f7a5fa8eaff..ed6dba0449b 100644 --- a/pkg/flowaggregator/exporter/log.go +++ b/pkg/flowaggregator/exporter/log.go @@ -159,3 +159,8 @@ func (e *LogExporter) UpdateOptions(opt *options.Options) { e.buildFilters() e.start() } + +func (e *LogExporter) Flush() error { + // TODO: replace FlushLoop in flowlogger.FlowLogger? + return nil +} diff --git a/pkg/flowaggregator/exporter/s3.go b/pkg/flowaggregator/exporter/s3.go index d4d61c9c56f..a1e48e5b364 100644 --- a/pkg/flowaggregator/exporter/s3.go +++ b/pkg/flowaggregator/exporter/s3.go @@ -85,3 +85,7 @@ func (e *S3Exporter) UpdateOptions(opt *options.Options) { } klog.InfoS("New S3Uploader configuration", "bucketName", s3Input.Config.BucketName, "bucketPrefix", s3Input.Config.BucketPrefix, "region", s3Input.Config.Region, "recordFormat", s3Input.Config.RecordFormat, "compress", *s3Input.Config.Compress, "maxRecordsPerFile", s3Input.Config.MaxRecordsPerFile, "uploadInterval", s3Input.Config.UploadInterval) } + +func (e *S3Exporter) Flush() error { + return nil +} diff --git a/pkg/flowaggregator/exporter/testing/mock_exporter.go b/pkg/flowaggregator/exporter/testing/mock_exporter.go index e07c1fc9acb..dcf156f1212 100644 --- a/pkg/flowaggregator/exporter/testing/mock_exporter.go +++ b/pkg/flowaggregator/exporter/testing/mock_exporter.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -70,6 +70,20 @@ func (mr *MockInterfaceMockRecorder) AddRecord(record, isRecordIPv6 any) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRecord", reflect.TypeOf((*MockInterface)(nil).AddRecord), record, isRecordIPv6) } +// Flush mocks base method. +func (m *MockInterface) Flush() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Flush") + ret0, _ := ret[0].(error) + return ret0 +} + +// Flush indicates an expected call of Flush. +func (mr *MockInterfaceMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockInterface)(nil).Flush)) +} + // Start mocks base method. func (m *MockInterface) Start() { m.ctrl.T.Helper() diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index 313d0d8f70c..fb9b2985275 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -560,6 +560,9 @@ func (fa *flowAggregator) proxyRecord(record ipfixentities.Record, obsDomainID u func (fa *flowAggregator) flowExportLoopProxy(stopCh <-chan struct{}) { logTicker := time.NewTicker(fa.logTickerDuration) defer logTicker.Stop() + const flushTickerDuration = 1 * time.Second + flushTicker := time.NewTicker(flushTickerDuration) + defer flushTicker.Stop() msgCh := fa.preprocessorOutCh proxyRecords := func(msg *ipfixentities.Message) { @@ -587,6 +590,10 @@ func (fa *flowAggregator) flowExportLoopProxy(stopCh <-chan struct{}) { break } proxyRecords(msg) + case <-flushTicker.C: + if err := fa.flushExporters(); err != nil { + klog.ErrorS(err, "Error when flushing exporters") + } case <-logTicker.C: // Add visibility of processing stats of Flow Aggregator klog.V(4).InfoS("Total number of records received", "count", fa.collectingProcess.GetNumRecordsReceived()) @@ -627,6 +634,9 @@ func (fa *flowAggregator) flowExportLoopAggregate(stopCh <-chan struct{}) { } // Get the new expiry and reset the timer. expireTimer.Reset(fa.aggregationProcess.GetExpiryFromExpirePriorityQueue()) + if err := fa.flushExporters(); err != nil { + klog.ErrorS(err, "Error when flushing exporters") + } case <-logTicker.C: // Add visibility of processing stats of Flow Aggregator klog.V(4).InfoS("Total number of records received", "count", fa.collectingProcess.GetNumRecordsReceived()) @@ -672,6 +682,16 @@ func (fa *flowAggregator) sendRecord(record ipfixentities.Record, isRecordIPv6 b return nil } +func (fa *flowAggregator) flushExporters() error { + if fa.ipfixExporter != nil { + if err := fa.ipfixExporter.Flush(); err != nil { + return err + } + } + // Other exporters don't leverage Flush for now, so we skip them. + return nil +} + func (fa *flowAggregator) sendAggregatedRecord(key ipfixintermediate.FlowKey, record *ipfixintermediate.AggregationFlowRecord) error { isRecordIPv4 := fa.aggregationProcess.IsAggregatedRecordIPv4(*record) startTime, err := fa.getRecordStartTime(record.Record) diff --git a/pkg/flowaggregator/options/options.go b/pkg/flowaggregator/options/options.go index 508c48f6f28..ca2849eb972 100644 --- a/pkg/flowaggregator/options/options.go +++ b/pkg/flowaggregator/options/options.go @@ -89,7 +89,7 @@ func LoadConfig(configBytes []byte) (*Options, error) { return nil, err } // Validate flow collector specific parameters - if opt.Config.FlowCollector.Enable && len(opt.Config.FlowCollector.Address) > 0 { + if opt.Config.FlowCollector.Enable { host, port, proto, err := flowexport.ParseFlowCollectorAddr( opt.Config.FlowCollector.Address, flowaggregatorconfig.DefaultExternalFlowCollectorPort, flowaggregatorconfig.DefaultExternalFlowCollectorTransport) @@ -110,6 +110,18 @@ func LoadConfig(configBytes []byte) (*Options, error) { if opt.TemplateRefreshTimeout < 0 { return nil, fmt.Errorf("templateRefreshTimeout cannot be a negative duration") } + + if opt.Config.FlowCollector.MaxIPFIXMsgSize < 0 { + return nil, fmt.Errorf("maxIPFIXMsgSize cannot be negative") + } + if opt.Config.FlowCollector.MaxIPFIXMsgSize > 0 { + if opt.Config.FlowCollector.MaxIPFIXMsgSize < flowaggregatorconfig.MinValidIPFIXMsgSize { + return nil, fmt.Errorf("maxIPFIXMsgSize cannot be smaller than the minimum valid IPFIX mesage size %d", flowaggregatorconfig.MinValidIPFIXMsgSize) + } + if opt.Config.FlowCollector.MaxIPFIXMsgSize > flowaggregatorconfig.MaxValidIPFIXMsgSize { + return nil, fmt.Errorf("maxIPFIXMsgSize cannot be greater than the maximum valid IPFIX mesage size %d", flowaggregatorconfig.MaxValidIPFIXMsgSize) + } + } } // Validate clickhouse specific parameters if opt.Config.ClickHouse.Enable { diff --git a/pkg/ipfix/ipfix_process.go b/pkg/ipfix/ipfix_exporter.go similarity index 77% rename from pkg/ipfix/ipfix_process.go rename to pkg/ipfix/ipfix_exporter.go index 6a185a1bb8b..c0708658697 100644 --- a/pkg/ipfix/ipfix_process.go +++ b/pkg/ipfix/ipfix_exporter.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,3 +24,9 @@ type IPFIXExportingProcess interface { SendSet(set ipfixentities.Set) (int, error) CloseConnToCollector() } + +// IPFIXBufferedExporter interface is added to facilitate unit testing without involving the code from go-ipfix library. +type IPFIXBufferedExporter interface { + AddRecord(record ipfixentities.Record) error + Flush() error +} diff --git a/pkg/ipfix/testing/mock_ipfix.go b/pkg/ipfix/testing/mock_ipfix.go index 930102779cc..fe6509c5dd7 100644 --- a/pkg/ipfix/testing/mock_ipfix.go +++ b/pkg/ipfix/testing/mock_ipfix.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,11 +14,11 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: antrea.io/antrea/pkg/ipfix (interfaces: IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess) +// Source: antrea.io/antrea/pkg/ipfix (interfaces: IPFIXExportingProcess,IPFIXBufferedExporter,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess) // // Generated by this command: // -// mockgen -copyright_file hack/boilerplate/license_header.raw.txt -destination pkg/ipfix/testing/mock_ipfix.go -package testing antrea.io/antrea/pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess +// mockgen -copyright_file hack/boilerplate/license_header.raw.txt -destination pkg/ipfix/testing/mock_ipfix.go -package testing antrea.io/antrea/pkg/ipfix IPFIXExportingProcess,IPFIXBufferedExporter,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess // // Package testing is a generated GoMock package. @@ -98,6 +98,58 @@ func (mr *MockIPFIXExportingProcessMockRecorder) SendSet(set any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendSet", reflect.TypeOf((*MockIPFIXExportingProcess)(nil).SendSet), set) } +// MockIPFIXBufferedExporter is a mock of IPFIXBufferedExporter interface. +type MockIPFIXBufferedExporter struct { + ctrl *gomock.Controller + recorder *MockIPFIXBufferedExporterMockRecorder + isgomock struct{} +} + +// MockIPFIXBufferedExporterMockRecorder is the mock recorder for MockIPFIXBufferedExporter. +type MockIPFIXBufferedExporterMockRecorder struct { + mock *MockIPFIXBufferedExporter +} + +// NewMockIPFIXBufferedExporter creates a new mock instance. +func NewMockIPFIXBufferedExporter(ctrl *gomock.Controller) *MockIPFIXBufferedExporter { + mock := &MockIPFIXBufferedExporter{ctrl: ctrl} + mock.recorder = &MockIPFIXBufferedExporterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockIPFIXBufferedExporter) EXPECT() *MockIPFIXBufferedExporterMockRecorder { + return m.recorder +} + +// AddRecord mocks base method. +func (m *MockIPFIXBufferedExporter) AddRecord(record entities.Record) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddRecord", record) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddRecord indicates an expected call of AddRecord. +func (mr *MockIPFIXBufferedExporterMockRecorder) AddRecord(record any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRecord", reflect.TypeOf((*MockIPFIXBufferedExporter)(nil).AddRecord), record) +} + +// Flush mocks base method. +func (m *MockIPFIXBufferedExporter) Flush() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Flush") + ret0, _ := ret[0].(error) + return ret0 +} + +// Flush indicates an expected call of Flush. +func (mr *MockIPFIXBufferedExporterMockRecorder) Flush() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockIPFIXBufferedExporter)(nil).Flush)) +} + // MockIPFIXRegistry is a mock of IPFIXRegistry interface. type MockIPFIXRegistry struct { ctrl *gomock.Controller diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 135a5218c68..e01e586f9ce 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -182,8 +182,12 @@ type testFlow struct { checkDstSvc bool } +type flowRecord struct { + Data string `json:"data"` +} + type IPFIXCollectorResponse struct { - FlowRecords []string `json:"flowRecords"` + FlowRecords []flowRecord `json:"flowRecords"` } func setupFlowAggregatorTest(t *testing.T, options flowVisibilityTestOptions) (*TestData, bool, bool) { @@ -1485,9 +1489,6 @@ func checkL7FlowExporterDataClickHouse(t *testing.T, record *ClickHouseFullRow, } func getUint64FieldFromRecord(t require.TestingT, record string, field string) uint64 { - if strings.Contains(record, "TEMPLATE SET") { - return 0 - } splitLines := strings.Split(record, "\n") for _, line := range splitLines { if strings.Contains(line, field) { @@ -1529,7 +1530,10 @@ func getCollectorOutput(t require.TestingT, srcIP, dstIP, srcPort string, isDstS if err := json.Unmarshal([]byte(collectorOutput), &response); err != nil { return false, fmt.Errorf("error when unmarshalling output from IPFIX collector Pod: %w", err) } - allRecords = response.FlowRecords + allRecords := make([]string, len(response.FlowRecords)) + for idx := range response.FlowRecords { + allRecords[idx] = response.FlowRecords[idx].Data + } records = filterCollectorRecords(allRecords, labelFilter, src, dst, srcPort) if lookForFlowEnd { for _, record := range records { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 8064723c2f2..0f2abedd4d8 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -129,7 +129,7 @@ const ( mcjoinImage = "antrea/mcjoin:v2.9" nginxImage = "antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" - ipfixCollectorImage = "antrea/ipfix-collector:v0.12.0" + ipfixCollectorImage = "antrea/ipfix-collector:v0.13.0" nginxLBService = "nginx-loadbalancer"