adding execution_stats to python metadata (#650)
* adding execution_stats to python metadata

* add psutil to pyproject

* added execution stats to Spark
blublinsky authored Oct 4, 2024
1 parent afc4150 commit 2a86cec
Showing 3 changed files with 34 additions and 5 deletions.
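Taken together, the commits attach an execution_stats section to the job metadata written by both the pure-Python and the Spark orchestrators. As a rough illustration of the new entry's shape in the Python runtime (the field names come from the diff below; the values here are made up), the saved metadata gains something like:

    "execution_stats": {
        "cpus": 12.5,                  # 15-minute load average as a percentage of the available cores
        "gpus": 0,                     # not tracked by the pure-Python runtime
        "memory": 4.21,                # used memory, GB
        "object_store": 0,             # not used by the pure-Python runtime
        "execution time, min": 3.417,
    },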
1 change: 1 addition & 0 deletions data-processing-lib/python/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
"boto3==1.34.69",
"argparse",
"mmh3",
"psutil",
]

[project_urls]
@@ -9,9 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os
import time
import traceback
import psutil
from datetime import datetime
from multiprocessing import Pool
from typing import Any
@@ -24,12 +25,31 @@
    PythonTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.utils import get_logger
from data_processing.utils import GB, get_logger


logger = get_logger(__name__)


def _execution_resources() -> dict[str, Any]:
    """
    Get execution resource usage for the current machine.
    :return: dict of cpu, gpu, memory and object store usage
    """
    # Load averages over the last 1, 5 and 15 minutes; only the 15-minute value is reported
    load1, load5, load15 = psutil.getloadavg()
    # Used memory, converted to GB (virtual_memory()[3] is the 'used' field)
    mused = round(psutil.virtual_memory()[3] / GB, 2)
    return {
        "cpus": round((load15 / os.cpu_count()) * 100, 1),
        "gpus": 0,
        "memory": mused,
        "object_store": 0,
    }



def orchestrate(
    data_access_factory: DataAccessFactoryBase,
    runtime_config: PythonTransformRuntimeConfiguration,
@@ -43,6 +63,7 @@ def orchestrate(
    :return: 0 - success or 1 - failure
    """
    start_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    start_time = time.time()
    logger.info(f"orchestrator {runtime_config.get_name()} started at {start_ts}")
    # create statistics
    statistics = TransformStatistics()
@@ -118,6 +139,7 @@ def orchestrate(
        "job_input_params": input_params
        | data_access_factory.get_input_params()
        | execution_config.get_input_params(),
        "execution_stats": _execution_resources() | {"execution time, min": round((time.time() - start_time) / 60.0, 3)},
        "job_output_stats": stats,
    }
    logger.debug(f"Saving job metadata: {metadata}.")
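For readers unfamiliar with psutil, here is a minimal standalone sketch of the same measurements the new _execution_resources() helper performs; GB is redefined locally as an assumption (in the library it is imported from data_processing.utils):

    import os

    import psutil

    GB = 1024 * 1024 * 1024  # assumption: mirrors data_processing.utils.GB

    # 1-, 5- and 15-minute load averages; the orchestrator reports only the 15-minute value
    load1, load5, load15 = psutil.getloadavg()
    # used system memory in GB (virtual_memory().used is the same value as virtual_memory()[3])
    mused = round(psutil.virtual_memory().used / GB, 2)
    print({
        "cpus": round((load15 / os.cpu_count()) * 100, 1),  # load expressed as % of available cores
        "memory": mused,
    })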
@@ -16,7 +16,7 @@

from data_processing.data_access import DataAccessFactoryBase
from data_processing.transform import TransformStatistics
from data_processing.utils import get_logger
from data_processing.utils import GB, get_logger
from data_processing_spark.runtime.spark import (
    SparkTransformFileProcessor,
    SparkTransformRuntimeConfiguration,
@@ -123,6 +123,12 @@ def process_partition(iterator):
    try:
        # build and save metadata
        logger.debug("Building job metadata")
        cpus = sc.defaultParallelism
        # sum, over all executors, the maximum memory each one reports (bytes), reached via the py4j gateway
        executors = sc._jsc.sc().getExecutorMemoryStatus()
        memory = 0.0
        for i in range(executors.size()):
            memory += executors.toList().apply(i)._2()._1()
        resources = {"cpus": cpus, "gpus": 0, "memory": round(memory / GB, 2), "object_store": 0}
        input_params = runtime_config.get_transform_metadata() | execution_configuration.get_input_params()
        metadata = {
            "pipeline": execution_configuration.pipeline_id,
@@ -136,8 +142,8 @@ def process_partition(iterator):
            "job_input_params": input_params | data_access_factory.get_input_params(),
            "execution_stats": {
                "num partitions": num_partitions,
                "execution time, min": (time.time() - start_time) / 60,
            },
                "execution time, min": round((time.time() - start_time) / 60, 3),
            } | resources,
            "job_output_stats": stats,
        }
        logger.debug(f"Saving job metadata: {metadata}.")
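The Spark runtime reports the driver's default parallelism as the cpu count and sums, across executors, the maximum memory returned by SparkContext.getExecutorMemoryStatus, a Scala Map from executor address to a (max memory, remaining memory) pair that the orchestrator walks through the py4j gateway. A hedged sketch of the same traversal against a throwaway local SparkSession (all names local to this example):

    from pyspark.sql import SparkSession

    GB = 1024 * 1024 * 1024  # assumption: mirrors data_processing.utils.GB

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    sc = spark.sparkContext
    # Scala Map[executor address -> (max memory, remaining memory)], reached via py4j;
    # ._2()._1() picks the max-memory component of each entry, in bytes
    executors = sc._jsc.sc().getExecutorMemoryStatus()
    memory = 0.0
    for i in range(executors.size()):
        memory += executors.toList().apply(i)._2()._1()
    print({"cpus": sc.defaultParallelism, "gpus": 0, "memory": round(memory / GB, 2), "object_store": 0})
    spark.stop()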
