From e2d51d35d62c6c801af539c7899951e3a5d9efd1 Mon Sep 17 00:00:00 2001 From: Oleg Pudeyev Date: Tue, 27 Aug 2024 09:51:28 -0400 Subject: [PATCH] di rate limiter wip --- lib/datadog/di/hook_manager.rb | 36 +++++++++++++++--------- lib/datadog/di/probe.rb | 15 ++++++++++ lib/datadog/di/remote_processor.rb | 14 +++++---- spec/datadog/di/remote_processor_spec.rb | 2 +- 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/lib/datadog/di/hook_manager.rb b/lib/datadog/di/hook_manager.rb index 90fb4db8b6f..55e31cf1393 100644 --- a/lib/datadog/di/hook_manager.rb +++ b/lib/datadog/di/hook_manager.rb @@ -55,6 +55,7 @@ def initialize(settings) if cls_name.to_s == tp.self.name # TODO is it OK to hook from trace point handler? # TODO the class is now defined, but can hooking still fail? + # TODO pass rate_limiter here, need to get it from somewhere hook_method(cls_name, method_name, &block) pending_methods.delete(pm) end @@ -85,7 +86,7 @@ def clear_hooks end end - def hook_method(cls_name, meth_name) + def hook_method(cls_name, meth_name, rate_limiter: nil) cls = symbolize_class_name(cls_name) id = next_id @@ -95,12 +96,16 @@ def hook_method(cls_name, meth_name) remove_method(meth_name) define_method(meth_name) do |*args, **kwargs| if DI.component.hook_manager.instrumented_methods[[cls_name, meth_name]] == id - rv = nil - duration = Benchmark.realtime do - rv = saved.bind(self).call(*args, **kwargs) + if rate_limiter.nil? || rate_limiter.allow? + rv = nil + duration = Benchmark.realtime do + rv = saved.bind(self).call(*args, **kwargs) + end + yield rv: rv, duration: duration, callers: caller, args: args, kwargs: kwargs + rv + else + saved.bind(self).call(*args, **kwargs) end - yield rv: rv, duration: duration, callers: caller, args: args, kwargs: kwargs - rv else saved.bind(self).call(*args, **kwargs) end @@ -110,9 +115,9 @@ def hook_method(cls_name, meth_name) instrumented_methods[[cls_name, meth_name]] = id end - def hook_method_when_defined(cls_name, meth_name, &block) + def hook_method_when_defined(cls_name, meth_name, rate_limiter: nil, &block) begin - hook_method(cls_name, meth_name, &block) + hook_method(cls_name, meth_name, rate_limiter: rate_limiter, &block) true rescue Error::DITargetNotDefined pending_methods[[cls_name, meth_name]] = block @@ -125,7 +130,7 @@ def hook_method_when_defined(cls_name, meth_name, &block) # not for eval'd code, unless the eval'd code is associated with # a filenam and client invokes this method with the correct # file name for the eval'd code. - def hook_line_now(file, line_no, &block) + def hook_line_now(file, line_no, rate_limiter: nil, &block) # TODO is file a basename, path suffix or full path? # Maybe support all? file = File.basename(file) @@ -187,7 +192,9 @@ def hook_line_now(file, line_no, &block) tp&.disable tp = TracePoint.new(:line) do |tp| - on_line_trace_point(tp, callers: caller, &block) + if rate_limiter.nil? || rate_limiter.allow? + on_line_trace_point(tp, callers: caller, &block) + end end # Put the trace point into our tracking map first to prevent @@ -214,18 +221,19 @@ def hook_line_now(file, line_no, &block) end end - def hook_line_when_defined(file, line_no, &block) + def hook_line_when_defined(file, line_no, rate_limiter: nil, &block) begin - hook_line_now(file, line_no, &block) + hook_line_now(file, line_no, rate_limiter: rate_limiter, &block) true rescue Error::DITargetNotDefined + # TODO store rate limiter pending_lines[[file, line_no]] = block false end end - def hook_line(file, line_no, &block) - hook_line_when_defined(file, line_no, &block) + def hook_line(file, line_no, rate_limiter: nil, &block) + hook_line_when_defined(file, line_no, rate_limiter: rate_limiter, &block) end def install_pending_line_hooks(file) diff --git a/lib/datadog/di/probe.rb b/lib/datadog/di/probe.rb index 26657de5015..2364efeb804 100644 --- a/lib/datadog/di/probe.rb +++ b/lib/datadog/di/probe.rb @@ -20,6 +20,10 @@ def initialize(id:, type:, @method_name = method_name @template = template @capture_snapshot = capture_snapshot + + if capture_snapshot + @rate_limiter = Datadog::Tracing::Sampling::TokenBucket.new(1) + end end attr_reader :id @@ -30,6 +34,9 @@ def initialize(id:, type:, attr_reader :method_name attr_reader :template + # For internal DI use only + attr_reader :rate_limiter + def capture_snapshot? @capture_snapshot end @@ -59,6 +66,14 @@ def location raise Error::UnknownProbeType, 'Unhandled probe type: neither method nor line probe' end end + + # Returns whether this probe is currently under its rate limit. + # If the probe is not rate limited, always returns true. + # Calling this method records the invocation in the rate limiter. + # TODO remove + def under_rate_limit? + rate_limiter.nil? || rate_limiter.allow?(1) + end end end end diff --git a/lib/datadog/di/remote_processor.rb b/lib/datadog/di/remote_processor.rb index af06b12896c..2f37aa069ea 100644 --- a/lib/datadog/di/remote_processor.rb +++ b/lib/datadog/di/remote_processor.rb @@ -26,14 +26,18 @@ def process(config) begin if probe.line? # TODO Test that stack trace is correct with user code being top entry - hook_manager.hook_line(probe.file, probe.line_nos.first) do |tp, callers:| - puts '*** line probe executed ***' - ProbeNotifier.notify_emitting(probe) - ProbeNotifier.notify_executed(probe, tracepoint: tp, callers: callers) + hook_manager.hook_line(probe.file, probe.line_nos.first, rate_limiter: probe.rate_limiter) do |tp, callers:| + if probe.under_rate_limit? + puts '*** line probe executed ***' + ProbeNotifier.notify_emitting(probe) + ProbeNotifier.notify_executed(probe, tracepoint: tp, callers: callers) + else + puts '*** line probe rate limited ***' + end end elsif probe.method? # TODO Test that stack trace is correct with user code being top entry - hook_manager.hook_method_when_defined(probe.type_name, probe.method_name) do |**opts| + hook_manager.hook_method_when_defined(probe.type_name, probe.method_name, rate_limiter: probe.rate_limiter) do |**opts| puts "*** method probe executed: #{opts} ***" ProbeNotifier.notify_emitting(probe) ProbeNotifier.notify_executed(probe, **opts) diff --git a/spec/datadog/di/remote_processor_spec.rb b/spec/datadog/di/remote_processor_spec.rb index 4c182ea7fa4..fe7aa511266 100644 --- a/spec/datadog/di/remote_processor_spec.rb +++ b/spec/datadog/di/remote_processor_spec.rb @@ -64,7 +64,7 @@ end it 'parses the probe and adds it to the defined probe list' do - expect(hook_manager).to receive(:hook_line).with('aaa', 2) + expect(hook_manager).to receive(:hook_line).with('aaa', 2, rate_limiter: nil) processor.process(config)