Skip to content

Commit

Permalink
add flag
Browse files Browse the repository at this point in the history
rn

add test to verify new config does not break tracing

add to docs and use isinstance

switch back to type since isinstance breaks tests

use get_config

change config back at end of test
  • Loading branch information
ZStriker19 committed Feb 3, 2025
1 parent b90fa38 commit cf983ed
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 6 deletions.
7 changes: 7 additions & 0 deletions ddtrace/contrib/aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
Default: ``False``
.. py:data:: ddtrace.config.aiohttp['disable_stream_timing_for_mem_leak']
Whether or not to to address a potential memory leak in the aiohttp integration.
When set to ``True``, this flag may cause streamed response span timing to be inaccurate.
Default: ``False``
Server
******
Expand Down
15 changes: 10 additions & 5 deletions ddtrace/contrib/internal/aiohttp/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ async def attach_context(request):
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
if not config.aiohttp["disable_stream_timing_for_mem_leak"]:
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
request_span.set_traceback()
Expand Down Expand Up @@ -142,9 +143,13 @@ async def on_prepare(request, response):
the trace middleware execution.
"""
# NB isinstance is not appropriate here because StreamResponse is a parent of the other
# aiohttp response types
if type(response) is web.StreamResponse and not response.task.done():
return
# aiohttp response types. However in some cases this can also lead to missing the closing of
# spans, leading to a memory leak, which is why we have this flag.
# todo: this is a temporary fix for a memory leak in aiohttp. We should find a way to
# consistently close spans with the correct timing.
if not config.aiohttp["disable_stream_timing_for_mem_leak"]:
if type(response) is web.StreamResponse and not response.task.done():
return
finish_request_span(request, response)


Expand Down
8 changes: 7 additions & 1 deletion ddtrace/contrib/internal/aiohttp/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.utils.formats import asbool
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.settings._core import get_config as _get_config
from ddtrace.trace import Pin


Expand All @@ -31,7 +32,12 @@
# Server config
config._add(
"aiohttp",
dict(distributed_tracing=True),
dict(
distributed_tracing=True,
disable_stream_timing_for_mem_leak=asbool(
_get_config("DD_AIOHTTP_CLIENT_DISABLE_STREAM_TIMING_FOR_MEM_LEAK", default=False)
),
),
)

config._add(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---

fixes:
- |
aiohttp: Adds the environment variable ``DD_AIOHTTP_CLIENT_DISABLE_STREAM_TIMING_FOR_MEM_LEAK`` to address a potential memory leak in the aiohttp integration. When set to true, this flag may cause streamed response span timing to be inaccurate. The flag defaults to false.
27 changes: 27 additions & 0 deletions tests/contrib/aiohttp/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,33 @@ async def test_full_request(patched_app_tracer, aiohttp_client, loop):
assert "GET /" == request_span.resource


async def test_full_request_w_mem_leak_prevention_flag(patched_app_tracer, aiohttp_client, loop):
config.aiohttp.disable_stream_timing_for_mem_leak = True
try:
app, tracer = patched_app_tracer
client = await aiohttp_client(app)
# it should create a root span when there is a handler hit
# with the proper tags
request = await client.request("GET", "/")
assert 200 == request.status
await request.text()
# the trace is created
traces = tracer.pop_traces()
assert 1 == len(traces)
assert 1 == len(traces[0])
request_span = traces[0][0]
assert_is_measured(request_span)

# request
assert "aiohttp-web" == request_span.service
assert "aiohttp.request" == request_span.name
assert "GET /" == request_span.resource
except Exception:
raise
finally:
config.aiohttp.disable_stream_timing_for_mem_leak = False


async def test_stream_request(patched_app_tracer, aiohttp_client, loop):
app, tracer = patched_app_tracer
async with await aiohttp_client(app) as client:
Expand Down

0 comments on commit cf983ed

Please sign in to comment.