[SPARK-51178][CONNECT][PYTHON] Raise proper PySpark error instead of `SparkConnectGrpcException`

### What changes were proposed in this pull request?

This PR proposes to raise a proper PySpark error instead of `SparkConnectGrpcException`.

This PR also introduces a new PySpark error, `PickleException`, to cover errors raised while pickling on the server side (e.g. `net.razorvine.pickle.PickleException`).
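
For illustration, a minimal sketch of what this means for client code (the table function name below is hypothetical; any operation whose result fails server-side pickling would behave the same): the client can now catch the specific error instead of the generic gRPC wrapper.

```python
from pyspark.errors import PickleException

# Hypothetical example: `spark` is an existing Spark Connect session and the
# UDTF name is made up; the point is only which exception type surfaces.
try:
    spark.sql("SELECT * FROM udtf_returning_unpicklable_rows()").collect()
except PickleException as e:
    # Previously this surfaced as the generic SparkConnectGrpcException.
    print("Server-side pickling failed:", str(e))
```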

### Why are the changes needed?

To raise a proper, more specific exception instead of the generic `SparkConnectGrpcException`.

### Does this PR introduce _any_ user-facing change?

No API changes, but it improves the user-facing errors.

### How was this patch tested?

Updated the existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49953 from itholic/SPARK-51178.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 4134e9f)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
itholic authored and HyukjinKwon committed Feb 18, 2025
1 parent 0b627ae commit 58029ac
Showing 5 changed files with 34 additions and 4 deletions.
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.errors.rst
@@ -60,6 +60,7 @@ Classes
     TempTableAlreadyExistsException
     UnknownException
     UnsupportedOperationException
+    PickleException


 Methods
2 changes: 2 additions & 0 deletions python/pyspark/errors/__init__.py
@@ -51,6 +51,7 @@
     QueryContext,
     QueryContextType,
     StreamingPythonRunnerInitializationException,
+    PickleException,
 )


@@ -87,4 +88,5 @@
     "QueryContext",
     "QueryContextType",
     "StreamingPythonRunnerInitializationException",
+    "PickleException",
 ]
8 changes: 8 additions & 0 deletions python/pyspark/errors/exceptions/base.py
@@ -363,6 +363,14 @@ class PySparkImportError(PySparkException, ImportError):
     """


+class PickleException(PySparkException):
+    """
+    Represents an exception which is failed while pickling from server side
+    such as `net.razorvine.pickle.PickleException`. This is different from `PySparkPicklingError`
+    which represents an exception failed from Python built-in `pickle.PicklingError`.
+    """
+
+
 class QueryContextType(Enum):
     """
     The type of :class:`QueryContext`.
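
As the docstring above notes, the new error is distinct from `PySparkPicklingError`: the latter wraps Python's built-in `pickle.PicklingError` raised on the client, while `PickleException` covers pickling failures on the server (JVM) side such as `net.razorvine.pickle.PickleException`. A hypothetical helper illustrating the distinction (not part of this change):

```python
from pyspark.errors import PickleException, PySparkPicklingError

def describe_pickling_failure(exc: Exception) -> str:
    # Hypothetical helper: the two errors originate on different sides of the wire.
    if isinstance(exc, PickleException):
        return "pickling failed on the server (JVM) side"
    if isinstance(exc, PySparkPicklingError):
        return "pickling failed in the Python client (built-in pickle)"
    return "not a pickling failure"
```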
21 changes: 21 additions & 0 deletions python/pyspark/errors/exceptions/connect.py
@@ -37,6 +37,7 @@
     QueryContext as BaseQueryContext,
     QueryContextType,
     StreamingPythonRunnerInitializationException as BaseStreamingPythonRunnerInitException,
+    PickleException as BasePickleException,
 )

 if TYPE_CHECKING:
@@ -94,6 +95,14 @@ def convert_exception(
     # Return exception based on class mapping
     for error_class_name in classes:
         ExceptionClass = EXCEPTION_CLASS_MAPPING.get(error_class_name)
+        if ExceptionClass is SparkException:
+            for third_party_exception_class in THIRD_PARTY_EXCEPTION_CLASS_MAPPING:
+                ExceptionClass = (
+                    THIRD_PARTY_EXCEPTION_CLASS_MAPPING.get(third_party_exception_class)
+                    if third_party_exception_class in message
+                    else SparkException
+                )
+
         if ExceptionClass:
             return ExceptionClass(
                 message,
@@ -316,6 +325,14 @@ class StreamingPythonRunnerInitializationException(
     """


+class PickleException(SparkConnectGrpcException, BasePickleException):
+    """
+    Represents an exception which is failed while pickling from server side
+    such as `net.razorvine.pickle.PickleException`. This is different from `PySparkPicklingError`
+    which represents an exception failed from Python built-in `pickle.PicklingError`.
+    """
+
+
 # Update EXCEPTION_CLASS_MAPPING here when adding a new exception
 EXCEPTION_CLASS_MAPPING = {
     "org.apache.spark.sql.catalyst.parser.ParseException": ParseException,
@@ -339,6 +356,10 @@ class StreamingPythonRunnerInitializationException(
     "$StreamingPythonRunnerInitializationException": StreamingPythonRunnerInitializationException,
 }

+THIRD_PARTY_EXCEPTION_CLASS_MAPPING = {
+    "net.razorvine.pickle.PickleException": PickleException,
+}
+

 class SQLQueryContext(BaseQueryContext):
     def __init__(self, q: pb2.FetchErrorDetailsResponse.QueryContext):
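
The resolution added to `convert_exception` is a plain substring match: when the server-reported class falls back to the generic `SparkException`, the error message is scanned for known third-party exception class names. A self-contained sketch of that lookup (stub classes stand in for the real ones; the actual logic lives in `python/pyspark/errors/exceptions/connect.py`):

```python
# Sketch only: stub exception classes so the lookup can run standalone.
class SparkException(Exception): ...
class PickleException(SparkException): ...

THIRD_PARTY_EXCEPTION_CLASS_MAPPING = {
    "net.razorvine.pickle.PickleException": PickleException,
}

def resolve_exception_class(message: str) -> type:
    # Default to the generic exception; upgrade when the message mentions a
    # known third-party exception class name.
    exception_class = SparkException
    for class_name, specific_class in THIRD_PARTY_EXCEPTION_CLASS_MAPPING.items():
        if class_name in message:
            exception_class = specific_class
    return exception_class

assert resolve_exception_class(
    "net.razorvine.pickle.PickleException: could not pickle object"
) is PickleException
assert resolve_exception_class("unrelated failure") is SparkException
```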
6 changes: 2 additions & 4 deletions python/pyspark/sql/tests/connect/test_parity_udtf.py
@@ -27,7 +27,7 @@
     sql.udtf.UserDefinedTableFunction = UserDefinedTableFunction
     from pyspark.sql.connect.functions import lit, udtf
     from pyspark.errors.exceptions.connect import (
-        SparkConnectGrpcException,
+        PickleException,
         PythonException,
         InvalidPlanInput,
     )
@@ -46,10 +46,8 @@ def tearDownClass(cls):
         finally:
             super(UDTFParityTests, cls).tearDownClass()

-    # TODO: use PySpark error classes instead of SparkConnectGrpcException
-
     def test_struct_output_type_casting_row(self):
-        self.check_struct_output_type_casting_row(SparkConnectGrpcException)
+        self.check_struct_output_type_casting_row(PickleException)

     def test_udtf_with_invalid_return_type(self):
         @udtf(returnType="int")
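
One design note worth calling out from the `connect.py` hunk above: the Connect-side `PickleException` inherits from both `SparkConnectGrpcException` and the new base class, so existing handlers that catch the gRPC exception keep working. A minimal check of that relationship (assuming the Spark Connect Python dependencies are installed):

```python
from pyspark.errors import PickleException as BasePickleException
from pyspark.errors.exceptions.connect import PickleException, SparkConnectGrpcException

# The Connect PickleException is still a SparkConnectGrpcException (backward
# compatible) and is also the new, more specific base PickleException.
assert issubclass(PickleException, SparkConnectGrpcException)
assert issubclass(PickleException, BasePickleException)
```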
