Skip to content

Commit

Permalink
New ZTF alert schema - Forced photometry (#258)
Browse files Browse the repository at this point in the history
* handle the new ZTF alert schema, to remove forced photometry from the alert's table
* instead, ingest the 30-day window of forced photometry, but only for the very first alert of a given object (when the entry for that object is created in the aux table)
  • Loading branch information
Theodlz authored Oct 17, 2023
1 parent 39a918c commit c672826
Show file tree
Hide file tree
Showing 34 changed files with 3,781 additions and 28 deletions.
188 changes: 188 additions & 0 deletions data/ztf_alerts/20231012/2475433850015010009.avro

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions data/ztf_alerts/20231012/2475433850015015000.avro

Large diffs are not rendered by default.

167 changes: 167 additions & 0 deletions data/ztf_alerts/20231012/2475433850015015001.avro

Large diffs are not rendered by default.

149 changes: 149 additions & 0 deletions data/ztf_alerts/20231012/2475433850015015010.avro

Large diffs are not rendered by default.

169 changes: 169 additions & 0 deletions data/ztf_alerts/20231012/2475433850015015022.avro

Large diffs are not rendered by default.

150 changes: 150 additions & 0 deletions data/ztf_alerts/20231012/2475433850015015041.avro

Large diffs are not rendered by default.

133 changes: 133 additions & 0 deletions data/ztf_alerts/20231012/2475433850215010007.avro

Large diffs are not rendered by default.

182 changes: 182 additions & 0 deletions data/ztf_alerts/20231012/2475433850215010008.avro

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions data/ztf_alerts/20231012/2475433850215015019.avro

Large diffs are not rendered by default.

158 changes: 158 additions & 0 deletions data/ztf_alerts/20231012/2475433850215015020.avro

Large diffs are not rendered by default.

179 changes: 179 additions & 0 deletions data/ztf_alerts/20231012/2475433850315015003.avro

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions data/ztf_alerts/20231012/2475433850315015004.avro

Large diffs are not rendered by default.

167 changes: 167 additions & 0 deletions data/ztf_alerts/20231012/2475433850315015008.avro

Large diffs are not rendered by default.

162 changes: 162 additions & 0 deletions data/ztf_alerts/20231012/2475433850315015009.avro

Large diffs are not rendered by default.

168 changes: 168 additions & 0 deletions data/ztf_alerts/20231012/2475433850315015010.avro

Large diffs are not rendered by default.

146 changes: 146 additions & 0 deletions data/ztf_alerts/20231012/2475433850315015012.avro

Large diffs are not rendered by default.

165 changes: 165 additions & 0 deletions data/ztf_alerts/20231012/2475433850315015013.avro

Large diffs are not rendered by default.

111 changes: 111 additions & 0 deletions data/ztf_alerts/20231012/2475433850415015002.avro

Large diffs are not rendered by default.

165 changes: 165 additions & 0 deletions data/ztf_alerts/20231012/2475433850415015004.avro

Large diffs are not rendered by default.

136 changes: 136 additions & 0 deletions data/ztf_alerts/20231012/2475433850415015005.avro

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions data/ztf_alerts/20231012/2475433850415015006.avro

Large diffs are not rendered by default.

125 changes: 125 additions & 0 deletions data/ztf_alerts/20231012/2475433850415015007.avro

Large diffs are not rendered by default.

158 changes: 158 additions & 0 deletions data/ztf_alerts/20231012/2475433850415015012.avro

Large diffs are not rendered by default.

112 changes: 112 additions & 0 deletions data/ztf_alerts/20231012/2475433850415015017.avro

Large diffs are not rendered by default.

16 changes: 13 additions & 3 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,8 @@ def alert_mongify(alert: Mapping):
Prepare a raw alert for ingestion into MongoDB:
- add a placeholder for ML-based classifications
- add coordinates for 2D spherical indexing and compute Galactic coordinates
- cut off the prv_candidates section
- extract the prv_candidates section
- extract the fp_hists section (if it exists)
:param alert:
:return:
Expand Down Expand Up @@ -577,7 +578,16 @@ def alert_mongify(alert: Mapping):
if prv_candidates is None:
prv_candidates = []

return doc, prv_candidates
# extract the fp_hists section, if it exists
fp_hists = deepcopy(doc.get("fp_hists", None))
doc.pop("fp_hists", None)
if fp_hists is None:
fp_hists = []
else:
# sort by jd
fp_hists = sorted(fp_hists, key=lambda k: k["jd"])

return doc, prv_candidates, fp_hists

def make_thumbnail(
self, alert: Mapping, skyportal_type: str, alert_packet_type: str
Expand Down Expand Up @@ -1637,7 +1647,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
- decide which points to post to what groups based on permissions
- post alert light curve in single PUT call to /api/photometry specifying stream_ids
:param alert: alert with a stripped-off prv_candidates section
:param alert: alert with a stripped-off prv_candidates section and fp_hists sections
:param prv_candidates: could be plain prv_candidates section of an alert, or extended alert history
:param passed_filters: list of filters that alert passed, with their output
:return:
Expand Down
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_pgir.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, _ = alert_worker.alert_mongify(alert)

# create alert history
all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])]
Expand Down
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_turbo.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, _ = alert_worker.alert_mongify(alert)

with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1):
alert_worker.mongo.insert_one(
Expand Down
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_winter.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}"):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, _ = alert_worker.alert_mongify(alert)

# future: add ML model filtering here

Expand Down
60 changes: 59 additions & 1 deletion kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, fp_hists = alert_worker.alert_mongify(alert)

# create alert history
all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])]
Expand Down Expand Up @@ -104,6 +104,12 @@ def process_alert(alert: Mapping, topic: str):
for prv_candidate in prv_candidates
]

# fp_hists: pop nulls - save space
fp_hists = [
{kk: vv for kk, vv in fp_hist.items() if vv not in [None, -99999, -99999.0]}
for fp_hist in fp_hists
]

alert_aux, xmatches, passed_filters = None, None, None
# cross-match with external catalogs if objectId not in collection_alerts_aux:
if (
Expand All @@ -123,6 +129,7 @@ def process_alert(alert: Mapping, topic: str):
"_id": object_id,
"cross_matches": xmatches,
"prv_candidates": prv_candidates,
"fp_hists": fp_hists,
}

with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1):
Expand All @@ -134,6 +141,7 @@ def process_alert(alert: Mapping, topic: str):
with timer(
f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1
):
# update prv_candidates
retry(
alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one
)(
Expand All @@ -142,6 +150,35 @@ def process_alert(alert: Mapping, topic: str):
upsert=True,
)

# FOR NOW: we decided to only store the forced photometry for the very first alert we get for an object
# so, no need to update anything here

# update fp_hists
# existing_fp_hists = retry(
# alert_worker.mongo.db[alert_worker.collection_alerts_aux].find_one
# )({"_id": object_id}, {"fp_hists": 1})
# if existing_fp_hists is not None:
# existing_fp_hists = existing_fp_hists.get("fp_hists", [])
# if len(existing_fp_hists) > 0:
# new_fp_hists = alert_worker.deduplicate_fp_hists(
# existing_fp_hists, fp_hists
# )
# else:
# new_fp_hists = fp_hists
# else:
# new_fp_hists = fp_hists
# retry(
# alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one
# )(
# {"_id": object_id},
# {
# "$set": {
# "fp_hists": new_fp_hists,
# }
# },
# upsert=True,
# )

if config["misc"]["broker"]:
# execute user-defined alert filters
with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1):
Expand All @@ -160,6 +197,7 @@ def process_alert(alert: Mapping, topic: str):
del (
alert,
prv_candidates,
fp_hists,
all_prv_candidates,
scores,
xmatches,
Expand Down Expand Up @@ -457,6 +495,26 @@ def alert_put_photometry(self, alert):
)
continue

def deduplicate_fp_hists(self, existing_fp=None, latest_fp=None):
    """Merge an object's stored forced-photometry (fp_hists) with the latest alert's.

    For forced photometry, unfortunately, it's not as simple as deduplicating
    with a set: the fp_hists of each candidate of an object is recomputed
    every time, so datapoints at the same jd can differ between alerts.
    Instead, we keep the existing datapoints strictly older than anything in
    the latest batch, and append the latest batch in full.

    :param existing_fp: previously stored fp_hists datapoints (list of dicts,
        each with a "jd" key); defaults to an empty list
    :param latest_fp: fp_hists datapoints from the newest alert; defaults to
        an empty list
    :return: merged list of fp_hists datapoints
    """
    # None sentinels instead of mutable default arguments ([] would be
    # shared across calls and could be mutated by accident)
    existing_fp = [] if existing_fp is None else existing_fp
    latest_fp = [] if latest_fp is None else latest_fp

    # nothing new to merge: keep the existing history as-is
    # (also avoids calling min() on an empty sequence, which raises ValueError)
    if not latest_fp:
        return existing_fp

    # first find the oldest jd in the latest fp_hists
    oldest_jd_in_latest = min(fp["jd"] for fp in latest_fp)
    # keep only the existing datapoints older than the oldest jd in the latest fp_hists
    older_datapoints = [fp for fp in existing_fp if fp["jd"] < oldest_jd_in_latest]

    # TODO: implement a better logic here. Could be based on:
    # - SNR (better SNR datapoints might be better)
    # - position (centroid, closer to the avg position might be better)
    # - mag (if 1 sigma brighter or dimmer than the current datapoints, use the newer ones)

    # for now, just append the latest fp_hists to the older ones,
    # to prioritize newer datapoints which might come from an updated pipeline
    return older_datapoints + latest_fp


class WorkerInitializer(dask.distributed.WorkerPlugin):
def __init__(self, *args, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions kowalski/tests/test_alert_broker_pgir.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TestAlertBrokerPGIR:

def test_alert_mongification(self):
"""Test massaging avro packet into a dict digestible by mongodb"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
assert alert["candid"] == self.candid
assert len(prv_candidates) == 71

Expand All @@ -43,7 +43,7 @@ def test_make_photometry(self):
assert len(df_photometry) == 71

def test_make_thumbnails(self):
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
for ttype, istrument_type in [
("new", "Science"),
("ref", "Template"),
Expand All @@ -54,13 +54,13 @@ def test_make_thumbnails(self):

def test_alert_filter__ml(self):
"""Test executing ML models on the alert"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
scores = self.worker.alert_filter__ml(alert)
log(scores)

def test_alert_filter__xmatch(self):
"""Test cross matching with external catalog"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
xmatches = self.worker.alert_filter__xmatch(alert)
catalogs_to_xmatch = config["database"].get("xmatch", {}).get("PGIR", {}).keys()
assert isinstance(xmatches, dict)
Expand Down
6 changes: 3 additions & 3 deletions kowalski/tests/test_alert_broker_turbo.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TestAlertBrokerTURBO:

def test_alert_mongification(self):
"""Test massaging avro packet into a dict digestible by mongodb"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
assert alert["candid"] == self.candid
assert len(prv_candidates) == 0 # 71

Expand All @@ -43,7 +43,7 @@ def test_make_photometry(self):
assert len(df_photometry) == 0 # 71

def test_make_thumbnails(self):
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
for ttype, istrument_type in [
("new", "Science"),
("ref", "Template"),
Expand All @@ -54,7 +54,7 @@ def test_make_thumbnails(self):

def test_alert_filter__xmatch(self):
"""Test cross matching with external catalog"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
xmatches = self.worker.alert_filter__xmatch(alert)
catalogs_to_xmatch = (
config["database"].get("xmatch", {}).get("TURBO", {}).keys()
Expand Down
6 changes: 3 additions & 3 deletions kowalski/tests/test_alert_broker_wntr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TestAlertBrokerWNTR:

def test_alert_mongification(self):
"""Test massaging avro packet into a dict digestible by mongodb"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
assert alert["candid"] == self.candid
assert len(alert["candidate"]) > 0 # ensure cand data is not empty
assert alert["objectId"] == self.alert["objectId"]
Expand All @@ -50,7 +50,7 @@ def test_make_photometry(self):
assert df_photometry["filter"][0] == "2massj"

def test_make_thumbnails(self):
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
for ttype, istrument_type in [
("new", "Science"),
("ref", "Template"),
Expand All @@ -61,7 +61,7 @@ def test_make_thumbnails(self):

def test_alert_filter__xmatch(self):
"""Test cross matching with external catalog"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
xmatches = self.worker.alert_filter__xmatch(alert)
catalogs_to_xmatch = config["database"].get("xmatch", {}).get("WNTR", {}).keys()
assert isinstance(xmatches, dict)
Expand Down
Loading

0 comments on commit c672826

Please sign in to comment.