From cb16327674002071913256b46aecf31aec1c68e8 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Thu, 20 Feb 2025 18:57:54 -0800 Subject: [PATCH] Address review comments Signed-off-by: Antonin Bas --- build/charts/flow-aggregator/README.md | 2 +- pkg/flowaggregator/exporter/ipfix.go | 4 ++++ pkg/flowaggregator/exporter/ipfix_test.go | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/build/charts/flow-aggregator/README.md b/build/charts/flow-aggregator/README.md index 978493c76fb..3f5df866dae 100644 --- a/build/charts/flow-aggregator/README.md +++ b/build/charts/flow-aggregator/README.md @@ -34,7 +34,7 @@ Kubernetes: `>= 1.19.0-0` | flowAggregatorAddress | string | `""` | Provide an extra DNS name or IP address of flow aggregator for generating TLS certificate. | | flowCollector.address | string | `""` | Provide the flow collector address as string with format :[:], where proto is tcp or udp. If no L4 transport proto is given, we consider tcp as default. | | flowCollector.enable | bool | `false` | Determine whether to enable exporting flow records to external flow collector. | -| flowCollector.maxIPFIXMsgSize | int | `0` | Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default value will be used based on the protocol (tcp or udp) used to connect to the collector. | +| flowCollector.maxIPFIXMsgSize | int | `0` | Maximum message size to use for IPFIX records. If set to 0 (recommended), a reasonable default value will be used based on the protocol (tcp or udp) used to connect to the collector. Min valid value is 512 and max valid value is 65535. | | flowCollector.observationDomainID | string | `""` | Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated from the persistent cluster UUID generated by Antrea. | | flowCollector.recordFormat | string | `"IPFIX"` | Provide format for records sent to the configured flow collector. Supported formats are IPFIX and JSON. | | flowCollector.templateRefreshTimeout | string | `"600s"` | Template retransmission interval when using the udp protocol to export records. The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". | diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index f9410335993..a35314fd6d0 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -154,6 +154,9 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) { klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "templateRefreshTimeout", e.templateRefreshTimeout, "maxIPFIXMsgSize", e.maxIPFIXMsgSize) if e.exportingProcess != nil { + if err := e.bufferedExporter.Flush(); err != nil { + klog.ErrorS(err, "Error when flushing buffered IPFIX exporter") + } e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil } @@ -255,6 +258,7 @@ func (e *IPFIXExporter) createAndSendTemplate(isRecordIPv6 bool) error { e.templateIDv4 = templateID } if err := e.sendTemplateSet(isRecordIPv6); err != nil { + // No need to flush first, as no data records should have been sent yet. e.exportingProcess.CloseConnToCollector() e.exportingProcess = nil return fmt.Errorf("sending %s template set failed, err: %v", recordIPFamily, err) diff --git a/pkg/flowaggregator/exporter/ipfix_test.go b/pkg/flowaggregator/exporter/ipfix_test.go index b57992ff469..9c401944eb5 100644 --- a/pkg/flowaggregator/exporter/ipfix_test.go +++ b/pkg/flowaggregator/exporter/ipfix_test.go @@ -130,6 +130,7 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) { setCount += 1 }).Return(nil).Times(2) // connection will be closed when updating the external flow collector address + mockIPFIXBufferedExp.EXPECT().Flush() mockIPFIXExpProc.EXPECT().CloseConnToCollector() require.NoError(t, ipfixExporter.AddRecord(mockRecord, false))