Skip to content

Commit

Permalink
feat!: add dsp register readout, use click
Browse files Browse the repository at this point in the history
  • Loading branch information
elagil committed Oct 20, 2022
1 parent 4b51921 commit 41b2fc3
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 391 deletions.
8 changes: 8 additions & 0 deletions schemas/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ message LoadParameters {
repeated string content = 1;
}

message Register {
uint32 address = 1;
uint32 length = 2;
repeated bytes data = 3;
}

message ControlRequest {
oneof command {
bool reset_dsp = 1;
bool hard_reset_dsp = 2;
LoadParameters load_parameters = 3;
Register read_register = 4;
Register write_register = 5;
}
}

Expand Down
10 changes: 10 additions & 0 deletions sigmadsp/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,16 @@ def control(self, request: ControlRequest, context):
response.message = "Safety check failed, parameters cannot be adjusted."
response.success = False

elif "read_register" == command:
value: bytes = self.dsp.read(request.read_register.address, request.read_register.length)

response.message = f"0x{value.hex()}"
response.success = True

else:
response.message = f"Unknown command '{command}'."
response.success = False

return response


Expand Down
224 changes: 133 additions & 91 deletions sigmadsp/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
It can control the backend via the command line.
"""
import argparse
from __future__ import annotations

import logging
from io import TextIOWrapper
from ipaddress import IPv4Address
from typing import Union

import click
import grpc

from sigmadsp.generated.backend_service.control_pb2 import (
Expand All @@ -16,117 +20,155 @@
from sigmadsp.generated.backend_service.control_pb2_grpc import BackendStub


def main():
"""The main frontend command-line application, which controls the SigmaDSP backend."""
logging.basicConfig(level=logging.INFO)

argument_parser = argparse.ArgumentParser()

argument_parser.add_argument(
"-p",
"--port",
required=False,
type=int,
help="Set the port, on which the backend listens for requests.",
)
class Channel:
"""Opens a channel to and handles control of the backend."""

stub = BackendStub

def __init__(self, backend_address: IPv4Address, backend_port: int):
"""Initialize the channel.
Args:
backend_address (ip_address): The IP address of the backend.
backend_port (int): The backend port.
"""
self.address = f"{backend_address}:{backend_port}"

def _emit(self, request: ControlRequest | ControlParameterRequest):
"""Emits a request towards the backend.
During emission, the channel is opened.
Args:
request (ControlRequest | ControlParameterRequest): The request to emit.
"""
with grpc.insecure_channel(self.address) as channel:
stub = BackendStub(channel)
response: Union[ControlResponse, None] = None

argument_parser.add_argument(
"-a",
"--address",
required=False,
type=str,
help="Set the IP address, on which the backend listens for requests.",
)
if isinstance(request, ControlRequest):
response = stub.control(request)

argument_parser.add_argument(
"-av",
"--adjust_volume",
required=False,
type=float,
help="Adjust the volume by a certain value in dB (positive or negative).",
)
elif isinstance(request, ControlParameterRequest):
response = stub.control_parameter(request)

argument_parser.add_argument(
"-sv",
"--set_volume",
required=False,
type=float,
help="Sets the volume to a certain value in dB (zero or lower).",
)
if response is not None:
logging.info(response and response.message)

argument_parser.add_argument(
"-r",
"--reset",
required=False,
help="Soft-reset the DSP.",
action="store_true",
)
def read_register(self, address: int, length: int):
"""Read a DSP register.
argument_parser.add_argument(
"-R",
"--hard-reset",
required=False,
help="hard-reset the DSP.",
action="store_true",
)
Args:
address (int): The address to read from.
length (int): The length of data to read in bytes.
"""
request = ControlRequest()
request.read_register.address = address
request.read_register.length = length
self._emit(request)

argument_parser.add_argument(
"-lp",
"--load_parameters",
required=False,
help="Load new parameter file",
)
def change_volume(self, volume: float, relative: bool):
"""Change the DSP volume by a certain amount.
arguments = argument_parser.parse_args()
Args:
volume (float): The volume change in dB.
relative (bool): If True, performs a relative change. If False, sets the provided volume.
"""
request = ControlParameterRequest()
request.change_volume.name_tokens[:] = ["main"]
request.change_volume.value = volume
request.change_volume.relative = relative
self._emit(request)

backend_port = 50051
backend_address = "localhost"
def reset(self, hard: bool):
"""Reset the DSP.
if arguments.port is not None:
backend_port = arguments.port
Args:
hard (bool): If True, performs a hard reset. If False, performs a soft reset.
"""
request = ControlRequest()

if arguments.address is not None:
backend_address = arguments.address
if hard:
request.hard_reset_dsp = True

response: Union[ControlResponse, None] = None
control_request = ControlRequest()
control_parameter_request = ControlParameterRequest()
else:
request.reset_dsp = True

self._emit(request)

def load_parameters(self, file: TextIOWrapper):
"""Load a parameter file to the backend.
Args:
file (TextIOWrapper): The file to load.
"""
request = ControlRequest()
request.load_parameters.content[:] = file.readlines()
self._emit(request)


@click.group()
@click.pass_context
@click.option(
"--port",
default=50051,
show_default=True,
type=int,
help="Set the port, on which the backend listens for requests.",
)
@click.option(
"--ip",
default=IPv4Address("127.0.0.1"),
show_default=True,
type=IPv4Address,
help="Set the IP address, on which the backend listens for requests.",
)
def sigmadsp(ctx: click.Context, port: int, ip: IPv4Address):
"""Command-line tool for controlling the sigmadsp-backend."""
logging.basicConfig(level=logging.DEBUG)
ctx.obj = Channel(ip, port)

with grpc.insecure_channel(f"{backend_address}:{backend_port}") as channel:
stub = BackendStub(channel)

if arguments.adjust_volume is not None:
control_parameter_request.change_volume.name_tokens[:] = ["main"]
control_parameter_request.change_volume.value = arguments.adjust_volume
control_parameter_request.change_volume.relative = True
@sigmadsp.command(context_settings={"ignore_unknown_options": True})
@click.pass_obj
@click.argument("file", type=click.File("r"))
def load_parameters(channel: Channel, file: TextIOWrapper):
"""Load a parameter file."""
channel.load_parameters(file)

response = stub.control_parameter(control_parameter_request)

if arguments.set_volume is not None:
control_parameter_request.change_volume.name_tokens[:] = ["main"]
control_parameter_request.change_volume.value = arguments.set_volume
control_parameter_request.change_volume.relative = False
@sigmadsp.command(context_settings={"ignore_unknown_options": True})
@click.pass_obj
@click.argument("volume", type=float)
def set_volume(channel: Channel, volume: float):
"""Sets the volume to a certain value in dB."""
channel.change_volume(volume, False)

response = stub.control_parameter(control_parameter_request)

if arguments.load_parameters is not None:
with open(arguments.load_parameters, "r", encoding="utf8") as parameter_file:
control_request.load_parameters.content[:] = parameter_file.readlines()
@sigmadsp.command(context_settings={"ignore_unknown_options": True})
@click.pass_obj
@click.argument("change", type=float)
def change_volume(channel: Channel, change: float):
"""Changes the volume by a certain amount in dB."""
channel.change_volume(change, True)

response = stub.control(control_request)

if arguments.reset is True:
control_request.reset_dsp = True
@sigmadsp.command()
@click.pass_obj
@click.option("--hard", is_flag=True)
def reset(channel: Channel, hard: bool):
"""Resets the DSP."""
channel.reset(hard)

response = stub.control(control_request)

if arguments.hard_reset is True:
control_request.hard_reset_dsp = True

response = stub.control(control_request)

logging.info(response and response.message)
@sigmadsp.command()
@click.pass_obj
@click.argument("address", type=int)
@click.argument("length", type=int)
def read_register(channel: Channel, address: int, length: int):
"""Reads a DSP register."""
channel.read_register(address, length)


if __name__ == "__main__":
main()
sigmadsp() # pylint: disable=no-value-for-parameter
Loading

0 comments on commit 41b2fc3

Please sign in to comment.