Skip to content

Commit

Permalink
[SPARK-51274][PYTHON] PySparkLogger should respect the expected keywo…
Browse files Browse the repository at this point in the history
…rd arguments

### What changes were proposed in this pull request?

`PySparkLogger` should respect the expected keyword arguments.

Also, `debug`, `warn`, `critical`, `fatal`, and `log` are added to have proper docs.

### Why are the changes needed?

Currently, all keyword arguments passed to `PySparkLogger` methods are placed into the `context`, but the logger should respect the expected keyword arguments, such as `exc_info`, `stack_info`, etc.

### Does this PR introduce _any_ user-facing change?

Yes, the logging methods for `PySparkLogger` will respect the expected arguments.

- before:

```py
>>> from pyspark.logger.logger import PySparkLogger
>>> logger = PySparkLogger.getLogger("TestLogger")
>>>
>>> logger.warning("This is an info log", exc_info=True, user="test_user_info", action="test_action_info")
{"ts": "2025-02-21 10:46:53,786", "level": "WARNING", "logger": "TestLogger", "msg": "This is an info log", "context": {"exc_info": true, "user": "test_user_info", "action": "test_action_info"}}
```

- after:

```py
>>> logger.warning("This is an info log", exc_info=True, user="test_user_info", action="test_action_info")
{"ts": "2025-02-21 10:47:36,351", "level": "WARNING", "logger": "TestLogger", "msg": "This is an info log", "context": {"user": "test_user_info", "action": "test_action_info"}, "exception": {"class": "UnknownException", "msg": "None", "stacktrace": ["NoneType: None"]}}
```

### How was this patch tested?

Added the related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50032 from ueshin/issues/SPARK-51274/logger.

Authored-by: Takuya Ueshin <ueshin@databricks.com>
Signed-off-by: Takuya Ueshin <ueshin@databricks.com>
(cherry picked from commit eb4a28b)
Signed-off-by: Takuya Ueshin <ueshin@databricks.com>
  • Loading branch information
ueshin committed Feb 21, 2025
1 parent 0676664 commit a24f3c3
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 11 deletions.
2 changes: 2 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,8 @@ def __hash__(self):
dependencies=[],
source_file_regexes=["python/pyspark/logger"],
python_test_goals=[
# doctests
"pyspark.logger.logger",
# unittests
"pyspark.logger.tests.test_logger",
"pyspark.logger.tests.connect.test_parity_logger",
Expand Down
116 changes: 105 additions & 11 deletions python/pyspark/logger/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

import logging
import json
from typing import cast, Optional
import sys
from typing import cast, Mapping, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from logging import _ArgsType, _ExcInfoType

SPARK_LOG_SCHEMA = (
"ts TIMESTAMP, "
Expand Down Expand Up @@ -99,7 +103,7 @@ class PySparkLogger(logging.Logger):
>>> logger.info(
... "This is an informational message",
... extra={"user": "test_user", "action": "test_action"}
... user="test_user", action="test_action"
... )
>>> log_output = stream.getvalue().strip().split('\\n')[0]
>>> log = json.loads(log_output)
Expand All @@ -111,10 +115,8 @@ class PySparkLogger(logging.Logger):
"logger": "ExampleLogger",
"msg": "This is an informational message",
"context": {
"extra": {
"user": "test_user",
"action": "test_action"
}
"user": "test_user",
"action": "test_action"
}
}
"""
Expand Down Expand Up @@ -158,6 +160,17 @@ def getLogger(name: Optional[str] = None) -> "PySparkLogger":

return cast(PySparkLogger, pyspark_logger)

def debug(self, msg: object, *args: object, **kwargs: object) -> None:
    """
    Emit a DEBUG-level record rendered as structured JSON.

    Parameters
    ----------
    msg : str
        The message to log; '%'-style interpolated with ``args``.
    """
    # Forward everything unchanged so the standard logging keywords
    # (exc_info, stack_info, ...) are honored; any remaining kwargs are
    # collected into the record's "context" by the _log override.
    super().debug(msg, *args, **kwargs)  # type: ignore[arg-type]

def info(self, msg: object, *args: object, **kwargs: object) -> None:
"""
Log 'msg % args' with severity 'INFO' in structured JSON format.
Expand All @@ -167,7 +180,7 @@ def info(self, msg: object, *args: object, **kwargs: object) -> None:
msg : str
The log message.
"""
super().info(msg, *args, extra={"kwargs": kwargs})
super().info(msg, *args, **kwargs) # type: ignore[arg-type]

def warning(self, msg: object, *args: object, **kwargs: object) -> None:
"""
Expand All @@ -178,7 +191,20 @@ def warning(self, msg: object, *args: object, **kwargs: object) -> None:
msg : str
The log message.
"""
super().warning(msg, *args, extra={"kwargs": kwargs})
super().warning(msg, *args, **kwargs) # type: ignore[arg-type]

if sys.version_info < (3, 13):

def warn(self, msg: object, *args: object, **kwargs: object) -> None:
    """
    Emit a WARN-level record rendered as structured JSON.

    Parameters
    ----------
    msg : str
        The message to log; '%'-style interpolated with ``args``.
    """
    # Pass kwargs straight through so standard logging keywords are
    # respected rather than being swallowed into the context.
    super().warn(msg, *args, **kwargs)  # type: ignore[arg-type]

def error(self, msg: object, *args: object, **kwargs: object) -> None:
"""
Expand All @@ -189,9 +215,11 @@ def error(self, msg: object, *args: object, **kwargs: object) -> None:
msg : str
The log message.
"""
super().error(msg, *args, extra={"kwargs": kwargs})
super().error(msg, *args, **kwargs) # type: ignore[arg-type]

def exception(self, msg: object, *args: object, **kwargs: object) -> None:
def exception(
self, msg: object, *args: object, exc_info: "_ExcInfoType" = True, **kwargs: object
) -> None:
"""
Convenience method for logging an ERROR with exception information.
Expand All @@ -203,4 +231,70 @@ def exception(self, msg: object, *args: object, **kwargs: object) -> None:
If True, exception information is added to the logging message.
This includes the exception type, value, and traceback. Default is True.
"""
super().error(msg, *args, exc_info=True, extra={"kwargs": kwargs})
super().exception(msg, *args, exc_info=exc_info, **kwargs) # type: ignore[arg-type]

def critical(self, msg: object, *args: object, **kwargs: object) -> None:
    """
    Emit a CRITICAL-level record rendered as structured JSON.

    Parameters
    ----------
    msg : str
        The message to log; '%'-style interpolated with ``args``.
    """
    # Delegate untouched; the _log override splits standard logging
    # keywords from context fields.
    super().critical(msg, *args, **kwargs)  # type: ignore[arg-type]

def log(self, level: int, msg: object, *args: object, **kwargs: object) -> None:
    """
    Emit a record at the given severity, rendered as structured JSON.

    Parameters
    ----------
    level : int
        The numeric log level to emit at.
    msg : str
        The message to log; '%'-style interpolated with ``args``.
    """
    # Delegate with kwargs intact so exc_info/stack_info/etc. keep
    # their standard meaning.
    super().log(level, msg, *args, **kwargs)  # type: ignore[arg-type]

fatal = critical

def _log(
    self,
    level: int,
    msg: object,
    args: "_ArgsType",
    exc_info: Optional["_ExcInfoType"] = None,
    extra: Optional[Mapping[str, object]] = None,
    stack_info: bool = False,
    stacklevel: int = 1,
    **kwargs: object,
) -> None:
    """
    Route a record to ``logging.Logger._log``, nesting all non-standard
    keyword arguments under a single ``kwargs`` entry in ``extra``.

    The standard logging keywords (``exc_info``, ``stack_info``,
    ``stacklevel``) are forwarded as-is; everything else ends up in the
    record's context.
    """
    context = dict(kwargs)
    if extra is not None:
        # Keep a caller-supplied ``extra`` mapping visible under its own
        # key instead of silently discarding it.
        context["extra"] = extra
    super()._log(
        level=level,
        msg=msg,
        args=args,
        exc_info=exc_info,
        # Nest everything so the JSON formatter can surface it as the
        # record's "context" (see the class-level example output).
        extra={"kwargs": context},
        stack_info=stack_info,
        stacklevel=stacklevel,
    )


def _test() -> None:
    """Run this module's doctests; exit with a non-zero status on failure."""
    import doctest

    import pyspark.logger.logger

    globs = pyspark.logger.logger.__dict__.copy()
    failure_count, _test_count = doctest.testmod(
        pyspark.logger.logger, globs=globs, optionflags=doctest.ELLIPSIS
    )
    if failure_count:
        sys.exit(-1)
16 changes: 16 additions & 0 deletions python/pyspark/logger/tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ def test_log_info(self):
)
self.assertTrue("exception" not in log_json)

def test_log_info_with_exception(self):
    # SPARK-51274: PySparkLogger should respect the expected keyword arguments
    self.logger.info(
        "This is an info log", exc_info=True, user="test_user_info", action="test_action_info"
    )
    payload = json.loads(self.handler.stream.getvalue().strip())

    self.assertEqual(payload["msg"], "This is an info log")
    self.assertEqual(
        payload["context"], {"action": "test_action_info", "user": "test_user_info"}
    )
    # exc_info=True must surface as a structured "exception" entry
    # rather than leaking into the context.
    self.assertIn("exception", payload)
    for field in ("class", "msg", "stacktrace"):
        self.assertIn(field, payload["exception"])

def test_log_warn(self):
self.logger.warn("This is an warn log", user="test_user_warn", action="test_action_warn")
log_json = json.loads(self.handler.stream.getvalue().strip())
Expand Down

0 comments on commit a24f3c3

Please sign in to comment.