diff --git a/src/troute-network/troute/AbstractNetwork.py b/src/troute-network/troute/AbstractNetwork.py index 3e7234a47..72528337f 100644 --- a/src/troute-network/troute/AbstractNetwork.py +++ b/src/troute-network/troute/AbstractNetwork.py @@ -2,6 +2,7 @@ from functools import partial import pandas as pd import numpy as np +import multiprocessing from datetime import datetime, timedelta import os @@ -927,14 +928,16 @@ def get_timesteps_from_nex(nexus_files): return output_file_timestamps -def split_csv_file(nexus_file, catchment_id, binary_folder): +def split_csv_file(nexus_file, binary_folder): + catchment_id = get_id_from_filename(nexus_file) # Split the csv file into multiple csv files # Unescaped command: awk -F ', ' '{ filename="test/tempfile_"$1".csv"; print "114085, "$NF >> filename; close(filename)}' nex-114085_output.csv cmd = f'awk -F \', \' \'{{ filename="{binary_folder}/tempfile_"$1".csv"; print "{catchment_id}, "$NF >> filename; close(filename) }}\' {nexus_file}' os.system(cmd) -def rewrite_to_parquet(tempfile_id, output_file_id, binary_folder): +def rewrite_to_parquet(file_args, binary_folder): + tempfile_id, output_file_id = file_args # Rewrite the csv file to parquet df = pd.read_csv(f'{binary_folder}/tempfile_{tempfile_id}.csv', names=['feature_id', output_file_id]) df.set_index('feature_id', inplace=True) # Set feature_id as the index @@ -948,15 +951,19 @@ def rewrite_to_parquet(tempfile_id, output_file_id, binary_folder): def nex_files_to_binary(nexus_files, binary_folder): # Get the output files output_timesteps = get_timesteps_from_nex(nexus_files) - + partial_split_csv_file = partial(split_csv_file, binary_folder=binary_folder) # Split the csv file into multiple csv files - for nexus_file in nexus_files: - catchment_id = get_id_from_filename(nexus_file) - split_csv_file(nexus_file, catchment_id, binary_folder) + with multiprocessing.Pool() as pool: + pool.map(partial_split_csv_file, nexus_files) + + # create a list of tuples to simplify pool.map call + temp_to_timestep_list = list(enumerate(output_timesteps)) + partial_rewrite_to_parquet = partial(rewrite_to_parquet, binary_folder=binary_folder) # Rewrite the temp csv files to parquet - for tempfile_id, nexus_file in enumerate(output_timesteps): - rewrite_to_parquet(tempfile_id, nexus_file, binary_folder) + with multiprocessing.Pool() as pool: + pool.map(partial_rewrite_to_parquet, temp_to_timestep_list) + # Clean up the temp files os.system(f'rm -rf {binary_folder}/tempfile_*.csv')