Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizing read_geopkg function #819

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 79 additions & 54 deletions src/troute-network/troute/HYFeaturesNetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,71 +13,96 @@
from datetime import datetime
from pprint import pformat
import os

import fiona
import troute.nhd_io as nhd_io #FIXME
from troute.nhd_network import reverse_dict, extract_connections, reverse_network, reachable
from .rfc_lake_gage_crosswalk import get_rfc_lake_gage_crosswalk, get_great_lakes_climatology

import re
__verbose__ = False
__showtiming__ = False

def find_layer_name(layers, pattern):
"""
Find a layer name in the list of layers that matches the regex pattern.
"""
for layer in layers:
if re.search(pattern, layer, re.IGNORECASE):
return layer
return None

def read_geopkg(file_path, compute_parameters, waterbody_parameters, cpu_pool):
# Establish which layers we will read. We'll always need the flowpath tables
layers = ['flowpaths','flowpath_attributes']

# If waterbodies are being simulated, read lakes table
if waterbody_parameters.get('break_network_at_waterbodies',False):
layers.append('lakes')
layers.append('nexus')

# If any DA is activated, read network table as well for gage information
data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters',{})
streamflow_nudging = data_assimilation_parameters.get('streamflow_da',{}).get('streamflow_nudging',False)
usgs_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usgs',False)
usace_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_persistence_usace',False)
rfc_da = data_assimilation_parameters.get('reservoir_da',{}).get('reservoir_rfc_da',{}).get('reservoir_rfc_forecasts',False)
if any([streamflow_nudging, usgs_da, usace_da, rfc_da]):
layers.append('network')
# Retrieve available layers from the GeoPackage
available_layers = fiona.listlayers(file_path)

# patterns for the layers we want to find
layer_patterns = {
'flowpaths': r'flow[-_]?paths?|flow[-_]?lines?',
'flowpath_attributes': r'flow[-_]?path[-_]?attributes?|flow[-_]?line[-_]?attributes?',
'lakes': r'lakes?',
'nexus': r'nexus?',
'network': r'network'
}

# Match available layers to the patterns
matched_layers = {key: find_layer_name(available_layers, pattern)
for key, pattern in layer_patterns.items()}

# If diffusive is activated, read nexus table for lat/lon information
hybrid_parameters = compute_parameters.get('hybrid_parameters',{})
hybrid_routing = hybrid_parameters.get('run_hybrid_routing', False)
if hybrid_routing & ('nexus' not in layers):
layers.append('nexus')
layers_to_read = ['flowpaths', 'flowpath_attributes']

# Retrieve geopackage information:
if waterbody_parameters.get('break_network_at_waterbodies', False):
layers_to_read.extend(['lakes', 'nexus'])

data_assimilation_parameters = compute_parameters.get('data_assimilation_parameters', {})
if any([
data_assimilation_parameters.get('streamflow_da', {}).get('streamflow_nudging', False),
data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usgs', False),
data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_usace', False),
data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_rfc_da', {}).get('reservoir_rfc_forecasts', False)
]):
layers_to_read.append('network')

hybrid_parameters = compute_parameters.get('hybrid_parameters', {})
if hybrid_parameters.get('run_hybrid_routing', False) and 'nexus' not in layers_to_read:
layers_to_read.append('nexus')

# Function that read a layer from the geopackage
def read_layer(layer_name):
if layer_name:
try:
return gpd.read_file(file_path, layer=layer_name)
except Exception as e:
print(f"Error reading {layer_name}: {e}")
return pd.DataFrame()
return pd.DataFrame()

# Retrieve geopackage information using matched layer names
if cpu_pool > 1:
with Parallel(n_jobs=len(layers)) as parallel:
jobs = []
for layer in layers:
jobs.append(
delayed(gpd.read_file)
#(f, qlat_file_value_col, gw_bucket_col, terrain_ro_col)
#delayed(nhd_io.get_ql_from_csv)
(filename=file_path, layer=layer)
)
gpkg_list = parallel(jobs)
table_dict = {layers[i]: gpkg_list[i] for i in range(len(layers))}
flowpaths = pd.merge(table_dict.get('flowpaths'), table_dict.get('flowpath_attributes'), on='id')
lakes = table_dict.get('lakes', pd.DataFrame())
network = table_dict.get('network', pd.DataFrame())
nexus = table_dict.get('nexus', pd.DataFrame())
with Parallel(n_jobs=min(cpu_pool, len(layers_to_read))) as parallel:
gpkg_list = parallel(delayed(read_layer)(matched_layers[layer]) for layer in layers_to_read)

table_dict = {layers_to_read[i]: gpkg_list[i] for i in range(len(layers_to_read))}
else:
flowpaths = gpd.read_file(file_path, layer='flowpaths')
flowpath_attributes = gpd.read_file(file_path, layer='flowpath_attributes')
flowpaths = pd.merge(flowpaths, flowpath_attributes, on='id')
# If waterbodies are being simulated, read lakes table
lakes = pd.DataFrame()
if 'lakes' in layers:
lakes = gpd.read_file(file_path, layer='lakes')
# If any DA is activated, read network table as well for gage information
network = pd.DataFrame()
if 'network' in layers:
network = gpd.read_file(file_path, layer='network')
# If diffusive is activated, read nexus table for lat/lon information
nexus = pd.DataFrame()
if 'nexus' in layers:
nexus = gpd.read_file(file_path, layer='nexus')
table_dict = {layer: read_layer(matched_layers[layer]) for layer in layers_to_read}

# Handle different key column names between flowpaths and flowpath_attributes
flowpaths_df = table_dict.get('flowpaths', pd.DataFrame())
flowpath_attributes_df = table_dict.get('flowpath_attributes', pd.DataFrame())

# Check if 'link' column exists and rename it to 'id'
if 'link' in flowpath_attributes_df.columns:
flowpath_attributes_df.rename(columns={'link': 'id'}, inplace=True)

# Merge flowpaths and flowpath_attributes
flowpaths = pd.merge(
flowpaths_df,
flowpath_attributes_df,
on='id',
how='inner'
)

lakes = table_dict.get('lakes', pd.DataFrame())
network = table_dict.get('network', pd.DataFrame())
nexus = table_dict.get('nexus', pd.DataFrame())

return flowpaths, lakes, network, nexus

Expand Down
Loading