diff --git a/ddtrace/contrib/aiohttp/__init__.py b/ddtrace/contrib/aiohttp/__init__.py index 3075ca744fa..3fdaca2fad5 100644 --- a/ddtrace/contrib/aiohttp/__init__.py +++ b/ddtrace/contrib/aiohttp/__init__.py @@ -36,6 +36,13 @@ Default: ``False`` +.. py:data:: ddtrace.config.aiohttp['disable_stream_timing_for_mem_leak'] + + Whether or not to to address a potential memory leak in the aiohttp integration. + When set to ``True``, this flag may cause streamed response span timing to be inaccurate. + + Default: ``False`` + Server ****** diff --git a/ddtrace/contrib/internal/aiohttp/middlewares.py b/ddtrace/contrib/internal/aiohttp/middlewares.py index 4f7abe5a12f..b019bd25013 100644 --- a/ddtrace/contrib/internal/aiohttp/middlewares.py +++ b/ddtrace/contrib/internal/aiohttp/middlewares.py @@ -70,8 +70,9 @@ async def attach_context(request): request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY] try: response = await handler(request) - if isinstance(response, web.StreamResponse): - request.task.add_done_callback(lambda _: finish_request_span(request, response)) + if not config.aiohttp["disable_stream_timing_for_mem_leak"]: + if isinstance(response, web.StreamResponse): + request.task.add_done_callback(lambda _: finish_request_span(request, response)) return response except Exception: request_span.set_traceback() @@ -142,9 +143,13 @@ async def on_prepare(request, response): the trace middleware execution. """ # NB isinstance is not appropriate here because StreamResponse is a parent of the other - # aiohttp response types - if type(response) is web.StreamResponse and not response.task.done(): - return + # aiohttp response types. However in some cases this can also lead to missing the closing of + # spans, leading to a memory leak, which is why we have this flag. + # todo: this is a temporary fix for a memory leak in aiohttp. We should find a way to + # consistently close spans with the correct timing. + if not config.aiohttp["disable_stream_timing_for_mem_leak"]: + if type(response) is web.StreamResponse and not response.task.done(): + return finish_request_span(request, response) diff --git a/ddtrace/contrib/internal/aiohttp/patch.py b/ddtrace/contrib/internal/aiohttp/patch.py index 900a8d26e41..4643ba2ae43 100644 --- a/ddtrace/contrib/internal/aiohttp/patch.py +++ b/ddtrace/contrib/internal/aiohttp/patch.py @@ -22,6 +22,7 @@ from ddtrace.internal.utils import get_argument_value from ddtrace.internal.utils.formats import asbool from ddtrace.propagation.http import HTTPPropagator +from ddtrace.settings._core import get_config as _get_config from ddtrace.trace import Pin @@ -31,7 +32,12 @@ # Server config config._add( "aiohttp", - dict(distributed_tracing=True), + dict( + distributed_tracing=True, + disable_stream_timing_for_mem_leak=asbool( + _get_config("DD_AIOHTTP_CLIENT_DISABLE_STREAM_TIMING_FOR_MEM_LEAK", default=False) + ), + ), ) config._add( diff --git a/releasenotes/notes/add_aiohttp_memory_leak_flag-66005f987dbfbd47.yaml b/releasenotes/notes/add_aiohttp_memory_leak_flag-66005f987dbfbd47.yaml new file mode 100644 index 00000000000..67ef6980a36 --- /dev/null +++ b/releasenotes/notes/add_aiohttp_memory_leak_flag-66005f987dbfbd47.yaml @@ -0,0 +1,5 @@ +--- + +fixes: + - | + aiohttp: Adds the environment variable ``DD_AIOHTTP_CLIENT_DISABLE_STREAM_TIMING_FOR_MEM_LEAK`` to address a potential memory leak in the aiohttp integration. When set to true, this flag may cause streamed response span timing to be inaccurate. The flag defaults to false. \ No newline at end of file diff --git a/tests/contrib/aiohttp/test_request.py b/tests/contrib/aiohttp/test_request.py index 36e9d8a399a..056eda09c4b 100644 --- a/tests/contrib/aiohttp/test_request.py +++ b/tests/contrib/aiohttp/test_request.py @@ -31,6 +31,33 @@ async def test_full_request(patched_app_tracer, aiohttp_client, loop): assert "GET /" == request_span.resource +async def test_full_request_w_mem_leak_prevention_flag(patched_app_tracer, aiohttp_client, loop): + config.aiohttp.disable_stream_timing_for_mem_leak = True + try: + app, tracer = patched_app_tracer + client = await aiohttp_client(app) + # it should create a root span when there is a handler hit + # with the proper tags + request = await client.request("GET", "/") + assert 200 == request.status + await request.text() + # the trace is created + traces = tracer.pop_traces() + assert 1 == len(traces) + assert 1 == len(traces[0]) + request_span = traces[0][0] + assert_is_measured(request_span) + + # request + assert "aiohttp-web" == request_span.service + assert "aiohttp.request" == request_span.name + assert "GET /" == request_span.resource + except Exception: + raise + finally: + config.aiohttp.disable_stream_timing_for_mem_leak = False + + async def test_stream_request(patched_app_tracer, aiohttp_client, loop): app, tracer = patched_app_tracer async with await aiohttp_client(app) as client: