Skip to content

Commit

Permalink
munir: move telemetry logger to a handler
Browse files Browse the repository at this point in the history
munir: add telemetry log handler
  • Loading branch information
mabdinur committed Jan 31, 2025
1 parent f613c49 commit 3c1acc5
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 109 deletions.
108 changes: 2 additions & 106 deletions ddtrace/internal/logger.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import logging
import os
import traceback
import typing
from typing import Optional # noqa:F401
from typing import cast # noqa:F401
Expand Down Expand Up @@ -47,7 +46,7 @@ def get_logger(name):
logger = manager.loggerDict[name]
if isinstance(manager.loggerDict[name], logging.PlaceHolder):
placeholder = logger
logger = _new_logger(name=name)
logger = DDLogger(name=name)
manager.loggerDict[name] = logger
# DEV: `_fixupChildren` and `_fixupParents` have been around for awhile,
# DEV: but add the `hasattr` guard... just in case.
Expand All @@ -56,7 +55,7 @@ def get_logger(name):
if hasattr(manager, "_fixupParents"):
manager._fixupParents(logger)
else:
logger = _new_logger(name=name)
logger = DDLogger(name=name)
manager.loggerDict[name] = logger
if hasattr(manager, "_fixupParents"):
manager._fixupParents(logger)
Expand All @@ -65,13 +64,6 @@ def get_logger(name):
return cast(DDLogger, logger)


def _new_logger(name):
    """Build the logger instance for *name*.

    Loggers under ``ddtrace.contrib.`` are given a telemetry-reporting
    ``DDTelemetryLogger`` when log collection is enabled; every other name
    gets a plain ``DDLogger``.
    """
    wants_telemetry = _TelemetryConfig.LOG_COLLECTION_ENABLED and name.startswith("ddtrace.contrib.")
    return DDTelemetryLogger(name=name) if wants_telemetry else DDLogger(name=name)


def hasHandlers(self):
# type: (DDLogger) -> bool
"""
Expand Down Expand Up @@ -142,14 +134,6 @@ def handle(self, record):
:param record: The log record being logged
:type record: ``logging.LogRecord``
"""
if record.levelno >= logging.ERROR:
# avoid circular import
from ddtrace.internal import telemetry

# currently we only have one error code
full_file_name = os.path.join(record.pathname, record.filename)
telemetry.telemetry_writer.add_error(1, record.msg % record.args, full_file_name, record.lineno)

# If rate limiting has been disabled (`DD_TRACE_LOGGING_RATE=0`) then apply no rate limit
# If the logging is in debug, then do not apply any limits to any log
if not self.rate_limit or self.getEffectiveLevel() == logging.DEBUG:
Expand Down Expand Up @@ -186,91 +170,3 @@ def handle(self, record):
# Increment the count of records we have skipped
# DEV: `self.buckets[key]` is a tuple which is immutable so recreate instead
self.buckets[key] = DDLogger.LoggingBucket(logging_bucket.bucket, logging_bucket.skipped + 1)


class DDTelemetryLogger(DDLogger):
    """
    Logger that intercepts and reports exceptions to the telemetry.
    """

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """Constructor for ``DDTelemetryLogger``"""
        super(DDTelemetryLogger, self).__init__(*args, **kwargs)

        # One bucket per (logger name, level, pathname, lineno) call site,
        # tracking the current time window and how many times the site fired
        # within it.
        self.telemetry_log_buckets = collections.defaultdict(
            lambda: DDLogger.LoggingBucket(0, 0)
        )  # type: DefaultDict[Tuple[str, int, str, int], DDLogger.LoggingBucket]

    def handle(self, record):
        # type: (logging.LogRecord) -> None
        # Imported lazily to avoid a circular import with the telemetry package.
        from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL

        # Records from the same call site share a bucket key.
        key = (record.name, record.levelno, record.pathname, record.lineno)
        # Windows are TELEMETRY_HEARTBEAT_INTERVAL seconds wide, derived from
        # the record's creation timestamp.
        current_bucket = int(record.created / _TelemetryConfig.TELEMETRY_HEARTBEAT_INTERVAL)
        key_bucket = self.telemetry_log_buckets[key]
        if key_bucket.bucket == current_bucket:
            # Same window: count one more occurrence of this call site.
            self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(key_bucket.bucket, key_bucket.skipped + 1)
        else:
            # New window: reset the occurrence counter.
            self.telemetry_log_buckets[key] = DDLogger.LoggingBucket(current_bucket, 0)
        # Map stdlib levels onto the three telemetry levels; anything below
        # WARNING is reported as DEBUG.
        level = (
            TELEMETRY_LOG_LEVEL.ERROR
            if record.levelno >= logging.ERROR
            else TELEMETRY_LOG_LEVEL.WARNING
            if record.levelno == logging.WARNING
            else TELEMETRY_LOG_LEVEL.DEBUG
        )
        from ddtrace.internal import telemetry

        tags = {
            "lib_language": "python",
        }
        # Only records carrying exc_info produce a stack trace for telemetry.
        stack_trace = _format_stack_trace(record.exc_info) if record.exc_info is not None else None
        if record.levelno >= logging.ERROR or stack_trace is not None:
            # Report only an error or an exception with a stack trace
            # NOTE(review): this sends a log for every matching record while the
            # window counter keeps growing, so occurrences appear to be reported
            # cumulatively (possible double counting) — confirm intent.
            telemetry.telemetry_writer.add_log(
                level, record.msg, tags=tags, stack_trace=stack_trace, count=key_bucket.skipped + 1
            )

        # Defer to DDLogger's rate-limited handling for the normal log path.
        super().handle(record)


def _format_stack_trace(exc_info):
    """Render *exc_info* as a traceback string for telemetry.

    Frames whose path does not contain ``ddtrace`` are redacted; returns
    ``None`` when there is no traceback object to format.
    """
    exc_type, exc_value, exc_traceback = exc_info
    if not exc_traceback:
        return None
    lines = ["Traceback (most recent call last):"]
    for filename, lineno, funcname, srcline in traceback.extract_tb(exc_traceback):
        if _should_redact(filename):
            lines.append(" <REDACTED>")
        else:
            lines.append(f' File "{_format_file_path(filename)}", line {lineno}, in {funcname}\n {srcline}')
    lines.append(f"{exc_type.__module__}.{exc_type.__name__}: {exc_value}")
    return "\n".join(lines)


def _should_redact(filename):
return not "ddtrace" in filename


CWD = os.getcwd()


def _format_file_path(filename):
try:
return os.path.relpath(filename, start=CWD)
except ValueError:
return filename


class _TelemetryConfig:
TELEMETRY_ENABLED = os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "true").lower() in ("true", "1")
LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in (
"true",
"1",
)
TELEMETRY_HEARTBEAT_INTERVAL = int(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "60"))
66 changes: 66 additions & 0 deletions ddtrace/internal/telemetry/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging
import os
import traceback

from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL


class DDTelemetryLogHandler(logging.StreamHandler):
    """Log handler that forwards ddtrace logs to the telemetry writer.

    ``emit`` is fully overridden, so nothing is written to the underlying
    stream; records are reported via ``telemetry_writer`` instead.
    """

    # Working directory captured once; base for relativizing trace file paths.
    CWD = os.getcwd()

    def __init__(self, telemetry_writer):
        # ``telemetry_writer`` must provide ``add_error`` and ``add_log``.
        self.telemetry_writer = telemetry_writer
        super().__init__()

    def emit(self, record):
        # type: (logging.LogRecord) -> None
        """Report ERROR records as telemetry errors, and integration records
        carrying ``exc_info`` as telemetry logs."""
        if record.levelno >= logging.ERROR:
            # Capture start up errors.
            # record.getMessage() lazily applies record.args and, unlike
            # ``record.msg % record.args``, does not raise when args is
            # None/empty and the message contains a literal '%'.
            # record.pathname is already the full path to the source file;
            # joining it with record.filename would duplicate the basename.
            self.telemetry_writer.add_error(1, record.getMessage(), record.pathname, record.lineno)

        # Capture errors logged in the ddtrace integrations
        if record.name.startswith("ddtrace.contrib"):
            telemetry_level = (
                TELEMETRY_LOG_LEVEL.ERROR
                if record.levelno >= logging.ERROR
                else TELEMETRY_LOG_LEVEL.WARNING
                if record.levelno == logging.WARNING
                else TELEMETRY_LOG_LEVEL.DEBUG
            )
            stack_trace = self._format_stack_trace(record.exc_info) if record.exc_info is not None else None
            # The majority of ddtrace logs are called with exc_info=None and we
            # won't collect telemetry information for them.
            if stack_trace is not None:
                # Report only exceptions with a stack trace
                self.telemetry_writer.add_log(
                    telemetry_level,
                    record.msg,
                    # NOTE(review): do we need to set this tag, or should
                    # telemetry intake infer this value?
                    tags={"lib_language": "python"},
                    stack_trace=stack_trace,
                )

    def _format_stack_trace(self, exc_info):
        """Render *exc_info* as a traceback string, redacting frames whose
        path does not contain ``ddtrace``; returns None when there is no
        traceback object."""
        exc_type, exc_value, exc_traceback = exc_info
        if exc_traceback:
            tb = traceback.extract_tb(exc_traceback)
            formatted_tb = ["Traceback (most recent call last):"]
            for filename, lineno, funcname, srcline in tb:
                if self._should_redact(filename):
                    formatted_tb.append(" <REDACTED>")
                else:
                    relative_filename = self._format_file_path(filename)
                    formatted_line = f' File "{relative_filename}", line {lineno}, in {funcname}\n {srcline}'
                    formatted_tb.append(formatted_line)
            formatted_tb.append(f"{exc_type.__module__}.{exc_type.__name__}: {exc_value}")
            return "\n".join(formatted_tb)
        return None

    def _should_redact(self, filename):
        # Only ddtrace's own frames are reported verbatim.
        return "ddtrace" not in filename

    def _format_file_path(self, filename):
        # Relativize against CWD; keep the original path when relpath is
        # impossible (e.g. different drive on Windows raises ValueError).
        try:
            return os.path.relpath(filename, start=self.CWD)
        except ValueError:
            return filename
12 changes: 9 additions & 3 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from .data import get_host_info
from .data import get_python_config_vars
from .data import update_imported_dependencies
from .logging import DDTelemetryLogHandler
from .metrics import CountMetric
from .metrics import DistributionMetric
from .metrics import GaugeMetric
Expand Down Expand Up @@ -68,6 +69,10 @@ class _TelemetryConfig:
INSTALL_TYPE = os.environ.get("DD_INSTRUMENTATION_INSTALL_TYPE", None)
INSTALL_TIME = os.environ.get("DD_INSTRUMENTATION_INSTALL_TIME", None)
FORCE_START = asbool(os.environ.get("_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED", "false"))
LOG_COLLECTION_ENABLED = TELEMETRY_ENABLED and os.getenv("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "true").lower() in (
"true",
"1",
)


class LogData(dict):
Expand Down Expand Up @@ -222,6 +227,8 @@ def __init__(self, is_periodic=True, agentless=None):
# Force app started for unit tests
if _TelemetryConfig.FORCE_START:
self._app_started()
if _TelemetryConfig.LOG_COLLECTION_ENABLED:
getLogger("ddtrace").addHandler(DDTelemetryLogHandler(self))

def enable(self):
# type: () -> bool
Expand Down Expand Up @@ -484,7 +491,7 @@ def add_configurations(self, configuration_list):
"value": value,
}

def add_log(self, level, message, stack_trace="", tags=None, count=1):
def add_log(self, level, message, stack_trace="", tags=None):
"""
Queues log. This event is meant to send library logs to Datadog’s backend through the Telemetry intake.
This will make support cycles easier and ensure we know about potentially silent issues in libraries.
Expand All @@ -504,8 +511,7 @@ def add_log(self, level, message, stack_trace="", tags=None, count=1):
data["tags"] = ",".join(["%s:%s" % (k, str(v).lower()) for k, v in tags.items()])
if stack_trace:
data["stack_trace"] = stack_trace
if count > 1:
data["count"] = count
# Logs are hashed using the message, level, tags, and stack_trace. This should prevent duplication.
self._logs.add(data)

def add_gauge_metric(self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: MetricTagType = None):
Expand Down

0 comments on commit 3c1acc5

Please sign in to comment.