diff --git a/pyproject.toml b/pyproject.toml index 553f7a2..e4279d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,8 +51,46 @@ include = ["coherent_lasers*"] namespaces = false -[tool.setuptools.dynamic] -version = { attr = "coherent_lasers.__version__" } +[tool.ruff] +line-length = 120 +show-fixes = true + +target-version = "py311" + +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".gitignore", + ".git-rewrite", + ".hg", + ".ipynb_checkpoints", + ".mypy_cache", + ".nox", + ".pants.d", + ".pyenv", + ".pytest_cache", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + ".vscode", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "site-packages", + "venv", + "sdk", +] + +[tool.ruff.lint] +extend-select = ["C4", "SIM", "TCH"] +fixable = ["ALL"] [tool.black] line-length = 120 diff --git a/src/coherent_lasers/app/api.py b/src/coherent_lasers/app/api.py index b14292e..2ce28f2 100644 --- a/src/coherent_lasers/app/api.py +++ b/src/coherent_lasers/app/api.py @@ -7,7 +7,6 @@ from pydantic import BaseModel from coherent_lasers.genesis_mx.driver import GenesisMX, MockGenesisMX -from coherent_lasers.hops.lib import get_hops_manager, reset_hops_manager GENESIS_MX_HEADTYPES = {"MiniX", "Mini00"} @@ -69,9 +68,7 @@ class GenesisMXModel(BaseModel): flags: GenesisMXFlags -MessageData: TypeAlias = ( - dict[str, GenesisMXSignals] | dict[str, GenesisMXPower] | dict[str, GenesisMXFlags] -) +MessageData: TypeAlias = dict[str, GenesisMXSignals] | dict[str, GenesisMXPower] | dict[str, GenesisMXFlags] class BaseMessage(BaseModel): @@ -161,9 +158,7 @@ async def set_laser_power(serial: str, value: float): raise HTTPException(status_code=404, detail="Laser not found") laser = self.lasers[serial] laser.power_mw = value - print( - f"Set power of laser {serial} to {value} mW. laser.power_mw: {laser.power_mw}" - ) + print(f"Set power of laser {serial} to {value} mW. laser.power_mw: {laser.power_mw}") # await self.broadcast_power() @self.put("/enable") @@ -172,9 +167,7 @@ async def enable_laser(serial: str): if serial not in self.lasers: raise HTTPException(status_code=404, detail="Laser not found") self.lasers[serial].enable() - print( - f"Enabled laser {serial}. Software switch: {self.lasers[serial].software_switch}" - ) + print(f"Enabled laser {serial}. Software switch: {self.lasers[serial].software_switch}") await self.broadcast_flags() @self.put("/disable") @@ -183,9 +176,7 @@ async def disable_laser(serial: str): if serial not in self.lasers: raise HTTPException(status_code=404, detail="Laser not found") self.lasers[serial].disable() - print( - f"Disabled laser {serial}. Software switch: {self.lasers[serial].software_switch}" - ) + print(f"Disabled laser {serial}. Software switch: {self.lasers[serial].software_switch}") await self.broadcast_flags() @self.put("/remote") @@ -229,9 +220,7 @@ async def broadcast_signals(self): async def broadcast_flags(self): await self.broadcast_items("flags", self._get_all_laser_flags()) - async def scheduled_broadcast( - self, msg_type: str, data: MessageData, interval: float - ): + async def scheduled_broadcast(self, msg_type: str, data: MessageData, interval: float): while True: try: await self.broadcast_items(msg_type, data) @@ -337,19 +326,10 @@ def _get_laser_power( ) def _get_all_laser_powers(self) -> dict[str, GenesisMXPower]: - return { - serial: self._get_laser_power(laser) - for serial, laser in self.lasers.items() - } + return {serial: self._get_laser_power(laser) for serial, laser in self.lasers.items()} def _get_all_laser_signals(self) -> dict[str, GenesisMXSignals]: - return { - serial: self._get_laser_signals(laser) - for serial, laser in self.lasers.items() - } + return {serial: self._get_laser_signals(laser) for serial, laser in self.lasers.items()} def _get_all_laser_flags(self) -> dict[str, GenesisMXFlags]: - return { - serial: self._get_laser_flags(laser) - for serial, laser in self.lasers.items() - } + return {serial: self._get_laser_flags(laser) for serial, laser in self.lasers.items()} diff --git a/src/coherent_lasers/app/cli.py b/src/coherent_lasers/app/cli.py index b70f3df..81da626 100644 --- a/src/coherent_lasers/app/cli.py +++ b/src/coherent_lasers/app/cli.py @@ -17,12 +17,10 @@ import logging from coherent_lasers.genesis_mx.driver import GenesisMX from coherent_lasers.genesis_mx.commands import OperationMode, ReadCmds -from coherent_lasers.hops.lib import HOPSException, get_hops_manager +from coherent_lasers.hops import HOPSException, get_hops_manager # Setup logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" -) +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) @@ -47,9 +45,7 @@ def interactive_session(lasers: dict[str, GenesisMX]) -> None: list_cmd = {"list", "ls"} lasers = lasers current = [next(iter(lasers.keys()))] - click.echo( - f"Starting interactive session with lasers: {', '.join(lasers.keys())}. Type 'exit' to end." - ) + click.echo(f"Starting interactive session with lasers: {', '.join(lasers.keys())}. Type 'exit' to end.") while True: command: str = click.prompt(f"{', '.join(current)}>", prompt_suffix="") command_parts = command.split() @@ -63,9 +59,7 @@ def interactive_session(lasers: dict[str, GenesisMX]) -> None: continue if primary_command in select: if len(command_parts) < 2: - click.echo( - "Please provide a device to switch to. Either by index or serial number." - ) + click.echo("Please provide a device to switch to. Either by index or serial number.") continue current = parse_select_command(command, lasers) continue @@ -102,17 +96,13 @@ def validate_lasers(devices: dict[str, GenesisMX]) -> dict[str, GenesisMX]: return lasers -def run_command_on_lasers( - lasers: dict[str, GenesisMX], selected: list[str], command: str -) -> None: +def run_command_on_lasers(lasers: dict[str, GenesisMX], selected: list[str], command: str) -> None: try: if len(selected) == 1: handle_command(lasers[selected[0]], command) return for serial in selected: - click.echo( - f" {serial} Laser -------------------------------------------------------------------------" - ) + click.echo(f" {serial} Laser -------------------------------------------------------------------------") handle_command(lasers[serial], command) except HOPSException as e: click.echo(f"Laser error: {e}") @@ -157,9 +147,7 @@ def enable(laser: GenesisMX, args=None) -> None: def disable(laser: GenesisMX, args=None) -> None: """Disable the laser.""" if args: - click.echo( - f" The disable command does not take any arguments, ignoring: {args}" - ) + click.echo(f" The disable command does not take any arguments, ignoring: {args}") laser.disable() click.echo(" Laser disabled.") @@ -168,9 +156,7 @@ def info(laser: GenesisMX, args=None) -> None: """Display Laser Head information.""" click.echo(" Laser Head info:") if args: - click.echo( - f" The info command does not take any arguments, ignoring: {args}" - ) + click.echo(f" The info command does not take any arguments, ignoring: {args}") try: info = laser.head click.echo(f" Serial: {info.serial}", nl=False) @@ -185,13 +171,10 @@ def info(laser: GenesisMX, args=None) -> None: def mode(laser: GenesisMX, args=None) -> None: """Get or set the laser operation mode.""" value = args[0] if args else None - if value is not None: - if value.upper() in OperationMode.__members__: - laser.mode = OperationMode[value.upper()] - click.echo(" Updating laser mode...") - click.echo( - f" Mode: {laser.mode.name}, Valid modes: {' | '.join(OperationMode.__members__)}" - ) + if value is not None and value.upper() in OperationMode.__members__: + laser.mode = OperationMode[value.upper()] + click.echo(" Updating laser mode...") + click.echo(f" Mode: {laser.mode.name}, Valid modes: {' | '.join(OperationMode.__members__)}") def power(laser: GenesisMX, args=[]) -> None: @@ -221,9 +204,7 @@ def power(laser: GenesisMX, args=[]) -> None: def status(laser: GenesisMX, args=None) -> None: """Display the current status of the laser.""" full = "--full" in args or "-f" in args if args else False - divider = ( - " ------------------------------------------------------------------------" - ) + divider = " ------------------------------------------------------------------------" if full: click.echo(divider) info(laser) diff --git a/src/coherent_lasers/genesis_mx/driver.py b/src/coherent_lasers/genesis_mx/driver.py index 2adfa34..3e1fd7c 100644 --- a/src/coherent_lasers/genesis_mx/driver.py +++ b/src/coherent_lasers/genesis_mx/driver.py @@ -9,8 +9,7 @@ OperationMode, Alarms, ) -from coherent_lasers.hops import HOPSDevice -from ..hops.lib import HOPSException +from coherent_lasers.hops import HOPSDevice, HOPSException @dataclass(frozen=True) @@ -60,9 +59,7 @@ def ready(self) -> bool: return self.interlock and self.key and not self.software def __repr__(self) -> str: - return ( - f"Software: {self.software}, Interlock: {self.interlock}, Key: {self.key}" - ) + return f"Software: {self.software}, Interlock: {self.interlock}, Key: {self.key}" class GenesisMX: @@ -98,16 +95,12 @@ def power_mw(self, value: float) -> None: value = value / self._unit_factor self.send_write_command(WriteCmds.SET_POWER, value) if not self.enable_loop.enabled: - self.log.warning( - f"Attempting to set power to {value} mW while laser is disabled." - ) + self.log.warning(f"Attempting to set power to {value} mW while laser is disabled.") @property def power_setpoint_mw(self) -> float: """Get the current power setpoint of the laser.""" - return ( - float(self.send_read_command(ReadCmds.POWER_SETPOINT)) * self._unit_factor - ) + return float(self.send_read_command(ReadCmds.POWER_SETPOINT)) * self._unit_factor @property def ldd_current(self) -> float: @@ -168,6 +161,7 @@ def remote_control(self, value: bool) -> None: try: self.send_write_command(WriteCmds.SET_REMOTE_CONTROL, value) except HOPSException: + self.log.debug(f"Failed to set remote control to {value}") pass @property @@ -182,6 +176,7 @@ def analog_input(self, value: bool) -> None: try: self.send_write_command(WriteCmds.SET_ANALOG_INPUT, value) except HOPSException: + self.log.debug(f"Failed to set analog input to {value}") pass def enable(self) -> GenesisMXEnableLoop: @@ -319,9 +314,7 @@ def etalon_heater_drive_v(self) -> float: # Commands - def send_write_command( - self, cmd: WriteCmds, new_value: float | None = None - ) -> None: + def send_write_command(self, cmd: WriteCmds, new_value: float | None = None) -> None: """Send a write command to the laser.""" self.hops.send_command(f"{cmd.value}{new_value}") @@ -391,9 +384,7 @@ def power_mw(self) -> float: def power_mw(self, value: float) -> None: """Set the power of the laser.""" if not self.enable_loop.enabled: - self.log.warning( - f"Attempting to set power to {value} mW while laser is disabled." - ) + self.log.warning(f"Attempting to set power to {value} mW while laser is disabled.") self._power_mw = value @property diff --git a/src/coherent_lasers/hops/__init__.py b/src/coherent_lasers/hops/__init__.py index 04522f7..11acdcc 100644 --- a/src/coherent_lasers/hops/__init__.py +++ b/src/coherent_lasers/hops/__init__.py @@ -8,13 +8,14 @@ 2. Place the DLL files in this package alongside the respective .h files. """ -import platform -import sys +from .lib import HOPSDevice, get_hops_manager, HOPSException +from .lib2 import CohrHOPSDevice, get_cohrhops_manager, HOPSCommandException -# Ensure is windows -# if not (sys.platform.startswith("win") and platform.machine().endswith("64")): -# raise OSError("This package only supports 64-bit Windows systems.") - -from .lib import HOPSDevice - -__all__ = ["HOPSDevice"] +__all__ = [ + "HOPSDevice", + "get_hops_manager", + "HOPSException", + "CohrHOPSDevice", + "get_cohrhops_manager", + "HOPSCommandException", +] diff --git a/src/coherent_lasers/hops/lib.py b/src/coherent_lasers/hops/lib.py index 70d8e19..f688482 100644 --- a/src/coherent_lasers/hops/lib.py +++ b/src/coherent_lasers/hops/lib.py @@ -12,9 +12,6 @@ DLL_DIR = os.path.dirname(os.path.abspath(__file__)) -# Add the DLL directory to the DLL search path -# os.add_dll_directory(DLL_DIR) - # Add the DLL directory to the system PATH os.environ["PATH"] = DLL_DIR + os.pathsep + os.environ["PATH"] @@ -157,16 +154,11 @@ def _fetch_device_connection_info(self): ) if res != COHRHOPS_OK: raise HOPSException(f"Error checking for devices: {res}") - self._log.debug( - f"Updated devices info. Connected: {self._number_of_devices_connected.value}" - ) + self._log.debug(f"Updated devices info. Connected: {self._number_of_devices_connected.value}") def _activate_all_devices(self): self._log.debug("Activating all devices...") - connected_handles = { - self._devices_connected[i] - for i in range(self._number_of_devices_connected.value) - } + connected_handles = {self._devices_connected[i] for i in range(self._number_of_devices_connected.value)} for handle in connected_handles: self._initialize_device_by_handle(handle) ser = self._get_device_serial(handle) @@ -177,21 +169,14 @@ def _activate_all_devices(self): def _validate_active_devices(self): self._log.debug("Validating active devices...") - connected_handles = { - self._devices_connected[i] - for i in range(self._number_of_devices_connected.value) - } + connected_handles = {self._devices_connected[i] for i in range(self._number_of_devices_connected.value)} for handle in connected_handles: self._initialize_device_by_handle(handle) ser = self._get_device_serial(handle) self._handles[handle] = ser if ser not in self._active_serials: self._close_device_by_handle(handle) - self._handles = { - handle: ser - for handle, ser in self._handles.items() - if ser in self._active_serials - } + self._handles = {handle: ser for handle, ser in self._handles.items() if ser in self._active_serials} self._log.debug("Registered Handles: " + str(self._handles)) self._log.debug("Active Devices: " + str(self._active_serials)) diff --git a/src/coherent_lasers/hops/lib2.py b/src/coherent_lasers/hops/lib2.py new file mode 100644 index 0000000..ea10e90 --- /dev/null +++ b/src/coherent_lasers/hops/lib2.py @@ -0,0 +1,331 @@ +import asyncio +import ctypes as C +from ctypes.util import find_library +from functools import cached_property +import logging +import os +import threading +from threading import Lock +import time + + +# Make sure prerequisites are met ###################################################################################### +DLL_DIR = os.path.dirname(os.path.abspath(__file__)) +HOPS_DLL = os.path.join(DLL_DIR, "CohrHOPS.dll") +REQUIRED_DLLS = ["CohrHOPS", "CohrFTCI2C"] + +# Validate the system is Windows and 64-bit +if not (os.name == "nt" and os.environ["PROCESSOR_ARCHITECTURE"].endswith("64")): + raise OSError("This package only supports 64-bit Windows systems.") + +# Validate the required DLLs are present +os.environ["PATH"] = DLL_DIR + os.pathsep + os.environ["PATH"] + +for dll_name in REQUIRED_DLLS: + dll_path = find_library(dll_name) + if dll_path is None: + raise FileNotFoundError(f"Required 64-bit DLL file not found: {dll_name}.dll") +try: + C.CDLL(find_library("CohrHOPS")) + C.CDLL(find_library("CohrFTCI2C")) +except Exception as e: + print(f"Error loading DLLs: {e}") + raise + +######################################################################################################################## + +# Constants +COHRHOPS_OK = 0 +MAX_DEVICES = 20 +MAX_STRLEN = 100 + + +# Exceptions +class HOPSException(Exception): + def __init__(self, message, code: int | None = None) -> None: + if code is not None: + message = f"{message} (Error code: {code})" + super().__init__(message) + + +# Exception for when a message is sent and an error is returned +class HOPSCommandException(HOPSException): + def __init__(self, message, command: str, code: int) -> None: + super().__init__(message, code) + message = f"Error [{code}] sending command: {command}. {message}" + super().__init__(message) + + +# C types +LPULPTR = C.POINTER(C.c_ulonglong) +COHRHOPS_HANDLE = C.c_ulonglong +LPDWORD = C.POINTER(C.c_ulong) +LPSTR = C.c_char_p + + +# Data structures +class HandleCollection: + def __init__(self) -> None: + self._ptr = (COHRHOPS_HANDLE * MAX_DEVICES)() + self._len = C.c_ulong() + + def __getitem__(self, index): + return self._ptr[index] + + def pointer(self) -> C.Array[C.c_ulonglong]: + return self._ptr + + def len_pointer(self): + return C.byref(self._len) + + def __len__(self): + return self._len.value + + def __str__(self): + return f"{len(self)} HOPSHandles({[hex(h) for h in self]})" + + +class CohrHOPSManager: + def __init__(self, refresh_interval: float = 5): + self._log = logging.getLogger(__name__) + self._lock = Lock() + self._dll = C.CDLL(HOPS_DLL) + self._wrap_dll_functions() + self._connections: HandleCollection = HandleCollection() + self._removed_connections: HandleCollection = HandleCollection() + self._added_connections: HandleCollection = HandleCollection() + self._serials: dict[str, COHRHOPS_HANDLE] = {} + self._refresh_interval = refresh_interval + self.close() + self.discover() + # self._discover_thread = threading.Thread(target=self._discover_loop, daemon=True) + # self._discover_thread.start() + + def discover(self) -> list[str]: + # timeout = 1 + self._refresh_connected_handles() + # if len(self._connections) == 0: + # self._log.warning("No devices found, attempting to discover...") + # time.sleep(timeout) + # self._refresh_connected_handles() + self._refresh_serials() + return self.serials + + def _discover_loop(self): + while True: + self.discover() + time.sleep(self._refresh_interval) + + def send_command(self, serial: str, command: str) -> str: + def send_cohrhops_command(handle: COHRHOPS_HANDLE, command: str) -> str | None: + response = C.create_string_buffer(MAX_STRLEN) + res = self._send_command(handle, command.encode("utf-8"), response) + return response.value.decode("utf-8").strip() if res == COHRHOPS_OK else None + + if serial not in self.serials: + self._log.warning(f"Device {serial} not found. Attempting to discover devices...") + self.discover() + + if serial not in self.serials: + raise HOPSCommandException("Device not found", command, -404) + + with self._lock: + if response := send_cohrhops_command(self._serials[serial], command): + return response + if self._initialize_device(self._serials[serial]): + response = send_cohrhops_command(self._serials[serial], command) + if response: + return response + raise HOPSCommandException("Error sending command", command, -500) + + async def async_send_command(self, serial: str, command: str) -> str: + """ + Asynchronously sends a command by offloading the blocking call + to a thread in the default executor. + """ + loop = asyncio.get_running_loop() + response = await loop.run_in_executor(None, self.send_command, serial, command) + return response + + def close_device(self, serial: str) -> None: + with self._lock: + handle = self._serials.get(serial) + if not handle: + self._log.warning(f"Unable to close {serial}. Device not found.") + return + res = self._close(handle) + if res != COHRHOPS_OK: + self._log.error(f"Error closing device - {serial}.") + + def close(self): + with self._lock: + for handle in self._connections: + self._close(handle) + + def __del__(self): + self.close() + + @property + def serials(self) -> list[str]: + return list(self._serials.keys()) + + @cached_property + def version(self) -> str: + buffer = C.create_string_buffer(MAX_STRLEN) + res = self._get_dll_version(buffer) + if res != COHRHOPS_OK: + raise Exception(f"Error getting DLL version: {res}") + return buffer.value.decode("utf-8") + + def _refresh_connected_handles(self): + res = self._check_for_devices( + self._connections.pointer(), + self._connections.len_pointer(), + self._added_connections.pointer(), + self._added_connections.len_pointer(), + self._removed_connections.pointer(), + self._removed_connections.len_pointer(), + ) + if res != COHRHOPS_OK: + raise HOPSException(f"Error checking for devices: {res}") + self._log.debug(f"Updated conection info: {self._connections}") + + def _initialize_device(self, handle: COHRHOPS_HANDLE) -> bool: + headtype = C.create_string_buffer(MAX_STRLEN) + res = self._initialize_handle(handle, headtype) + return res == COHRHOPS_OK + + def _refresh_serials(self): + def query_serial(handle: COHRHOPS_HANDLE) -> str | None: + response = C.create_string_buffer(MAX_STRLEN) + res = self._send_command(handle, "?HID".encode("utf-8"), response) + return response.value.decode("utf-8").strip() if res == COHRHOPS_OK else None + + with self._lock: + fails = [] + for i in range(len(self._connections)): + handle = self._connections[i] + if serial := query_serial(handle) or ( + self._initialize_device(handle) and (serial := query_serial(handle)) + ): + self._serials[serial] = handle + else: + fails.append(handle) + + if fails: + self._log.warning(f"Failed to get serials for handles: {fails}") + raise HOPSException(f"Error getting serial numbers for handles: {fails}") + + def _wrap_dll_functions(self): + self._initialize_handle = self._dll.CohrHOPS_InitializeHandle + self._initialize_handle.argtypes = [COHRHOPS_HANDLE, LPSTR] + self._initialize_handle.restype = int + + self._send_command = self._dll.CohrHOPS_SendCommand + self._send_command.argtypes = [COHRHOPS_HANDLE, LPSTR, LPSTR] + self._send_command.restype = int + + self._close = self._dll.CohrHOPS_Close + self._close.argtypes = [COHRHOPS_HANDLE] + self._close.restype = int + + self._get_dll_version = self._dll.CohrHOPS_GetDLLVersion + self._get_dll_version.argtypes = [LPSTR] + self._get_dll_version.restype = int + + self._check_for_devices = self._dll.CohrHOPS_CheckForDevices + self._check_for_devices.argtypes = [ + LPULPTR, + LPDWORD, + LPULPTR, + LPDWORD, + LPULPTR, + LPDWORD, + ] + self._check_for_devices.restype = int + + +_cohrhops_manager_instance = None +_cohrhops_manager_lock = threading.Lock() + + +def get_cohrhops_manager(): + global _cohrhops_manager_instance + if _cohrhops_manager_instance is None: + with _cohrhops_manager_lock: + if _cohrhops_manager_instance is None: + _cohrhops_manager_instance = CohrHOPSManager() + return _cohrhops_manager_instance + + +class CohrHOPSDevice: + _manager = get_cohrhops_manager() + + def __init__(self, serial: str): + self.serial = serial + + def send_command(self, command: str) -> str: + return self._manager.send_command(self.serial, command) + + async def async_send_command(self, command: str) -> str: + return await self._manager.async_send_command(self.serial, command) + + def close(self): + self._manager.close_device(self.serial) + + +if __name__ == "__main__": + TEST_COMMAND = "?HID" + + def sync_example(): + # Get the manager instance (this automatically performs discovery) + manager = get_cohrhops_manager() + + manager.discover() + + # Print the DLL version + print("DLL Version:", manager.version) + + # List discovered device serials + serials = manager.serials + print("Discovered Devices:", serials) + + if not serials: + print("No devices discovered.") + return + + for serial in serials: + try: + response = manager.send_command(serial, TEST_COMMAND) + print(f"Synchronous response from device {serial}: {response}") + except HOPSCommandException as e: + print(f"Synchronous command failed: {e}") + + async def async_example(): + manager = get_cohrhops_manager() + + # Print the DLL version + print("DLL Version:", manager.version) + + # List discovered device serials + serials = manager.serials + print("Discovered Devices:", serials) + + if not serials: + print("No devices discovered.") + return + + for serial in serials: + try: + response = await manager.async_send_command(serial, TEST_COMMAND) + print(f"Asynchronous response from device {serial}: {response}") + except HOPSCommandException as e: + print(f"Asynchronous command failed: {e}") + + logging.basicConfig(level=logging.DEBUG) + + print("=== Synchronous Example ===") + sync_example() + + print("\n=== Asynchronous Example ===") + asyncio.run(async_example())