Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge consecutive range requests #29

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ async with COGReader("https://async-cog-reader-test-data.s3.amazonaws.com/naip_i
### Configuration
Configuration options are exposed through environment variables:
- **INGESTED_BYTES_AT_OPEN** - defines the number of bytes in the first GET request at file opening (defaults to 16KB)
- **HTTP_MERGE_CONSECUTIVE_RANGES** - determines if consecutive ranges are merged into a single request (defaults to FALSE)

Refer to [`aiocogeo/config.py`](https://github.com/geospatial-jeff/aiocogeo/blob/master/aiocogeo/config.py) for more details about configuration options.


## CLI
Expand Down
181 changes: 149 additions & 32 deletions aiocogeo/cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import numpy as np
from skimage.transform import resize

from . import config
from .constants import PHOTOMETRIC
from .errors import InvalidTiffError, TileNotFoundError
from .filesystems import Filesystem
Expand All @@ -26,7 +27,6 @@ class COGReader:
_version: Optional[int] = 42
_big_tiff: Optional[bool] = False


async def __aenter__(self):
"""Open the image and read the header"""
async with Filesystem.create_from_filepath(self.filepath) as file_reader:
Expand Down Expand Up @@ -112,7 +112,10 @@ async def _read_header(self) -> None:
# Hack to correctly identify full resolution mask ifds in deflate compressed images
# This assumes that the image ifd always comes before the mask ifd
try:
if ifd.is_mask or (ifd.is_full_resolution and ifd.ImageHeight.value == self.ifds[0].ImageHeight.value):
if ifd.is_mask or (
ifd.is_full_resolution
and ifd.ImageHeight.value == self.ifds[0].ImageHeight.value
):
self.mask_ifds.append(ifd)
else:
self.ifds.append(ifd)
Expand Down Expand Up @@ -140,7 +143,9 @@ def geotransform(self, ovr_level: int = 0) -> affine.Affine:
)
return gt

def _get_overview_level(self, bounds: Tuple[float, float, float, float], width: int, height: int) -> int:
def _get_overview_level(
self, bounds: Tuple[float, float, float, float], width: int, height: int
) -> int:
"""
Calculate appropriate overview level given request bounds and shape (width + height). Based on rio-tiler:
https://github.com/cogeotiff/rio-tiler/blob/v2/rio_tiler/utils.py#L79-L135
Expand Down Expand Up @@ -168,11 +173,10 @@ def _get_overview_level(self, bounds: Tuple[float, float, float, float], width:

return ovr_level


@cached(
cache=Cache.MEMORY,
# Cache key comes from filepath and x/y/z coordinates of image tile
key_builder=lambda fn,*args,**kwargs: f"{args[0].filepath}-{args[1]}-{args[2]}-{args[3]}"
key_builder=lambda fn, *args, **kwargs: f"{args[0].filepath}-{args[1]}-{args[2]}-{args[3]}",
)
async def get_tile(self, x: int, y: int, z: int) -> np.ndarray:
"""
Expand Down Expand Up @@ -207,11 +211,51 @@ async def get_tile(self, x: int, y: int, z: int) -> np.ndarray:
decoded = ifd._decompress(img_bytes[0])
if self.is_masked:
mask = mask_ifd._decompress_mask(img_bytes[1])
decoded = np.ma.masked_where(np.broadcast_to(mask, decoded.shape)==0, decoded)
decoded = np.ma.masked_where(
np.broadcast_to(mask, decoded.shape) == 0, decoded
)

return decoded

def _calculate_image_tiles(self, bounds: Tuple[float, float, float, float], ovr_level: int) -> Dict[str, Any]:
@staticmethod
def merge_range_requests(
ranges: List[Tuple[int, int, int, int]]
) -> Tuple[Tuple[int, int], Tuple[Tuple[int, int, int, int]]]:
"""
Helper function to merge consecutive range requests while keeping track of the initial ranges. Returns an
iterator which yields a tuple of tuples, where the first is a tuple with the start/end of a merged request and
the second is another tuple of tuples where each tuple contains the original ranges and tile indices.

For example, if we have two ranges A->B and B->C representing tiles in the top row of a COG, this
method will yield:

((A, C), ((A, B, 0, 0), (B, C, 1, 0)))

"""
# Create our start position (start, end)
saved = [ranges[0][0], ranges[0][0] + ranges[0][1]]
indices = []
for offset, byte_count, idx, idy in sorted(ranges):
# Get the next range
start = offset
end = offset + byte_count

# Merge ranges
if start <= saved[1]:
saved[1] = max(saved[1], end)
# Keep track of initial range
indices.append((offset, byte_count, idx, idy))
else:
yield tuple(saved), tuple(indices)
# Keep track of initial range
indices = [(offset, byte_count, idx, idy)]
saved[0] = start
saved[1] = end
yield tuple(saved), tuple(indices)

def _calculate_image_tiles(
self, bounds: Tuple[float, float, float, float], ovr_level: int
) -> Dict[str, Any]:
"""
Internal method to calculate which images tiles need to be requested for a partial read. Also returns metadata
about those image tiles.
Expand Down Expand Up @@ -241,12 +285,7 @@ def _calculate_image_tiles(self, bounds: Tuple[float, float, float, float], ovr_
# Create geotransform for the fused image
_tlx, _tly = geotransform * (tile_bounds[0], tile_bounds[1])
fused_gt = affine.Affine(
geotransform.a,
geotransform.b,
_tlx,
geotransform.d,
geotransform.e,
_tly
geotransform.a, geotransform.b, _tlx, geotransform.d, geotransform.e, _tly
)
inv_fused_gt = ~fused_gt
xorigin, yorigin = [round(v) for v in inv_fused_gt * (bounds[0], bounds[3])]
Expand All @@ -259,16 +298,57 @@ def _calculate_image_tiles(self, bounds: Tuple[float, float, float, float], ovr_
}

@staticmethod
def _stitch_image_tile(fut: asyncio.Future, fused_arr: np.ndarray, idx: int, idy: int, tile_width: int, tile_height: int) -> None:
"""Internal asyncio callback used to mosaic each image tile into a larger array."""
img_arr = fut.result()
def _stitch_image_tile(
tile_arr: np.ndarray,
fused_arr: np.ndarray,
idx: int,
idy: int,
tile_width: int,
tile_height: int,
) -> None:
"""
Helper method to mosaic a tile into a larger numpy array based on its position with respect to the larger
array
"""
fused_arr[
:,
idy * tile_height : (idy + 1) * tile_height,
idx * tile_width : (idx + 1) * tile_width
] = img_arr

async def read(self, bounds: Tuple[float, float, float, float], shape: Tuple[int, int]) -> Union[np.ndarray, np.ma.masked_array]:
idx * tile_width : (idx + 1) * tile_width,
] = tile_arr

def _stitch_merged_image_tile(
self,
fut: asyncio.Future,
fused: np.ndarray,
offset: int,
range_indices: Tuple[int, int, int, int],
tile_width: int,
tile_height: int,
ovr_level: int,
) -> None:
"""Internal asyncio callback to mosaic merged ranged requests into a larger array"""
img_bytes = fut.result()
# Compression is applied to each block, so we need to "unmerge" the blocks after performing the merged range
# request so we can decompress each block independently
# TODO: Each tile is discrete so we should be able to do this in parallel for a minor speed up
for (tile_offset, byte_count, idx, idy) in range_indices:
# Find the start/end of the tile with respect to the merged request
tile_start = tile_offset - offset
tile_end = tile_start + byte_count
# Extract the tile
tile_bytes = img_bytes[tile_start:tile_end]
# Decompress and mosaic
decoded = self.ifds[ovr_level]._decompress(tile_bytes)
self._stitch_image_tile(decoded, fused, idx, idy, tile_width, tile_height)

def _stitch_image_tile_callback(self, fut: asyncio.Future, *args, **kwargs):
"""Internal asyncio callback to mosaic an image tile into a larger array."""
tile_arr = fut.result()
self._stitch_image_tile(tile_arr, *args, **kwargs)

async def read(
self, bounds: Tuple[float, float, float, float], shape: Tuple[int, int]
) -> Union[np.ndarray, np.ma.masked_array]:
"""
Perform a partial read. All pixels within the specified bounding box are read from the image and the array is
resampled to match the desired shape.
Expand All @@ -290,34 +370,69 @@ async def read(self, bounds: Tuple[float, float, float, float], shape: Tuple[int
(xmax + 1 - xmin) * tile_width,
)
).astype(ifd.dtype)
for idx, xtile in enumerate(range(xmin, xmax + 1)):
for idy, ytile in enumerate(range(ymin, ymax + 1)):

if config.HTTP_MERGE_CONSECUTIVE_RANGES == "TRUE":
# Aggregate ranges
ranges = []
for idx, xtile in enumerate(range(xmin, xmax + 1)):
for idy, ytile in enumerate(range(ymin, ymax + 1)):
offset = ifd.TileOffsets[(ytile * ifd.tile_count[0]) + xtile]
byte_count = ifd.TileByteCounts[(ytile * ifd.tile_count[0]) + xtile]
ranges.append((offset, byte_count, idx, idy))

# Merge range requests, iterate through each merged request
for (offsets, indices) in self.merge_range_requests(ranges):
get_tile_task = asyncio.create_task(
self.get_tile(xtile, ytile, ovr_level)
self._file_reader.range_request(
offsets[0], offsets[1] - offsets[0] - 1
)
)
get_tile_task.add_done_callback(
partial(
self._stitch_image_tile,
fused_arr=fused,
idx=idx,
idy=idy,
self._stitch_merged_image_tile,
fused=fused,
offset=offsets[0],
range_indices=indices,
tile_width=tile_width,
tile_height=tile_height,
ovr_level=ovr_level,
)
)
tile_tasks.append(get_tile_task)
else:
for idx, xtile in enumerate(range(xmin, xmax + 1)):
for idy, ytile in enumerate(range(ymin, ymax + 1)):
get_tile_task = asyncio.create_task(
self.get_tile(xtile, ytile, ovr_level)
)
get_tile_task.add_done_callback(
partial(
self._stitch_image_tile_callback,
fused_arr=fused,
idx=idx,
idy=idy,
tile_width=tile_width,
tile_height=tile_height,
)
)
tile_tasks.append(get_tile_task)

# Request tiles
await asyncio.gather(*tile_tasks)

# Clip to request bounds
clipped = fused[
:,
img_tiles['tly']: img_tiles['tly'] + img_tiles['height'],
img_tiles['tlx']: img_tiles['tlx'] + img_tiles['width']
img_tiles["tly"] : img_tiles["tly"] + img_tiles["height"],
img_tiles["tlx"] : img_tiles["tlx"] + img_tiles["width"],
]

# Resample to match request size
resized = resize(
clipped, output_shape=(ifd.bands, shape[0], shape[1]), preserve_range=True, anti_aliasing=True
clipped,
output_shape=(ifd.bands, shape[0], shape[1]),
preserve_range=True,
anti_aliasing=True,
).astype(ifd.dtype)

return resized
Expand All @@ -341,7 +456,9 @@ def create_tile_matrix_set(self, identifier: str = None) -> Dict[str, Any]:
tms = {
"title": f"Tile matrix for {self.filepath}",
"identifier": identifier or str(uuid.uuid4()),
"supportedCRS": urljoin(f"http://www.opengis.net", f"/def/crs/EPSG/0/{self.epsg}"),
"tileMatrix": list(reversed(matrices))
"supportedCRS": urljoin(
f"http://www.opengis.net", f"/def/crs/EPSG/0/{self.epsg}"
),
"tileMatrix": list(reversed(matrices)),
}
return tms
20 changes: 10 additions & 10 deletions aiocogeo/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class Compression(metaclass=abc.ABCMeta):
Predictor: Optional[Tag]
JPEGTables: Optional[Tag]


@property
@abc.abstractmethod
def bands(self) -> int:
Expand Down Expand Up @@ -48,17 +47,16 @@ def _decompress(self, tile: bytes) -> np.ndarray:

def _decompress_mask(self, tile: bytes) -> np.ndarray:
"""Internal method to decompress a binary mask and rescale to uint8"""
decoded = np.frombuffer(imagecodecs.zlib_decode(tile), np.dtype('uint8'))
mask = np.unpackbits(decoded).reshape(self.TileHeight.value, self.TileWidth.value) * 255
decoded = np.frombuffer(imagecodecs.zlib_decode(tile), np.dtype("uint8"))
mask = (
np.unpackbits(decoded).reshape(self.TileHeight.value, self.TileWidth.value)
* 255
)
return mask

def _reshape(self, arr: np.ndarray) -> np.ndarray:
"""Internal method to reshape an array to the size expected by the IFD"""
return arr.reshape(
self.TileHeight.value,
self.TileWidth.value,
self.bands,
)
return arr.reshape(self.TileHeight.value, self.TileWidth.value, self.bands,)

def _unpredict(self, arr: np.ndarray) -> None:
"""Internal method to unpredict if there is horizontal differencing"""
Expand Down Expand Up @@ -95,6 +93,8 @@ def _webp(self, tile: bytes) -> np.ndarray:

def _deflate(self, tile: bytes) -> np.ndarray:
"""Internal method to decompress DEFLATE image bytes and convert to numpy array"""
decoded = self._reshape(np.frombuffer(imagecodecs.zlib_decode(tile), self.dtype))
decoded = self._reshape(
np.frombuffer(imagecodecs.zlib_decode(tile), self.dtype)
)
self._unpredict(decoded)
return np.rollaxis(decoded, 2, 0)
return np.rollaxis(decoded, 2, 0)
14 changes: 12 additions & 2 deletions aiocogeo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@
import logging

# Changes the log level
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "WARN")
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "ERROR")

print(os.environ)

# https://gdal.org/user/virtual_file_systems.html#vsicurl-http-https-ftp-files-random-access
# Defines the number of bytes read in the first GET request at file opening
# Can help performance when reading images with a large header
INGESTED_BYTES_AT_OPEN: int = os.getenv("INGESTED_BYTES_AT_OPEN", 16384)
INGESTED_BYTES_AT_OPEN: int = os.getenv("INGESTED_BYTES_AT_OPEN", 16384)


# https://trac.osgeo.org/gdal/wiki/ConfigOptions#GDAL_HTTP_MERGE_CONSECUTIVE_RANGES
# Determines if consecutive range requests are merged into a single request, reducing the number of HTTP GET range
# requests required to read consecutive internal image tiles
HTTP_MERGE_CONSECUTIVE_RANGES: str = os.getenv(
"HTTP_MERGE_CONSECUTIVE_RANGES", "FALSE"
).upper()
Loading