From 3d84e48076db9c81646ad2b6665c3353b5cee596 Mon Sep 17 00:00:00 2001 From: Harvey Devereux Date: Thu, 12 Dec 2024 09:41:22 +0000 Subject: [PATCH 1/5] Comment correlator.py --- janus_core/processing/correlator.py | 33 +++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/janus_core/processing/correlator.py b/janus_core/processing/correlator.py index 10f18c66..5bf40258 100644 --- a/janus_core/processing/correlator.py +++ b/janus_core/processing/correlator.py @@ -14,6 +14,8 @@ class Correlator: """ Correlate scalar real values, . + Implements the algorithm detailed in https://doi.org/10.1063/1.3491098. + Parameters ---------- blocks : int @@ -37,18 +39,30 @@ def __init__(self, *, blocks: int, points: int, averaging: int) -> None: averaging : int Averaging window per block level. """ + # Number of resoluion levels. self._blocks = blocks + # Data points a each resolution. self._points = points + # Coarse-graining between resolution levels. self._averaging = averaging + # Which levels have been updated with data. self._max_block_used = 0 + # First point in a block relevant for correlation updates. self._min_dist = self._points / self._averaging + # Sum of data seen for calculating the average between blocks. self._accumulator = np.zeros((self._blocks, 2)) + # Data points accumulated at each block. self._count_accumulated = np.zeros(self._blocks, dtype=int) + # Current position in each blocks data store. self._shift_index = np.zeros(self._blocks, dtype=int) + # Rolling data store for each block. self._shift = np.zeros((self._blocks, self._points, 2)) + # If data is stored in this block's rolling data store, at a given index. self._shift_not_null = np.zeros((self._blocks, self._points), dtype=bool) + # Running correlation values. self._correlation = np.zeros((self._blocks, self._points)) + # Count of correlation updates. self._count_correlated = np.zeros((self._blocks, self._points), dtype=int) def update(self, a: float, b: float) -> None: @@ -78,46 +92,58 @@ def _propagate(self, a: float, b: float, block: int) -> None: Block in the hierachy being updated. """ if block == self._blocks: + # Hit the end of the data structure. return shift = self._shift_index[block] self._max_block_used = max(self._max_block_used, block) + + # Update the rolling data store, and accumulate self._shift[block, shift, :] = a, b self._accumulator[block, :] += a, b self._shift_not_null[block, shift] = True self._count_accumulated[block] += 1 if self._count_accumulated[block] == self._averaging: + # Hit the coarse graining threshold, the next block can be updated. self._propagate( self._accumulator[block, 0] / self._averaging, self._accumulator[block, 1] / self._averaging, block + 1, ) + # Reset the accumulator at this block level. self._accumulator[block, :] = 0.0 self._count_accumulated[block] = 0 + # Update the correlation. i = self._shift_index[block] if block == 0: + # Need to multiply all (full resolution). j = i for point in range(self._points): if self._shifts_valid(block, i, j): + # Correlate at this lag. self._correlation[block, point] += ( self._shift[block, i, 0] * self._shift[block, j, 1] ) self._count_correlated[block, point] += 1 j -= 1 if j < 0: + # Wrap to start of rolling data store. j += self._points else: + # Only need to update after points/averaging. for point in range(self._min_dist, self._points): if j < 0: j = j + self._points if self._shifts_valid(block, i, j): + # Correlate at this lag. self._correlation[block, point] += ( self._shift[block, i, 0] * self._shift[block, j, 1] ) self._count_correlated[block, point] += 1 j = j - 1 + # Update the rolling data store. self._shift_index[block] = (self._shift_index[block] + 1) % self._points def _shifts_valid(self, block: int, p_i: int, p_j: int) -> bool: @@ -154,11 +180,13 @@ def get_lags(self) -> Iterable[float]: lag = 0 for i in range(self._points): if self._count_correlated[0, i] > 0: + # Data has been correlated, at full resolution. lags[lag] = i lag += 1 for k in range(1, self._max_block_used): for i in range(self._min_dist, self._points): if self._count_correlated[k, i] > 0: + # Data has been correlated at a coarse-grained level. lags[lag] = float(i) * float(self._averaging) ** k lag += 1 return lags[0:lag] @@ -177,13 +205,16 @@ def get_value(self) -> Iterable[float]: lag = 0 for i in range(self._points): if self._count_correlated[0, i] > 0: + # Data has been correlated at full resolution. correlation[lag] = ( self._correlation[0, i] / self._count_correlated[0, i] ) lag += 1 for k in range(1, self._max_block_used): for i in range(self._min_dist, self._points): + # Indices at less than points/averaging are accounted in the previous block. if self._count_correlated[k, i] > 0: + # Data has been correlated at a coarse-grained level. correlation[lag] = ( self._correlation[k, i] / self._count_correlated[k, i] ) @@ -275,8 +306,10 @@ def update(self, atoms: Atoms) -> None: atoms : Atoms Atoms object to observe values from. """ + # All pairs of data to be correlated. value_pairs = zip(self._get_a(atoms), self._get_b(atoms)) if self._correlators is None: + # Initialise correlators automatically. self._correlators = [ Correlator( blocks=self.blocks, points=self.points, averaging=self.averaging From a1712a366cfc2bb008756705d11831fd876bf0c4 Mon Sep 17 00:00:00 2001 From: Harvey Devereux Date: Thu, 12 Dec 2024 10:27:11 +0000 Subject: [PATCH 2/5] Don't flatten Velocity values --- janus_core/processing/correlator.py | 14 +++++++------- janus_core/processing/observables.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/janus_core/processing/correlator.py b/janus_core/processing/correlator.py index 5bf40258..ea456b83 100644 --- a/janus_core/processing/correlator.py +++ b/janus_core/processing/correlator.py @@ -47,14 +47,14 @@ def __init__(self, *, blocks: int, points: int, averaging: int) -> None: self._averaging = averaging # Which levels have been updated with data. self._max_block_used = 0 - # First point in a block relevant for correlation updates. + # First point in a block relevant for correlation updates. self._min_dist = self._points / self._averaging # Sum of data seen for calculating the average between blocks. self._accumulator = np.zeros((self._blocks, 2)) # Data points accumulated at each block. self._count_accumulated = np.zeros(self._blocks, dtype=int) - # Current position in each blocks data store. + # Current position in each blocks data store. self._shift_index = np.zeros(self._blocks, dtype=int) # Rolling data store for each block. self._shift = np.zeros((self._blocks, self._points, 2)) @@ -97,7 +97,7 @@ def _propagate(self, a: float, b: float, block: int) -> None: shift = self._shift_index[block] self._max_block_used = max(self._max_block_used, block) - + # Update the rolling data store, and accumulate self._shift[block, shift, :] = a, b self._accumulator[block, :] += a, b @@ -137,7 +137,7 @@ def _propagate(self, a: float, b: float, block: int) -> None: if j < 0: j = j + self._points if self._shifts_valid(block, i, j): - # Correlate at this lag. + # Correlate at this lag. self._correlation[block, point] += ( self._shift[block, i, 0] * self._shift[block, j, 1] ) @@ -212,7 +212,7 @@ def get_value(self) -> Iterable[float]: lag += 1 for k in range(1, self._max_block_used): for i in range(self._min_dist, self._points): - # Indices at less than points/averaging are accounted in the previous block. + # Indices less than points/averaging accounted in the previous block. if self._count_correlated[k, i] > 0: # Data has been correlated at a coarse-grained level. correlation[lag] = ( @@ -307,14 +307,14 @@ def update(self, atoms: Atoms) -> None: Atoms object to observe values from. """ # All pairs of data to be correlated. - value_pairs = zip(self._get_a(atoms), self._get_b(atoms)) + value_pairs = zip(self._get_a(atoms).flatten(), self._get_b(atoms).flatten()) if self._correlators is None: # Initialise correlators automatically. self._correlators = [ Correlator( blocks=self.blocks, points=self.points, averaging=self.averaging ) - for _ in range(len(self._get_a(atoms))) + for _ in range(len(self._get_a(atoms).flatten())) ] for corr, values in zip(self._correlators, value_pairs): corr.update(*values) diff --git a/janus_core/processing/observables.py b/janus_core/processing/observables.py index 52fe0b91..188abe45 100644 --- a/janus_core/processing/observables.py +++ b/janus_core/processing/observables.py @@ -266,4 +266,4 @@ def __call__(self, atoms: Atoms) -> list[float]: list[float] The velocity values. """ - return atoms.get_velocities()[self.atoms_slice, :][:, self._indices].flatten() + return atoms.get_velocities()[self.atoms_slice, :][:, self._indices] From 8601bfb7728e7d3e8ad5b603d7ce8ff5deb7db9a Mon Sep 17 00:00:00 2001 From: Harvey Devereux Date: Thu, 12 Dec 2024 10:37:33 +0000 Subject: [PATCH 3/5] Fix some comments, add general description --- janus_core/processing/correlator.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/janus_core/processing/correlator.py b/janus_core/processing/correlator.py index ea456b83..e002611c 100644 --- a/janus_core/processing/correlator.py +++ b/janus_core/processing/correlator.py @@ -16,6 +16,15 @@ class Correlator: Implements the algorithm detailed in https://doi.org/10.1063/1.3491098. + Data pairs are observed iteratively and stored in a set of rolling hierarchical data + blocks. + + Once a block is filled, coarse graining may be applied to update coarser block levels by + updating the coarser block with the average of values accumulated up to that point in + the filled block. + + The correlation is continuously updated when any block is updated with new data. + Parameters ---------- blocks : int @@ -41,13 +50,13 @@ def __init__(self, *, blocks: int, points: int, averaging: int) -> None: """ # Number of resoluion levels. self._blocks = blocks - # Data points a each resolution. + # Data points at each resolution. self._points = points # Coarse-graining between resolution levels. self._averaging = averaging # Which levels have been updated with data. self._max_block_used = 0 - # First point in a block relevant for correlation updates. + # First point in coarse-grained block relevant for correlation updates. self._min_dist = self._points / self._averaging # Sum of data seen for calculating the average between blocks. @@ -118,7 +127,7 @@ def _propagate(self, a: float, b: float, block: int) -> None: # Update the correlation. i = self._shift_index[block] if block == 0: - # Need to multiply all (full resolution). + # Need to multiply by all in this block (full resolution). j = i for point in range(self._points): if self._shifts_valid(block, i, j): @@ -133,6 +142,7 @@ def _propagate(self, a: float, b: float, block: int) -> None: j += self._points else: # Only need to update after points/averaging. + # The previous block already accounts for those points. for point in range(self._min_dist, self._points): if j < 0: j = j + self._points From 7ca6bd0caac114865fa4780fabc09442c3ca42f2 Mon Sep 17 00:00:00 2001 From: Harvey Devereux Date: Thu, 12 Dec 2024 10:39:34 +0000 Subject: [PATCH 4/5] Formatting --- janus_core/processing/correlator.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/janus_core/processing/correlator.py b/janus_core/processing/correlator.py index e002611c..627fbe33 100644 --- a/janus_core/processing/correlator.py +++ b/janus_core/processing/correlator.py @@ -16,12 +16,12 @@ class Correlator: Implements the algorithm detailed in https://doi.org/10.1063/1.3491098. - Data pairs are observed iteratively and stored in a set of rolling hierarchical data + Data pairs are observed iteratively and stored in a set of rolling hierarchical data blocks. - - Once a block is filled, coarse graining may be applied to update coarser block levels by - updating the coarser block with the average of values accumulated up to that point in - the filled block. + + Once a block is filled, coarse graining may be applied to update coarser block + levels by updating the coarser block with the average of values accumulated up to + that point in the filled block. The correlation is continuously updated when any block is updated with new data. From 592add3d60682ce534e70c2ee83f9df9a9af2f43 Mon Sep 17 00:00:00 2001 From: Harvey Devereux Date: Mon, 13 Jan 2025 10:05:48 +0000 Subject: [PATCH 5/5] Add Attributes section to correlator --- janus_core/processing/correlator.py | 39 ++++++++++++++++++----------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/janus_core/processing/correlator.py b/janus_core/processing/correlator.py index 627fbe33..4328d20d 100644 --- a/janus_core/processing/correlator.py +++ b/janus_core/processing/correlator.py @@ -33,6 +33,27 @@ class Correlator: Number of points per block. averaging : int Averaging window per block level. + + Attributes + ---------- + _max_block_used : int + Which levels have been updated with data. + _min_dist : int + First point in coarse-grained block relevant for correlation updates. + _accumulator : NDArray[float64] + Sum of data seen for calculating the average between blocks. + _count_accumulated : NDArray[int] + Data points accumulated at this block. + _shift_index : NDArray[int] + Current position in each block's data store. + _shift : NDArray[float64] + Rolling data store for each block. + _shift_not_null : NDArray[bool] + If data is stored in this block's rolling data store, at a given index. + _correlation : NDArray[float64] + Running correlation values. + _count_correlated : NDArray[int] + Count of correlation updates for each block. """ def __init__(self, *, blocks: int, points: int, averaging: int) -> None: @@ -42,36 +63,24 @@ def __init__(self, *, blocks: int, points: int, averaging: int) -> None: Parameters ---------- blocks : int - Number of correlation blocks. + Number of resolution levels. points : int - Number of points per block. + Data points at each resolution. averaging : int - Averaging window per block level. + Coarse-graining between resolution levels. """ - # Number of resoluion levels. self._blocks = blocks - # Data points at each resolution. self._points = points - # Coarse-graining between resolution levels. self._averaging = averaging - # Which levels have been updated with data. self._max_block_used = 0 - # First point in coarse-grained block relevant for correlation updates. self._min_dist = self._points / self._averaging - # Sum of data seen for calculating the average between blocks. self._accumulator = np.zeros((self._blocks, 2)) - # Data points accumulated at each block. self._count_accumulated = np.zeros(self._blocks, dtype=int) - # Current position in each blocks data store. self._shift_index = np.zeros(self._blocks, dtype=int) - # Rolling data store for each block. self._shift = np.zeros((self._blocks, self._points, 2)) - # If data is stored in this block's rolling data store, at a given index. self._shift_not_null = np.zeros((self._blocks, self._points), dtype=bool) - # Running correlation values. self._correlation = np.zeros((self._blocks, self._points)) - # Count of correlation updates. self._count_correlated = np.zeros((self._blocks, self._points), dtype=int) def update(self, a: float, b: float) -> None: