Skip to content

Commit

Permalink
Merge pull request #20 from PinoutLTD/repeating_reports
Browse files Browse the repository at this point in the history
Repeating reports
  • Loading branch information
LoSk-p authored Sep 17, 2024
2 parents 3b2fd77 + 2001efa commit ac746ed
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 118 deletions.
10 changes: 4 additions & 6 deletions custom_components/robonomics_report_service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType

from .const import (
CONF_SENDER_SEED,
DOMAIN,
ERROR_SOURCES_MANAGER,
CONF_EMAIL
)

from .const import CONF_SENDER_SEED, DOMAIN, ERROR_SOURCES_MANAGER, CONF_EMAIL

# from .frontend import async_register_frontend, async_remove_frontend
from .rws_registration import RWSRegistrationManager
from .robonomics import Robonomics
Expand All @@ -19,6 +16,7 @@

_LOGGER = logging.getLogger(__name__)


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][CONF_EMAIL] = entry.data[CONF_EMAIL]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import asyncio

from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry
from homeassistant.helpers.device_registry import async_get as async_get_devices_registry
from homeassistant.helpers.device_registry import (
async_get as async_get_devices_registry,
)
from homeassistant.helpers.entity_registry import RegistryEntry
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.core import HomeAssistant, State, callback
Expand All @@ -15,6 +17,7 @@

from .error_source import ErrorSource
from .utils.message_formatter import MessageFormatter
from .utils.problem_type import ProblemType
from ...const import CHECK_ENTITIES_TIMEOUT

_LOGGER = logging.getLogger(__name__)
Expand All @@ -40,14 +43,20 @@ def setup(self) -> None:
def remove(self) -> None:
self.unsub_timer()

async def _check_entities(self, _ = None):
async def _check_entities(self, _=None):
await asyncio.sleep(15)
unavailables = self._get_unavailables()
not_updated = await self._get_not_updated()
unavailables_text = MessageFormatter.format_devices_list(unavailables, "unavailables")
not_updated_text = MessageFormatter.format_devices_list(not_updated, "not updated")
problem_text = MessageFormatter.concatinate_messages(unavailables_text, not_updated_text)
await self._run_report_service(problem_text, "unresponded_devices")
unavailables_text = MessageFormatter.format_devices_list(
unavailables, "unavailables"
)
not_updated_text = MessageFormatter.format_devices_list(
not_updated, "not updated"
)
problem_text = MessageFormatter.concatinate_messages(
unavailables_text, not_updated_text
)
await self._run_report_service(problem_text, ProblemType.Devices, "devices")

def _get_unavailables(self) -> tp.Dict:
unavailables = []
Expand All @@ -64,20 +73,27 @@ async def _get_not_updated(self) -> tp.Dict:
if not self._is_available(entity) or entity_data.disabled:
continue
if entity_data.entity_id.split(".")[0] == "sensor":
if not await self._check_state_changed_during_period(entity_data.entity_id):
if not await self._check_state_changed_during_period(
entity_data.entity_id
):
not_updated.append(entity_data.entity_id)
return self._get_dict_with_devices(not_updated)

def _get_dict_with_devices(self, entities_list: tp.List[str]) -> tp.Dict:
res_dict = {"devices": {}, "entities": []}
for entity_id in entities_list:
entity_data = self.entity_registry.async_get(entity_id)
if entity_data.device_id is not None:
device = self.devices_registry.async_get(entity_data.device_id)
if entity_data.device_id in res_dict["devices"]:
res_dict["devices"][entity_data.device_id]["entities"].append(entity_id)
res_dict["devices"][entity_data.device_id]["entities"].append(
entity_id
)
else:
res_dict["devices"][entity_data.device_id] = {"device_name": self._get_device_name(device), "entities": [entity_id]}
res_dict["devices"][entity_data.device_id] = {
"device_name": self._get_device_name(device),
"entities": [entity_id],
}
else:
res_dict["entities"].append(entity_id)
return res_dict
Expand All @@ -94,9 +110,10 @@ def _is_available(self, entity: RegistryEntry):
entity_state = self.hass.states.get(entity)
if entity_state is not None:
return entity_state.state != STATE_UNAVAILABLE


async def _check_state_changed_during_period(self, entity_id: str, hours: int = 26) -> bool:
async def _check_state_changed_during_period(
self, entity_id: str, hours: int = 26
) -> bool:
start = dt_util.utcnow() - timedelta(hours=hours)
end = dt_util.utcnow()
instance = get_instance(self.hass)
Expand All @@ -119,7 +136,7 @@ async def _check_state_changed_during_period(self, entity_id: str, hours: int =
return False
else:
return False

def _state_changes_during_period(
self,
start: datetime,
Expand All @@ -133,4 +150,4 @@ def _state_changes_during_period(
entity_id,
include_start_time_state=True,
no_attributes=True,
).get(entity_id, [])
).get(entity_id, [])
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from homeassistant.core import HomeAssistant

from ...const import DOMAIN, PROBLEM_REPORT_SERVICE, CONF_EMAIL, CONF_PHONE_NUMBER
from .utils.problem_type import ProblemType


class ErrorSource(abc.ABC):
Expand All @@ -16,12 +17,22 @@ def setup(self):
def remove(self):
pass

async def _run_report_service(self, description: str, error_type: str):
formatted_description = {"description": description, "type": error_type}
async def _run_report_service(
self,
description: str,
error_type: ProblemType,
problem_source: str,
repeated_error: bool = False,
):
formatted_description = {
"description": description,
"type": error_type.value,
"source": problem_source,
}
service_data = {
"description": formatted_description,
"mail": self.hass.data[DOMAIN][CONF_EMAIL],
"attach_logs": True,
"only_description": repeated_error,
}
if CONF_PHONE_NUMBER in self.hass.data[DOMAIN]:
service_data["phone_number"] = self.hass.data[DOMAIN][CONF_PHONE_NUMBER]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,61 +1,60 @@
import logging
import asyncio
from datetime import timedelta

_LOGGER = logging.getLogger(__name__)

from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.core import HomeAssistant, callback, Event
from homeassistant.components.system_log import DOMAIN as SYSTEM_LOG_DOMAIN
from homeassistant.components.system_log import EVENT_SYSTEM_LOG

from .error_source import ErrorSource
from .utils.message_formatter import MessageFormatter
from .utils.problem_type import ProblemType
from ...const import DOMAIN

WARNING_SENDING_TIMEOUT = timedelta(hours=4)

class LoggerHandler(logging.Handler, ErrorSource):
class LoggerHandler(ErrorSource):
def __init__(self, hass: HomeAssistant):
logging.Handler.__init__(self)
ErrorSource.__init__(self, hass)
self.unsub_timer = None
self.root_logger = None
self._warning_messages = []
self.unsub = None
self.hass.data[SYSTEM_LOG_DOMAIN].fire_event = True

@callback
def setup(self):
self.root_logger = logging.getLogger()
self.root_logger.addHandler(self)
self.unsub_timer = async_track_time_interval(
self.hass,
self._send_warnings,
WARNING_SENDING_TIMEOUT,
)
self.unsub = self.hass.bus.async_listen(EVENT_SYSTEM_LOG, self.new_log)

@callback
def remove(self):
self.root_logger.removeHandler(self)
if self.unsub_timer is not None:
self.unsub_timer()

def emit(self, record: logging.LogRecord):
if DOMAIN not in record.name:
if record.levelname == "ERROR" or record.levelname == "CRITICAL":
_LOGGER.debug(f"New error message: {record.message}")
error_msg = f"{record.name} - {record.levelname}: {record.message}"
asyncio.run_coroutine_threadsafe(
self._run_report_service(error_msg, "errors"), self.hass.loop
if self.unsub is not None:
self.unsub()

async def new_log(self, record_event: Event):
_LOGGER.debug(f"New log: {record_event.data}, type: {type(record_event.data)}")
record = record_event.data
if DOMAIN not in record["name"]:
record_type = self._get_record_type(record)
if record_type:
repeated_error = self._repeated_error(record)
_LOGGER.debug(f"New {record_type} message: {record['message']}")
error_msg = (
f"{record['name']} - {record['level']}: {record['message'][0]}"
)
error_source = record["source"]
await self._run_report_service(
error_msg, record_type, error_source, repeated_error
)
elif record.levelname == "WARNING":
self._warning_messages.append(f"{record.name} - {record.levelname}: {record.message}")

async def _send_warnings(self, _ = None) -> None:
if len(self._warning_messages) > 0:
_LOGGER.debug(f"Got {len(self._warning_messages)} warning messages, start sending report")
warnings = self._warning_messages.copy()
self._warning_messages.clear()
message = MessageFormatter.format_warnins_message(warnings)
await self._run_report_service(message, "warnings")
else:
_LOGGER.debug("Haven't got any warning messages during timeout")


def _get_record_type(self, record: dict) -> ProblemType | None:
if record["level"] == "ERROR" or record["level"] == "CRITICAL":
record_type = ProblemType.Errors
elif record["level"] == "WARNING":
record_type = ProblemType.Warnings
else:
record_type = None
return record_type

def _repeated_error(self, record: dict) -> bool:
logs = self.hass.data[SYSTEM_LOG_DOMAIN].records.to_list()
for log in logs:
if log["source"] == record["source"]:
return log["count"] > 1
else:
return False
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,3 @@ def format_devices_list(data: dict, type: str) -> str:
def concatinate_messages(message1: str, message2: str) -> str:
return f"{message1}\n{message2}"

@staticmethod
def format_warnins_message(warnings: tp.List[str]) -> str:
message = ""
for warning in warnings:
message += f"*{warning}\n"
return message
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import Enum


class ProblemType(Enum):
Devices = "unresponded_devices"
Errors = "errors"
Warnings = "warnings"
36 changes: 24 additions & 12 deletions custom_components/robonomics_report_service/ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,46 @@

_LOGGER = logging.getLogger(__name__)


class IPFS:
def __init__(self, hass: HomeAssistant):
self.hass = hass

async def pin_to_pinata(self, dirname: str) -> tp.Optional[str]:
pinata = await self._get_pinata_with_creds()
if pinata is not None:
ipfs_hash = await self.hass.async_add_executor_job(self._pin_to_pinata, dirname, pinata)
ipfs_hash = await self.hass.async_add_executor_job(
self._pin_to_pinata, dirname, pinata
)
return ipfs_hash
async def unpin_from_pinata(self, ipfs_hashes_dict: str) -> tp.Optional[str]:

async def unpin_from_pinata(self, ipfs_hashes_dict: str | dict) -> tp.Optional[str]:
pinata = await self._get_pinata_with_creds()
ipfs_hashes_dict = json.loads(ipfs_hashes_dict)
if isinstance(ipfs_hashes_dict, str):
ipfs_hashes_dict = json.loads(ipfs_hashes_dict)
if pinata is not None:
await self.hass.async_add_executor_job(self._unpin_from_pinata, ipfs_hashes_dict, pinata)
await self.hass.async_add_executor_job(
self._unpin_from_pinata, ipfs_hashes_dict, pinata
)

async def _get_pinata_with_creds(self) -> tp.Optional[PinataPy]:
storage_data = await async_load_from_store(self.hass, STORAGE_PINATA_CREDS)
if CONF_PINATA_PUBLIC in storage_data and CONF_PINATA_SECRET in storage_data:
return PinataPy(storage_data[CONF_PINATA_PUBLIC], storage_data[CONF_PINATA_SECRET])
return PinataPy(
storage_data[CONF_PINATA_PUBLIC], storage_data[CONF_PINATA_SECRET]
)

def _pin_to_pinata(self, dirname: str, pinata: PinataPy) -> tp.Optional[str]:
try:
res = None
dict_with_hashes = {}

_LOGGER.debug(f"tmp dir: {dirname}")
file_names = [f for f in os.listdir(dirname) if os.path.isfile(os.path.join(dirname, f))]
file_names = [
f
for f in os.listdir(dirname)
if os.path.isfile(os.path.join(dirname, f))
]
_LOGGER.debug(f"file names: {file_names}")
for file in file_names:
path_to_file = f"{dirname}/{file}"
Expand All @@ -59,7 +71,7 @@ def _pin_to_pinata(self, dirname: str, pinata: PinataPy) -> tp.Optional[str]:
def _unpin_from_pinata(self, ipfs_hashes_dict: tp.Dict, pinata: PinataPy) -> None:
_LOGGER.debug(f"Start removing pins: {ipfs_hashes_dict}")
for key in ipfs_hashes_dict:
current_hash = ipfs_hashes_dict[key]
res = pinata.remove_pin_from_ipfs(current_hash)
_LOGGER.debug(f"Remove response for pin {current_hash}: {res}")

current_hash: str = ipfs_hashes_dict[key]
if isinstance(current_hash, str) and current_hash.startswith("Qm"):
res = pinata.remove_pin_from_ipfs(current_hash)
_LOGGER.debug(f"Remove response for pin {current_hash}: {res}")
Loading

0 comments on commit ac746ed

Please sign in to comment.