Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop .settings in Qblox modules #736

Closed
wants to merge 11 commits into from
37 changes: 14 additions & 23 deletions src/qibolab/instruments/qblox/cluster_qcm_bb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@
from qblox_instruments.qcodes_drivers.qcm_qrm import QcmQrm as QbloxQrmQcm
from qibo.config import log

from qibolab.instruments.qblox.module import ClusterModule
from qibolab.instruments.qblox.q1asm import (
Block,
Register,
convert_phase,
loop_block,
wait_block,
)
from qibolab.instruments.qblox.sequencer import Sequencer, WaveformsBuffer
from qibolab.instruments.qblox.sweeper import QbloxSweeper, QbloxSweeperType
from qibolab.pulses import Pulse, PulseSequence, PulseType
from qibolab.sweeper import Parameter, Sweeper, SweeperType

from .module import ClusterModule
from .port import QbloxPort
from .q1asm import Block, Register, convert_phase, loop_block, wait_block
from .sequencer import Sequencer, WaveformsBuffer
from .sweeper import QbloxSweeper, QbloxSweeperType


class QcmBb(ClusterModule):
"""Qblox Cluster Qubit Control Module Baseband driver.
Expand Down Expand Up @@ -172,19 +168,14 @@ def connect(self, cluster: QbloxCluster = None):
self._device_num_sequencers = len(self.device.sequencers)
self._set_default_values()
# then set the value loaded from the runcard
try:
for port in self._ports:
self._sequencers[port] = []
self._ports[port].hardware_mod_en = True
self._ports[port].nco_freq = 0
self._ports[port].nco_phase_offs = 0
except Exception as error:
raise RuntimeError(
f"Unable to initialize port parameters on module {self.name}: {error}"
)
for port in self._ports.values():
port: QbloxPort
self._sequencers[port.name] = []
port.upload_settings("hardware_mod_en", "nco_freq", "nco_phase_offs")

self.is_connected = True

def setup(self, **settings):
def setup(self):
"""Cache the settings of the runcard and instantiate the ports of the
module.

Expand Down Expand Up @@ -213,7 +204,7 @@ def _get_next_sequencer(self, port, frequency, qubits: dict):
# select the qubit with flux line, if present, connected to the specific port
qubit = None
for _qubit in qubits.values():
if _qubit.flux is not None and _qubit.flux.port == self.ports(port):
if _qubit.flux is not None and _qubit.flux.port == self._ports[port]:
qubit = _qubit

# select a new sequencer and configure it as required
Expand Down Expand Up @@ -726,7 +717,7 @@ def upload(self):
# self.device.print_readable_snapshot(update=True)

# DEBUG: QCM Save Readable Snapshot
from qibolab.instruments.qblox.debug import print_readable_snapshot
from .debug import print_readable_snapshot

if self._debug_folder != "":
filename = self._debug_folder + f"Z_{self.name}_snapshot.json"
Expand Down
47 changes: 20 additions & 27 deletions src/qibolab/instruments/qblox/cluster_qcm_rf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@
from qblox_instruments.qcodes_drivers.qcm_qrm import QcmQrm as QbloxQrmQcm
from qibo.config import log

from qibolab.instruments.qblox.module import ClusterModule
from qibolab.instruments.qblox.q1asm import (
Block,
Register,
convert_phase,
loop_block,
wait_block,
)
from qibolab.instruments.qblox.sequencer import Sequencer, WaveformsBuffer
from qibolab.instruments.qblox.sweeper import QbloxSweeper, QbloxSweeperType
from qibolab.pulses import Pulse, PulseSequence, PulseType
from qibolab.sweeper import Parameter, Sweeper, SweeperType

from .module import ClusterModule
from .port import QbloxPort
from .q1asm import Block, Register, convert_phase, loop_block, wait_block
from .sequencer import Sequencer, WaveformsBuffer
from .sweeper import QbloxSweeper, QbloxSweeperType


class QcmRf(ClusterModule):
"""Qblox Cluster Qubit Control Module RF driver.
Expand Down Expand Up @@ -131,7 +127,6 @@ def __init__(self, name: str, address: str):
"""
super().__init__(name, address)
self.device: QbloxQrmQcm = None
self.settings = {}

self._debug_folder: str = ""
self._sequencers: dict[Sequencer] = {}
Expand Down Expand Up @@ -188,22 +183,18 @@ def connect(self, cluster: QbloxCluster = None):
self._device_num_sequencers = len(self.device.sequencers)
self._set_default_values()
# then set the value loaded from the runcard
try:
for port in self.settings:
self._sequencers[port] = []
if self.settings[port]["lo_frequency"]:
self._ports[port].lo_enabled = True
self._ports[port].lo_frequency = self.settings[port][
"lo_frequency"
]
self._ports[port].attenuation = self.settings[port]["attenuation"]
self._ports[port].hardware_mod_en = True
self._ports[port].nco_freq = 0
self._ports[port].nco_phase_offs = 0
except Exception as error:
raise RuntimeError(
f"Unable to initialize port parameters on module {self.name}: {error}"
for port in self._ports.values():
port: QbloxPort
self._sequencers[port.name] = []
port.upload_settings(
"attenuation",
"lo_enabled",
"lo_frequency",
"hardware_mod_en",
"nco_freq",
"nco_phase_offs",
)

self.is_connected = True

def setup(self, **settings):
Expand All @@ -225,7 +216,9 @@ def setup(self, **settings):
using the numerically controlled oscillator within the fpga. It only requires the upload of the pulse envelope waveform.
At the moment this param is not loaded but is always set to True.
"""
self.settings = settings if settings else self.settings
for port, settings in settings.items():
for setting_name, value in settings.items():
setattr(self._ports[port]._settings, setting_name, value)

def _get_next_sequencer(self, port, frequency, qubit: None):
"""Retrieves and configures the next avaliable sequencer.
Expand Down
53 changes: 25 additions & 28 deletions src/qibolab/instruments/qblox/cluster_qrm_rf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from .acquisition import AveragedAcquisition, DemodulatedAcquisition
from .module import ClusterModule
from .port import QbloxPort
from .q1asm import Block, Register, convert_phase, loop_block, wait_block
from .sequencer import Sequencer, WaveformsBuffer
from .sweeper import QbloxSweeper, QbloxSweeperType
Expand Down Expand Up @@ -140,7 +141,6 @@ def __init__(self, name: str, address: str):
super().__init__(name, address)
self.device: QbloxQrmQcm = None
self.classification_parameters: dict = {}
self.settings: dict = {}

self._debug_folder: str = ""
self._input_ports_keys = ["i1"]
Expand Down Expand Up @@ -204,29 +204,22 @@ def connect(self, cluster: QbloxCluster = None):
self._device_num_sequencers = len(self.device.sequencers)
self._set_default_values()
# then set the value loaded from the runcard
try:
if "o1" in self.settings:
self._ports["o1"].attenuation = self.settings["o1"]["attenuation"]
if self.settings["o1"]["lo_frequency"]:
self._ports["o1"].lo_enabled = True
self._ports["o1"].lo_frequency = self.settings["o1"][
"lo_frequency"
]
self._ports["o1"].hardware_mod_en = True
self._ports["o1"].nco_freq = 0
self._ports["o1"].nco_phase_offs = 0

if "i1" in self.settings:
self._ports["i1"].hardware_demod_en = True
self._ports["i1"].acquisition_hold_off = self.settings["i1"][
"acquisition_hold_off"
]
self._ports["i1"].acquisition_duration = self.settings["i1"][
"acquisition_duration"
]
except Exception as error:
raise RuntimeError(
f"Unable to initialize port parameters on module {self.name}: {error}"

if "o1" in self._ports:
out_port: QbloxPort = self._ports["o1"]
out_port.upload_settings(
"attenuation",
"lo_enabled",
"lo_frequency",
"hardware_mod_en",
"nco_freq",
"nco_phase_offs",
)

if "i1" in self._ports:
input_port: QbloxPort = self._ports["i1"]
input_port.upload_settings(
"hardware_demod_en", "acquisition_hold_off", "acquisition_duration"
)
self.is_connected = True

Expand Down Expand Up @@ -258,7 +251,9 @@ def setup(self, **settings):
- settings['i1']['acquisition_duration'] (int): [0 to 8192 ns] the duration of the acquisition. It is limited by
the amount of memory available in the fpga to store i q samples.
"""
self.settings = settings if settings else self.settings
for port, settings in settings.items():
for setting_name, value in settings.items():
setattr(self._ports[port]._settings, setting_name, value)

def _get_next_sequencer(self, port: str, frequency: int, qubits: dict, qubit: None):
"""Retrieves and configures the next avaliable sequencer.
Expand Down Expand Up @@ -705,14 +700,16 @@ def process_pulse_sequence(
)

if pulses[n].type == PulseType.READOUT:
delay_after_play = self._ports["i1"].acquisition_hold_off
delay_after_play = self._ports[
"i1"
]._settings.acquisition_hold_off

if len(pulses) > n + 1:
# If there are more pulses to be played, the delay is the time between the pulse end and the next pulse start
delay_after_acquire = (
pulses[n + 1].start
- pulses[n].start
- self._ports["i1"].acquisition_hold_off
- self._ports["i1"]._settings.acquisition_hold_off
)
else:
delay_after_acquire = (
Expand All @@ -721,7 +718,7 @@ def process_pulse_sequence(
time_between_repetitions = (
repetition_duration
- sequence_total_duration
- self._ports["i1"].acquisition_hold_off
- self._ports["i1"]._settings.acquisition_hold_off
)
assert time_between_repetitions > 0

Expand Down
26 changes: 26 additions & 0 deletions src/qibolab/instruments/qblox/controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import signal
from dataclasses import asdict

import numpy as np
from qblox_instruments.qcodes_drivers.cluster import Cluster as QbloxCluster
Expand All @@ -18,6 +19,12 @@
"""Maximum number of sequences that can be unrolled in a single one
(independent of measurements)."""
SEQUENCER_MEMORY = 2**17
PARAMS_TO_DUMP = [
"attenuation",
"lo_frequency",
"acquisition_hold_off",
"acquisition_duration",
]


class QbloxController(Controller):
Expand Down Expand Up @@ -90,6 +97,25 @@ def _termination_handler(self, signum, frame):
log.warning("QbloxController: all modules are disconnected.")
exit(0)

def dump(self):
def get_settings(port):
return {
setting: value
for setting, value in asdict(port._settings).items()
if setting in PARAMS_TO_DUMP
}

data = {
module.name: {
port_name: get_settings(port)
for port_name, port in module._ports.items()
}
for module in self.modules.values()
if not isinstance(module, QcmBb)
}

return data

def _set_module_channel_map(self, module: QrmRf, qubits: dict):
"""Retrieve all the channels connected to a specific Qblox module.

Expand Down
47 changes: 32 additions & 15 deletions src/qibolab/instruments/qblox/port.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,42 @@ class QbloxOutputPort_Settings:
nco_freq: int = 0
nco_phase_offs: float = 0
lo_enabled: bool = True
lo_frequency: int = 2_000_000_000
lo_frequency: int = 0


@dataclass
class QbloxInputPort_Settings:
channel: str = None
acquisition_hold_off: int = 0
acquisition_duration: int = 1000
hardware_demod_en: bool = True


class QbloxOutputPort(Port):
class QbloxPort(Port):
def __init__(self, module, settings_class, port_number: int, port_name: str = None):
self.name = port_name
self.module = module
self.port_number: int = port_number
self._settings = settings_class

def upload_settings(self, *settings):
"""Upload to the instrument the requested settings cached in
`self._settings`."""
for setting in settings:
try:
setattr(self, setting, getattr(self, setting))
except Exception as error:
raise RuntimeError(
f"Unable to initialize port parameters '{setting}' on module {self.name}: {error}"
)


class QbloxOutputPort(QbloxPort):
"""qibolab.instruments.port.Port interface implementation for Qblox
instruments."""

def __init__(self, module, port_number: int, port_name: str = None):
self.name = port_name
self.module = module
super().__init__(module, QbloxOutputPort_Settings(), port_number, port_name)
self.sequencer_number: int = port_number
self.port_number: int = port_number
self._settings = QbloxOutputPort_Settings()

@property
def attenuation(self) -> str:
Expand Down Expand Up @@ -222,17 +237,19 @@ def lo_frequency(self, value):
self.module.device.set(f"out{self.port_number}_lo_freq", value=value)


class QbloxInputPort:
class QbloxInputPort(QbloxPort):
def __init__(self, module, port_number: int, port_name: str = None):
self.name = port_name
self.module = module
self.output_sequencer_number: int = 0 # output_sequencer_number
self.input_sequencer_number: int = 0 # input_sequencer_number
self.port_number: int = port_number
super().__init__(module, QbloxInputPort_Settings(), port_number, port_name)
self.output_sequencer_number: int = 0
self.input_sequencer_number: int = 0

self.acquisition_hold_off = 4 # To be discontinued
@property
def acquisition_hold_off(self):
return self._settings.acquisition_hold_off

self._settings = QbloxInputPort_Settings()
@acquisition_hold_off.setter
def acquisition_hold_off(self, value):
self._settings.acquisition_hold_off = value

@property
def hardware_demod_en(self):
Expand Down
Loading
Loading