-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1138 from qiboteam/qrng
QRNG serial driver
- Loading branch information
Showing
11 changed files
with
327 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from . import extractors, qrng | ||
from .extractors import * | ||
from .qrng import * | ||
|
||
__all__ = [] | ||
__all__ += qrng.__all__ | ||
__all__ += extractors.__all__ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from . import abstract, sha256, toeplitz | ||
from .abstract import * | ||
from .sha256 import * | ||
from .toeplitz import * | ||
|
||
__all__ = [] | ||
__all__ += abstract.__all__ | ||
__all__ += sha256.__all__ | ||
__all__ += toeplitz.__all__ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import List | ||
|
||
import numpy.typing as npt | ||
|
||
from ....serialize import Model | ||
|
||
__all__ = ["Extractor"] | ||
|
||
|
||
class Extractor(Model, ABC): | ||
@abstractmethod | ||
def num_raw_samples(self, n: int) -> int: | ||
"""Number of raw QRNG samples that are needed to reach the required random floats. | ||
Args: | ||
n (int): Number of required random floats. | ||
""" | ||
|
||
@abstractmethod | ||
def extract(self, raw: List[int]) -> npt.NDArray: | ||
"""Extract uniformly distributed integers from the device samples.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import hashlib | ||
from typing import List | ||
|
||
import numpy as np | ||
import numpy.typing as npt | ||
|
||
from .abstract import Extractor | ||
|
||
__all__ = ["ShaExtractor"] | ||
|
||
|
||
class ShaExtractor(Extractor): | ||
"""Extractor based on the SHA-256 hash algorithm.""" | ||
|
||
def num_raw_samples(self, n: int) -> int: | ||
return 22 * (n // 4 + 1) | ||
|
||
def extract(self, raw: List[int]) -> npt.NDArray: | ||
extracted = [] | ||
for i in range(len(raw) // 22): | ||
stream = "".join( | ||
format(sample, "012b") for sample in raw[22 * i : 22 * (i + 1)] | ||
) | ||
hash = hashlib.sha256(stream.encode("utf-8")).hexdigest() | ||
sha_bin = bin(int(hash, 16))[2:].zfill(256) | ||
for j in range(4): | ||
# Convert 53-bit chunk to integer | ||
uniform_int = int(sha_bin[53 * j : 53 * (j + 1)], 2) | ||
extracted.append(uniform_int / (2**53 - 1)) | ||
return np.array(extracted) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from typing import List | ||
|
||
import numpy as np | ||
import numpy.typing as npt | ||
from scipy.linalg import toeplitz | ||
|
||
from .abstract import Extractor | ||
|
||
__all__ = ["ToeplitzExtractor"] | ||
|
||
|
||
def unpackbits(x: npt.NDArray, num_bits: int) -> npt.NDArray: | ||
if np.issubdtype(x.dtype, np.floating): | ||
raise ValueError("numpy data type needs to be int-like") | ||
xshape = list(x.shape) | ||
x = x.reshape([-1, 1]) | ||
mask = 2 ** np.arange(num_bits, dtype=x.dtype).reshape([1, num_bits]) | ||
return (x & mask).astype(bool).astype(int).reshape(xshape + [num_bits]) | ||
|
||
|
||
def generate_toeplitz(input_bits: int, extraction_ratio: int) -> npt.NDArray: | ||
"""Generate a pseudo-random Toeplitz matrix of dimension ``(input_bits, extraction_ratio)``.""" | ||
while True: | ||
c = np.mod(np.random.permutation(input_bits), 2) | ||
r = np.mod(np.random.permutation(extraction_ratio), 2) | ||
if np.sum(c) == 6 and np.sum(r) == (extraction_ratio // 2): | ||
return toeplitz(c, r) | ||
|
||
|
||
def toeplitz_extract( | ||
m1: npt.NDArray, input_bits: int, extraction_ratio: int | ||
) -> npt.NDArray: | ||
m2 = unpackbits(m1, input_bits) | ||
m2 = np.flip(m2) | ||
|
||
m3 = generate_toeplitz(input_bits, extraction_ratio) | ||
|
||
m4 = np.matmul(m2, m3) | ||
m4 = m4.astype("int16") | ||
m4 = np.mod(m4, 2) | ||
m4 = np.packbits(m4) | ||
m4 = np.right_shift(m4, 8 - extraction_ratio) | ||
return m4 | ||
|
||
|
||
def upscale(samples: npt.NDArray, input_bits=4, output_bits=32) -> npt.NDArray: | ||
"""Increase size of random bit strings by concatenating them.""" | ||
assert output_bits > input_bits | ||
assert output_bits % input_bits == 0 | ||
|
||
factor = output_bits // input_bits | ||
assert len(samples) % factor == 0 | ||
|
||
n = len(samples) // factor | ||
upscaled = np.zeros(n, dtype=int) | ||
for i, s in enumerate(np.reshape(samples, (factor, n))): | ||
upscaled += s << (input_bits * i) | ||
return upscaled | ||
|
||
|
||
class ToeplitzExtractor(Extractor): | ||
"""https://arxiv.org/pdf/2402.09481 appendix A.5""" | ||
|
||
input_bits: int = 12 | ||
"""Number of bits of the raw numbers sampled from the QRNG.""" | ||
extraction_ratio: int = 4 | ||
"""Number of bits of the uniformly distributed extracted output samples.""" | ||
precision_bits: int = 32 | ||
|
||
def __post_init__(self): | ||
if self.precision_bits % self.extraction_ratio != 0: | ||
raise ValueError( | ||
f"Number of bits must be a multiple of the extracted bits {self.extraction_ratio}." | ||
) | ||
|
||
def num_raw_samples(self, n: int) -> int: | ||
return 2 * n * (self.precision_bits // self.extraction_ratio) | ||
|
||
def extract(self, raw: List[int]) -> npt.NDArray: | ||
extracted = toeplitz_extract( | ||
np.array(raw), self.input_bits, self.extraction_ratio | ||
).astype(int) | ||
upscaled = upscale(extracted, self.extraction_ratio, self.precision_bits) | ||
return upscaled.astype(float) / (2**self.precision_bits - 1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from typing import List, Optional, Sequence, Union | ||
|
||
import numpy as np | ||
import numpy.typing as npt | ||
from serial import Serial | ||
|
||
from ..abstract import Instrument | ||
from .extractors import Extractor, ShaExtractor | ||
|
||
__all__ = ["QRNG"] | ||
|
||
|
||
def read(port: Serial, n: int = 1, nbytes: int = 4) -> List[int]: | ||
"""Read raw samples from the QRNG device serial output. | ||
In the entropy mode of the device, these typically follow a | ||
normal distribution. | ||
Args: | ||
n: Number of samples to retrieve. | ||
nbytes: Number of bytes to read from serial port to generate one raw sample. | ||
""" | ||
samples = [] | ||
while len(samples) < n: | ||
num_str = "" | ||
while len(num_str) < nbytes: | ||
sample = port.read(1) | ||
if sample == b" ": | ||
break | ||
num_str += sample.decode("utf-8") | ||
try: | ||
samples.append(int(num_str)) | ||
except ValueError: | ||
pass | ||
return samples | ||
|
||
|
||
class QRNG(Instrument): | ||
"""Driver to sample numbers from a Quantum Random Number Generator (QRNG). | ||
See :ref:`qrng` for example usage. | ||
""" | ||
|
||
address: str | ||
baudrate: int = 115200 | ||
extractor: Extractor = ShaExtractor() | ||
port: Optional[Serial] = None | ||
|
||
bytes_per_number: int = 4 | ||
"""Number of bytes to read from serial port to generate one raw sample.""" | ||
|
||
def connect(self): | ||
if self.port is None: | ||
self.port = Serial(self.address, self.baudrate) | ||
|
||
def disconnect(self): | ||
if self.port is not None: | ||
self.port.close() | ||
self.port = None | ||
|
||
def read(self, n: int) -> List[int]: | ||
"""Read raw samples from the QRNG device serial output. | ||
In the entropy mode of the device, these typically follow a | ||
normal distribution. | ||
Args: | ||
n: Number of samples to retrieve. | ||
""" | ||
return read(self.port, n, self.bytes_per_number) | ||
|
||
def random(self, size: Optional[Union[int, Sequence[int]]] = None) -> npt.NDArray: | ||
"""Returns random floats following uniform distribution in [0, 1]. | ||
Args: | ||
size: Shape of the returned array (to behave similarly to ``np.random.random``). | ||
""" | ||
n = np.prod(size) | ||
nraw = self.extractor.num_raw_samples(n) | ||
raw = self.read(nraw) | ||
extracted = self.extractor.extract(raw)[:n] | ||
return np.reshape(extracted, size) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
"""Quantum random number generator drivers.""" | ||
|
||
from qibolab._core.instruments.qrng import * # noqa: F403 | ||
from qibolab._core.instruments.qrng import qrng | ||
|
||
__all__ = [] | ||
__all__ += qrng.__all__ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import numpy as np | ||
import pytest | ||
from serial.serialutil import SerialException | ||
|
||
from qibolab.instruments.qrng import QRNG, ShaExtractor, ToeplitzExtractor | ||
|
||
RAW_BITS = 12 | ||
"""Number of bits in each QRNG sample.""" | ||
|
||
|
||
@pytest.fixture | ||
def extractor(): | ||
return ShaExtractor() | ||
|
||
|
||
class MockPort: | ||
"""Mock the serial port when QRNG device is not available for testing.""" | ||
|
||
def read(self, n: int) -> str: | ||
data = [] | ||
for i in range(n): | ||
if i % 4 == 3: | ||
data.append(" ") | ||
else: | ||
data.append(str(np.random.randint(0, 10))) | ||
return "".join(data).encode("utf-8") | ||
|
||
|
||
@pytest.fixture | ||
def qrng(extractor): | ||
qrng = QRNG(address="/dev/ttyACM0", extractor=extractor) | ||
try: | ||
qrng.connect() | ||
except SerialException: | ||
qrng.port = MockPort() | ||
return qrng | ||
|
||
|
||
@pytest.mark.parametrize("extractor", [ShaExtractor(), ToeplitzExtractor()]) | ||
def test_qrng_random(qrng): | ||
data = qrng.random(1000) | ||
assert isinstance(data, np.ndarray) | ||
assert data.shape == (1000,) |