Skip to content

Commit

Permalink
apm: fix data race accessing Tracer.active
Browse files Browse the repository at this point in the history
When Tracer.loop exits, it sets Tracer.active
to false, which races with reads by the method
Tracer.Active. We fix this by changing the
Tracer.active field from a bool to int32, and
using atomic ops.
  • Loading branch information
axw committed Jan 4, 2019
1 parent d26c08b commit c4e4bca
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
4 changes: 2 additions & 2 deletions env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestTracerSpanFramesMinDurationEnvInvalid(t *testing.T) {
assert.EqualError(t, err, "failed to parse ELASTIC_APM_SPAN_FRAMES_MIN_DURATION: invalid duration aeon")
}

func TestTracerActive(t *testing.T) {
func TestTracerActiveEnv(t *testing.T) {
os.Setenv("ELASTIC_APM_ACTIVE", "false")
defer os.Unsetenv("ELASTIC_APM_ACTIVE")

Expand All @@ -285,7 +285,7 @@ func TestTracerActive(t *testing.T) {
assert.Zero(t, transport.Payloads())
}

func TestTracerActiveInvalid(t *testing.T) {
func TestTracerActiveEnvInvalid(t *testing.T) {
os.Setenv("ELASTIC_APM_ACTIVE", "yep")
defer os.Unsetenv("ELASTIC_APM_ACTIVE")

Expand Down
12 changes: 7 additions & 5 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"math/rand"
"sync"
"sync/atomic"
"time"

"go.elastic.co/apm/internal/apmconfig"
Expand Down Expand Up @@ -180,7 +181,7 @@ type Tracer struct {
process *model.Process
system *model.System

active bool
active int32
bufferSize int
metricsBufferSize int
closing chan struct{}
Expand Down Expand Up @@ -247,19 +248,20 @@ func newTracer(opts options) *Tracer {
transactions: make(chan *TransactionData, transactionsChannelCap),
spans: make(chan *SpanData, spansChannelCap),
errors: make(chan *ErrorData, errorsChannelCap),
active: 1,
maxSpans: opts.maxSpans,
sampler: opts.sampler,
captureBody: opts.captureBody,
spanFramesMinDuration: opts.spanFramesMinDuration,
active: opts.active,
bufferSize: opts.bufferSize,
metricsBufferSize: opts.metricsBufferSize,
}
t.Service.Name = opts.serviceName
t.Service.Version = opts.serviceVersion
t.Service.Environment = opts.serviceEnvironment

if !t.active {
if !opts.active {
t.active = 0
close(t.closed)
return t
}
Expand Down Expand Up @@ -325,7 +327,7 @@ func (t *Tracer) Flush(abort <-chan struct{}) {
// Active reports whether the tracer is active. If the tracer is inactive,
// no transactions or errors will be sent to the Elastic APM server.
func (t *Tracer) Active() bool {
return t.active
return atomic.LoadInt32(&t.active) == 1
}

// SetRequestDuration sets the maximum amount of time to keep a request open
Expand Down Expand Up @@ -481,7 +483,7 @@ func (t *Tracer) loop() {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()
defer close(t.closed)
defer func() { t.active = false }()
defer atomic.StoreInt32(&t.active, 0)

var req iochan.ReadRequest
var requestBuf bytes.Buffer
Expand Down
15 changes: 15 additions & 0 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,21 @@ func TestTracerKubernetesMetadata(t *testing.T) {
})
}

func TestTracerActive(t *testing.T) {
tracer, _ := transporttest.NewRecorderTracer()
defer tracer.Close()
assert.True(t, tracer.Active())

// Kick off calls to tracer.Active concurrently
// with the tracer.Close, to test that we ensure
// there are no data races.
go func() {
for i := 0; i < 100; i++ {
tracer.Active()
}
}()
}

type blockedTransport struct {
transport.Transport
unblocked chan struct{}
Expand Down

0 comments on commit c4e4bca

Please sign in to comment.