Skip to content

Commit

Permalink
unpacker functionality (#3)
Browse files Browse the repository at this point in the history
* unpacker functionality

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* document responses

* mypy

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
untzag and pre-commit-ci[bot] authored Jul 29, 2022
1 parent cc1ac34 commit 9a9d09d
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 78 deletions.
69 changes: 67 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,76 @@ command = hart_protocol.pack_command(address=123, command_id=236, data=data)

## Parsing Responses

Documentation TODO
All responses are parsed into named tuples.
Every single response will have the following keys.

Generic Response
| key | value |
| --------------- | --------- |
| `address` | `<int>` |
| `bytecount` | `<int>` |
| `command` | `<int>` |
| `command_name` | `<str>` |
| `data` | `<bytes>` |
| `full_response` | `<bytes>` |
| `status` | `<int>` |

You can parse the raw `data` according to the particulars of your peripheral.
Certain standard responses are parsed further as shown below.

Response 0
| key | value |
| --------------------------------------------- | -------------------------- |
| `command_name` | `"read_unique_identifier"` |
| `command` | `0` |
| `device_id` | `<bytes>` |
| `hardware_revision_level` | `<int>` |
| `manufacturer_device_type` | `<bytes>` |
| `manufacturer_id` | `<int>` |
| `number_response_preamble_charachters` | `<int>` |
| `software_revision_level` | `<int>` |
| `transmitter_specific_command_revision_level` | `<int>` |
| `universal_command_revision_level | `<int>` |

Response 1
| key | value |
| ------------------ | ------------------------- |
| `command_name` | `"read_primary_variable"` |
| `command` | `1` |
| `primary_variable` | `<float>` |

Response 11
| key | value |
| --------------------------------------------- | -------------------------- |
| `command_name` | `"read_unique_identifier"` |
| `command` | `11` |
| `device_id` | `<bytes>` |
| `hardware_revision_level` | `<int>` |
| `manufacturer_device_type` | `<bytes>` |
| `manufacturer_id` | `<int>` |
| `number_response_preamble_charachters` | `<int>` |
| `software_revision_level` | `<int>` |
| `transmitter_specific_command_revision_level` | `<int>` |
| `universal_command_revision_level | `<int>` |

## Integration Example

Documentation TODO
```python
>>> import hart_protocol
>>> import serial
>>>
>>> port = serial.Serial("/dev/ttyUSB0", 19200, timeout=0.1)
>>> port.parity = "O"
>>> port.stopbits = 1
>>> tag = hart_protocol.tools.pack_ascii("06C22300517"[-8:])
>>> port.write(hart_protocol.universal.read_unique_identifier_associated_with_tag(tag))
>>>
>>> unpacker = apt.Unpacker(port)
>>> for msg in unpacker:
... print(msg)
...
>>>
```

## Maintainers

Expand Down
119 changes: 46 additions & 73 deletions hart_protocol/_unpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import warnings

from ._parsing import parse
from . import tools


class Unpacker:
"""
Create an Unpacker to decode a byte stream into Thorlabs APT protocol messages.
Create an Unpacker to decode a byte stream into HART protocol messages.
The ``file_like`` parameter should be an object which data can be sourced from.
It should support the ``read()`` method.
Expand Down Expand Up @@ -52,80 +53,52 @@ def _decoding_error(self, message="Error decoding message from buffer."):
self.buf = self.buf[1:]

def __next__(self):
# Basic message packet is 6 bytes, try to fill buffer to at least that size
if len(self.buf) < 6:
self.buf += self._file.read(6 - len(self.buf))
# Hopefully enough data in buffer now to try to decode a message
while len(self.buf) >= 6:
# Look at first two bytes and ensure they look like a message ID we recognise
msgid, length = struct.unpack_from("<HH", self.buf)
if msgid not in id_to_func:
self._decoding_error(f"Invalid message with id={msgid:#06x}")
continue
# Looks like a message, now check the source and destination locations
long_form = self.buf[4] & 0x80 # Check MSB of byte 4 for "long form" flag
dest = self.buf[4] & ~0x80 # Destination is remaining lower bits
source = self.buf[5]
# Destination should be the Host, source should be a recognised controller ID
if not (
dest in (0x00, 0x01)
and source
in (
0x00,
0x11,
0x21,
0x22,
0x23,
0x24,
0x25,
0x26,
0x27,
0x28,
0x29,
0x2A,
0x50,
)
):
self._decoding_error(
"Invalid source or destination for message with id="
f"{msgid:#06x}, src={source:#04x}, dest={dest:#04x}"
)
continue
# Message ID, source and dest seem legit, now check long form length
if long_form:
# A bad or malicious packet could make us try to read up to 65 kB...
# Documentation says "currently no datapacket exceeds 255 bytes in length"
if length > 255:
self._decoding_error(
f"Invalid length={length} for message with "
f"id={msgid:#06x}, src={source:#04x}, dest={dest:#04x}"
)
continue
# must work with at least two bytes to start with
while len(self.buf) < 2:
self.buf += self._file.read(1)
# keep reading until we find a minimum preamble
while self.buf[:2] != b"\xFF\xFF":
self.buf += self._file.read(1)
self._decoding_error("Head of buffer not recognized as valid preamble")
# now we are within the preamble, seek forward for start charachter
while True:
if len(self.buf) < 2:
self.buf += self._file.read(1)
if self.buf[0] == 0xFF:
self.buf = self.buf[1:]
elif self.buf[0] in [0x06, 0x86]:
break
else:
# Length field is actually two parameters in short form messages
length = 0
# Either short form message, or long form message of reasonable size
# Looks good! Break from loop and proceed
break
# If we got here, either the buffer was/shrank too small,
# or we have the start of something that looks like a valid message
if len(self.buf) < 6:
# Not enough data to form a message packet
raise StopIteration
# Buffer contains enough for a short message, but maybe not a long form one
if len(self.buf) < length + 6:
# Not enough data in buffer to decode long form message, attempt to read some more data
self.buf += self._file.read(length - len(self.buf) + 6)
if len(self.buf) < length + 6:
# Still didn't receive enough data to decode message
self._decoding_error("Preamble did not end with start charachter.")
raise StopIteration
# Have enough data in buffer to decode the full message
data = self.buf[: length + 6]
# Can now remove the message data from the buffer
self.buf = self.buf[length + 6 :]
# Decode the message contents
dict_ = id_to_func[msgid](data)
return namedtuple(dict_["msg"], dict_.keys())(**dict_)
# now the head of our buffer is the start charachter
# we will read all the way through status
if self.buf[0] & 0x80:
l = 10
else:
l = 6
while len(self.buf) < l:
self.buf += self._file.read(1)
# now we can use the bytecount to read through the data and checksum
bytecount = self.buf[l - 3]
response_length = l + bytecount - 1
while len(self.buf) < response_length:
self.buf += self._file.read(1)
# checksum
checksum = int.from_bytes(tools.calculate_checksum(self.buf[: response_length - 1]), "big")
if checksum != self.buf[response_length - 1]:
self._decoding_error("Invalid checksum.")
raise StopIteration
# parse
response = self.buf[: response_length + 1]
dict_ = parse(response)
# clear buffer
if len(self.buf) == response_length:
self.buf = b""
else:
self.buf = self.buf[response_length + 1 :]
# return
return namedtuple(dict_["command_name"], dict_.keys())(**dict_)

def __aiter__(self):
return self
Expand Down
7 changes: 4 additions & 3 deletions hart_protocol/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from typing import Union


def calculate_checksum(command: bytes) -> bytes:
def calculate_checksum(command: Union[int, bytes]) -> bytes:
if type(command) == int:
command = command.to_bytes(64, "big") # type: ignore
lrc = 0
for byte in command:
for byte in command: # type: ignore
lrc ^= byte
out = lrc.to_bytes(1, "big")
return out
Expand Down Expand Up @@ -32,7 +34,6 @@ def pack_command(address, command_id, data=None):
command += len(data).to_bytes(1, "big") # byte count
command += data # data
command += calculate_checksum(command[5:])
print(command)
return command


Expand Down

0 comments on commit 9a9d09d

Please sign in to comment.