@@ -6,7 +6,8 @@ Standalone admin panel with all data stored in SQLite database
6
6
- [ Broker-agnostic admin panel for Taskiq] ( #broker-agnostic-admin-panel-for-taskiq )
7
7
- [ Previews] ( #previews )
8
8
- [ Usage] ( #usage )
9
- - [ Docker Compose Examples] ( #docker-compose-examples )
9
+ - [ Docker Compose Example] ( #docker-compose-example )
10
+ - [ Task States] ( #task-states )
10
11
- [ Development] ( #development )
11
12
12
13
### Previews
@@ -16,7 +17,7 @@ Tasks Page | Task Details Page
16
17
17
18
### Usage
18
19
19
- 1 ) Add this middleware to your taskiq broker :
20
+ 1 ) Add this middleware to your project :
20
21
21
22
``` python
22
23
from typing import Any
@@ -26,30 +27,49 @@ from datetime import datetime, UTC
26
27
import httpx
27
28
from taskiq import TaskiqMiddleware, TaskiqResult, TaskiqMessage
28
29
29
- TASKIQ_ADMIN_URL = " ..." # or os.getenv() to use .env vars
30
- TASKIQ_ADMIN_API_TOKEN = " ..." # or os.getenv() to use .env vars
31
-
32
-
33
30
class TaskiqAdminMiddleware (TaskiqMiddleware ):
34
- def __init__ (self , taskiq_broker_name : str | None = None ):
31
+ def __init__ (
32
+ self ,
33
+ url : str ,
34
+ api_token : str ,
35
+ taskiq_broker_name : str | None = None ,
36
+ ):
35
37
super ().__init__ ()
38
+ self .url = url
39
+ self .api_token = api_token
36
40
self .__ta_broker_name = taskiq_broker_name
37
41
42
+ async def post_send (self , message ):
43
+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
44
+ async with httpx.AsyncClient() as client:
45
+ await client.post(
46
+ headers = {" access-token" : self .api_token},
47
+ url = urljoin(self .url, f " /api/tasks/ { message.task_id} /queued " ),
48
+ json = {
49
+ " args" : message.args,
50
+ " kwargs" : message.kwargs,
51
+ " taskName" : message.task_name,
52
+ " worker" : self .__ta_broker_name,
53
+ " queuedAt" : now,
54
+ },
55
+ )
56
+ return super ().post_send(message)
57
+
38
58
async def pre_execute (self , message : TaskiqMessage):
39
59
""" """
60
+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
40
61
async with httpx.AsyncClient() as client:
41
62
await client.post(
42
- headers = {" access-token" : TASKIQ_ADMIN_API_TOKEN },
43
- url = urljoin(TASKIQ_ADMIN_URL , f " /api/tasks/ { message.task_id} /started " ),
63
+ headers = {" access-token" : self .api_token },
64
+ url = urljoin(self .url , f " /api/tasks/ { message.task_id} /started " ),
44
65
json = {
66
+ " startedAt" : now,
45
67
" args" : message.args,
46
68
" kwargs" : message.kwargs,
47
69
" taskName" : message.task_name,
48
70
" worker" : self .__ta_broker_name,
49
- " startedAt" : datetime.now(UTC ).replace(tzinfo = None ).isoformat(),
50
71
},
51
72
)
52
-
53
73
return super ().pre_execute(message)
54
74
55
75
async def post_execute (
@@ -58,26 +78,50 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
58
78
result : TaskiqResult[Any],
59
79
):
60
80
""" """
81
+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
61
82
async with httpx.AsyncClient() as client:
62
83
await client.post(
63
- headers = {" access-token" : TASKIQ_ADMIN_API_TOKEN },
64
- url = urljoin(TASKIQ_ADMIN_URL , f " /api/tasks/ { message.task_id} /executed " ),
84
+ headers = {" access-token" : self .api_token},
85
+ url = urljoin(
86
+ self .url,
87
+ f " /api/tasks/ { message.task_id} /executed " ,
88
+ ),
65
89
json = {
90
+ " finishedAt" : now,
66
91
" error" : result.error
67
92
if result.error is None
68
93
else repr (result.error),
69
94
" executionTime" : result.execution_time,
70
95
" returnValue" : {" return_value" : result.return_value},
71
- " finishedAt" : datetime.now(UTC ).replace(tzinfo = None ).isoformat(),
72
96
},
73
97
)
74
-
75
98
return super ().post_execute(message, result)
76
99
```
77
100
78
- 2 ) Pull the image from GitHub Container Registry: ` docker pull ghcr.io/taskiq-python/taskiq-admin:latest `
101
+ 2 ) Connect the middleware to your broker:
102
+
103
+ ``` python
104
+ ...
105
+ broker = (
106
+ ListQueueBroker(
107
+ url = redis_url,
108
+ queue_name = " my_lovely_queue" ,
109
+ )
110
+ .with_result_backend(result_backend)
111
+ .with_middlewares(
112
+ TaskiqAdminMiddleware(
113
+ url = " http://localhost:3000" , # the url to your taskiq-admin instance
114
+ api_token = " supersecret" , # any secret enough string
115
+ taskiq_broker_name = " mybroker" ,
116
+ )
117
+ )
118
+ )
119
+ ...
120
+ ```
79
121
80
- 3 ) Replace ` TASKIQ_ADMIN_API_TOKEN ` with any secret enough string and run:
122
+ 3 ) Pull the image from GitHub Container Registry: ` docker pull ghcr.io/taskiq-python/taskiq-admin:latest `
123
+
124
+ 4 ) Replace ` TASKIQ_ADMIN_API_TOKEN ` with any secret enough string and run:
81
125
``` bash
82
126
docker run -d --rm \
83
127
-p " 3000:3000" \
@@ -87,17 +131,10 @@ docker run -d --rm \
87
131
" ghcr.io/taskiq-python/taskiq-admin:latest"
88
132
```
89
133
90
- 4 ) Go to ` http://localhost:3000/tasks `
134
+ 5 ) Go to ` http://localhost:3000/tasks `
91
135
92
- ### Docker Compose Examples
136
+ ### Docker Compose Example
93
137
94
- .env file example:
95
- ``` bash
96
- TASKIQ_ADMIN_URL=" http://taskiq_admin:3000"
97
- TASKIQ_ADMIN_API_TOKEN=" supersecret"
98
- ```
99
-
100
- compose.yml file example
101
138
``` yaml
102
139
services :
103
140
queue :
@@ -106,8 +143,9 @@ services:
106
143
dockerfile : ./Dockerfile
107
144
container_name : my_queue
108
145
command : taskiq worker app.tasks.queue:broker --workers 1 --max-async-tasks 20
109
- env_file :
110
- - .env
146
+ environment :
147
+ - TASKIQ_ADMIN_URL=http://taskiq_admin:3000
148
+ - TASKIQ_ADMIN_API_TOKEN=supersecret
111
149
depends_on :
112
150
- redis
113
151
- taskiq_admin
@@ -117,15 +155,23 @@ services:
117
155
container_name : taskiq_admin
118
156
ports :
119
157
- 3000:3000
120
- env_file :
121
- - .env
158
+ environment :
159
+ - TASKIQ_ADMIN_API_TOKEN=supersecret
122
160
volumes :
123
161
- admin_data:/usr/database/
124
162
125
163
volumes :
126
164
admin_data :
127
165
` ` `
128
166
167
+ ### Task States
168
+ Let's assume we have a task 'do_smth', there are all states it can embrace:
169
+ 1) ` queued` - the task has been sent to the queue without an error
170
+ 2) `running` - the task is grabbed by a worker and is being processed
171
+ 3) `success` - the task is fully processed without any errors
172
+ 4) `failure` - an error occured during the task processing
173
+ 5) `abandoned` - taskiq-admin sets all 'running' tasks as 'abandoned' if there was a downtime between the time these tasks were in 'running' state and the time of next startup of taskiq-admin
174
+
129
175
# ## Development
130
176
1) Run `pnpm install` to install all dependencies
131
177
2) Run `pnpm db:push` to create the sqlite database if needed
0 commit comments