Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement per collection age-based cleanup #55

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions etc/msg_oods.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ ingester:
repoDirectory : /tmp/repo/LATISS
collections:
- LATISS/raw/all
cleanCollections:
- collection: LATISS/raw/all
filesOlderThan:
<<: *interval
seconds: 30
scanInterval:
<<: *interval
seconds: 10
filesOlderThan:
<<: *interval
seconds: 30
batchSize: 20
24 changes: 19 additions & 5 deletions python/lsst/ctrl/oods/butlerAttendant.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ def __init__(self, config, csc=None):
repo = self.config["repoDirectory"]
self.instrument = self.config["instrument"]
self.scanInterval = self.config["scanInterval"]
self.olderThan = self.config["filesOlderThan"]
self.collections = self.config["collections"]
self.cleanCollections = self.config.get("cleanCollections", None)
self.cleanCollections = self.config.get("cleanCollections")

LOGGER.info(f"Using Butler repo located at {repo}")
self.butlerConfig = repo
Expand Down Expand Up @@ -206,15 +205,31 @@ async def clean_task(self):
await asyncio.sleep(seconds)

async def clean(self):
    """Remove expired datasets from every configured collection.

    Each entry of ``self.cleanCollections`` supplies a ``collection``
    name and its own ``filesOlderThan`` interval; both are passed to
    `cleanCollection`, one entry at a time, in configuration order.
    When no clean-up entries are configured (``cleanCollections`` is
    `None` or empty) nothing is done.
    """
    # "cleanCollections" is read from the configuration with .get(), so
    # it may be None; iterating None directly would raise TypeError.
    for entry in self.cleanCollections or ():
        await self.cleanCollection(entry["collection"], entry["filesOlderThan"])

async def cleanCollection(self, collection, olderThan):
"""Remove all the datasets in the butler that
were ingested before the configured Interval

Parameters
----------
collection: `str`
collection to clean up
olderThan: `dict`
time interval
"""

await asyncio.sleep(0)
# calculate the time value which is Time.now - the
# "olderThan" configuration
t = Time.now()
interval = collections.namedtuple("Interval", self.olderThan.keys())(*self.olderThan.values())
interval = collections.namedtuple("Interval", olderThan.keys())(*olderThan.values())
td = TimeDelta(
interval.days * u.d + interval.hours * u.h + interval.minutes * u.min + interval.seconds * u.s
)
Expand All @@ -230,11 +245,10 @@ async def clean(self):

LOGGER.info("about to call queryDatasets")
# get all datasets in these collections
allCollections = self.collections if self.cleanCollections is None else self.cleanCollections
all_datasets = set(
butler.registry.queryDatasets(
datasetType=...,
collections=allCollections,
collections=[collection],
where="ingest_date < ref_date",
bind={"ref_date": t},
)
Expand Down
13 changes: 8 additions & 5 deletions tests/etc/cc_oods_multi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ ingester:
collections:
- FARG/raw/all
cleanCollections:
- FARG/raw/all
- FARG/runs/quickLook
- collection: FARG/raw/all
filesOlderThan:
<<: *interval
days: 30
- collection: FARG/runs/quickLook
filesOlderThan:
<<: *interval
days: 30
instrument: lsst.obs.lsst.LsstComCam
class:
import : lsst.ctrl.oods.fileAttendant
Expand All @@ -29,9 +35,6 @@ ingester:
scanInterval:
<<: *interval
minutes: 1
filesOlderThan:
<<: *interval
days: 30

batchSize: 20
scanInterval:
Expand Down
55 changes: 55 additions & 0 deletions tests/etc/clean_collections.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defaultInterval: &interval
days: 0
hours: 0
minutes: 0
seconds: 0

archiver:
name: "ATArchiver"

ingester:
FILE_INGEST_REQUEST: AT_FILE_INGEST_REQUEST
CONSUME_QUEUE: at_publish_to_oods
PUBLISH_QUEUE: oods_publish_to_at
imageStagingDirectory: data
butlers:
- butler:
class:
import : lsst.ctrl.oods.fileAttendant
name : FileAttendant
repoDirectory : repo
instrument: lsst.obs.lsst.Latiss
badFileDirectory: /tmp/bad
stagingDirectory: /tmp/staging
collections:
- collection_a
- collection_b
cleanCollections:
- collection: collection_a
filesOlderThan:
<<: *interval
days: 3
- collection: collection_b
filesOlderThan:
<<: *interval
days: 3
scanInterval:
<<: *interval
seconds: 2
batchSize: 20
scanInterval:
<<: *interval
seconds: 10

cacheCleaner:
clearEmptyDirectoriesAndOldFiles:
- repo/raw
scanInterval:
<<: *interval
seconds: 30
filesOlderThan:
<<: *interval
days: 30
directoriesEmptyForMoreThan:
<<: *interval
days: 1
13 changes: 8 additions & 5 deletions tests/etc/collection_test_1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ ingester:
collections:
- LATISS/runs/quickLook
cleanCollections:
- LATISS/raw/all
- LATISS/runs/quickLook
- collection: LATISS/raw/all
filesOlderThan:
<<: *interval
seconds: 5
- collection: LATISS/runs/quickLook
filesOlderThan:
<<: *interval
seconds: 5
scanInterval:
<<: *interval
seconds: 3
filesOlderThan:
<<: *interval
seconds: 5
batchSize: 20
scanInterval:
<<: *interval
Expand Down
8 changes: 5 additions & 3 deletions tests/etc/collection_test_2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ ingester:
stagingDirectory: /tmp/staging
collections:
- LATISS/raw/all
cleanCollections:
- collection: LATISS/raw/all
filesOlderThan:
<<: *interval
seconds: 5
scanInterval:
<<: *interval
seconds: 3
filesOlderThan:
<<: *interval
seconds: 5
batchSize: 20
scanInterval:
<<: *interval
Expand Down
8 changes: 5 additions & 3 deletions tests/etc/ingest_auxtel_clean.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ ingester:
stagingDirectory: /tmp/staging
collections:
- LATISS/raw/all
cleanCollections:
- collection: LATISS/raw/all
filesOlderThan:
<<: *interval
seconds: 3
scanInterval:
<<: *interval
seconds: 2
filesOlderThan:
<<: *interval
seconds: 3
batchSize: 20
scanInterval:
<<: *interval
Expand Down
8 changes: 5 additions & 3 deletions tests/etc/ingest_auxtel_gen3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ ingester:
stagingDirectory: /tmp/staging
collections:
- LATISS/raw/all
cleanCollections:
- collection: LATISS/raw/all
filesOlderThan:
<<: *interval
days: 30
scanInterval:
<<: *interval
minutes: 1
filesOlderThan:
<<: *interval
days: 30
batchSize: 20
scanInterval:
<<: *interval
Expand Down
8 changes: 5 additions & 3 deletions tests/etc/ingest_auxtel_s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ ingester:
instrument: lsst.obs.lsst.Latiss
collections:
- LATISS/raw/all
cleanCollections:
- collection: LATISS/raw/all
filesOlderThan:
<<: *interval
days: 30
scanInterval:
<<: *interval
minutes: 1
filesOlderThan:
<<: *interval
days: 30
batchSize: 20
scanInterval:
<<: *interval
Expand Down
8 changes: 5 additions & 3 deletions tests/etc/ingest_comcam_gen3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ ingester:
stagingDirectory: /tmp/staging
collections:
- LSSTComCam/raw/all
cleanCollections:
- collection: LSSTComCam/raw/all
filesOlderThan:
<<: *interval
seconds: 2
scanInterval:
<<: *interval
seconds: 1
filesOlderThan:
<<: *interval
seconds: 2
batchSize: 20
scanInterval:
<<: *interval
Expand Down
8 changes: 5 additions & 3 deletions tests/etc/ingest_tag_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ ingester:
stagingDirectory: /tmp/staging
collections:
- LSSTComCam/raw/all
cleanCollections:
- collection: LSSTComCam/raw/all
filesOlderThan:
<<: *interval
seconds: 5
scanInterval:
<<: *interval
seconds: 20
filesOlderThan:
<<: *interval
seconds: 5
batchSize: 20
scanInterval:
<<: *interval
Expand Down
Loading
Loading