From 734a3cc0799b3c13095e693eaf25dd38cb43db1e Mon Sep 17 00:00:00 2001 From: Uche Madu Date: Wed, 18 Oct 2023 21:00:29 +0100 Subject: [PATCH] fix pyspark logic --- dags/user_analytics.py | 5 ++--- pyspark-scripts/gcs_utils.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dags/user_analytics.py b/dags/user_analytics.py index ffe2662..bbd9d48 100644 --- a/dags/user_analytics.py +++ b/dags/user_analytics.py @@ -239,15 +239,14 @@ def dataproc_tasks() -> None: CLUSTER_GENERATOR_CONFIG = ClusterGenerator( project_id=PROJECT_ID, - zone=ZONE, - master_machine_type="n2-standard-2", + master_machine_type="n2-standard-4", master_disk_size=32, worker_machine_type="n2-standard-2", worker_disk_size=32, num_workers=2, storage_bucket=BUCKET_NAME, init_actions_uris=[PIP_INIT_FILE], - metadata={"PIP_PACKAGES": "spark-nlp==5.1.2 google-cloud-storage==2.12.0 transformers==4.25.1 tensorflow==2.11.0"}, + metadata={"PIP_PACKAGES": "spark-nlp==5.1.2 google-cloud-storage==2.12.0 scipy==1.11.3 transformers==4.25.1 tensorflow==2.11.0"}, properties={ 'spark:spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark:spark.driver.maxResultSize': '0', diff --git a/pyspark-scripts/gcs_utils.py b/pyspark-scripts/gcs_utils.py index 05ceaea..1faae3d 100644 --- a/pyspark-scripts/gcs_utils.py +++ b/pyspark-scripts/gcs_utils.py @@ -137,7 +137,7 @@ def load_files_from_gcs( # Load data from GCS df = (spark.read.format(file_format) .options(**(read_options or {})) - .load(f"gs://{directory_path}/{file}")) + .load(f"gs://{file}")) dataframes.append(df)