Merge pull request #23 from opengeos/refactor
finished refactoring
cholmes authored Oct 6, 2023
2 parents 7890dda + 838124d commit c37fccc
Showing 6 changed files with 109 additions and 84 deletions.
65 changes: 56 additions & 9 deletions open_buildings/cli.py
@@ -5,6 +5,8 @@
import matplotlib.pyplot as plt
from open_buildings.google.process import process_benchmark, process_geometries
from open_buildings.download_buildings import download as download_buildings
from open_buildings.overture.add_columns import process_parquet_files
from open_buildings.overture.partition import process_db
from datetime import datetime, timedelta
from tabulate import tabulate
import boto3 # Required for S3 operations
@@ -38,20 +40,33 @@ def handle_comma_separated(ctx, param, value):

@tools.command(name="get_buildings")
@click.argument('geojson_input', type=click.File('r'), required=False)
@click.option('--only-quadkey', is_flag=True, help='Include only the quadkey in the WHERE clause.')
@click.option('--format', default=None, type=click.Choice(['shapefile', 'geojson', 'geopackage', 'flatgeobuf', 'parquet']), help='Output format for the SQL query. Defaults to the extension of the dst file.')
@click.option('--generate-sql', is_flag=True, default=False, help='Generate and print SQL without executing.')
@click.option('--dst', type=str, default="buildings.parquet", help='Destination file name (without extension) or full path for the output.')
@click.option('--dst', type=str, default="buildings.json", help='Destination file name (without extension) or full path for the output.')
@click.option('-s', '--silent', is_flag=True, default=False, help='Suppress all print outputs.')
@click.option('--time-report', is_flag=True, default=True, help='Report how long the operation took to run.')
@click.option('--overwrite', default=False, is_flag=True, help='Overwrite the destination file if it already exists.')
@click.option('--verbose', default=False, is_flag=True, help='Print detailed logs with timestamps.')
@click.option('--run-gpq', is_flag=True, default=True, help='Run gpq conversion to ensure the output is valid GeoParquet.')
@click.option('--data-path', type=str, default="s3://us-west-2.opendata.source.coop/cholmes/overture/geoparquet-country-quad-2/*.parquet", help='Path to the root of the buildings parquet data.')
@click.option('--hive-partitioning', is_flag=True, default=False, help='Use Hive partitioning when reading the parquet data.')
@click.option('--source', default="overture", type=click.Choice(['google', 'overture']), help='Dataset to query')
@click.option('--country_iso', type=str, default=None, help='Country ISO code to filter the data by.')
def get_buildings(geojson_input, only_quadkey, format, generate_sql, dst, silent, time_report, overwrite, verbose, run_gpq, data_path, hive_partitioning, country_iso):
download_buildings(geojson_input, only_quadkey, format, generate_sql, dst, silent, time_report, overwrite, verbose, run_gpq, data_path, hive_partitioning, country_iso)
def get_buildings(geojson_input, format, dst, silent, overwrite, verbose, source, country_iso):
"""Tool to extract buildings in common geospatial formats from large archives of GeoParquet data online.
Defaults to GeoJSON output in a file called buildings.json. It's recommended to set the --dst flat to
output. """
# Map the source (google or overture) to data_path and hive_partitioning values.
data_path = None
hive_partitioning = False
# case insensitive matching
if source.lower() == "google":
data_path = "s3://us-west-2.opendata.source.coop/google-research-open-buildings/geoparquet-by-country/*/*.parquet"
hive_partitioning = True
elif source.lower() == "overture":
data_path = "s3://us-west-2.opendata.source.coop/cholmes/overture/geoparquet-country-quad-hive/*/*.parquet"
hive_partitioning = True
else:
raise ValueError('Invalid source')


generate_sql = False
download_buildings(geojson_input, format, generate_sql, dst, silent, overwrite, verbose, data_path, hive_partitioning, country_iso)

@google.command('benchmark')
@click.argument('input_path', type=click.Path(exists=True))
@@ -167,6 +182,24 @@ def convert(
verbose,
)

@overture.command('add_columns')
@click.argument('input_folder', type=click.Path(exists=True))
@click.argument('output_folder', type=click.Path())
@click.argument('country_parquet_path', type=click.Path(exists=True))
@click.option('--overwrite', is_flag=True, help="Whether to overwrite any existing output files.")
@click.option('--no-quadkey', is_flag=True, help="Skip adding a quadkey column to the output.")
@click.option('--no-country-iso', is_flag=True, help="Skip adding a country_iso column to the output.")
@click.option('--verbose', is_flag=True, help="Whether to print detailed processing information.")
def add_columns(
input_folder, output_folder, country_parquet_path, overwrite, no_quadkey, no_country_iso, verbose
):
"""Adds columns to the input Overture parquet files, using Overture country for admin boundaries, outputting GeoParquet ordered by quadkey the output folder"""
add_quadkey = not no_quadkey
add_country_iso = not no_country_iso
"""Adds columns to the input parquet files, outputting to the output folder"""
process_parquet_files(
input_folder, output_folder, country_parquet_path, overwrite, add_quadkey, add_country_iso, verbose
)

@overture.command('download')
@click.argument('destination_folder', type=click.Path())
@@ -198,5 +231,19 @@ def overture_download(destination_folder, theme):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] Downloaded {file_name}")

@overture.command('partition')
@click.argument('duckdb-path', type=click.Path(exists=True))
@click.option('--output-folder', default=os.getcwd(), type=click.Path(), help='Folder to store the output files')
@click.option('--geo-conversion', default='gpq', type=click.Choice(['gpq', 'none', 'pandas', 'ogr'], case_sensitive=False))
@click.option('--verbose', is_flag=True, default=False, help='Print verbose output')
@click.option('--max-per-file', default=10000000, type=int, help='Maximum number of rows per file')
@click.option('--row-group-size', default=10000, type=int, help='Row group size for Parquet files')
@click.option('--hive', is_flag=True, default=False, help='Output files in Hive format (folder structure)')
@click.option('--table-name', default='buildings', type=str, help='Name of the table to process')
def partition(duckdb_path, output_folder, geo_conversion, verbose, max_per_file, row_group_size, hive, table_name):
"""Partition a DuckDB database of all overture data by country_iso"""
process_db(duckdb_path, output_folder, geo_conversion, verbose, max_per_file, row_group_size, hive, table_name)


if __name__ == "__main__":
sys.exit(main())
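
A minimal usage sketch for the refactored get_buildings command (not part of the commit), using Click's test runner so it can be exercised straight from Python; the input file aoi.geojson is a hypothetical placeholder:

from click.testing import CliRunner
from open_buildings.cli import get_buildings

# Invoke the command as the CLI would: AOI GeoJSON in, buildings.json out.
runner = CliRunner()
result = runner.invoke(
    get_buildings,
    ["aoi.geojson", "--source", "google", "--dst", "buildings.json", "--verbose"],
)
print(result.output)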
53 changes: 26 additions & 27 deletions open_buildings/download_buildings.py
@@ -8,7 +8,11 @@
import time
import datetime
import os
import pandas as pd
import geopandas as gpd
import subprocess
from shapely import wkb
import shutil


def geojson_to_quadkey(data: dict) -> str:
@@ -126,8 +130,7 @@ def quad2json(quadkey_input):
click.echo(json.dumps(result, indent=2))


def download(geojson_input, only_quadkey, format, generate_sql, dst, silent, time_report, overwrite, verbose, run_gpq, data_path, hive_partitioning, country_iso):
"""Download buildings data based on the input GeoJSON."""
def download(geojson_input, format, generate_sql, dst, silent, overwrite, verbose, data_path, hive_partitioning, country_iso):

def print_timestamped_message(message):
if not silent:
@@ -151,17 +154,16 @@ def print_timestamped_message(message):

print_timestamped_message(f"Querying and downloading data with Quadkey: {quadkey}")
hive_value = 1 if hive_partitioning else 0
base_sql = f"select id, country_iso, ST_AsWKB(ST_GeomFromWKB(geometry)) AS geometry from read_parquet('{data_path}', hive_partitioning={hive_value})"
base_sql = f"select * EXCLUDE geometry, ST_AsWKB(ST_GeomFromWKB(geometry)) AS geometry from read_parquet('{data_path}', hive_partitioning={hive_value})"
where_clause = "WHERE "
if country_iso:
where_clause += f"country_iso = '{country_iso}' AND "
where_clause += f"quadkey LIKE '{quadkey}%'"
if not only_quadkey:
where_clause += f" AND\nST_Within(ST_GeomFromWKB(geometry), ST_GeomFromText('{wkt}'))"
where_clause += f" AND\nST_Within(ST_GeomFromWKB(geometry), ST_GeomFromText('{wkt}'))"

output_extension = {
'shapefile': '.shp',
'geojson': '.geojson',
'geojson': '.json',
'geopackage': '.gpkg',
'flatgeobuf': '.fgb',
'parquet': '.parquet'
@@ -202,27 +204,24 @@ def print_timestamped_message(message):
print_timestamped_message(copy_statement)
if not generate_sql:
conn.execute(f"COPY buildings TO '{dst}' WITH (FORMAT Parquet);")
if run_gpq:
print_timestamped_message(
f"Running gpq convert on {dst}. This takes extra time but ensures the output is valid GeoParquet."
)
base_name, ext = os.path.splitext(dst)
temp_output_file_path = base_name + '_temp' + ext

# convert from parquet file with a geometry column named wkb to GeoParquet
command = ['gpq', 'convert', dst, temp_output_file_path]
gpq_start_time = time.time()
subprocess.run(command, check=True)
os.rename(temp_output_file_path, dst)
gpq_end_time = time.time()
gpq_elapsed_time = gpq_end_time - gpq_start_time

try:
df = pd.read_parquet(dst)

# Convert WKB geometry to geopandas geometry
df['geometry'] = df['geometry'].apply(wkb.loads, hex=True)
gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:4326")
# Derive the output filename: the input filename with .parquet replaced by _geo.parquet
output_filename = dst.replace(".parquet", "_geo.parquet")

gdf.to_parquet(output_filename)
# delete the original file
os.remove(dst)
# Rename (move) the output file to the input filename
shutil.move(output_filename, dst)
if verbose:
print_timestamped_message(f"Time taken to run gpq: {gpq_elapsed_time:.2f} seconds")
else:
print_timestamped_message(
f"Skipping gpq convert on {output_file_path}. This means the output Parquet will be WKB, but it will need to be converted to GeoParquet."
)
print_timestamped_message(f"Finished processing {dst} at {time.ctime()}")
except Exception as e:
print(f"Error processing {dst} to geoparquet: {e}")
else:
gdal_format = {
'shapefile': 'ESRI Shapefile',
Expand All @@ -233,7 +232,7 @@ def print_timestamped_message(message):
conn.execute(f"COPY buildings TO '{dst}' WITH (FORMAT GDAL, DRIVER '{gdal_format[format]}');")
end_time = time.time()

if time_report:
if verbose:
elapsed_time = end_time - start_time
print_timestamped_message(f"Operation took {elapsed_time:.2f} seconds.")

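
The refactored query above reduces to one DuckDB pattern: read remote GeoParquet, pre-filter rows by quadkey prefix, then refine with an exact ST_Within test. A hedged, self-contained sketch of that pattern (the quadkey prefix and AOI polygon are placeholders; the data path is the Google one hard-coded in cli.py):

import duckdb

con = duckdb.connect()
con.execute("INSTALL spatial; LOAD spatial; INSTALL httpfs; LOAD httpfs;")

data_path = "s3://us-west-2.opendata.source.coop/google-research-open-buildings/geoparquet-by-country/*/*.parquet"
quadkey = "122"  # placeholder prefix covering the AOI
wkt = "POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))"  # placeholder AOI

sql = f"""
SELECT * EXCLUDE geometry, ST_AsWKB(ST_GeomFromWKB(geometry)) AS geometry
FROM read_parquet('{data_path}', hive_partitioning=1)
WHERE quadkey LIKE '{quadkey}%'
AND ST_Within(ST_GeomFromWKB(geometry), ST_GeomFromText('{wkt}'))
"""
con.execute(f"COPY ({sql}) TO 'buildings.parquet' WITH (FORMAT Parquet)")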
27 changes: 12 additions & 15 deletions open_buildings/google/add_columns.py
@@ -1,18 +1,15 @@
# This script is used to take an Overture Parquet file and add columns
# useful for partitioning - it can put in both a quadkey and the country
# ISO code. And then it will write out parquet and use gpq to convert the
# parquet to geoparquet.
#
# There is much more to do, my plan is to incorporate it into the open_buildings
# CLI and let people pick which of the columns they want to add. Also could
# be nice to add the ability to get the data downloaded - this just assumes
# you've already got it. Also need to add the command to create the
# countries.parquet, but it's basically the one in https://github.com/OvertureMaps/data/blob/main/duckdb_queries/admins.sql
# but saved to parquet. You also could just use that command to pull it
# directly into your duckdb database, and change this code (perhaps we
# add an option to pull it remote if not present). This also would
# ideally work with any of the Overture data types, and let you choose
# your table names.
# This script is a slightly more generic version of the Overture add_columns.py. There's
# some chance it could be made completely generic, but I was focused on Google buildings,
# so it lives under google/. The main difference is that it doesn't use the midpoint of the
# bbox struct, since that's unique to Overture; it just uses the centroid of the geometry.
# That would likely work just as well, if not better, for Overture too, so we could probably
# drop the midpoint approach there.
# The other thing needed to make this truly generic is the ability to supply the
# table name, since this should work fine with other types of data. It could also just be
# called 'features' by default; the table name doesn't really matter in this processing.
# It should probably also be checked against lines and points. So this could use clean-up,
# starting with removing the 'midpoint' code.
import os
import duckdb
import time
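
The centroid-based quadkey described in the comment above could be computed as in this sketch (an assumption, not the repo's code; zoom level 12 is illustrative):

import mercantile
from shapely import wkb

def centroid_quadkey(geometry_wkb: bytes, zoom: int = 12) -> str:
    # Quadkey of the web-mercator tile containing the geometry's centroid.
    centroid = wkb.loads(geometry_wkb).centroid
    return mercantile.quadkey(mercantile.tile(centroid.x, centroid.y, zoom))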
open_buildings/overture/add_columns.py
@@ -2,17 +2,8 @@
# useful for partitioning - it can put in both a quadkey and the country
# ISO code. And then it will write out parquet and use gpq to convert the
# parquet to geoparquet.
#
# There is much more to do, my plan is to incorporate it into the open_buildings
# CLI and let people pick which of the columns they want to add. Also could
# be nice to add the ability to get the data downloaded - this just assumes
# you've already got it. Also need to add the command to create the
# countries.parquet, but it's basically the one in https://github.com/OvertureMaps/data/blob/main/duckdb_queries/admins.sql
# but saved to parquet. You also could just use that command to pull it
# directly into your duckdb database, and change this code (perhaps we
# add an option to pull it remote if not present). This also would
# ideally work with any of the Overture data types, and let you choose
# your table names.


import os
import duckdb
import time
@@ -68,7 +59,7 @@ def add_country_iso(con, country_parquet_path):
WHERE ST_Intersects(ST_GeomFromWKB(countries.geometry), ST_GeomFromWKB(buildings.geometry))
""")

def process_parquet_file(input_parquet_path, output_folder, country_parquet_path, overwrite=False, add_quadkey_option=False, add_country_iso_option=False):
def process_parquet_file(input_parquet_path, output_folder, country_parquet_path, overwrite=False, add_quadkey_option=False, add_country_iso_option=False, verbose=False):
# Ensure output_folder exists
os.makedirs(output_folder, exist_ok=True)

@@ -109,6 +100,7 @@ def process_parquet_file(input_parquet_path, output_folder, country_parquet_path
# Write out to Parquet
con.execute(f"COPY (SELECT * FROM buildings ORDER BY quadkey) TO '{output_parquet_path}' WITH (FORMAT Parquet)")

# TODO: turn this into an option to convert to geoparquet or not
if (True):
print(f"Converting to geoparquet: {output_parquet_path}")
# Create a temporary file
@@ -125,16 +117,16 @@ def process_parquet_file(input_parquet_path, output_folder, country_parquet_path

print(f"Processing complete for file {input_parquet_path}")

def process_parquet_files(input_path, output_folder, country_parquet_path, overwrite=False, add_quadkey_option=False, add_country_iso_option=False):
def process_parquet_files(input_path, output_folder, country_parquet_path, overwrite=False, add_quadkey_option=False, add_country_iso_option=False, verbose=False):
# If input_path is a directory, process all Parquet files in it
if os.path.isdir(input_path):
for file in glob.glob(os.path.join(input_path, "*")):
process_parquet_file(file, output_folder, country_parquet_path, overwrite, add_quadkey_option, add_country_iso_option)
process_parquet_file(file, output_folder, country_parquet_path, overwrite, add_quadkey_option, add_country_iso_option, verbose)
else:
process_parquet_file(input_path, output_folder, country_parquet_path, overwrite, add_quadkey_option, add_country_iso_option)
process_parquet_file(input_path, output_folder, country_parquet_path, overwrite, add_quadkey_option, add_country_iso_option, verbose)

# Call the function
input_path = '/Volumes/fastdata/overture/s3-data/buildings/'
output_folder = '/Volumes/fastdata/overture/refined-parquet/'
country_parquet_path = '/Volumes/fastdata/overture/countries.parquet'
process_parquet_files(input_path, output_folder, country_parquet_path, overwrite=False, add_quadkey_option=True, add_country_iso_option=True)
# Call the function: uncomment to run this directly from Python and fill in your own paths.
#input_path = '/Volumes/fastdata/overture/s3-data/buildings/'
#output_folder = '/Volumes/fastdata/overture/refined-parquet/'
#country_parquet_path = '/Volumes/fastdata/overture/countries.parquet'
#process_parquet_files(input_path, output_folder, country_parquet_path, overwrite=False, add_quadkey_option=True, add_country_iso_option=True)
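
The add_country_iso step whose WHERE clause is visible above is a spatial update join in DuckDB. A hedged reconstruction (the countries column name iso_code and the file paths are assumptions):

import duckdb

con = duckdb.connect()
con.execute("INSTALL spatial; LOAD spatial;")
con.execute("CREATE TABLE buildings AS SELECT * FROM 'buildings.parquet'")
con.execute("CREATE TABLE countries AS SELECT * FROM 'countries.parquet'")
con.execute("ALTER TABLE buildings ADD COLUMN country_iso VARCHAR")
con.execute("""
    UPDATE buildings
    SET country_iso = countries.iso_code
    FROM countries
    WHERE ST_Intersects(ST_GeomFromWKB(countries.geometry), ST_GeomFromWKB(buildings.geometry))
""")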
16 changes: 3 additions & 13 deletions open_buildings/overture/partition.py
@@ -72,6 +72,9 @@ def convert_pandas(input_filename, rg_size, verbose):
except Exception as e:
print(f"Error processing {input_filename}: {e}")

# Note: this doesn't work, and I'm not sure why. It may be that OGR doesn't really support
# compatible GeoParquet, though it looks like it should. Or there may be something
# weird about the files written out.
def convert_ogr(input_filename, rg_size, verbose):
output_filename = input_filename.replace(".parquet", "_geo.parquet")
rg_cmd = f"ROW_GROUP_SIZE={rg_size}"
@@ -149,19 +152,6 @@ def process_quadkey_recursive(conn, table_name, country_code, output_folder, len
convert_to_geoparquet(quad_output_filename, geo_conversion, row_group_size, verbose)


# TODO: add option for 'hive' output (put things in folder)
# TODO: add option to read duckdb path from an environment variable
# TODO: add row group size option (first works with duckdb)

@click.command()
@click.argument('duckdb-path', type=click.Path(exists=True))
@click.option('--output-folder', default=os.getcwd(), type=click.Path(), help='Folder to store the output files')
@click.option('--geo-conversion', default='gpq', type=click.Choice(['gpq', 'none', 'pandas', 'ogr'], case_sensitive=False))
@click.option('--verbose', is_flag=True, default=False, help='Print verbose output')
@click.option('--max-per-file', default=10000000, type=int, help='Maximum number of rows per file')
@click.option('--row-group-size', default=10000, type=int, help='Row group size for Parquet files')
@click.option('--hive', is_flag=True, default=False, help='Output files in Hive format (folder structure)')
@click.option('--table-name', default='buildings', type=str, help='Name of the table to process')
def process_db(duckdb_path, output_folder, geo_conversion, verbose, max_per_file, row_group_size, hive, table_name):
# create output folder if it does not exist
os.makedirs(output_folder, exist_ok=True)
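
process_quadkey_recursive (partially visible above) lengthens the quadkey prefix until each partition fits under --max-per-file. A simplified sketch of that recursion (table and column names mirror the diff; not the exact implementation):

import duckdb

def split_quadkey(con, prefix, max_rows, out):
    # Count rows whose quadkey starts with this prefix.
    n = con.execute(
        "SELECT count(*) FROM buildings WHERE quadkey LIKE ?", [prefix + "%"]
    ).fetchone()[0]
    if n == 0:
        return
    if n <= max_rows:
        out.append(prefix)  # small enough: write one file for this prefix
        return
    for digit in "0123":  # each extra quadkey digit is one of the 4 child tiles
        split_quadkey(con, prefix + digit, max_rows, out)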
