Skip to content

Commit

Permalink
Merge pull request #26 from PinoutLTD/development
Browse files Browse the repository at this point in the history
receive reports via libp2p
  • Loading branch information
tubleronchik authored Aug 23, 2024
2 parents 8ae7f8f + 6615be6 commit 346e95d
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 17 deletions.
32 changes: 17 additions & 15 deletions registar/src/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
PINATA_API_KEY = os.getenv("PINATA_API_KEY")
PINATA_API_SECRET = os.getenv("PINATA_API_SECRET")

PAID_SERVICE = False


class WSClient:
def __init__(self, odoo) -> None:
Expand Down Expand Up @@ -65,24 +67,24 @@ def _on_message(self, ws, message):
self.ws.send(msg)
return
user_id = self.odoo.create_rrs_user(decrypted_email, sender_address)
hash = add_device_to_subscription(sender_address)
if hash:
self._logger.debug(f"Add {sender_address} to subscription")
pinata_keys = PinataHelper.generate_pinata_keys(sender_address)
pinata_key = pinata_keys["pinata_api_key"]
pinata_secret = pinata_keys["pinata_api_secret"]
self._logger.debug(f"Pinata creds: {pinata_key}, {pinata_secret}, {pinata_keys}")
self.odoo.update_rrs_user_with_pinata_creds(user_id, pinata_key, pinata_secret)
msg = message_with_pinata_creds(pinata_key, pinata_secret, sender_address, self._logger)
self.ws.send(msg)
if PAID_SERVICE:
self._logger.debug(f"Paid service. Adding account to subscription")
tx_hash = add_device_to_subscription(sender_address)
if tx_hash:
self._logger.debug(f"Add {sender_address} to subscription")
else:
self._logger.error(f"Couldn't add {sender_address} to subscription: {tx_hash}")

else:
self._logger.error(f"Couldn't add {sender_address} to subscription: {hash}")
pinata_keys = PinataHelper.generate_pinata_keys(sender_address)
pinata_key = pinata_keys["pinata_api_key"]
pinata_secret = pinata_keys["pinata_api_secret"]
self._logger.debug(f"Pinata creds: {pinata_key}, {pinata_secret}, {pinata_keys}")
self.odoo.update_rrs_user_with_pinata_creds(user_id, pinata_key, pinata_secret)
msg = message_with_pinata_creds(pinata_key, pinata_secret, sender_address, self._logger)
self.ws.send(msg)

def _on_error(self, ws, error):
self._logger.error(f"{error}")

def _on_close(self, ws, close_status_code, close_msg):
self._logger.debug(f"Connection closed with status code {close_status_code} and message {close_msg}")


self._logger.debug(f"Connection closed with status code {close_status_code} and message {close_msg}")
4 changes: 2 additions & 2 deletions registar/utils/robonomics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
WSS_ENDPOINT = os.getenv("WSS_ENDPOINT")
ADMIN_ADDRESS = os.getenv("ADMIN_ADDRESS")

def add_device_to_subscription(address: str) -> bool:
def add_device_to_subscription(address: str) -> str:
""" Checks whether the address has an active subscription or not.
:param owner_address: Address of the subscription's owner.
:return True if subscription is active, False otherwise
:return Hash of the transaction
"""
account = ri.Account(remote_ws=WSS_ENDPOINT, seed=ADMIN_SEED, crypto_type=KeypairType.ED25519)
rws = ri.RWS(account)
Expand Down
7 changes: 7 additions & 0 deletions rrs_operator/rrs_operator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import threading

from rrs_operator.src.odoo import Odoo
from rrs_operator.src.robonomics import RobonomicsHelper
from rrs_operator.src.ws_client import WSClient
from rrs_operator.utils.ipfs_helper import IPFSHelper


Expand All @@ -8,6 +11,10 @@ def __init__(self) -> None:
self.odoo = Odoo()
self.robonomics = RobonomicsHelper(self.odoo)
self.robonomics.subscribe()
self.ws = WSClient(self.odoo)
ws_thread = threading.Thread(target=self.ws.run)
ws_thread.daemon = True
ws_thread.start()

def get_robonomics_add_user_callback(self) -> None:
return self.robonomics.add_user_callback
Expand Down
1 change: 1 addition & 0 deletions rrs_operator/src/report_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json

from tenacity import *

from helpers.logger import Logger
Expand Down
72 changes: 72 additions & 0 deletions rrs_operator/src/ws_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import json
import os

import websocket
from dotenv import load_dotenv

from helpers.logger import Logger
from rrs_operator.src.report_manager import ReportManager
from rrs_operator.utils.messages import message_for_subscribing

load_dotenv()

LIBP2P_WS_SERVER = os.getenv("LIBP2P_WS_SERVER")
ADMIN_SEED = os.getenv("ADMIN_SEED")

class WSClient:
def __init__(self, odoo) -> None:
self.odoo = odoo
self._logger = Logger("operator-ws")
self._connect2server()

def _connect2server(self):
self.ws = websocket.WebSocketApp(
url=LIBP2P_WS_SERVER,
on_open=self._on_connection,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
)

def run(self) -> None:
self.ws.run_forever(reconnect=5)

def _on_connection(self, ws):
self._logger.debug(f"Connected to {LIBP2P_WS_SERVER}")
msg = message_for_subscribing()
self._logger.debug(f"Connection msg: {msg}")
self.ws.send(msg)

def _on_message(self, ws, message):
json_message = json.loads(message)
self._logger.debug(f"Got msg: {json_message}")
if "peerId" in json_message:
return
message_data = json_message["data"]
if "report" in message_data:
sender_address = message_data["address"]
json_report_message = json.dumps(message_data["report"])
email = self.odoo.find_user_email(sender_address)
if email:
report_manager = ReportManager(sender_address, json_report_message)
report_manager.process_report()
descriptions_list, priority = report_manager.get_description_and_priority()
logs_hashes = report_manager.get_logs_hashes()
self._logger.debug(f"Data from ipfs: {email}, {descriptions_list}, priority: {priority}")
for description in descriptions_list:
ticket_id = self.odoo.find_ticket_with_description(description, email)
if not ticket_id:
ticket_id = self.odoo.create_ticket(email, sender_address, description, priority)

if logs_hashes:
for hash in logs_hashes:
self.odoo.create_note_with_logs_hash(ticket_id, hash)
else:
self._logger.debug(f"Address {sender_address} is not registred in Odoo. Email is: {email}")


def _on_error(self, ws, error):
self._logger.error(f"{error}")

def _on_close(self, ws, close_status_code, close_msg):
self._logger.debug(f"Connection closed with status code {close_status_code} and message {close_msg}")
6 changes: 6 additions & 0 deletions rrs_operator/utils/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import json


def message_for_subscribing() -> str:
msg = {"protocols_to_listen": ["/report"]}
return json.dumps(msg)

0 comments on commit 346e95d

Please sign in to comment.