Skip to content

Commit 742036d

Browse files
IlyasDevelopmentIlyas Gasanov
authored and
Ilyas Gasanov
committed
[DOP-19794] Do not block event loop when using Celery (#116)
1 parent 51293e5 commit 742036d

File tree

3 files changed

+29
-5
lines changed

3 files changed

+29
-5
lines changed

syncmaster/backend/api/v1/runs.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
import asyncio
34
from datetime import datetime
45
from typing import Annotated
56

@@ -129,7 +130,8 @@ async def start_run(
129130
)
130131

131132
try:
132-
celery.send_task(
133+
await asyncio.to_thread(
134+
celery.send_task,
133135
"run_transfer_task",
134136
kwargs={"run_id": run.id},
135137
queue=transfer.queue.name,

tests/test_unit/test_runs/test_create_run.py

+24-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from unittest.mock import AsyncMock
2+
13
import pytest
24
from httpx import AsyncClient
35
from sqlalchemy import desc, select
@@ -19,7 +21,8 @@ async def test_developer_plus_can_create_run_of_transfer_his_group(
1921
) -> None:
2022
# Arrange
2123
user = group_transfer.owner_group.get_member_of_role(role_developer_plus)
22-
mocker.patch("syncmaster.worker.config.celery.send_task")
24+
mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task")
25+
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
2326

2427
run = (
2528
await session.scalars(
@@ -55,6 +58,13 @@ async def test_developer_plus_can_create_run_of_transfer_his_group(
5558
}
5659
assert result.status_code == 200
5760

61+
mock_to_thread.assert_awaited_once_with(
62+
mock_send_task,
63+
"run_transfer_task",
64+
kwargs={"run_id": run.id},
65+
queue=group_transfer.queue.name,
66+
)
67+
5868

5969
async def test_groupless_user_cannot_create_run(
6070
client: AsyncClient,
@@ -65,6 +75,7 @@ async def test_groupless_user_cannot_create_run(
6575
) -> None:
6676
# Arrange
6777
mocker.patch("syncmaster.worker.config.celery.send_task")
78+
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
6879

6980
# Act
7081
result = await client.post(
@@ -94,6 +105,7 @@ async def test_group_member_cannot_create_run_of_other_group_transfer(
94105
):
95106
# Arrange
96107
mocker.patch("syncmaster.worker.config.celery.send_task")
108+
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
97109
user = group.get_member_of_role(role_guest_plus)
98110

99111
# Act
@@ -132,7 +144,8 @@ async def test_superuser_can_create_run(
132144
settings.worker.LOG_URL_TEMPLATE = (
133145
"https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
134146
)
135-
mocker.patch("syncmaster.worker.config.celery.send_task")
147+
mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task")
148+
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
136149

137150
# Act
138151
result = await client.post(
@@ -161,6 +174,12 @@ async def test_superuser_can_create_run(
161174
assert result.status_code == 200
162175
assert "correlation_id" in response.get("log_url")
163176
assert "run_id" in response.get("log_url")
177+
mock_to_thread.assert_awaited_once_with(
178+
mock_send_task,
179+
"run_transfer_task",
180+
kwargs={"run_id": run.id},
181+
queue=group_transfer.queue.name,
182+
)
164183

165184

166185
async def test_unauthorized_user_cannot_create_run(
@@ -170,6 +189,7 @@ async def test_unauthorized_user_cannot_create_run(
170189
) -> None:
171190
# Arrange
172191
mocker.patch("syncmaster.worker.config.celery.send_task")
192+
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
173193

174194
# Act
175195
result = await client.post(
@@ -198,6 +218,7 @@ async def test_group_member_cannot_create_run_of_unknown_transfer_error(
198218
# Arrange
199219
user = group_transfer.owner_group.get_member_of_role(role_guest_plus)
200220
mocker.patch("syncmaster.worker.config.celery.send_task")
221+
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
201222

202223
# Act
203224
result = await client.post(
@@ -225,6 +246,7 @@ async def test_superuser_cannot_create_run_of_unknown_transfer_error(
225246
) -> None:
226247
# Arrange
227248
mocker.patch("syncmaster.worker.config.celery.send_task")
249+
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
228250

229251
# Act
230252
result = await client.post(

tests/test_unit/test_scheduler/test_transfer_job_manager.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ async def test_send_job_to_celery_with_success(
8080
group_transfer: MockTransfer,
8181
):
8282
# Arrange
83-
mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task", new=AsyncMock())
83+
mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task")
8484
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
8585

8686
# Act
@@ -107,7 +107,7 @@ async def test_send_job_to_celery_with_failure(
107107
group_transfer: MockTransfer,
108108
):
109109
# Arrange
110-
mocker.patch("syncmaster.worker.config.celery.send_task", new=AsyncMock())
110+
mocker.patch("syncmaster.worker.config.celery.send_task")
111111
mocker.patch("asyncio.to_thread", new_callable=AsyncMock, side_effect=KombuError)
112112

113113
# Act & Assert

0 commit comments

Comments
 (0)