-
Notifications
You must be signed in to change notification settings - Fork 456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/Example for training KFP v1 #2118
Changes from 18 commits
03fa850
8918473
fd53d85
e9a0051
17123d6
4f19db8
9f83b0f
61e77ea
88c20c3
904d07d
31655dd
c458541
cee9970
f7e697b
36ed372
15c4a4b
741059f
7d33b7b
35df815
0504085
4cddd3e
6a0bdd3
182b787
9fc7c02
579546c
582a6a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
FROM python:3.10-slim | ||
|
||
ARG TARGETARCH | ||
ENV TARGET_DIR /opt/katib | ||
ENV METRICS_COLLECTOR_DIR cmd/metricscollector/v1beta1/kfp-metricscollector/v1 | ||
ENV PYTHONPATH ${TARGET_DIR}:${TARGET_DIR}/pkg/apis/manager/v1beta1/python:${TARGET_DIR}/pkg/metricscollector/v1beta1/kfp-metricscollector/v1::${TARGET_DIR}/pkg/metricscollector/v1beta1/common/ | ||
|
||
ADD ./pkg/ ${TARGET_DIR}/pkg/ | ||
ADD ./${METRICS_COLLECTOR_DIR}/ ${TARGET_DIR}/${METRICS_COLLECTOR_DIR}/ | ||
|
||
WORKDIR ${TARGET_DIR}/${METRICS_COLLECTOR_DIR} | ||
|
||
RUN if [ "${TARGETARCH}" = "arm64" ]; then \ | ||
apt-get -y update && \ | ||
apt-get -y install gfortran libpcre3 libpcre3-dev && \ | ||
apt-get clean && \ | ||
rm -rf /var/lib/apt/lists/*; \ | ||
fi | ||
|
||
RUN pip install --no-cache-dir -r requirements.txt | ||
RUN chgrp -R 0 ${TARGET_DIR} \ | ||
&& chmod -R g+rwX ${TARGET_DIR} | ||
|
||
ENTRYPOINT ["python", "main.py"] |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,101 @@ | ||||||||||||||||||||||
# Copyright 2023 The Kubeflow Authors. | ||||||||||||||||||||||
# | ||||||||||||||||||||||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||
# you may not use this file except in compliance with the License. | ||||||||||||||||||||||
# You may obtain a copy of the License at | ||||||||||||||||||||||
# | ||||||||||||||||||||||
# http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||
# | ||||||||||||||||||||||
# Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||
# See the License for the specific language governing permissions and | ||||||||||||||||||||||
# limitations under the License. | ||||||||||||||||||||||
|
||||||||||||||||||||||
import argparse | ||||||||||||||||||||||
import os | ||||||||||||||||||||||
from logging import INFO, StreamHandler, getLogger | ||||||||||||||||||||||
|
||||||||||||||||||||||
import api_pb2 | ||||||||||||||||||||||
import const | ||||||||||||||||||||||
import grpc | ||||||||||||||||||||||
from metrics_loader import MetricsCollector | ||||||||||||||||||||||
from pns import WaitMainProcesses | ||||||||||||||||||||||
|
||||||||||||||||||||||
timeout_in_seconds = 60 | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
||||||||||||||||||||||
def parse_options(): | ||||||||||||||||||||||
parser = argparse.ArgumentParser( | ||||||||||||||||||||||
description="KFP V1 MetricsCollector", add_help=True | ||||||||||||||||||||||
) | ||||||||||||||||||||||
|
||||||||||||||||||||||
# TODO (andreyvelich): Add early stopping flags. | ||||||||||||||||||||||
parser.add_argument("-s-db", "--db_manager_server_addr", type=str, default="") | ||||||||||||||||||||||
parser.add_argument("-t", "--pod_name", type=str, default="") | ||||||||||||||||||||||
parser.add_argument( | ||||||||||||||||||||||
"-path", | ||||||||||||||||||||||
"--metrics_file_dir", | ||||||||||||||||||||||
type=str, | ||||||||||||||||||||||
default=const.DEFAULT_METRICS_FILE_KFPV1_DIR, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
parser.add_argument("-m", "--metric_names", type=str, default="") | ||||||||||||||||||||||
parser.add_argument("-o-type", "--objective_type", type=str, default="") | ||||||||||||||||||||||
parser.add_argument("-f", "--metric_filters", type=str, default="") | ||||||||||||||||||||||
parser.add_argument( | ||||||||||||||||||||||
"-p", "--poll_interval", type=int, default=const.DEFAULT_POLL_INTERVAL | ||||||||||||||||||||||
) | ||||||||||||||||||||||
parser.add_argument( | ||||||||||||||||||||||
"-timeout", "--timeout", type=int, default=const.DEFAULT_TIMEOUT | ||||||||||||||||||||||
) | ||||||||||||||||||||||
parser.add_argument( | ||||||||||||||||||||||
"-w", "--wait_all_processes", type=str, default=const.DEFAULT_WAIT_ALL_PROCESSES | ||||||||||||||||||||||
) | ||||||||||||||||||||||
opt = parser.parse_args() | ||||||||||||||||||||||
return opt | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
||||||||||||||||||||||
if __name__ == "__main__": | ||||||||||||||||||||||
logger = getLogger(__name__) | ||||||||||||||||||||||
handler = StreamHandler() | ||||||||||||||||||||||
handler.setLevel(INFO) | ||||||||||||||||||||||
logger.setLevel(INFO) | ||||||||||||||||||||||
logger.addHandler(handler) | ||||||||||||||||||||||
logger.propagate = False | ||||||||||||||||||||||
opt = parse_options() | ||||||||||||||||||||||
wait_all_processes = opt.wait_all_processes.lower() == "true" | ||||||||||||||||||||||
db_manager_server = opt.db_manager_server_addr.split(":") | ||||||||||||||||||||||
trial_name = "-".join(opt.pod_name.split("-")[:-1]) | ||||||||||||||||||||||
if len(db_manager_server) != 2: | ||||||||||||||||||||||
raise Exception( | ||||||||||||||||||||||
"Invalid Katib DB manager service address: %s" % opt.db_manager_server_addr | ||||||||||||||||||||||
) | ||||||||||||||||||||||
|
||||||||||||||||||||||
WaitMainProcesses( | ||||||||||||||||||||||
pool_interval=opt.poll_interval, | ||||||||||||||||||||||
timout=opt.timeout, | ||||||||||||||||||||||
wait_all=wait_all_processes, | ||||||||||||||||||||||
completed_marked_dir=None, | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The documentation on this is a bit sparse, but if I understand the code right, this would require the Kubeflow pipeline to write a file katib/pkg/metricscollector/v1beta1/common/pns.py Lines 95 to 104 in f740889
So I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation. Let me check. |
||||||||||||||||||||||
) | ||||||||||||||||||||||
|
||||||||||||||||||||||
mc = MetricsCollector(opt.metric_names.split(";")) | ||||||||||||||||||||||
observation_log = mc.parse_file(opt.metrics_file_dir) | ||||||||||||||||||||||
|
||||||||||||||||||||||
channel = grpc.beta.implementations.insecure_channel( | ||||||||||||||||||||||
db_manager_server[0], int(db_manager_server[1]) | ||||||||||||||||||||||
) | ||||||||||||||||||||||
|
||||||||||||||||||||||
with api_pb2.beta_create_DBManager_stub(channel) as client: | ||||||||||||||||||||||
logger.info( | ||||||||||||||||||||||
"In " | ||||||||||||||||||||||
+ trial_name | ||||||||||||||||||||||
+ " " | ||||||||||||||||||||||
+ str(len(observation_log.metric_logs)) | ||||||||||||||||||||||
+ " metrics will be reported." | ||||||||||||||||||||||
) | ||||||||||||||||||||||
client.ReportObservationLog( | ||||||||||||||||||||||
api_pb2.ReportObservationLogRequest( | ||||||||||||||||||||||
trial_name=trial_name, observation_log=observation_log | ||||||||||||||||||||||
), | ||||||||||||||||||||||
timeout=timeout_in_seconds, | ||||||||||||||||||||||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
psutil==5.8.0 | ||
rfc3339>=6.2 | ||
grpcio==1.41.1 | ||
googleapis-common-protos==1.6.0 | ||
protobuf==3.20.0 |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
# Copyright 2023 The Kubeflow Authors. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# The Kubeflow pipeline metrics collector KFPMetricParser parses the metrics file | ||
# and returns an ObservationLog of the metrics specified. | ||
# Some documentation on the metrics collector file structure can be found here: | ||
# https://v0-6.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/ | ||
|
||
from datetime import datetime | ||
from logging import getLogger, StreamHandler, INFO | ||
import os | ||
from typing import List | ||
import json | ||
|
||
import rfc3339 | ||
import api_pb2 | ||
from pkg.metricscollector.v1beta1.common import const | ||
|
||
class KFPMetricParser: | ||
def __init__(self, metric_names): | ||
self.metric_names = metric_names | ||
|
||
@staticmethod | ||
def find_all_files(directory): | ||
for root, dirs, files in os.walk(directory): | ||
for f in files: | ||
yield os.path.join(root, f) | ||
|
||
def parse_metrics(self, metric_file_path: str) -> List[api_pb2.MetricLog]: | ||
"""Parse a kubeflow pipeline metrics file | ||
|
||
Args: | ||
fn (function): path to metrics file | ||
|
||
Returns: | ||
List[api_pb2.MetricLog]: A list of logged metrics | ||
""" | ||
metrics = [] | ||
with open(metric_file_path) as f: | ||
metrics_dict = json.load(f) | ||
for m in metrics_dict["metrics"]: | ||
name = m["name"] | ||
value = m["numberValue"] | ||
if name in self.metric_names: | ||
ml = api_pb2.MetricLog( | ||
time_stamp=rfc3339.rfc3339(datetime.now()), | ||
metric=api_pb2.Metric(name=name, value=str(value)), | ||
) | ||
metrics.append(ml) | ||
return metrics | ||
|
||
class MetricsCollector: | ||
def __init__(self, metric_names): | ||
self.logger = getLogger(__name__) | ||
handler = StreamHandler() | ||
handler.setLevel(INFO) | ||
self.logger.setLevel(INFO) | ||
self.logger.addHandler(handler) | ||
self.logger.propagate = False | ||
self.metrics = metric_names | ||
self.parser = KFPMetricParser(metric_names) | ||
|
||
def parse_file(self, directory): | ||
"""Parses the Kubeflow Pipeline metrics files""" | ||
mls = [] | ||
for f in self.parser.find_all_files(directory): | ||
if os.path.isdir(f): | ||
continue | ||
try: | ||
self.logger.info(f + " will be parsed.") | ||
mls.extend(self.parser.parse_metrics(f)) | ||
except Exception as e: | ||
self.logger.warning("Unexpected error: " + str(e)) | ||
continue | ||
|
||
# Metrics logs must contain at least one objective metric value | ||
# Objective metric is located at first index | ||
is_objective_metric_reported = False | ||
for ml in mls: | ||
if ml.metric.name == self.metrics[0]: | ||
is_objective_metric_reported = True | ||
break | ||
# If objective metrics were not reported, insert unavailable value in the DB | ||
if not is_objective_metric_reported: | ||
mls = [ | ||
api_pb2.MetricLog( | ||
time_stamp=rfc3339.rfc3339(datetime.now()), | ||
metric=api_pb2.Metric( | ||
name=self.metrics[0], value=const.UNAVAILABLE_METRIC_VALUE | ||
), | ||
) | ||
] | ||
self.logger.info( | ||
"Objective metric {} is not found in metrics file, {} value is reported".format( | ||
self.metrics[0], const.UNAVAILABLE_METRIC_VALUE | ||
) | ||
) | ||
|
||
return api_pb2.ObservationLog(metric_logs=mls) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please let's use the same structure as for other metrics collectors:
cmd/metricscollector/v1beta1/kfp-metricscollector/Dockerfile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andreyvelich My concern is where we will put Dockerfile for KFP v2. So I would suggest we put Dockerfile for the KFP v1 on here.
wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see. Do we really need to support KFP v1 if, eventually, every Kubeflow users should migrate to KFP v2 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because KFP v1 and KFP v2 aren't compatible, I think migrating v1 to v2 is harder in production.
So I guess users need a lot of time to update the version.
Hence, supporting KFP v1 in Katib would be useful. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, in any case I still have a question ( #2118 (review)) why do we need separate Metrics Collector for KFP if we need to just read the logs from the metrics file ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zijianjoy Katib metrics collector parses metrics file line by line and expects metrics name and value to be located in a single line.
@votti from the log line I can see that metrics are written to
/tmp/argo/outputs/artifacts/mlpipeline-metrics.tgz
file, isn't it ?@votti Yeah, this could be an issue since we override the start command to make sure we redirect StdOut to
/var/log/katib/metrics.log
file, so Katib Metrics Collector can parse this file. Otherwise, Metrics Collector can't parse the StdOut. The main differences between StdOut and File metrics collector is that StdOut tails/var/log/katib/metrics.log
file and prints logs.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@metrics file:
One of the complexities kubeflow
pipeline
manages is to handle output artifacts (usually compressing them and storing them and saving them to an s3 storage). This is what seems to be broken when using thefilecollector
, as something while compressing and copying the file to/tmp/argo/outputs/artifacts/mlpipeline-metrics.tgz
seems to go wrong.After finding some time to look into it, I think the reason is very similar to the
stdout
collector:The collector modifies the argo CMD/ARG in a way that I think causes these issues:
From the pod definition: Unmodified (eg when using the kubeflow custom metrics collector):
When using the
filecollector
as metrics collector:I think this could be solved by following this proposal: #2181
Until this is fixed, I think having a custom metrics collector that does not modify the command is a necessary workaround.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@votti I think this also could be solved with this feature, isn't: #577 ?
Basically, we can use Katib SDK to implement API for pushing metrics for Katib DB instead of using pull-based metrics collectors which require to change entrypoint.
User will require to report metrics in their Objective Training function.
For example:
We might need to do additional changes to Katib controller to verify that metrics were reported by user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Push based metrics collection: that sounds like a good potential solution!
So the
KatibClient
can infer automatically whichtrial
thesemetrics
are associated with?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@votti Currently, user can get the Trial name using
${trialSpec.Name}
template in their Trial's Pod environment vars. Then, user can runKatibClient
API with appropriate Trial Name to insert metrics to the Katib DB.I think, we should always add
TRIAL_NAME
env to the Trial pod since it is useful for many use-cases (e.g. for exporting trained model to S3, saving Trial metrics to DB, etc.)WDYT @tenzen-y @johnugeorge @votti ?