New Winter Schema #280

Merged: 4 commits, Mar 12, 2024
11 changes: 5 additions & 6 deletions config.defaults.yaml
@@ -484,16 +484,16 @@ kowalski:
       - {
           "$project":
             {
-              "cutoutScience": 0,
-              "cutoutTemplate": 0,
-              "cutoutDifference": 0,
+              "cutout_science": 0,
+              "cutout_template": 0,
+              "cutout_difference": 0,
             },
         }
       - {
           "$lookup":
             {
               "from": "WNTR_alerts_aux",
-              "localField": "objectId",
+              "localField": "objectid",
               "foreignField": "_id",
               "as": "aux",
             },
@@ -523,7 +523,7 @@ kowalski:
             },
             "schemavsn": 1,
             "publisher": 1,
-            "objectId": 1,
+            "objectid": 1,
             "candid": 1,
             "candidate": 1,
             "classifications": 1,
@@ -1233,7 +1233,6 @@ kowalski:
     lifetime_restart: true

   dask_wntr:
-    # TODO verify! can't used PGIR's
     host: 127.0.0.1
     scheduler_port: 8784
     dashboard_address: :8785
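For reference, a minimal sketch (not part of this PR) of what the renamed upstream stages above do when run against MongoDB. The connection string, database name, and the WNTR_alerts collection name are illustrative assumptions; only the WNTR_alerts_aux name and the snake_case fields come from the config diff.

from pymongo import MongoClient

# assumed connection string and database/collection names
client = MongoClient("mongodb://localhost:27017")
db = client["kowalski"]

pipeline = [
    # drop the (large) snake_case cutouts before filtering
    {"$project": {"cutout_science": 0, "cutout_template": 0, "cutout_difference": 0}},
    # join the auxiliary collection on the snake_case objectid
    {
        "$lookup": {
            "from": "WNTR_alerts_aux",
            "localField": "objectid",
            "foreignField": "_id",
            "as": "aux",
        }
    },
]

for doc in db["WNTR_alerts"].aggregate(pipeline):
    print(doc["objectid"], doc["candid"])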
254 changes: 0 additions & 254 deletions data/wntr_alerts/20220815/2459303860000.avro

This file was deleted.

271 changes: 0 additions & 271 deletions data/wntr_alerts/20220815/2459303860001.avro

This file was deleted.

291 changes: 0 additions & 291 deletions data/wntr_alerts/20220815/2459303860002.avro

This file was deleted.

226 changes: 0 additions & 226 deletions data/wntr_alerts/20220815/2459303860003.avro

This file was deleted.

291 changes: 0 additions & 291 deletions data/wntr_alerts/20220815/2459362710041.avro

This file was deleted.

173 changes: 173 additions & 0 deletions data/wntr_alerts/20240311/3608694.avro

Large diffs are not rendered by default.

287 changes: 287 additions & 0 deletions data/wntr_alerts/20240311/3608701.avro

Large diffs are not rendered by default.

197 changes: 197 additions & 0 deletions data/wntr_alerts/20240311/3608704.avro

Large diffs are not rendered by default.

380 changes: 380 additions & 0 deletions data/wntr_alerts/20240311/3608721.avro

Large diffs are not rendered by default.

357 changes: 357 additions & 0 deletions data/wntr_alerts/20240311/3608741.avro

Large diffs are not rendered by default.

375 changes: 375 additions & 0 deletions data/wntr_alerts/20240311/3608742.avro

Large diffs are not rendered by default.

372 changes: 372 additions & 0 deletions data/wntr_alerts/20240311/3608756.avro

Large diffs are not rendered by default.

355 changes: 355 additions & 0 deletions data/wntr_alerts/20240311/3608792.avro

Large diffs are not rendered by default.

366 changes: 366 additions & 0 deletions data/wntr_alerts/20240311/3608871.avro

Large diffs are not rendered by default.

421 changes: 421 additions & 0 deletions data/wntr_alerts/20240311/3608892.avro

Large diffs are not rendered by default.

359 changes: 359 additions & 0 deletions data/wntr_alerts/20240311/3608898.avro

Large diffs are not rendered by default.

26 changes: 23 additions & 3 deletions kowalski/alert_brokers/alert_broker.py
@@ -7,6 +7,7 @@
 import io
 import os
 import pathlib
+import platform
 import sys
 import traceback
 from ast import literal_eval
@@ -16,6 +17,7 @@
 import confluent_kafka
 import dask.distributed
 import fastavro
+import matplotlib
 import matplotlib.pyplot as plt
 import numpy as np
 import pandas as pd
@@ -50,6 +52,13 @@

 simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

+try:
+    if platform.uname()[0] == "Darwin":
+        # make the matplotlib backend non-interactive on mac to avoid crashes
+        matplotlib.pyplot.switch_backend("Agg")
+except Exception as e:
+    log(f"Failed to switch matplotlib backend to non-interactive: {e}")
+
 # Tensorflow is problematic for Mac's currently, so we can add an option to disable it
 USE_TENSORFLOW = os.environ.get("USE_TENSORFLOW", True) in [
     "True",
@@ -220,8 +229,16 @@ def process_alert(alert: Mapping, topic: str):
         raise NotImplementedError("Must be implemented in subclass")

     def submit_alert(self, record: Mapping):
+        # look for objectId and fall back to objectid if it is missing,
+        # to support both the ZTF and WNTR alert schemas
+        objectId = record.get("objectId", record.get("objectid", None))
+        if objectId is None:
+            log(
+                f"Failed to get objectId from record {record}, skipping alert submission"
+            )
+            return
         with timer(
-            f"Submitting alert {record['objectId']} {record['candid']} for processing",
+            f"Submitting alert {objectId} {record['candid']} for processing",
             self.verbose > 1,
         ):
             future = self.dask_client.submit(
@@ -468,11 +485,14 @@ def __init__(self, **kwargs):

         # get instrument id
         self.instrument_id = 1
+        instrument_name_on_skyportal = self.instrument
+        if self.instrument == "WNTR":
+            instrument_name_on_skyportal = "WINTER"
         with timer(
             f"Getting {self.instrument} instrument_id from SkyPortal", self.verbose > 1
         ):
             response = self.api_skyportal(
-                "GET", "/api/instrument", {"name": self.instrument}
+                "GET", "/api/instrument", {"name": instrument_name_on_skyportal}
             )
         if response.json()["status"] == "success" and len(response.json()["data"]) > 0:
             self.instrument_id = response.json()["data"][0]["id"]
@@ -617,7 +637,7 @@ def make_thumbnail(
         image_data = hdu[0].data

         # Survey-specific transformations to get North up and West on the right
-        if self.instrument == "ZTF":
+        if self.instrument in ["ZTF", "WNTR"]:
             image_data = np.flipud(image_data)
         elif self.instrument == "PGIR":
             image_data = np.rot90(np.fliplr(image_data), 3)
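To illustrate the submit_alert change above in isolation, a small hedged sketch of the schema-agnostic object-id lookup; the helper name get_object_id and the ZTF-style sample id are hypothetical, while WNTR24auhaa and candid 3608694 are borrowed from the new test fixtures in this PR.

from typing import Mapping, Optional

def get_object_id(record: Mapping) -> Optional[str]:
    # ZTF packets use camelCase objectId, WINTER packets use snake_case objectid
    return record.get("objectId", record.get("objectid", None))

ztf_like = {"objectId": "ZTF21aaaaaaa", "candid": 1}
wntr_like = {"objectid": "WNTR24auhaa", "candid": 3608694}
assert get_object_id(ztf_like) == "ZTF21aaaaaaa"
assert get_object_id(wntr_like) == "WNTR24auhaa"
assert get_object_id({"candid": 2}) is None  # would trigger the skip-and-log path above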
36 changes: 25 additions & 11 deletions kowalski/alert_brokers/alert_broker_winter.py
@@ -58,9 +58,7 @@ def process_alert(alert: Mapping, topic: str):
         :return:
         """
         candid = alert["candid"]
-        object_id = alert["objectId"]
-
-        print(f"WINTER: {topic} {object_id} {candid} in process_alert")
+        object_id = alert["objectid"]

         # get worker running current task
         worker = dask.distributed.get_worker()
@@ -155,6 +153,14 @@ def process_alert(alert: Mapping, topic: str):
         )

         if config["misc"]["broker"]:
+            # winter has a different schema (field names differ), so now that the
+            # alert packet has been ingested we add camelCase aliases, to avoid
+            # special-casing the rest of the code. not ideal memory-wise, but better
+            # than sprinkling if statements everywhere just for this...
+            alert["objectId"] = alert.get("objectid")
+            alert["cutoutScience"] = alert.get("cutout_science")
+            alert["cutoutTemplate"] = alert.get("cutout_template")
+            alert["cutoutDifference"] = alert.get("cutout_difference")
             # execute user-defined alert filters
             with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1):
                 passed_filters = alert_worker.alert_filter__user_defined(
response = self.api_skyportal("GET", "/api/streams")
if response.json()["status"] == "success" and len(response.json()["data"]) > 0:
for stream in response.json()["data"]:
if "WNTR" in stream.get("name"):
self.wntr_stream_id = stream["id"]
for name_options in ["WNTR", "WINTER"]:
if (
name_options.lower().strip()
in str(stream.get("name")).lower().strip()
):
self.wntr_stream_id = stream["id"]
if self.wntr_stream_id is None:
log("Failed to get WNTR alert stream ids from SkyPortal")
raise ValueError("Failed to get WNTR alert stream ids from SkyPortal")
@@ -438,14 +448,14 @@ def alert_put_photometry(self, alert):
         :return:
         """
         with timer(
-            f"Making alert photometry of {alert['objectId']} {alert['candid']}",
+            f"Making alert photometry of {alert['objectid']} {alert['candid']}",
             self.verbose > 1,
         ):
             df_photometry = self.make_photometry(alert)

         # post photometry
         photometry = {
-            "obj_id": alert["objectId"],
+            "obj_id": alert["objectid"],
             "stream_ids": [int(self.wntr_stream_id)],
             "instrument_id": self.instrument_id,
             "mjd": df_photometry["mjd"].tolist(),
@@ -462,18 +472,18 @@
             len(photometry.get("fluxerr", ())) > 0
         ):
             with timer(
-                f"Posting photometry of {alert['objectId']} {alert['candid']}, "
+                f"Posting photometry of {alert['objectid']} {alert['candid']}, "
                 f"stream_id={self.wntr_stream_id} to SkyPortal",
                 self.verbose > 1,
             ):
                 response = self.api_skyportal("PUT", "/api/photometry", photometry)
                 if response.json()["status"] == "success":
                     log(
-                        f"Posted {alert['objectId']} photometry stream_id={self.wntr_stream_id} to SkyPortal"
+                        f"Posted {alert['objectid']} photometry stream_id={self.wntr_stream_id} to SkyPortal"
                     )
                 else:
                     log(
-                        f"Failed to post {alert['objectId']} photometry stream_id={self.wntr_stream_id} to SkyPortal"
+                        f"Failed to post {alert['objectid']} photometry stream_id={self.wntr_stream_id} to SkyPortal"
                     )
                 log(response.json())

@@ -508,7 +518,11 @@ def topic_listener(
     )
     # init each worker with AlertWorker instance
    worker_initializer = WorkerInitializer()
-    dask_client.register_worker_plugin(worker_initializer, name="worker-init")
+    try:
+        dask_client.register_worker_plugin(worker_initializer, name="worker-init")
+    except Exception as e:
+        log(f"Failed to register worker plugin: {e}")
+        log(f"Traceback: {traceback.format_exc()}")
     # Configure consumer connection to Kafka broker
     conf = {
         "bootstrap.servers": bootstrap_servers,
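To make the aliasing added to process_alert above concrete, a minimal sketch (not in the PR); the function name add_ztf_style_aliases is hypothetical, and the point of the design is that the camelCase keys reference the same objects, so the cutout bytes are not copied.

def add_ztf_style_aliases(alert: dict) -> dict:
    # mirror the aliases added after ingestion so ZTF-style downstream code keeps working
    alert["objectId"] = alert.get("objectid")
    alert["cutoutScience"] = alert.get("cutout_science")
    alert["cutoutTemplate"] = alert.get("cutout_template")
    alert["cutoutDifference"] = alert.get("cutout_difference")
    return alert

packet = {
    "objectid": "WNTR24auhaa",
    "cutout_science": b"fits-bytes",
    "cutout_template": b"fits-bytes",
    "cutout_difference": b"fits-bytes",
}
add_ztf_style_aliases(packet)
assert packet["cutoutScience"] is packet["cutout_science"]  # alias, not a copy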
76 changes: 67 additions & 9 deletions kowalski/tests/test_alert_broker_wntr.py
@@ -1,5 +1,6 @@
 import fastavro
 import pytest
+from copy import deepcopy
 from kowalski.alert_brokers.alert_broker_winter import WNTRAlertWorker
 from kowalski.config import load_config
 from kowalski.log import log
@@ -20,16 +21,54 @@ def worker_fixture(request):
 @pytest.fixture(autouse=True, scope="class")
 def alert_fixture(request):
     log("Loading a sample WNTR alert")
-    # candid = 2459303860002
-    candid = 2459362710041  # candidate with a prv_candidate field
+    candid = 3608694  # candidate with a prv_candidate field
     request.cls.candid = candid
-    sample_avro = f"data/wntr_alerts/20220815/{candid}.avro"
+    sample_avro = f"data/wntr_alerts/20240311/{candid}.avro"
     with open(sample_avro, "rb") as f:
         for record in fastavro.reader(f):
             request.cls.alert = record
     log("Successfully loaded")


+def post_alert(worker: WNTRAlertWorker, alert):
+    delete_alert(worker, alert)
+    alert, prv_candidates, fp_hists = worker.alert_mongify(alert)
+    # check if it already exists
+    if worker.mongo.db[worker.collection_alerts].count_documents(
+        {"candid": alert["candid"]}
+    ):
+        log(f"Alert {alert['candid']} already exists, skipping")
+    else:
+        worker.mongo.insert_one(collection=worker.collection_alerts, document=alert)
+
+    if worker.mongo.db[worker.collection_alerts_aux].count_documents(
+        {"_id": alert["objectid"]}
+    ):
+        # delete if it exists
+        worker.mongo.delete_one(
+            collection=worker.collection_alerts_aux,
+            document={"_id": alert["objectid"]},
+        )
+
+    aux = {
+        "_id": alert["objectid"],
+        "prv_candidates": prv_candidates,
+        "cross_matches": {},
+    }
+    worker.mongo.insert_one(collection=worker.collection_alerts_aux, document=aux)
+
+
+def delete_alert(worker, alert):
+    worker.mongo.delete_one(
+        collection=worker.collection_alerts,
+        document={"candidate.candid": alert["candid"]},
+    )
+    worker.mongo.delete_one(
+        collection=worker.collection_alerts_aux,
+        document={"_id": alert["objectid"]},
+    )
+
+
 class TestAlertBrokerWNTR:
     """Test individual components/methods of the WNTR alert processor used by the broker"""

@@ -38,19 +77,29 @@ def test_alert_mongification(self):
         alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
         assert alert["candid"] == self.candid
         assert len(alert["candidate"]) > 0  # ensure cand data is not empty
-        assert alert["objectId"] == self.alert["objectId"]
+        assert alert["objectid"] == self.alert["objectid"]
         assert len(prv_candidates) == 1  # 1 old candidate in prv_cand
         assert prv_candidates[0]["jd"] == self.alert["prv_candidates"][0]["jd"]

     def test_make_photometry(self):
         df_photometry = self.worker.make_photometry(self.alert)
         assert len(df_photometry) == 1
         assert df_photometry["isdiffpos"][0] == 1.0
-        assert df_photometry["diffmaglim"][0] == 19.74010467529297
-        assert df_photometry["filter"][0] == "2massj"
+        assert df_photometry["diffmaglim"][0] == 17.6939640045166
+        assert df_photometry["filter"][0] == "2massh"

     def test_make_thumbnails(self):
         alert, _, _ = self.worker.alert_mongify(self.alert)
+        # just like in the alert broker, add camelCase aliases for the winter alert
+        # packet fields that differ from other instruments, instead of special-casing below
+        assert "cutout_science" in alert
+        assert "cutout_template" in alert
+        assert "cutout_difference" in alert
+        assert "objectid" in alert
+        alert["cutoutScience"] = alert.get("cutout_science")
+        alert["cutoutTemplate"] = alert.get("cutout_template")
+        alert["cutoutDifference"] = alert.get("cutout_difference")
+        alert["objectId"] = alert.get("objectid")
         for ttype, istrument_type in [
             ("new", "Science"),
             ("ref", "Template"),
@@ -73,15 +122,18 @@ def test_alert_filter__xmatch(self):

     def test_alert_filter__user_defined(self):
         """Test pushing an alert through a filter"""
-        # prepend upstream aggregation stages:
+        post_alert(self.worker, self.alert)
+        alert = deepcopy(self.alert)
+
+        alert["objectId"] = alert["objectid"]
         upstream_pipeline = config["database"]["filters"][self.worker.collection_alerts]
         pipeline = upstream_pipeline + [
             {
                 "$addFields": {
                     "annotations.author": "dd",
                 }
             },
-            {"$project": {"_id": 0, "candid": 1, "objectId": 1, "annotations": 1}},
+            {"$project": {"_id": 0, "candid": 1, "objectid": 1, "annotations": 1}},
         ]

         filter_template = {
@@ -96,6 +148,12 @@
             "pipeline": pipeline,
         }
         passed_filters = self.worker.alert_filter__user_defined(
-            [filter_template], self.alert
+            [filter_template], alert
         )
+        delete_alert(self.worker, self.alert)
+
         assert passed_filters is not None
+        assert len(passed_filters) == 1
+        assert passed_filters[0]["data"]["candid"] == self.alert["candid"]
+        assert passed_filters[0]["data"]["objectid"] == self.alert["objectid"]
+        assert passed_filters[0]["data"]["annotations"]["author"] == "dd"
10 changes: 5 additions & 5 deletions kowalski/tests/test_ingester_wntr.py
@@ -103,14 +103,14 @@ def test_ingester(self):
             filter_id=program3.filter_id,
             update_annotations=True,
             pipeline=[
-                {"$match": {"objectId": "WIRC21aaaac"}}
-            ],  # there are 2 alerts in the test set for this oid
+                {"$match": {"objectId": "WNTR24auhaa"}}
+            ],  # there is 1 alert in the test set for this oid
         )

         # create a test WNTR topic for the current UTC date
         date = datetime.datetime.utcnow().strftime("%Y%m%d")
         topic_name = f"winter_{date}_test"
-        path_alerts = "wntr_alerts/20220815"
+        path_alerts = "wntr_alerts/20240311"

         with KafkaStream(
             topic_name,
@@ -134,8 +134,8 @@ def test_ingester(self):
             n_alerts_aux = mongo.db[collection_alerts_aux].count_documents({})

             try:
-                assert n_alerts == 5
-                assert n_alerts_aux == 4
+                assert n_alerts == 11
+                assert n_alerts_aux == 11
                 print("----Passed WNTR ingester tests----")
                 break
             except AssertionError:
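For context on the updated expectations, a small sketch (not in the PR) of where 11 and 11 plausibly come from: one alert per avro packet added under data/wntr_alerts/20240311 and one aux document per unique objectid. This assumes each packet holds a single record and that the script is run from the repository root.

import glob

import fastavro

object_ids = set()
n_alerts = 0
for path in sorted(glob.glob("data/wntr_alerts/20240311/*.avro")):
    with open(path, "rb") as f:
        for record in fastavro.reader(f):
            n_alerts += 1
            object_ids.add(record["objectid"])

# expected to match the assertions above: 11 alerts, 11 unique objects
print(n_alerts, len(object_ids))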
2 changes: 0 additions & 2 deletions kowalski/tests/test_ingester_ztf.py
@@ -102,7 +102,6 @@ def create(
             timeout=3,
         )
         result = resp.json()
-
         assert result["status"] == "success"
         assert "data" in result
         assert "id" in result["data"]
@@ -138,7 +137,6 @@ def create(
             timeout=3,
         )
         result = resp.json()
-
         assert result["status"] == "success"
         assert "data" in result
         assert "id" in result["data"]