Skip to content

Commit

Permalink
[chore] Simplify setFeatureGateForTest definition and usage (#12361)
Browse files Browse the repository at this point in the history
Followup is to make `setFeatureGateForTest` part of featuregatetest
package.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Feb 12, 2025
1 parent 9be1b84 commit d97904a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 59 deletions.
10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newNoopExportSender() Sender[request.Request] {
func TestBaseExporter(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := NewBaseExporter(defaultSettings, defaultSignal)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -64,7 +64,7 @@ func TestBaseExporter(t *testing.T) {
func TestBaseExporterWithOptions(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
want := errors.New("my error")
be, err := NewBaseExporter(
defaultSettings, defaultSignal,
Expand All @@ -84,7 +84,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
func TestQueueOptionsWithRequestExporter(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
Expand All @@ -108,7 +108,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
func TestBaseExporterLogging(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
},
) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
Expand Down
41 changes: 15 additions & 26 deletions exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@ func TestBatchSender_Merge(t *testing.T) {
},
) {
t.Run(testName, func(t *testing.T) {
resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := newQueueBatchExporter(exporterqueue.NewDefaultConfig(), tt.batchCfg)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, be.Shutdown(context.Background()))
resetFeatureGate()
})

sink := requesttest.NewSink()
Expand Down Expand Up @@ -144,18 +143,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
},
) {
t.Run(testName, func(t *testing.T) {
resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := newQueueBatchExporter(exporterqueue.NewDefaultConfig(), tt.batchCfg)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

t.Cleanup(func() {
require.NoError(t, be.Shutdown(context.Background()))
resetFeatureGate()
})

sink := requesttest.NewSink()

require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))

Expand All @@ -173,6 +166,8 @@ func TestBatchSender_BatchExportError(t *testing.T) {
sink.ItemsCount() == tt.expectedItems &&
be.queue.Size() == 0
}, 100*time.Millisecond, 10*time.Millisecond)

require.NoError(t, be.Shutdown(context.Background()))
})
}
for _, tt := range tests {
Expand All @@ -184,7 +179,7 @@ func TestBatchSender_BatchExportError(t *testing.T) {
func TestBatchSender_MergeOrSplit(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)

batchCfg := exporterbatcher.NewDefaultConfig()
batchCfg.MinSizeItems = 5
Expand All @@ -194,13 +189,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

t.Cleanup(func() {
require.NoError(t, be.Shutdown(context.Background()))
resetFeatureGate()
})

sink := requesttest.NewSink()

// should be sent right away by reaching the minimum items size.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
assert.Eventually(t, func() bool {
Expand All @@ -221,10 +210,10 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {

// big request should be broken down into two requests, both are sent right away.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))

assert.Eventually(t, func() bool {
return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
}, 500*time.Millisecond, 10*time.Millisecond)
require.NoError(t, be.Shutdown(context.Background()))
})
}

Expand All @@ -235,7 +224,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
func TestBatchSender_Shutdown(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
batchCfg := exporterbatcher.NewDefaultConfig()
batchCfg.MinSizeItems = 10
be, err := newQueueBatchExporter(exporterqueue.NewDefaultConfig(), batchCfg)
Expand Down Expand Up @@ -293,7 +282,7 @@ func TestBatchSender_Shutdown(t *testing.T) {
func TestBatchSender_PostShutdown(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := newQueueBatchExporter(exporterqueue.Config{}, exporterbatcher.NewDefaultConfig())
require.NoError(t, err)
assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -361,7 +350,7 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
// To avoid blocking, the concurrency limit is set to the number of concurrent goroutines that are in charge of
// reading from the queue and adding to batch. With the new model, we are pulling instead of pushing so we don't
// block the reading goroutine anymore.
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, false)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, false)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -419,7 +408,7 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
func TestBatchSender_BatchBlocking(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 3
be, err := newQueueBatchExporter(exporterqueue.Config{}, bCfg)
Expand Down Expand Up @@ -453,7 +442,7 @@ func TestBatchSender_BatchBlocking(t *testing.T) {
func TestBatchSender_BatchCancelled(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 2
be, err := newQueueBatchExporter(exporterqueue.Config{}, bCfg)
Expand Down Expand Up @@ -492,7 +481,7 @@ func TestBatchSender_BatchCancelled(t *testing.T) {
func TestBatchSender_DrainActiveRequests(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 2

Expand Down Expand Up @@ -531,7 +520,7 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) {
func TestBatchSender_UnstartedShutdown(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := newQueueBatchExporter(exporterqueue.NewDefaultConfig(), exporterbatcher.NewDefaultConfig())
require.NoError(t, err)
err = be.Shutdown(context.Background())
Expand Down Expand Up @@ -592,7 +581,7 @@ func TestBatchSender_UnstartedShutdown(t *testing.T) {
func TestBatchSenderWithTimeout(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 10

Expand Down Expand Up @@ -667,7 +656,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
func TestBatchSenderTimerFlush(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
if runtime.GOOS == "windows" {
t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802")
}
Expand Down
36 changes: 11 additions & 25 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func TestQueuedBatchStopWhileWaiting(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 1
be, err := newQueueBatchExporter(qCfg, exporterbatcher.Config{})
Expand All @@ -55,19 +55,13 @@ func TestQueuedBatchStopWhileWaiting(t *testing.T) {
func TestQueueBatchDoNotPreserveCancellation(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 1
be, err := newQueueBatchExporter(qCfg, exporterbatcher.Config{})
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
resetFeatureGate()
})

ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()

Expand Down Expand Up @@ -118,7 +112,7 @@ func TestQueueBatchHappyPath(t *testing.T) {
},
) {
t.Run(testName, func(t *testing.T) {
resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := newQueueBatchExporter(tt.qCfg, exporterbatcher.Config{})
require.NoError(t, err)

Expand All @@ -131,14 +125,10 @@ func TestQueueBatchHappyPath(t *testing.T) {
require.Error(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
resetFeatureGate()
})

assert.Eventually(t, func() bool {
return sink.RequestsCount() == 10 && sink.ItemsCount() == 45
}, 1*time.Second, 10*time.Millisecond)
require.NoError(t, be.Shutdown(context.Background()))
})
}
for _, tt := range tests {
Expand All @@ -150,7 +140,7 @@ func TestQueueBatchHappyPath(t *testing.T) {
func TestQueueConfig_Validate(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
qCfg := NewDefaultQueueConfig()
require.NoError(t, qCfg.Validate())

Expand All @@ -175,7 +165,7 @@ func TestQueueConfig_Validate(t *testing.T) {
func TestQueueFailedRequestDropped(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
qSet := exporterqueue.Settings{
Signal: defaultSignal,
ExporterSettings: defaultSettings,
Expand All @@ -202,7 +192,7 @@ func TestQueueFailedRequestDropped(t *testing.T) {
func TestQueueBatchPersistenceEnabled(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
qSet := exporterqueue.Settings{
Signal: defaultSignal,
ExporterSettings: defaultSettings,
Expand Down Expand Up @@ -234,7 +224,7 @@ func TestQueueBatchPersistenceEnabled(t *testing.T) {
func TestQueueBatchPersistenceEnabledStorageError(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
storageError := errors.New("could not get storage client")

qSet := exporterqueue.Settings{
Expand Down Expand Up @@ -267,7 +257,7 @@ func TestQueueBatchPersistenceEnabledStorageError(t *testing.T) {
func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 1

Expand Down Expand Up @@ -320,16 +310,12 @@ func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
}),
qSet, qCfg, exporterbatcher.Config{}, "", newNoopExportSender())
require.NoError(t, err)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), host))
t.Cleanup(func() {
require.NoError(t, be.Shutdown(context.Background()))
resetFeatureGate()
})

assert.Eventually(t, func() bool {
return sink.ItemsCount() == 7 && sink.RequestsCount() == 1
}, 1*time.Second, 10*time.Millisecond)
require.NoError(t, be.Shutdown(context.Background()))
})
}
runTest("enable_queue_batcher", true)
Expand All @@ -339,7 +325,7 @@ func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
func TestQueueBatchNoStartShutdown(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
set := exportertest.NewNopSettings()
set.ID = exporterID
qSet := exporterqueue.Settings{
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"go.opentelemetry.io/collector/featuregate"
)

func setFeatureGateForTest(tb testing.TB, gate *featuregate.Gate, enabled bool) func() {
func setFeatureGateForTest(tb testing.TB, gate *featuregate.Gate, enabled bool) {
originalValue := gate.IsEnabled()
require.NoError(tb, featuregate.GlobalRegistry().Set(gate.ID(), enabled))
return func() {
tb.Cleanup(func() {
require.NoError(tb, featuregate.GlobalRegistry().Set(gate.ID(), originalValue))
}
})
}

0 comments on commit d97904a

Please sign in to comment.