Skip to content

Commit 639cc40

Browse files
authored
Merge pull request #6 from taskiq-python/feat/add-queued-tasks
feat: add queued tasks
2 parents 381e7c7 + 0b3da89 commit 639cc40

26 files changed

+483
-534
lines changed

README.md

+76-30
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ Standalone admin panel with all data stored in SQLite database
66
- [Broker-agnostic admin panel for Taskiq](#broker-agnostic-admin-panel-for-taskiq)
77
- [Previews](#previews)
88
- [Usage](#usage)
9-
- [Docker Compose Examples](#docker-compose-examples)
9+
- [Docker Compose Example](#docker-compose-example)
10+
- [Task States](#task-states)
1011
- [Development](#development)
1112

1213
### Previews
@@ -16,7 +17,7 @@ Tasks Page | Task Details Page
1617

1718
### Usage
1819

19-
1) Add this middleware to your taskiq broker:
20+
1) Add this middleware to your project:
2021

2122
```python
2223
from typing import Any
@@ -26,30 +27,49 @@ from datetime import datetime, UTC
2627
import httpx
2728
from taskiq import TaskiqMiddleware, TaskiqResult, TaskiqMessage
2829

29-
TASKIQ_ADMIN_URL = "..." # or os.getenv() to use .env vars
30-
TASKIQ_ADMIN_API_TOKEN = "..." # or os.getenv() to use .env vars
31-
32-
3330
class TaskiqAdminMiddleware(TaskiqMiddleware):
34-
def __init__(self, taskiq_broker_name: str | None = None):
31+
def __init__(
32+
self,
33+
url: str,
34+
api_token: str,
35+
taskiq_broker_name: str | None = None,
36+
):
3537
super().__init__()
38+
self.url = url
39+
self.api_token = api_token
3640
self.__ta_broker_name = taskiq_broker_name
3741

42+
async def post_send(self, message):
43+
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
44+
async with httpx.AsyncClient() as client:
45+
await client.post(
46+
headers={"access-token": self.api_token},
47+
url=urljoin(self.url, f"/api/tasks/{message.task_id}/queued"),
48+
json={
49+
"args": message.args,
50+
"kwargs": message.kwargs,
51+
"taskName": message.task_name,
52+
"worker": self.__ta_broker_name,
53+
"queuedAt": now,
54+
},
55+
)
56+
return super().post_send(message)
57+
3858
async def pre_execute(self, message: TaskiqMessage):
3959
""""""
60+
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
4061
async with httpx.AsyncClient() as client:
4162
await client.post(
42-
headers={"access-token": TASKIQ_ADMIN_API_TOKEN},
43-
url=urljoin(TASKIQ_ADMIN_URL, f"/api/tasks/{message.task_id}/started"),
63+
headers={"access-token": self.api_token},
64+
url=urljoin(self.url, f"/api/tasks/{message.task_id}/started"),
4465
json={
66+
"startedAt": now,
4567
"args": message.args,
4668
"kwargs": message.kwargs,
4769
"taskName": message.task_name,
4870
"worker": self.__ta_broker_name,
49-
"startedAt": datetime.now(UTC).replace(tzinfo=None).isoformat(),
5071
},
5172
)
52-
5373
return super().pre_execute(message)
5474

5575
async def post_execute(
@@ -58,26 +78,50 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
5878
result: TaskiqResult[Any],
5979
):
6080
""""""
81+
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
6182
async with httpx.AsyncClient() as client:
6283
await client.post(
63-
headers={"access-token": TASKIQ_ADMIN_API_TOKEN},
64-
url=urljoin(TASKIQ_ADMIN_URL, f"/api/tasks/{message.task_id}/executed"),
84+
headers={"access-token": self.api_token},
85+
url=urljoin(
86+
self.url,
87+
f"/api/tasks/{message.task_id}/executed",
88+
),
6589
json={
90+
"finishedAt": now,
6691
"error": result.error
6792
if result.error is None
6893
else repr(result.error),
6994
"executionTime": result.execution_time,
7095
"returnValue": {"return_value": result.return_value},
71-
"finishedAt": datetime.now(UTC).replace(tzinfo=None).isoformat(),
7296
},
7397
)
74-
7598
return super().post_execute(message, result)
7699
```
77100

78-
2) Pull the image from GitHub Container Registry: `docker pull ghcr.io/taskiq-python/taskiq-admin:latest`
101+
2) Connect the middleware to your broker:
102+
103+
```python
104+
...
105+
broker = (
106+
ListQueueBroker(
107+
url=redis_url,
108+
queue_name="my_lovely_queue",
109+
)
110+
.with_result_backend(result_backend)
111+
.with_middlewares(
112+
TaskiqAdminMiddleware(
113+
url="http://localhost:3000", # the url to your taskiq-admin instance
114+
api_token="supersecret", # any secret enough string
115+
taskiq_broker_name="mybroker",
116+
)
117+
)
118+
)
119+
...
120+
```
79121

80-
3) Replace `TASKIQ_ADMIN_API_TOKEN` with any secret enough string and run:
122+
3) Pull the image from GitHub Container Registry: `docker pull ghcr.io/taskiq-python/taskiq-admin:latest`
123+
124+
4) Replace `TASKIQ_ADMIN_API_TOKEN` with any secret enough string and run:
81125
```bash
82126
docker run -d --rm \
83127
-p "3000:3000" \
@@ -87,17 +131,10 @@ docker run -d --rm \
87131
"ghcr.io/taskiq-python/taskiq-admin:latest"
88132
```
89133

90-
4) Go to `http://localhost:3000/tasks`
134+
5) Go to `http://localhost:3000/tasks`
91135

92-
### Docker Compose Examples
136+
### Docker Compose Example
93137

94-
.env file example:
95-
```bash
96-
TASKIQ_ADMIN_URL="http://taskiq_admin:3000"
97-
TASKIQ_ADMIN_API_TOKEN="supersecret"
98-
```
99-
100-
compose.yml file example
101138
```yaml
102139
services:
103140
queue:
@@ -106,8 +143,9 @@ services:
106143
dockerfile: ./Dockerfile
107144
container_name: my_queue
108145
command: taskiq worker app.tasks.queue:broker --workers 1 --max-async-tasks 20
109-
env_file:
110-
- .env
146+
environment:
147+
- TASKIQ_ADMIN_URL=http://taskiq_admin:3000
148+
- TASKIQ_ADMIN_API_TOKEN=supersecret
111149
depends_on:
112150
- redis
113151
- taskiq_admin
@@ -117,15 +155,23 @@ services:
117155
container_name: taskiq_admin
118156
ports:
119157
- 3000:3000
120-
env_file:
121-
- .env
158+
environment:
159+
- TASKIQ_ADMIN_API_TOKEN=supersecret
122160
volumes:
123161
- admin_data:/usr/database/
124162

125163
volumes:
126164
admin_data:
127165
```
128166
167+
### Task States
168+
Let's assume we have a task 'do_smth', there are all states it can embrace:
169+
1) `queued` - the task has been sent to the queue without an error
170+
2) `running` - the task is grabbed by a worker and is being processed
171+
3) `success` - the task is fully processed without any errors
172+
4) `failure` - an error occured during the task processing
173+
5) `abandoned` - taskiq-admin sets all 'running' tasks as 'abandoned' if there was a downtime between the time these tasks were in 'running' state and the time of next startup of taskiq-admin
174+
129175
### Development
130176
1) Run `pnpm install` to install all dependencies
131177
2) Run `pnpm db:push` to create the sqlite database if needed

env-example

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
DB_FILE_PATH=database/database.db
2-
BACKUP_FILE_PATH=database/backup.db
2+
BACKUP_FILE_PATH=database/backup.db
3+
TASKIQ_ADMIN_API_TOKEN=supersecret

nuxt.config.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
import tailwindcss from "@tailwindcss/vite"
1+
import tailwindcss from '@tailwindcss/vite'
22

33
export default defineNuxtConfig({
4-
compatibilityDate: "2024-11-01",
4+
compatibilityDate: '2024-11-01',
55
devtools: { enabled: true },
6-
css: ["~/assets/css/main.css"],
7-
srcDir: "src/",
6+
css: ['~/assets/css/main.css'],
7+
srcDir: 'src/',
88
imports: {
9-
scan: false,
9+
autoImport: false
1010
},
1111
vite: {
12-
plugins: [tailwindcss()],
12+
plugins: [tailwindcss()]
1313
},
1414
typescript: {
15-
strict: true,
16-
},
15+
strict: true
16+
}
1717
})

package.json

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,30 @@
22
"name": "nuxt-app",
33
"private": true,
44
"type": "module",
5-
"version": "1.5.0",
5+
"version": "1.6.0",
66
"scripts": {
77
"build": "nuxt build",
88
"dev": "nuxt dev",
99
"generate": "nuxt generate",
1010
"preview": "nuxt preview",
1111
"postinstall": "nuxt prepare",
12+
"typecheck": "tsc --noEmit",
1213
"db:push": "drizzle-kit push",
1314
"generate:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql",
1415
"generate:future:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql; sed -i '1s/^/PRAGMA journal_mode = WAL; PRAGMA synchronous = normal; PRAGMA journal_size_limit = 6144000;\\n/' dbschema.sql"
1516
},
1617
"dependencies": {
1718
"@internationalized/date": "^3.8.0",
1819
"@tailwindcss/vite": "^4.1.3",
19-
"@tanstack/vue-table": "^8.21.2",
20+
"@tanstack/vue-table": "^8.21.3",
2021
"@vueuse/core": "^12.8.2",
2122
"better-sqlite3": "^11.9.1",
2223
"bootstrap": "^5.3.3",
2324
"class-variance-authority": "^0.7.1",
2425
"clsx": "^2.1.1",
2526
"dayjs": "^1.11.13",
2627
"dotenv": "^16.4.7",
27-
"drizzle-orm": "^0.41.0",
28+
"drizzle-orm": "^0.42.0",
2829
"lucide-vue-next": "^0.487.0",
2930
"nuxt": "^3.16.2",
3031
"reka-ui": "^2.2.0",
@@ -41,7 +42,7 @@
4142
"@iconify-json/radix-icons": "^1.2.2",
4243
"@iconify/vue": "^4.3.0",
4344
"@types/better-sqlite3": "^7.6.12",
44-
"drizzle-kit": "^0.30.6",
45+
"drizzle-kit": "^0.31.0",
4546
"prettier": "^3.5.3",
4647
"typescript": "^5.8.3"
4748
}

0 commit comments

Comments
 (0)