From 6615be68348615ca41dfce9ca28a0d9161576014 Mon Sep 17 00:00:00 2001 From: tubleronchik Date: Fri, 23 Aug 2024 17:29:11 +0200 Subject: [PATCH] receive reports via libp2p --- registar/src/websocket.py | 32 ++++++------- registar/utils/robonomics.py | 4 +- rrs_operator/rrs_operator.py | 7 +++ rrs_operator/src/report_manager.py | 1 + rrs_operator/src/ws_client.py | 72 ++++++++++++++++++++++++++++++ rrs_operator/utils/messages.py | 6 +++ 6 files changed, 105 insertions(+), 17 deletions(-) create mode 100644 rrs_operator/src/ws_client.py create mode 100644 rrs_operator/utils/messages.py diff --git a/registar/src/websocket.py b/registar/src/websocket.py index c97982d..2d1677f 100644 --- a/registar/src/websocket.py +++ b/registar/src/websocket.py @@ -19,6 +19,8 @@ PINATA_API_KEY = os.getenv("PINATA_API_KEY") PINATA_API_SECRET = os.getenv("PINATA_API_SECRET") +PAID_SERVICE = False + class WSClient: def __init__(self, odoo) -> None: @@ -65,24 +67,24 @@ def _on_message(self, ws, message): self.ws.send(msg) return user_id = self.odoo.create_rrs_user(decrypted_email, sender_address) - hash = add_device_to_subscription(sender_address) - if hash: - self._logger.debug(f"Add {sender_address} to subscription") - pinata_keys = PinataHelper.generate_pinata_keys(sender_address) - pinata_key = pinata_keys["pinata_api_key"] - pinata_secret = pinata_keys["pinata_api_secret"] - self._logger.debug(f"Pinata creds: {pinata_key}, {pinata_secret}, {pinata_keys}") - self.odoo.update_rrs_user_with_pinata_creds(user_id, pinata_key, pinata_secret) - msg = message_with_pinata_creds(pinata_key, pinata_secret, sender_address, self._logger) - self.ws.send(msg) + if PAID_SERVICE: + self._logger.debug(f"Paid service. Adding account to subscription") + tx_hash = add_device_to_subscription(sender_address) + if tx_hash: + self._logger.debug(f"Add {sender_address} to subscription") + else: + self._logger.error(f"Couldn't add {sender_address} to subscription: {tx_hash}") - else: - self._logger.error(f"Couldn't add {sender_address} to subscription: {hash}") + pinata_keys = PinataHelper.generate_pinata_keys(sender_address) + pinata_key = pinata_keys["pinata_api_key"] + pinata_secret = pinata_keys["pinata_api_secret"] + self._logger.debug(f"Pinata creds: {pinata_key}, {pinata_secret}, {pinata_keys}") + self.odoo.update_rrs_user_with_pinata_creds(user_id, pinata_key, pinata_secret) + msg = message_with_pinata_creds(pinata_key, pinata_secret, sender_address, self._logger) + self.ws.send(msg) def _on_error(self, ws, error): self._logger.error(f"{error}") def _on_close(self, ws, close_status_code, close_msg): - self._logger.debug(f"Connection closed with status code {close_status_code} and message {close_msg}") - - + self._logger.debug(f"Connection closed with status code {close_status_code} and message {close_msg}") \ No newline at end of file diff --git a/registar/utils/robonomics.py b/registar/utils/robonomics.py index e8fbed5..ff981f9 100644 --- a/registar/utils/robonomics.py +++ b/registar/utils/robonomics.py @@ -9,11 +9,11 @@ WSS_ENDPOINT = os.getenv("WSS_ENDPOINT") ADMIN_ADDRESS = os.getenv("ADMIN_ADDRESS") -def add_device_to_subscription(address: str) -> bool: +def add_device_to_subscription(address: str) -> str: """ Checks whether the address has an active subscription or not. :param owner_address: Address of the subscription's owner. - :return True if subscription is active, False otherwise + :return Hash of the transaction """ account = ri.Account(remote_ws=WSS_ENDPOINT, seed=ADMIN_SEED, crypto_type=KeypairType.ED25519) rws = ri.RWS(account) diff --git a/rrs_operator/rrs_operator.py b/rrs_operator/rrs_operator.py index 32bce81..6106e0b 100644 --- a/rrs_operator/rrs_operator.py +++ b/rrs_operator/rrs_operator.py @@ -1,5 +1,8 @@ +import threading + from rrs_operator.src.odoo import Odoo from rrs_operator.src.robonomics import RobonomicsHelper +from rrs_operator.src.ws_client import WSClient from rrs_operator.utils.ipfs_helper import IPFSHelper @@ -8,6 +11,10 @@ def __init__(self) -> None: self.odoo = Odoo() self.robonomics = RobonomicsHelper(self.odoo) self.robonomics.subscribe() + self.ws = WSClient(self.odoo) + ws_thread = threading.Thread(target=self.ws.run) + ws_thread.daemon = True + ws_thread.start() def get_robonomics_add_user_callback(self) -> None: return self.robonomics.add_user_callback diff --git a/rrs_operator/src/report_manager.py b/rrs_operator/src/report_manager.py index 908f4e9..4b20ec3 100644 --- a/rrs_operator/src/report_manager.py +++ b/rrs_operator/src/report_manager.py @@ -1,4 +1,5 @@ import json + from tenacity import * from helpers.logger import Logger diff --git a/rrs_operator/src/ws_client.py b/rrs_operator/src/ws_client.py new file mode 100644 index 0000000..6f0a0af --- /dev/null +++ b/rrs_operator/src/ws_client.py @@ -0,0 +1,72 @@ +import json +import os + +import websocket +from dotenv import load_dotenv + +from helpers.logger import Logger +from rrs_operator.src.report_manager import ReportManager +from rrs_operator.utils.messages import message_for_subscribing + +load_dotenv() + +LIBP2P_WS_SERVER = os.getenv("LIBP2P_WS_SERVER") +ADMIN_SEED = os.getenv("ADMIN_SEED") + +class WSClient: + def __init__(self, odoo) -> None: + self.odoo = odoo + self._logger = Logger("operator-ws") + self._connect2server() + + def _connect2server(self): + self.ws = websocket.WebSocketApp( + url=LIBP2P_WS_SERVER, + on_open=self._on_connection, + on_message=self._on_message, + on_error=self._on_error, + on_close=self._on_close, + ) + + def run(self) -> None: + self.ws.run_forever(reconnect=5) + + def _on_connection(self, ws): + self._logger.debug(f"Connected to {LIBP2P_WS_SERVER}") + msg = message_for_subscribing() + self._logger.debug(f"Connection msg: {msg}") + self.ws.send(msg) + + def _on_message(self, ws, message): + json_message = json.loads(message) + self._logger.debug(f"Got msg: {json_message}") + if "peerId" in json_message: + return + message_data = json_message["data"] + if "report" in message_data: + sender_address = message_data["address"] + json_report_message = json.dumps(message_data["report"]) + email = self.odoo.find_user_email(sender_address) + if email: + report_manager = ReportManager(sender_address, json_report_message) + report_manager.process_report() + descriptions_list, priority = report_manager.get_description_and_priority() + logs_hashes = report_manager.get_logs_hashes() + self._logger.debug(f"Data from ipfs: {email}, {descriptions_list}, priority: {priority}") + for description in descriptions_list: + ticket_id = self.odoo.find_ticket_with_description(description, email) + if not ticket_id: + ticket_id = self.odoo.create_ticket(email, sender_address, description, priority) + + if logs_hashes: + for hash in logs_hashes: + self.odoo.create_note_with_logs_hash(ticket_id, hash) + else: + self._logger.debug(f"Address {sender_address} is not registred in Odoo. Email is: {email}") + + + def _on_error(self, ws, error): + self._logger.error(f"{error}") + + def _on_close(self, ws, close_status_code, close_msg): + self._logger.debug(f"Connection closed with status code {close_status_code} and message {close_msg}") diff --git a/rrs_operator/utils/messages.py b/rrs_operator/utils/messages.py new file mode 100644 index 0000000..22e4a6b --- /dev/null +++ b/rrs_operator/utils/messages.py @@ -0,0 +1,6 @@ +import json + + +def message_for_subscribing() -> str: + msg = {"protocols_to_listen": ["/report"]} + return json.dumps(msg)