diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2ab2f99..2f7de31 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.0.1 + rev: v4.3.0 hooks: - id: trailing-whitespace - id: check-toml @@ -8,7 +8,7 @@ repos: args: [-b, master] - repo: https://github.com/psf/black - rev: 21.6b0 + rev: 22.6.0 hooks: - id: black name: black @@ -17,13 +17,13 @@ repos: types: [python] - repo: https://github.com/pre-commit/mirrors-mypy - rev: v0.910 + rev: v0.971 hooks: - id: mypy exclude: ^docs/conf.py - repo: https://gitlab.com/yaq/yaq-traits - rev: v2021.4.0 + rev: v2022.3.0 hooks: - id: yaq-traits-check - id: yaq-traits-compose diff --git a/README.md b/README.md index 625d999..d74eaa1 100644 --- a/README.md +++ b/README.md @@ -6,45 +6,86 @@ [![ver](https://img.shields.io/badge/calver-YYYY.M.MICRO-blue)](https://calver.org/) [![log](https://img.shields.io/badge/change-log-informational)](https://github.com/yaq-project/hart-protocol/-/blob/main/CHANGELOG.md) -A sans-io Python implementation of the [Highway Adressable Remote Transducer Protocol](https://en.wikipedia.org/wiki/Highway_Addressable_Remote_Transducer_Protocol). +A sans I/O Python implementation of the [Highway Adressable Remote Transducer Protocol](https://en.wikipedia.org/wiki/Highway_Addressable_Remote_Transducer_Protocol). + +## Introduction + +This Python package contains tooling for encoding and decoding bytestrings for communication with HART peripherals. +HART has been implemented using a variety of transport layers---Bell 202, RS485, Ethernet, etc. +In persuit of simplicity and reusability, this package does not contain any interface capabilities. +Use something like [pySerial](https://pyserial.readthedocs.io) for transport. +Read the [sans I/O manifesto](https://sans-io.readthedocs.io/) for more motivation regarding this design pattern. + +Briefly, HART is an open protocol for industrial automation supported by multiple device manufacturers. +HART has a concept of "address", so that many peripherals can share the same communication channel. +HART has limited support for multiple controllers, and generic handheld controllers exist. +HART peripherals respond to numbered commands, which can be thought of as primative [remote procedure calls](https://en.wikipedia.org/wiki/Remote_procedure_call). +The standard specifies a number of universal commands which should be supported by any peripheral, and there are also so-called "common" commands which many peripherals implement. +It's strongly recommended that you check the documentation of your own peripheral---implementations may be inconsistent. +In addition to universal and common commands, it's likely that your peripheral implements many device-specific commands. + +This package aims to have complete and accurate support for all universal and common commands. +In addition, this package has tooling for packing and unpacking generic command data for device-specific commands. +This package is intentionally simple and narrowly scoped. +There is no documentation beyond this README. +Please open an issue or PR to the GitHub repository if you find any errors or missing functionality. + +## Sending Commands + +The following functions return bytestrings that can be fed to your transport layer. Universal Commands -| command | function | -| ------- | --------------------------------- | -| 0 | `read_unique_identifier(address)` | -| 1 | `read_primary_variable(address)` | -| 2 | `read_loop_current_and_percent(address)` | -| 3 | `read_dynamic_variables_and_loop_current(address)` | -| 4 | -| 5 | -| 6 | -| 11 | -| 12 | -| 13 | -| 14 | -| 15 | -| 16 | -| 17 | -| 18 | -| 19 | +| command | function | +| ------- | ----------------------------------------------------------- | +| 0 | `read_unique_identifier(address)` | +| 1 | `read_primary_variable(address)` | +| 2 | `read_loop_current_and_percent(address)` | +| 3 | `read_dynamic_variables_and_loop_current(address)` | +| 6 | `write_polling_address(address, new_short_address)` | +| 11 | `read_unique_identifier_associated_with_tag(tag)` | +| 12 | `read_message(address)` | +| 13 | `read_tag_descriptor_date(address)` | +| 14 | `read_primary_variable_information(address)` | +| 15 | `read_output_information(address)` | +| 16 | `read_final_assembly_number(address)` | +| 17 | `write_message(address, message)` | +| 18 | `write_tag_descriptor_date(address, tag, descriptor, date)` | +| 19 | `write_final_assembly_number(address, number)` | Common-Practice Commands -| command | function | -| ------- | --------------------------------- | -| 33 | -| 38 | -| 40 | -| 42 | -| 44 | -| 45 | -| 46 | -| 48 | -| 50 | -| 51 | -| 53 | -| 54 | -| 59 | - -Maintainers: +| command | function | +| ------- | -------------------------------------------------------- | +| 37 | `set_primary_variable_lower_range_value(address, value)` | +| 38 | `reset_configuration_changed_flag(address)` | +| 42 | `perform_master_reset(address)` | +| 48 | `read_additional_transmitter_status(address)` | +| 50 | `read_dynamic_variable_assignments(address)` | +| 59 | `write_number_of_response_preambles(address, number)` | +| 66 | `toggle_analog_output_mode(address)` | +| 67 | `trim_analog_output_zero(address)` | +| 68 | `trim_analog_output_span(address)` | +| 123 | `select_baud_rate(address, rate)` | + +Arbitrary additional command bytestrings can also be generated as shown below. +This is a device-specific command for Brooks GF40 Mass Flow Controllers, which takes an IEE-754 floating point number as well as a unique code. + +```python +import struct +import hart_protocol +code = 0 +value = 32.1 +data = struct.pack(">Bf", code, value) +command = hart_protocol.pack_command(address=123, command_id=236, data=data) +``` + +## Parsing Responses + +Documentation TODO + +## Integration Example + +Documentation TODO + +## Maintainers - [Blaise Thompson](https://github.com/untzag) diff --git a/hart_protocol/_parsing.py b/hart_protocol/_parsing.py index f678597..e0f37b8 100644 --- a/hart_protocol/_parsing.py +++ b/hart_protocol/_parsing.py @@ -1,35 +1,34 @@ import struct +from typing import MutableMapping, Union - - -def parse(response: bytes) -> dict: - out = dict() - out["full_response"]: bytes = response +def parse(response: bytes) -> MutableMapping[str, Union[int, bytes, str, float]]: + out: MutableMapping[str, Union[int, bytes, str, float]] = dict() + out["full_response"] = response if response[0] & 0x80: # long address - out["address"]: bytes = response[1:6] + out["address"] = int.from_bytes(response[1:6], "big") response = response[6:] else: # short address - out["address"]: bytes = response[1] + out["address"] = response[1] response = response[2:] command, bytecount, status = struct.unpack_from(">BBL", response) out["status"] = status - data = response[4:4+bytecount] - out["command"]: int = command - out["command_name"]: str = f"hart_command_{command}" - out["bytecount"]: int = bytecount - out["data"]: bytes = data + data = response[4 : 4 + bytecount] + out["command"] = command + out["command_name"] = f"hart_command_{command}" + out["bytecount"] = bytecount + out["data"] = data if command in [0, 11]: out["command_name"] = "read_unique_identifier" - out["manufacturer_id"]: bytes = data[1] - out["manufacturer_device_type"]: bytes = data[2] - out["number_response_preamble_characters"]: bytes = data[3] - out["universal_command_revision_level"]: bytes = data[4] - out["transmitter_specific_command_revision_level"]: bytes = data[5] - out["software_revision_level"]: bytes = data[6] - out["hardware_revision_level"]: bytes = data[7] - out["device_id"]: bytes = data[9:12] + out["manufacturer_id"] = data[1] + out["manufacturer_device_type"] = data[2] + out["number_response_preamble_characters"] = data[3] + out["universal_command_revision_level"] = data[4] + out["transmitter_specific_command_revision_level"] = data[5] + out["software_revision_level"] = data[6] + out["hardware_revision_level"] = data[7] + out["device_id"] = data[9:12] if command in [1]: out["command_name"] = "read_primary_variable" - out["primary_variable"] = struct.unpack_from(">f", data) + out["primary_variable"] = struct.unpack_from(">f", data)[0] return out diff --git a/hart_protocol/py.typed b/hart_protocol/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/hart_protocol/tools.py b/hart_protocol/tools.py index 15fbc8d..22f2dc6 100644 --- a/hart_protocol/tools.py +++ b/hart_protocol/tools.py @@ -1,4 +1,5 @@ import math +from typing import Union def calculate_checksum(command: bytes) -> bytes: @@ -35,11 +36,12 @@ def pack_command(address, command_id, data=None): return command -def pack_ascii(string: str) -> bytes: - chars = [c.encode() for c in string] - chars = [ord(c) & 0b0011_1111 for c in chars] +def pack_ascii(string: Union[str, bytes]) -> bytes: + if type(string) == str: + chars = [c.encode() for c in string] # type: ignore + else: + chars = [c for c in string] # type: ignore out = 0 - for i, c in zip(range(8), chars[::-1]): + for i, c in zip(range(8), [ord(c) & 0b0011_1111 for c in chars][::-1]): out |= c << (i * 6) - out = out.to_bytes(math.ceil((len(string) * 6) / 8), "big") - return out + return out.to_bytes(math.ceil((len(string) * 6) / 8), "big") diff --git a/hart_protocol/universal.py b/hart_protocol/universal.py index 6ad06bf..4e66dc1 100644 --- a/hart_protocol/universal.py +++ b/hart_protocol/universal.py @@ -2,12 +2,68 @@ def read_unique_identifier(address: bytes) -> bytes: - return tools.pack_command(address, 0) + return tools.pack_command(address, command_id=0) def read_primary_variable(address: bytes) -> bytes: - return tools.pack_command(address, 1) + return tools.pack_command(address, command_id=1) -def read_unique_identifier_associated_with_tag(tag: bytes, *, address:int=0) -> bytes: - return tools.pack_command(address, 11, tag) +def read_loop_current_and_percent(address: bytes) -> bytes: + return tools.pack_command(address, command_id=2) + + +def read_dynamic_variables_and_loop_current(address: bytes) -> bytes: + return tools.pack_command(address, command_id=3) + + +def write_polling_address(address: bytes, new_polling_address: int) -> bytes: + assert 0 <= new_polling_address <= 15 + return tools.pack_command(address, command_id=6, data=new_polling_address.to_bytes(1, "big")) + + +def read_unique_identifier_associated_with_tag(tag: bytes, *, address: int = 0) -> bytes: + return tools.pack_command(address, command_id=11, data=tag) + + +def read_message(address: bytes) -> bytes: + return tools.pack_command(address, command_id=12) + + +def read_tag_descriptor_date(address: bytes) -> bytes: + return tools.pack_command(address, command_id=13) + + +def read_primary_variable_information(address: bytes) -> bytes: + return tools.pack_command(address, command_id=14) + + +def read_output_information(address: bytes) -> bytes: + return tools.pack_command(address, command_id=15) + + +def read_final_assembly_number(address: bytes) -> bytes: + return tools.pack_command(address, command_id=16) + + +def write_message(address: bytes, message: str) -> bytes: + message = message.ljust(32) + return tools.pack_command(address, command_id=17, data=tools.pack_ascii(message)) + + +def write_tag_descriptor_date(address: bytes, tag: str, descriptor: str, date: tuple): + data = b"" + assert len(tag) <= 8 + data += tools.pack_ascii(tag.ljust(8)) + assert len(descriptor) <= 16 + data += tools.pack_ascii(descriptor.ljust(16)) + day, month, year = date + data += day.to_bytes(1, "big") + data += month.to_bytes(1, "big") + data += year.to_bytes(1, "big") + return tools.pack_command(address, command_id=18, data=data) + + +def write_final_assembly_number(address: bytes, number: int): + data = number.to_bytes(3, "big") + return tools.pack_command(address, command_id=19, data=data)