
Update logic to retrieve threshold-specific pellet delivery timestamps #398

Merged

Conversation


@ttngu207 commented Aug 22, 2024

Fix #382
Fix #403

High-level logic for block detection
On a per-chunk basis, check for the presence of a new block and insert it into the Block table.

  1. Find the 0s in pellet_ct (these are times when the pellet count reset, i.e. a new block)
  2. Remove any double 0s (0s within 1 second of each other), keeping the first 0 (see the sketch below)
  3. Calculate block end_times (use due_time) and durations
  4. Insert into Block table
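
A minimal sketch of steps 1-2, for illustration only (pellet_ct_df and its pellet_count column are placeholder names, not the pipeline's actual schema):

import pandas as pd

def find_block_starts(pellet_ct_df: pd.DataFrame) -> pd.DatetimeIndex:
    """Timestamps where the pellet counter resets to 0, de-duplicated within 1s."""
    # Step 1: 0s in the pellet count mark a counter reset, i.e. a new block
    zeros = pellet_ct_df.index[pellet_ct_df["pellet_count"] == 0]
    # Step 2: drop 0s within 1 second of a previous 0, keeping the first of each run
    spacing = zeros.to_series().diff().dt.total_seconds().fillna(float("inf"))
    return zeros[(spacing >= 1).to_numpy()]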

@ttngu207 marked this pull request as ready for review on August 28, 2024, 14:13
@jkbhagatio (Member)

We need to add checking the beambreaks after attempted pellet delivery, to account for cases where pellet delivery is attempted but not successful.

I need to look into the false-negative rate (cases where no beambreak occurs after a successful pellet delivery).
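
As a rough way to estimate that rate, one could count pellet deliveries with no beambreak inside the matching window (a hypothetical helper, not code from this PR; the 1.2s default matches the window used later in the thread):

import pandas as pd

def beambreak_false_negative_rate(
    pellet_times: pd.DatetimeIndex,
    beambreak_times: pd.DatetimeIndex,
    window: str = "1.2s",
) -> float:
    """Fraction of pellet deliveries with no beambreak within `window` after them."""
    # merge_asof requires both frames to be sorted on the merge key
    left = pd.DataFrame({"time": pellet_times.sort_values()})
    right = pd.DataFrame({
        "time": beambreak_times.sort_values(),
        "beam_break_timestamp": beambreak_times.sort_values(),
    })
    matched = pd.merge_asof(
        left, right, on="time", tolerance=pd.Timedelta(window), direction="forward"
    )
    return float(matched["beam_break_timestamp"].isna().mean())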

@ttngu207 (Contributor Author)

@jkbhagatio Removal of ManualDelivery events has been added. The block key I'm testing with is:

{'experiment_name': 'social0.3-aeon3',
 'block_start': '2024-06-26 10:52:10.001984',
 'block_end': '2024-06-26 12:57:18'}

aeon/dj_pipeline/analysis/block_analysis.py
@@ -188,37 +183,14 @@ def make(self, key):
)
patch_keys, patch_names = patch_query.fetch("KEY", "underground_feeder_name")
Contributor

Should we exclude Dummy patches here, or is this already handled elsewhere so we can assume dummy patches will never be fetched?

Contributor Author

Not excluded; Dummy patches will still be included in the analysis, but they will return 0 pellets. Wheel data will still be computed.

depletion_state_df = depletion_state_df[~invalid_rows]

# find pellet times with matching beam break times (within 500ms after pellet times)
pellet_beam_break_df = (
Contributor

Can any of delivered_pellet_df, beam_break_df, depletion_state_df be an empty df? If yes, pd.merge_asof will throw an error.
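
For illustration, the kind of early-return guard this question points at (a sketch only; the column list mirrors the empty-result call in the proposal below, and the check would need to run after all three streams are fetched):

import pandas as pd

def _empty_result() -> pd.DataFrame:
    """Empty frame with the columns downstream code expects (sketch only)."""
    return pd.DataFrame(
        columns=["threshold", "offset", "rate", "pellet_timestamp", "beam_break_timestamp"]
    )

# Inside the function, once the three streams have been fetched:
# if delivered_pellet_df.empty or beambreak_df.empty or depletion_state_df.empty:
#     return _empty_result()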

@jkbhagatio (Member)

@ttngu207

I get slightly different results with this approach: it returns more final matched pellet delivery triggers, beambreaks, and threshold values.

It's similar to yours; I just changed things to make it more readable for me. See the diffs and the results, and let me know what you think.

(If you want me to put it in a separate PR or something let me know, else you can just copy and paste and compare)


def get_threshold_associated_pellets(patch_key, start, end):
    """
    Retrieve the pellet delivery timestamps associated with each patch threshold update within the specified start-end time.
    1. Get all patch state update timestamps (DepletionState): let's call these events "A"
        - Remove all events within 1 second of each other
        - Remove all events without threshold value (NaN)
    2. Get all pellet delivery timestamps (DeliverPellet): let's call these events "B"
        - Find matching beam break timestamps within 1.2s after each pellet delivery
    3. For each event "A", find the nearest event "B" within 100ms before or after the event "A"
        - These are the pellet delivery events "B" associated with the previous threshold update event "A"
    4. Shift back the pellet delivery timestamps by 1 to match the pellet delivery with the previous threshold update
    5. Remove all threshold updates events "A" without a corresponding pellet delivery event "B"
    Args:
        patch_key (dict): primary key for the patch
        start (datetime): start timestamp
        end (datetime): end timestamp
    Returns:
        pd.DataFrame: DataFrame with the following columns:
        - threshold_update_timestamp (index)
        - pellet_timestamp
        - beam_break_timestamp
        - offset
        - rate
    """
    chunk_restriction = acquisition.create_chunk_restriction(patch_key["experiment_name"], start, end)

    # Get pellet delivery trigger data
    delivered_pellet_df = fetch_stream(
        streams.UndergroundFeederDeliverPellet & patch_key & chunk_restriction
    )[start:end]
    # Remove invalid rows where the time difference is less than 1.2 seconds
    invalid_rows = delivered_pellet_df.index.to_series().diff().dt.total_seconds() < 1.2
    delivered_pellet_df = delivered_pellet_df[~invalid_rows]

    # Get beambreak data
    beambreak_df = fetch_stream(streams.UndergroundFeederBeamBreak & patch_key & chunk_restriction)[
        start:end
    ]
    # Remove invalid rows where the time difference is less than 1 second
    invalid_rows = beambreak_df.index.to_series().diff().dt.total_seconds() < 1
    beambreak_df = beambreak_df[~invalid_rows]

    # Exclude manual deliveries
    manual_delivery_df = fetch_stream(
        streams.UndergroundFeederManualDelivery & patch_key & chunk_restriction
    )[start:end]
    delivered_pellet_df = delivered_pellet_df.loc[
        delivered_pellet_df.index.difference(manual_delivery_df.index)
    ]

    # Return empty if no pellets
    if delivered_pellet_df.empty or beambreak_df.empty:
        return acquisition.io_api._empty(
            ["threshold", "offset", "rate", "pellet_timestamp", "beam_break_timestamp"]
        )

    # Find pellet delivery triggers with matching beambreaks within 1.2s after each pellet delivery
    pellet_beam_break_df = (
        pd.merge_asof(
            delivered_pellet_df.reset_index(),
            beambreak_df.reset_index().rename(columns={"time": "beam_break_timestamp"}),
            left_on="time",
            right_on="beam_break_timestamp",
            tolerance=pd.Timedelta("1.2s"),
            direction="forward",
        )
        .set_index("time")
        .dropna(subset=["beam_break_timestamp"])
    )
    pellet_beam_break_df.drop_duplicates(subset="beam_break_timestamp", keep="last", inplace=True)

    # Get patch threshold data
    depletion_state_df = fetch_stream(
        streams.UndergroundFeederDepletionState & patch_key & chunk_restriction
    )[start:end]
    # Remove NaNs
    depletion_state_df = depletion_state_df.dropna(subset=["threshold"])
    # Remove invalid rows where the time difference is less than 1 second
    invalid_rows = depletion_state_df.index.to_series().diff().dt.total_seconds() < 1
    depletion_state_df = depletion_state_df[~invalid_rows]

    # Find pellet delivery triggers that approximately coincide with each threshold update
    # i.e. nearest pellet delivery within 100ms before or after threshold update
    pellet_ts_threshold_df = (
        pd.merge_asof(
            depletion_state_df.reset_index(),
            pellet_beam_break_df.reset_index().rename(columns={"time": "pellet_timestamp"}),
            left_on="time",
            right_on="pellet_timestamp",
            tolerance=pd.Timedelta("100ms"),
            direction="nearest",
        )
        .set_index("time")
        .dropna(subset=["pellet_timestamp"])
    )

    # Clean up the df
    pellet_ts_threshold_df = pellet_ts_threshold_df.drop(columns=["event_x", "event_y"])
    # Shift back the pellet_timestamp values by 1 to match with the previous threshold update
    pellet_ts_threshold_df.pellet_timestamp = pellet_ts_threshold_df.pellet_timestamp.shift(-1)
    pellet_ts_threshold_df.beam_break_timestamp = pellet_ts_threshold_df.beam_break_timestamp.shift(-1)
    pellet_ts_threshold_df = pellet_ts_threshold_df.dropna(subset=["pellet_timestamp", "beam_break_timestamp"])
    return pellet_ts_threshold_df
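
For context, a hypothetical call using the block key posted earlier in the thread (underground_feeder_name and its value are placeholders, not from this PR):

import pandas as pd

patch_key = {
    "experiment_name": "social0.3-aeon3",
    "underground_feeder_name": "Patch1",  # placeholder, not from this PR
}
pellets_df = get_threshold_associated_pellets(
    patch_key,
    start=pd.Timestamp("2024-06-26 10:52:10.001984"),
    end=pd.Timestamp("2024-06-26 12:57:18"),
)
# One row per threshold update that has an associated pellet delivery
print(pellets_df.head())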

@ttngu207 (Contributor Author)


Looks good @jkbhagatio
Yes, the logic is the same; the code is just reorganized to be more readable.

I've updated this PR to include the changes.

This PR is also up to date with main, so merging it will bring all recent changes on main into datajoint_pipeline.
Or, if it's not ready, we can review/merge #416.
(Tagging @MilagrosMarin here too: if we're merging this PR, #398, first, then there's no need for #416.)

@jkbhagatio (Member)


Cool, sounds good. I'm happy for this to merge then, feel free to go ahead @ttngu207!

@ttngu207 merged commit 07fe4c2 into SainsburyWellcomeCentre:datajoint_pipeline on Sep 28, 2024