Skip to content

Commit

Permalink
Merge pull request #1001 from quartiq/py-stream
Browse files Browse the repository at this point in the history
Py-stream
  • Loading branch information
jordens authored Jan 21, 2025
2 parents 53f84e9 + 47e9b64 commit 69ab275
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 54 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

### [UNRELEASED](https://github.com/quartiq/stabilizer/compare/v0.11.0...HEAD) - DATE

### Added

* Support for exponentially swept sine signal source

### Changed

* `py`: `StabilizerStream` renamed to `Stream`

## [v0.11.0](https://github.com/quartiq/stabilizer/compare/v0.10.0...v0.11.0) - 2024-12-02

### Added
Expand Down
36 changes: 17 additions & 19 deletions hitl/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import os

import miniconf
from stabilizer.stream import measure, StabilizerStream, get_local_ip
from stabilizer.stream import measure, Stream, get_local_ip

logger = logging.getLogger(__name__)

Expand All @@ -22,12 +22,13 @@
async def _main():
parser = argparse.ArgumentParser(description="Stabilizer Stream HITL test")
parser.add_argument(
"prefix", type=str, nargs="?", help="The MQTT topic prefix of the target"
"prefix",
help="The MQTT topic prefix of the target",
)
parser.add_argument(
"--broker", "-b", default="mqtt", type=str, help="The MQTT broker address"
)
parser.add_argument("--ip", default="0.0.0.0", help="The IP address to listen on")
parser.add_argument("--addr", default="0.0.0.0", help="The IP address to listen on")
parser.add_argument(
"--port", type=int, default=9293, help="Local port to listen on"
)
Expand All @@ -36,39 +37,36 @@ async def _main():
"--max-loss", type=float, default=5e-2, help="Maximum loss for success"
)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)

async with miniconf.Client(
args.broker,
protocol=miniconf.MQTTv5,
logger=logging.getLogger("aiomqtt-client"),
) as client:
prefix = args.prefix
if not args.prefix:
prefix, _alive = miniconf.one(
await miniconf.discover(client, "dt/sinara/dual-iir/+")
)

logging.basicConfig(level=logging.INFO)

prefix, _alive = miniconf.one(await miniconf.discover(client, args.prefix))
conf = miniconf.Miniconf(client, prefix)

if ipaddress.ip_address(args.ip).is_unspecified:
args.ip = get_local_ip(args.broker)
if ipaddress.ip_address(args.addr).is_unspecified:
args.addr = get_local_ip(args.broker)
if ipaddress.ip_address(args.addr).is_multicast:
local = get_local_ip(args.broker)
else:
local = "0.0.0.0"

logger.info("Starting stream")
await conf.set("/stream", f"{args.ip}:{args.port}", retain=False)
await conf.set("/stream", f"{args.addr}:{args.port}")

try:
logger.info("Testing stream reception")
_transport, stream = await StabilizerStream.open(
args.port, args.ip, args.broker
_transport, stream = await Stream.open(
args.port, addr=args.addr, local=local
)
logger.info("Testing stream reception")
loss = await measure(stream, args.duration)
if loss > args.max_loss:
raise RuntimeError("High frame loss", loss)
finally:
logger.info("Stopping stream")
await conf.set("/stream", "0.0.0.0:0", retain=False)
await conf.set("/stream", "0.0.0.0:0")

logger.info("Draining queue")
await asyncio.sleep(0.1)
Expand Down
105 changes: 70 additions & 35 deletions py/stabilizer/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_local_ip(remote):
Returns a list of four octets."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect((remote, 1883))
sock.connect((remote, 9)) # discard
return sock.getsockname()[0]
finally:
sock.close()
Expand Down Expand Up @@ -79,6 +79,26 @@ def to_traces(self):
]


class ThermostatEem:
"""Thermostat-EEM format"""

format_id = 3

def __init__(self, header, body):
self.header = header
self.body = body

def size(self):
"""Return the data size of the frame in bytes"""
return len(self.body)

def to_si(self):
"""Return the parsed data in SI units"""
return np.frombuffer(
self.body, np.dtype([("input", "<f4", (4, 4)), ("output", "<f4", (4,))])
)


class Frame:
"""Stream frame constisting of a header and multiple data batches"""

Expand All @@ -88,6 +108,7 @@ class Frame:
header = namedtuple("Header", "magic format_id batches sequence")
parsers = {
AdcDac.format_id: AdcDac,
ThermostatEem.format_id: ThermostatEem,
}

@classmethod
Expand All @@ -103,38 +124,42 @@ def parse(cls, data):
return parser(header, data[cls.header_fmt.size :])


class StabilizerStream(asyncio.DatagramProtocol):
class Stream(asyncio.DatagramProtocol):
"""Stabilizer streaming receiver protocol"""

@classmethod
async def open(cls, port=9293, addr="0.0.0.0", broker=None, maxsize=1):
async def open(cls, port=9293, addr="0.0.0.0", local="0.0.0.0", maxsize=1):
"""Open a UDP socket and start receiving frames"""
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# Increase the OS UDP receive buffer size to 4 MiB so that latency
# spikes don't impact much. Achieving 4 MiB may require increasing
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except NameError:
pass # Windows
# Increase the OS UDP receive buffer size so that latency
# spikes don't impact much. Achieving this may require increasing
# the max allowed buffer size, e.g. via
# `sudo sysctl net.core.rmem_max=26214400` but nowadays the default
# max appears to be ~ 50 MiB already.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 << 20)

# We need to specify which interface to receive broadcasts from, or Windows may choose the
# wrong one. Thus, use the broker address to figure out our local address for the interface
# of interest.
# max appears to be ~ 50 MiB already, at least on Linux.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8 << 20)
# We need to specify which interface to receive multicasts from, or Windows may choose the
# wrong one. Thus, use a bind address to figure out our local address for the interface
# of interest. There's also an interface index, at least on linux, but apparently windows
# sockets don't do that.
if ipaddress.ip_address(addr).is_multicast:
group = socket.inet_aton(addr)
iface = socket.inet_aton(get_local_ip(broker))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface)
sock.bind(("", port))
else:
sock.bind((addr, port))

transport, protocol = await loop.create_datagram_endpoint(
lambda: cls(maxsize), sock=sock
multiaddr = socket.inet_aton(addr)
local = socket.inet_aton(local)
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
multiaddr + local,
)
sock.bind((addr, port))
return await loop.create_datagram_endpoint(
lambda: cls(maxsize),
sock=sock,
)
return transport, protocol

def __init__(self, maxsize):
self.queue = asyncio.Queue(maxsize)
Expand Down Expand Up @@ -177,8 +202,6 @@ async def _record():
stat.received += frame.header.batches
stat.expect = wrap(frame.header.sequence + frame.header.batches)
stat.bytes += frame.size()
# test conversion
# frame.to_si()

try:
await asyncio.wait_for(_record(), timeout=duration)
Expand All @@ -190,10 +213,7 @@ async def _record():
)

sent = stat.received + stat.lost
if sent:
loss = stat.lost / sent
else:
loss = 1
loss = stat.lost / sent if sent else 1
logger.info("Loss: %s/%s batches (%g %%)", stat.lost, sent, loss * 1e2)
return loss

Expand All @@ -202,17 +222,32 @@ async def main():
"""Test CLI"""
parser = argparse.ArgumentParser(description="Stabilizer streaming demo")
parser.add_argument(
"--port", type=int, default=9293, help="Local port to listen on"
"--port", type=int, default=9293, help="Local port to listen on [%(default)s]"
)
parser.add_argument(
"--host", default="0.0.0.0", help="Local address to listen on [%(default)s]"
)
parser.add_argument(
"--local",
default="0.0.0.0",
help="The local IP address to receive multicast frames on [%(default)s]",
)
parser.add_argument(
"--broker", help="The MQTT broker address for local IP lookup [%(default)s]"
)
parser.add_argument(
"--maxsize", type=int, default=1, help="Frame queue size [%(default)s]"
)
parser.add_argument(
"--duration", type=float, default=1.0, help="Test duration [%(default)s]"
)
parser.add_argument("--host", default="0.0.0.0", help="Local address to listen on")
parser.add_argument("--broker", default="mqtt", help="The MQTT broker address")
parser.add_argument("--maxsize", type=int, default=1, help="Frame queue size")
parser.add_argument("--duration", type=float, default=1.0, help="Test duration")
args = parser.parse_args()

logging.basicConfig(level=logging.INFO)
_transport, stream = await StabilizerStream.open(
args.port, args.host, args.broker, args.maxsize
if args.broker is not None:
args.local = get_local_ip(args.broker)
_transport, stream = await Stream.open(
args.port, args.host, args.local, args.maxsize
)
await measure(stream, args.duration)

Expand Down

0 comments on commit 69ab275

Please sign in to comment.