Find potential matches faster #61

Merged
merged 22 commits on Oct 26, 2023
Changes from all commits
22 commits
90a5a0d
Filter potential matches with CPCs too
patricoferris Sep 19, 2023
8b3cad2
Fix CPC ordering
patricoferris Sep 20, 2023
bec9e71
Upgrade rerunner to understand rescaled_cpcs and few other tweaks
robinmessage Sep 21, 2023
3b926b4
Add DRangedTree utility for fast finding of potential matches
robinmessage Sep 21, 2023
c8d06dc
WIP: noddling around to make tree work fast with new structure and co…
robinmessage Sep 21, 2023
6b7e91f
WIP: trying to eliminate overlaps faster but currently broken
robinmessage Sep 21, 2023
57455fe
WIP: working enough version of DRangedTree
robinmessage Sep 21, 2023
4f758f0
WIP: crashing in yigracheffe trying to retrieve cpc pixels
robinmessage Sep 21, 2023
dd28d1c
Kind of working CPC matching, but CPC appears offset from Patrick's c…
robinmessage Sep 21, 2023
4d811ef
Tweaking CPC offsets
robinmessage Sep 21, 2023
69ed5cf
WIP: use coarse CPC; work around Yigracheffe issue
robinmessage Sep 22, 2023
69f69fd
Pull out number of divisions into a constant and increase to 40x40
robinmessage Sep 26, 2023
5ce1171
WIP: manually realign Gola CPCs
robinmessage Sep 26, 2023
c28e567
Improved test suite to account for new CPC columns and fraction of po…
robinmessage Sep 27, 2023
23cc3d0
Use expected_fraction to cut branches in DRangedTree early
robinmessage Sep 27, 2023
99a7b93
Fix DRangedTree bounding on left-hand side of values
robinmessage Sep 27, 2023
8cda030
Tweak DRangedTree self test
robinmessage Sep 27, 2023
3072cf7
Code review fixes, most specifically horizontal striping
robinmessage Oct 26, 2023
db1f4d8
Fix command line description of find_potential_matches
robinmessage Oct 26, 2023
13edeb4
Replace find_potential_matches with fast version
robinmessage Oct 26, 2023
72a0395
Lint
robinmessage Oct 26, 2023
a641f21
Types
robinmessage Oct 26, 2023
14 changes: 7 additions & 7 deletions methods/matching/calculate_k.py
@@ -7,7 +7,7 @@

import pandas as pd
from geopandas import gpd # type: ignore
from yirgacheffe.layers import TiledGroupLayer, RasterLayer, VectorLayer # type: ignore
from yirgacheffe.layers import GroupLayer, RasterLayer, VectorLayer # type: ignore
from yirgacheffe.window import PixelScale # type: ignore

from methods.common import LandUseClass
@@ -47,14 +47,14 @@ def build_layer_collection(
outline_layer = VectorLayer.layer_from_file(boundary_filename, None, pixel_scale, projection)

lucs = [
TiledGroupLayer([
GroupLayer([
RasterLayer.layer_from_file(os.path.join(jrc_directory_path, filename)) for filename in
glob.glob(f"*{year}*.tif", root_dir=jrc_directory_path)
], name=f"luc_{year}") for year in luc_years
]

cpcs = [
TiledGroupLayer([
GroupLayer([
RasterLayer.layer_from_file(
os.path.join(cpc_directory_path, filename)
) for filename in
@@ -65,21 +65,21 @@

# ecoregions is such a heavy layer it pays to just rasterize it once - we should possibly do this once
# as part of import of the ecoregions data
ecoregions = TiledGroupLayer([
ecoregions = GroupLayer([
RasterLayer.layer_from_file(os.path.join(ecoregions_directory_path, filename)) for filename in
glob.glob("*.tif", root_dir=ecoregions_directory_path)
], name="ecoregions")

elevation = TiledGroupLayer([
elevation = GroupLayer([
RasterLayer.layer_from_file(os.path.join(elevation_directory_path, filename)) for filename in
glob.glob("srtm*.tif", root_dir=elevation_directory_path)
], name="elevation")
slopes = TiledGroupLayer([
slopes = GroupLayer([
RasterLayer.layer_from_file(os.path.join(slope_directory_path, filename)) for filename in
glob.glob("slope*.tif", root_dir=slope_directory_path)
], name="slopes")

access = TiledGroupLayer([
access = GroupLayer([
RasterLayer.layer_from_file(os.path.join(access_directory_path, filename)) for filename in
glob.glob("*.tif", root_dir=access_directory_path)
], name="access")
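
Every layer above follows the same GroupLayer pattern: mosaic a directory of GeoTIFF tiles into one logical layer. A minimal sketch of that usage, assuming only the calls visible in this diff (the tile directory and layer name are made up):

```python
import glob
import os

from yirgacheffe.layers import GroupLayer, RasterLayer  # type: ignore

tile_dir = "/data/jrc/2020"  # hypothetical directory of GeoTIFF tiles
luc_2020 = GroupLayer([
    RasterLayer.layer_from_file(os.path.join(tile_dir, filename))
    for filename in glob.glob("*.tif", root_dir=tile_dir)
], name="luc_2020")
```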
252 changes: 187 additions & 65 deletions methods/matching/find_potential_matches.py
@@ -1,37 +1,113 @@
import argparse
from collections import defaultdict
import glob
import logging
import math
import os
import sys
import time
from multiprocessing import Manager, Process, Queue, cpu_count

from typing import Mapping, Tuple
from osgeo import gdal # type: ignore
import numpy as np
import pandas as pd
from yirgacheffe.layers import RasterLayer # type: ignore

from methods.matching.calculate_k import build_layer_collection
from methods.common.luc import luc_matching_columns
from methods.matching.calculate_k import build_layer_collection
from methods.utils.dranged_tree import DRangedTree

DIVISIONS = 1000

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# We do not re-use data in this, so set a small block cache size for GDAL, otherwise
# it pointlessly hogs memory, and then spends a long time tidying it up after.
gdal.SetCacheMax(1024 * 1024 * 16)

def build_key(ecoregion, country, luc0, luc5, luc10):
"""Create a 64-bit key for fields that must match exactly"""
if ecoregion < 0 or ecoregion > 0x7fffffff:
raise ValueError("Ecoregion doesn't fit in 31 bits")
if country < 0 or country > 0xffff:
raise ValueError("Country doesn't fit in 16 bits")
if luc0 < 0 or luc0 > 0x1f:
raise ValueError("luc0 doesn't fit in 5 bits")
if luc5 < 0 or luc5 > 0x1f:
raise ValueError("luc5 doesn't fit in 5 bits")
if luc10 < 0 or luc10 > 0x1f:
raise ValueError("luc10 doesn't fit in 5 bits")
return (int(ecoregion) << 32) | (int(country) << 16) | (int(luc0) << 10) | (int(luc5) << 5) | (int(luc10))
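
As a sanity check on the bit layout, here is a round-trip sketch; decode_key is not part of this PR, just the hypothetical inverse of the packing above:

```python
def decode_key(key: int):
    """Hypothetical inverse of build_key, for illustration only."""
    luc10 = key & 0x1f               # bits 0-4
    luc5 = (key >> 5) & 0x1f         # bits 5-9
    luc0 = (key >> 10) & 0x1f        # bits 10-14
    country = (key >> 16) & 0xffff   # bits 16-31
    ecoregion = key >> 32            # bits 32-62
    return ecoregion, country, luc0, luc5, luc10

assert decode_key(build_key(7, 44, 3, 2, 1)) == (7, 44, 3, 2, 1)
```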

def key_builder(start_year: int):
luc0, luc5, luc10 = luc_matching_columns(start_year)
def _build_key(row):
return build_key(row.ecoregion, row.country, row[luc0], row[luc5], row[luc10])
return _build_key

def load_k(
k_filename: str,
sentinal_count: int,
output_queue: Queue,
) -> None:
# put the source pixels into the queue
start_year: int,
) -> Mapping[int, DRangedTree]:

print("Reading k...")
source_pixels = pd.read_parquet(k_filename)
for row in source_pixels.iterrows():
output_queue.put(row)
# To close the pipe put in one sentinel value per worker
for _ in range(sentinal_count):
output_queue.put(None)

# Split source_pixels into classes
source_classes = defaultdict(list)
build_key_for_row = key_builder(start_year)

for _, row in source_pixels.iterrows():
key = build_key_for_row(row)
source_classes[key].append(row)

print("Building k trees...")

source_trees = {}
for key, values in source_classes.items():
source_trees[key] = DRangedTree.build(
np.array([(
row.elevation,
row.slope,
row.access,
row["cpc0_u"],
row["cpc0_d"],
row["cpc5_u"],
row["cpc5_d"],
row["cpc10_u"],
row["cpc10_d"],
) for row in values
]),
np.array([
200,
2.5,
10,
0.1,
0.1,
0.1,
0.1,
0.1,
0.1,
]),
1 / 100, # This is the fraction of R that is in M, used to optimize search speed.
)

print("k trees built.")

return source_trees
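
As used here, DRangedTree.build takes the K rows for one exact-match key, a per-dimension tolerance, and the expected match fraction; contains then answers whether a candidate pixel falls within tolerance of any K row. A hedged usage sketch with made-up three-dimensional data (the trees above are nine-dimensional; this assumes the API is dimension-agnostic):

```python
import numpy as np

from methods.utils.dranged_tree import DRangedTree

k_rows = np.array([
    [100.0, 5.0, 30.0],   # elevation, slope, access of one K pixel
    [250.0, 10.0, 45.0],
])
widths = np.array([200, 2.5, 10])  # same tolerances as the first three above
tree = DRangedTree.build(k_rows, widths, 1 / 100)

# Within 200 of elevation 100, 2.5 of slope 5, and 10 of access 30: a match.
print(tree.contains(np.array([120.0, 6.0, 35.0])))
```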

def exact_pixel_for_lat_lng(layer, lat: float, lng: float) -> Tuple[int,int]:
"""Get pixel for geo coords. This is relative to the set view window.
Result is rounded down to nearest pixel."""
if "WGS 84" not in layer.projection:
raise NotImplementedError("Not yet supported for other projections")
pixel_scale = layer.pixel_scale
if pixel_scale is None:
raise ValueError("Layer has no pixel scale")
return (
(lng - layer.area.left) / pixel_scale.xstep,
(lat - layer.area.top) / pixel_scale.ystep,
)
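
A worked example of that arithmetic, with assumed geometry: a view window whose top-left corner is at (lat 5.0, lng 10.0) and a WGS 84 pixel scale of xstep=0.0001, ystep=-0.0001:

```python
lng, lat = 10.0005, 4.9995
x = (lng - 10.0) / 0.0001   # ≈ 5.0: five pixels right of the window edge
y = (lat - 5.0) / -0.0001   # ≈ 5.0: five pixels down, since ystep is negative
```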

def worker(
worker_index: int,
@@ -46,19 +46,122 @@ def worker(
start_year: int,
_evaluation_year: int,
result_folder: str,
input_queue: Queue
ktrees: Mapping[int, DRangedTree],
coordinate_queue: Queue,
) -> None:
# everything is done at JRC resolution, so load a sample file from there first to get the ideal pixel scale
example_jrc_filename = glob.glob("*.tif", root_dir=jrc_directory_path)[0]
example_jrc_layer = RasterLayer.layer_from_file(os.path.join(jrc_directory_path, example_jrc_filename))

luc0, luc5, luc10 = luc_matching_columns(start_year)

matching_collection = build_layer_collection(
example_jrc_layer.pixel_scale,
example_jrc_layer.projection,
[start_year, start_year - 5, start_year - 10],
[], # CPC not needed at this stage
[start_year, start_year - 5, start_year - 10],
matching_zone_filename,
jrc_directory_path,
cpc_directory_path,
@@ -69,49 +69,91 @@
countries_raster_filename,
)

# To help with such a long running job, we log every 100th element
# motivated by issues with the pipeline hanging before during multi hour jobs
count = 0
while True:
if count % 100 == 0:
logging.info("%d: processing %d", worker_index, count)
count += 1
result_path = os.path.join(result_folder, f"fast_{worker_index}.tif")

matching_pixels = RasterLayer.empty_raster_layer_like(matching_collection.boundary, filename=result_path)
xsize = matching_collection.boundary.window.xsize
ysize = matching_collection.boundary.window.ysize
xstride = math.ceil(xsize)
ystride = math.ceil(ysize / DIVISIONS)

row = input_queue.get()
if row is None:
# Iterate our assigned pixels
while True:
coords = coordinate_queue.get()
if coords is None:
break
index, matching = row
print(f"Worker {worker_index} starting coords {coords}...")
ypos, xpos = coords
ymin = ypos * ystride
xmin = xpos * xstride
ymax = min(ymin + ystride, ysize)
xmax = min(xmin + xstride, xsize)
xwidth = xmax - xmin
ywidth = ymax - ymin
if xwidth <= 0 or ywidth <= 0:
print(f"Worker {worker_index} coords {coords} are outside boundary")
continue
boundary = matching_collection.boundary.read_array(xmin, ymin, xwidth, ywidth)
elevations = matching_collection.elevation.read_array(xmin, ymin, xwidth, ywidth)
ecoregions = matching_collection.ecoregions.read_array(xmin, ymin, xwidth, ywidth)
slopes = matching_collection.slope.read_array(xmin, ymin, xwidth, ywidth)
accesses = matching_collection.access.read_array(xmin, ymin, xwidth, ywidth)
lucs = [x.read_array(xmin, ymin, xwidth, ywidth) for x in matching_collection.lucs]

result_path = os.path.join(result_folder, f"{index}.tif")
if os.path.exists(result_path):
try:
raster = RasterLayer.layer_from_file(result_path)
if raster.sum() > 0:
continue
except FileNotFoundError:
pass
# FIXME: This still doesn't match perfectly with Patrick's scaled CPCs.
boundary_tl = matching_collection.boundary.latlng_for_pixel(xmin, ymin)
cpc_tl = matching_collection.cpcs[0].pixel_for_latlng(*boundary_tl)
boundary_br = matching_collection.boundary.latlng_for_pixel(xmax, ymax)
cpc_br = matching_collection.cpcs[0].pixel_for_latlng(*boundary_br)
cpc_width = cpc_br[0] - cpc_tl[0] + 2 # Get a few spare pixels
cpc_height = cpc_br[1] - cpc_tl[1] + 2
cpcs = [
cpc.read_array(cpc_tl[0], cpc_tl[1], cpc_width, cpc_height)
for cpc in matching_collection.cpcs
]

matching_pixels = RasterLayer.empty_raster_layer_like(matching_collection.boundary, filename=result_path)
exact_cpc_tl = exact_pixel_for_lat_lng(matching_collection.cpcs[0], *boundary_tl)
exact_cpc_br = exact_pixel_for_lat_lng(matching_collection.cpcs[0], *boundary_br)
cpc_width = exact_cpc_br[0] - exact_cpc_tl[0]
cpc_height = exact_cpc_br[1] - exact_cpc_tl[1]

filtered_ecoregions = matching_collection.ecoregions.numpy_apply(lambda chunk: chunk == matching.ecoregion)
filtered_elevation = matching_collection.elevation.numpy_apply(
lambda chunk: np.logical_and(chunk >= (matching.elevation - 200), chunk <= (matching.elevation + 200))
)
filtered_slopes = matching_collection.slope.numpy_apply(
lambda chunk: np.logical_and(chunk >= (matching.slope - 2.5), chunk <= (matching.slope + 2.5))
)
filtered_access = matching_collection.access.numpy_apply(
lambda chunk: np.logical_and(chunk >= (matching.access - 10), chunk <= (matching.access + 10))
)
filtered_luc0 = matching_collection.lucs[0].numpy_apply(lambda chunk: chunk == matching[luc0])
filtered_luc5 = matching_collection.lucs[1].numpy_apply(lambda chunk: chunk == matching[luc5])
filtered_luc10 = matching_collection.lucs[2].numpy_apply(lambda chunk: chunk == matching[luc10])
cpc_scale = (cpc_width / xwidth, cpc_height / ywidth)
cpc_offset = (exact_cpc_tl[0] - cpc_tl[0] + 0.5, exact_cpc_tl[1] - cpc_tl[1] + 0.5)
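
The scale and offset just computed translate a fine (JRC-resolution) pixel in the stripe into an index into the coarse CPC array read above. A sketch with illustrative numbers, a 400-pixel-wide stripe covering 40 coarse pixels:

```python
import math

cpc_scale = (40 / 400, 40 / 400)  # coarse pixels per fine pixel
cpc_offset = (0.5, 0.5)           # sub-pixel alignment between the two grids
xpos, ypos = 123, 45              # a fine pixel within the stripe
cpcx = math.floor(xpos * cpc_scale[0] + cpc_offset[0])  # 12
cpcy = math.floor(ypos * cpc_scale[1] + cpc_offset[1])  # 5
```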

filtered_countries = matching_collection.countries.numpy_apply(lambda chunk: chunk == matching.country)
countries = matching_collection.countries.read_array(xmin, ymin, xwidth, ywidth)
points = np.zeros((ywidth, xwidth))
for ypos in range(ywidth):
for xpos in range(xwidth):
if boundary[ypos, xpos] == 0:
continue
ecoregion = ecoregions[ypos, xpos]
country = countries[ypos, xpos]
luc0 = lucs[0][ypos, xpos]
luc5 = lucs[1][ypos, xpos]
luc10 = lucs[2][ypos, xpos]
key = build_key(ecoregion, country, luc0, luc5, luc10)
if key in ktrees:
cpcx = math.floor((xpos + 4) * cpc_scale[0] + cpc_offset[0]) # Alignment WHY?
cpcy = math.floor((ypos - 6) * cpc_scale[1] + cpc_offset[1]) # WHY WHY WHY
points[ypos, xpos] = 1 if ktrees[key].contains(np.array([
elevations[ypos, xpos],
slopes[ypos, xpos],
accesses[ypos, xpos],
cpcs[0][cpcy, cpcx],
cpcs[1][cpcy, cpcx],
cpcs[2][cpcy, cpcx],
cpcs[3][cpcy, cpcx],
cpcs[4][cpcy, cpcx],
cpcs[5][cpcy, cpcx],
])) else 0
# Write points to output
# pylint: disable-next=protected-access
matching_pixels._dataset.GetRasterBand(1).WriteArray(points, xmin, ymin)
print(f"Worker {worker_index} completed coords {coords}.")
print(f"Worker {worker_index} finished.")

calc = matching_collection.boundary * filtered_ecoregions * filtered_elevation * filtered_countries * \
filtered_luc0 * filtered_luc5 * filtered_luc10 * filtered_slopes * filtered_access
calc.save(matching_pixels)
# Ensure we flush pixels to disk now we're finished
del matching_pixels._dataset
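
Each worker therefore claims whole horizontal stripes: xstride spans the full raster width while ystride splits the raster height into DIVISIONS rows of stripes. A sketch of the stripe bounds with assumed raster dimensions:

```python
import math

DIVISIONS = 1000
xsize, ysize = 4000, 3000               # assumed raster dimensions
xstride = math.ceil(xsize)              # one stripe spans the full width
ystride = math.ceil(ysize / DIVISIONS)  # here: 3 rows per stripe

ypos, xpos = 2, 0                       # a coordinate taken from the queue
ymin, xmin = ypos * ystride, xpos * xstride
ymax, xmax = min(ymin + ystride, ysize), min(xmin + xstride, xsize)
print(xmin, ymin, xmax - xmin, ymax - ymin)  # 0 6 4000 3
```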


def find_potential_matches(
@@ -131,12 +131,19 @@ def find_potential_matches(
) -> None:
os.makedirs(result_folder, exist_ok=True)

assert processes_count >= 3

with Manager() as manager:
source_queue = manager.Queue()
coordinate_queue = manager.Queue()

worker_count = processes_count

# Fill the co-ordinate queue
for ypos in range(DIVISIONS):
coordinate_queue.put([ypos, 0])
for _ in range(worker_count):
coordinate_queue.put(None)

ktree = load_k(k_filename, start_year)

worker_count = processes_count - 2
workers = [Process(target=worker, args=(
index,
matching_zone_filename,
Expand All @@ -150,28 +274,26 @@ def find_potential_matches(
start_year,
evaluation_year,
result_folder,
source_queue
ktree,
coordinate_queue,
)) for index in range(worker_count)]
for worker_process in workers:
worker_process.start()
ingest_process = Process(target=load_k, args=(k_filename, worker_count, source_queue))
ingest_process.start()

processes = workers + [ingest_process]
while processes:
candidates = [x for x in processes if not x.is_alive()]
while workers:
candidates = [x for x in workers if not x.is_alive()]
for candidate in candidates:
candidate.join()
if candidate.exitcode:
for victim in processes:
for victim in workers:
victim.kill()
sys.exit(candidate.exitcode)
processes.remove(candidate)
workers.remove(candidate)
time.sleep(1)
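
The orchestration above fills a queue with stripe coordinates, adds one None sentinel per worker so every process sees a shutdown signal, and then polls the workers, killing the rest if any one of them fails. A minimal, self-contained sketch of that queue-plus-sentinel pattern (illustrative, not this PR's code):

```python
from multiprocessing import Manager, Process

def _worker(queue) -> None:
    while True:
        item = queue.get()
        if item is None:  # sentinel: no more work
            break
        print("processing", item)

if __name__ == "__main__":
    with Manager() as manager:
        queue = manager.Queue()
        for item in range(10):       # fill the queue with work first
            queue.put(item)
        workers = [Process(target=_worker, args=(queue,)) for _ in range(4)]
        for _ in workers:            # then one sentinel per worker
            queue.put(None)
        for process in workers:
            process.start()
        for process in workers:
            process.join()
```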


def main():
parser = argparse.ArgumentParser(description="Generates a set of rasters per entry in K with potential matches.")
parser = argparse.ArgumentParser(description="Generates a set of rasters per process with potential matches.")
parser.add_argument(
"--k",
type=str,
@@ -247,7 +369,7 @@ def main():
type=str,
required=True,
dest="output_directory",
help="Destination directory for storing per-K rasters."
help="Destination directory for storing per-process rasters (horizontally striped)."
)
parser.add_argument(
"--countries-raster",