Skip to content

Commit

Permalink
Merge branch 'di-probe-notifier' into HEAD
Browse files Browse the repository at this point in the history
* di-probe-notifier:
  DEBUG-2334 Probe Notifier Worker component
  • Loading branch information
p committed Oct 19, 2024
2 parents 7654529 + 28c4170 commit 833065b
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 0 deletions.
211 changes: 211 additions & 0 deletions lib/datadog/di/probe_notifier_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
# frozen_string_literal: true

module Datadog
module DI
# Background worker thread for sending probe statuses and snapshots
# to the backend (via the agent).
#
# The loop inside the worker rescues all exceptions to prevent termination
# due to unhandled exceptions raised by any downstream code.
# This includes communication and protocol errors when sending the
# payloads to the agent.
#
# The worker groups the data to send into batches. The goal is to perform
# no more than one network operation per event type per second.
# There is also a limit on the length of the sending queue to prevent
# it from growing without bounds if upstream code generates an enormous
# number of events for some reason.
#
# Wake-up events are used (via ConditionVariable) to keep the thread
# asleep if there is no work to be done.
#
# @api private
class ProbeNotifierWorker
def initialize(settings, agent_settings, transport)
@settings = settings
@status_queue = []
@snapshot_queue = []
@transport = transport
@lock = Mutex.new
@wake_lock = Mutex.new
@wake = ConditionVariable.new
@io_in_progress = false
end

attr_reader :settings

def start
return if defined?(@thread) && @thread
@thread = Thread.new do
loop do
break if @stop_requested
begin
if maybe_send
# Run next iteration immediately in case more work is
# in the queue
end
rescue NoMemoryError, SystemExit, Interrupt
raise
rescue => exc
raise if settings.dynamic_instrumentation.propagate_all_exceptions

warn "Error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})"
end
wake_lock.synchronize do
wake.wait(wake_lock, 1)
end
end
end
end

# Stops the background thread.
#
# Attempts a graceful stop with the specified timeout, then falls back
# to killing the thread using Thread#kill.
def stop(timeout = 1)
@stop_requested = true
wake_lock.synchronize do
wake.signal
end
unless thread&.join(timeout)
thread.kill
end
@thread = nil
end

# Waits for background thread to send pending notifications.
#
# This method waits for the notification queue to become empty
# rather than for a particular set of notifications to be sent out,
# therefore, it should only be called when there is no parallel
# activity (in another thread) that causes more notifications
# to be generated.
def flush
loop do
if @thread.nil? || !@thread.alive?
return
end

io_in_progress, queues_empty = @lock.synchronize do
[io_in_progress?, status_queue.empty? && snapshot_queue.empty?]
end

if io_in_progress
# If we just call Thread.pass we could be in a busy loop -
# add a sleep.
sleep 0.25
next
elsif queues_empty
break
else
sleep 0.25
next
end
end
end

private

attr_reader :transport
attr_reader :wake
attr_reader :wake_lock
attr_reader :thread

# This method should be called while @lock is held.
def io_in_progress?
@io_in_progress
end

[
[:status, 'probe status'],
[:snapshot, 'snapshot'],
].each do |(event_type, event_name)|
attr_reader "#{event_type}_queue"
attr_reader "last_#{event_type}_sent"

# Adds a status or a snapshot to the queue to be sent to the agent
# at the next opportunity.
#
# If the queue is too large, the event will not be added.
#
# Signals the background thread to wake up (and do the sending)
# if it has been more than 1 second since the last send of the same
# event type.
define_method("add_#{event_type}") do |event|
@lock.synchronize do
queue = send("#{event_type}_queue")
# TODO determine a suitable limit via testing/benchmarking
if queue.length > 100
# TODO use datadog logger
warn "dropping #{event_type} because queue is full"
else
queue << event
end
end
last_sent = @lock.synchronize do
send("last_#{event_type}_sent")
end
if last_sent
now = Core::Utils::Time.get_time
if now - last_sent > 1
wake_lock.synchronize do
wake.signal
end
end
else
# First time sending
wake_lock.synchronize do
wake.signal
end
end
end

public "add_#{event_type}"

# Sends pending probe statuses or snapshots.
#
# This method should ideallyy only be called when there are actually
# events to send, but it can be called when there is nothing to do.
# Currently we only have one wake-up signaling object and two
# types of events. Therefore on most wake-ups we expect to only
# send one type of events.
define_method("maybe_send_#{event_type}") do
batch = nil
@lock.synchronize do
batch = instance_variable_get("@#{event_type}_queue")
instance_variable_set("@#{event_type}_queue", [])
@io_in_progress = batch.any?
end
if batch.any?
begin
transport.public_send("send_#{event_type}", batch)
time = Core::Utils::Time.get_time
@lock.synchronize do
instance_variable_set("@last_#{event_type}_sent", time)
end
rescue => exc
raise if settings.dynamic_instrumentation.propagate_all_exceptions
# TODO log to logger
puts "failed to send #{event_name}: #{exc.class}: #{exc} (at #{exc.backtrace.first})"
end
end
batch.any?
rescue ThreadError
# Normally the queue should only be consumed in this method,
# however if anyone consumes it elsewhere we don't want to block
# while consuming it here. Rescue ThreadError and return.
warn "unexpected #{event_name} queue underflow - consumed elsewhere?"
ensure
@lock.synchronize do
@io_in_progress = false
end
end
end

def maybe_send
rv = maybe_send_status
rv || maybe_send_snapshot
end
end
end
end
46 changes: 46 additions & 0 deletions sig/datadog/di/probe_notifier_worker.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
module Datadog
module DI
class ProbeNotifierWorker
@settings: untyped

@status_queue: Array

@snapshot_queue: Array

@transport: Transport

@lock: Mutex

@wake_lock: Mutex

@wake: ConditionVariable

@io_in_progress: bool

@thread: Thread

@stop_requested: bool

def initialize: (untyped settings, untyped agent_settings, Transport transport) -> void

attr_reader settings: untyped

def start: () -> void
def stop: (?::Integer timeout) -> void
def flush: () -> void

private

attr_reader transport: Transport

attr_reader wake: ConditionVariable

attr_reader wake_lock: Mutex

attr_reader thread: Thread
def io_in_progress?: () -> bool

def maybe_send: () -> bool
end
end
end
116 changes: 116 additions & 0 deletions spec/datadog/di/probe_notifier_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
require "datadog/di/spec_helper"
require "datadog/di/probe_notifier_worker"

RSpec.describe Datadog::DI::ProbeNotifierWorker do
di_test

let(:settings) do
double('settings').tap do |settings|
allow(settings).to receive(:dynamic_instrumentation).and_return(di_settings)
end
end

let(:di_settings) do
double('di settings').tap do |settings|
allow(settings).to receive(:propagate_all_exceptions).and_return(false)
end
end

let(:agent_settings) do
double('agent settings')
end

let(:transport) do
double('transport')
end

let(:worker) { described_class.new(settings, agent_settings, transport) }

context 'not started' do
describe '#add_snapshot' do
let(:snapshot) do
{hello: 'world'}
end

it 'adds snapshot to queue' do
expect(worker.send(:snapshot_queue)).to be_empty

worker.add_snapshot(snapshot)

expect(worker.send(:snapshot_queue)).to eq([snapshot])
end
end
end

describe '#stop' do
before do
worker.start
end

it 'stops the thread' do
worker.stop
expect(worker.send(:thread)).to be nil
end
end

context 'started' do
before do
worker.start
end

after do
worker.stop
end

describe '#add_snapshot' do
let(:snapshot) do
{hello: 'world'}
end

it 'sends the snapshot' do
expect(worker.send(:snapshot_queue)).to be_empty

expect(transport).to receive(:send_snapshot).once.with([snapshot])

worker.add_snapshot(snapshot)

# Since sending is asynchronous, we need to relinquish execution
# for the sending thread to run.
sleep(0.1)

expect(worker.send(:snapshot_queue)).to eq([])
end

context 'when three snapshots are added in quick succession' do
it 'sends two batches' do
expect(worker.send(:snapshot_queue)).to be_empty

expect(transport).to receive(:send_snapshot).once.with([snapshot])

worker.add_snapshot(snapshot)
sleep 0.1
worker.add_snapshot(snapshot)
sleep 0.1
worker.add_snapshot(snapshot)

# Since sending is asynchronous, we need to relinquish execution
# for the sending thread to run.
sleep(0.1)

# At this point the first snapshot should have been sent,
# with the remaining two in the queue
expect(worker.send(:snapshot_queue)).to eq([snapshot, snapshot])

sleep 0.4
# Still within the cooldown period
expect(worker.send(:snapshot_queue)).to eq([snapshot, snapshot])

expect(transport).to receive(:send_snapshot).once.with([snapshot, snapshot])

sleep 0.5
expect(worker.send(:snapshot_queue)).to eq([])
end
end
end
end
end

0 comments on commit 833065b

Please sign in to comment.