Skip to content

Commit

Permalink
Add improvements to analog signals processing and a test
Browse files Browse the repository at this point in the history
  • Loading branch information
lauraporta committed Oct 17, 2023
1 parent 5c63f1c commit a0a8507
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 72 deletions.
137 changes: 67 additions & 70 deletions derotation/analysis/derotation_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,27 @@ def load_data(self):
).stem.split(".")[0]
self.filename = self.config["paths_write"]["saving_name"]

self.k = self.config["analog_signals_processing"]["squared_pulse_k"]
self.inter_rotation_interval_min_len = self.config[
"analog_signals_processing"
]["inter_rotation_interval_min_len"]

def process_analog_signals(self):
# ===================================
# Use rotation ticks to find the correct rotation angles
self.rotation_ticks_peaks = self.find_rotation_peaks()
self.rotation_on = self.find_when_is_rotation_on()
self.rot_blocks_idx = self.apply_rotation_direction()

start, end = self.get_start_end_times_with_threshold(
self.full_rotation, self.k
)
self.rot_blocks_idx = self.clean_start_and_end_rotation_signal(
self.inter_rotation_interval_min_len, start, end
)
self.rotation_on = self.create_signed_rotation_array(
len(self.full_rotation),
self.rot_blocks_idx["start"],
self.rot_blocks_idx["end"],
self.direction,
)

self.expected_tiks_per_rotation = self.check_number_of_rotations(
self.rotation_increment
)
Expand All @@ -101,8 +116,9 @@ def process_analog_signals(self):
(
self.lines_start,
self.lines_end,
self.threshold,
) = self.get_starting_and_ending_times(clock_type="line")
) = self.get_starting_and_ending_times(
clock="line", target_len=256 * len(self.image_stack)
)
if self.assume_full_rotation:
(
self.rot_deg_line,
Expand Down Expand Up @@ -132,69 +148,51 @@ def find_rotation_peaks(self):

return peaks

def find_when_is_rotation_on(self):
@staticmethod
def get_start_end_times_with_threshold(signal, k):
# identify the rotation ticks that correspond to
# clockwise and counter clockwise rotations

threshold = self.config["analog_signals_processing"]["rotation_is_on"][
"threshold"
]
rotation_on = np.zeros_like(self.full_rotation)
rotation_on[self.full_rotation > threshold] = 1
return rotation_on
mean = np.mean(signal)
std = np.std(signal)
threshold = mean + k * std

def apply_rotation_direction(self):
rotation_signal_copy = copy.deepcopy(self.rotation_on)
latest_rotation_on_end = 0
thresholded_signal = np.zeros_like(signal)
thresholded_signal[signal > threshold] = 1

i = 0
start = np.where(np.diff(thresholded_signal) > 0)[0]
end = np.where(np.diff(thresholded_signal) < 0)[0]

rotation_blocks_idx = {"start": [], "end": []}
while i < len(self.direction):
# find the first rotation_on == 1
try:
first_rotation_on = np.where(rotation_signal_copy == 1)[0][0]
except IndexError:
# no more rotations, data is over
logging.info(
f"Completed, missing {len(self.direction) - i} rotations"
)
break
# now assign the value in dir to all the first set of ones
len_first_group = np.where(
rotation_signal_copy[first_rotation_on:] == 0
)[0][0]

if first_rotation_on < 1000:
# skip this short rotation because it is a false one
# done one additional time to clean up the trace at the end
rotation_signal_copy = rotation_signal_copy[
first_rotation_on + len_first_group :
]
latest_rotation_on_end = (
latest_rotation_on_end
+ first_rotation_on
+ len_first_group
)
continue
return start, end

start = latest_rotation_on_end + first_rotation_on
end = latest_rotation_on_end + first_rotation_on + len_first_group
@staticmethod
def clean_start_and_end_rotation_signal(
inter_rotation_interval_min_len, start, end
):
"""removes very short intervals of off signal,
which are not full rotations"""

shifted_end = np.roll(end, 1)
mask = start - shifted_end > inter_rotation_interval_min_len
mask[0] = True # first rotation is always a full rotation
shifted_mask = np.roll(mask, -1)
new_start = start[mask]
new_end = end[shifted_mask]

# rotation on is modified in place
self.rotation_on[start:end] = self.direction[i]
return {"start": new_start, "end": new_end}

latest_rotation_on_end = (
latest_rotation_on_end + first_rotation_on + len_first_group
@staticmethod
def create_signed_rotation_array(len_full_rotation, start, end, direction):
rotation_on = np.zeros(len_full_rotation)
for i, (start, end) in enumerate(
zip(
start,
end,
)
rotation_signal_copy = rotation_signal_copy[
first_rotation_on + len_first_group :
]
i += 1
rotation_blocks_idx["start"].append(start)
rotation_blocks_idx["end"].append(end)
):
rotation_on[start:end] = direction[i]

return rotation_blocks_idx
return rotation_on

def check_number_of_rotations(self, given_increment=0.2):
logging.info(f"Current increment: {given_increment}")
Expand Down Expand Up @@ -259,16 +257,15 @@ def adjust_rotation_increment(self, given_increment=0.2):

return increments_per_rotation

def get_starting_and_ending_times(self, clock_type):
clock = self.line_clock if clock_type == "line" else self.frame_clock
# Calculate the threshold using a percentile of the total signal
target_len = (
len(self.image_stack)
if clock_type == "frame"
else len(self.image_stack) * 256
)
def get_starting_and_ending_times(self, clock, target_len):
upper_lim_bisect = self.config["analog_signals_processing"][
"upper_lim_bisect"
]
best_k = bisect(
self.goodness_of_threshold, -5, 5, args=(clock, target_len)
self.goodness_of_threshold,
-upper_lim_bisect,
upper_lim_bisect,
args=(clock, target_len),
)
threshold = np.mean(clock) + best_k * np.std(clock)

Expand All @@ -285,7 +282,7 @@ def get_starting_and_ending_times(self, clock_type):
start = np.where(np.diff(clock) > threshold)[0]
end = np.where(np.diff(clock) < -threshold)[0]

return start, end, threshold
return start, end

@staticmethod
def goodness_of_threshold(k, clock, target_len):
Expand All @@ -298,8 +295,8 @@ def goodness_of_threshold(k, clock, target_len):
return len(start) - target_len

def find_rotation_angles_by_frame_in_incremental_rotation(self):
frame_start, frame_end, threshold = self.get_starting_and_ending_times(
clock_type="frame"
frame_start, frame_end = self.get_starting_and_ending_times(
self.frame_clock, len(self.image_stack)
)

rotation_increment_by_frame = np.zeros(len(self.image_stack))
Expand Down
6 changes: 4 additions & 2 deletions derotation/config/full_rotation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ adjust_increment: True
rotation_kind: "line"
rot_deg: 360


analog_signals_processing:
find_rotation_ticks_peaks:
height: 4
distance: 20
rotation_is_on:
threshold: 0.5
squared_pulse_k: 1
upper_lim_bisect: 5
inter_rotation_interval_min_len: 1000
2 changes: 2 additions & 0 deletions derotation/config/incremental_rotation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ analog_signals_processing:
distance: 20
rotation_is_on:
threshold: 0.5
upper_lim_bisect: 5
min_rotation_length: 1000
131 changes: 131 additions & 0 deletions tests/test_integration/test_rotation_signal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import copy

import numpy as np
import scipy.signal as signal

from derotation.analysis.derotation_pipeline import DerotationPipeline


class TestRotationSignal:
def __init__(self):
self.full_rotation = self.make_dummy_signal()
self.direction = self.make_dummy_direction()
self.k = 3
self.min_rotation_length = 5

def make_dummy_signal(self):
dummy_signal = signal.square(np.linspace(0, 10, 10000))
dummy_signal += np.random.normal(0, 0.1, 10000)

return dummy_signal

def make_dummy_direction(self):
direction = np.zeros(10)
direction[::2] = 1
direction[1::2] = -1
return direction

def find_when_is_rotation_on(self):
# old implementation
# identify the rotation ticks that correspond to
# clockwise and counter clockwise rotations
threshold = 0.5 # Threshold to consider "on" or rotation occurring
rotation_on = np.zeros_like(self.full_rotation)
rotation_on[self.full_rotation > threshold] = 1
return rotation_on

def apply_rotation_direction(self, rotation_on):
# old implementation
rotation_signal_copy = copy.deepcopy(rotation_on)
latest_rotation_on_end = 0

i = 0

rotation_blocks_idx = {"start": [], "end": []}
while i < len(self.direction):
# find the first rotation_on == 1
try:
first_rotation_on = np.where(rotation_signal_copy == 1)[0][0]
except IndexError:
# no more rotations, data is over
break
# now assign the value in dir to all the first set of ones
len_first_group = np.where(
rotation_signal_copy[first_rotation_on:] == 0
)[0][0]

if first_rotation_on < self.min_rotation_length:
# skip this short rotation because it is a false one
# done one additional time to clean up the trace at the end
rotation_signal_copy = rotation_signal_copy[
first_rotation_on + len_first_group :
]
latest_rotation_on_end = (
latest_rotation_on_end
+ first_rotation_on
+ len_first_group
)
continue

start = latest_rotation_on_end + first_rotation_on
end = latest_rotation_on_end + first_rotation_on + len_first_group

# rotation on is modified in place
rotation_on[start:end] = self.direction[i]

latest_rotation_on_end = (
latest_rotation_on_end + first_rotation_on + len_first_group
)
rotation_signal_copy = rotation_signal_copy[
first_rotation_on + len_first_group :
]
i += 1
rotation_blocks_idx["start"].append(start)
rotation_blocks_idx["end"].append(end)

return rotation_on, rotation_blocks_idx

def test_finding_correct_start_end_times_for_full_rotation(self):
_, test_rotation_blocks_idx = self.apply_rotation_direction(
self.find_when_is_rotation_on()
)

start, end = DerotationPipeline.get_start_end_times_with_threshold(
self.full_rotation, self.k
)
rotation_blocks_idx = (
DerotationPipeline.clean_start_and_end_rotation_signal(start, end)
)

assert len(rotation_blocks_idx["start"]) == len(
test_rotation_blocks_idx["start"]
)
assert len(rotation_blocks_idx["end"]) == len(self.direction)

assert np.all(
rotation_blocks_idx["start"] == test_rotation_blocks_idx["start"]
)
assert np.all(
rotation_blocks_idx["end"] == test_rotation_blocks_idx["end"]
)

def test_calculating_signed_rotation_on(self):
test_rotation_on, _ = self.apply_rotation_direction(
self.find_when_is_rotation_on()
)

start, end = DerotationPipeline.get_start_end_times_with_threshold(
self.full_rotation, self.k
)
rotation_blocks_idx = (
DerotationPipeline.clean_start_and_end_rotation_signal(start, end)
)

rotation_on = DerotationPipeline.create_signed_rotation_array(
len(self.full_rotation),
rotation_blocks_idx["start"],
rotation_blocks_idx["end"],
self.direction,
)

assert np.all(rotation_on == test_rotation_on)

0 comments on commit a0a8507

Please sign in to comment.