Skip to content

Commit

Permalink
post prv_candidates and fp_hists to SkyPortal separately
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodlz committed Mar 4, 2024
1 parent dfa32f4 commit 73a2ab5
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 46 deletions.
7 changes: 5 additions & 2 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,6 @@ def alert_post_source(
log(
f"Saved {alert['objectId']} {alert['candid']} as a Source on SkyPortal"
)
print(response.json())
saved_to_groups = response.json()["data"].get(
"saved_to_groups", None
)
Expand Down Expand Up @@ -1888,7 +1887,11 @@ def alert_sentinel_skyportal(
# this should never happen, but just in case
log(f"Failed to get all alerts for {alert['objectId']}: {e}")

self.alert_put_photometry(alert)
try:
self.alert_put_photometry(alert)
except Exception as e:
traceback.print_exc()
raise e

# post thumbnails
self.alert_post_thumbnails(alert)
Expand Down
99 changes: 56 additions & 43 deletions kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import traceback
import numpy as np
import pandas as pd
from abc import ABC
from copy import deepcopy
from typing import Mapping, Sequence
Expand Down Expand Up @@ -470,7 +471,7 @@ def alert_put_photometry(self, alert):
df_photometry = self.make_photometry(alert, include_fp_hists=True)

log(
f"Alert {alert['objectId']} contains program_ids={df_photometry['programid']}"
f"Alert {alert['objectId']} contains program_ids={list(df_photometry['programid'])}"
)

df_photometry["stream_id"] = df_photometry["programid"].apply(
Expand All @@ -481,52 +482,64 @@ def alert_put_photometry(self, alert):
f"Posting {alert['objectId']} photometry for stream_ids={set(df_photometry.stream_id.unique())} to SkyPortal"
)

# post photometry by stream_id
for stream_id in set(df_photometry.stream_id.unique()):
stream_id_mask = df_photometry.stream_id == int(stream_id)
photometry = {
"obj_id": alert["objectId"],
"stream_ids": [int(stream_id)],
"instrument_id": self.instrument_id,
"mjd": df_photometry.loc[stream_id_mask, "mjd"].tolist(),
"flux": df_photometry.loc[stream_id_mask, "flux"].tolist(),
"fluxerr": df_photometry.loc[stream_id_mask, "fluxerr"].tolist(),
"zp": df_photometry.loc[stream_id_mask, "zp"].tolist(),
"magsys": df_photometry.loc[stream_id_mask, "zpsys"].tolist(),
"filter": df_photometry.loc[stream_id_mask, "filter"].tolist(),
"ra": df_photometry.loc[stream_id_mask, "ra"].tolist(),
"dec": df_photometry.loc[stream_id_mask, "dec"].tolist(),
"origin": df_photometry.loc[stream_id_mask, "origin"].tolist()
if "origin" in df_photometry
else [],
}
# split the dataframe in 2, one for the alert photometry (prv_candidates) and one for the forced photometry (fp_hists)
# the first has no origin, the second has origin = 'alert_fp'
if "origin" in df_photometry.columns:
df_prv_candidates = df_photometry[df_photometry["origin"].isnull()]
df_fp_hists = df_photometry[df_photometry["origin"] == "alert_fp"]
else:
df_prv_candidates = df_photometry
df_fp_hists = pd.DataFrame()

if (len(photometry.get("flux", ())) > 0) or (
len(photometry.get("fluxerr", ())) > 0
):
with timer(
f"Posting photometry of {alert['objectId']} {alert['candid']}, "
f"stream_id={stream_id} to SkyPortal",
self.verbose > 1,
for df, is_fp in zip([df_prv_candidates, df_fp_hists], [False, True]):
if df.empty or len(df) == 0:
continue
# post photometry by stream_id
for stream_id in set(df.stream_id.unique()):
stream_id_mask = df.stream_id == int(stream_id)
photometry = {
"obj_id": alert["objectId"],
"stream_ids": [int(stream_id)],
"instrument_id": self.instrument_id,
"mjd": df.loc[stream_id_mask, "mjd"].tolist(),
"flux": df.loc[stream_id_mask, "flux"].tolist(),
"fluxerr": df.loc[stream_id_mask, "fluxerr"].tolist(),
"zp": df.loc[stream_id_mask, "zp"].tolist(),
"magsys": df.loc[stream_id_mask, "zpsys"].tolist(),
"filter": df.loc[stream_id_mask, "filter"].tolist(),
"ra": df.loc[stream_id_mask, "ra"].tolist(),
"dec": df.loc[stream_id_mask, "dec"].tolist(),
"origin": df.loc[stream_id_mask, "origin"].tolist()
if "origin" in df
else [],
}

if (len(photometry.get("flux", ())) > 0) or (
len(photometry.get("fluxerr", ())) > 0
):
try:
response = self.api_skyportal(
"PUT",
"/api/photometry?ignore_flux_deduplication=true&ignore_flux_deduplication_replace=true",
photometry,
timeout=15,
)
if response.json()["status"] == "success":
with timer(
f"Posting {'(forced)' if is_fp else ''} photometry of {alert['objectId']} {alert['candid']}, "
f"stream_id={stream_id} to SkyPortal",
self.verbose > 1,
):
try:
response = self.api_skyportal(
"PUT",
f"/api/photometry{'?ignore_flux_deduplication=true&ignore_flux_deduplication_replace=true' if is_fp else ''}",
photometry,
timeout=15,
)
if response.json()["status"] == "success":
log(
f"Posted {alert['objectId']} photometry stream_id={stream_id} to SkyPortal"
)
else:
raise ValueError(response.json()["message"])
except Exception as e:
log(
f"Posted {alert['objectId']} photometry stream_id={stream_id} to SkyPortal"
f"Failed to post {alert['objectId']} photometry stream_id={stream_id} to SkyPortal: {e}"
)
else:
raise ValueError(response.json()["message"])
except Exception as e:
log(
f"Failed to post {alert['objectId']} photometry stream_id={stream_id} to SkyPortal: {e}"
)
continue
continue

def flux_to_mag(self, flux, fluxerr, zp):
"""Convert flux to magnitude and calculate SNR
Expand Down
1 change: 0 additions & 1 deletion kowalski/tests/test_alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,6 @@ def test_alert_filter__user_defined__with_fp_hists(self):
passed_filters = self.worker.alert_filter__user_defined([filter], record)

assert passed_filters is not None
print(passed_filters)
assert len(passed_filters) == 1
assert "autosave" in passed_filters[0]

Expand Down

0 comments on commit 73a2ab5

Please sign in to comment.