Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Thingsboard into Pyra #226

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/config.default.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@
},
"tum_plc": null,
"helios": null,
"upload": null
"upload": null,
"thingsboard": null
}
6 changes: 6 additions & 0 deletions config/thingsboard.config.default.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"host": "0.0.0.0",
"access_token": "...",
"seconds_per_publish_interval": 60,
"ca_cert": null
}
7 changes: 5 additions & 2 deletions packages/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def run() -> None:
logger.info("Initializing threads")
helios_thread_instance = threads.HeliosThread(config)
upload_thread_instance = threads.UploadThread(config)
thingsboard_thread_instance = threads.ThingsBoardThread(config)

current_exceptions = interfaces.StateInterface.load_state(
).current_exceptions or []
Expand All @@ -153,8 +154,9 @@ def run() -> None:
def _graceful_teardown(*args: Any) -> None:
logger.info("Received shutdown signal, starting graceful teardown")
interfaces.ActivityHistoryInterface.dump_current_activity_history()
current_exceptions = interfaces.StateInterface.load_state(
).current_exceptions or []
current_exceptions = (
interfaces.StateInterface.load_state().current_exceptions or []
)
interfaces.StateInterface.update_state(
current_exceptions=current_exceptions, enforce_none_values=True
)
Expand Down Expand Up @@ -193,6 +195,7 @@ def _graceful_teardown(*args: Any) -> None:
# possibly (re)start each thread
helios_thread_instance.update_thread_state(config)
upload_thread_instance.update_thread_state(config)
thingsboard_thread_instance.update_thread_state(config)

if config.general.test_mode:
logger.info("pyra-core in test mode")
Expand Down
1 change: 1 addition & 0 deletions packages/core/threads/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .helios_thread import HeliosThread
from .upload_thread import UploadThread
from .thingsboard_thread import ThingsBoardThread
126 changes: 126 additions & 0 deletions packages/core/threads/thingsboard_thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from .abstract_thread import AbstractThread
from packages.core import types, utils, interfaces
import threading
import time
from typing import Dict, Union, Optional


class ThingsBoardThread(AbstractThread):
@staticmethod
def should_be_running(config: types.Config) -> bool:
"""Based on the config, should the thread be running or not?"""

# only publish data when Thingsboard is configured
if config.thingsboard is None:
return False

# don't publish in test mode
if config.general.test_mode:
return False

return True

@staticmethod
def get_new_thread_object() -> threading.Thread:
"""Return a new thread object that is to be started."""
return threading.Thread(target=ThingsBoardThread.main, daemon=True)

@staticmethod
def main(headless: bool = False) -> None:
"""Main entrypoint of the thread. In headless mode,
don't write to log files but print to console."""

logger = utils.Logger(origin="thingsboard", just_print=headless)
config = types.Config.load()
assert config.thingsboard is not None

from tb_device_mqtt import TBDeviceMqttClient # type: ignore

# initialize MQTT client
client = TBDeviceMqttClient(
config.thingsboard.host, username=config.thingsboard.access_token
)

# connect to MQTT broker
if config.thingsboard.ca_cert is not None:
client.connect(tls=True, ca_certs=config.thingsboard.ca_cert)
else:
client.connect()

logger.info("Succesfully connected to Thingsboard Broker.")

while True:
# Read latest config
config = types.Config.load()
if not ThingsBoardThread.should_be_running(config):
logger.info("ThingsboardThread shall not be running anymore.")
client.disconnect()
return

# publish if client is connected
if not client.is_connected():
logger.warning(
"Client is currently not connected. Waiting for reconnect."
)
else:
# read latest state file
current_state = interfaces.StateInterface.load_state()
state: Dict[str, Optional[Union[str, bool, int, float]]] = {
"state_file_last_updated":
str(current_state.last_updated),
"helios_indicates_good_conditions":
current_state.helios_indicates_good_conditions,
"measurements_should_be_running":
current_state.measurements_should_be_running,
"os_memory_usage":
current_state.operating_system_state.memory_usage,
"os_filled_disk_space_fraction":
current_state.operating_system_state.
filled_disk_space_fraction,
"enclosure_actor_fan_speed":
current_state.plc_state.actors.fan_speed,
"enclosure_actor_current_angle":
current_state.plc_state.actors.current_angle,
"enclosure_control_auto_temp_mode":
current_state.plc_state.control.auto_temp_mode,
"enclosure_control_manual_control":
current_state.plc_state.control.manual_control,
"enclosure_control_manual_temp_mode":
current_state.plc_state.control.manual_temp_mode,
"enclosure_control_sync_to_tracker":
current_state.plc_state.control.sync_to_tracker,
"enclosure_sensor_humidity":
current_state.plc_state.sensors.humidity,
"enclosure_sensor_temperature":
current_state.plc_state.sensors.temperature,
"enclosure_state_cover_closed":
current_state.plc_state.state.cover_closed,
"enclosure_state_motor_failed":
current_state.plc_state.state.motor_failed,
"enclosure_state_rain":
current_state.plc_state.state.rain,
"enclosure_state_reset_needed":
current_state.plc_state.state.reset_needed,
"enclosure_state_ups_alert":
current_state.plc_state.state.ups_alert,
"enclosure_power_heater":
current_state.plc_state.power.heater,
"enclosure_power_spectrometer":
current_state.plc_state.power.spectrometer,
}
patrickjaigner marked this conversation as resolved.
Show resolved Hide resolved
patrickjaigner marked this conversation as resolved.
Show resolved Hide resolved

telemetry_with_ts = {
"ts": int(round(time.time() * 1000)),
"values": state,
}

try:
# Sending telemetry without checking the delivery status (QoS1)
result = client.send_telemetry(telemetry_with_ts)
logger.info(f"Published with result: {result}")
except Exception as e:
logger.exception(e)
logger.info("Failed to publish last telemetry data.")

if config.thingsboard is not None:
time.sleep(config.thingsboard.seconds_per_publish_interval)
58 changes: 38 additions & 20 deletions packages/core/types/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,28 @@ class StricterBaseModel(pydantic.BaseModel):
class StrictFilePath(pydantic.RootModel[str]):
patrickjaigner marked this conversation as resolved.
Show resolved Hide resolved
root: str

@pydantic.field_validator('root')
@pydantic.field_validator("root")
@classmethod
def path_should_exist(cls, v: str, info: pydantic.ValidationInfo) -> str:
ignore_path_existence = (
info.context.get('ignore-path-existence') == True
) if isinstance(info.context, dict) else False
ignore_path_existence = ((
info.context.get("ignore-path-existence") == True
) if isinstance(info.context, dict) else False)
if (not ignore_path_existence) and (not os.path.isfile(v)):
raise ValueError('File does not exist')
raise ValueError("File does not exist")
return v


class StrictDirectoryPath(pydantic.RootModel[str]):
root: str

@pydantic.field_validator('root')
@pydantic.field_validator("root")
@classmethod
def path_should_exist(cls, v: str, info: pydantic.ValidationInfo) -> str:
ignore_path_existence = (
info.context.get('ignore-path-existence') == True
) if isinstance(info.context, dict) else False
ignore_path_existence = ((
info.context.get("ignore-path-existence") == True
) if isinstance(info.context, dict) else False)
if (not ignore_path_existence) and (not os.path.isdir(v)):
raise ValueError('Directory does not exist')
raise ValueError("Directory does not exist")
return v


Expand Down Expand Up @@ -257,6 +257,22 @@ class PartialUploadConfig(StricterBaseModel):
streams: Optional[list[UploadStreamConfig]] = None


class ThingsBoardConfig(StricterBaseModel):
host: str
access_token: str
seconds_per_publish_interval: int = pydantic.Field(..., ge=30, le=999999)
ca_cert: Optional[str]


class PartialThingsBoardConfig(StricterBaseModel):
host: Optional[str] = None
access_token: Optional[str] = None
seconds_per_publish_interval: Optional[int] = pydantic.Field(
None, ge=30, le=999999
)
ca_cert: Optional[str] = None


class Config(StricterBaseModel):
general: GeneralConfig
opus: OpusConfig
Expand All @@ -267,15 +283,16 @@ class Config(StricterBaseModel):
tum_plc: Optional[TumPlcConfig] = None
helios: Optional[HeliosConfig] = None
upload: Optional[UploadConfig] = None
thingsboard: Optional[ThingsBoardConfig] = None

@staticmethod
def load(
config_object: Optional[str | dict[Any, Any]] = None,
with_filelock: bool = True,
ignore_path_existence: bool = False
ignore_path_existence: bool = False,
) -> Config:
"""Load the config file.

Args:
config_object: If provided, the config file will be ignored and
the provided content will be used instead. Defaults
Expand All @@ -285,9 +302,9 @@ def load(
ignore_path_existence: If True, the existence of the file and directory
paths used in the whole config file will not be
checked. Defaults to False.

Returns: The loaded config object.

Raises:
ValueError: If the config file is invalid.
"""
Expand Down Expand Up @@ -336,7 +353,7 @@ def _read() -> Config:

# the "from None" suppresses the pydantic exception
raise ValueError(
"Config is invalid:\n" + ',\n'.join(pretty_errors)
"Config is invalid:\n" + ",\n".join(pretty_errors)
) from None

def dump(self, with_filelock: bool = True) -> None:
Expand All @@ -355,9 +372,9 @@ def dump(self, with_filelock: bool = True) -> None:
)
def update_in_context() -> Generator[Config, None, None]:
"""Update the confug file in a context manager.

Example:

```python
with Config.update_in_context() as state:
config.somesetting = somevalue
Expand Down Expand Up @@ -390,22 +407,23 @@ class PartialConfig(StricterBaseModel):
tum_plc: Optional[PartialTumPlcConfig] = None
helios: Optional[PartialHeliosConfig] = None
upload: Optional[PartialUploadConfig] = None
thingsboard: Optional[PartialThingsBoardConfig] = None

@staticmethod
def load(
config_object: str,
ignore_path_existence: bool = False
) -> PartialConfig:
"""Load a partial config file.

Args:
config_object: JSON string containing a partial config.
ignore_path_existence: If True, the existence of the file and directory
paths used in the whole config file will not be
checked. Defaults to False.

Returns: The loaded partial config object.

Raises:
ValueError: If the config file is invalid.
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
---
hide_table_of_contents: true
---

<!-- markdownlint-disable -->

# <kbd>module</kbd> `core.threads.thingsboard_thread`






---

## <kbd>class</kbd> `ThingsBoardThread`







---

### <kbd>method</kbd> `get_new_thread_object`

```python
get_new_thread_object() → Thread
```

Return a new thread object that is to be started.

---

### <kbd>method</kbd> `main`

```python
main(headless: bool = False) → None
```

Main entrypoint of the thread. In headless mode, don't write to log files but print to console.

---

### <kbd>method</kbd> `should_be_running`

```python
should_be_running(config: Config) → bool
```

Based on the config, should the thread be running or not?


Loading