diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 340ef98d..7a0505e4 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -1591,7 +1591,6 @@ def alert_post_source( log( f"Saved {alert['objectId']} {alert['candid']} as a Source on SkyPortal" ) - print(response.json()) saved_to_groups = response.json()["data"].get( "saved_to_groups", None ) @@ -1888,7 +1887,11 @@ def alert_sentinel_skyportal( # this should never happen, but just in case log(f"Failed to get all alerts for {alert['objectId']}: {e}") - self.alert_put_photometry(alert) + try: + self.alert_put_photometry(alert) + except Exception as e: + traceback.print_exc() + raise e # post thumbnails self.alert_post_thumbnails(alert) diff --git a/kowalski/alert_brokers/alert_broker_ztf.py b/kowalski/alert_brokers/alert_broker_ztf.py index 0db21f0c..6c9b40f3 100644 --- a/kowalski/alert_brokers/alert_broker_ztf.py +++ b/kowalski/alert_brokers/alert_broker_ztf.py @@ -8,6 +8,7 @@ import time import traceback import numpy as np +import pandas as pd from abc import ABC from copy import deepcopy from typing import Mapping, Sequence @@ -470,7 +471,7 @@ def alert_put_photometry(self, alert): df_photometry = self.make_photometry(alert, include_fp_hists=True) log( - f"Alert {alert['objectId']} contains program_ids={df_photometry['programid']}" + f"Alert {alert['objectId']} contains program_ids={list(df_photometry['programid'])}" ) df_photometry["stream_id"] = df_photometry["programid"].apply( @@ -481,52 +482,64 @@ def alert_put_photometry(self, alert): f"Posting {alert['objectId']} photometry for stream_ids={set(df_photometry.stream_id.unique())} to SkyPortal" ) - # post photometry by stream_id - for stream_id in set(df_photometry.stream_id.unique()): - stream_id_mask = df_photometry.stream_id == int(stream_id) - photometry = { - "obj_id": alert["objectId"], - "stream_ids": [int(stream_id)], - "instrument_id": self.instrument_id, - "mjd": df_photometry.loc[stream_id_mask, "mjd"].tolist(), - "flux": df_photometry.loc[stream_id_mask, "flux"].tolist(), - "fluxerr": df_photometry.loc[stream_id_mask, "fluxerr"].tolist(), - "zp": df_photometry.loc[stream_id_mask, "zp"].tolist(), - "magsys": df_photometry.loc[stream_id_mask, "zpsys"].tolist(), - "filter": df_photometry.loc[stream_id_mask, "filter"].tolist(), - "ra": df_photometry.loc[stream_id_mask, "ra"].tolist(), - "dec": df_photometry.loc[stream_id_mask, "dec"].tolist(), - "origin": df_photometry.loc[stream_id_mask, "origin"].tolist() - if "origin" in df_photometry - else [], - } + # split the dataframe in 2, one for the alert photometry (prv_candidates) and one for the forced photometry (fp_hists) + # the first has no origin, the second has origin = 'alert_fp' + if "origin" in df_photometry.columns: + df_prv_candidates = df_photometry[df_photometry["origin"].isnull()] + df_fp_hists = df_photometry[df_photometry["origin"] == "alert_fp"] + else: + df_prv_candidates = df_photometry + df_fp_hists = pd.DataFrame() - if (len(photometry.get("flux", ())) > 0) or ( - len(photometry.get("fluxerr", ())) > 0 - ): - with timer( - f"Posting photometry of {alert['objectId']} {alert['candid']}, " - f"stream_id={stream_id} to SkyPortal", - self.verbose > 1, + for df, is_fp in zip([df_prv_candidates, df_fp_hists], [False, True]): + if df.empty or len(df) == 0: + continue + # post photometry by stream_id + for stream_id in set(df.stream_id.unique()): + stream_id_mask = df.stream_id == int(stream_id) + photometry = { + "obj_id": alert["objectId"], + "stream_ids": [int(stream_id)], + "instrument_id": self.instrument_id, + "mjd": df.loc[stream_id_mask, "mjd"].tolist(), + "flux": df.loc[stream_id_mask, "flux"].tolist(), + "fluxerr": df.loc[stream_id_mask, "fluxerr"].tolist(), + "zp": df.loc[stream_id_mask, "zp"].tolist(), + "magsys": df.loc[stream_id_mask, "zpsys"].tolist(), + "filter": df.loc[stream_id_mask, "filter"].tolist(), + "ra": df.loc[stream_id_mask, "ra"].tolist(), + "dec": df.loc[stream_id_mask, "dec"].tolist(), + "origin": df.loc[stream_id_mask, "origin"].tolist() + if "origin" in df + else [], + } + + if (len(photometry.get("flux", ())) > 0) or ( + len(photometry.get("fluxerr", ())) > 0 ): - try: - response = self.api_skyportal( - "PUT", - "/api/photometry?ignore_flux_deduplication=true&ignore_flux_deduplication_replace=true", - photometry, - timeout=15, - ) - if response.json()["status"] == "success": + with timer( + f"Posting {'(forced)' if is_fp else ''} photometry of {alert['objectId']} {alert['candid']}, " + f"stream_id={stream_id} to SkyPortal", + self.verbose > 1, + ): + try: + response = self.api_skyportal( + "PUT", + f"/api/photometry{'?ignore_flux_deduplication=true&ignore_flux_deduplication_replace=true' if is_fp else ''}", + photometry, + timeout=15, + ) + if response.json()["status"] == "success": + log( + f"Posted {alert['objectId']} photometry stream_id={stream_id} to SkyPortal" + ) + else: + raise ValueError(response.json()["message"]) + except Exception as e: log( - f"Posted {alert['objectId']} photometry stream_id={stream_id} to SkyPortal" + f"Failed to post {alert['objectId']} photometry stream_id={stream_id} to SkyPortal: {e}" ) - else: - raise ValueError(response.json()["message"]) - except Exception as e: - log( - f"Failed to post {alert['objectId']} photometry stream_id={stream_id} to SkyPortal: {e}" - ) - continue + continue def flux_to_mag(self, flux, fluxerr, zp): """Convert flux to magnitude and calculate SNR diff --git a/kowalski/tests/test_alert_broker_ztf.py b/kowalski/tests/test_alert_broker_ztf.py index dfa7f5ae..5f3aa6c3 100644 --- a/kowalski/tests/test_alert_broker_ztf.py +++ b/kowalski/tests/test_alert_broker_ztf.py @@ -1015,7 +1015,6 @@ def test_alert_filter__user_defined__with_fp_hists(self): passed_filters = self.worker.alert_filter__user_defined([filter], record) assert passed_filters is not None - print(passed_filters) assert len(passed_filters) == 1 assert "autosave" in passed_filters[0]