From 4b6c858889200dd80ee5d102c860529c5f026fb9 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 12:54:19 +0200 Subject: [PATCH 01/55] wip, dl0 source --- pyproject.toml | 3 +++ src/ctapipe_io_zfits/__init__.py | 2 ++ src/ctapipe_io_zfits/conftest.py | 19 +++++++++++++++++++ src/ctapipe_io_zfits/dl0.py | 14 ++++++++++++++ src/ctapipe_io_zfits/tests/test_dl0.py | 4 ++++ 5 files changed, 42 insertions(+) create mode 100644 src/ctapipe_io_zfits/conftest.py create mode 100644 src/ctapipe_io_zfits/dl0.py create mode 100644 src/ctapipe_io_zfits/tests/test_dl0.py diff --git a/pyproject.toml b/pyproject.toml index 639ded9..bb6bcb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,9 @@ all = [ "ctapipe_io_zfits[test,doc,dev]", ] +[project.entry-points.ctapipe_io] +ProtozfitsDL0EventSource = "ctapipe_io_zfits.dl0:ProtozfitsDL0EventSource" + [project.urls] repository = "https://github.com/cta-observatory/ctapipe_io_zfits" diff --git a/src/ctapipe_io_zfits/__init__.py b/src/ctapipe_io_zfits/__init__.py index e5ea576..1f53a40 100644 --- a/src/ctapipe_io_zfits/__init__.py +++ b/src/ctapipe_io_zfits/__init__.py @@ -1,8 +1,10 @@ """ EventSource implementations for protozfits files """ +from .dl0 import ProtozfitsDL0EventSource from .version import __version__ __all__ = [ "__version__", + "ProtozfitsDL0EventSource", ] diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py new file mode 100644 index 0000000..70c9741 --- /dev/null +++ b/src/ctapipe_io_zfits/conftest.py @@ -0,0 +1,19 @@ +from datetime import datetime + +import pytest + +acada_user = "ctao-acada-n" +date = datetime(2023, 8, 1) + + +@pytest.fixture(scope="session") +def acada_base(tmp_path_factory): + return tmp_path_factory.mkdir("acada_base") + + +@pytest.fixture(scope="session") +def dl0_structure(acada_base): + dl0 = acada_base / "DL0" + dl0.mkdir(exist_ok=True) + + return dl0 diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py new file mode 100644 index 0000000..55f3f2f --- /dev/null +++ b/src/ctapipe_io_zfits/dl0.py @@ -0,0 +1,14 @@ +""" +DL0 Protozfits EventSource +""" +from ctapipe.io import EventSource + +__all__ = [ + "ProtozfitsDL0EventSource", +] + + +class ProtozfitsDL0EventSource(EventSource): + """ + DL0 Protozfits EventSource. + """ diff --git a/src/ctapipe_io_zfits/tests/test_dl0.py b/src/ctapipe_io_zfits/tests/test_dl0.py new file mode 100644 index 0000000..e615c58 --- /dev/null +++ b/src/ctapipe_io_zfits/tests/test_dl0.py @@ -0,0 +1,4 @@ +def test_is_compatible(dummy_dl0): + from ctapipe_io_zfits import ProtozfitsDL0EventSource + + ProtozfitsDL0EventSource.is_compatible() From 7dc8e740e8f6f3c953ba724815497283e91cfc1a Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 16:14:37 +0200 Subject: [PATCH 02/55] Add test using dummy dl0 file --- src/ctapipe_io_zfits/conftest.py | 115 ++++++++++++++++++++++++- src/ctapipe_io_zfits/tests/test_dl0.py | 2 +- src/ctapipe_io_zfits/time.py | 57 ++++++++++++ 3 files changed, 169 insertions(+), 5 deletions(-) create mode 100644 src/ctapipe_io_zfits/time.py diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py index 70c9741..52c9933 100644 --- a/src/ctapipe_io_zfits/conftest.py +++ b/src/ctapipe_io_zfits/conftest.py @@ -1,19 +1,126 @@ -from datetime import datetime +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo +from astropy.time import Time +import astropy.units as u +import numpy as np import pytest +from protozfits import ProtobufZOFits +from protozfits.CTA_DL0_Subarray_pb2 import DataStream, Event +from protozfits.CoreMessages_pb2 import AnyArray + +from ctapipe_io_zfits.time import time_to_cta_high + +ANY_ARRAY_TYPE_TO_NUMPY_TYPE = { + 1: np.int8, + 2: np.uint8, + 3: np.int16, + 4: np.uint16, + 5: np.int32, + 6: np.uint32, + 7: np.int64, + 8: np.uint64, + 9: np.float32, + 10: np.float64, +} + +DTYPE_TO_ANYARRAY_TYPE = {v: k for k, v in ANY_ARRAY_TYPE_TO_NUMPY_TYPE.items()} acada_user = "ctao-acada-n" -date = datetime(2023, 8, 1) +obs_start = Time("2023-08-02T02:15:31") +timezone_canary = ZoneInfo("Atlantic/Canary") + +array_elements = { + 1: "LSTN-01", +} + + +def to_anyarray(array): + type_ = DTYPE_TO_ANYARRAY_TYPE[array.dtype.type] + return AnyArray(type=type_, data=array.tobytes()) + + +def evening_of_obs(time, tz): + dt = time.to_datetime(timezone.utc).astimezone(tz) + if dt.hour < 12: + return (dt - timedelta(days=1)).date() + + return dt.date() @pytest.fixture(scope="session") def acada_base(tmp_path_factory): - return tmp_path_factory.mkdir("acada_base") + return tmp_path_factory.mktemp("acada_base_") @pytest.fixture(scope="session") -def dl0_structure(acada_base): +def dl0_base(acada_base): + """DL0 Directory Structure. + + See Table 6 of the ACADA-DPPS ICD. + """ dl0 = acada_base / "DL0" dl0.mkdir(exist_ok=True) + lst_base = dl0 / array_elements[1] / acada_user / "acada-adh" + lst_events = lst_base / "events" + lst_monitoring = lst_base / "monitoring" + array_triggers = dl0 / "array" / acada_user / "acada-adh" / "triggers" + array_monitoring = dl0 / "array" / acada_user / "acada-adh" / "monitoring" + + evening = evening_of_obs(obs_start, timezone_canary) + + for directory in (lst_events, lst_monitoring, array_triggers, array_monitoring): + date_path = directory / f"{evening:%Y/%m/%d}" + date_path.mkdir(exist_ok=True, parents=True) + return dl0 + + +@pytest.fixture(scope="session") +def dummy_dl0(dl0_base): + trigger_dir = dl0_base / "array" / acada_user / "acada-adh/triggers/2023/08/01/" + subarray_id = 1 + sb_id = 123 + obs_id = 456 + producer_id = 1 # what is this? + sb_creator_id = 1 + + filename = f"SUB{subarray_id:03d}_SWAT001_{obs_start.to_datetime(timezone.utc):%Y%m%dT%H%M%S}_SBID{sb_id:019d}_OBSID{obs_id:019d}_SUBARRAY_CHUNK000.fits.fz" + path = trigger_dir / filename + + + data_stream = DataStream( + subarray_id=subarray_id, + sb_id=sb_id, + obs_id=obs_id, + producer_id=producer_id, + sb_creator_id=sb_creator_id, + ) + + time = obs_start + rate = 8000 + rng = np.random.default_rng(0) + + with ProtobufZOFits(n_tiles=5, rows_per_tile=20, compression_block_size_kb=64 * 1024) as trigger_file: + trigger_file.open(str(path)) + trigger_file.move_to_new_table("DataStream") + trigger_file.write_message(data_stream) + + trigger_file.move_to_new_table("Events") + time = time + rng.exponential(1 / rate) * u.s + time_s, time_qns = time_to_cta_high(time) + + for event_id in range(100): + trigger_file.write_message(Event( + event_id=event_id, + trigger_type=1, + sb_id=sb_id, + obs_id=obs_id, + event_time_s=int(time_s), + event_time_qns=int(time_qns), + trigger_ids=to_anyarray(np.array([event_id])), + tel_ids=to_anyarray(np.array([1])), + )) + + return path diff --git a/src/ctapipe_io_zfits/tests/test_dl0.py b/src/ctapipe_io_zfits/tests/test_dl0.py index e615c58..7ca93f3 100644 --- a/src/ctapipe_io_zfits/tests/test_dl0.py +++ b/src/ctapipe_io_zfits/tests/test_dl0.py @@ -1,4 +1,4 @@ def test_is_compatible(dummy_dl0): from ctapipe_io_zfits import ProtozfitsDL0EventSource - ProtozfitsDL0EventSource.is_compatible() + assert ProtozfitsDL0EventSource.is_compatible(dummy_dl0) diff --git a/src/ctapipe_io_zfits/time.py b/src/ctapipe_io_zfits/time.py new file mode 100644 index 0000000..b9fbf5a --- /dev/null +++ b/src/ctapipe_io_zfits/time.py @@ -0,0 +1,57 @@ +""" +Functions handling time, mainly conversion of CTA timestamps to astropy +""" +from astropy.time import Time +import numpy as np + + +EPOCH = Time(0, format='unix_tai', scale='tai') +DAY_TO_S = 86400 +CENTRAL_MODULE = 132 +S_TO_QNS = 4e9 +QNS_TO_S = 0.25e-9 + + +def cta_high_res_to_time(seconds, quarter_nanoseconds): + '''Convert cta high resolution timestamp to astropy Time''' + # unix_tai accepts two floats for maximum precision + # we can just pass integral and fractional part + fractional_seconds = quarter_nanoseconds * QNS_TO_S + return Time( + seconds, + fractional_seconds, + format='unix_tai', + # this is only for displaying iso timestamp, not any actual precision + precision=9, + ) + + +def to_seconds(days): + '''Returns whole and fractional seconds from a number in days''' + seconds = days * DAY_TO_S + return np.divmod(seconds, 1) + + +def time_to_cta_high(time): + '''Convert astropy Time to cta high precision timestamp''' + # make sure we are in TAI + time = time.tai + + # internally, astropy always uses jd values + # jd1 is integral and jd2 is in [-0.5, 0.5] + # we get the integral and fractional seconds from both jd values + # relative to the epoch + seconds_jd1, fractional_seconds_jd1 = to_seconds(time.jd1 - EPOCH.jd1) + seconds_jd2, fractional_seconds_jd2 = to_seconds(time.jd2 - EPOCH.jd2) + + # add up the integral number of seconds + seconds = seconds_jd1 + seconds_jd2 + + # convert fractional seconds to quarter nanoseconds + fractional_seconds = fractional_seconds_jd1 + fractional_seconds_jd2 + quarter_nanoseconds = np.round(fractional_seconds * S_TO_QNS) + + # convert to uint32 + seconds = np.asanyarray(seconds, dtype=np.uint32) + quarter_nanoseconds = np.asanyarray(quarter_nanoseconds, dtype=np.uint32) + return seconds, quarter_nanoseconds From dd4c03f0c3c2c92155ce2788401d1fdd270dcd1a Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 16:32:55 +0200 Subject: [PATCH 03/55] Implement is_compatible --- src/ctapipe_io_zfits/dl0.py | 55 +++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py index 55f3f2f..5566a4e 100644 --- a/src/ctapipe_io_zfits/dl0.py +++ b/src/ctapipe_io_zfits/dl0.py @@ -2,13 +2,68 @@ DL0 Protozfits EventSource """ from ctapipe.io import EventSource +import logging +from contextlib import ExitStack __all__ = [ "ProtozfitsDL0EventSource", ] +log = logging.getLogger(__name__) + class ProtozfitsDL0EventSource(EventSource): """ DL0 Protozfits EventSource. + + The ``input_url`` must be the subarray trigger file, the source + will then look for the other data files according to the filename and + directory schema layed out in the draft of the ACADA - DPPS ICD. """ + + @classmethod + def is_compatible(cls, input_url): + from astropy.io import fits + + # this allows us to just use try/except around the opening of the fits file, + stack = ExitStack() + + with stack: + try: + hdul = stack.enter_context(fits.open(input_url)) + except Exception as e: + log.debug(f"Error trying to open input file as fits: {e}") + return False + + if "DataStream" not in hdul: + log.debug("FITS file does not contain a DataStream HDU, returning False") + return False + + if "Events" not in hdul: + log.debug("FITS file does not contain an Events HDU, returning False") + return False + + header = hdul["Events"].header + + if header["XTENSION"] != "BINTABLE": + log.debug(f"Events HDU is not a bintable") + return False + + if not header.get("ZTABLE", False): + log.debug(f"ZTABLE is not in header or False") + return False + + if header.get("ORIGIN", "") != "CTA": + log.debug("ORIGIN != CTA") + return False + + proto_class = header.get("PBFHEAD") + if proto_class is None: + log.debug("Missing PBFHEAD key") + return False + + if proto_class != "DL0v1.Subarray.Event": + log.debug(f"Unsupported PBDHEAD: {proto_class}") + return False + + return True From 3c3bba4dad801c282390979086dd6316c8d8474f Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 16:34:54 +0200 Subject: [PATCH 04/55] Add test using EventSource --- src/ctapipe_io_zfits/tests/test_dl0.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/ctapipe_io_zfits/tests/test_dl0.py b/src/ctapipe_io_zfits/tests/test_dl0.py index 7ca93f3..57e5efc 100644 --- a/src/ctapipe_io_zfits/tests/test_dl0.py +++ b/src/ctapipe_io_zfits/tests/test_dl0.py @@ -1,4 +1,13 @@ +from ctapipe.io import EventSource + + def test_is_compatible(dummy_dl0): from ctapipe_io_zfits import ProtozfitsDL0EventSource assert ProtozfitsDL0EventSource.is_compatible(dummy_dl0) + + +def test_is_valid_eventsource(dummy_dl0): + from ctapipe_io_zfits.dl0 import ProtozfitsDL0EventSource + + assert isinstance(EventSource(dummy_dl0), ProtozfitsDL0EventSource) From b20eccb3d9181a6b33a5381bd20f8f33f4339f94 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 17:11:30 +0200 Subject: [PATCH 05/55] Implement minimal EventSource API --- src/ctapipe_io_zfits/dl0.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py index 5566a4e..3db1f23 100644 --- a/src/ctapipe_io_zfits/dl0.py +++ b/src/ctapipe_io_zfits/dl0.py @@ -5,6 +5,12 @@ import logging from contextlib import ExitStack +from typing import Tuple, Dict + +from ctapipe.containers import ObservationBlockContainer, SchedulingBlockContainer +from ctapipe.io import DataLevel +from ctapipe.instrument import SubarrayDescription + __all__ = [ "ProtozfitsDL0EventSource", ] @@ -20,6 +26,34 @@ class ProtozfitsDL0EventSource(EventSource): will then look for the other data files according to the filename and directory schema layed out in the draft of the ACADA - DPPS ICD. """ + def __init__(self, input_url, **kwargs): + super().__init__(input_url=input_url, **kwargs) + self._subarray = None + self._observation_blocks = {} + self._scheduling_blocks = {} + + @property + def is_simulation(self) -> bool: + return False + + @property + def datalevels(self) -> Tuple[DataLevel]: + return (DataLevel.DL0, ) + + @property + def subarray(self) -> SubarrayDescription: + return self._subarray + + @property + def observation_blocks(self) -> Dict[int, ObservationBlockContainer]: + return self._observation_blocks + + @property + def scheduling_blocks(self) -> Dict[int, SchedulingBlockContainer]: + return self._scheduling_blocks + + def _generator(self): + pass @classmethod def is_compatible(cls, input_url): @@ -67,3 +101,4 @@ def is_compatible(cls, input_url): return False return True + From a2c9bc5f7e2721d3cf6628a9677c8e1dac400248 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 17:14:13 +0200 Subject: [PATCH 06/55] Add basic event iteration test --- src/ctapipe_io_zfits/conftest.py | 2 +- src/ctapipe_io_zfits/tests/test_dl0.py | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py index 52c9933..61a8658 100644 --- a/src/ctapipe_io_zfits/conftest.py +++ b/src/ctapipe_io_zfits/conftest.py @@ -111,7 +111,7 @@ def dummy_dl0(dl0_base): time = time + rng.exponential(1 / rate) * u.s time_s, time_qns = time_to_cta_high(time) - for event_id in range(100): + for event_id in range(1, 101): trigger_file.write_message(Event( event_id=event_id, trigger_type=1, diff --git a/src/ctapipe_io_zfits/tests/test_dl0.py b/src/ctapipe_io_zfits/tests/test_dl0.py index 57e5efc..2331437 100644 --- a/src/ctapipe_io_zfits/tests/test_dl0.py +++ b/src/ctapipe_io_zfits/tests/test_dl0.py @@ -11,3 +11,18 @@ def test_is_valid_eventsource(dummy_dl0): from ctapipe_io_zfits.dl0 import ProtozfitsDL0EventSource assert isinstance(EventSource(dummy_dl0), ProtozfitsDL0EventSource) + + +def test_subarray_events(dummy_dl0): + from ctapipe_io_zfits.dl0 import ProtozfitsDL0EventSource + + with EventSource(dummy_dl0) as source: + n_read = 0 + for array_event in source: + assert array_event.index.obs_id == 456 + assert array_event.index.event_id == n_read + 1 + n_read += 1 + + assert n_read == 100 + + assert isinstance(EventSource(dummy_dl0), ProtozfitsDL0EventSource) From 4486e8a17269c407150056fb0b09174336221b36 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 17:25:15 +0200 Subject: [PATCH 07/55] Implement basic event iteration (needs new protozfits to read DataStream object) --- src/ctapipe_io_zfits/dl0.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py index 3db1f23..18ef892 100644 --- a/src/ctapipe_io_zfits/dl0.py +++ b/src/ctapipe_io_zfits/dl0.py @@ -10,6 +10,11 @@ from ctapipe.containers import ObservationBlockContainer, SchedulingBlockContainer from ctapipe.io import DataLevel from ctapipe.instrument import SubarrayDescription +from ctapipe.io.eventsource import ArrayEventContainer +from ctapipe.io.simteleventsource import EventIndexContainer +from eventio.simtel.simtelfile import ArrayEvent +from numpy import array +from protozfits import File __all__ = [ "ProtozfitsDL0EventSource", @@ -31,6 +36,10 @@ def __init__(self, input_url, **kwargs): self._subarray = None self._observation_blocks = {} self._scheduling_blocks = {} + self._subarray_trigger_file = File(str(input_url)) + + def close(self): + self._subarray_trigger_file.close() @property def is_simulation(self) -> bool: @@ -53,7 +62,15 @@ def scheduling_blocks(self) -> Dict[int, SchedulingBlockContainer]: return self._scheduling_blocks def _generator(self): - pass + for subarray_trigger in self._subarray_trigger_file.Events: + array_event = ArrayEventContainer( + index=EventIndexContainer( + obs_id=subarray_trigger.obs_id, + event_id=subarray_trigger.event_id) + ) + + yield array_event + @classmethod def is_compatible(cls, input_url): From dd50ad39a9faa41dd18ad08dacdd6a04f02858c9 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 18:10:48 +0200 Subject: [PATCH 08/55] Test trigger time and tels_with_trigger --- src/ctapipe_io_zfits/conftest.py | 4 +--- src/ctapipe_io_zfits/tests/test_dl0.py | 6 ++++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py index 61a8658..d723477 100644 --- a/src/ctapipe_io_zfits/conftest.py +++ b/src/ctapipe_io_zfits/conftest.py @@ -99,8 +99,6 @@ def dummy_dl0(dl0_base): ) time = obs_start - rate = 8000 - rng = np.random.default_rng(0) with ProtobufZOFits(n_tiles=5, rows_per_tile=20, compression_block_size_kb=64 * 1024) as trigger_file: trigger_file.open(str(path)) @@ -108,7 +106,7 @@ def dummy_dl0(dl0_base): trigger_file.write_message(data_stream) trigger_file.move_to_new_table("Events") - time = time + rng.exponential(1 / rate) * u.s + time = time + 0.001 * u.s time_s, time_qns = time_to_cta_high(time) for event_id in range(1, 101): diff --git a/src/ctapipe_io_zfits/tests/test_dl0.py b/src/ctapipe_io_zfits/tests/test_dl0.py index 2331437..65e3dcf 100644 --- a/src/ctapipe_io_zfits/tests/test_dl0.py +++ b/src/ctapipe_io_zfits/tests/test_dl0.py @@ -1,4 +1,6 @@ from ctapipe.io import EventSource +from astropy.time import Time +import astropy.units as u def test_is_compatible(dummy_dl0): @@ -16,11 +18,15 @@ def test_is_valid_eventsource(dummy_dl0): def test_subarray_events(dummy_dl0): from ctapipe_io_zfits.dl0 import ProtozfitsDL0EventSource + obs_start = Time("2023-08-02T02:15:31") + with EventSource(dummy_dl0) as source: n_read = 0 for array_event in source: assert array_event.index.obs_id == 456 assert array_event.index.event_id == n_read + 1 + assert array_event.trigger.time == obs_start + n_read * 0.001 * u.s + assert array_event.trigger.tels_with_trigger == [1, ] n_read += 1 assert n_read == 100 From 68b483db584c9c854db97f8dda3599c9168adf6c Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 18:24:13 +0200 Subject: [PATCH 09/55] Implement reading of subarray trigger time and tel_ids --- src/ctapipe_io_zfits/conftest.py | 5 +++-- src/ctapipe_io_zfits/dl0.py | 25 +++++++++++++++++++------ src/ctapipe_io_zfits/tests/test_dl0.py | 10 +++++++--- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py index d723477..05c497d 100644 --- a/src/ctapipe_io_zfits/conftest.py +++ b/src/ctapipe_io_zfits/conftest.py @@ -106,10 +106,9 @@ def dummy_dl0(dl0_base): trigger_file.write_message(data_stream) trigger_file.move_to_new_table("Events") - time = time + 0.001 * u.s - time_s, time_qns = time_to_cta_high(time) for event_id in range(1, 101): + time_s, time_qns = time_to_cta_high(time) trigger_file.write_message(Event( event_id=event_id, trigger_type=1, @@ -121,4 +120,6 @@ def dummy_dl0(dl0_base): tel_ids=to_anyarray(np.array([1])), )) + time = time + 0.001 * u.s + return path diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py index 18ef892..4ee8f7c 100644 --- a/src/ctapipe_io_zfits/dl0.py +++ b/src/ctapipe_io_zfits/dl0.py @@ -7,15 +7,20 @@ from typing import Tuple, Dict -from ctapipe.containers import ObservationBlockContainer, SchedulingBlockContainer +from ctapipe.containers import ( + ArrayEventContainer, + EventIndexContainer, + ObservationBlockContainer, + SchedulingBlockContainer, + TriggerContainer, +) from ctapipe.io import DataLevel from ctapipe.instrument import SubarrayDescription -from ctapipe.io.eventsource import ArrayEventContainer -from ctapipe.io.simteleventsource import EventIndexContainer -from eventio.simtel.simtelfile import ArrayEvent -from numpy import array + from protozfits import File +from .time import cta_high_res_to_time + __all__ = [ "ProtozfitsDL0EventSource", ] @@ -66,7 +71,15 @@ def _generator(self): array_event = ArrayEventContainer( index=EventIndexContainer( obs_id=subarray_trigger.obs_id, - event_id=subarray_trigger.event_id) + event_id=subarray_trigger.event_id + ), + trigger=TriggerContainer( + time=cta_high_res_to_time( + subarray_trigger.event_time_s, + subarray_trigger.event_time_qns + ), + tels_with_trigger=subarray_trigger.tel_ids.tolist(), + ) ) yield array_event diff --git a/src/ctapipe_io_zfits/tests/test_dl0.py b/src/ctapipe_io_zfits/tests/test_dl0.py index 65e3dcf..03d0286 100644 --- a/src/ctapipe_io_zfits/tests/test_dl0.py +++ b/src/ctapipe_io_zfits/tests/test_dl0.py @@ -1,6 +1,7 @@ -from ctapipe.io import EventSource from astropy.time import Time import astropy.units as u +from ctapipe.io import EventSource +import numpy as np def test_is_compatible(dummy_dl0): @@ -18,16 +19,19 @@ def test_is_valid_eventsource(dummy_dl0): def test_subarray_events(dummy_dl0): from ctapipe_io_zfits.dl0 import ProtozfitsDL0EventSource - obs_start = Time("2023-08-02T02:15:31") + time = Time("2023-08-02T02:15:31") with EventSource(dummy_dl0) as source: n_read = 0 for array_event in source: assert array_event.index.obs_id == 456 assert array_event.index.event_id == n_read + 1 - assert array_event.trigger.time == obs_start + n_read * 0.001 * u.s + dt = np.abs(array_event.trigger.time - time).to(u.ns) + assert dt < 0.2 * u.ns assert array_event.trigger.tels_with_trigger == [1, ] + n_read += 1 + time = time + 0.001 * u.s assert n_read == 100 From d13138b4ca1f5f97a061f3c72983bd4baec5b12f Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 18:37:28 +0200 Subject: [PATCH 10/55] Fix python versions in CI --- .github/workflows/ci.yml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 37039df..0698809 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,10 +35,6 @@ jobs: strategy: matrix: include: - # linux builds - - python-version: "3.8" - os: ubuntu-latest - - python-version: "3.9" os: ubuntu-latest @@ -101,7 +97,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.9" - name: Install doc dependencies run: | From 6cf26901817d872bca0fb8ea7dcb1d520ff85459 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 30 Aug 2023 18:59:39 +0200 Subject: [PATCH 11/55] Fill basic ob and sb container --- src/ctapipe_io_zfits/dl0.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py index 4ee8f7c..8ff8d2f 100644 --- a/src/ctapipe_io_zfits/dl0.py +++ b/src/ctapipe_io_zfits/dl0.py @@ -38,10 +38,20 @@ class ProtozfitsDL0EventSource(EventSource): """ def __init__(self, input_url, **kwargs): super().__init__(input_url=input_url, **kwargs) - self._subarray = None - self._observation_blocks = {} - self._scheduling_blocks = {} self._subarray_trigger_file = File(str(input_url)) + self._subarray_trigger_stream = self._subarray_trigger_file.DataStream[0] + + self._subarray = None + + obs_id = self._subarray_trigger_stream.obs_id + sb_id = self._subarray_trigger_stream.sb_id + + self._observation_blocks = { + obs_id: ObservationBlockContainer(obs_is=obs_id, sb_id=sb_id) + } + self._scheduling_blocks = { + sb_id: SchedulingBlockContainer(sb_id=sb_id) + } def close(self): self._subarray_trigger_file.close() From 39e4ea7c51ef7d96ebb146e985aa5c0cc0875f8e Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Thu, 31 Aug 2023 10:44:58 +0200 Subject: [PATCH 12/55] Unify time function names, add test --- src/ctapipe_io_zfits/conftest.py | 4 +-- src/ctapipe_io_zfits/tests/test_time.py | 37 +++++++++++++++++++++++++ src/ctapipe_io_zfits/time.py | 2 +- 3 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 src/ctapipe_io_zfits/tests/test_time.py diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py index 05c497d..66e8704 100644 --- a/src/ctapipe_io_zfits/conftest.py +++ b/src/ctapipe_io_zfits/conftest.py @@ -9,7 +9,7 @@ from protozfits.CTA_DL0_Subarray_pb2 import DataStream, Event from protozfits.CoreMessages_pb2 import AnyArray -from ctapipe_io_zfits.time import time_to_cta_high +from ctapipe_io_zfits.time import time_to_cta_high_res ANY_ARRAY_TYPE_TO_NUMPY_TYPE = { 1: np.int8, @@ -108,7 +108,7 @@ def dummy_dl0(dl0_base): trigger_file.move_to_new_table("Events") for event_id in range(1, 101): - time_s, time_qns = time_to_cta_high(time) + time_s, time_qns = time_to_cta_high_res(time) trigger_file.write_message(Event( event_id=event_id, trigger_type=1, diff --git a/src/ctapipe_io_zfits/tests/test_time.py b/src/ctapipe_io_zfits/tests/test_time.py new file mode 100644 index 0000000..958036b --- /dev/null +++ b/src/ctapipe_io_zfits/tests/test_time.py @@ -0,0 +1,37 @@ +from astropy.time import Time +import astropy.units as u +import numpy as np +import pytest + +roundtrip_times = [ + '2020-01-01T00:00:00.12345678925', + '2020-01-01T00:00:00.1234567895', + '2020-01-01T00:00:00.12345678975', + '2020-01-01T00:00:00.123456790', +] + +@pytest.mark.parametrize("timestamp", roundtrip_times) +def test_cta_high_round_trip(timestamp): + from ctapipe_io_zfits.time import time_to_cta_high_res, cta_high_res_to_time + + # note: precision=9 only affects text conversion, not actual precision + time = Time(timestamp, scale='tai', precision=9) + seconds, quarter_nanoseconds = time_to_cta_high_res(time) + time_back = cta_high_res_to_time(seconds, quarter_nanoseconds) + + roundtrip_error = (time - time_back).to_value(u.ns) + np.testing.assert_almost_equal(roundtrip_error, 0.0) + + +test_data = [ + (Time(0, 12.25e-9, format='unix_tai'), 0, 49), + (Time(12345, 12.25e-9, format='unix_tai'), 12345, 49), +] + +@pytest.mark.parametrize("time,expected_s,expected_qns", test_data) +def test_cta_time_to_cta_high_res(time, expected_s, expected_qns): + from ctapipe_io_zfits.time import time_to_cta_high_res + + seconds, quarter_nanoseconds = time_to_cta_high_res(time) + assert seconds == expected_s + assert quarter_nanoseconds == expected_qns diff --git a/src/ctapipe_io_zfits/time.py b/src/ctapipe_io_zfits/time.py index b9fbf5a..0b09c86 100644 --- a/src/ctapipe_io_zfits/time.py +++ b/src/ctapipe_io_zfits/time.py @@ -32,7 +32,7 @@ def to_seconds(days): return np.divmod(seconds, 1) -def time_to_cta_high(time): +def time_to_cta_high_res(time): '''Convert astropy Time to cta high precision timestamp''' # make sure we are in TAI time = time.tai From 833da4fb48d4d842277a5192011c46c9a68684c1 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Thu, 31 Aug 2023 18:54:14 +0200 Subject: [PATCH 13/55] Write basic event files in test config --- src/ctapipe_io_zfits/conftest.py | 102 +++++++++++++++++++++++++++---- src/ctapipe_io_zfits/dl0.py | 2 +- 2 files changed, 92 insertions(+), 12 deletions(-) diff --git a/src/ctapipe_io_zfits/conftest.py b/src/ctapipe_io_zfits/conftest.py index 66e8704..2a5563f 100644 --- a/src/ctapipe_io_zfits/conftest.py +++ b/src/ctapipe_io_zfits/conftest.py @@ -1,3 +1,4 @@ +from contextlib import ExitStack from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo @@ -6,8 +7,10 @@ import numpy as np import pytest from protozfits import ProtobufZOFits -from protozfits.CTA_DL0_Subarray_pb2 import DataStream, Event +from protozfits import CTA_DL0_Subarray_pb2 as DL0_Subarray +from protozfits import CTA_DL0_Telescope_pb2 as DL0_Telescope from protozfits.CoreMessages_pb2 import AnyArray +from ctapipe.containers import EventType from ctapipe_io_zfits.time import time_to_cta_high_res @@ -80,36 +83,93 @@ def dl0_base(acada_base): @pytest.fixture(scope="session") def dummy_dl0(dl0_base): trigger_dir = dl0_base / "array" / acada_user / "acada-adh/triggers/2023/08/01/" + lst_event_dir = dl0_base / "LSTN-01" / acada_user / "acada-adh/events/2023/08/01/" subarray_id = 1 sb_id = 123 obs_id = 456 producer_id = 1 # what is this? sb_creator_id = 1 + sdh_ids = (1, 2, 3, 4) - filename = f"SUB{subarray_id:03d}_SWAT001_{obs_start.to_datetime(timezone.utc):%Y%m%dT%H%M%S}_SBID{sb_id:019d}_OBSID{obs_id:019d}_SUBARRAY_CHUNK000.fits.fz" - path = trigger_dir / filename + obs_start_path_string = f"{obs_start.to_datetime(timezone.utc):%Y%m%dT%H%M%S}" + filename = f"SUB{subarray_id:03d}_SWAT001_{obs_start_path_string}_SBID{sb_id:019d}_OBSID{obs_id:019d}_SUBARRAY_CHUNK000.fits.fz" + # sdh_id and chunk_id will be filled later -> double {{}} + lst_event_pattern = f"TEL001_SDH{{sdh_id:03d}}_{obs_start_path_string}_SBID{sb_id:019d}_OBSID{obs_id:019d}_TEL_SHOWER_CHUNK{{chunk_id:03d}}.fits.fz" + trigger_path = trigger_dir / filename - data_stream = DataStream( + subarray_data_stream = DL0_Subarray.DataStream( subarray_id=subarray_id, sb_id=sb_id, obs_id=obs_id, producer_id=producer_id, sb_creator_id=sb_creator_id, ) + lst_data_stream = DL0_Telescope.DataStream( + tel_id=1, + sb_id=sb_id, + obs_id=obs_id, + waveform_scale=80, + waveform_offset=400, + sb_creator_id=sb_creator_id, + ) + camera_configuration = DL0_Telescope.CameraConfiguration( + tel_id=1, + local_run_id=789, + config_time_s=obs_start.unix, + camera_config_id=47, + pixel_id_map=to_anyarray(np.arange(1855)), + module_id_map=to_anyarray(np.arange(265)), + num_pixels=1855, + num_channels=2, + num_samples_nominal=40, + num_samples_long=0, + num_modules=265, + sampling_frequncy=1024, + ) time = obs_start - with ProtobufZOFits(n_tiles=5, rows_per_tile=20, compression_block_size_kb=64 * 1024) as trigger_file: - trigger_file.open(str(path)) + ctx = ExitStack() + proto_kwargs = dict(n_tiles=5, rows_per_tile=20, compression_block_size_kb=64 * 1024) + + chunksize = 10 + events_written = {sdh_id: 0 for sdh_id in sdh_ids} + current_chunk = {sdh_id: -1 for sdh_id in sdh_ids} + lst_event_files = {} + + def open_next_event_file(sdh_id): + if sdh_id in lst_event_files: + lst_event_files[sdh_id].close() + + current_chunk[sdh_id] += 1 + chunk_id = current_chunk[sdh_id] + path = lst_event_dir / lst_event_pattern.format(sdh_id=sdh_id, chunk_id=chunk_id) + f = ctx.enter_context(ProtobufZOFits(**proto_kwargs)) + f.open(str(path)) + f.move_to_new_table("DataStream") + f.write_message(lst_data_stream) + f.move_to_new_table("CameraConfiguration") + f.write_message(camera_configuration) + f.move_to_new_table("Events") + lst_event_files[sdh_id] = f + events_written[sdh_id] = 0 + + with ctx: + trigger_file = ctx.enter_context(ProtobufZOFits(**proto_kwargs)) + trigger_file.open(str(trigger_path)) trigger_file.move_to_new_table("DataStream") - trigger_file.write_message(data_stream) - + trigger_file.write_message(subarray_data_stream) trigger_file.move_to_new_table("Events") - for event_id in range(1, 101): + for sdh_id in sdh_ids: + open_next_event_file(sdh_id) + + for i in range(100): + event_id = i + 1 time_s, time_qns = time_to_cta_high_res(time) - trigger_file.write_message(Event( + + trigger_file.write_message(DL0_Subarray.Event( event_id=event_id, trigger_type=1, sb_id=sb_id, @@ -120,6 +180,26 @@ def dummy_dl0(dl0_base): tel_ids=to_anyarray(np.array([1])), )) + + sdh_id = sdh_ids[i % len(sdh_ids)] + # TODO: randomize event to test actually parsing it + lst_event_files[sdh_id].write_message(DL0_Telescope.Event( + event_id=event_id, + tel_id=camera_configuration.tel_id, + event_type=EventType.SUBARRAY.value, + event_time_s=int(time_s), + event_time_qns=int(time_qns), + # identified as signal, low gain stored, high gain stored + pixel_status=to_anyarray(np.full(1855, 0b00001101, dtype=np.uint8)), + waveform=to_anyarray(np.full((2, 1855, 40), 400, dtype=np.uint16)), + num_channels=2, + num_samples=40, + num_pixels_survived=1855, + )) + events_written[sdh_id] += 1 + if events_written[sdh_id] >= chunksize: + open_next_event_file(sdh_id) + time = time + 0.001 * u.s - return path + return trigger_path diff --git a/src/ctapipe_io_zfits/dl0.py b/src/ctapipe_io_zfits/dl0.py index 8ff8d2f..3e99562 100644 --- a/src/ctapipe_io_zfits/dl0.py +++ b/src/ctapipe_io_zfits/dl0.py @@ -47,7 +47,7 @@ def __init__(self, input_url, **kwargs): sb_id = self._subarray_trigger_stream.sb_id self._observation_blocks = { - obs_id: ObservationBlockContainer(obs_is=obs_id, sb_id=sb_id) + obs_id: ObservationBlockContainer(obs_id=obs_id, sb_id=sb_id) } self._scheduling_blocks = { sb_id: SchedulingBlockContainer(sb_id=sb_id) From 831ae36e7dfe59e96115a3d60cd6caf668a17fbf Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Thu, 31 Aug 2023 18:57:40 +0200 Subject: [PATCH 14/55] Add test for dl0 telescope data --- src/ctapipe_io_zfits/tests/test_dl0.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/ctapipe_io_zfits/tests/test_dl0.py b/src/ctapipe_io_zfits/tests/test_dl0.py index 03d0286..f413eac 100644 --- a/src/ctapipe_io_zfits/tests/test_dl0.py +++ b/src/ctapipe_io_zfits/tests/test_dl0.py @@ -35,4 +35,23 @@ def test_subarray_events(dummy_dl0): assert n_read == 100 + +def test_subarray_events(dummy_dl0): + from ctapipe_io_zfits.dl0 import ProtozfitsDL0EventSource + + with EventSource(dummy_dl0) as source: + n_read = 0 + for array_event in source: + assert 1 in array_event.dl0.tel + + dl0_tel = array_event.dl0.tel[1] + assert dl0_tel.waveform is not None + assert dl0_tel.waveform.shape == (2, 1855, 40) + assert dl0_tel.pixel_status is not None + assert dl0_tel.pixel_status.shape == (1855, ) + assert dl0_tel.pixel_status.dtype == np.uint8 + + + assert n_read == 100 + assert isinstance(EventSource(dummy_dl0), ProtozfitsDL0EventSource) From 381f183bc8d81c17084864b25819f914da48aa82 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Fri, 1 Sep 2023 13:35:40 +0200 Subject: [PATCH 15/55] Add parsing of ICD filenames --- src/ctapipe_io_zfits/paths.py | 69 +++ src/ctapipe_io_zfits/tests/test_paths.py | 567 +++++++++++++++++++++++ 2 files changed, 636 insertions(+) create mode 100644 src/ctapipe_io_zfits/paths.py create mode 100644 src/ctapipe_io_zfits/tests/test_paths.py diff --git a/src/ctapipe_io_zfits/paths.py b/src/ctapipe_io_zfits/paths.py new file mode 100644 index 0000000..32b5836 --- /dev/null +++ b/src/ctapipe_io_zfits/paths.py @@ -0,0 +1,69 @@ +import re +from dataclasses import dataclass +from datetime import date, datetime +from pathlib import Path +from typing import Optional, Union + + +@dataclass +class FileNameInfo: + system_name: str + system_id: Optional[int] = None + subarray_id: Optional[int] = None + ae_type: Optional[str] = None + ae_id: Optional[int] = None + sb_id: Optional[int] = None + obs_id: Optional[int] = None + type_: Optional[str] = None + subtype: Optional[str] = None + chunk_id: Optional[int] = None + file_id: Optional[int] = None + suffix: Optional[str] = None + timestamp: Union[datetime, date, None] = None + + +#: regex to match filenames according to the ACADA DPPS ICD naming pattern +FILENAME_RE = re.compile( + r"(?:(?:SUB(?P\d+))|(?:(?PTEL|AUX)(?P\d+)))" + r"(?:_(?P[A-Z]+)(?P\d+)?)" + r"(?:_(?P[0-9]{8})(?:T(?P