Skip to content

Commit fd555c8

Browse files
authored
Merge pull request #21 from taskiq-python/feature/deps-update
Dependencies were updated.
2 parents b762571 + 1dd3879 commit fd555c8

File tree

11 files changed

+629
-503
lines changed

11 files changed

+629
-503
lines changed

.github/workflows/test.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
pytest:
2828
strategy:
2929
matrix:
30-
py_version: ["3.8", "3.9", "3.10", "3.11"]
30+
py_version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
3131
runs-on: "ubuntu-latest"
3232
steps:
3333
- uses: actions/checkout@v2

poetry.lock

+539-449
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+11-12
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ classifiers = [
1212
"Programming Language :: Python",
1313
"Programming Language :: Python :: 3",
1414
"Programming Language :: Python :: 3 :: Only",
15-
"Programming Language :: Python :: 3.8",
1615
"Programming Language :: Python :: 3.9",
1716
"Programming Language :: Python :: 3.10",
1817
"Programming Language :: Python :: 3.11",
18+
"Programming Language :: Python :: 3.12",
19+
"Programming Language :: Python :: 3.13",
1920
"Operating System :: OS Independent",
2021
"Intended Audience :: Developers",
2122
"Topic :: System :: Networking",
@@ -25,20 +26,20 @@ homepage = "https://github.com/taskiq-python/taskiq-pipelines"
2526
keywords = ["taskiq", "pipelines", "tasks", "distributed", "async"]
2627

2728
[tool.poetry.dependencies]
28-
python = "^3.8.1"
29-
taskiq = ">=0.11.0, <1"
29+
python = "^3.9"
30+
taskiq = ">=0.11.12, <1"
3031
typing-extensions = "^4.3.0"
3132
pydantic = "^2"
3233

3334
[tool.poetry.group.dev.dependencies]
34-
pytest = "^7"
35-
black = { version = "^22.6.0", allow-prereleases = true }
36-
pytest-cov = "^3.0.0"
37-
anyio = "^3.6.1"
38-
pre-commit = "^2.20.0"
35+
pytest = "^8"
36+
black = { version = "^25", allow-prereleases = true }
37+
pytest-cov = "^6"
38+
anyio = "^4"
39+
pre-commit = "^4"
3940
mypy = "^1"
40-
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
41-
ruff = "^0.5.6"
41+
pytest-xdist = { version = "^3", extras = ["psutil"] }
42+
ruff = "^0.9.9"
4243

4344
[tool.mypy]
4445
strict = true
@@ -97,8 +98,6 @@ lint.ignore = [
9798
"D401", # First line should be in imperative mood
9899
"D104", # Missing docstring in public package
99100
"D100", # Missing docstring in public module
100-
"ANN102", # Missing type annotation for self in method
101-
"ANN101", # Missing type annotation for argument
102101
"ANN401", # typing.Any are disallowed in `**kwargs
103102
"PLR0913", # Too many arguments for function call
104103
"D106", # Missing docstring in public nested class

taskiq_pipelines/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
"""Pipelines for taskiq tasks."""
2+
23
from taskiq_pipelines.exceptions import AbortPipeline, PipelineError
34
from taskiq_pipelines.middleware import PipelineMiddleware
45
from taskiq_pipelines.pipeliner import Pipeline
56

67
__all__ = [
8+
"AbortPipeline",
79
"Pipeline",
810
"PipelineError",
9-
"AbortPipeline",
1011
"PipelineMiddleware",
1112
]

taskiq_pipelines/exceptions.py

+30
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,36 @@
1+
from typing import ClassVar, Union
2+
13
from taskiq import TaskiqError
24

35

46
class PipelineError(TaskiqError):
57
"""Generic pipeline error."""
68

79

10+
class StepError(PipelineError):
11+
"""Error found while mapping step."""
12+
13+
__template__ = (
14+
"Task {task_id} returned an error. {_STEP_NAME} failed. Reason: {error}"
15+
)
16+
_STEP_NAME: ClassVar[str]
17+
18+
task_id: str
19+
error: Union[BaseException, None]
20+
21+
22+
class MappingError(StepError):
23+
"""Error found while mapping step."""
24+
25+
_STEP_NAME = "mapping"
26+
27+
28+
class FilterError(StepError):
29+
"""Error found while filtering step."""
30+
31+
_STEP_NAME = "filtering"
32+
33+
834
class AbortPipeline(PipelineError): # noqa: N818
935
"""
1036
Abort current pipeline execution.
@@ -15,3 +41,7 @@ class AbortPipeline(PipelineError): # noqa: N818
1541
It immediately aborts current pipeline
1642
execution.
1743
"""
44+
45+
__template__ = "Pipeline was aborted. {reason}"
46+
47+
reason: str = "No reason provided."

taskiq_pipelines/middleware.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ async def post_save( # noqa: PLR0911
3838
return
3939
current_step_num = int(message.labels[CURRENT_STEP])
4040
if PIPELINE_DATA not in message.labels:
41-
logger.warn("Pipline data not found. Execution flow is broken.")
41+
logger.warning("Pipline data not found. Execution flow is broken.")
4242
return
4343
pipeline_data = message.labels[PIPELINE_DATA]
4444
parsed_data = self.broker.serializer.loadb(pipeline_data)
@@ -47,7 +47,7 @@ async def post_save( # noqa: PLR0911
4747
parsed_data,
4848
)
4949
except ValueError as err:
50-
logger.warn("Cannot parse pipline_data: %s", err, exc_info=True)
50+
logger.warning("Cannot parse pipline_data: %s", err, exc_info=True)
5151
return
5252
if current_step_num + 1 >= len(steps_data):
5353
logger.debug("Pipeline is completed.")
@@ -99,7 +99,7 @@ async def on_error(
9999
return
100100
current_step_num = int(message.labels[CURRENT_STEP])
101101
if PIPELINE_DATA not in message.labels:
102-
logger.warn("Pipline data not found. Execution flow is broken.")
102+
logger.warning("Pipline data not found. Execution flow is broken.")
103103
return
104104
pipe_data = message.labels[PIPELINE_DATA]
105105
try:
@@ -129,7 +129,7 @@ async def fail_pipeline(
129129
TaskiqResult(
130130
is_err=True,
131131
return_value=None, # type: ignore
132-
error=abort or AbortPipeline("Execution aborted."),
132+
error=abort or AbortPipeline(reason="Execution aborted."),
133133
execution_time=0,
134134
log="Error found while executing pipeline.",
135135
),

taskiq_pipelines/pipeliner.py

+8-16
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ def call_next(
7373
],
7474
param_name: Union[Optional[str], Literal[-1]] = None,
7575
**additional_kwargs: Any,
76-
) -> "Pipeline[_FuncParams, _T2]":
77-
...
76+
) -> "Pipeline[_FuncParams, _T2]": ...
7877

7978
@overload
8079
def call_next(
@@ -85,8 +84,7 @@ def call_next(
8584
],
8685
param_name: Union[Optional[str], Literal[-1]] = None,
8786
**additional_kwargs: Any,
88-
) -> "Pipeline[_FuncParams, _T2]":
89-
...
87+
) -> "Pipeline[_FuncParams, _T2]": ...
9088

9189
def call_next(
9290
self,
@@ -133,8 +131,7 @@ def call_after(
133131
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
134132
],
135133
**additional_kwargs: Any,
136-
) -> "Pipeline[_FuncParams, _T2]":
137-
...
134+
) -> "Pipeline[_FuncParams, _T2]": ...
138135

139136
@overload
140137
def call_after(
@@ -144,8 +141,7 @@ def call_after(
144141
AsyncTaskiqDecoratedTask[Any, _T2],
145142
],
146143
**additional_kwargs: Any,
147-
) -> "Pipeline[_FuncParams, _T2]":
148-
...
144+
) -> "Pipeline[_FuncParams, _T2]": ...
149145

150146
def call_after(
151147
self,
@@ -192,8 +188,7 @@ def map(
192188
skip_errors: bool = False,
193189
check_interval: float = 0.5,
194190
**additional_kwargs: Any,
195-
) -> "Pipeline[_FuncParams, List[_T2]]":
196-
...
191+
) -> "Pipeline[_FuncParams, List[_T2]]": ...
197192

198193
@overload
199194
def map(
@@ -206,8 +201,7 @@ def map(
206201
skip_errors: bool = False,
207202
check_interval: float = 0.5,
208203
**additional_kwargs: Any,
209-
) -> "Pipeline[_FuncParams, List[_T2]]":
210-
...
204+
) -> "Pipeline[_FuncParams, List[_T2]]": ...
211205

212206
def map(
213207
self,
@@ -263,8 +257,7 @@ def filter(
263257
skip_errors: bool = False,
264258
check_interval: float = 0.5,
265259
**additional_kwargs: Any,
266-
) -> "Pipeline[_FuncParams, _ReturnType]":
267-
...
260+
) -> "Pipeline[_FuncParams, _ReturnType]": ...
268261

269262
@overload
270263
def filter(
@@ -277,8 +270,7 @@ def filter(
277270
skip_errors: bool = False,
278271
check_interval: float = 0.5,
279272
**additional_kwargs: Any,
280-
) -> "Pipeline[_FuncParams, _ReturnType]":
281-
...
273+
) -> "Pipeline[_FuncParams, _ReturnType]": ...
282274

283275
def filter(
284276
self,

taskiq_pipelines/steps/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Package with default pipeline steps."""
2+
23
from logging import getLogger
34
from typing import Any, Dict
45

@@ -19,7 +20,7 @@ def parse_step(step_type: str, step_data: Dict[str, Any]) -> AbstractStep:
1920

2021

2122
__all__ = [
23+
"FilterStep",
2224
"MapperStep",
2325
"SequentialStep",
24-
"FilterStep",
2526
]

taskiq_pipelines/steps/filter.py

+9-5
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@
22
from typing import Any, Dict, Iterable, List, Optional, Union
33

44
import pydantic
5-
from taskiq import AsyncBroker, Context, TaskiqDepends, TaskiqError, TaskiqResult
5+
from taskiq import AsyncBroker, Context, TaskiqDepends, TaskiqResult
66
from taskiq.brokers.shared_broker import async_shared_broker
77
from taskiq.decor import AsyncTaskiqDecoratedTask
88
from taskiq.kicker import AsyncKicker
99

1010
from taskiq_pipelines.abc import AbstractStep
1111
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
12-
from taskiq_pipelines.exceptions import AbortPipeline
12+
from taskiq_pipelines.exceptions import AbortPipeline, FilterError
1313

1414

1515
@async_shared_broker.task(task_name="taskiq_pipelines.shared.filter_tasks")
16-
async def filter_tasks(
16+
async def filter_tasks( # noqa: C901
1717
task_ids: List[str],
1818
parent_task_id: str,
1919
check_interval: float,
@@ -62,7 +62,11 @@ async def filter_tasks(
6262
if result.is_err:
6363
if skip_errors:
6464
continue
65-
raise TaskiqError(f"Task {task_id} returned error. Filtering failed.")
65+
err_cause = None
66+
if isinstance(result.error, BaseException):
67+
err_cause = result.error
68+
raise FilterError(task_id=task_id, error=result.error) from err_cause
69+
6670
if result.return_value:
6771
filtered_results.append(value)
6872
return filtered_results
@@ -103,7 +107,7 @@ async def act(
103107
:raises AbortPipeline: if result is not iterable.
104108
"""
105109
if not isinstance(result.return_value, Iterable):
106-
raise AbortPipeline("Result of the previous task is not iterable.")
110+
raise AbortPipeline(reason="Result of the previous task is not iterable.")
107111
sub_task_ids = []
108112
for item in result.return_value:
109113
kicker: "AsyncKicker[Any, Any]" = AsyncKicker(

taskiq_pipelines/steps/mapper.py

+21-12
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@
77
AsyncTaskiqDecoratedTask,
88
Context,
99
TaskiqDepends,
10-
TaskiqError,
1110
TaskiqResult,
1211
async_shared_broker,
1312
)
1413
from taskiq.kicker import AsyncKicker
1514

1615
from taskiq_pipelines.abc import AbstractStep
1716
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
18-
from taskiq_pipelines.exceptions import AbortPipeline
17+
from taskiq_pipelines.exceptions import AbortPipeline, MappingError
1918

2019

2120
@async_shared_broker.task(task_name="taskiq_pipelines.shared.wait_tasks")
@@ -60,7 +59,11 @@ async def wait_tasks(
6059
if result.is_err:
6160
if skip_errors:
6261
continue
63-
raise TaskiqError(f"Task {task_id} returned error. Mapping failed.")
62+
err_cause = None
63+
if isinstance(result.error, BaseException):
64+
err_cause = result.error
65+
raise MappingError(task_id=task_id, error=result.error) from err_cause
66+
6467
results.append(result.return_value)
6568
return results
6669

@@ -106,7 +109,7 @@ async def act(
106109
sub_task_ids: List[str] = []
107110
return_value = result.return_value
108111
if not isinstance(return_value, Iterable):
109-
raise AbortPipeline("Result of the previous task is not iterable.")
112+
raise AbortPipeline(reason="Result of the previous task is not iterable.")
110113

111114
for item in return_value:
112115
kicker: "AsyncKicker[Any, Any]" = AsyncKicker(
@@ -121,14 +124,20 @@ async def act(
121124
task = await kicker.kiq(item, **self.additional_kwargs)
122125
sub_task_ids.append(task.task_id)
123126

124-
await wait_tasks.kicker().with_task_id(task_id).with_broker(
125-
broker,
126-
).with_labels(
127-
**{CURRENT_STEP: step_number, PIPELINE_DATA: pipe_data}, # type: ignore
128-
).kiq(
129-
sub_task_ids,
130-
check_interval=self.check_interval,
131-
skip_errors=self.skip_errors,
127+
await (
128+
wait_tasks.kicker()
129+
.with_task_id(task_id)
130+
.with_broker(
131+
broker,
132+
)
133+
.with_labels(
134+
**{CURRENT_STEP: step_number, PIPELINE_DATA: pipe_data}, # type: ignore
135+
)
136+
.kiq(
137+
sub_task_ids,
138+
check_interval=self.check_interval,
139+
skip_errors=self.skip_errors,
140+
)
132141
)
133142

134143
@classmethod

tests/test_steps.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def normal_task(i: bool) -> bool:
5757
@broker.task
5858
def aborting_task(i: int) -> bool:
5959
if i:
60-
raise AbortPipeline(text)
60+
raise AbortPipeline(reason=text)
6161
return True
6262

6363
pipe = Pipeline(broker, aborting_task).call_next(normal_task)
@@ -70,4 +70,4 @@ def aborting_task(i: int) -> bool:
7070
res = await sent.wait_result()
7171
assert res.is_err is True
7272
assert res.return_value is None
73-
assert res.error.args[0] == text
73+
assert text in res.error.args[0]

0 commit comments

Comments
 (0)