From 730e3f720bd110c8b7477bfae361c34ed7e7c6ba Mon Sep 17 00:00:00 2001 From: Jason <81298350+Deutscher775@users.noreply.github.com> Date: Sun, 1 Dec 2024 18:41:55 +0100 Subject: [PATCH] adding queue handler for message caching (not implemented in the api yet) --- src/astroidapi/endpoint_update_handler.py | 4 +- src/astroidapi/health_check.py | 8 +-- src/astroidapi/queue_processor.py | 49 +++++++++++++++ src/astroidapi/surrealdb_handler.py | 74 +++++++++++++++++++++++ 4 files changed, 129 insertions(+), 6 deletions(-) create mode 100644 src/astroidapi/queue_processor.py diff --git a/src/astroidapi/endpoint_update_handler.py b/src/astroidapi/endpoint_update_handler.py index 1bbcc8b..761bc6a 100644 --- a/src/astroidapi/endpoint_update_handler.py +++ b/src/astroidapi/endpoint_update_handler.py @@ -7,6 +7,7 @@ import fastapi import astroidapi.sending_handler as sending_handler import astroidapi.surrealdb_handler as surrealdb_handler +import astroidapi.queue_processor as queue_processor from Bot import config import astroidapi.health_check as health_check @@ -254,8 +255,7 @@ async def update_endpoint( finally: if not updated_json["config"]["self-user"] is True: if updated_json["meta"]["trigger"]: - asyncio.create_task(sending_handler.SendingHandler.distribute(endpoint, updated_json)) - print("Distributed") + asyncio.create_task(queue_processor.QueueProcessor.handleUpdatedEndpointData(endpoint, updated_json)) waiting_secs = 0 max_secs = 10 while True: diff --git a/src/astroidapi/health_check.py b/src/astroidapi/health_check.py index 37d5970..55ad161 100644 --- a/src/astroidapi/health_check.py +++ b/src/astroidapi/health_check.py @@ -49,9 +49,9 @@ "type_reply": "meta.message.reply", "isReply": "meta.message.isReply", "reply": { - "message": "meta.message.reply.message", - "author": "meta.message.reply.author" - }, + "message": "meta.message.reply.message", + "author": "meta.message.reply.author" + }, "type_author": "meta.message.author", "author": { "name": "meta.message.author.name", @@ -108,7 +108,7 @@ async def check(cls, endpoint): "isbeta": False }, "meta": { - "_message_cache": None, + "_message_cache": [], "sender-channel": None, "trigger": False, "sender": None, diff --git a/src/astroidapi/queue_processor.py b/src/astroidapi/queue_processor.py new file mode 100644 index 0000000..11ee414 --- /dev/null +++ b/src/astroidapi/queue_processor.py @@ -0,0 +1,49 @@ +import threading +import time +import asyncio +from astroidapi import surrealdb_handler, sending_handler +import logging +import json +import os + + +class QueueProcessor: + @classmethod + async def appendMessage(cls, endpoint, updated_json): + print(f"Appending message for endpoint {endpoint}") + if updated_json is None: + raise Exception("You must provide a valid 'updated_json' json object to appendMessage.") + message = updated_json["meta"]["message"] + message_sender = updated_json["meta"]["sender"] + message_sender_channel = updated_json["meta"]["sender-channel"] + message["sender"] = message_sender + message["sender-channel"] = message_sender_channel + await surrealdb_handler.QueueHandler.append_to_queue(endpoint, message) + return True + + @classmethod + async def sendMessage(cls, endpoint): + print(f"Sending message for endpoint {endpoint}") + queue = await surrealdb_handler.QueueHandler.get_queue(endpoint) + print(f"Queue: {queue}") + if len(queue) == 0: + return False + elif len(queue) == 1: + message = queue[0] + print(f"Loading message {message}") + updated = await surrealdb_handler.QueueHandler.loadMessage(endpoint, message) + await sending_handler.SendingHandler.distribute(endpoint, updated) + return True + elif len(queue) > 1: + for message in queue: + updated = await surrealdb_handler.QueueHandler.loadMessage(endpoint, message) + await sending_handler.SendingHandler.distribute(endpoint, updated) + return True + else: + raise Exception("Unknown error in sendMessage while processing queue. Queue length is unsupported type or negative.") + + @classmethod + async def handleUpdatedEndpointData(cls, endpoint, updated_json): + await cls.appendMessage(endpoint, updated_json) + await cls.sendMessage(endpoint) + \ No newline at end of file diff --git a/src/astroidapi/surrealdb_handler.py b/src/astroidapi/surrealdb_handler.py index c01c4e9..483642a 100644 --- a/src/astroidapi/surrealdb_handler.py +++ b/src/astroidapi/surrealdb_handler.py @@ -159,6 +159,80 @@ async def write_to_structure(endpoint, key, value): raise errors.SurrealDBHandler.UpdateEndpointError(e) +class QueueHandler: + @classmethod + async def get_queue(cls, endpoint: int): + try: + async with Surreal(config.SDB_URL) as db: + await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS}) + await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE) + data = await db.select(f"endpoints:`{endpoint}`") + return data["meta"]["_message_cache"] + except Exception as e: + raise errors.SurrealDBHandler.GetEndpointError(e) + + @classmethod + async def append_to_queue(cls, endpoint: int, message: dict): + try: + async with Surreal(config.SDB_URL) as db: + await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS}) + await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE) + data = await db.select(f"endpoints:`{endpoint}`") + data['meta']['_message_cache'].append(message) + appending = json.dumps(data['meta']['_message_cache']) + print(appending) + await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = {appending}") + return await db.select(f"endpoints:`{endpoint}`") + except Exception as e: + raise errors.SurrealDBHandler.UpdateEndpointError(e) + + @classmethod + async def remove_from_queue(cls, endpoint: int, message: dict): + try: + async with Surreal(config.SDB_URL) as db: + await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS}) + await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE) + data = await db.select(f"endpoints:`{endpoint}`") + data["meta"]["_message_cache"].remove(message) + if data["meta"]["_message_cache"] is None: + await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = []") + return await db.select(f"endpoints:`{endpoint}`") + await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = {json.dumps(data['meta']['_message_cache'])}") + return await db.select(f"endpoints:`{endpoint}`") + except Exception as e: + raise errors.SurrealDBHandler.UpdateEndpointError(e) + + @classmethod + async def clear_queue(cls, endpoint: int): + try: + async with Surreal(config.SDB_URL) as db: + await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS}) + await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE) + await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = []") + return await db.select(f"endpoints:`{endpoint}`") + except Exception as e: + raise errors.SurrealDBHandler.UpdateEndpointError(e) + + @classmethod + async def loadMessage(cls, endpoint: int, message: dict): + try: + async with Surreal(config.SDB_URL) as db: + await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS}) + await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE) + data = await db.select(f"endpoints:`{endpoint}`") + print(message["sender"]) + print(message["sender-channel"]) + first = await db.query(f"UPDATE endpoints:`{endpoint}` SET meta.sender = '{message['sender']}'") + print(f"First: {first}") + second = await db.query(f"UPDATE endpoints:`{endpoint}` SET meta.`sender-channel` = '{message['sender-channel']}'") + last = await db.query(f"UPDATE endpoints:`{endpoint}` SET meta.message = {message}") + await cls.remove_from_queue(endpoint, message) + return await db.select(f"endpoints:`{endpoint}`") + except Exception as e: + raise errors.SurrealDBHandler.GetEndpointError(e) + + + class AttachmentProcessor: @classmethod