Skip to content

Commit

Permalink
rewriting hops/libto improve reliability (incomplete)
Browse files Browse the repository at this point in the history
  • Loading branch information
waltermwaniki committed Feb 5, 2025
1 parent 39c7b69 commit dfea9f0
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 107 deletions.
42 changes: 40 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,46 @@ include = ["coherent_lasers*"]
namespaces = false


[tool.setuptools.dynamic]
version = { attr = "coherent_lasers.__version__" }
[tool.ruff]
line-length = 120
show-fixes = true

target-version = "py311"

exclude = [
".bzr",
".direnv",
".eggs",
".git",
".gitignore",
".git-rewrite",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
"sdk",
]

[tool.ruff.lint]
extend-select = ["C4", "SIM", "TCH"]
fixable = ["ALL"]

[tool.black]
line-length = 120
Expand Down
36 changes: 8 additions & 28 deletions src/coherent_lasers/app/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pydantic import BaseModel

from coherent_lasers.genesis_mx.driver import GenesisMX, MockGenesisMX
from coherent_lasers.hops.lib import get_hops_manager, reset_hops_manager

GENESIS_MX_HEADTYPES = {"MiniX", "Mini00"}

Expand Down Expand Up @@ -69,9 +68,7 @@ class GenesisMXModel(BaseModel):
flags: GenesisMXFlags


MessageData: TypeAlias = (
dict[str, GenesisMXSignals] | dict[str, GenesisMXPower] | dict[str, GenesisMXFlags]
)
MessageData: TypeAlias = dict[str, GenesisMXSignals] | dict[str, GenesisMXPower] | dict[str, GenesisMXFlags]


class BaseMessage(BaseModel):
Expand Down Expand Up @@ -161,9 +158,7 @@ async def set_laser_power(serial: str, value: float):
raise HTTPException(status_code=404, detail="Laser not found")
laser = self.lasers[serial]
laser.power_mw = value
print(
f"Set power of laser {serial} to {value} mW. laser.power_mw: {laser.power_mw}"
)
print(f"Set power of laser {serial} to {value} mW. laser.power_mw: {laser.power_mw}")
# await self.broadcast_power()

@self.put("/enable")
Expand All @@ -172,9 +167,7 @@ async def enable_laser(serial: str):
if serial not in self.lasers:
raise HTTPException(status_code=404, detail="Laser not found")
self.lasers[serial].enable()
print(
f"Enabled laser {serial}. Software switch: {self.lasers[serial].software_switch}"
)
print(f"Enabled laser {serial}. Software switch: {self.lasers[serial].software_switch}")
await self.broadcast_flags()

@self.put("/disable")
Expand All @@ -183,9 +176,7 @@ async def disable_laser(serial: str):
if serial not in self.lasers:
raise HTTPException(status_code=404, detail="Laser not found")
self.lasers[serial].disable()
print(
f"Disabled laser {serial}. Software switch: {self.lasers[serial].software_switch}"
)
print(f"Disabled laser {serial}. Software switch: {self.lasers[serial].software_switch}")
await self.broadcast_flags()

@self.put("/remote")
Expand Down Expand Up @@ -229,9 +220,7 @@ async def broadcast_signals(self):
async def broadcast_flags(self):
await self.broadcast_items("flags", self._get_all_laser_flags())

async def scheduled_broadcast(
self, msg_type: str, data: MessageData, interval: float
):
async def scheduled_broadcast(self, msg_type: str, data: MessageData, interval: float):
while True:
try:
await self.broadcast_items(msg_type, data)
Expand Down Expand Up @@ -337,19 +326,10 @@ def _get_laser_power(
)

def _get_all_laser_powers(self) -> dict[str, GenesisMXPower]:
return {
serial: self._get_laser_power(laser)
for serial, laser in self.lasers.items()
}
return {serial: self._get_laser_power(laser) for serial, laser in self.lasers.items()}

def _get_all_laser_signals(self) -> dict[str, GenesisMXSignals]:
return {
serial: self._get_laser_signals(laser)
for serial, laser in self.lasers.items()
}
return {serial: self._get_laser_signals(laser) for serial, laser in self.lasers.items()}

def _get_all_laser_flags(self) -> dict[str, GenesisMXFlags]:
return {
serial: self._get_laser_flags(laser)
for serial, laser in self.lasers.items()
}
return {serial: self._get_laser_flags(laser) for serial, laser in self.lasers.items()}
45 changes: 13 additions & 32 deletions src/coherent_lasers/app/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
import logging
from coherent_lasers.genesis_mx.driver import GenesisMX
from coherent_lasers.genesis_mx.commands import OperationMode, ReadCmds
from coherent_lasers.hops.lib import HOPSException, get_hops_manager
from coherent_lasers.hops import HOPSException, get_hops_manager

# Setup logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


Expand All @@ -47,9 +45,7 @@ def interactive_session(lasers: dict[str, GenesisMX]) -> None:
list_cmd = {"list", "ls"}
lasers = lasers
current = [next(iter(lasers.keys()))]
click.echo(
f"Starting interactive session with lasers: {', '.join(lasers.keys())}. Type 'exit' to end."
)
click.echo(f"Starting interactive session with lasers: {', '.join(lasers.keys())}. Type 'exit' to end.")
while True:
command: str = click.prompt(f"{', '.join(current)}>", prompt_suffix="")
command_parts = command.split()
Expand All @@ -63,9 +59,7 @@ def interactive_session(lasers: dict[str, GenesisMX]) -> None:
continue
if primary_command in select:
if len(command_parts) < 2:
click.echo(
"Please provide a device to switch to. Either by index or serial number."
)
click.echo("Please provide a device to switch to. Either by index or serial number.")
continue
current = parse_select_command(command, lasers)
continue
Expand Down Expand Up @@ -102,17 +96,13 @@ def validate_lasers(devices: dict[str, GenesisMX]) -> dict[str, GenesisMX]:
return lasers


def run_command_on_lasers(
lasers: dict[str, GenesisMX], selected: list[str], command: str
) -> None:
def run_command_on_lasers(lasers: dict[str, GenesisMX], selected: list[str], command: str) -> None:
try:
if len(selected) == 1:
handle_command(lasers[selected[0]], command)
return
for serial in selected:
click.echo(
f" {serial} Laser -------------------------------------------------------------------------"
)
click.echo(f" {serial} Laser -------------------------------------------------------------------------")
handle_command(lasers[serial], command)
except HOPSException as e:
click.echo(f"Laser error: {e}")
Expand Down Expand Up @@ -157,9 +147,7 @@ def enable(laser: GenesisMX, args=None) -> None:
def disable(laser: GenesisMX, args=None) -> None:
"""Disable the laser."""
if args:
click.echo(
f" The disable command does not take any arguments, ignoring: {args}"
)
click.echo(f" The disable command does not take any arguments, ignoring: {args}")
laser.disable()
click.echo(" Laser disabled.")

Expand All @@ -168,9 +156,7 @@ def info(laser: GenesisMX, args=None) -> None:
"""Display Laser Head information."""
click.echo(" Laser Head info:")
if args:
click.echo(
f" The info command does not take any arguments, ignoring: {args}"
)
click.echo(f" The info command does not take any arguments, ignoring: {args}")
try:
info = laser.head
click.echo(f" Serial: {info.serial}", nl=False)
Expand All @@ -185,13 +171,10 @@ def info(laser: GenesisMX, args=None) -> None:
def mode(laser: GenesisMX, args=None) -> None:
"""Get or set the laser operation mode."""
value = args[0] if args else None
if value is not None:
if value.upper() in OperationMode.__members__:
laser.mode = OperationMode[value.upper()]
click.echo(" Updating laser mode...")
click.echo(
f" Mode: {laser.mode.name}, Valid modes: {' | '.join(OperationMode.__members__)}"
)
if value is not None and value.upper() in OperationMode.__members__:
laser.mode = OperationMode[value.upper()]
click.echo(" Updating laser mode...")
click.echo(f" Mode: {laser.mode.name}, Valid modes: {' | '.join(OperationMode.__members__)}")


def power(laser: GenesisMX, args=[]) -> None:
Expand Down Expand Up @@ -221,9 +204,7 @@ def power(laser: GenesisMX, args=[]) -> None:
def status(laser: GenesisMX, args=None) -> None:
"""Display the current status of the laser."""
full = "--full" in args or "-f" in args if args else False
divider = (
" ------------------------------------------------------------------------"
)
divider = " ------------------------------------------------------------------------"
if full:
click.echo(divider)
info(laser)
Expand Down
25 changes: 8 additions & 17 deletions src/coherent_lasers/genesis_mx/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
OperationMode,
Alarms,
)
from coherent_lasers.hops import HOPSDevice
from ..hops.lib import HOPSException
from coherent_lasers.hops import HOPSDevice, HOPSException


@dataclass(frozen=True)
Expand Down Expand Up @@ -60,9 +59,7 @@ def ready(self) -> bool:
return self.interlock and self.key and not self.software

def __repr__(self) -> str:
return (
f"Software: {self.software}, Interlock: {self.interlock}, Key: {self.key}"
)
return f"Software: {self.software}, Interlock: {self.interlock}, Key: {self.key}"


class GenesisMX:
Expand Down Expand Up @@ -98,16 +95,12 @@ def power_mw(self, value: float) -> None:
value = value / self._unit_factor
self.send_write_command(WriteCmds.SET_POWER, value)
if not self.enable_loop.enabled:
self.log.warning(
f"Attempting to set power to {value} mW while laser is disabled."
)
self.log.warning(f"Attempting to set power to {value} mW while laser is disabled.")

@property
def power_setpoint_mw(self) -> float:
"""Get the current power setpoint of the laser."""
return (
float(self.send_read_command(ReadCmds.POWER_SETPOINT)) * self._unit_factor
)
return float(self.send_read_command(ReadCmds.POWER_SETPOINT)) * self._unit_factor

@property
def ldd_current(self) -> float:
Expand Down Expand Up @@ -168,6 +161,7 @@ def remote_control(self, value: bool) -> None:
try:
self.send_write_command(WriteCmds.SET_REMOTE_CONTROL, value)
except HOPSException:
self.log.debug(f"Failed to set remote control to {value}")
pass

@property
Expand All @@ -182,6 +176,7 @@ def analog_input(self, value: bool) -> None:
try:
self.send_write_command(WriteCmds.SET_ANALOG_INPUT, value)
except HOPSException:
self.log.debug(f"Failed to set analog input to {value}")
pass

def enable(self) -> GenesisMXEnableLoop:
Expand Down Expand Up @@ -319,9 +314,7 @@ def etalon_heater_drive_v(self) -> float:

# Commands

def send_write_command(
self, cmd: WriteCmds, new_value: float | None = None
) -> None:
def send_write_command(self, cmd: WriteCmds, new_value: float | None = None) -> None:
"""Send a write command to the laser."""
self.hops.send_command(f"{cmd.value}{new_value}")

Expand Down Expand Up @@ -391,9 +384,7 @@ def power_mw(self) -> float:
def power_mw(self, value: float) -> None:
"""Set the power of the laser."""
if not self.enable_loop.enabled:
self.log.warning(
f"Attempting to set power to {value} mW while laser is disabled."
)
self.log.warning(f"Attempting to set power to {value} mW while laser is disabled.")
self._power_mw = value

@property
Expand Down
19 changes: 10 additions & 9 deletions src/coherent_lasers/hops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
2. Place the DLL files in this package alongside the respective .h files.
"""

import platform
import sys
from .lib import HOPSDevice, get_hops_manager, HOPSException
from .lib2 import CohrHOPSDevice, get_cohrhops_manager, HOPSCommandException

# Ensure is windows
# if not (sys.platform.startswith("win") and platform.machine().endswith("64")):
# raise OSError("This package only supports 64-bit Windows systems.")

from .lib import HOPSDevice

__all__ = ["HOPSDevice"]
__all__ = [
"HOPSDevice",
"get_hops_manager",
"HOPSException",
"CohrHOPSDevice",
"get_cohrhops_manager",
"HOPSCommandException",
]
23 changes: 4 additions & 19 deletions src/coherent_lasers/hops/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@

DLL_DIR = os.path.dirname(os.path.abspath(__file__))

# Add the DLL directory to the DLL search path
# os.add_dll_directory(DLL_DIR)

# Add the DLL directory to the system PATH
os.environ["PATH"] = DLL_DIR + os.pathsep + os.environ["PATH"]

Expand Down Expand Up @@ -157,16 +154,11 @@ def _fetch_device_connection_info(self):
)
if res != COHRHOPS_OK:
raise HOPSException(f"Error checking for devices: {res}")
self._log.debug(
f"Updated devices info. Connected: {self._number_of_devices_connected.value}"
)
self._log.debug(f"Updated devices info. Connected: {self._number_of_devices_connected.value}")

def _activate_all_devices(self):
self._log.debug("Activating all devices...")
connected_handles = {
self._devices_connected[i]
for i in range(self._number_of_devices_connected.value)
}
connected_handles = {self._devices_connected[i] for i in range(self._number_of_devices_connected.value)}
for handle in connected_handles:
self._initialize_device_by_handle(handle)
ser = self._get_device_serial(handle)
Expand All @@ -177,21 +169,14 @@ def _activate_all_devices(self):

def _validate_active_devices(self):
self._log.debug("Validating active devices...")
connected_handles = {
self._devices_connected[i]
for i in range(self._number_of_devices_connected.value)
}
connected_handles = {self._devices_connected[i] for i in range(self._number_of_devices_connected.value)}
for handle in connected_handles:
self._initialize_device_by_handle(handle)
ser = self._get_device_serial(handle)
self._handles[handle] = ser
if ser not in self._active_serials:
self._close_device_by_handle(handle)
self._handles = {
handle: ser
for handle, ser in self._handles.items()
if ser in self._active_serials
}
self._handles = {handle: ser for handle, ser in self._handles.items() if ser in self._active_serials}
self._log.debug("Registered Handles: " + str(self._handles))
self._log.debug("Active Devices: " + str(self._active_serials))

Expand Down
Loading

0 comments on commit dfea9f0

Please sign in to comment.