From a0fb5489d0457853e69f30f3f04b7a204cb30580 Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 16 Dec 2024 18:15:43 +0800 Subject: [PATCH] feat: web api --- main.py | 31 ++++++++------ requirements.txt | 9 +++- router/__init__.py | 0 router/route.py | 97 +++++++++++++++++++++++++++++++++++++++++++- schemas/__init__.py | 0 schemas/schemas.py | 70 ++++++++++++++++++++++++++++++++ services/__init__.py | 11 +++++ 7 files changed, 202 insertions(+), 16 deletions(-) create mode 100644 router/__init__.py create mode 100644 schemas/__init__.py create mode 100644 services/__init__.py diff --git a/main.py b/main.py index 501ba66..ecfb264 100644 --- a/main.py +++ b/main.py @@ -1,23 +1,17 @@ import logging from contextlib import asynccontextmanager -from fastapi import FastAPI, status, Request +from fastapi import FastAPI, status +from fastapi.responses import ORJSONResponse from pydantic import ValidationError -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore -from router.route import module_router, sms_router -from services.utils.config_parser import config +from router.route import module_router, sms_router, schedule_router +from services import scheduler +from schemas.schemas import ErrorModel, ErrorDetail logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger("PyAirLink") -jobstores = { - 'default': SQLAlchemyJobStore(url=config.sqlite_url()) -} - -scheduler = AsyncIOScheduler(jobstores=jobstores) - @asynccontextmanager async def lifespan(app: FastAPI): @@ -32,7 +26,18 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan, title='PyAirLink API', version='0.0.1') app.include_router(module_router) app.include_router(sms_router) +app.include_router(schedule_router) + + +@app.exception_handler(ValidationError) +async def validation_exception_handler(exc: ValidationError): + error_response = ErrorModel(detail=[ErrorDetail(loc=err.get('loc'), msg=err.get('msg'), type=err.get('type')) for err in exc.errors()]) + return ORJSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content=error_response.model_dump() + ) -if __name__ == '__main__': - pass \ No newline at end of file +if __name__ == "__main__": + import uvicorn + uvicorn.run("main:app", host="0.0.0.0", port=10103, reload=False) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 048808e..8fdf5c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,11 @@ pydantic~=2.10.3 pyserial~=3.5 smspdudecoder~=2.1.0 requests~=2.32.3 -APScheduler~=3.11.0 \ No newline at end of file +APScheduler~=3.11.0 +uvicorn~=0.34.0 + +# SQLAlchemy is used indirectly by APScheduler SQLAlchemyJobStore +SQLAlchemy~=2.0.36 + +# orjson is used indirectly by fastapi Custom Response +orjson~=3.10.12 \ No newline at end of file diff --git a/router/__init__.py b/router/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/router/route.py b/router/route.py index 47a18e1..970e62f 100644 --- a/router/route.py +++ b/router/route.py @@ -1,6 +1,12 @@ -from fastapi import APIRouter +from typing import List + +from fastapi import APIRouter, Depends, Query from fastapi.responses import ORJSONResponse +from services import scheduler +from schemas import schemas +from services.initialize import send_at_command, send_sms +from services.utils.commands import at_commands module_router = APIRouter( prefix="/api/v1/module", @@ -18,4 +24,91 @@ prefix="/api/v1/schedule", tags=["schedule"], responses={404: {"description": "Not found"}}, -) \ No newline at end of file +) + + +@module_router.post("/command/base", response_model=schemas.CommandResponse, summary='执行任意AT命令', + description= +""" +需要自己拼接所有参数 +""" + ) +async def command_base(params: schemas.CommandBaseRequest = Depends()): + response = send_at_command(at_commands.base(params.command), keywords=params.keyword, timeout=params.timeout) + return {'status': 'success' if response else 'failure', 'content': response} + + +@module_router.post("/command/restart", response_model=schemas.CommandResponse, summary='重启模块', + description= +""" +""" + ) +async def command_reset(params: schemas.Command = Depends()): + response = send_at_command(at_commands.reset(), keywords=params.keyword, timeout=params.timeout) + return {'status': 'success' if response else 'failure', 'content': response} + + +@sms_router.post("/sms/send", response_model=schemas.CommandResponse, summary='发送短信', + description= +""" +尚未支持长短信,不要大于70个字符 +""" + ) +async def immediately_send_sms(params: schemas.SendSMSRequest = Depends()): + response = send_sms(f'+{params.country}{params.number}', text=params.message) + return {'status': 'success' if response else 'failure', 'content': response} + + +@schedule_router.get("/schedule/list", response_model=List[schemas.ListScheduleJob], summary='查看定时任务', + description= +""" +查看所有执行中的定时任务 +""" + ) +async def list_schedule(): + jobs = scheduler.get_jobs() + if jobs: + return [{'id': job.id, 'next_run_time': job.next_run_time, 'trigger': job.trigger, 'func': job.func} for job in jobs] + return ORJSONResponse(status_code=404, content={"status": "fail", "message": f"no jobs in schedule"}) + + +@schedule_router.delete("/schedule/del", response_model=schemas.CommandResponse, summary='删除定时任务', + description= +""" +删除定时任务 +""" + ) +async def del_schedule(job_id: str = Query()): + job = scheduler.get_job(job_id=job_id) + if job: + scheduler.remove_job(job_id=job_id) + return {'status': 'success', 'content': job_id} + return 404 + + +@schedule_router.post("/schedule/add/sms", response_model=schemas.CommandResponse, summary='添加定时发短信任务', + description= +""" +""" + ) +async def add_sms_schedule(params: schemas.ScheduleSendSMSRequest = Depends()): + try: + job = scheduler.add_job(func=send_sms, args=(f'+{params.country}{params.number}', params.message,), + trigger='interval', seconds=params.seconds, jobstore='default') + return {'status': 'success', 'content': job.id} + except Exception as e: + return ORJSONResponse(status_code=400, content={"status": "error", "message": f"An error occurred: {str(e)}"}) + + +@schedule_router.post("/schedule/add/restart", response_model=schemas.CommandResponse, summary='添加定时重启任务', + description= +""" +""" + ) +async def add_restart_schedule(params: schemas.ScheduleSendSMSRequest = Depends()): + try: + job = scheduler.add_job(func=send_at_command, args=(at_commands.reset(),), + trigger='interval', seconds=params.seconds, jobstore='default') + return {'status': 'success', 'content': job.id} + except Exception as e: + return ORJSONResponse(status_code=400, content={"status": "error", "message": f"An error occurred: {str(e)}"}) diff --git a/schemas/__init__.py b/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/schemas/schemas.py b/schemas/schemas.py index e69de29..0a73aaa 100644 --- a/schemas/schemas.py +++ b/schemas/schemas.py @@ -0,0 +1,70 @@ +from datetime import datetime +from typing import List, Union, Optional, Any, Dict + +from pydantic import BaseModel, Field, field_validator + + +class ErrorDetail(BaseModel): + loc: List[Union[str, int]] + msg: str + type: str + + +class ErrorModel(BaseModel): + detail: tuple[ErrorDetail] + + +class ResponseDetail(BaseModel): + status: str + data: List[Any] + message: str + + +class Command(BaseModel): + keyword: Optional[str] = Field(default=None, description="AT命令返回的关键字, 一般是'OK'或者 'ERROR',不传则两个都检测") + timeout: float = Field(default=3, description="等待AT命令回应的超时时间") + + @field_validator('keyword', mode='after', check_fields=False) + @classmethod + def check_message(cls, v: str) -> list[str]: + if v is None: + return ['OK', 'ERROR'] + return [v] + + +class CommandRequest(Command): + command: str + + +class CommandBaseRequest(CommandRequest): + command: str + + +class CommandResponse(BaseModel): + status: str + content: str + +class SendSMSRequest(BaseModel): + country: int + number: int + message: str + + @field_validator('message') + @classmethod + def check_message(cls, v: str) -> str: + if len(v) > 70: + raise ValueError(f"Do not support long sms yet, makesure less than 71 characters.") + return v + + +class ListScheduleJob(BaseModel): + id: str + next_run_time: datetime + trigger: str + func: str + + +class ScheduleSendSMSRequest(SendSMSRequest): + id: str + seconds: int + next_run_time: datetime = Field(default=datetime.now(), description="下次执行的时间") \ No newline at end of file diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..191a2fb --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,11 @@ +from zoneinfo import ZoneInfo + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from services.utils.config_parser import config + +jobstores = { + 'default': SQLAlchemyJobStore(url=config.sqlite_url()) +} + +scheduler = AsyncIOScheduler(timezone=ZoneInfo("Asia/Shanghai"), jobstores=jobstores)