diff --git a/src/troute-network/troute/HYFeaturesNetwork.py b/src/troute-network/troute/HYFeaturesNetwork.py
index 85644f3d1..fb8a19225 100644
--- a/src/troute-network/troute/HYFeaturesNetwork.py
+++ b/src/troute-network/troute/HYFeaturesNetwork.py
@@ -13,71 +13,96 @@
 from datetime import datetime
 from pprint import pformat
 import os
-
+import fiona
 import troute.nhd_io as nhd_io #FIXME
 from troute.nhd_network import reverse_dict, extract_connections, reverse_network, reachable
 from .rfc_lake_gage_crosswalk import get_rfc_lake_gage_crosswalk, get_great_lakes_climatology
-
+import re
 __verbose__ = False
 __showtiming__ = False
 
+def find_layer_name(layers, pattern):
+    """
+    Find a layer name in the list of layers that matches the regex pattern.
+    """
+    for layer in layers:
+        if re.search(pattern, layer, re.IGNORECASE):
+            return layer
+    return None
+
 def read_geopkg(file_path, compute_parameters, waterbody_parameters, cpu_pool):
-    # Establish which layers we will read. We'll always need the flowpath tables
-    layers = ['flowpaths','flowpath_attributes']
-
-    # If waterbodies are being simulated, read lakes table
-    if waterbody_parameters.get('break_network_at_waterbodies',False):
-        layers.append('lakes')
-        layers.append('nexus')
-
-    # If any DA is activated, read network table as well for gage information
-    data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters',{})
-    streamflow_nudging = data_assimilation_parameters.get('streamflow_da',{}).get('streamflow_nudging',False)
-    usgs_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usgs',False)
-    usace_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usace',False)
-    rfc_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_rfc_da',{}).get('reservoir_rfc_forecasts',False)
-    if any([streamflow_nudging, usgs_da, usace_da, rfc_da]):
-        layers.append('network')
+    # Retrieve available layers from the GeoPackage
+    available_layers = fiona.listlayers(file_path)
+
+    # patterns for the layers we want to find
+    layer_patterns = {
+        'flowpaths': r'flow[-_]?paths?|flow[-_]?lines?',
+        'flowpath_attributes': r'flow[-_]?path[-_]?attributes?|flow[-_]?line[-_]?attributes?',
+        'lakes': r'lakes?',
+        'nexus': r'nexus?',
+        'network': r'network'
+    }
+
+    # Match available layers to the patterns
+    matched_layers = {key: find_layer_name(available_layers, pattern)
+                      for key, pattern in layer_patterns.items()}
 
-    # If diffusive is activated, read nexus table for lat/lon information
-    hybrid_parameters = compute_parameters.get('hybrid_parameters',{})
-    hybrid_routing = hybrid_parameters.get('run_hybrid_routing', False)
-    if hybrid_routing & ('nexus' not in layers):
-        layers.append('nexus')
+    layers_to_read = ['flowpaths', 'flowpath_attributes']
 
-    # Retrieve geopackage information:
+    if waterbody_parameters.get('break_network_at_waterbodies', False):
+        layers_to_read.extend(['lakes', 'nexus'])
+
+    data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters', {})
+    if any([
+        data_assimilation_parameters.get('streamflow_da', {}).get('streamflow_nudging', False),
+        data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usgs', False),
+        data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usace', False),
+        data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_rfc_da', {}).get('reservoir_rfc_forecasts', False)
+    ]):
+        layers_to_read.append('network')
+
+    hybrid_parameters = compute_parameters.get('hybrid_parameters', {})
+    if hybrid_parameters.get('run_hybrid_routing', False) and 'nexus' not in layers_to_read:
+        layers_to_read.append('nexus')
+
+    # Function that reads a layer from the geopackage
+    def read_layer(layer_name):
+        if layer_name:
+            try:
+                return gpd.read_file(file_path, layer=layer_name)
+            except Exception as e:
+                print(f"Error reading {layer_name}: {e}")
+                return pd.DataFrame()
+        return pd.DataFrame()
+
+    # Retrieve geopackage information using matched layer names
     if cpu_pool > 1:
-        with Parallel(n_jobs=len(layers)) as parallel:
-            jobs = []
-            for layer in layers:
-                jobs.append(
-                    delayed(gpd.read_file)
-                    #(f, qlat_file_value_col, gw_bucket_col, terrain_ro_col)
-                    #delayed(nhd_io.get_ql_from_csv)
-                    (filename=file_path, layer=layer)
-                )
-            gpkg_list = parallel(jobs)
-        table_dict = {layers[i]: gpkg_list[i] for i in range(len(layers))}
-        flowpaths = pd.merge(table_dict.get('flowpaths'), table_dict.get('flowpath_attributes'), on='id')
-        lakes = table_dict.get('lakes', pd.DataFrame())
-        network = table_dict.get('network', pd.DataFrame())
-        nexus = table_dict.get('nexus', pd.DataFrame())
+        with Parallel(n_jobs=min(cpu_pool, len(layers_to_read))) as parallel:
+            gpkg_list = parallel(delayed(read_layer)(matched_layers[layer]) for layer in layers_to_read)
+
+        table_dict = {layers_to_read[i]: gpkg_list[i] for i in range(len(layers_to_read))}
     else:
-        flowpaths = gpd.read_file(file_path, layer='flowpaths')
-        flowpath_attributes = gpd.read_file(file_path, layer='flowpath_attributes')
-        flowpaths = pd.merge(flowpaths, flowpath_attributes, on='id')
-        # If waterbodies are being simulated, read lakes table
-        lakes = pd.DataFrame()
-        if 'lakes' in layers:
-            lakes = gpd.read_file(file_path, layer='lakes')
-        # If any DA is activated, read network table as well for gage information
-        network = pd.DataFrame()
-        if 'network' in layers:
-            network = gpd.read_file(file_path, layer='network')
-        # If diffusive is activated, read nexus table for lat/lon information
-        nexus = pd.DataFrame()
-        if 'nexus' in layers:
-            nexus = gpd.read_file(file_path, layer='nexus')
+        table_dict = {layer: read_layer(matched_layers[layer]) for layer in layers_to_read}
+
+    # Handle different key column names between flowpaths and flowpath_attributes
+    flowpaths_df = table_dict.get('flowpaths', pd.DataFrame())
+    flowpath_attributes_df = table_dict.get('flowpath_attributes', pd.DataFrame())
+
+    # Check if 'link' column exists and rename it to 'id'
+    if 'link' in flowpath_attributes_df.columns:
+        flowpath_attributes_df.rename(columns={'link': 'id'}, inplace=True)
+
+    # Merge flowpaths and flowpath_attributes
+    flowpaths = pd.merge(
+        flowpaths_df,
+        flowpath_attributes_df,
+        on='id',
+        how='inner'
+    )
+
+    lakes = table_dict.get('lakes', pd.DataFrame())
+    network = table_dict.get('network', pd.DataFrame())
+    nexus = table_dict.get('nexus', pd.DataFrame())
 
     return flowpaths, lakes, network, nexus
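
For reference, a minimal standalone sketch of the regex-based layer discovery the new read_geopkg relies on; the file name 'example.gpkg' and the candidate layer names mentioned in the comments are hypothetical, not taken from this patch:

import re
import fiona

def find_layer_name(layers, pattern):
    # Return the first layer whose name matches the pattern, ignoring case
    for layer in layers:
        if re.search(pattern, layer, re.IGNORECASE):
            return layer
    return None

# 'example.gpkg' is a placeholder path; a real hydrofabric GeoPackage might
# expose the flowpath table as 'flowpaths', 'flow_paths', or 'flowlines'.
available_layers = fiona.listlayers('example.gpkg')
print(find_layer_name(available_layers, r'flow[-_]?paths?|flow[-_]?lines?'))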
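
The parallel branch keeps the joblib Parallel/delayed pattern but now caps n_jobs at the number of layers to read. A rough sketch of that pattern, with a stand-in reader function instead of the gpd.read_file wrapper from the patch:

from joblib import Parallel, delayed

def read_layer(layer_name):
    # Stand-in for the gpd.read_file wrapper defined inside read_geopkg
    return f"table for {layer_name}"

layers_to_read = ['flowpaths', 'flowpath_attributes', 'lakes']
cpu_pool = 8
with Parallel(n_jobs=min(cpu_pool, len(layers_to_read))) as parallel:
    tables = parallel(delayed(read_layer)(name) for name in layers_to_read)
table_dict = dict(zip(layers_to_read, tables))
print(table_dict)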
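
And a small pandas illustration of the new 'link' -> 'id' rename plus inner merge; the column names other than 'id'/'link' and all values below are made up and only show the shape of the operation:

import pandas as pd

flowpaths_df = pd.DataFrame({'id': ['wb-1', 'wb-2', 'wb-3'], 'lengthkm': [1.2, 0.8, 2.5]})
flowpath_attributes_df = pd.DataFrame({'link': ['wb-1', 'wb-2'], 'n': [0.06, 0.05]})

# Some hydrofabric versions key the attributes table by 'link' instead of 'id'
if 'link' in flowpath_attributes_df.columns:
    flowpath_attributes_df.rename(columns={'link': 'id'}, inplace=True)

# how='inner' drops flowpaths without matching attributes (wb-3 here)
flowpaths = pd.merge(flowpaths_df, flowpath_attributes_df, on='id', how='inner')
print(flowpaths)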