Skip to content

Commit

Permalink
Develop consistent nomenclature for modulator variables
Browse files Browse the repository at this point in the history
Fixes #49
  • Loading branch information
mhostetter committed Dec 9, 2023
1 parent 9a0928f commit 66c45bc
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 81 deletions.
26 changes: 13 additions & 13 deletions docs/examples/psk.ipynb

Large diffs are not rendered by default.

72 changes: 44 additions & 28 deletions src/sdr/_modulation/_linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@

@export
class LinearModulation:
"""
r"""
Implements linear phase/amplitude modulation with arbitrary symbol mapping.
Note:
The nomenclature for variable names in linear modulators is as follows: $s[k]$ are decimal symbols,
$\hat{s}[k]$ are decimal symbol decisions, $a[k]$ are complex symbols, $\tilde{a}[k]$ are received complex
symbols, $\hat{a}[k]$ are complex symbol decisions, $x[n]$ are pulse-shaped complex samples, and
$\tilde{x}[n]$ are received pulse-shaped complex samples. $k$ indicates a symbol index and $n$ indicates a
sample index.
Group:
modulation-linear
"""
Expand Down Expand Up @@ -137,24 +144,28 @@ def _map_symbols(self, s: npt.NDArray[np.int_]) -> npt.NDArray[np.complex_]:
a = self.symbol_map[s] # Complex symbols
return a

def decide_symbols(self, a_hat: npt.ArrayLike) -> npt.NDArray[np.int_]:
def decide_symbols(self, a_tilde: npt.ArrayLike) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_]]:
r"""
Converts the received complex symbols $\hat{a}[k]$ into decimal symbol decisions $\hat{s}[k]$
using maximum-likelihood estimation (MLE).
Converts the received complex symbols $\tilde{a}[k]$ into decimal symbol decisions $\hat{s}[k]$
and complex symbol decisions $\hat{a}[k]$ using maximum-likelihood estimation (MLE).
Arguments:
a_hat: The received complex symbols $\hat{a}[k]$.
a_tilde: The received complex symbols $\tilde{a}[k]$.
Returns:
The decimal symbol decisions $\hat{s}[k]$, $0$ to $M-1$.
- The decimal symbol decisions $\hat{s}[k]$, $0$ to $M-1$.
- The complex symbol decisions $\hat{a}[k]$.
"""
a_hat = np.asarray(a_hat) # Complex symbols
return self._decide_symbols(a_hat)
a_tilde = np.asarray(a_tilde) # Complex symbols
return self._decide_symbols(a_tilde)

def _decide_symbols(self, a_hat: npt.NDArray[np.complex_]) -> npt.NDArray[np.int_]:
error_vectors = np.subtract.outer(a_hat, self.symbol_map)
def _decide_symbols(
self, a_tilde: npt.NDArray[np.complex_]
) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_]]:
error_vectors = np.subtract.outer(a_tilde, self.symbol_map)
s_hat = np.argmin(np.abs(error_vectors), axis=-1)
return s_hat
a_hat = self.symbol_map[s_hat]
return s_hat, a_hat

def modulate(self, s: npt.ArrayLike) -> npt.NDArray[np.complex_]:
r"""
Expand All @@ -179,45 +190,50 @@ def _tx_pulse_shape(self, a: npt.NDArray[np.complex_]) -> npt.NDArray[np.complex
x = self._tx_filter(a, mode="full") # Complex samples
return x

def demodulate(self, x_hat: npt.ArrayLike) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_]]:
def demodulate(
self, x_tilde: npt.ArrayLike
) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_], npt.NDArray[np.complex_]]:
r"""
Demodulates the pulse-shaped complex samples $\hat{x}[n]$ into decimal symbol decisions $\hat{s}[k]$
Demodulates the pulse-shaped complex samples $\tilde{x}[n]$ into decimal symbol decisions $\hat{s}[k]$
using matched filtering and maximum-likelihood estimation.
Arguments:
x_hat: The received pulse-shaped complex samples $\hat{x}[n]$ to demodulate, with :obj:`sps`
x_tilde: The received pulse-shaped complex samples $\tilde{x}[n]$ to demodulate, with :obj:`sps`
samples per symbol and length `sps * s_hat.size + pulse_shape.size - 1`.
Returns:
- The decimal symbol decisions $\hat{s}[k]$, $0$ to $M-1$.
- The matched filter outputs $\hat{a}[k]$.
- The matched filter outputs $\tilde{a}[k]$.
- The complex symbol decisions $\hat{a}[k]$.
"""
x_hat = np.asarray(x_hat) # Complex samples
return self._demodulate(x_hat)
x_tilde = np.asarray(x_tilde) # Complex samples
return self._demodulate(x_tilde)

def _demodulate(self, x_hat: npt.NDArray[np.complex_]) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_]]:
a_hat = self._rx_matched_filter(x_hat) # Complex symbols
s_hat = self._decide_symbols(a_hat) # Decimal symbols
return s_hat, a_hat
def _demodulate(
self, x_tilde: npt.NDArray[np.complex_]
) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_], npt.NDArray[np.complex_]]:
a_tilde = self._rx_matched_filter(x_tilde) # Complex symbols
s_hat, a_hat = self._decide_symbols(a_tilde) # Decimal symbols
return s_hat, a_tilde, a_hat

def _rx_matched_filter(self, x_hat: npt.NDArray[np.complex_]) -> npt.NDArray[np.complex_]:
def _rx_matched_filter(self, x_tilde: npt.NDArray[np.complex_]) -> npt.NDArray[np.complex_]:
if self.pulse_shape.size % self.sps == 0:
x_hat = np.insert(x_hat, 0, 0)
x_tilde = np.insert(x_tilde, 0, 0)

a_hat = self._rx_filter(x_hat, mode="full") # Complex symbols
a_tilde = self._rx_filter(x_tilde, mode="full") # Complex symbols

span = self.pulse_shape.size // self.sps
if span == 1:
N_symbols = x_hat.size // self.sps
N_symbols = x_tilde.size // self.sps
offset = span
else:
N_symbols = x_hat.size // self.sps - span
N_symbols = x_tilde.size // self.sps - span
offset = span

# Select the symbol decisions from the output of the decimating filter
a_hat = a_hat[offset : offset + N_symbols]
a_tilde = a_tilde[offset : offset + N_symbols]

return a_hat
return a_tilde

@abc.abstractmethod
def ber(self, ebn0: npt.ArrayLike) -> npt.NDArray[np.float_]:
Expand Down
9 changes: 8 additions & 1 deletion src/sdr/_modulation/_msk.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ class MSK(OQPSK):
MSK can also be consider as continuous-phase frequency-shift keying (CPFSK) with the frequency separation
equaling half the bit period.
Note:
The nomenclature for variable names in linear modulators is as follows: $s[k]$ are decimal symbols,
$\hat{s}[k]$ are decimal symbol decisions, $a[k]$ are complex symbols, $\tilde{a}[k]$ are received complex
symbols, $\hat{a}[k]$ are complex symbol decisions, $x[n]$ are pulse-shaped complex samples, and
$\tilde{x}[n]$ are received pulse-shaped complex samples. $k$ indicates a symbol index and $n$ indicates a
sample index.
Examples:
Create a MSK modem.
Expand Down Expand Up @@ -97,7 +104,7 @@ class MSK(OQPSK):
.. ipython:: python
rx_symbols, rx_complex_symbols = msk.demodulate(rx_samples)
rx_symbols, rx_complex_symbols, _ = msk.demodulate(rx_samples)
# The symbol decisions are error-free
np.array_equal(symbols, rx_symbols)
Expand Down
85 changes: 55 additions & 30 deletions src/sdr/_modulation/_psk.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class PSK(LinearModulation):
$$a[k] = \exp \left[ j\left(\frac{2\pi}{M}s[k] + \phi\right) \right] .$$
Note:
The nomenclature for variable names in linear modulators is as follows: $s[k]$ are decimal symbols,
$\hat{s}[k]$ are decimal symbol decisions, $a[k]$ are complex symbols, $\tilde{a}[k]$ are received complex
symbols, $\hat{a}[k]$ are complex symbol decisions, $x[n]$ are pulse-shaped complex samples, and
$\tilde{x}[n]$ are received pulse-shaped complex samples. $k$ indicates a symbol index and $n$ indicates a
sample index.
Examples:
Create a QPSK modem whose constellation has a 45° phase offset.
Expand Down Expand Up @@ -84,7 +91,7 @@ class PSK(LinearModulation):
.. ipython:: python
rx_symbols, rx_complex_symbols = qpsk.demodulate(rx_samples)
rx_symbols, rx_complex_symbols, _ = qpsk.demodulate(rx_samples)
# The symbol decisions are error-free
np.array_equal(symbols, rx_symbols)
Expand Down Expand Up @@ -463,6 +470,13 @@ class PiMPSK(PSK):
\end{cases}
$$
Note:
The nomenclature for variable names in linear modulators is as follows: $s[k]$ are decimal symbols,
$\hat{s}[k]$ are decimal symbol decisions, $a[k]$ are complex symbols, $\tilde{a}[k]$ are received complex
symbols, $\hat{a}[k]$ are complex symbol decisions, $x[n]$ are pulse-shaped complex samples, and
$\tilde{x}[n]$ are received pulse-shaped complex samples. $k$ indicates a symbol index and $n$ indicates a
sample index.
Examples:
Create a $\pi/4$ QPSK modem.
Expand Down Expand Up @@ -516,7 +530,7 @@ class PiMPSK(PSK):
.. ipython:: python
rx_symbols, rx_complex_symbols = pi4_qpsk.demodulate(rx_samples)
rx_symbols, rx_complex_symbols, _ = pi4_qpsk.demodulate(rx_samples)
# The symbol decisions are error-free
np.array_equal(symbols, rx_symbols)
Expand Down Expand Up @@ -580,20 +594,22 @@ def __init__(
)

def _map_symbols(self, s: npt.NDArray[np.int_]) -> npt.NDArray[np.complex_]:
s = super()._map_symbols(s)
a = super()._map_symbols(s)

# Rotate odd symbols by pi/M
s_rotated = s.copy()
s_rotated[1::2] *= np.exp(1j * np.pi / self.order)
a_rotated = a.copy()
a_rotated[1::2] *= np.exp(1j * np.pi / self.order)

return s_rotated
return a_rotated

def _decide_symbols(self, a_hat: npt.NDArray[np.complex_]) -> npt.NDArray[np.int_]:
def _decide_symbols(
self, a_tilde: npt.NDArray[np.complex_]
) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_]]:
# Rotate odd symbols by -pi/M
a_hat_derotated = a_hat.copy()
a_hat_derotated[1::2] *= np.exp(-1j * np.pi / self.order)
a_tilde_derotated = a_tilde.copy()
a_tilde_derotated[1::2] *= np.exp(-1j * np.pi / self.order)

return super()._decide_symbols(a_hat_derotated)
return super()._decide_symbols(a_tilde_derotated)


@export
Expand Down Expand Up @@ -622,6 +638,13 @@ class OQPSK(PSK):
\end{align}
$$
Note:
The nomenclature for variable names in linear modulators is as follows: $s[k]$ are decimal symbols,
$\hat{s}[k]$ are decimal symbol decisions, $a[k]$ are complex symbols, $\tilde{a}[k]$ are received complex
symbols, $\hat{a}[k]$ are complex symbol decisions, $x[n]$ are pulse-shaped complex samples, and
$\tilde{x}[n]$ are received pulse-shaped complex samples. $k$ indicates a symbol index and $n$ indicates a
sample index.
Examples:
Create a OQPSK modem.
Expand Down Expand Up @@ -681,7 +704,7 @@ class OQPSK(PSK):
.. ipython:: python
rx_symbols, rx_complex_symbols = oqpsk.demodulate(rx_samples)
rx_symbols, rx_complex_symbols, _ = oqpsk.demodulate(rx_samples)
# The symbol decisions are error-free
np.array_equal(symbols, rx_symbols)
Expand Down Expand Up @@ -796,38 +819,40 @@ def _tx_pulse_shape(self, a: npt.NDArray[np.complex_]) -> npt.NDArray[np.complex

return x

def _decide_symbols(self, a_hat: npt.NDArray[np.complex_]) -> npt.NDArray[np.int_]:
a_hat = np.asarray(a_hat)
a_hat_I, a_hat_Q = a_hat.real, a_hat.imag
def _decide_symbols(
self, a_tilde: npt.NDArray[np.complex_]
) -> tuple[npt.NDArray[np.int_], npt.NDArray[np.complex_]]:
a_tilde = np.asarray(a_tilde)
a_tilde_I, a_tilde_Q = a_tilde.real, a_tilde.imag

# Shift Q symbols by -1/2 symbol and grab 1 sample per symbol
a_hat_I = a_hat_I[:-1:2]
a_hat_Q = a_hat_Q[1::2]
a_tilde_I = a_tilde_I[:-1:2]
a_tilde_Q = a_tilde_Q[1::2]

a_hat = a_hat_I + 1j * a_hat_Q
a_tilde = a_tilde_I + 1j * a_tilde_Q

return super()._decide_symbols(a_hat)
return super()._decide_symbols(a_tilde)

def _rx_matched_filter(self, x_hat: npt.NDArray[np.complex_]) -> npt.NDArray[np.complex_]:
x_hat_I, x_hat_Q = x_hat.real, x_hat.imag
def _rx_matched_filter(self, x_tilde: npt.NDArray[np.complex_]) -> npt.NDArray[np.complex_]:
x_tilde_I, x_tilde_Q = x_tilde.real, x_tilde.imag

# Shift Q samples by -1/2 symbol
x_hat_I = x_hat_I[: -self.sps // 2]
x_hat_Q = x_hat_Q[self.sps // 2 :]
x_tilde_I = x_tilde_I[: -self.sps // 2]
x_tilde_Q = x_tilde_Q[self.sps // 2 :]

a_hat_I = super()._rx_matched_filter(x_hat_I) # Complex samples
a_hat_Q = super()._rx_matched_filter(x_hat_Q) # Complex samples
a_tilde_I = super()._rx_matched_filter(x_tilde_I) # Complex samples
a_tilde_Q = super()._rx_matched_filter(x_tilde_Q) # Complex samples

a_hat_I = np.repeat(a_hat_I, 2)
a_hat_Q = np.repeat(a_hat_Q, 2)
a_tilde_I = np.repeat(a_tilde_I, 2)
a_tilde_Q = np.repeat(a_tilde_Q, 2)

# Shift Q symbols by 1/2 symbol
a_hat_I = np.append(a_hat_I, 0)
a_hat_Q = np.insert(a_hat_Q, 0, 0)
a_tilde_I = np.append(a_tilde_I, 0)
a_tilde_Q = np.insert(a_tilde_Q, 0, 0)

a_hat = a_hat_I + 1j * a_hat_Q
a_tilde = a_tilde_I + 1j * a_tilde_Q

return a_hat
return a_tilde


def Pk(M: int, esn0_linear: float, j: int) -> float:
Expand Down
16 changes: 8 additions & 8 deletions tests/modulation/psk/test_decide_symbols.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,62 +46,62 @@
def test_bpsk_bin():
phi = np.rad2deg(-2.394815492222444)
psk = sdr.PSK(2, phase_offset=phi, symbol_labels="bin")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0])
assert np.array_equal(s, s_truth)


def test_bpsk_gray():
phi = np.rad2deg(-0.753083802680199)
psk = sdr.PSK(2, phase_offset=phi, symbol_labels="gray")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1])
assert np.array_equal(s, s_truth)


def test_qpsk_bin():
phi = np.rad2deg(1.965584775048307)
psk = sdr.PSK(4, phase_offset=phi, symbol_labels="bin")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([1, 0, 0, 2, 2, 2, 2, 3, 1, 1, 2, 1, 0, 2, 2, 1, 3, 2, 3, 1])
assert np.array_equal(s, s_truth)


def test_qpsk_gray():
phi = np.rad2deg(-1.607892783327202)
psk = sdr.PSK(4, phase_offset=phi, symbol_labels="gray")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([2, 3, 3, 1, 1, 0, 1, 3, 2, 0, 1, 0, 2, 0, 0, 2, 1, 1, 1, 2])
assert np.array_equal(s, s_truth)


def test_8psk_bin():
phi = np.rad2deg(2.415398803363657)
psk = sdr.PSK(8, phase_offset=phi, symbol_labels="bin")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([1, 7, 7, 4, 4, 3, 4, 6, 1, 2, 4, 2, 0, 3, 3, 2, 6, 4, 5, 1])
assert np.array_equal(s, s_truth)


def test_8psk_gray():
phi = np.rad2deg(1.336099216449522)
psk = sdr.PSK(8, phase_offset=phi, symbol_labels="gray")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([2, 0, 1, 5, 7, 7, 5, 0, 3, 2, 5, 6, 1, 6, 7, 2, 4, 5, 4, 2])
assert np.array_equal(s, s_truth)


def test_16psk_bin():
phi = np.rad2deg(-0.765615943499161)
psk = sdr.PSK(16, phase_offset=phi, symbol_labels="bin")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([11, 6, 7, 1, 0, 15, 1, 5, 10, 12, 1, 13, 8, 14, 15, 12, 3, 1, 3, 10])
assert np.array_equal(s, s_truth)


def test_16psk_gray():
phi = np.rad2deg(-1.577584611111288)
psk = sdr.PSK(16, phase_offset=phi, symbol_labels="gray")
s = psk.decide_symbols(X_HAT)
s, _ = psk.decide_symbols(X_HAT)
s_truth = np.array([11, 12, 13, 2, 3, 1, 2, 4, 10, 9, 2, 8, 15, 0, 1, 9, 7, 2, 7, 10])
assert np.array_equal(s, s_truth)
2 changes: 1 addition & 1 deletion tests/modulation/psk/test_demodulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _verify_demodulation(psk: sdr.PSK):
s = np.random.randint(0, psk.order, 20)
a = psk.map_symbols(s)
x = psk.modulate(s)
s_hat, a_hat = psk.demodulate(x)
s_hat, a_tilde, a_hat = psk.demodulate(x)

assert np.array_equal(s, s_hat)
np.testing.assert_array_almost_equal(a, a_hat, decimal=3)

0 comments on commit 66c45bc

Please sign in to comment.