Skip to content

Commit

Permalink
refactor: Lift channels and readouts sequence splitting
Browse files Browse the repository at this point in the history
As they are not really Qblox-specific
  • Loading branch information
alecandido committed Feb 7, 2025
1 parent 3c336d9 commit daf2698
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 41 deletions.
43 changes: 3 additions & 40 deletions src/qibolab/_core/instruments/qblox/sequence/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from qibolab._core.execution_parameters import ExecutionParameters
from qibolab._core.identifier import ChannelId
from qibolab._core.pulses import PulseLike, Readout
from qibolab._core.pulses import PulseLike
from qibolab._core.sequence import PulseSequence
from qibolab._core.serialize import Model
from qibolab._core.sweeper import ParallelSweepers
Expand Down Expand Up @@ -35,7 +35,7 @@ class Q1Sequence(Model):
@classmethod
def from_pulses(
cls,
sequence: PulseSequence,
sequence: list[PulseLike],
sweepers: list[ParallelSweepers],
options: ExecutionParameters,
sampling_rate: float,
Expand Down Expand Up @@ -80,43 +80,6 @@ def integration_lengths(self) -> dict[MeasureId, Optional[int]]:
return {acq: _weight_len(self.weights.get(acq)) for acq in self.acquisitions}


def _split_channels(sequence: PulseSequence) -> dict[ChannelId, PulseSequence]:
def unwrap(pulse: PulseLike, output: bool) -> PulseLike:
return (
pulse
if not isinstance(pulse, Readout)
else pulse.probe
if output
else pulse.acquisition
)

def unwrap_seq(seq: PulseSequence, output: bool) -> PulseSequence:
return PulseSequence((ch, unwrap(p, output)) for ch, p in seq)

def ch_pulses(channel: ChannelId) -> PulseSequence:
return PulseSequence((ch, pulse) for ch, pulse in sequence if ch == channel)

def probe(channel: ChannelId) -> ChannelId:
return channel.split("/")[0] + "/probe"

def split(channel: ChannelId) -> dict[ChannelId, PulseSequence]:
seq = ch_pulses(channel)
readouts = any(isinstance(p, Readout) for _, p in seq)
assert not readouts or probe(channel) not in sequence.channels
return (
{channel: seq}
if not readouts
else {
channel: unwrap_seq(seq, output=False),
probe(channel): unwrap_seq(seq, output=True),
}
)

return {
ch: seq for channel in sequence.channels for ch, seq in split(channel).items()
}


def compile(
sequence: PulseSequence,
sweepers: list[ParallelSweepers],
Expand All @@ -125,5 +88,5 @@ def compile(
) -> dict[ChannelId, Q1Sequence]:
return {
ch: Q1Sequence.from_pulses(seq, sweepers, options, sampling_rate, ch)
for ch, seq in _split_channels(sequence).items()
for ch, seq in sequence.split_readouts.by_channel.items()
}
49 changes: 48 additions & 1 deletion src/qibolab/_core/sequence.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""PulseSequence class."""

from collections import UserList
from collections import UserList, defaultdict
from collections.abc import Callable, Iterable
from functools import cache
from typing import Any, Union

from pydantic import TypeAdapter
Expand Down Expand Up @@ -209,3 +210,49 @@ def acquisitions(self) -> list[tuple[ChannelId, InputOps]]:
"""
# pulse filter needed to exclude delays
return [(ch, p) for ch, p in self if isinstance(p, (Acquisition, Readout))]

@property
def split_readouts(self) -> "PulseSequence":
"""Split readout operations in its constituents.
This will also double the rest of the channels (mainly delays) on which the
readouts are placed, assuming the probe channels to be absent.
.. note::
Since :class:`Readout` is only placed on an acquisition channel, the name of
the associated probe channel is actually unknown.
This function assumes the convention that the relevant channels are named
``.../acquisition`` and ``.../probe``.
"""

def unwrap(pulse: PulseLike, double: bool) -> tuple[PulseLike, ...]:
return (
(pulse.acquisition, pulse.probe)
if isinstance(pulse, Readout)
else (pulse, pulse)
if double
else (pulse,)
)

@cache
def probe(channel: ChannelId) -> ChannelId:
return channel.split("/")[0] + "/probe"

readouts = {ch for ch, p in self if isinstance(p, Readout)}
return type(self)(
[
(ch_, p_)
for ch, p in self
for ch_, p_ in zip((ch, probe(ch)), unwrap(p, ch in readouts))
]
)

@property
def by_channel(self) -> dict[ChannelId, list[PulseLike]]:
"""Separate sequence into channels dictionary."""
seqs = defaultdict(list)
for ch, pulse in self:
seqs[ch].append(pulse)

return seqs

0 comments on commit daf2698

Please sign in to comment.