diff --git a/MATLAB/tests/single_ch.mat b/MATLAB/tests/single_ch.mat new file mode 100644 index 00000000..86af5719 Binary files /dev/null and b/MATLAB/tests/single_ch.mat differ diff --git a/envs/requirements.txt b/envs/requirements.txt index f1c8932d..43001b22 100644 --- a/envs/requirements.txt +++ b/envs/requirements.txt @@ -14,7 +14,7 @@ matplotlib scipy scikit-image pandas -git+https://github.com/joblib/joblib.git +joblib psutil pyvistaqt pytz diff --git a/ieeg/_tests/test_mat.py b/ieeg/_tests/test_mat.py new file mode 100644 index 00000000..840af9ac --- /dev/null +++ b/ieeg/_tests/test_mat.py @@ -0,0 +1,235 @@ +import numpy as np +import pytest +from ieeg.calc.mat import concatenate_arrays, get_homogeneous_shapes, \ + LabeledArray, combine, iter_nest_dict + + +@pytest.mark.parametrize("arrays, axis, expected_output", [ + # Test case 1: Concatenate along axis 0 + ([np.array([]), np.array([[1, 2], [3, 4]]), + np.array([[5, 6, 7], [8, 9, 10]])], + 0, + np.array([[1, 2, np.nan], [3, 4, np.nan], [5, 6, 7], [8, 9, 10]])), + + # Test case 2: Concatenate along axis 1 + ([np.ones((2, 1)), np.zeros((3, 1))], 1, + np.array([[1, 0], [1, 0], [np.nan, 0]])), + + # Test case 3: Empty input arrays + ([np.array([]), np.array([])], 0, None), + + # Test case 4: Concatenate along axis 2 + ([np.array([[[1]], [[2]]]), np.array([[[3], [4]], [[5], [6]]])], + 2, + np.array([[[1, 3], [np.nan, 4]], [[2, 5], [np.nan, 6]]])), + + # Test case 5: Concatenate along axis 0 with empty array in the middle + ([np.array([[1, 2], [3, 4]]), np.array([]), + np.array([[5, 6, 7], [8, 9, 10]])], + 0, + np.array([[1, 2, np.nan], [3, 4, np.nan], [5, 6, 7], [8, 9, 10]])), + + # Test case 6: Concatenate along axis 0 with empty arrays at the beginning + # and end + ([np.array([]), np.array([[1, 2], [3, 4]]), np.array([])], + 0, + np.array([[1, 2], [3, 4]])), + + # Test case 7: Concatenate along axis -1 (last axis) + ([np.array([[[1]], [[2]]]), np.array([[[3], [4]], [[5], [6]]])], + -1, + np.array([[[1, 3], [np.nan, 4]], [[2, 5], [np.nan, 6]]])), + + # Test case 8: Concatenate an array containing only nan values + ([np.array([[np.nan, np.nan], [np.nan, np.nan]]), + np.array([[5, 6, 7], [8, 9, 10]])], + 0, + np.array([[np.nan, np.nan, np.nan], [np.nan, np.nan, np.nan], + [5, 6, 7], [8, 9, 10]])), + + # Test case 9: Concatenate along new axis + ([np.array([[1, 2], [3, 4]]), np.array([[5, 6, 7], [8, 9, 10]])], + None, + np.array([[[1, 2, np.nan], [3, 4, np.nan]], [[5, 6, 7], [8, 9, 10]]])) +]) +def test_concatenate_arrays(arrays, axis, expected_output): + print(f"Shapes {[arr.shape for arr in arrays]}") + try: + new = concatenate_arrays(arrays, axis) + print(f"New shape {new.shape}") + if axis is None: + axis = 0 + arrays = [np.expand_dims(arr, axis) for arr in arrays] + while axis < 0: + axis += new.ndim + congruency = new.shape == np.max(get_homogeneous_shapes(arrays), + axis=0) + print(congruency) + assert all([con for i, con in enumerate(congruency) if i != axis]) + assert np.array_equal(new, expected_output, True) + except ValueError as e: + try: + assert expected_output is None + except AssertionError: + raise e + + +# Test creation of ArrayDict +def test_array_creation(): + data = {'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}} + ad = LabeledArray.from_dict(data) + assert isinstance(ad, LabeledArray) + + +# Test conversion to numpy array +def test_array_to_array(): + data = {'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}} + ad = LabeledArray.from_dict(data) + np_array = np.array([[[1, 2, 3], [4, 5, np.nan]]]) + assert np.array_equal(ad, np_array, True) + + +# Test getting all keys +def test_array_all_keys(): + data = {'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}} + ad = LabeledArray.from_dict(data) + keys = (('a',), ('b', 'f'), ('c', 'd', 'e')) + assert ad.labels == keys + + +# Test getting all keys in a really nested ArrayDict +def test_array_all_keys_nested(): + data = {'a': {'b': {'c': {'d': {'e': {'f': {'g': {'h': {'i': {'j': { + 'k': 1}}}}}}}}}}} + ad = LabeledArray.from_dict(data) + keys = (('a',), ('b',), ('c',), ('d',), ('e',), ('f',), ('g',), ('h',), + ('i',), ('j',), ('k',)) + assert ad.labels == keys + + +# Test getting all keys in a large ArrayDict (10000 keys) +def test_array_all_keys_large(): + data = {str(i): i for i in range(100000)} + ad = LabeledArray.from_dict(data) + labels = set(ad.labels[0]) + assert labels == set(map(str, range(100000))) + + +# Test indexing with a single key +def test_array_single_key_indexing(): + data = {'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}} + ad = LabeledArray.from_dict(data) + subset = LabeledArray.from_dict({'c': 1, 'd': 2, 'e': 3}) + assert ad['a']['b'] == subset + + +# Test indexing with a tuple of keys that leads to a scalar value +def test_array_scalar_value_indexing(): + data = {'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}} + ad = LabeledArray.from_dict(data) + assert ad['a']['b']['d'] == 2 + + +# Test shape property +def test_array_shape(): + data = {'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}} + ad = LabeledArray.from_dict(data) + assert ad.shape == (1, 2, 3) + + +# Test combine +@pytest.mark.parametrize('data, dims, expected', [ + ({'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}}, (1, 2), + {'a': {'b-c': 1, 'b-d': 2, 'b-e': 3, 'f-c': 4, 'f-d': 5}}), + ({'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}}, (0, 2), + {'b': {'a-c': 1, 'a-d': 2, 'a-e': 3}, 'f': {'a-c': 4, 'a-d': 5}}), + ({'a': {'b': {'c': {'d': {'e': 1, 'f': 2}}}}}, (0, 4), + {'b': {'c': {'d': {'a-e': 1, 'a-f': 2}}}}), + ({'a': {'b': {'c': {'d': {'e': 1, 'f': 2}}}}, + 'g': {'b': {'c': {'d': {'e': 3, 'f': 4}}}}}, (0, 4), + {'b': {'c': {'d': {'a-e': 1, 'a-f': 2, 'g-e': 3, 'g-f': 4}}}}) +]) +def test_combine(data, dims, expected): + new = combine(data, dims) + assert new == expected + + +# Test nested dict iterator +def test_dict_iterator(): + data = {'a': {'b': {'c': {'d': {'e': 1, 'f': 2}}}}} + iterator = iter_nest_dict(data) + assert len(list(iterator)) == 2 + + +# Test combine dimensions with arrays +def test_array_dict_combine_dimensions_with_arrays(): + data = {'b': {'c': np.array([1, 2, 3]), 'd': np.array([4, 5, 6])}, + 'f': {'c': np.array([7, 8, 9])}} + ad = LabeledArray.from_dict(data) + new = ad.combine((1, 2)) + assert new == LabeledArray([[1., 2., 3., 4., 5., 6.], + [7., 8., 9., np.nan, np.nan, np.nan]], + labels=(('b', 'f'), ('c-0', 'c-1', 'c-2', + 'd-0', 'd-1', 'd-2'))) + assert new['b'].to_dict() == {'c-0': 1., 'c-1': 2., 'c-2': 3., 'd-0': 4., + 'd-1': 5., 'd-2': 6.} + + +def test_from_dict(): + data = {'a': {'b': {'c': 1, 'd': 2, 'e': 3}, 'f': {'c': 4, 'd': 5}}} + ad = LabeledArray.from_dict(data) + expected_labels = (('a',), ('b', 'f'), ('c', 'd', 'e')) + assert ad.labels == expected_labels + expected_array = np.array([[[1, 2, 3], [4, 5, float('nan')]]]) + np.testing.assert_array_equal(ad, expected_array) + + +def test_eq(): + data1 = {'a': {'b': {'c': 1}}} + ad1 = LabeledArray.from_dict(data1) + + data2 = {'a': {'b': {'c': 1}}} + ad2 = LabeledArray.from_dict(data2) + + data3 = {'a': {'b': {'d': 1}}} + ad3 = LabeledArray.from_dict(data3) + + assert ad1 == ad2 + assert ad1 != ad3 + + +def test_repr(): + data = {'a': {'b': {'c': 1}}} + ad = LabeledArray.from_dict(data) + expected_repr = "LabeledArray([[[1.]]]), labels=(('a',), ('b',), ('c',))" \ + " ~8.00 B" + assert repr(ad) == expected_repr + + +@pytest.mark.parametrize('idx', [ + (0,), + (0, 0), + (..., 0, 0), + (..., 0), + (slice(None), 0), + (slice(None), 0, ...) +]) +def test_numpy_idx(idx): + data = np.array([[1., 2., 3.], [4., 5., 6.]]) + ad = LabeledArray(data, labels=(('a', 'b'), ('c', 'd', 'e'))) + assert np.array_equal(ad[idx], data[idx]) + + +@pytest.mark.parametrize('idx, expected', [ + ((0,), (('b',), ('c', 'd'))), + ((0, 0), (('c', 'd'),)), + ((..., 0, 0), (('a',),)), + ((..., 0), (('a',), ('b',))), + ((slice(None), 0), (('a',), ('c', 'd'))), + ((slice(None), 0, slice(None)), (('a',), ('c', 'd'))), + (('b',), (('a',), ('c', 'd'))), + (('b', 'c'), (('a',),)), +]) +def test_idx(idx, expected): + ad = LabeledArray([[[1, 2]]], labels=(('a',), ('b',), ('c', 'd'))) + assert ad[idx].labels == expected diff --git a/ieeg/calc/mat.py b/ieeg/calc/mat.py new file mode 100644 index 00000000..1aad5077 --- /dev/null +++ b/ieeg/calc/mat.py @@ -0,0 +1,628 @@ +import numpy as np +from collections.abc import Iterable + + +def iter_nest_dict(d: dict, _lvl: int = 0, _coords=()): + """Iterate over a nested dictionary, yielding the key and value. + + Parameters + ---------- + d : dict + The dictionary to iterate over. + _lvl : int, optional + The current level of nesting, by default 0 + _coords : tuple, optional + The current coordinates of the array, by default () + + Yields + ------ + tuple + The key and value of the dictionary. + """ + for k, v in d.items(): + if isinstance(v, dict): + yield from iter_nest_dict(v, _lvl + 1, _coords + (k,)) + elif isinstance(v, np.ndarray): + yield from iter_nest_dict({i: val for i, val in enumerate(v) + }, _lvl + 1, _coords + (k,)) + else: + yield _coords + (k,), v + + +class LabeledArray(np.ndarray): + """ A numpy array with labeled dimensions, acting like a dictionary. + + A numpy array with labeled dimensions. This class is useful for storing + data that is not easily represented in a tabular format. It acts as a + nested dictionary but its values map to elements of a stored numpy array. + + Parameters + ---------- + input_array : array_like + The array to store in the LabeledArray. + labels : tuple[tuple[str, ...], ...], optional + The labels for each dimension of the array, by default (). + **kwargs + Additional arguments to pass to np.asarray. + + Attributes + ---------- + labels : tuple[tuple[str, ...], ...] + The labels for each dimension of the array. + array : np.ndarray + The array stored in the LabeledArray. + + Examples + -------- + >>> import numpy as np + >>> from ieeg.calc.mat import LabeledArray + >>> arr = np.ones((2, 3, 4)) + >>> labels = (('a', 'b'), ('c', 'd', 'e'), ('f', 'g', 'h', 'i')) + >>> la = LabeledArray(arr, labels) + >>> la.to_dict() + {'a': {'c': {'f': 1.0, 'g': 1.0, 'h': 1.0, 'i': 1.0}, + 'd': {'f': 1.0, 'g': 1.0, 'h': 1.0, 'i': 1.0}, + 'e': {'f': 1.0, 'g': 1.0, 'h': 1.0, 'i': 1.0}}, + 'b': {'c': {'f': 1.0, 'g': 1.0, 'h': 1.0, 'i': 1.0}, + 'd': {'f': 1.0, 'g': 1.0, 'h': 1.0, 'i': 1.0}, + 'e': {'f': 1.0, 'g': 1.0, 'h': 1.0, 'i': 1.0}}} + >>> la['a', 'c', 'f'] = 2. + >>> la['a', 'c', 'f'] + 2.0 + + References + ---------- + [1] https://numpy.org/doc/stable/user/basics.subclassing.html + """ + + def __new__(cls, input_array, labels: tuple[tuple[str, ...], ...] = (), + **kwargs): + obj = np.asarray(input_array, **kwargs).view(cls) + labels = list(labels) + for i in range(obj.ndim): + if len(labels) < i + 1: + labels.append(tuple(range(obj.shape[i]))) + obj.labels = tuple(labels) + return obj + + @classmethod + def from_dict(cls, data: dict, **kwargs) -> 'LabeledArray': + """Create a LabeledArray from a dictionary. + + Parameters + ---------- + data : dict + The dictionary to convert to a LabeledArray. + + Returns + ------- + LabeledArray + The LabeledArray created from the dictionary. + """ + + arr = inner_array(data) + keys = inner_all_keys(data) + return cls(arr, keys, **kwargs) + + def __array_finalize__(self, obj): + if obj is None: + return + self.labels = getattr(obj, 'labels', None) + + def dropna(self) -> 'LabeledArray': + """Remove all nan values from the array.""" + new_labels = list(self.labels) + idx = [] + for i in range(self.ndim): + axes = tuple(j for j in range(self.ndim) if j != i) + mask = np.all(np.isnan(np.array(self)), axis=axes) + if np.any(mask): + new_labels[i] = tuple(np.array(new_labels[i])[~mask]) + idx.append(~mask) + index = np.ix_(*idx) + new_array = LabeledArray(np.array(self)[index], new_labels) + return new_array + + @property + def label_map(self) -> tuple[dict[str: int, ...], ...]: + """maps the labels to the indices of the array.""" + return tuple({l: i for i, l in enumerate(labels)} + for labels in self.labels) + + def _str_parse(self, *keys) -> tuple[int, int]: + for key in keys: + match key: + case list() | tuple(): + key = list(key) + while key: + for value in self._str_parse(key.pop(0)): + yield value + case str(): + i = 0 + while key not in self.labels[i]: + i += 1 + if i > self.ndim: + raise KeyError(f'{key} not found in labels') + key = self.label_map[i][key] + yield i, key + case _: + yield 0, key + + def _parse_labels(self, keys: tuple) -> tuple: + new_keys = [] + new_labels = [] + dim = 0 + for key in keys: + if isinstance(key, str): + i, key = next(self._str_parse(key)) + new_keys.append(key) + while dim < i: + new_labels.append(self.labels[dim]) + dim += 1 + dim += 1 + elif key is Ellipsis: + new_keys.append(key) + num_ellipsis_dims = self.ndim - len(keys) + 1 + new_labels.extend(self.labels[dim:dim + num_ellipsis_dims]) + dim += num_ellipsis_dims + elif key is None: + new_keys.append(key) + new_labels.append(self.labels[dim]) + dim += 1 + elif isinstance(key, int): + new_keys.append(key) + dim += 1 + elif isinstance(key, slice): + new_keys.append(key) + new_labels.append(self.labels[dim]) + dim += 1 + else: + new_keys.append(key) + new_labels.append(self.labels[dim]) + dim += 1 + while dim < self.ndim: + new_labels.append(self.labels[dim]) + dim += 1 + return new_labels, new_keys + + def __getitem__(self, keys): + if not isinstance(keys, tuple): + keys = (keys,) + new_labels, new_keys = self._parse_labels(keys) + out = super().__getitem__(tuple(new_keys)) + if isinstance(out, np.ndarray): + setattr(out, 'labels', tuple(new_labels)) + return out + + def __setitem__(self, key, value): + dim, num_key = self._str_parse(key) + if key not in self.labels[dim]: + self.labels[dim] += (key,) + super(LabeledArray, self).__setitem__(num_key, value) + + def __delitem__(self, key): + dim, num_key = self._str_parse(key) + if key in self.labels[dim]: + self.labels = list(self.labels) + self.labels[dim] = tuple(lab for lab in self.labels[dim] + if lab != key) + self.labels = tuple(self.labels) + super(LabeledArray, self).__delitem__(key) + + def __repr__(self): + """Display like a dictionary with labels as keys""" + size = self.nbytes + for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']: + if size < 1024.0 or unit == 'PiB': + break + size /= 1024.0 + + return f'{super().__repr__()}, labels={self.labels} ~{size:.2f} {unit}' + + def __eq__(self, other): + if isinstance(other, LabeledArray): + return np.array_equal(self, other, True) and \ + self.labels == other.labels + return super().__eq__(other) + + def __ne__(self, other): + return not self.__eq__(other) + + def to_dict(self) -> dict: + """Convert to a dictionary.""" + out = {} + for k, v in self.items(): + if isinstance(v, LabeledArray): + out[k] = v.to_dict() + else: + out[k] = v + return out + + def items(self): + return zip(self.keys(), self.values()) + + def keys(self): + return (lab for lab in self.labels[0]) + + def values(self): + return (a for a in self) + + def combine(self, levels: tuple[int, int], + delim: str = '-', drop_nan: bool = True) -> 'LabeledArray': + """Combine any levels of a LabeledArray into the lower level + + Takes the input LabeledArray and rearranges its dimensions. + + Parameters + ---------- + levels : tuple[int, int] + The levels to combine, e.g. (0, 1) will combine the 1st and 2nd + level of the array labels into one level at the 2nd level. + delim : str, optional + The delimiter to use when combining labels, by default '-' + drop_nan : bool, optional + Whether to drop all NaN columns, by default True + + Returns + ------- + LabeledArray + The combined LabeledArray + + Examples + -------- + >>> data = {'a': {'b': {'c': 1}}} + >>> ad = LabeledArray.from_dict(data, dtype=int) + >>> ad.combine((0, 2)) + LabeledArray([[1]]), labels=(('b',), ('a-c',)) ~4.00 B + """ + + assert levels[0] >= 0, "first level must be >= 0" + assert levels[1] > levels[0], "second level must be > first level" + + new_labels = list(self.labels) + + new_labels.pop(levels[0]) + + new_labels[levels[1] - 1] = tuple( + f'{i}{delim}{j}' for i in + self.labels[levels[0]] for j in self.labels[levels[1]]) + + new_shape = list(self.shape) + + new_shape[levels[1]] = self.shape[levels[0]] * self.shape[ + levels[1]] + + new_shape.pop(levels[0]) + + new_array = np.reshape(self, new_shape) + + if drop_nan: + return LabeledArray(new_array, new_labels).dropna() + else: + return LabeledArray(new_array, new_labels) + + +def add_to_list_if_not_present(lst: list, element: Iterable): + """Add an element to a list if it is not present. Runs in O(1) time.""" + seen = set(lst) + lst.extend(x for x in element if not (x in seen or seen.add(x))) + + +def inner_all_keys(data: dict, keys: list = None, lvl: int = 0): + """Get all keys of a nested dictionary.""" + if keys is None: + keys = [] + if np.isscalar(data): + return + elif isinstance(data, dict): + if len(keys) < lvl + 1: + keys.append(list(data.keys())) + else: + add_to_list_if_not_present(keys[lvl], data.keys()) + for d in data.values(): + inner_all_keys(d, keys, lvl+1) + elif isinstance(data, np.ndarray): + rows = range(data.shape[0]) + if len(keys) < lvl+1: + keys.append(list(rows)) + else: + add_to_list_if_not_present(keys[lvl], rows) + if len(data.shape) > 1: + inner_all_keys(data[0], keys, lvl+1) + else: + raise TypeError(f"Unexpected data type: {type(data)}") + return tuple(map(tuple, keys)) + + +def inner_array(data: dict | np.ndarray) -> np.ndarray | None: + """Convert a nested dictionary to a nested array.""" + if np.isscalar(data): + return data + elif len(data) == 0: + return + elif isinstance(data, dict): + arr = (inner_array(d) for d in data.values()) + arr = [a for a in arr if a is not None] + if len(arr) > 0: + return concatenate_arrays(arr, axis=None) + else: + return np.array(data) + + +def inner_dict(data: np.ndarray) -> dict | None: + """Convert a nested array to a nested dictionary.""" + if np.isscalar(data): + return data + elif len(data) == 0: + return + elif isinstance(data, np.ndarray): + return {i: inner_dict(d) for i, d in enumerate(data)} + else: + return data + + +def combine(data: dict, levels: tuple[int, int], delim: str = '-') -> dict: + """Combine any levels of a nested dict into the lower level + + Takes the input nested dict and rearranges the top and bottom + sub-dictionary. + + Parameters + data: dict + The nested dict to combine + levels: tuple[int, int] + The levels to combine, e.g. (0, 1) will combine the 1st and 2nd level + of the dict keys into one level at the 2nd level. + + Returns + dict + The combined dict + + Examples + >>> data = {'a': {'b': {'c': 1}}} + >>> combine(data, (0, 2)) + {'b': {'a-c': 1}} + + >>> data = {'a': {'b': {'c': 1}}, 'd': {'b': {'c': 2, 'e': 3}}} + >>> combine(data, (0, 2)) + {'b': {'a-c': 1, 'd-c': 2, 'd-e': 3}} + """ + + assert levels[0] >= 0, "first level must be >= 0" + assert levels[1] > levels[0], "second level must be > first level" + + def _combine_helper(data, levels, depth, keys): + if depth == levels[1]: + return {f'{keys[levels[0]]}{delim}{k}': v for k, v in data.items()} + elif depth == levels[0]: + new_dict = {} + for k, v in data.items(): + for k2, v2 in _combine_helper(v, levels, depth + 1, + keys + [k]).items(): + if isinstance(v2, dict): + if k2 in new_dict: + new_dict[k2] = _merge(new_dict[k2], v2) + else: + new_dict[k2] = v2 + else: + new_dict[k2] = v2 + return new_dict + else: + return {k: _combine_helper(v, levels, depth + 1, keys + [k]) for + k, v in data.items()} + + def _merge(d1: dict, d2: dict) -> dict: + for k, v in d2.items(): + if isinstance(v, dict): + d1[k] = _merge(d1.get(k, {}), v) + else: + d1[k] = v + return d1 + + result = _combine_helper(data, levels, 0, []) + + return result + + +def concatenate_arrays(arrays: list[np.ndarray], axis: int = None + ) -> np.ndarray: + """Concatenate arrays along a specified axis, filling in empty arrays with + nan values. + + Parameters + ---------- + arrays + A list of arrays to concatenate + axis + The axis along which to concatenate the arrays + + Returns + ------- + result + The concatenated arrays + """ + + if axis is None: + axis = 0 + arrays = [np.expand_dims(ar, axis) for ar in arrays] + + while axis < 0: + axis += max(ar.ndim for ar in arrays) + + # Determine the maximum shape along the specified axis + max_shape = np.max(get_homogeneous_shapes(arrays), axis=0) + + # Create a list to store the modified arrays + modified_arrays = [] + + # Iterate over the arrays + for arr in arrays: + if len(arr) == 0: + continue + # Determine the shape of the array + arr_shape = list(max_shape) + arr_shape[axis] = arr.shape[axis] + + # Create an array filled with nan values + nan_array = np.full(arr_shape, np.nan) + + # Fill in the array with the original values + indexing = [slice(None)] * arr.ndim + for ax in range(arr.ndim): + if ax == axis: + continue + indexing[ax] = slice(0, arr.shape[ax]) + nan_array[tuple(indexing)] = arr + + # Append the modified array to the list + modified_arrays.append(nan_array) + + # Concatenate the modified arrays along the specified axis + result = np.concatenate(modified_arrays, axis=axis) + + return result + + +def get_homogeneous_shapes(arrays): + # Determine the maximum number of dimensions among the input arrays + max_dims = max([arr.ndim for arr in arrays]) + + # Create a list to store the shapes with a homogeneous number of dimensions + homogeneous_shapes = [] + + # Iterate over the arrays + for arr in arrays: + # Get the shape of the array + # Handle the case of an empty array + if len(arr) == 0: + shape = (0,) + dims = 1 + else: + shape = arr.shape + dims = arr.ndim + + # Pad the shape tuple with additional dimensions if necessary + num_dims_to_pad = max_dims - dims + shape += (1,) * num_dims_to_pad + + # Add the shape to the list + homogeneous_shapes.append(shape) + + return homogeneous_shapes + + +def get_elbow(data: np.ndarray) -> int: + """Draws a line between the first and last points in a dataset and finds + the point furthest from that line. + + Parameters + ---------- + data : array + The data to find the elbow in. + + Returns + ------- + int + The index of the elbow point. + """ + nPoints = len(data) + allCoord = np.vstack((range(nPoints), data)).T + np.array([range(nPoints), data]) + firstPoint = allCoord[0] + lineVec = allCoord[-1] - allCoord[0] + lineVecNorm = lineVec / np.sqrt(np.sum(lineVec ** 2)) + vecFromFirst = allCoord - firstPoint + scalarProduct = np.sum(vecFromFirst * np.matlib.repmat( + lineVecNorm, nPoints, 1), axis=1) + vecFromFirstParallel = np.outer(scalarProduct, lineVecNorm) + vecToLine = vecFromFirst - vecFromFirstParallel + distToLine = np.sqrt(np.sum(vecToLine ** 2, axis=1)) + # set distance to points below lineVec to 0 + distToLine[vecToLine[:, 1] < 0] = 0 + idxOfBestPoint = np.argmax(distToLine) + return idxOfBestPoint + + +def stitch_mats(mats: list[np.ndarray], overlaps: list[int], axis: int = 0 + ) -> np.ndarray: + """break up the matrices into their overlapping and non-overlapping parts + then stitch them back together + + Parameters + ---------- + mats : list + The matrices to stitch together + overlaps : list + The number of overlapping rows between each matrix + axis : int, optional + The axis to stitch along, by default 0 + + Returns + ------- + np.ndarray + The stitched matrix + """ + stitches = [mats[0]] + if len(mats) != len(overlaps) + 1: + raise ValueError("The number of matrices must be one more than the num" + "ber of overlaps") + for i, over in enumerate(overlaps): + stitches = stitches[:-2] + merge(stitches[-1], mats[i+1], over, axis) + return np.concatenate(stitches, axis=axis) + + +def merge(mat1: np.ndarray, mat2: np.ndarray, overlap: int, axis: int = 0 + ) -> list[np.ndarray]: + """Take two arrays and merge them over the overlap gradually""" + sl = [slice(None)] * mat1.ndim + sl[axis] = slice(0, mat1.shape[axis]-overlap) + start = mat1[tuple(sl)] + sl[axis] = slice(mat1.shape[axis]-overlap, mat1.shape[axis]) + middle1 = np.multiply(np.linspace(1, 0, overlap), mat1[tuple(sl)]) + sl[axis] = slice(0, overlap) + middle2 = np.multiply(np.linspace(0, 1, overlap), mat2[tuple(sl)]) + middle = np.add(middle1, middle2) + sl[axis] = slice(overlap, mat2.shape[axis]) + last = mat2[tuple(sl)] + return [start, middle, last] + + +if __name__ == "__main__": + import os + from ieeg.io import get_data + from utils.mat_load import load_dict + import mne + ins, axis, exp = ([np.array([]), np.array([[1., 2.], [3., 4.]]), + np.array([[5., 6., 7.], [8., 9., 10.]])], 0, + np.array([[1, 2, np.nan], [3, 4, np.nan], + [5, 6, 7], [8, 9, 10]])) + outs = concatenate_arrays(ins, axis) + ar = LabeledArray.from_dict(dict(a=ins[1], b=ins[2])) + x = ar["a"] + y = ar.to_dict() + conds = {"resp": (-1, 1), "aud_ls": (-0.5, 1.5), + "aud_lm": (-0.5, 1.5), "aud_jl": (-0.5, 1.5), + "go_ls": (-0.5, 1.5), "go_lm": (-0.5, 1.5), + "go_jl": (-0.5, 1.5)} + task = "SentenceRep" + root = os.path.expanduser("~/Box/CoganLab") + layout = get_data(task, root=root) + + mne.set_log_level("ERROR") + + # data = LabeledArray.from_dict(dict( + # power=load_dict(layout, conds, "power", False), + # zscore=load_dict(layout, conds, "zscore", False))) + + dict_data = dict( + power=load_dict(layout, conds, "power", False)) + # zscore=load_dict(layout, conds, "zscore", False)) + + keys = inner_all_keys(dict_data) + + data = LabeledArray.from_dict(dict_data) + data.__repr__() + + # data = SparseArray(dict_data) + + # power = data["power"] diff --git a/ieeg/calc/scaling.py b/ieeg/calc/scaling.py index 5300dd03..07a47aec 100644 --- a/ieeg/calc/scaling.py +++ b/ieeg/calc/scaling.py @@ -26,7 +26,7 @@ def rescale(data: np.ndarray, basedata: np.ndarray, mode: str = 'mean', Parameters ---------- - data : array + data : array | mne.Epochs | mne.EpochsTFR It can be of any shape. The only constraint is that the last dimension should be time. basedata : array @@ -47,6 +47,8 @@ def rescale(data: np.ndarray, basedata: np.ndarray, mode: str = 'mean', dividing by the standard deviation of log baseline values ('zlogratio') copy : bool, optional Whether to return a new instance or modify in place. + axis : int or tuple[int], optional + Returns ------- diff --git a/ieeg/calc/stats.py b/ieeg/calc/stats.py index 1d64c3ee..f1b7e23b 100644 --- a/ieeg/calc/stats.py +++ b/ieeg/calc/stats.py @@ -282,7 +282,7 @@ def time_perm_cluster(sig1: np.ndarray, sig2: np.ndarray, p_thresh: float, out = np.zeros(out_shape, dtype=int) ins = ((np.squeeze(sig1[:, i]), np.squeeze(sig2[:, i])) for i in np.ndindex(tuple(sig1.shape[j] for j in ignore_adjacency))) - proc = Parallel(n_jobs, return_generator=True, verbose=40)( + proc = Parallel(n_jobs, return_as='generator', verbose=40)( delayed(time_perm_cluster)( *i, p_thresh=p_thresh, p_cluster=p_cluster, n_perm=n_perm, tails=tails, axis=axis, stat_func=stat_func) for i in ins) diff --git a/ieeg/calc/utils.py b/ieeg/calc/utils.py deleted file mode 100644 index 8c3eff07..00000000 --- a/ieeg/calc/utils.py +++ /dev/null @@ -1,77 +0,0 @@ -import numpy as np - - -def get_elbow(data: np.ndarray) -> int: - """Draws a line between the first and last points in a dataset and finds - the point furthest from that line. - - Parameters - ---------- - data : array - The data to find the elbow in. - - Returns - ------- - int - The index of the elbow point. - """ - nPoints = len(data) - allCoord = np.vstack((range(nPoints), data)).T - np.array([range(nPoints), data]) - firstPoint = allCoord[0] - lineVec = allCoord[-1] - allCoord[0] - lineVecNorm = lineVec / np.sqrt(np.sum(lineVec ** 2)) - vecFromFirst = allCoord - firstPoint - scalarProduct = np.sum(vecFromFirst * np.matlib.repmat( - lineVecNorm, nPoints, 1), axis=1) - vecFromFirstParallel = np.outer(scalarProduct, lineVecNorm) - vecToLine = vecFromFirst - vecFromFirstParallel - distToLine = np.sqrt(np.sum(vecToLine ** 2, axis=1)) - # set distance to points below lineVec to 0 - distToLine[vecToLine[:, 1] < 0] = 0 - idxOfBestPoint = np.argmax(distToLine) - return idxOfBestPoint - - -def stitch_mats(mats: list[np.ndarray], overlaps: list[int], axis: int = 0 - ) -> np.ndarray: - """break up the matrices into their overlapping and non-overlapping parts - then stitch them back together - - Parameters - ---------- - mats : list - The matrices to stitch together - overlaps : list - The number of overlapping rows between each matrix - axis : int, optional - The axis to stitch along, by default 0 - - Returns - ------- - np.ndarray - The stitched matrix - """ - stitches = [mats[0]] - if len(mats) != len(overlaps) + 1: - raise ValueError("The number of matrices must be one more than the num" - "ber of overlaps") - for i, over in enumerate(overlaps): - stitches = stitches[:-2] + merge(stitches[-1], mats[i+1], over, axis) - return np.concatenate(stitches, axis=axis) - - -def merge(mat1: np.ndarray, mat2: np.ndarray, overlap: int, axis: int = 0 - ) -> list[np.ndarray]: - """Take two arrays and merge them over the overlap gradually""" - sl = [slice(None)] * mat1.ndim - sl[axis] = slice(0, mat1.shape[axis]-overlap) - start = mat1[tuple(sl)] - sl[axis] = slice(mat1.shape[axis]-overlap, mat1.shape[axis]) - middle1 = np.multiply(np.linspace(1, 0, overlap), mat1[tuple(sl)]) - sl[axis] = slice(0, overlap) - middle2 = np.multiply(np.linspace(0, 1, overlap), mat2[tuple(sl)]) - middle = np.add(middle1, middle2) - sl[axis] = slice(overlap, mat2.shape[axis]) - last = mat2[tuple(sl)] - return [start, middle, last] diff --git a/ieeg/io.py b/ieeg/io.py index b6a0aba8..5ef1db17 100644 --- a/ieeg/io.py +++ b/ieeg/io.py @@ -181,7 +181,8 @@ def get_data(task: str, root: PathLike) -> BIDSLayout: @fill_doc @verbose def save_derivative(inst: Signal, layout: BIDSLayout, pipeline: str = None, - overwrite: bool = False, verbose=None): + overwrite: bool = False, format: str = 'EDF', + verbose=None): """Save an intermediate data instance from a pipeline to a BIDS folder. Parameters @@ -193,6 +194,8 @@ def save_derivative(inst: Signal, layout: BIDSLayout, pipeline: str = None, pipeline : str The name of the pipeline. %(overwrite)s + format : str + The format to save the data in. Defaults to EDF. %(verbose)s """ save_dir = op.join(layout.root, "derivatives", pipeline) @@ -209,7 +212,7 @@ def save_derivative(inst: Signal, layout: BIDSLayout, pipeline: str = None, entities['description'] = pipeline bids_path = BIDSPath(**entities, root=save_dir) run = inst.copy().crop(tmin=bounds[i], tmax=bounds[i+1]) - write_raw_bids(run, bids_path, allow_preload=True, format='EDF', + write_raw_bids(run, bids_path, allow_preload=True, format=format, acpc_aligned=True, overwrite=overwrite, verbose=verbose) @@ -258,8 +261,21 @@ def update(filename: PathLike, channels: list[str], @update.register -def _(inst: mne.io.base.BaseRaw, layout: BIDSLayout, - description: list[str] | str = None, verbose=None): +def _(inst: mne.io.base.BaseRaw, + layout: BIDSLayout, description: list[str] | str = None, verbose=None): + if not hasattr(inst, 'filenames'): + inst.filenames = inst.info['subject_info'].get('files', None) + for i, file in enumerate(inst.filenames): + fname = op.join(layout.root, file) + update(fname, inst.info['bads'], description=description, status='bad', + verbose=verbose) + goods = [ch for ch in inst.ch_names if ch not in inst.info['bads']] + update(fname, channels=goods, status='good', verbose=None) + + +@update.register +def _(inst: mne.time_frequency._BaseTFR, + layout: BIDSLayout, description: list[str] | str = None, verbose=None): if not hasattr(inst, 'filenames'): inst.filenames = inst.info['subject_info'].get('files', None) for i, file in enumerate(inst.filenames): diff --git a/ieeg/process.py b/ieeg/process.py index 6643bc25..6d481404 100644 --- a/ieeg/process.py +++ b/ieeg/process.py @@ -104,9 +104,9 @@ def is_number(s) -> bool: return False -def proc_array(func: callable, arr_in: np.ndarray, - axes: int | tuple[int] = 0, n_jobs: int = None, - desc: str = "Slices", inplace: bool = True) -> np.ndarray: +def proc_array(func: callable, arr_in: np.ndarray, axes: int | tuple[int] = 0, + n_jobs: int = None, desc: str = "Slices", inplace: bool = True, + **kwargs) -> np.ndarray: """Execute a function in parallel over slices of an array Parameters @@ -147,8 +147,8 @@ def proc_array(func: callable, arr_in: np.ndarray, array_gen = list(arr_in[indices] for indices in cross_sect_ind) # array_gen = tqdm(array_gen, desc=desc, total=len(cross_sect_ind)) - gen = Parallel(n_jobs, return_generator=True, verbose=40)( - delayed(func)(x_) for x_ in array_gen) + gen = Parallel(n_jobs, return_as='generator', verbose=40)( + delayed(func)(x_, **kwargs) for x_ in array_gen) # Create process pool and apply the function in parallel for out, ind in zip(gen, cross_sect_ind): diff --git a/ieeg/viz/mri.py b/ieeg/viz/mri.py index 67761682..76403cb4 100644 --- a/ieeg/viz/mri.py +++ b/ieeg/viz/mri.py @@ -465,24 +465,42 @@ def plot_subj(inst: Signal | mne.Info | str, subj_dir: PathLike = None, # Default montage positions are in m, whereas plotting functions assume mm left = [p * 1000 for k, p in pos.items() if k.startswith('L')] - if len(left) != 0: + right = [p * 1000 for k, p in pos.items() if k.startswith('R')] + + if left: fig.add_foci(np.vstack(left), hemi='lh', color=color, scale_factor=size) - - right = [p * 1000 for k, p in pos.items() if k.startswith('R')] - if len(right) != 0: + if right: fig.add_foci(np.vstack(right), hemi='rh', color=color, scale_factor=size) if labels_every is not None: - if isinstance(picks[0], (int, np.integer)): - picks = [info.ch_names[p] for p in picks] - names = picks[slice(labels_every - 1, info['nchan'], labels_every)] + settings = dict(shape=None, always_visible=True, text_color=(0, 0, 0), + bold=False) + _add_labels(fig, info, sub, picks, pos, labels_every, hemi, + (left, right), **settings) + + return fig + + +def _add_labels(fig, info, sub, picks, pos, every, hemi, lr, **kwargs): + picks = [info.ch_names[p] for p in picks] if isinstance(picks[0], ( + int, np.integer)) else picks + names = picks[slice(every - 1, info['nchan'], every)] + + if hemi == 'split': + for hems, positions in zip(range(2), lr): + if not positions: + continue + pos = positions[slice(every - 1, info['nchan'], every)] + plt_names = [f'{sub}-{n}' for n in names if + n.startswith(['L', 'R'][hems])] + fig.plotter.subplot(0, hems) + fig.plotter.add_point_labels(pos, plt_names, **kwargs) + else: plt_names = [f'{sub}-{n}' for n in names] positions = np.array([pos[name] for name in names]) * 1000 - fig.plotter.add_point_labels(positions, plt_names, shape=None, - always_visible=True) - return fig + fig.plotter.add_point_labels(positions, plt_names, **kwargs) def subject_to_info(subject: str, subjects_dir: PathLike = None, @@ -599,16 +617,16 @@ def gen_labels(info: mne.Info, sub: str = None, subj_dir: PathLike = None, overwrite=True) mne.set_log_level("INFO") TASK = "SentenceRep" - sub_num = 57 + sub_num = 59 layout = get_data(TASK, root=LAB_root) subj_dir = op.join(LAB_root, "ECoG_Recon_Full") sub_pad = "D" + str(sub_num).zfill(4) - sub = "D{}".format(sub_num) + # sub = "D{}".format(sub_num) filt = raw_from_layout(layout.derivatives['clean'], subject=sub_pad, extension='.edf', desc='clean', preload=False) ## - brain = plot_subj("D57") + brain = plot_subj(filt) # plot_on_average(filt) # plot_gamma(raw)