Skip to content

Commit

Permalink
Merge branch 'ml_platfrom_v034' into feat/bytewax_write_mini_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
KarolisKont committed Oct 17, 2023
2 parents a9f7463 + 2949d12 commit 0bb295f
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 24 deletions.
2 changes: 1 addition & 1 deletion java/serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
<version>1.11.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-java-root -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY README.md README.md
# git dir to infer the version of feast we're installing.
# https://github.com/pypa/setuptools_scm#usage-from-docker
# I think it also assumes that this dockerfile is being built from the root of the directory.
RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,gcp,bytewax]'
RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,bytewax,snowflake]'

Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
from typing import List

Expand All @@ -6,14 +7,15 @@
import s3fs
from bytewax.dataflow import Dataflow # type: ignore
from bytewax.execution import cluster_main
from bytewax.inputs import ManualInputConfig, distribute
from bytewax.inputs import ManualInputConfig
from bytewax.outputs import ManualOutputConfig
from tqdm import tqdm

from feast import FeatureStore, FeatureView, RepoConfig
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping

DEFAULT_BATCH_SIZE = 1000
logger = logging.getLogger(__name__)


class BytewaxMaterializationDataflow:
Expand All @@ -22,17 +24,20 @@ def __init__(
config: RepoConfig,
feature_view: FeatureView,
paths: List[str],
worker_index: int,
):
self.config = config
self.feature_store = FeatureStore(config=config)

self.feature_view = feature_view
self.worker_index = worker_index
self.paths = paths

self._run_dataflow()

def process_path(self, path):
fs = s3fs.S3FileSystem()
logger.info(f"Processing path {path}")
dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
batches = []
for fragment in dataset.fragments:
Expand All @@ -42,11 +47,7 @@ def process_path(self, path):
return batches

def input_builder(self, worker_index, worker_count, _state):
worker_paths = distribute(self.paths, worker_index, worker_count)
for path in worker_paths:
yield None, path

return
return [(None, self.paths[self.worker_index])]

def output_builder(self, worker_index, worker_count):
def yield_batch(iterable, batch_size):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import logging
import uuid
from datetime import datetime
from time import sleep
from typing import Callable, List, Literal, Sequence, Union

import yaml
from kubernetes import client
from kubernetes import config as k8s_config
from kubernetes import utils
from kubernetes.client.exceptions import ApiException
from kubernetes.utils import FailToCreateError
from pydantic import StrictStr
from tqdm import tqdm
Expand All @@ -16,6 +19,7 @@
from feast.infra.materialization.batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)
from feast.infra.offline_stores.offline_store import OfflineStore
Expand All @@ -27,6 +31,8 @@

from .bytewax_materialization_job import BytewaxMaterializationJob

logger = logging.getLogger(__name__)


class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for Bytewax"""
Expand Down Expand Up @@ -65,7 +71,22 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
""" (optional) additional labels to append to kubernetes objects """

max_parallelism: int = 10
""" (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""
""" (optional) Maximum number of pods allowed to run in parallel"""

synchronous: bool = False
""" (optional) If true, wait for materialization for one feature to complete before moving to the next """

retry_limit: int = 2
""" (optional) Maximum number of times to retry a materialization worker pod"""

active_deadline_seconds: int = 86400
""" (optional) Maximum amount of time a materialization job is allowed to run"""

job_batch_size: int = 100
""" (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""

print_pod_logs_on_failure: bool = True
"""(optional) Print pod logs on job failure. Only applies to synchronous materialization"""

mini_batch_size: int = 1000
""" (optional) Number of rows to process per write operation (default 1000)"""
Expand Down Expand Up @@ -173,8 +194,92 @@ def _materialize_one(
)

paths = offline_job.to_remote_storage()
if self.batch_engine_config.synchronous:
offset = 0
total_pods = len(paths)
batch_size = self.batch_engine_config.job_batch_size
if batch_size < 1:
raise ValueError("job_batch_size must be a value greater than 0")
if batch_size < self.batch_engine_config.max_parallelism:
logger.warning(
"job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism"
)
batch_size = self.batch_engine_config.max_parallelism

while True:
next_offset = min(offset + batch_size, total_pods)
job = self._await_path_materialization(
paths[offset:next_offset],
feature_view,
offset,
next_offset,
total_pods,
)
offset += batch_size
if offset >= total_pods:
break
else:
job_id = str(uuid.uuid4())
job = self._create_kubernetes_job(job_id, paths, feature_view)

return job

def _await_path_materialization(
self, paths, feature_view, batch_start, batch_end, total_pods
):
job_id = str(uuid.uuid4())
return self._create_kubernetes_job(job_id, paths, feature_view)
job = self._create_kubernetes_job(job_id, paths, feature_view)

try:
while job.status() in (
MaterializationJobStatus.WAITING,
MaterializationJobStatus.RUNNING,
):
logger.info(
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
f"(of {total_pods}) running..."
)
sleep(30)
logger.info(
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
f"(of {total_pods}) complete with status {job.status()}"
)
except BaseException as e:
if self.batch_engine_config.print_pod_logs_on_failure:
self._print_pod_logs(job.job_id(), feature_view, batch_start)

logger.info(f"Deleting job {job.job_id()}")
try:
self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
except ApiException as ae:
logger.warning(f"Could not delete job due to API Error: {ae.body}")
raise e
finally:
logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
try:
self.v1.delete_namespaced_config_map(
self._configmap_name(job_id), self.namespace
)
except ApiException as ae:
logger.warning(
f"Could not delete configmap due to API Error: {ae.body}"
)

return job

def _print_pod_logs(self, job_id, feature_view, offset=0):
pods_list = self.v1.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={job_id}",
).items
for i, pod in enumerate(pods_list):
logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
try:
logger.info(
self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
)
except ApiException as e:
logger.warning(f"Could not retrieve pod logs due to: {e.body}")

def _create_kubernetes_job(self, job_id, paths, feature_view):
try:
Expand Down Expand Up @@ -210,7 +315,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
"kind": "ConfigMap",
"apiVersion": "v1",
"metadata": {
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
"labels": {**labels, **self.batch_engine_config.labels},
},
"data": {
Expand All @@ -223,7 +328,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
body=configmap_manifest,
)

def _create_job_definition(self, job_id, namespace, pods, env):
def _configmap_name(self, job_id):
return f"feast-{job_id}"

def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
"""Create a kubernetes job definition."""
job_env = [
{"name": "RUST_BACKTRACE", "value": "full"},
Expand Down Expand Up @@ -284,8 +392,10 @@ def _create_job_definition(self, job_id, namespace, pods, env):
},
"spec": {
"ttlSecondsAfterFinished": 3600,
"backoffLimit": self.batch_engine_config.retry_limit,
"completions": pods,
"parallelism": min(pods, self.batch_engine_config.max_parallelism),
"activeDeadlineSeconds": self.batch_engine_config.active_deadline_seconds,
"completionMode": "Indexed",
"template": {
"metadata": {
Expand Down Expand Up @@ -324,7 +434,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
],
}
Expand Down Expand Up @@ -355,7 +465,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{"mountPath": "/etc/bytewax", "name": "hostfile"},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
],
}
Expand All @@ -365,13 +475,13 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{
"configMap": {
"defaultMode": 420,
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
"name": "python-files",
},
{
"configMap": {"name": f"feast-{job_id}"},
"name": f"feast-{job_id}",
"configMap": {"name": self._configmap_name(job_id)},
"name": self._configmap_name(job_id),
},
],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ def status(self):
if job_status.completion_time is None:
return MaterializationJobStatus.RUNNING
elif job_status.failed is not None:
self._error = Exception(f"Job {self.job_id()} failed")
return MaterializationJobStatus.ERROR
elif job_status.active is None and job_status.succeeded is not None:
if job_status.conditions[0].type == "Complete":
return MaterializationJobStatus.SUCCEEDED
elif job_status.active is None:
if job_status.completion_time is not None:
if job_status.conditions[0].type == "Complete":
return MaterializationJobStatus.SUCCEEDED
return MaterializationJobStatus.WAITING

def should_be_retried(self):
return False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

import yaml

from feast import FeatureStore, RepoConfig
Expand All @@ -19,4 +21,5 @@
config,
store.get_feature_view(bytewax_config["feature_view"]),
bytewax_config["paths"],
int(os.environ["JOB_COMPLETION_INDEX"]),
)
2 changes: 1 addition & 1 deletion sdk/python/feast/ui_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def read_registry():
# For all other paths (such as paths that would otherwise be handled by react router), pass to React
@app.api_route("/p/{path_name:path}", methods=["GET"])
def catch_all():
filename = ui_dir + "index.html"
filename = ui_dir.joinpath("index.html")

with open(filename) as f:
content = f.read()
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements/py3.10-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ comm==0.1.4
# ipywidgets
coverage[toml]==7.3.1
# via pytest-cov
cryptography==41.0.3
cryptography==41.0.4
# via
# adal
# azure-identity
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements/py3.8-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ comm==0.1.4
# ipywidgets
coverage[toml]==7.3.1
# via pytest-cov
cryptography==41.0.3
cryptography==41.0.4
# via
# adal
# azure-identity
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements/py3.9-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ comm==0.1.4
# ipywidgets
coverage[toml]==7.3.1
# via pytest-cov
cryptography==41.0.3
cryptography==41.0.4
# via
# adal
# azure-identity
Expand Down

0 comments on commit 0bb295f

Please sign in to comment.