ZTF Alert FP - deduplication logic, flux to mag space #261

Merged · 6 commits · Dec 4, 2023
49 changes: 34 additions & 15 deletions kowalski/alert_brokers/alert_broker.py
@@ -1364,6 +1364,15 @@ def alert_filter__user_defined(
},
}

if not isinstance(_filter.get("autosave", False), bool):
passed_filter["auto_followup"]["data"][
"ignore_source_group_ids"
] = [
_filter.get("autosave", {}).get(
"ignore_group_ids", []
)
]

passed_filters.append(passed_filter)

except Exception as e:
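For illustration only (not part of the diff): a minimal sketch of how a hypothetical filter document with a non-boolean autosave setting would be mapped onto the follow-up request payload by the new branch above.

_filter = {  # hypothetical filter document
    "group_id": 41,
    "autosave": {"ignore_group_ids": [42, 43]},
    "auto_followup": {"comment": "auto-trigger", "data": {}},
}
passed_filter = {"group_id": _filter["group_id"], "auto_followup": {"data": {}}}
if not isinstance(_filter.get("autosave", False), bool):
    passed_filter["auto_followup"]["data"]["ignore_source_group_ids"] = [
        _filter.get("autosave", {}).get("ignore_group_ids", [])
    ]
# passed_filter["auto_followup"]["data"] is now {"ignore_source_group_ids": [[42, 43]]},
# i.e. the ignore_group_ids list wrapped in an outer list, exactly as the diff does it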
@@ -1927,9 +1936,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
response = self.api_skyportal(
"POST",
"/api/followup_request",
passed_filter["auto_followup"][
"data"
], # already contains the optional ignore_group_ids
passed_filter["auto_followup"]["data"],
)
if (
response.json()["status"] == "success"
@@ -1939,7 +1946,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
is False
):
log(
f"Posted followup request for {alert['objectId']} to SkyPortal"
f"Posted followup request successfully for {alert['objectId']} to SkyPortal"
)
# add it to the existing requests
existing_requests.append(
@@ -1971,7 +1978,14 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
"text": passed_filter["auto_followup"][
"comment"
],
"group_ids": [passed_filter["group_id"]],
"group_ids": list(
set(
[passed_filter["group_id"]]
+ passed_filter.get("auto_followup", {})
.get("data", {})
.get("target_group_ids", [])
)
),
}
with timer(
f"Posting followup comment {comment['text']} for {alert['objectId']} to SkyPortal",
@@ -1989,23 +2003,28 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
.get("data", {})
.get(
"message",
"unknow error posting comment",
"unknown error posting comment",
)
)
except Exception as e:
log(
f"Failed to post followup comment {comment['text']} for {alert['objectId']} to SkyPortal: {e}"
)
else:
error_message = response.json().get(
"message",
response.json()
.get("data", {})
.get(
try:
error_message = response.json().get(
"message",
"unknow error posting followup request",
),
)
response.json()
.get("data", {})
.get(
"message",
"unknown error posting followup request",
),
)
except Exception:
error_message = (
"unknown error posting followup request"
)
raise ValueError(error_message)
except Exception as e:
log(
@@ -2079,7 +2098,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
raise ValueError(
response.json().get(
"message",
"unknow error updating followup request",
"unknown error updating followup request",
)
)
except Exception as e:
221 changes: 176 additions & 45 deletions kowalski/alert_brokers/alert_broker_ztf.py
@@ -7,6 +7,7 @@
import threading
import time
import traceback
import numpy as np
from abc import ABC
from copy import deepcopy
from typing import Mapping, Sequence
@@ -43,7 +44,7 @@ def process_alert(alert: Mapping, topic: str):

# get worker running current task
worker = dask.distributed.get_worker()
alert_worker = worker.plugins["worker-init"].alert_worker
alert_worker: ZTFAlertWorker = worker.plugins["worker-init"].alert_worker

log(f"{topic} {object_id} {candid} {worker.address}")

@@ -129,9 +130,12 @@ def process_alert(alert: Mapping, topic: str):
"_id": object_id,
"cross_matches": xmatches,
"prv_candidates": prv_candidates,
"fp_hists": fp_hists,
}

# only add the fp_hists if it's a brand-new object, not just if there is no entry there
if alert["candidate"]["ndethist"] <= 1:
alert_aux["fp_hists"] = alert_worker.format_fp_hists(alert, fp_hists)

with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1):
retry(alert_worker.mongo.insert_one)(
collection=alert_worker.collection_alerts_aux, document=alert_aux
@@ -150,34 +154,24 @@ def process_alert(alert: Mapping, topic: str):
upsert=True,
)

# FOR NOW: we decided to only store the forced photometry for the very first alert we get for an object
# so, no need to update anything here

# update fp_hists
# existing_fp_hists = retry(
# alert_worker.mongo.db[alert_worker.collection_alerts_aux].find_one
# )({"_id": object_id}, {"fp_hists": 1})
# if existing_fp_hists is not None:
# existing_fp_hists = existing_fp_hists.get("fp_hists", [])
# if len(existing_fp_hists) > 0:
# new_fp_hists = alert_worker.deduplicate_fp_hists(
# existing_fp_hists, fp_hists
# )
# else:
# new_fp_hists = fp_hists
# else:
# new_fp_hists = fp_hists
# retry(
# alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one
# )(
# {"_id": object_id},
# {
# "$set": {
# "fp_hists": new_fp_hists,
# }
# },
# upsert=True,
# )
# if there is no fp_hists for this object, we don't update anything
# the idea is that we start accumulating FP only for new objects, to avoid
# having some objects with incomplete FP history, which would be confusing for the filters
# either there is full FP, or there isn't any
if (
retry(
alert_worker.mongo.db[
alert_worker.collection_alerts_aux
].count_documents
)(
{"_id": alert["objectId"], "fp_hists": {"$exists": True}},
limit=1,
)
== 1
):
alert_worker.update_fp_hists(
alert, alert_worker.format_fp_hists(alert, fp_hists)
)

if config["misc"]["broker"]:
# execute user-defined alert filters
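For reference, a standalone sketch of the gating described in the comments above — assuming a pymongo-style db handle and an aux collection named along the lines of "ZTF_alerts_aux" (both assumptions, not taken from the diff):

def should_update_fp_hists(db, alert, collection="ZTF_alerts_aux"):
    # brand-new objects get fp_hists written at aux-ingestion time (ndethist <= 1);
    # afterwards new forced photometry is merged only if the aux document already
    # carries an fp_hists field, so an object has either its full FP history or none
    return (
        db[collection].count_documents(
            {"_id": alert["objectId"], "fp_hists": {"$exists": True}},
            limit=1,
        )
        == 1
    )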
@@ -495,25 +489,162 @@ def alert_put_photometry(self, alert):
)
continue

def deduplicate_fp_hists(self, existing_fp=[], latest_fp=[]):
# for the forced photometry (fp_hists) unfortunately it's not as simple as deduplicating with a set
# the fp_hists of each candidate of an object is recomputed everytime, so datapoints
# at the same jd can be different, so we grab the existing fp_hists aggregate, and build a new one.
def flux_to_mag(self, flux, fluxerr, zp):
"""Convert flux to magnitude and calculate SNR

:param flux:
:param fluxerr:
:param zp:
:param snr_threshold:
:return:
"""
# make sure it's all numpy floats or NaNs
values = np.array([flux, fluxerr, zp], dtype=np.float64)
snr = values[0] / values[1]
mag = -2.5 * np.log10(values[0]) + values[2]
magerr = 1.0857 * (values[1] / values[0])
limmag3sig = -2.5 * np.log10(3 * values[1]) + values[2]
limmag5sig = -2.5 * np.log10(5 * values[1]) + values[2]
if np.isnan(snr):
return {}
if snr < 0:
return {
"snr": snr,
}
mag_data = {
"mag": mag,
"magerr": magerr,
"snr": snr,
"limmag3sig": limmag3sig,
"limmag5sig": limmag5sig,
}
# remove all NaN-valued fields
mag_data = {k: v for k, v in mag_data.items() if not np.isnan(v)}
return mag_data

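A quick worked example of the conversion above, with made-up flux values (assumes forcediffimflux and forcediffimfluxunc share the same instrumental units and magzpsci is the corresponding zero point):

import numpy as np

flux, fluxerr, zp = 100.0, 10.0, 26.0            # hypothetical forcediffimflux, forcediffimfluxunc, magzpsci
snr = flux / fluxerr                             # 10.0
mag = -2.5 * np.log10(flux) + zp                 # 21.0
magerr = 1.0857 * (fluxerr / flux)               # ~0.109  (1.0857 ~= 2.5 / ln(10))
limmag3sig = -2.5 * np.log10(3 * fluxerr) + zp   # ~22.31
limmag5sig = -2.5 * np.log10(5 * fluxerr) + zp   # ~21.75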
def format_fp_hists(self, alert, fp_hists):
if len(fp_hists) == 0:
return []
# sort by jd
fp_hists = sorted(fp_hists, key=lambda x: x["jd"])

# add the "alert_mag" field to the new fp_hist
# as well as alert_ra, alert_dec
for i, fp in enumerate(fp_hists):
fp_hists[i] = {
**fp,
**self.flux_to_mag(
flux=fp.get("forcediffimflux", np.nan),
fluxerr=fp.get("forcediffimfluxunc", np.nan),
zp=fp["magzpsci"],
),
"alert_mag": alert["candidate"]["magpsf"],
"alert_ra": alert["candidate"]["ra"],
"alert_dec": alert["candidate"]["dec"],
}

return fp_hists

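For illustration, a single entry returned by format_fp_hists would look roughly like this (all values invented):

formatted_entry = {
    "jd": 2460234.7215,             # original forced-photometry fields are kept...
    "forcediffimflux": 100.0,
    "forcediffimfluxunc": 10.0,
    "magzpsci": 26.0,
    "mag": 21.0,                    # ...plus the magnitude-space fields from flux_to_mag...
    "magerr": 0.109,
    "snr": 10.0,
    "limmag3sig": 22.31,
    "limmag5sig": 21.75,
    "alert_mag": 19.8,              # ...and the magpsf/ra/dec of the triggering alert
    "alert_ra": 150.1234,
    "alert_dec": 2.3456,
}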
# first find the oldest jd in the latest fp_hists
oldest_jd_in_latest = min([fp["jd"] for fp in latest_fp])
# get all the datapoints in the existing fp_hists that are older than the oldest jd in the latest fp_hists
older_datapoints = [fp for fp in existing_fp if fp["jd"] < oldest_jd_in_latest]
def update_fp_hists(self, alert, formatted_fp_hists):
# update the existing fp_hists with the new ones
# instead of treating it as a set,
# if some entries share the same jd, keep the one with the brightest (numerically lowest) alert_mag

# TODO: implement a better logic here. Could be based on:
# - SNR (better SNR datapoints might be better)
# - position (centroid, closer to the avg position might be better)
# - mag (if 1 sigma brighter or dimmer than the current datapoints, use the newer ones)
# make sure this is an aggregate pipeline in mongodb
if len(formatted_fp_hists) == 0:
return

# for now, just append the latest fp_hists to the older ones,
# to prioritize newer datapoints which might come from an updated pipeline
with timer(
f"Updating fp_hists of {alert['objectId']} {alert['candid']}",
self.verbose > 1,
):
update_pipeline = [
# 1. concat the new fp_hists with the existing ones
{
"$project": {
"all_fp_hists": {
"$concatArrays": [
{"$ifNull": ["$fp_hists", []]},
formatted_fp_hists,
]
}
}
},
# 2. unwind the resulting array to get one document per fp_hist
{"$unwind": "$all_fp_hists"},
# 3. normalize missing alert_mag values to a sentinel (-99999.0) so those points sort first
{
"$set": {
"all_fp_hists.alert_mag": {
"$cond": {
"if": {
"$eq": [
{"$type": "$all_fp_hists.alert_mag"},
"missing",
]
},
"then": -99999.0,
"else": "$all_fp_hists.alert_mag",
}
}
}
},
# 4. sort by jd and alert_mag
{
"$sort": {
"all_fp_hists.jd": 1,
"all_fp_hists.alert_mag": 1,
}
},
# 5. group by jd, keeping only the first document per jd (the brightest alert_mag at that jd)
{
"$group": {
"_id": "$all_fp_hists.jd",
"fp_hist": {"$first": "$$ROOT.all_fp_hists"},
}
},
# 6. sort by jd again
{"$sort": {"fp_hist.jd": 1}},
# 7. group all the fp_hists documents back into a single array
{"$group": {"_id": None, "fp_hists": {"$push": "$fp_hist"}}},
# 8. project only the new fp_hists array
{"$project": {"fp_hists": 1, "_id": 0}},
]
n_retries = 0
while True:
# run the pipeline and then update the document
new_fp_hists = (
self.mongo.db[self.collection_alerts_aux]
.aggregate(
update_pipeline,
)
.next()
.get("fp_hists", [])
)

return older_datapoints + latest_fp
# update the document only if the DB still has fewer points than the new fp_hists.
# Otherwise, rerun the pipeline. This helps a little with concurrency issues.
result = self.mongo.db[self.collection_alerts_aux].find_one_and_update(
{
"_id": alert["objectId"],
f"fp_hists.{len(new_fp_hists)}": {"$exists": False},
},
{"$set": {"fp_hists": new_fp_hists}},
)
if result is None:
n_retries += 1
if n_retries > 10:
log(
f"Failed to update fp_hists of {alert['objectId']} {alert['candid']}"
)
break
else:
log(
f"Retrying to update fp_hists of {alert['objectId']} {alert['candid']}"
)
time.sleep(1)
else:
break

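For reasoning about the pipeline above, a rough pure-Python equivalent of the deduplication it performs (a sketch only, not a drop-in replacement — tie-breaking between equal alert_mag values is left to MongoDB's sort in the real pipeline):

def dedup_fp_hists(existing_fp_hists, new_fp_hists):
    # concatenate existing + new points, then keep, for each jd, the point with the
    # numerically smallest (brightest) alert_mag; points without an alert_mag are
    # treated as -99999.0 and therefore always win, mirroring step 3 of the pipeline
    best = {}
    for fp in (existing_fp_hists or []) + (new_fp_hists or []):
        jd = fp["jd"]
        mag = fp.get("alert_mag", -99999.0)
        if jd not in best or mag < best[jd].get("alert_mag", -99999.0):
            best[jd] = fp
    # return the deduplicated points sorted by jd, like the final $sort/$group stages
    return [best[jd] for jd in sorted(best)]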

class WorkerInitializer(dask.distributed.WorkerPlugin):