DM-43256 Add MetricMeasurementBundles to GatherResourceUsage #220

Open · wants to merge 2 commits into main
177 changes: 172 additions & 5 deletions python/lsst/analysis/tools/tasks/gatherResourceUsage.py
@@ -37,6 +37,7 @@
from collections.abc import Iterable, Sequence
from typing import Any

import astropy.units as apu
import numpy as np
import pandas as pd
from lsst.daf.butler import Butler, DatasetRef, DatasetType
@@ -60,10 +61,66 @@
# written in task metadata were platform-dependent. Once we no longer care
# about older runs, this import and the code that uses it can be removed.
from lsst.utils.usage import _RUSAGE_MEMORY_MULTIPLIER
from lsst.verify import Measurement

from ..interfaces import MetricMeasurementBundle
from ..interfaces._task import _timestampValidator

_LOG = logging.getLogger(__name__)


def _resource_table_to_bundle(
table: pd.DataFrame, dataset_identifier: str, reference_package: str, timestamp_version: str
) -> MetricMeasurementBundle:
"""Convert a resource usage table into a `MetricMeasurementBundle`

See `lsst.analysis.tools.interfaces.AnalysisPipelineTask` for more
information on each of the following options.

Parameters
----------
table : `pandas.DataFrame`
Resource usage in the form of a DataFrame.
dataset_identifier : `str`
The name of the data processing to associate with this metric bundle.
reference_package : `str`
The reference package to use if the timestamp version is set to a
package version.
timestamp_version : `str`
The type of timestamp to associate with the bundle.

Returns
-------
bundle : `MetricMeasurementBundle`
The resource usage measurements, one entry per task label.
"""
bundle = MetricMeasurementBundle(
dataset_identifier=dataset_identifier,
reference_package=reference_package,
timestamp_version=timestamp_version,
)
# Determine all the columns in the table; these will become measurements.
column_keys = set(table.keys())
# Discard the task column, as it plays the role of an AnalysisTool name in the bundle.
column_keys.remove("task")
# Measurements need units; use this mapping from column-name stub to unit type.
unit_mapping = (
("quanta", apu.Unit("count")),
("_hrs", apu.Unit("hour")),
("_GB", apu.Unit("Gbyte")),
("_s", apu.Unit("s")),
)
# For each row, grab the task name and create a list of measurements.
for _, row in table.iterrows():
task_name = f"{row['task']}_memrun"
task_data = []
for key in column_keys:
unit = None
for stub, value in unit_mapping:
if stub in key:
unit = value
if unit is None:
raise ValueError(f"Could not determine units for column {key} of task {row['task']}")
task_data.append(Measurement(key, row[key] * unit))
bundle[task_name] = task_data
return bundle
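
A rough sketch of how this helper behaves (the column names, the values, and the direct import of a module-private function are purely illustrative):

import pandas as pd

from lsst.analysis.tools.tasks.gatherResourceUsage import _resource_table_to_bundle

# Toy resource-usage table: one row per task label; every non-task column
# name must contain one of the mapped stubs ("quanta", "_hrs", "_GB", "_s").
table = pd.DataFrame(
    {
        "task": ["isr", "characterizeImage"],
        "quanta": [10, 10],
        "total_cpu_hrs": [1.5, 3.0],
        "max_memory_GB": [2.1, 4.7],
    }
)
bundle = _resource_table_to_bundle(
    table,
    dataset_identifier="RC2_subset",  # illustrative identifier
    reference_package="lsst_distrib",
    timestamp_version="run_timestamp",
)
# Each row becomes a "<task>_memrun" entry holding one Measurement per column.
print(bundle["isr_memrun"])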


class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
"""Connection definitions for `ConsolidateResourceUsageTask`."""

@@ -74,6 +131,16 @@ class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()
doc="Consolidated table of resource usage statistics. One row per task label",
)

output_metrics = cT.Output(
name="ResourceUsageSummary_metrics",
storageClass="MetricMeasurementBundle",
dimensions=(),
doc=(
"MetricMeasurementBundle with the same information as the ResourceUsageSummary in the form "
"required for Sasquatch dispatch"
),
)

def __init__(self, *, config):
super().__init__(config=config)
for name in self.config.input_names:
@@ -88,6 +155,8 @@ def __init__(self, *, config):
),
)
self.inputs.add(name)
if not self.config.do_make_metrics:
self.outputs.remove("output_metrics")


class ConsolidateResourceUsageConfig(
@@ -99,6 +168,25 @@ class ConsolidateResourceUsageConfig(
doc="Input resource usage dataset type names",
default=[],
)
do_make_metrics = Field[bool](doc="Make a metric bundle in addition to the output DataFrame", default=False)
dataset_identifier = Field[str](doc="An identifier to be associated with output metrics", optional=True)
reference_package = Field[str](
doc="A package whose version, at the time of metric upload to a "
"time series database, will be converted to a timestamp of when "
"that version was produced",
default="lsst_distrib",
optional=True,
)
timestamp_version = Field[str](
doc="Which timestamp should be used as the reference timestamp for a "
"metric in a time series database. Valid values are: "
"reference_package_timestamp, run_timestamp, current_timestamp, "
"dataset_timestamp, and explicit_timestamp:datetime, where datetime is "
"given in the form %Y%m%dT%H%M%S%z, e.g. explicit_timestamp:20240101T000000+0000",
default="run_timestamp",
check=_timestampValidator,
optional=True,
)
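
As a minimal sketch of how these options interact with the connections class above (the identifier value is illustrative, and it assumes connection names remain inspectable via connections.outputs after construction):

from lsst.analysis.tools.tasks.gatherResourceUsage import (
    ConsolidateResourceUsageConfig,
    ConsolidateResourceUsageConnections,
)

config = ConsolidateResourceUsageConfig()
config.do_make_metrics = True
config.dataset_identifier = "RC2_subset"  # illustrative value
config.timestamp_version = "run_timestamp"
config.reference_package = "lsst_distrib"

# With do_make_metrics left at its default of False, the connections class
# removes output_metrics, and only the consolidated DataFrame is written.
connections = ConsolidateResourceUsageConnections(config=config)
print("output_metrics" in connections.outputs)  # expected: True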


class ConsolidateResourceUsageTask(PipelineTask):
@@ -113,6 +201,7 @@ class ConsolidateResourceUsageTask(PipelineTask):
"""

ConfigClass = ConsolidateResourceUsageConfig
config: ConsolidateResourceUsageConfig
_DefaultName = "consolidateResourceUsage"

def run(self, **kwargs: Any) -> Struct:
@@ -166,8 +255,18 @@ def run(self, **kwargs: Any) -> Struct:
.sort_values("task"),
memrun,
)
results = Struct(output_table=memrun)

if self.config.do_make_metrics:
bundle = _resource_table_to_bundle(
memrun,
self.config.dataset_identifier,
self.config.reference_package,
self.config.timestamp_version,
)
results.output_metrics = bundle

return Struct(output_table=memrun)
return results


class GatherResourceUsageConnections(
@@ -547,6 +646,18 @@ class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder):
Whether *execution* of this quantum graph will permit clobbering. If
`False` (default), existing outputs in ``output_run`` are an error
unless ``skip_existing_in`` will cause those quanta to be skipped.
make_metric : `bool`, optional
Whether to produce a metric measurement bundle when processing the
output table.
timestamp_version : `str`, optional
The type of timestamp used when creating a `MetricMeasurementBundle`;
see that class for more details.
dataset_identifier : `str`, optional
A processing identifier associated with the processing of this data,
for instance "RC2_subset" for the nightly reprocessing.
reference_package : `str`, optional
The package whose version is used as the reference timestamp when
``timestamp_version`` is set to ``reference_package_timestamp``.

Notes
-----
@@ -567,6 +678,10 @@ def __init__(
output_run: str | None = None,
skip_existing_in: Sequence[str] = (),
clobber: bool = False,
make_metric: bool = False,
timestamp_version: str | None = None,
dataset_identifier: str | None = None,
reference_package: str | None = None,
):
# Start by querying for metadata datasets, since we'll need to know
# which dataset types exist in the input collections in order to
@@ -580,6 +695,11 @@
pipeline_graph = PipelineGraph()
metadata_refs: dict[str, set[DatasetRef]] = {}
consolidate_config = ConsolidateResourceUsageConfig()
if make_metric:
consolidate_config.do_make_metrics = True
consolidate_config.dataset_identifier = dataset_identifier
consolidate_config.timestamp_version = timestamp_version
consolidate_config.reference_package = reference_package
for results in butler.registry.queryDatasets(
input_dataset_types,
where=where,
@@ -589,9 +709,10 @@
input_metadata_dataset_type = results.parentDatasetType
refs_for_type = set(results)
if refs_for_type:
gather_task_label, gather_dataset_type_name = self._add_gather_task(
pipeline_graph, input_metadata_dataset_type
)
task_results = self._add_gather_task(pipeline_graph, input_metadata_dataset_type)
if task_results is None:
continue
gather_task_label, gather_dataset_type_name = task_results
metadata_refs[gather_task_label] = refs_for_type
consolidate_config.input_names.append(gather_dataset_type_name)
pipeline_graph.add_task(
@@ -623,7 +744,7 @@ def __init__(
@classmethod
def _add_gather_task(
cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType
) -> tuple[str, str]:
) -> tuple[str, str] | None:
"""Add a single configuration of `GatherResourceUsageTask` to a
pipeline graph.

@@ -647,6 +768,8 @@ def _add_gather_task(
return
elif "gatherResourceUsage" in input_metadata_dataset_type.name:
return
elif "consolidateResourceUsage" in input_metadata_dataset_type.name:
return
else:
input_task_label = m.group(1)
gather_task_label = f"{input_task_label}_gatherResourceUsage"
@@ -750,6 +873,37 @@ def make_argument_parser(cls) -> argparse.ArgumentParser:
default=None,
metavar="RUN",
)
parser.add_argument(
"--make-metric",
# argparse's type=bool treats any non-empty string as True, so use a
# real boolean flag (--make-metric / --no-make-metric) instead.
action=argparse.BooleanOptionalAction,
help=(
"Turn the output resource usage table into a metric measurement bundle format compatible "
"with Sasquatch."
),
default=True,
)
parser.add_argument(
"--dataset-identifier",
type=str,
help="Set the dataset these results are associated with.",
default=None,
metavar="DATASET_IDENTIFIER",
)
parser.add_argument(
"--reference-package",
type=str,
help="Reference package to use when selecting reference timestamp",
default="lsst_distrib",
metavar="REFERENCE_PACKAGE",
)
parser.add_argument(
"--timestamp-version",
type=str,
help="Which timestamp to use as the reference timestamp for metrics in a time series database.",
default="run_timestamp",
metavar="TIMESTAMP_VERSION",
)
return parser
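
The same options can also be driven from Python rather than the command line; a sketch mirroring the call in main() below (the repository path, collection names, empty where clause, and the use of dataset_type_names=None to mean "all discoverable resource-usage metadata" are assumptions for illustration, not part of this PR):

from lsst.daf.butler import Butler

from lsst.analysis.tools.tasks.gatherResourceUsage import ResourceUsageQuantumGraphBuilder

butler = Butler("/path/to/repo", collections=["u/me/processing"])  # placeholders
builder = ResourceUsageQuantumGraphBuilder(
    butler,
    dataset_type_names=None,  # assumed: gather every known resource-usage metadata type
    where="",
    input_collections=["u/me/processing"],
    output_run="u/me/resource_usage_metrics",
    make_metric=True,
    dataset_identifier="RC2_subset",  # illustrative
    timestamp_version="run_timestamp",
    reference_package="lsst_distrib",
)
qg = builder.build()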

@classmethod
@@ -767,13 +921,26 @@ def main(cls) -> None:
raise ValueError("At least one of --output or --output-run options is required.")
args.output_run = "{}/{}".format(args.output, Instrument.makeCollectionTimestamp())

extra_args = {}
if args.make_metric:
if args.dataset_identifier is None or args.timestamp_version is None:
raise ValueError(
"If metrics are going to be created, --dataset-identifier and --timestamp-version "
"must be specified."
)
extra_args["make_metric"] = True
extra_args["timestamp_version"] = args.timestamp_version
extra_args["dataset_identifier"] = args.dataset_identifier
extra_args["reference_package"] = args.reference_package

butler = Butler(args.repo, collections=args.collections)
builder = cls(
butler,
dataset_type_names=args.dataset_types,
where=args.where,
input_collections=args.collections,
output_run=args.output_run,
**extra_args,
)
qg: QuantumGraph = builder.build(
# Metadata includes a subset of attributes defined in CmdLineFwk.