Skip to content

Commit

Permalink
add universal commands, readme (#1)
Browse files Browse the repository at this point in the history
* add universal commands, readme

* mypy

* ignore

* pre-commit autoupdate
  • Loading branch information
untzag authored Jul 28, 2022
1 parent 3b3cc91 commit f33ceed
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 71 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.0.1
rev: v4.3.0
hooks:
- id: trailing-whitespace
- id: check-toml
- id: no-commit-to-branch
args: [-b, master]

- repo: https://github.com/psf/black
rev: 21.6b0
rev: 22.6.0
hooks:
- id: black
name: black
Expand All @@ -17,13 +17,13 @@ repos:
types: [python]

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
rev: v0.971
hooks:
- id: mypy
exclude: ^docs/conf.py

- repo: https://gitlab.com/yaq/yaq-traits
rev: v2021.4.0
rev: v2022.3.0
hooks:
- id: yaq-traits-check
- id: yaq-traits-compose
Expand Down
113 changes: 77 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,86 @@
[![ver](https://img.shields.io/badge/calver-YYYY.M.MICRO-blue)](https://calver.org/)
[![log](https://img.shields.io/badge/change-log-informational)](https://github.com/yaq-project/hart-protocol/-/blob/main/CHANGELOG.md)

A sans-io Python implementation of the [Highway Adressable Remote Transducer Protocol](https://en.wikipedia.org/wiki/Highway_Addressable_Remote_Transducer_Protocol).
A sans I/O Python implementation of the [Highway Adressable Remote Transducer Protocol](https://en.wikipedia.org/wiki/Highway_Addressable_Remote_Transducer_Protocol).

## Introduction

This Python package contains tooling for encoding and decoding bytestrings for communication with HART peripherals.
HART has been implemented using a variety of transport layers---Bell 202, RS485, Ethernet, etc.
In persuit of simplicity and reusability, this package does not contain any interface capabilities.
Use something like [pySerial](https://pyserial.readthedocs.io) for transport.
Read the [sans I/O manifesto](https://sans-io.readthedocs.io/) for more motivation regarding this design pattern.

Briefly, HART is an open protocol for industrial automation supported by multiple device manufacturers.
HART has a concept of "address", so that many peripherals can share the same communication channel.
HART has limited support for multiple controllers, and generic handheld controllers exist.
HART peripherals respond to numbered commands, which can be thought of as primative [remote procedure calls](https://en.wikipedia.org/wiki/Remote_procedure_call).
The standard specifies a number of universal commands which should be supported by any peripheral, and there are also so-called "common" commands which many peripherals implement.
It's strongly recommended that you check the documentation of your own peripheral---implementations may be inconsistent.
In addition to universal and common commands, it's likely that your peripheral implements many device-specific commands.

This package aims to have complete and accurate support for all universal and common commands.
In addition, this package has tooling for packing and unpacking generic command data for device-specific commands.
This package is intentionally simple and narrowly scoped.
There is no documentation beyond this README.
Please open an issue or PR to the GitHub repository if you find any errors or missing functionality.

## Sending Commands

The following functions return bytestrings that can be fed to your transport layer.

Universal Commands
| command | function |
| ------- | --------------------------------- |
| 0 | `read_unique_identifier(address)` |
| 1 | `read_primary_variable(address)` |
| 2 | `read_loop_current_and_percent(address)` |
| 3 | `read_dynamic_variables_and_loop_current(address)` |
| 4 |
| 5 |
| 6 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| command | function |
| ------- | ----------------------------------------------------------- |
| 0 | `read_unique_identifier(address)` |
| 1 | `read_primary_variable(address)` |
| 2 | `read_loop_current_and_percent(address)` |
| 3 | `read_dynamic_variables_and_loop_current(address)` |
| 6 | `write_polling_address(address, new_short_address)` |
| 11 | `read_unique_identifier_associated_with_tag(tag)` |
| 12 | `read_message(address)` |
| 13 | `read_tag_descriptor_date(address)` |
| 14 | `read_primary_variable_information(address)` |
| 15 | `read_output_information(address)` |
| 16 | `read_final_assembly_number(address)` |
| 17 | `write_message(address, message)` |
| 18 | `write_tag_descriptor_date(address, tag, descriptor, date)` |
| 19 | `write_final_assembly_number(address, number)` |

Common-Practice Commands
| command | function |
| ------- | --------------------------------- |
| 33 |
| 38 |
| 40 |
| 42 |
| 44 |
| 45 |
| 46 |
| 48 |
| 50 |
| 51 |
| 53 |
| 54 |
| 59 |

Maintainers:
| command | function |
| ------- | -------------------------------------------------------- |
| 37 | `set_primary_variable_lower_range_value(address, value)` |
| 38 | `reset_configuration_changed_flag(address)` |
| 42 | `perform_master_reset(address)` |
| 48 | `read_additional_transmitter_status(address)` |
| 50 | `read_dynamic_variable_assignments(address)` |
| 59 | `write_number_of_response_preambles(address, number)` |
| 66 | `toggle_analog_output_mode(address)` |
| 67 | `trim_analog_output_zero(address)` |
| 68 | `trim_analog_output_span(address)` |
| 123 | `select_baud_rate(address, rate)` |

Arbitrary additional command bytestrings can also be generated as shown below.
This is a device-specific command for Brooks GF40 Mass Flow Controllers, which takes an IEE-754 floating point number as well as a unique code.

```python
import struct
import hart_protocol
code = 0
value = 32.1
data = struct.pack(">Bf", code, value)
command = hart_protocol.pack_command(address=123, command_id=236, data=data)
```

## Parsing Responses

Documentation TODO

## Integration Example

Documentation TODO

## Maintainers

- [Blaise Thompson](https://github.com/untzag)
41 changes: 20 additions & 21 deletions hart_protocol/_parsing.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
import struct
from typing import MutableMapping, Union




def parse(response: bytes) -> dict:
out = dict()
out["full_response"]: bytes = response
def parse(response: bytes) -> MutableMapping[str, Union[int, bytes, str, float]]:
out: MutableMapping[str, Union[int, bytes, str, float]] = dict()
out["full_response"] = response
if response[0] & 0x80: # long address
out["address"]: bytes = response[1:6]
out["address"] = int.from_bytes(response[1:6], "big")
response = response[6:]
else: # short address
out["address"]: bytes = response[1]
out["address"] = response[1]
response = response[2:]
command, bytecount, status = struct.unpack_from(">BBL", response)
out["status"] = status
data = response[4:4+bytecount]
out["command"]: int = command
out["command_name"]: str = f"hart_command_{command}"
out["bytecount"]: int = bytecount
out["data"]: bytes = data
data = response[4 : 4 + bytecount]
out["command"] = command
out["command_name"] = f"hart_command_{command}"
out["bytecount"] = bytecount
out["data"] = data
if command in [0, 11]:
out["command_name"] = "read_unique_identifier"
out["manufacturer_id"]: bytes = data[1]
out["manufacturer_device_type"]: bytes = data[2]
out["number_response_preamble_characters"]: bytes = data[3]
out["universal_command_revision_level"]: bytes = data[4]
out["transmitter_specific_command_revision_level"]: bytes = data[5]
out["software_revision_level"]: bytes = data[6]
out["hardware_revision_level"]: bytes = data[7]
out["device_id"]: bytes = data[9:12]
out["manufacturer_id"] = data[1]
out["manufacturer_device_type"] = data[2]
out["number_response_preamble_characters"] = data[3]
out["universal_command_revision_level"] = data[4]
out["transmitter_specific_command_revision_level"] = data[5]
out["software_revision_level"] = data[6]
out["hardware_revision_level"] = data[7]
out["device_id"] = data[9:12]
if command in [1]:
out["command_name"] = "read_primary_variable"
out["primary_variable"] = struct.unpack_from(">f", data)
out["primary_variable"] = struct.unpack_from(">f", data)[0]
return out
Empty file added hart_protocol/py.typed
Empty file.
14 changes: 8 additions & 6 deletions hart_protocol/tools.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math
from typing import Union


def calculate_checksum(command: bytes) -> bytes:
Expand Down Expand Up @@ -35,11 +36,12 @@ def pack_command(address, command_id, data=None):
return command


def pack_ascii(string: str) -> bytes:
chars = [c.encode() for c in string]
chars = [ord(c) & 0b0011_1111 for c in chars]
def pack_ascii(string: Union[str, bytes]) -> bytes:
if type(string) == str:
chars = [c.encode() for c in string] # type: ignore
else:
chars = [c for c in string] # type: ignore
out = 0
for i, c in zip(range(8), chars[::-1]):
for i, c in zip(range(8), [ord(c) & 0b0011_1111 for c in chars][::-1]):
out |= c << (i * 6)
out = out.to_bytes(math.ceil((len(string) * 6) / 8), "big")
return out
return out.to_bytes(math.ceil((len(string) * 6) / 8), "big")
64 changes: 60 additions & 4 deletions hart_protocol/universal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,68 @@


def read_unique_identifier(address: bytes) -> bytes:
return tools.pack_command(address, 0)
return tools.pack_command(address, command_id=0)


def read_primary_variable(address: bytes) -> bytes:
return tools.pack_command(address, 1)
return tools.pack_command(address, command_id=1)


def read_unique_identifier_associated_with_tag(tag: bytes, *, address:int=0) -> bytes:
return tools.pack_command(address, 11, tag)
def read_loop_current_and_percent(address: bytes) -> bytes:
return tools.pack_command(address, command_id=2)


def read_dynamic_variables_and_loop_current(address: bytes) -> bytes:
return tools.pack_command(address, command_id=3)


def write_polling_address(address: bytes, new_polling_address: int) -> bytes:
assert 0 <= new_polling_address <= 15
return tools.pack_command(address, command_id=6, data=new_polling_address.to_bytes(1, "big"))


def read_unique_identifier_associated_with_tag(tag: bytes, *, address: int = 0) -> bytes:
return tools.pack_command(address, command_id=11, data=tag)


def read_message(address: bytes) -> bytes:
return tools.pack_command(address, command_id=12)


def read_tag_descriptor_date(address: bytes) -> bytes:
return tools.pack_command(address, command_id=13)


def read_primary_variable_information(address: bytes) -> bytes:
return tools.pack_command(address, command_id=14)


def read_output_information(address: bytes) -> bytes:
return tools.pack_command(address, command_id=15)


def read_final_assembly_number(address: bytes) -> bytes:
return tools.pack_command(address, command_id=16)


def write_message(address: bytes, message: str) -> bytes:
message = message.ljust(32)
return tools.pack_command(address, command_id=17, data=tools.pack_ascii(message))


def write_tag_descriptor_date(address: bytes, tag: str, descriptor: str, date: tuple):
data = b""
assert len(tag) <= 8
data += tools.pack_ascii(tag.ljust(8))
assert len(descriptor) <= 16
data += tools.pack_ascii(descriptor.ljust(16))
day, month, year = date
data += day.to_bytes(1, "big")
data += month.to_bytes(1, "big")
data += year.to_bytes(1, "big")
return tools.pack_command(address, command_id=18, data=data)


def write_final_assembly_number(address: bytes, number: int):
data = number.to_bytes(3, "big")
return tools.pack_command(address, command_id=19, data=data)

0 comments on commit f33ceed

Please sign in to comment.