Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Comment correlation algorithm. #369

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions janus_core/processing/correlator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ class Correlator:
"""
Correlate scalar real values, <ab>.

Implements the algorithm detailed in https://doi.org/10.1063/1.3491098.

Data pairs are observed iteratively and stored in a set of rolling hierarchical data
blocks.

Once a block is filled, coarse graining may be applied to update coarser block
levels by updating the coarser block with the average of values accumulated up to
that point in the filled block.

The correlation is continuously updated when any block is updated with new data.

Parameters
----------
blocks : int
Expand All @@ -37,18 +48,30 @@ def __init__(self, *, blocks: int, points: int, averaging: int) -> None:
averaging : int
Averaging window per block level.
"""
# Number of resolution levels.
self._blocks = blocks
# Data points at each resolution.
self._points = points
# Coarse-graining between resolution levels.
self._averaging = averaging
Comment on lines +51 to 56
Copy link
Member

@ElliottKasoar ElliottKasoar Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can probably be merged into the parameter descriptions just above.

I'm also wondering if, similarly, rather than comments for the others, we could add an Attributes section (which includes all these hidden attributes).

# Which levels have been updated with data.
self._max_block_used = 0
# First point in coarse-grained block relevant for correlation updates.
self._min_dist = self._points / self._averaging

# Sum of data seen for calculating the average between blocks.
self._accumulator = np.zeros((self._blocks, 2))
# Data points accumulated at each block.
self._count_accumulated = np.zeros(self._blocks, dtype=int)
# Current position in each block's data store.
self._shift_index = np.zeros(self._blocks, dtype=int)
# Rolling data store for each block.
self._shift = np.zeros((self._blocks, self._points, 2))
# Whether data is stored at a given index in this block's rolling data store.
self._shift_not_null = np.zeros((self._blocks, self._points), dtype=bool)
# Running correlation values.
self._correlation = np.zeros((self._blocks, self._points))
# Count of correlation updates.
self._count_correlated = np.zeros((self._blocks, self._points), dtype=int)

def update(self, a: float, b: float) -> None:
Expand Down Expand Up @@ -78,46 +101,59 @@ def _propagate(self, a: float, b: float, block: int) -> None:
Block in the hierarchy being updated.
"""
if block == self._blocks:
# Hit the end of the data structure.
return

shift = self._shift_index[block]
self._max_block_used = max(self._max_block_used, block)

# Update the rolling data store, and accumulate
self._shift[block, shift, :] = a, b
self._accumulator[block, :] += a, b
self._shift_not_null[block, shift] = True
self._count_accumulated[block] += 1

if self._count_accumulated[block] == self._averaging:
# Hit the coarse graining threshold, the next block can be updated.
self._propagate(
self._accumulator[block, 0] / self._averaging,
self._accumulator[block, 1] / self._averaging,
block + 1,
)
# Reset the accumulator at this block level.
self._accumulator[block, :] = 0.0
self._count_accumulated[block] = 0

# Update the correlation.
i = self._shift_index[block]
if block == 0:
# Need to multiply by all in this block (full resolution).
j = i
for point in range(self._points):
if self._shifts_valid(block, i, j):
# Correlate at this lag.
self._correlation[block, point] += (
self._shift[block, i, 0] * self._shift[block, j, 1]
)
self._count_correlated[block, point] += 1
j -= 1
if j < 0:
# Wrap to start of rolling data store.
j += self._points
else:
# Only need to update after points/averaging.
# The previous block already accounts for those points.
for point in range(self._min_dist, self._points):
if j < 0:
j = j + self._points
if self._shifts_valid(block, i, j):
# Correlate at this lag.
self._correlation[block, point] += (
self._shift[block, i, 0] * self._shift[block, j, 1]
)
self._count_correlated[block, point] += 1
j = j - 1
# Update the rolling data store.
self._shift_index[block] = (self._shift_index[block] + 1) % self._points

def _shifts_valid(self, block: int, p_i: int, p_j: int) -> bool:
Expand Down Expand Up @@ -154,11 +190,13 @@ def get_lags(self) -> Iterable[float]:
lag = 0
for i in range(self._points):
if self._count_correlated[0, i] > 0:
# Data has been correlated, at full resolution.
lags[lag] = i
lag += 1
for k in range(1, self._max_block_used):
for i in range(self._min_dist, self._points):
if self._count_correlated[k, i] > 0:
# Data has been correlated at a coarse-grained level.
lags[lag] = float(i) * float(self._averaging) ** k
lag += 1
return lags[0:lag]
Expand All @@ -177,13 +215,16 @@ def get_value(self) -> Iterable[float]:
lag = 0
for i in range(self._points):
if self._count_correlated[0, i] > 0:
# Data has been correlated at full resolution.
correlation[lag] = (
self._correlation[0, i] / self._count_correlated[0, i]
)
lag += 1
for k in range(1, self._max_block_used):
for i in range(self._min_dist, self._points):
# Indices less than points/averaging accounted in the previous block.
if self._count_correlated[k, i] > 0:
# Data has been correlated at a coarse-grained level.
correlation[lag] = (
self._correlation[k, i] / self._count_correlated[k, i]
)
Expand Down Expand Up @@ -275,13 +316,15 @@ def update(self, atoms: Atoms) -> None:
atoms : Atoms
Atoms object to observe values from.
"""
value_pairs = zip(self._get_a(atoms), self._get_b(atoms))
# All pairs of data to be correlated.
value_pairs = zip(self._get_a(atoms).flatten(), self._get_b(atoms).flatten())
if self._correlators is None:
# Initialise correlators automatically.
self._correlators = [
Correlator(
blocks=self.blocks, points=self.points, averaging=self.averaging
)
for _ in range(len(self._get_a(atoms)))
for _ in range(len(self._get_a(atoms).flatten()))
]
for corr, values in zip(self._correlators, value_pairs):
corr.update(*values)
Expand Down
2 changes: 1 addition & 1 deletion janus_core/processing/observables.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,4 @@ def __call__(self, atoms: Atoms) -> list[float]:
list[float]
The velocity values.
"""
return atoms.get_velocities()[self.atoms_slice, :][:, self._indices].flatten()
return atoms.get_velocities()[self.atoms_slice, :][:, self._indices]
Loading