diff --git a/hydrography-approach/config.yml b/hydrography-approach/config.yml index b8e0b40..e08515e 100644 --- a/hydrography-approach/config.yml +++ b/hydrography-approach/config.yml @@ -35,4 +35,8 @@ output_files: bridge_with_proj_points: "output-data/{{ state }}/csv-files/bridge-osm-association-with-projected-points.csv" bridge_match_percentage: "output-data/{{ state }}/csv-files/Association-match-check-with-percentage.csv" final_bridges_csv: "output-data/{{ state }}/csv-files/Final-bridges-with-percentage-match.csv" + +logging: + log_file_path: "hydrography-pipeline.log" + diff --git a/hydrography-approach/processing_scripts/associate_data/calculate_match_percentage.py b/hydrography-approach/processing_scripts/associate_data/calculate_match_percentage.py index acccfe7..739870b 100644 --- a/hydrography-approach/processing_scripts/associate_data/calculate_match_percentage.py +++ b/hydrography-approach/processing_scripts/associate_data/calculate_match_percentage.py @@ -2,33 +2,52 @@ from fuzzywuzzy import fuzz -# Function to calculate similarity -def calculate_osm_similarity(row): +def calculate_osm_similarity(row: pd.Series) -> int: + """ + Calculate the similarity between OSM name and Facility Carried By Structure. + """ return fuzz.token_sort_ratio( row["osm_name"], row["7 - Facility Carried By Structure"] ) -def calculate_nhd_similarity(row): +def calculate_nhd_similarity(row: pd.Series) -> int: + """ + Calculate the similarity between stream name and Features Intersected. + """ return fuzz.token_sort_ratio(row["stream_name"], row["6A - Features Intersected"]) -def calculate_cross_similarity_1(row): +def calculate_cross_similarity_1(row: pd.Series) -> int: + """ + Calculate the similarity between OSM name and Features Intersected. + """ return fuzz.token_sort_ratio(row["osm_name"], row["6A - Features Intersected"]) -def calculate_cross_similarity_2(row): +def calculate_cross_similarity_2(row: pd.Series) -> int: + """ + Calculate the similarity between stream name and Facility Carried By Structure. + """ return fuzz.token_sort_ratio( row["stream_name"], row["7 - Facility Carried By Structure"] ) -def run(bridge_with_proj_points, bridge_match_percentage): - df = pd.read_csv(bridge_with_proj_points) +def run(bridge_with_proj_points: str, bridge_match_percentage: str) -> None: + """ + Read the CSV file, calculate similarity scores, and save the results to a CSV file. 
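+ Scores come from fuzzywuzzy's token_sort_ratio, which returns an integer from 0 (no match) to 100 (identical after tokenizing and sorting).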
+ """ + try: + df = pd.read_csv(bridge_with_proj_points) - # Apply the function row-wise - df["osm_similarity"] = df.apply(calculate_osm_similarity, axis=1) - df["nhd_similarity"] = df.apply(calculate_nhd_similarity, axis=1) + # Apply the function row-wise + df["osm_similarity"] = df.apply(calculate_osm_similarity, axis=1) + df["nhd_similarity"] = df.apply(calculate_nhd_similarity, axis=1) - # Save the DataFrame with similarity scores - df.to_csv(bridge_match_percentage, index=False) + # Save the DataFrame with similarity scores + df.to_csv(bridge_match_percentage, index=False) + + except Exception as e: + print(f"Error processing similarity calculations: {e}") + raise diff --git a/hydrography-approach/processing_scripts/associate_data/determine_final_osm_id.py b/hydrography-approach/processing_scripts/associate_data/determine_final_osm_id.py index 2a6e2f2..ddaa9ed 100644 --- a/hydrography-approach/processing_scripts/associate_data/determine_final_osm_id.py +++ b/hydrography-approach/processing_scripts/associate_data/determine_final_osm_id.py @@ -1,103 +1,120 @@ +import logging import math +from typing import Optional, Tuple import pandas as pd -def haversine(lon1, lat1, lon2, lat2): +def haversine(lon1: float, lat1: float, lon2: float, lat2: float) -> float: """ - Function to calculate Haversine distance among two points + Function to calculate Haversine distance between two points. + + :param lon1: Longitude of the first point. + :param lat1: Latitude of the first point. + :param lon2: Longitude of the second point. + :param lat2: Latitude of the second point. + :return: Haversine distance in kilometers. """ # Radius of the Earth in kilometers R = 6371.0 - # Convert latitude and longitude from degrees to radians - lon1 = math.radians(lon1) - lat1 = math.radians(lat1) - lon2 = math.radians(lon2) - lat2 = math.radians(lat2) + try: + # Convert latitude and longitude from degrees to radians + lon1 = math.radians(lon1) + lat1 = math.radians(lat1) + lon2 = math.radians(lon2) + lat2 = math.radians(lat2) - # Compute differences between the coordinates - dlon = lon2 - lon1 - dlat = lat2 - lat1 + # Compute differences between the coordinates + dlon = lon2 - lon1 + dlat = lat2 - lat1 - # Haversine formula - a = ( - math.sin(dlat / 2) ** 2 - + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 - ) - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + # Haversine formula + a = ( + math.sin(dlat / 2) ** 2 + + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 + ) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + + # Distance in kilometers + distance = R * c - # Distance in kilometers - distance = R * c + return distance - return distance + except TypeError as e: + raise ValueError("Invalid input type. All inputs must be numeric.") from e -def extract_coordinates(wkt): +def extract_coordinates(wkt: Optional[str]) -> Tuple[Optional[float], Optional[float]]: """ - Function to extract latitude and longitude from WKT (Well-Known Text) format + Function to extract latitude and longitude from WKT (Well-Known Text) format. + + :param wkt: WKT string containing the coordinates. + :return: Tuple containing latitude and longitude. 
""" if pd.isna(wkt): return None, None - # Remove 'POINT (' and ')' - coords = wkt.replace("POINT (", "").replace(")", "") - # Split the coordinates - lon, lat = coords.split() - return float(lat), float(lon) + + try: + # Remove 'POINT (' and ')' + coords = wkt.replace("POINT (", "").replace(")", "") + # Split the coordinates + lon, lat = coords.split() + return float(lat), float(lon) + + except (AttributeError, ValueError) as e: + raise ValueError("Invalid WKT format. Could not extract coordinates.") from e -def determine_final_osm_id(group): +def determine_final_osm_id(group: pd.DataFrame) -> pd.Series: """ - Function to determine the final_osm_id, final_long, and final_lat for each group + Function to determine the final_osm_id, final_long, and final_lat for each group. + + :param group: DataFrame containing group data. + :return: Series with final_osm_id, osm_name, final_stream_id, stream_name, final_long, final_lat. """ - true_stream = group[group["Is_Stream_Identical"]] - min_dist = group[group["Is_Min_Dist"]] - if group["combo-count"].iloc[0] == 1: - # If there is only one unique OSM id - osm_id = group["osm_id"].iloc[0] - osm_name = group["name"].iloc[0] - if len(min_dist) == 0: - stream_id = pd.NA - stream_name = pd.NA - else: - stream_id = min_dist["permanent_identifier_x"].iloc[0] - stream_name = min_dist["gnis_name"].iloc[0] - - if len(true_stream) == 1: - long, lat = true_stream[["Long_intersection", "Lat_intersection"]].iloc[0] - elif len(true_stream) > 1: - min_dist_match = true_stream[true_stream["Is_Min_Dist"]] - if not min_dist_match.empty: - long, lat = min_dist_match[ - ["Long_intersection", "Lat_intersection"] - ].iloc[0] + try: + true_stream = group[group["Is_Stream_Identical"]] + min_dist = group[group["Is_Min_Dist"]] + + if group["combo-count"].iloc[0] == 1: + # If there is only one unique OSM id + osm_id = group["osm_id"].iloc[0] + osm_name = group["name"].iloc[0] + + if len(min_dist) == 0: + stream_id = pd.NA + stream_name = pd.NA else: + stream_id = min_dist["permanent_identifier_x"].iloc[0] + stream_name = min_dist["gnis_name"].iloc[0] + + if len(true_stream) == 1: long, lat = true_stream[["Long_intersection", "Lat_intersection"]].iloc[ 0 ] - else: - # If there are no rows with stream_check as TRUE, use MIN-DIST - if not min_dist.empty: - long, lat = min_dist[["Long_intersection", "Lat_intersection"]].iloc[0] + elif len(true_stream) > 1: + min_dist_match = true_stream[true_stream["Is_Min_Dist"]] + if not min_dist_match.empty: + long, lat = min_dist_match[ + ["Long_intersection", "Lat_intersection"] + ].iloc[0] + else: + long, lat = true_stream[ + ["Long_intersection", "Lat_intersection"] + ].iloc[0] else: - long, lat = group[["Long_intersection", "Lat_intersection"]].iloc[0] - else: - if len(true_stream) == 1: - # If there is exactly one OSM id with stream_check as TRUE - osm_id, osm_name, stream_id, stream_name, long, lat = true_stream[ - [ - "osm_id", - "name", - "permanent_identifier_x", - "gnis_name", - "Long_intersection", - "Lat_intersection", - ] - ].iloc[0] + # If there are no rows with stream_check as TRUE, use MIN-DIST + if not min_dist.empty: + long, lat = min_dist[ + ["Long_intersection", "Lat_intersection"] + ].iloc[0] + else: + long, lat = group[["Long_intersection", "Lat_intersection"]].iloc[0] else: - # If there are multiple OSM ids with stream_check as TRUE, use 'MIN-DIST' - if not min_dist.empty: - osm_id, osm_name, stream_id, stream_name, long, lat = min_dist[ + if len(true_stream) == 1: + # If there is exactly one OSM id with stream_check as 
TRUE + osm_id, osm_name, stream_id, stream_name, long, lat = true_stream[ [ "osm_id", "name", @@ -108,61 +125,108 @@ def determine_final_osm_id(group): ] ].iloc[0] else: - osm_id, osm_name, stream_id, stream_name, long, lat = [ - pd.NA, - pd.NA, - pd.NA, - pd.NA, - pd.NA, - pd.NA, - ] - return pd.Series( - [osm_id, osm_name, stream_id, stream_name, long, lat], - index=[ - "final_osm_id", - "osm_name", - "final_stream_id", - "stream_name", - "final_long", - "final_lat", - ], - ) - + # If there are multiple OSM ids with stream_check as TRUE, use 'MIN-DIST' + if not min_dist.empty: + osm_id, osm_name, stream_id, stream_name, long, lat = min_dist[ + [ + "osm_id", + "name", + "permanent_identifier_x", + "gnis_name", + "Long_intersection", + "Lat_intersection", + ] + ].iloc[0] + else: + osm_id, osm_name, stream_id, stream_name, long, lat = [ + pd.NA, + pd.NA, + pd.NA, + pd.NA, + pd.NA, + pd.NA, + ] -def merge_join_data_with_intersections(all_join_csv, intersections_csv): + return pd.Series( + [osm_id, osm_name, stream_id, stream_name, long, lat], + index=[ + "final_osm_id", + "osm_name", + "final_stream_id", + "stream_name", + "final_long", + "final_lat", + ], + ) + + except (IndexError, KeyError) as e: + raise ValueError( + "Error processing group data for final OSM ID determination." + ) from e + + +def merge_join_data_with_intersections( + all_join_csv: str, intersections_csv: str +) -> pd.DataFrame: """ Function to tag all data join result with intersections information. - """ - # Load the final join data - final_join_data = pd.read_csv(all_join_csv) - - # Load the intersection data - intersection_data = pd.read_csv(intersections_csv, low_memory=False) - intersection_data = intersection_data[ - ["WKT", "osm_id", "permanent_identifier", "gnis_name"] - ] - - # Ensure 'osm_id' and 'permanent_identifier_x' in df are of the same type as df2 columns - final_join_data["osm_id"] = final_join_data["osm_id"] - final_join_data["permanent_identifier_x"] = final_join_data[ - "permanent_identifier_x" - ] - - # Perform the left merge - df = pd.merge( - final_join_data, - intersection_data, - how="left", - left_on=["osm_id", "permanent_identifier_x"], - right_on=["osm_id", "permanent_identifier"], - ) - - return df + :param all_join_csv: Path to the CSV file with the final join data. + :param intersections_csv: Path to the CSV file with intersection data. + :return: DataFrame with merged data containing intersections information. 
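+ The merge is a left join on ("osm_id", "permanent_identifier_x") against ("osm_id", "permanent_identifier"); both key columns are cast to str first so dtype mismatches do not drop matches.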
+ """ + try: + # Load the final join data + final_join_data = pd.read_csv(all_join_csv) + + # Load the intersection data + intersection_data = pd.read_csv(intersections_csv, low_memory=False) + intersection_data = intersection_data[ + ["WKT", "osm_id", "permanent_identifier", "gnis_name"] + ] -def create_intermediate_association(df, intermediate_association): + # Ensure 'osm_id' and 'permanent_identifier_x' in df are of the same type as df2 columns + final_join_data["osm_id"] = final_join_data["osm_id"].astype(str) + final_join_data["permanent_identifier_x"] = final_join_data[ + "permanent_identifier_x" + ].astype(str) + + intersection_data["osm_id"] = intersection_data["osm_id"].astype(str) + intersection_data["permanent_identifier"] = intersection_data[ + "permanent_identifier" + ].astype(str) + + # Perform the left merge + merged_df = pd.merge( + final_join_data, + intersection_data, + how="left", + left_on=["osm_id", "permanent_identifier_x"], + right_on=["osm_id", "permanent_identifier"], + ) + + return merged_df + + except FileNotFoundError as e: + raise FileNotFoundError("One of the CSV files could not be found.") from e + except pd.errors.EmptyDataError as e: + raise ValueError("One of the CSV files is empty.") from e + except KeyError as e: + raise KeyError("A required column is missing from the CSV files.") from e + except Exception as e: + raise RuntimeError("An unexpected error occurred while merging data.") from e + + +def create_intermediate_association( + df: pd.DataFrame, intermediate_association: str, logger: logging.Logger +) -> pd.DataFrame: """ Function to create intermediate association among bridges and ways. + + :param df: DataFrame containing bridge and way data. + :param intermediate_association: Path to the output CSV file. + :param logger: Logger object for logging information and errors. + :return: DataFrame with additional columns and calculations. """ # Apply the function to the WKT column to create new columns df[["Lat_intersection", "Long_intersection"]] = df["WKT"].apply( @@ -199,15 +263,25 @@ def create_intermediate_association(df, intermediate_association): ].transform("nunique") # Save intermediate results - df.to_csv(intermediate_association) - print(f"\n{intermediate_association} file has been created successfully!") + try: + df.to_csv(intermediate_association) + logger.info(f"{intermediate_association} file has been created successfully!") + except IOError as e: + logger.error(f"Failed to write {intermediate_association}: {str(e)}") return df -def create_final_associations(df, association_with_intersections): +def create_final_associations( + df: pd.DataFrame, association_with_intersections: str, logger: logging.Logger +) -> pd.DataFrame: """ Function to create final association among bridges and ways. + + :param df: DataFrame containing bridge and way data. + :param association_with_intersections: Path to the output CSV file. + :param logger: Logger object for logging information and errors. + :return: DataFrame with final associations. 
""" # Group by 'BRIDGE_ID' and calculate the number of unique 'osm_id's for each group unique_osm_count = ( @@ -229,18 +303,33 @@ def create_final_associations(df, association_with_intersections): df = df.merge(final_values_df, on="8 - Structure Number", how="left") # Save the updated dataframe to a new CSV file - df.to_csv( - association_with_intersections, - index=False, - ) - print(f"\n{association_with_intersections} file has been created successfully!") + try: + df.to_csv( + association_with_intersections, + index=False, + ) + logger.info( + f"{association_with_intersections} file has been created successfully!" + ) + except IOError as e: + logger.error(f"Failed to write {association_with_intersections}: {str(e)}") return df -def add_bridge_details(df, nbi_bridge_data, bridge_association_lengths): +def add_bridge_details( + df: pd.DataFrame, + nbi_bridge_data: str, + bridge_association_lengths: str, + logger: logging.Logger, +) -> None: """ Function to add bridge information to associated data. + + :param df: DataFrame containing bridge and associated data. + :param nbi_bridge_data: Path to the NBI bridge data CSV file. + :param bridge_association_lengths: Path to the output CSV file with bridge details. + :param logger: Logger object for logging information and errors. """ bridge_data_df = pd.read_csv( nbi_bridge_data, @@ -285,26 +374,48 @@ def add_bridge_details(df, nbi_bridge_data, bridge_association_lengths): ) # Save the resulting DataFrame to a new CSV file - result_df.to_csv( - bridge_association_lengths, - index=False, - ) - print( - f"\n{bridge_association_lengths} file has been created successfully!" - ) + try: + result_df.to_csv( + bridge_association_lengths, + index=False, + ) + logger.info(f"{bridge_association_lengths} file has been created successfully!") + except IOError as e: + logger.error(f"Failed to write {bridge_association_lengths}: {str(e)}") def process_final_id( - all_join_csv, - intersections_csv, - intermediate_association, - association_with_intersections, - nbi_bridge_data, - bridge_association_lengths -): + all_join_csv: str, + intersections_csv: str, + intermediate_association: str, + association_with_intersections: str, + nbi_bridge_data: str, + bridge_association_lengths: str, + logger: logging.Logger, +) -> None: + """ + Function to process and merge data, creating final bridge associations and details. + + :param all_join_csv: Path to the CSV file with all join data. + :param intersections_csv: Path to the CSV file with intersection data. + :param intermediate_association: Path to save intermediate association results. + :param association_with_intersections: Path to save final association results with intersections. + :param nbi_bridge_data: Path to the NBI bridge data CSV file. + :param bridge_association_lengths: Path to save bridge details. + :param logger: Logger object for logging information and errors. 
+ """ + # Merge join data with intersections df = merge_join_data_with_intersections(all_join_csv, intersections_csv) - intermediate_df = create_intermediate_association(df, intermediate_association) + + # Create intermediate association + intermediate_df = create_intermediate_association( + df, intermediate_association, logger + ) + + # Create final associations final_df = create_final_associations( - intermediate_df, association_with_intersections + intermediate_df, association_with_intersections, logger ) - add_bridge_details(final_df, nbi_bridge_data, bridge_association_lengths) + + # Add bridge details + add_bridge_details(final_df, nbi_bridge_data, bridge_association_lengths, logger) diff --git a/hydrography-approach/processing_scripts/associate_data/exclude_nearby_bridges.py b/hydrography-approach/processing_scripts/associate_data/exclude_nearby_bridges.py index bd7521c..31607e4 100644 --- a/hydrography-approach/processing_scripts/associate_data/exclude_nearby_bridges.py +++ b/hydrography-approach/processing_scripts/associate_data/exclude_nearby_bridges.py @@ -1,17 +1,24 @@ +from typing import Callable + import pandas as pd -def load_bridge_info(csv_file): +def load_bridge_info(csv_file: str) -> pd.DataFrame: """Load bridge information CSV into a DataFrame.""" return pd.read_csv(csv_file) -def load_nearby_join(csv_file): +def load_nearby_join(csv_file: str) -> pd.DataFrame: """Load nearby join CSV into a DataFrame.""" return pd.read_csv(csv_file) -def filter_duplicates_and_output(bridge_df, join_df, output_csv): +def filter_duplicates_and_output( + bridge_df: pd.DataFrame, + join_df: pd.DataFrame, + output_csv: str, + logger: Callable[[str], None], +) -> None: """Filter duplicates based on osm_similarity score and output filtered bridge info.""" filtered_df = join_df[ @@ -36,35 +43,36 @@ def filter_duplicates_and_output(bridge_df, join_df, output_csv): bridge_df["8 - Structure Number"] == sn2, "osm_similarity" ].values[0] + # Determine which ID to retain based on osm_similarity score + if osm_similarity_sn1 > osm_similarity_sn2: + remove_ids.add(sn2) + elif osm_similarity_sn2 > osm_similarity_sn1: + remove_ids.add(sn1) + else: + remove_ids.add(sn2) # Arbitrarily keep sn1 if scores are equal + except IndexError: # Handle the case where ID is not found in bridge_df - print(f"id {sn1} or {sn2} not found in bridge_df") + logger(f"id {sn1} or {sn2} not found in bridge_df") continue - # Determine which ID to retain based on osm_similarity score - if osm_similarity_sn1 > osm_similarity_sn2: - remove_ids.add(sn2) - elif osm_similarity_sn2 > osm_similarity_sn1: - remove_ids.add(sn1) - else: - remove_ids.add(sn2) # Arbitrarily keep sn1 if scores are equal - else: - continue - - # Print set of IDs that are retained - print("IDs to be removed:", remove_ids) - - # Filter bridge_df based on retain_ids and output to a new CSV + # Filter bridge_df based on remove_ids and output to a new CSV filtered_bridge_df = bridge_df[~bridge_df["8 - Structure Number"].isin(remove_ids)] filtered_bridge_df.to_csv(output_csv, index=False) - print(f"Filtered bridge information saved to '{output_csv}'.") + logger(f"Filtered bridge information saved to '{output_csv}'.") -def run(bridge_match_percentage, nearby_join_csv, final_bridges_csv): +def run( + bridge_match_percentage: str, + nearby_join_csv: str, + final_bridges_csv: str, + logger: Callable[[str], None], +) -> None: + """Load data, filter duplicates, and save the filtered bridge info.""" # Load data bridge_df = load_bridge_info(bridge_match_percentage) 
join_df = load_nearby_join(nearby_join_csv) # Filter duplicates based on osm_similarity score and output filtered bridge info - filter_duplicates_and_output(bridge_df, join_df, final_bridges_csv) + filter_duplicates_and_output(bridge_df, join_df, final_bridges_csv, logger) diff --git a/hydrography-approach/processing_scripts/associate_data/get_point_projections_on_ways.py b/hydrography-approach/processing_scripts/associate_data/get_point_projections_on_ways.py index ec1bb47..bc9d022 100644 --- a/hydrography-approach/processing_scripts/associate_data/get_point_projections_on_ways.py +++ b/hydrography-approach/processing_scripts/associate_data/get_point_projections_on_ways.py @@ -1,32 +1,38 @@ +import logging +from typing import Dict, List + import geopandas as gpd import pandas as pd +from shapely.geometry import LineString, Point + +def project_point_on_line(point: Point, line: LineString) -> Point: + """Project a point onto a line and return the projected point.""" + return line.interpolate(line.project(point)) -def project_point_on_line(point, line): - # Calculate the projected point on the line - projected_point = line.interpolate(line.project(point)) - return projected_point +def run( + final_bridges: str, + filtered_highways: str, + bridge_association_lengths: str, + bridge_with_proj_points: str, + logger: logging.Logger, +) -> None: + """Project bridge points onto OSM ways and save the results to a CSV file.""" -def run(final_bridges, filtered_highways, bridge_association_lengths, bridge_with_proj_points): # Load geopackage files - bridge_points_gdf = gpd.read_file( - final_bridges - ) - osm_ways_gdf = gpd.read_file( - filtered_highways, layer="lines" - ) + bridge_points_gdf = gpd.read_file(final_bridges) + osm_ways_gdf = gpd.read_file(filtered_highways, layer="lines") # Load CSV file - associations_df = pd.read_csv( - bridge_association_lengths - ) + associations_df = pd.read_csv(bridge_association_lengths) # Ensure CRS is consistent - bridge_points_gdf = bridge_points_gdf.to_crs(epsg=4326) - osm_ways_gdf = osm_ways_gdf.to_crs(epsg=4326) + if bridge_points_gdf.crs != osm_ways_gdf.crs: + bridge_points_gdf = bridge_points_gdf.to_crs(epsg=4326) + osm_ways_gdf = osm_ways_gdf.to_crs(epsg=4326) - # Trim whitespace from 8 - Structure Number in associations_df and bridge_points_gdf + # Trim whitespace from structure numbers associations_df["8 - Structure Number"] = associations_df[ "8 - Structure Number" ].str.strip() @@ -34,7 +40,7 @@ def run(final_bridges, filtered_highways, bridge_association_lengths, bridge_wit "8 - Structure Number" ].str.strip() - projected_data = [] + projected_data: List[Dict] = [] for _, row in associations_df.iterrows(): structure_number = row["8 - Structure Number"] @@ -57,7 +63,6 @@ def run(final_bridges, filtered_highways, bridge_association_lengths, bridge_wit # Project the bridge point onto the OSM way projected_point = project_point_on_line(bridge_point, osm_way) - # Append the result to the list projected_data.append( { "8 - Structure Number": structure_number, @@ -69,12 +74,13 @@ def run(final_bridges, filtered_highways, bridge_association_lengths, bridge_wit "7 - Facility Carried By Structure": row[ "7 - Facility Carried By Structure" ], - "bridge_length": round(row["bridge_length"]/3.281,2), + "bridge_length": round(row["bridge_length"] / 3.281, 2), "projected_long": projected_point.x, "projected_lat": projected_point.y, } ) - except (ValueError, KeyError, IndexError): + + except (ValueError, KeyError, IndexError) as e: # Handle cases where final_osm_id 
is NaN or OSM way is not found projected_data.append( { @@ -87,17 +93,15 @@ def run(final_bridges, filtered_highways, bridge_association_lengths, bridge_wit "7 - Facility Carried By Structure": row[ "7 - Facility Carried By Structure" ], - "bridge_length": round(row["bridge_length"]/3.281,2), + "bridge_length": round(row["bridge_length"] / 3.281, 2), "projected_long": "", "projected_lat": "", } ) + logger.error(f"Error processing structure number {structure_number}: {e}") # Create output DataFrame output_df = pd.DataFrame(projected_data) # Save to CSV - output_df.to_csv( - bridge_with_proj_points, - index=False, - ) \ No newline at end of file + output_df.to_csv(bridge_with_proj_points, index=False) diff --git a/hydrography-approach/processing_scripts/associate_data/join_all_data.py b/hydrography-approach/processing_scripts/associate_data/join_all_data.py index 76d9d89..efe8ff5 100644 --- a/hydrography-approach/processing_scripts/associate_data/join_all_data.py +++ b/hydrography-approach/processing_scripts/associate_data/join_all_data.py @@ -1,67 +1,70 @@ import os import shutil +from typing import Dict, Optional import dask.dataframe as dd import pandas as pd -def process_all_join(nbi_30_join_csv, nbi_10_join_csv, all_join_dask, all_join_csv): - left_csv = nbi_30_join_csv - right_csv = nbi_10_join_csv +def process_all_join( + nbi_30_join_csv: str, + nbi_10_join_csv: str, + all_join_dask: str, + all_join_csv: str, + logger: Optional[object] = None, +) -> None: + """Process join of NBI 30 and NBI 10 CSVs using Dask and save the result to a single CSV file.""" # Specify the data types for the CSV columns - dtype_left = { + dtype: Dict[str, str] = { "OBJECTID_2": "float64", "osm_id": "float64", "permanent_identifier": "object", # Add other columns with their expected data types } - dtype_right = { - "OBJECTID_2": "float64", - "osm_id": "float64", - "permanent_identifier": "object", - # Add other columns with their expected data types - } + try: + # Load the CSV files into Dask DataFrames with specified dtypes + left_ddf = dd.read_csv(nbi_30_join_csv, dtype=dtype) + right_ddf = dd.read_csv(nbi_10_join_csv, dtype=dtype) + + # Perform a left join on the '8 - Structure Number' column + result_ddf = left_ddf.merge(right_ddf, on="8 - Structure Number", how="left") + + # Ensure the output directory exists + os.makedirs(all_join_dask, exist_ok=True) - # Load the CSV files into Dask DataFrames with specified dtypes - left_ddf = dd.read_csv( - left_csv, - dtype=dtype_left, - ) - right_ddf = dd.read_csv( - right_csv, - dtype=dtype_right, - ) + # Save the result to a directory with multiple part files + result_ddf.to_csv( + os.path.join(all_join_dask, "*.csv"), index=False, single_file=False + ) - # Perform a left join on the 'bridge_id' column - result_ddf = left_ddf.merge(right_ddf, on="8 - Structure Number", how="left") + # Ensure the Dask computations are done before combining files + dd.compute() - # Save the result to a directory with multiple part files - result_ddf.to_csv( - all_join_dask + "/*.csv", - index=False, - ) + # List the part files + part_files = sorted( + os.path.join(all_join_dask, f) + for f in os.listdir(all_join_dask) + if f.endswith(".csv") + ) - # Ensure the Dask computations are done before combining files - dd.compute() + # Combine the part files into a single DataFrame + combined_df = pd.concat(pd.read_csv(file) for file in part_files) - # List the part files - part_files = sorted( - os.path.join(all_join_dask, f) - for f in os.listdir(all_join_dask) - if f.endswith(".csv") - 
) + # Save the combined DataFrame to a single CSV file + combined_df.to_csv(all_join_csv, index=False) - # Combine the part files into a single DataFrame - combined_df = pd.concat(pd.read_csv(file) for file in part_files) + if logger: + logger.info(f"Output file: {all_join_csv} has been created successfully!") - # Save the combined DataFrame to a single CSV file - combined_df.to_csv( - all_join_csv, - index=False, - ) - print(f"Output file: {all_join_csv} has been created successfully!") + except Exception as e: + if logger: + logger.error(f"An error occurred: {e}") + else: + print(f"An error occurred: {e}") - # Optional: Clean up the part files - shutil.rmtree(all_join_dask) \ No newline at end of file + finally: + # Optional: Clean up the part files + if os.path.exists(all_join_dask): + shutil.rmtree(all_join_dask) diff --git a/hydrography-approach/processing_scripts/filter_data/filter_osm_ways.py b/hydrography-approach/processing_scripts/filter_data/filter_osm_ways.py index 98ba64a..fe66052 100644 --- a/hydrography-approach/processing_scripts/filter_data/filter_osm_ways.py +++ b/hydrography-approach/processing_scripts/filter_data/filter_osm_ways.py @@ -1,23 +1,37 @@ import subprocess +from typing import Callable, List -def filter_osm_pbf(input_file, output_file, filters): +def filter_osm_pbf(input_file: str, output_file: str, filters: List[str]) -> None: """ Filter the OSM PBF file based on the specified filters. """ cmd = ["osmium", "tags-filter", input_file] + filters + ["-o", output_file] - subprocess.run(cmd, check=True) + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + print(f"Error filtering OSM PBF file: {e}") + raise -def convert_to_geopackage(input_file, output_file): +def convert_to_geopackage(input_file: str, output_file: str) -> None: """ Convert the filtered OSM PBF file to a GeoPackage. """ cmd = ["ogr2ogr", "-f", "GPKG", output_file, input_file] - subprocess.run(cmd, check=True) + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + print(f"Error converting to GeoPackage: {e}") + raise -def filter_ways(input_osm_pbf, output_osm_pbf, output_gpkg): +def filter_ways( + input_osm_pbf: str, + output_osm_pbf: str, + output_gpkg: str, + logger: Callable[[str], None], +) -> None: """ Perform filter operation. """ @@ -50,4 +64,4 @@ def filter_ways(input_osm_pbf, output_osm_pbf, output_gpkg): # Convert the filtered OSM PBF file to a GeoPackage convert_to_geopackage(output_osm_pbf, output_gpkg) - print(f"Output file: {output_gpkg} has been created successfully!") + logger(f"Output file: {output_gpkg} has been created successfully!") diff --git a/hydrography-approach/processing_scripts/filter_data/process_filter_nbi_bridges.py b/hydrography-approach/processing_scripts/filter_data/process_filter_nbi_bridges.py index fb5be53..54c2ece 100644 --- a/hydrography-approach/processing_scripts/filter_data/process_filter_nbi_bridges.py +++ b/hydrography-approach/processing_scripts/filter_data/process_filter_nbi_bridges.py @@ -1,61 +1,83 @@ +from typing import Callable + import geopandas as gpd import pandas as pd from shapely.geometry import Point -exclude_bridges = [] - -def exclude_duplicate_bridges(df, output_duplicate_exclude_csv): +def exclude_duplicate_bridges( + df: pd.DataFrame, output_duplicate_exclude_csv: str +) -> pd.DataFrame: """ - Function to exclude duplicate bridges, remove non-posted culverts and save the result to a CSV + Exclude duplicate bridges, remove non-posted culverts and save the result to a CSV. 
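+ Duplicates are dropped on identical latitude/longitude and on structure numbers containing "*"; culverts are removed unless the operational status code is "P".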
""" + try: + # Drop duplicate bridges based on coordinates + df.drop_duplicates( + subset=["16 - Latitude (decimal)", "17 - Longitude (decimal)"], inplace=True + ) - # Drop duplicate bridges based on coordinates - df.drop_duplicates( - subset=["16 - Latitude (decimal)", "17 - Longitude (decimal)"], inplace=True - ) + # Drop duplicate bridges based on Bridge ID + df = df[~df["8 - Structure Number"].str.contains("*", regex=False)] - # Drop duplicate bridges based on Bridge ID - df = df[~df['8 - Structure Number'].str.contains('*', regex=False)] + # Remove culverts which are not posted + df = df[ + ~( + (df["43B - Main Span Design"] == "Culvert") + & (df["41 - Structure Operational Status Code"] != "P") + ) + ] - # Remove culverts which are not posted - df = df[ - ~( - (df["43B - Main Span Design"] == "Culvert") - & (df["41 - Structure Operational Status Code"] != "P") - ) - ] + df.to_csv(output_duplicate_exclude_csv, index=False) - df.to_csv(output_duplicate_exclude_csv, index=False) + except Exception as e: + print(f"Error processing duplicates and culverts: {e}") + raise return df -def convert_to_gpkg(df, output_gpkg_file): +def convert_to_gpkg( + df: pd.DataFrame, output_gpkg_file: str, logger: Callable[[str], None] +) -> None: """ - Function to convert the DataFrame to a GeoPackage + Convert the DataFrame to a GeoPackage. """ + try: + # Create geometry from latitude and longitude + geometry = [ + Point(xy) + for xy in zip(df["17 - Longitude (decimal)"], df["16 - Latitude (decimal)"]) + ] + gdf = gpd.GeoDataFrame(df, geometry=geometry) - # Create geometry from latitude and longitude - geometry = [ - Point(xy) - for xy in zip(df["17 - Longitude (decimal)"], df["16 - Latitude (decimal)"]) - ] - gdf = gpd.GeoDataFrame(df, geometry=geometry) + gdf.to_file(output_gpkg_file, driver="GPKG") - gdf.to_file(output_gpkg_file, driver="GPKG") + logger(f"GeoPackage saved successfully to {output_gpkg_file}") - print(f"GeoPackage saved successfully to {output_gpkg_file}") + except Exception as e: + print(f"Error saving GeoPackage: {e}") + raise -def create_nbi_geopackage(input_csv, output_duplicate_exclude_csv, output_gpkg_file): +def create_nbi_geopackage( + input_csv: str, + output_duplicate_exclude_csv: str, + output_gpkg_file: str, + logger: Callable[[str], None], +) -> None: """ - Funtion to perform processing of coordinates and filtering of bridges + Perform processing of coordinates and filtering of bridges. 
""" + try: + df = pd.read_csv(input_csv) + + # Exclude duplicate bridges and save the result to a CSV + df = exclude_duplicate_bridges(df, output_duplicate_exclude_csv) - df = pd.read_csv(input_csv) - # Exclude duplicate bridges and save the result to a CSV - df = exclude_duplicate_bridges(df, output_duplicate_exclude_csv) + # Convert the final DataFrame to a GeoPackage file + convert_to_gpkg(df, output_gpkg_file, logger) - # Convert the final DataFrame to a GeoPackage file - convert_to_gpkg(df, output_gpkg_file) + except Exception as e: + print(f"Error creating NBI GeoPackage: {e}") + raise diff --git a/hydrography-approach/processing_scripts/tag_data/tag_nbi_and_osm_data.py b/hydrography-approach/processing_scripts/tag_data/tag_nbi_and_osm_data.py index 9641b34..0c7a8ed 100644 --- a/hydrography-approach/processing_scripts/tag_data/tag_nbi_and_osm_data.py +++ b/hydrography-approach/processing_scripts/tag_data/tag_nbi_and_osm_data.py @@ -1,7 +1,9 @@ import csv +import logging import os import subprocess import sys +from typing import Optional, Tuple from qgis.analysis import QgsNativeAlgorithms from qgis.core import ( @@ -199,50 +201,61 @@ def get_line_intersections(filtered_osm_gl, rivers_gl): return intersections -def load_layers(nbi_points_fp, osm_fp): - """ - Load required layers - """ - nbi_points_gl = QgsVectorLayer(nbi_points_fp, "nbi-points", "ogr") - if not nbi_points_gl.isValid(): - print("NBI points layer failed to load!") - sys.exit(1) - - osm_gl = QgsVectorLayer(osm_fp, "filtered", "ogr") - if not osm_gl.isValid(): - print("OSM ways layer failed to load!") - sys.exit(1) +class LayerLoadError(Exception): + """Custom exception for errors in loading vector layers.""" - return nbi_points_gl, osm_gl + def __init__(self, layer_name: str, message: str): + super().__init__(f"{layer_name} layer: {message}") + self.layer_name = layer_name -''' -def load_layers(nbi_points_fp, osm_fp): +def load_layers( + nbi_points_fp: str, osm_fp: str, logger: logging.Logger +) -> Tuple[Optional[QgsVectorLayer], Optional[QgsVectorLayer]]: """ - Load required layers and create spatial indexes + Load required layers with improved error handling. + + Args: + nbi_points_fp (str): File path to the NBI points layer. + osm_fp (str): File path to the OSM ways layer. + logger: Logger instance for logging errors. + + Returns: + Tuple[Optional[QgsVectorLayer], Optional[QgsVectorLayer]]: + A tuple containing the NBI points layer and the OSM ways layer. """ - nbi_points_gl = QgsVectorLayer(nbi_points_fp, "nbi-points", "ogr") - if not nbi_points_gl.isValid(): - print("NBI points layer failed to load!") - sys.exit(1) - osm_gl = QgsVectorLayer(osm_fp, "filtered", "ogr") - if not osm_gl.isValid(): - print("OSM ways layer failed to load!") - sys.exit(1) + def load_layer(fp: str, layer_name: str) -> Optional[QgsVectorLayer]: + """ + Load a layer and log errors if the loading fails. - # Create spatial index for NBI points - nbi_index = QgsSpatialIndex(nbi_points_gl.getFeatures()) + Args: + fp (str): File path to the layer. + layer_name (str): Name of the layer for logging purposes. - # Create spatial index for OSM ways - osm_index = QgsSpatialIndex(osm_gl.getFeatures()) + Returns: + Optional[QgsVectorLayer]: Loaded layer or None if failed. + """ + layer = QgsVectorLayer(fp, layer_name, "ogr") + if not layer.isValid(): + logger.error( + f"{layer_name} layer failed to load. Check the file path and ensure the file exists." 
+ ) + raise LayerLoadError(layer_name, "Failed to load layer.") + return layer + + try: + nbi_points_gl = load_layer(nbi_points_fp, "nbi-points") + osm_gl = load_layer(osm_fp, "filtered") + except LayerLoadError as e: + logger.error(e) + raise # Re-raise the exception to be handled at a higher level return nbi_points_gl, osm_gl -''' def process_bridge( - nbi_points_gl, exploded_osm_gl, bridge_yes_join_csv, yes_filter_bridges + nbi_points_gl, exploded_osm_gl, bridge_yes_join_csv, yes_filter_bridges, logger ): """ Process bridges: filter and join NBI data with OSM data @@ -272,7 +285,7 @@ def process_bridge( filtered_layer, yes_filter_bridges, "utf-8", filtered_layer.crs(), "GPKG" ) - print(f"\nOutput file: {yes_filter_bridges} has been created successfully!") + logger.info(f"Output file: {yes_filter_bridges} has been created successfully!") QgsProject.instance().removeMapLayer(filtered_osm_gl.id()) QgsProject.instance().removeMapLayer(buffer_80.id()) @@ -282,7 +295,7 @@ def process_bridge( def process_layer_tag( - nbi_points_gl, exploded_osm_gl, manmade_join_csv, manmade_filter_bridges + nbi_points_gl, exploded_osm_gl, manmade_join_csv, manmade_filter_bridges, logger ): """ Process layer tags: filter and join NBI data with OSM data based on layer tag @@ -312,7 +325,7 @@ def process_layer_tag( filtered_layer, manmade_filter_bridges, "utf-8", filtered_layer.crs(), "GPKG" ) - print(f"\nOutput file: {manmade_filter_bridges} has been created successfully!") + logger.info(f"Output file: {manmade_filter_bridges} has been created successfully!") QgsProject.instance().removeMapLayer(filtered_osm_gl.id()) QgsProject.instance().removeMapLayer(buffer_30.id()) @@ -322,7 +335,7 @@ def process_layer_tag( def process_parallel_bridges( - nbi_points_gl, exploded_osm_gl, parallel_join_csv, parallel_filter_bridges + nbi_points_gl, exploded_osm_gl, parallel_join_csv, parallel_filter_bridges, logger ): """ Process parallel bridges: identify and filter parallel bridges @@ -361,7 +374,9 @@ def process_parallel_bridges( filtered_layer, parallel_filter_bridges, "utf-8", filtered_layer.crs(), "GPKG" ) - print(f"\nOutput file: {parallel_filter_bridges} has been created successfully!") + logger.info( + f"Output file: {parallel_filter_bridges} has been created successfully!" + ) QgsProject.instance().removeMapLayer(filtered_osm_gl.id()) QgsProject.instance().removeMapLayer(buffer_30.id()) @@ -400,6 +415,7 @@ def process_culverts_from_pbf( state_name, culvert_join_csv, final_bridges, + logger, ): """ Process and filter out tunnels marked as culverts from a local OSM PBF file. 
@@ -485,7 +501,7 @@ def process_culverts_from_pbf( filtered_layer, final_bridges, "utf-8", filtered_layer.crs(), "GPKG" ) - print(f"\nOutput file: {final_bridges} has been created successfully!") + logger.info(f"Output file: {final_bridges} has been created successfully!") # Remove temporary layers from the project QgsProject.instance().removeMapLayer(osm_layer.id()) @@ -505,16 +521,18 @@ def process_buffer_join( osm_nhd_join_csv, nbi_10_join_csv, nbi_30_join_csv, + logger, ): """ Process buffer join: join NBI data with OSM and river data """ base_filename = os.path.splitext(os.path.basename(rivers_data))[0] rivers_fp = rivers_data + f"|layername=NHD-{state_name}-Flowline" + # rivers_fp = rivers_data + "|layername=NHDFlowline" rivers_gl = QgsVectorLayer(rivers_fp, "rivers", "ogr") if not rivers_gl.isValid(): - print("Rivers layer failed to load!") + logger.error("Rivers layer failed to load!") sys.exit(1) filter_expression = "highway not in ('abandoned','bridleway','construction','corridor','crossing','cycleway','elevator','escape','footway','living_street','path','pedestrian','planned','proposed','raceway','rest_area','steps') AND bridge IS NULL AND layer IS NULL" @@ -526,7 +544,7 @@ def process_buffer_join( intersections, intersections_csv, ) - print(f"\nOutput file: {intersections_csv} has been created successfully!") + logger.info(f"Output file: {intersections_csv} has been created successfully!") osm_river_join = join_by_location( osm_gl, @@ -545,7 +563,7 @@ def process_buffer_join( osm_river_join, osm_nhd_join_csv, ) - print(f"\nOutput file: {osm_nhd_join_csv} has been created successfully!") + logger.info(f"Output file: {osm_nhd_join_csv} has been created successfully!") buffer_10 = create_buffer(nbi_points_gl, 0.0001) buffer_30 = create_buffer(nbi_points_gl, 0.0003) @@ -568,13 +586,12 @@ def process_buffer_join( "permanent_identifier", ] - vl_to_csv_filter( nbi_10_river_join, nbi_10_join_csv, keep_fields, ) - print(f"\nOutput file: {nbi_10_join_csv} has been created successfully!") + logger.info(f"Output file: {nbi_10_join_csv} has been created successfully!") nbi_30_osm_river_join = join_by_location( buffer_30, osm_river_join, [], geometric_predicates=[0] @@ -597,7 +614,7 @@ def process_buffer_join( nbi_30_join_csv, keep_fields, ) - print(f"\nOutput file: {nbi_30_join_csv} has been created successfully!") + logger.info(f"Output file: {nbi_30_join_csv} has been created successfully!") def process_tagging( @@ -612,7 +629,6 @@ def process_tagging( parallel_filter_bridges, nearby_join_csv, state_folder, - state_name, culvert_join_csv, final_bridges, rivers_data, @@ -620,6 +636,8 @@ def process_tagging( osm_nhd_join_csv, nbi_10_join_csv, nbi_30_join_csv, + logger, + state_name, ): # Get QGIS pathname for NBI points vector layer base_filename = os.path.splitext(os.path.basename(nbi_geopackage))[0] @@ -628,17 +646,21 @@ def process_tagging( osm_fp = filtered_highways + "|layername=lines" osm_pbf_path = state_latest_osm - nbi_points_gl, osm_gl = load_layers(nbi_points_fp, osm_fp) + nbi_points_gl, osm_gl = load_layers(nbi_points_fp, osm_fp, logger) exploded_osm_gl = explode_osm_data(osm_gl) output_layer1 = process_bridge( - nbi_points_gl, exploded_osm_gl, bridge_yes_join_csv, yes_filter_bridges + nbi_points_gl, exploded_osm_gl, bridge_yes_join_csv, yes_filter_bridges, logger ) output_layer2 = process_layer_tag( - output_layer1, exploded_osm_gl, manmade_join_csv, manmade_filter_bridges + output_layer1, exploded_osm_gl, manmade_join_csv, manmade_filter_bridges, logger ) output_layer3 = 
process_parallel_bridges( - output_layer2, exploded_osm_gl, parallel_join_csv, parallel_filter_bridges + output_layer2, + exploded_osm_gl, + parallel_join_csv, + parallel_filter_bridges, + logger, ) process_nearby_bridges(output_layer3, nearby_join_csv) output_layer4 = process_culverts_from_pbf( @@ -648,6 +670,7 @@ def process_tagging( state_name, culvert_join_csv, final_bridges, + logger, ) process_buffer_join( output_layer4, @@ -659,4 +682,5 @@ def process_tagging( osm_nhd_join_csv, nbi_10_join_csv, nbi_30_join_csv, + logger, ) diff --git a/hydrography-approach/run-hydrography-pipeline.py b/hydrography-approach/run-hydrography-pipeline.py index 96174f6..28e0d74 100644 --- a/hydrography-approach/run-hydrography-pipeline.py +++ b/hydrography-approach/run-hydrography-pipeline.py @@ -1,5 +1,7 @@ +import logging import os - +from typing import Dict +import sys import yaml from jinja2 import Environment, FileSystemLoader, select_autoescape @@ -13,146 +15,239 @@ from processing_scripts.filter_data import filter_osm_ways, process_filter_nbi_bridges from processing_scripts.tag_data import tag_nbi_and_osm_data +# Initialize global logger +logger = logging.getLogger(__name__) + -def load_config(state_name): +def load_config(state_name: str) -> Dict: """ - Load configuration + Load and render configuration from a YAML file. """ - env = Environment( - loader=FileSystemLoader("."), autoescape=select_autoescape(["yaml"]) - ) + try: + env = Environment( + loader=FileSystemLoader("."), autoescape=select_autoescape(["yaml"]) + ) - with open("config.yml", "r") as file: - template = env.from_string(file.read()) - rendered_yaml = template.render(state=f"{state_name}") - config = yaml.safe_load(rendered_yaml) + with open("config.yml", "r") as file: + template = env.from_string(file.read()) + rendered_yaml = template.render(state=state_name) + config = yaml.safe_load(rendered_yaml) - return config + return config + except FileNotFoundError as e: + print(f"Error: The configuration file was not found. Details: {e}") + raise -def main(): - # Mention state to process - state_name = "Kentucky" + except yaml.YAMLError as e: + print(f"Error: Failed to parse the YAML configuration. Details: {e}") + raise - # Load config file - print("\nLoading the config file.") - config = load_config(state_name) + except Exception as e: + print(f"An unexpected error occurred while loading the configuration. Details: {e}") + raise - # Make the required directories for storing outputs - print("\nMake the required directories.") - os.makedirs( - config["output_data_folders"]["state_folder"], - exist_ok=True, - ) - os.makedirs(config["output_data_folders"]["csv_files"], exist_ok=True) - os.makedirs(config["output_data_folders"]["gpkg_files"], exist_ok=True) - os.makedirs(config["output_data_folders"]["pbf_files"], exist_ok=True) - # --------------------------------------------Filter OSM ways data-------------------------------------------- +def create_directories(config: Dict[str, str]) -> None: + """ + Create directories for output data as specified in the configuration. + + :param config: Dictionary containing paths for directories to be created. 
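+ Expects config["output_data_folders"] to provide state_folder, csv_files, gpkg_files and pbf_files; a missing path is logged as a warning rather than raising.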
+ """ + output_folders = config.get("output_data_folders", {}) + + # Define required folder keys + required_folders = ["state_folder", "csv_files", "gpkg_files", "pbf_files"] + + for folder in required_folders: + folder_path = output_folders.get(folder) + if folder_path: + if not os.path.exists(folder_path): + try: + os.makedirs(folder_path) + logger.info(f"Directory created: {folder_path}") + except Exception as e: + logger.error(f"Failed to create directory {folder_path}: {e}") + else: + logger.info(f"Directory already exists: {folder_path}") + else: + logger.warning(f"Path for {folder} not specified in configuration.") + + +def filter_osm_data(config: Dict) -> None: + """ + Filter OSM ways data using the provided configuration. + """ input_osm_pbf = config["input_data_folder"]["state_latest_osm"] output_osm_pbf = config["output_files"]["filtered_osm_pbf"] output_gpkg = config["output_files"]["filtered_highways"] - print("\nFiltering OSM ways data.") - filter_osm_ways.filter_ways(input_osm_pbf, output_osm_pbf, output_gpkg) + logger.info("Filtering OSM ways data.") + filter_osm_ways.filter_ways(input_osm_pbf, output_osm_pbf, output_gpkg, logger) + - # --------------------------------------------Filter NBI data and create geopackage-------------------------------------------- +def filter_nbi_data(config: Dict) -> None: + """ + Filter NBI bridge data and create a GeoPackage. + """ input_csv = config["input_data_folder"]["nbi_bridge_data"] output_duplicate_exclude_csv = config["output_files"]["duplicate_exclude_csv"] output_gpkg_file = config["output_files"]["nbi_geopackage"] - print("\nFiltering NBI bridge data.") + logger.info("Filtering NBI bridge data.") process_filter_nbi_bridges.create_nbi_geopackage( - input_csv, output_duplicate_exclude_csv, output_gpkg_file + input_csv, output_duplicate_exclude_csv, output_gpkg_file, logger ) - # --------------------------------------------Tag NBI data with OSM-NHD join data-------------------------------------------- - nbi_geopackage = config["output_files"]["nbi_geopackage"] - filtered_highways = config["output_files"]["filtered_highways"] - state_latest_osm = config["input_data_folder"]["state_latest_osm"] - bridge_yes_join_csv = config["output_files"]["bridge_yes_join_csv"] - yes_filter_bridges = config["output_files"]["yes_filter_bridges"] - manmade_join_csv = config["output_files"]["manmade_join_csv"] - manmade_filter_bridges = config["output_files"]["manmade_filter_bridges"] - parallel_join_csv = config["output_files"]["parallel_join_csv"] - parallel_filter_bridges = config["output_files"]["parallel_filter_bridges"] - nearby_join_csv = config["output_files"]["nearby_join_csv"] - state_folder = config["output_data_folders"]["state_folder"] - culvert_join_csv = config["output_files"]["culvert_join_csv"] - final_bridges = config["output_files"]["final_bridges"] - rivers_data = config["input_data_folder"]["nhd_streams_flowline"] - intersections_csv = config["output_files"]["intersections_csv"] - osm_nhd_join_csv = config["output_files"]["osm_nhd_join_csv"] - nbi_10_join_csv = config["output_files"]["nbi_10_join_csv"] - nbi_30_join_csv = config["output_files"]["nbi_30_join_csv"] - - print("\nTagging NBI and OSM data.") + +def tag_data(config: Dict, state_name: str) -> None: + """ + Tag NBI data with OSM-NHD join data. 
+ """ + # Extract file paths from config + file_paths = { + "nbi_geopackage": config["output_files"]["nbi_geopackage"], + "filtered_highways": config["output_files"]["filtered_highways"], + "state_latest_osm": config["input_data_folder"]["state_latest_osm"], + "bridge_yes_join_csv": config["output_files"]["bridge_yes_join_csv"], + "yes_filter_bridges": config["output_files"]["yes_filter_bridges"], + "manmade_join_csv": config["output_files"]["manmade_join_csv"], + "manmade_filter_bridges": config["output_files"]["manmade_filter_bridges"], + "parallel_join_csv": config["output_files"]["parallel_join_csv"], + "parallel_filter_bridges": config["output_files"]["parallel_filter_bridges"], + "nearby_join_csv": config["output_files"]["nearby_join_csv"], + "state_folder": config["output_data_folders"]["state_folder"], + "culvert_join_csv": config["output_files"]["culvert_join_csv"], + "final_bridges": config["output_files"]["final_bridges"], + "rivers_data": config["input_data_folder"]["nhd_streams_flowline"], + "intersections_csv": config["output_files"]["intersections_csv"], + "osm_nhd_join_csv": config["output_files"]["osm_nhd_join_csv"], + "nbi_10_join_csv": config["output_files"]["nbi_10_join_csv"], + "nbi_30_join_csv": config["output_files"]["nbi_30_join_csv"], + } + + logger.info("Tagging NBI and OSM data.") tag_nbi_and_osm_data.process_tagging( - nbi_geopackage, - filtered_highways, - state_latest_osm, - bridge_yes_join_csv, - yes_filter_bridges, - manmade_join_csv, - manmade_filter_bridges, - parallel_join_csv, - parallel_filter_bridges, - nearby_join_csv, - state_folder, - state_name, - culvert_join_csv, - final_bridges, - rivers_data, - intersections_csv, - osm_nhd_join_csv, - nbi_10_join_csv, - nbi_30_join_csv, + **file_paths, logger=logger, state_name=state_name ) - # --------------------------------------------Associate join data-------------------------------------------- - all_join_dask = config["output_files"]["all_join_dask"] - all_join_csv = config["output_files"]["all_join_csv"] - intermediate_association = config["output_files"]["intermediate_association"] - association_with_intersections = config["output_files"][ - "association_with_intersections" - ] - bridge_association_lengths = config["output_files"]["bridge_association_lengths"] - bridge_with_proj_points = config["output_files"]["bridge_with_proj_points"] - bridge_match_percentage = config["output_files"]["bridge_match_percentage"] - final_bridges_csv = config["output_files"]["final_bridges_csv"] - - print("\nJoining association data together.") + +def associate_join_data(config: Dict) -> None: + """ + Associate and process join data. 
+ """ + files = { + "all_join_dask": config["output_files"]["all_join_dask"], + "all_join_csv": config["output_files"]["all_join_csv"], + "intermediate_association": config["output_files"]["intermediate_association"], + "association_with_intersections": config["output_files"][ + "association_with_intersections" + ], + "bridge_association_lengths": config["output_files"][ + "bridge_association_lengths" + ], + "bridge_with_proj_points": config["output_files"]["bridge_with_proj_points"], + "bridge_match_percentage": config["output_files"]["bridge_match_percentage"], + "final_bridges_csv": config["output_files"]["final_bridges_csv"], + } + + logger.info("Joining association data together.") join_all_data.process_all_join( - nbi_30_join_csv, nbi_10_join_csv, all_join_dask, all_join_csv + config["output_files"]["nbi_30_join_csv"], + config["output_files"]["nbi_10_join_csv"], + files["all_join_dask"], + files["all_join_csv"], + logger, ) - print("\nDetermining final OSM way ID for each NBI bridge.") + logger.info("Determining final OSM way ID for each NBI bridge.") determine_final_osm_id.process_final_id( - all_join_csv, - intersections_csv, - intermediate_association, - association_with_intersections, - input_csv, - bridge_association_lengths, + files["all_join_csv"], + config["output_files"]["intersections_csv"], + files["intermediate_association"], + files["association_with_intersections"], + config["input_data_folder"]["nbi_bridge_data"], + files["bridge_association_lengths"], + logger, ) - print("\nGetting NBI point projections on associated ways.") + logger.info("Getting NBI point projections on associated ways.") get_point_projections_on_ways.run( - final_bridges, - filtered_highways, - bridge_association_lengths, - bridge_with_proj_points, + config["output_files"]["final_bridges"], + config["output_files"]["filtered_highways"], + files["bridge_association_lengths"], + files["bridge_with_proj_points"], + logger ) - print("\nCalculating fuzzy match for OSM road name.") - calculate_match_percentage.run(bridge_with_proj_points, bridge_match_percentage) + logger.info("Calculating fuzzy match for OSM road name.") + calculate_match_percentage.run( + files["bridge_with_proj_points"], files["bridge_match_percentage"] + ) - print("\nExcluding nearby bridges.") + logger.info("Excluding nearby bridges.") exclude_nearby_bridges.run( - bridge_match_percentage, nearby_join_csv, final_bridges_csv + files["bridge_match_percentage"], + config["output_files"]["nearby_join_csv"], + files["final_bridges_csv"], + logger, ) - print("\nProcess completed.") + +def main() -> None: + """ + Main function to orchestrate the data processing pipeline. 
+ """ + state_name = "Kentucky" + + try: + # Load configuration + config = load_config(state_name) + + except (FileNotFoundError, yaml.YAMLError, Exception) as e: + print(f"Configuration loading failed: {e}") + sys.exit(1) # Exit with a non-zero status to indicate an error + + try: + # Configure logging after loading config + log_file_path = config["logging"].get( + "log_file_path", "hydrography-pipeline.log" + ) + logging.basicConfig( + filename=log_file_path, # Use the path from the configuration + level=logging.INFO, + format="%(asctime)s - [%(levelname)s] - (%(filename)s).%(funcName)s - %(message)s", + ) + + # Create directories + logger.info("Creating directories.") + create_directories(config) + + # Filter OSM & NBI data + filter_osm_data(config) + filter_nbi_data(config) + + # Tag NBI and OSM data + tag_data(config, state_name) + + # Associate and process join data + associate_join_data(config) + logger.info("Association process completed.") + + except FileNotFoundError as e: + logger.error(f"File not found: {e}", exc_info=True) + except yaml.YAMLError as e: + logger.error(f"YAML configuration error: {e}", exc_info=True) + except KeyError as e: + logger.error(f"Missing key in configuration or data: {e}", exc_info=True) + except Exception as e: + logger.error(f"An unexpected error occurred: {e}", exc_info=True) + raise # Re-raise the exception to ensure the program terminates with an error status + if __name__ == "__main__": - main() + try: + main() + except Exception as e: + logger.error(f"An error occurred: {e}", exc_info=True) + raise