Skip to content

Commit

Permalink
di rate limiter wip
Browse files Browse the repository at this point in the history
  • Loading branch information
p committed Aug 27, 2024
1 parent 57a7bac commit e2d51d3
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 20 deletions.
36 changes: 22 additions & 14 deletions lib/datadog/di/hook_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def initialize(settings)
if cls_name.to_s == tp.self.name
# TODO is it OK to hook from trace point handler?
# TODO the class is now defined, but can hooking still fail?
# TODO pass rate_limiter here, need to get it from somewhere
hook_method(cls_name, method_name, &block)
pending_methods.delete(pm)
end
Expand Down Expand Up @@ -85,7 +86,7 @@ def clear_hooks
end
end

def hook_method(cls_name, meth_name)
def hook_method(cls_name, meth_name, rate_limiter: nil)
cls = symbolize_class_name(cls_name)
id = next_id

Expand All @@ -95,12 +96,16 @@ def hook_method(cls_name, meth_name)
remove_method(meth_name)
define_method(meth_name) do |*args, **kwargs|
if DI.component.hook_manager.instrumented_methods[[cls_name, meth_name]] == id
rv = nil
duration = Benchmark.realtime do
rv = saved.bind(self).call(*args, **kwargs)
if rate_limiter.nil? || rate_limiter.allow?
rv = nil
duration = Benchmark.realtime do
rv = saved.bind(self).call(*args, **kwargs)
end
yield rv: rv, duration: duration, callers: caller, args: args, kwargs: kwargs
rv
else
saved.bind(self).call(*args, **kwargs)
end
yield rv: rv, duration: duration, callers: caller, args: args, kwargs: kwargs
rv
else
saved.bind(self).call(*args, **kwargs)
end
Expand All @@ -110,9 +115,9 @@ def hook_method(cls_name, meth_name)
instrumented_methods[[cls_name, meth_name]] = id
end

def hook_method_when_defined(cls_name, meth_name, &block)
def hook_method_when_defined(cls_name, meth_name, rate_limiter: nil, &block)
begin
hook_method(cls_name, meth_name, &block)
hook_method(cls_name, meth_name, rate_limiter: rate_limiter, &block)
true
rescue Error::DITargetNotDefined
pending_methods[[cls_name, meth_name]] = block
Expand All @@ -125,7 +130,7 @@ def hook_method_when_defined(cls_name, meth_name, &block)
# not for eval'd code, unless the eval'd code is associated with
# a filenam and client invokes this method with the correct
# file name for the eval'd code.
def hook_line_now(file, line_no, &block)
def hook_line_now(file, line_no, rate_limiter: nil, &block)
# TODO is file a basename, path suffix or full path?
# Maybe support all?
file = File.basename(file)
Expand Down Expand Up @@ -187,7 +192,9 @@ def hook_line_now(file, line_no, &block)
tp&.disable

tp = TracePoint.new(:line) do |tp|
on_line_trace_point(tp, callers: caller, &block)
if rate_limiter.nil? || rate_limiter.allow?
on_line_trace_point(tp, callers: caller, &block)
end
end

# Put the trace point into our tracking map first to prevent
Expand All @@ -214,18 +221,19 @@ def hook_line_now(file, line_no, &block)
end
end

def hook_line_when_defined(file, line_no, &block)
def hook_line_when_defined(file, line_no, rate_limiter: nil, &block)
begin
hook_line_now(file, line_no, &block)
hook_line_now(file, line_no, rate_limiter: rate_limiter, &block)
true
rescue Error::DITargetNotDefined
# TODO store rate limiter
pending_lines[[file, line_no]] = block
false
end
end

def hook_line(file, line_no, &block)
hook_line_when_defined(file, line_no, &block)
def hook_line(file, line_no, rate_limiter: nil, &block)
hook_line_when_defined(file, line_no, rate_limiter: rate_limiter, &block)
end

def install_pending_line_hooks(file)
Expand Down
15 changes: 15 additions & 0 deletions lib/datadog/di/probe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def initialize(id:, type:,
@method_name = method_name
@template = template
@capture_snapshot = capture_snapshot

if capture_snapshot
@rate_limiter = Datadog::Tracing::Sampling::TokenBucket.new(1)
end
end

attr_reader :id
Expand All @@ -30,6 +34,9 @@ def initialize(id:, type:,
attr_reader :method_name
attr_reader :template

# For internal DI use only
attr_reader :rate_limiter

def capture_snapshot?
@capture_snapshot
end
Expand Down Expand Up @@ -59,6 +66,14 @@ def location
raise Error::UnknownProbeType, 'Unhandled probe type: neither method nor line probe'
end
end

# Returns whether this probe is currently under its rate limit.
# If the probe is not rate limited, always returns true.
# Calling this method records the invocation in the rate limiter.
# TODO remove
def under_rate_limit?
rate_limiter.nil? || rate_limiter.allow?(1)
end
end
end
end
14 changes: 9 additions & 5 deletions lib/datadog/di/remote_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ def process(config)
begin
if probe.line?
# TODO Test that stack trace is correct with user code being top entry
hook_manager.hook_line(probe.file, probe.line_nos.first) do |tp, callers:|
puts '*** line probe executed ***'
ProbeNotifier.notify_emitting(probe)
ProbeNotifier.notify_executed(probe, tracepoint: tp, callers: callers)
hook_manager.hook_line(probe.file, probe.line_nos.first, rate_limiter: probe.rate_limiter) do |tp, callers:|
if probe.under_rate_limit?
puts '*** line probe executed ***'
ProbeNotifier.notify_emitting(probe)
ProbeNotifier.notify_executed(probe, tracepoint: tp, callers: callers)
else
puts '*** line probe rate limited ***'
end
end
elsif probe.method?
# TODO Test that stack trace is correct with user code being top entry
hook_manager.hook_method_when_defined(probe.type_name, probe.method_name) do |**opts|
hook_manager.hook_method_when_defined(probe.type_name, probe.method_name, rate_limiter: probe.rate_limiter) do |**opts|
puts "*** method probe executed: #{opts} ***"
ProbeNotifier.notify_emitting(probe)
ProbeNotifier.notify_executed(probe, **opts)
Expand Down
2 changes: 1 addition & 1 deletion spec/datadog/di/remote_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
end

it 'parses the probe and adds it to the defined probe list' do
expect(hook_manager).to receive(:hook_line).with('aaa', 2)
expect(hook_manager).to receive(:hook_line).with('aaa', 2, rate_limiter: nil)

processor.process(config)

Expand Down

0 comments on commit e2d51d3

Please sign in to comment.