Skip to content

Commit

Permalink
adding queue handler for message caching (not implemented in the api …
Browse files Browse the repository at this point in the history
…yet)
  • Loading branch information
Deutscher775 committed Dec 1, 2024
1 parent 14b7e10 commit 730e3f7
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/astroidapi/endpoint_update_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import fastapi
import astroidapi.sending_handler as sending_handler
import astroidapi.surrealdb_handler as surrealdb_handler
import astroidapi.queue_processor as queue_processor
from Bot import config
import astroidapi.health_check as health_check

Expand Down Expand Up @@ -254,8 +255,7 @@ async def update_endpoint(
finally:
if not updated_json["config"]["self-user"] is True:
if updated_json["meta"]["trigger"]:
asyncio.create_task(sending_handler.SendingHandler.distribute(endpoint, updated_json))
print("Distributed")
asyncio.create_task(queue_processor.QueueProcessor.handleUpdatedEndpointData(endpoint, updated_json))
waiting_secs = 0
max_secs = 10
while True:
Expand Down
8 changes: 4 additions & 4 deletions src/astroidapi/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
"type_reply": "meta.message.reply",
"isReply": "meta.message.isReply",
"reply": {
"message": "meta.message.reply.message",
"author": "meta.message.reply.author"
},
"message": "meta.message.reply.message",
"author": "meta.message.reply.author"
},
"type_author": "meta.message.author",
"author": {
"name": "meta.message.author.name",
Expand Down Expand Up @@ -108,7 +108,7 @@ async def check(cls, endpoint):
"isbeta": False
},
"meta": {
"_message_cache": None,
"_message_cache": [],
"sender-channel": None,
"trigger": False,
"sender": None,
Expand Down
49 changes: 49 additions & 0 deletions src/astroidapi/queue_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import threading
import time
import asyncio
from astroidapi import surrealdb_handler, sending_handler
import logging
import json
import os


class QueueProcessor:
@classmethod
async def appendMessage(cls, endpoint, updated_json):
print(f"Appending message for endpoint {endpoint}")
if updated_json is None:
raise Exception("You must provide a valid 'updated_json' json object to appendMessage.")
message = updated_json["meta"]["message"]
message_sender = updated_json["meta"]["sender"]
message_sender_channel = updated_json["meta"]["sender-channel"]
message["sender"] = message_sender
message["sender-channel"] = message_sender_channel
await surrealdb_handler.QueueHandler.append_to_queue(endpoint, message)
return True

@classmethod
async def sendMessage(cls, endpoint):
print(f"Sending message for endpoint {endpoint}")
queue = await surrealdb_handler.QueueHandler.get_queue(endpoint)
print(f"Queue: {queue}")
if len(queue) == 0:
return False
elif len(queue) == 1:
message = queue[0]
print(f"Loading message {message}")
updated = await surrealdb_handler.QueueHandler.loadMessage(endpoint, message)
await sending_handler.SendingHandler.distribute(endpoint, updated)
return True
elif len(queue) > 1:
for message in queue:
updated = await surrealdb_handler.QueueHandler.loadMessage(endpoint, message)
await sending_handler.SendingHandler.distribute(endpoint, updated)
return True
else:
raise Exception("Unknown error in sendMessage while processing queue. Queue length is unsupported type or negative.")

@classmethod
async def handleUpdatedEndpointData(cls, endpoint, updated_json):
await cls.appendMessage(endpoint, updated_json)
await cls.sendMessage(endpoint)

74 changes: 74 additions & 0 deletions src/astroidapi/surrealdb_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,80 @@ async def write_to_structure(endpoint, key, value):
raise errors.SurrealDBHandler.UpdateEndpointError(e)


class QueueHandler:
@classmethod
async def get_queue(cls, endpoint: int):
try:
async with Surreal(config.SDB_URL) as db:
await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS})
await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE)
data = await db.select(f"endpoints:`{endpoint}`")
return data["meta"]["_message_cache"]
except Exception as e:
raise errors.SurrealDBHandler.GetEndpointError(e)

@classmethod
async def append_to_queue(cls, endpoint: int, message: dict):
try:
async with Surreal(config.SDB_URL) as db:
await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS})
await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE)
data = await db.select(f"endpoints:`{endpoint}`")
data['meta']['_message_cache'].append(message)
appending = json.dumps(data['meta']['_message_cache'])
print(appending)
await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = {appending}")
return await db.select(f"endpoints:`{endpoint}`")
except Exception as e:
raise errors.SurrealDBHandler.UpdateEndpointError(e)

@classmethod
async def remove_from_queue(cls, endpoint: int, message: dict):
try:
async with Surreal(config.SDB_URL) as db:
await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS})
await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE)
data = await db.select(f"endpoints:`{endpoint}`")
data["meta"]["_message_cache"].remove(message)
if data["meta"]["_message_cache"] is None:
await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = []")
return await db.select(f"endpoints:`{endpoint}`")
await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = {json.dumps(data['meta']['_message_cache'])}")
return await db.select(f"endpoints:`{endpoint}`")
except Exception as e:
raise errors.SurrealDBHandler.UpdateEndpointError(e)

@classmethod
async def clear_queue(cls, endpoint: int):
try:
async with Surreal(config.SDB_URL) as db:
await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS})
await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE)
await db.query(f"UPDATE endpoints:`{endpoint}` SET meta._message_cache = []")
return await db.select(f"endpoints:`{endpoint}`")
except Exception as e:
raise errors.SurrealDBHandler.UpdateEndpointError(e)

@classmethod
async def loadMessage(cls, endpoint: int, message: dict):
try:
async with Surreal(config.SDB_URL) as db:
await db.signin({"user": config.SDB_USER, "pass": config.SDB_PASS})
await db.use(config.SDB_NAMESPACE, config.SDB_DATABASE)
data = await db.select(f"endpoints:`{endpoint}`")
print(message["sender"])
print(message["sender-channel"])
first = await db.query(f"UPDATE endpoints:`{endpoint}` SET meta.sender = '{message['sender']}'")
print(f"First: {first}")
second = await db.query(f"UPDATE endpoints:`{endpoint}` SET meta.`sender-channel` = '{message['sender-channel']}'")
last = await db.query(f"UPDATE endpoints:`{endpoint}` SET meta.message = {message}")
await cls.remove_from_queue(endpoint, message)
return await db.select(f"endpoints:`{endpoint}`")
except Exception as e:
raise errors.SurrealDBHandler.GetEndpointError(e)



class AttachmentProcessor:

@classmethod
Expand Down

0 comments on commit 730e3f7

Please sign in to comment.