From db64f0e22ed6adfa4396ef9098b738688a5618ca Mon Sep 17 00:00:00 2001 From: David Bitner Date: Thu, 21 Jul 2022 09:37:15 -0500 Subject: [PATCH 1/4] initial playing with keyset pagination --- tifeatures/factory.py | 35 ++++++++ tifeatures/layer.py | 183 +++++++++++++++++++++--------------------- 2 files changed, 125 insertions(+), 93 deletions(-) diff --git a/tifeatures/factory.py b/tifeatures/factory.py index f58445a..5f97aa6 100644 --- a/tifeatures/factory.py +++ b/tifeatures/factory.py @@ -603,6 +603,41 @@ async def items( for (key, value) in request.query_params.items() if key.lower() not in exclude ] + _select = collection.select(properties) + _from = collection._from() + _geom = collection._geom(geom_column, bbox_only, simplify) + _where = collection._where( + ids=ids_filter, + datetime=datetime_filter, + bbox=bbox_filter, + properties=properties_filter, + cql=cql_filter, + geom=geom_column, + dt=datetime_column + ) + _sortby = collection._sortby(sort=sortby) + + if next or prev: + # Get columns used in sort from table using pk to create offset + """ + SELECT sortcols FROM table WHERE pk = pk + """ + # get first record + """SELECT pk FROM table WHERE LIMIT 1;""" + # get last - limit record if limit set + """SELECT pk FROM table WHERE OFFSET LIMIT 1;""" + # get previous record + if next is None and offset is None: + # no paging parameters, so this is first record + prev = None + elif next is not None: + """ + SELECT sortcols FROM table WHERE pk = pk + """ + SELECT pk FROM table WHERE AND + """ + # get next record + """SELECT pk FROM table WHERE OFFSET LIMIT 1;""" items, matched_items = await collection.features( request.app.state.pool, diff --git a/tifeatures/layer.py b/tifeatures/layer.py index 0053f61..81684df 100644 --- a/tifeatures/layer.py +++ b/tifeatures/layer.py @@ -262,32 +262,53 @@ def _datetime_filter_to_sql(self, interval: List[str], dt_name: str): logic.V(dt_name) < logic.S(pg_funcs.cast(end, "timestamptz")), ) - def _sortby(self, sortby: Optional[str]): + def _sorting(self, sortby: Optional[str]): + pk = self.id_column + sortcols=[] sorts = [] - if sortby: - for s in sortby.strip().split(","): - parts = re.match( - "^(?P[+-]?)(?P.*)$", s - ).groupdict() # type:ignore - - direction = parts["direction"] - column = parts["column"].strip() - if self.get_column(column): - if direction == "-": - sorts.append(logic.V(column).desc()) - else: - sorts.append(logic.V(column)) + reversesorts = [] + prevwheres = [] + nextwheres = [] + sortby = sortby.strip() + if sortby is None: + sortby = pk + if pk not in sortby: + sortby += ',' + pk + sortbyarr = sortby.split(",") + pkseen = False + + q,p = render( + for s in sortbyarr: + parts = re.match( + "^(?P[+-]?)(?P.*)$", s + ).groupdict() # type:ignore + + direction = parts["direction"] + column = parts["column"].strip() + if column == pk: + pkseen = True + if self.get_column(column): + sortcols.append(column) + if direction == "-": + sorts.append(logic.V(column).desc()) + reversesorts.append(logic.V(column).desc()) + else: - raise InvalidPropertyName(f"Property {column} does not exist.") + sorts.append(logic.V(column)) + reversesorts.append(logic.V(column).desc()) + else: + raise InvalidPropertyName(f"Property {column} does not exist.") - else: - sorts.append(logic.V(self.id_column)) + if not pkseen: + sorts.append(logic.V(pk)) + reversesorts.append(logic.V(pk).desc()) return clauses.OrderBy(*sorts) + + def _features_query( self, - *, ids_filter: Optional[List[str]] = None, bbox_filter: Optional[List[float]] = None, datetime_filter: Optional[List[str]] = None, @@ -326,8 +347,8 @@ def _features_count_query( datetime_filter: Optional[List[str]] = None, properties_filter: Optional[List[Tuple[str, str]]] = None, cql_filter: Optional[AstType] = None, - geom: str = None, - dt: str = None, + geom: Optional[str] = None, + dt: Optional[str] = None, ): """Build features COUNT query.""" return ( @@ -344,96 +365,72 @@ def _features_count_query( ) ) + async def query_count(self, pool, **kwargs): + """Get the estimated count/cost from query.""" + if 'count_exact' in kwargs: + async with pool.acquire() as conn: + q,p = render( + """ + SELECT count(*) + :f + """, + f=self._from(), + w = self._where(**kwargs) + ) + return conn.fetchval(q,*p) + q,p = render( + """ + EXPLAIN (FORMAT JSON) + SELECT 1 + :f + :o + """, + f=self._from(), + w = self._where(**kwargs) + ) + async with pool.acquire() as conn: + explain = await conn.fetchval(q, *p) + return explain[0]['Plan']['Plan Rows'] + async def query( self, pool: asyncpg.BuildPgPool, - *, - ids_filter: Optional[List[str]] = None, - bbox_filter: Optional[List[float]] = None, - datetime_filter: Optional[List[str]] = None, - properties_filter: Optional[List[Tuple[str, str]]] = None, - cql_filter: Optional[AstType] = None, - sortby: Optional[str] = None, - properties: Optional[List[str]] = None, - geom: str = None, - dt: str = None, - limit: Optional[int] = None, - offset: Optional[int] = None, - bbox_only: Optional[bool] = None, - simplify: Optional[float] = None, + **kwargs ) -> Tuple[FeatureCollection, int]: """Build and run Pg query.""" + geom = kwargs['geom'] if geom and geom.lower() != "none" and not self.geometry_column(geom): raise InvalidGeometryColumnName(f"Invalid Geometry Column: {geom}.") - sql_query = """ - WITH - features AS ( - :features_q - ), - total_count AS ( - :count_q - ) - SELECT json_build_object( - 'type', 'FeatureCollection', - 'features', - ( - SELECT - json_agg( - json_build_object( - 'type', 'Feature', - 'id', :id_column, - 'geometry', :geometry_q, - 'properties', to_jsonb( features.* ) - :geom_columns::text[] - ) - ) - FROM features - ), - 'total_count', - ( - SELECT count FROM total_count - ) + q,p = render( + """ + WITH features AS ( + :features + ) + SELECT + json_build_object( + 'type', 'Feature', + 'id', :id_column, + 'geometry', :geometry_q, + 'properties', to_jsonb( features.* ) - :geom_columns::text[] ) + FROM features ; - """ - q, p = render( - sql_query, - features_q=self._features_query( - ids_filter=ids_filter, - bbox_filter=bbox_filter, - datetime_filter=datetime_filter, - properties_filter=properties_filter, - cql_filter=cql_filter, - sortby=sortby, - properties=properties, - geom=geom, - dt=dt, - limit=limit, - offset=offset, - ), - count_q=self._features_count_query( - ids_filter=ids_filter, - bbox_filter=bbox_filter, - datetime_filter=datetime_filter, - properties_filter=properties_filter, - cql_filter=cql_filter, - geom=geom, - dt=dt, - ), + """, id_column=logic.V(self.id_column), - geometry_q=self._geom( - geometry_column=self.geometry_column(geom), - bbox_only=bbox_only, - simplify=simplify, - ), + geometry_q=self._geom(**kwargs), geom_columns=[g.name for g in self.geometry_columns], + features=self._features(**kwargs), ) async with pool.acquire() as conn: items = await conn.fetchval(q, *p) + features = (item for item in items) + + count = await self.query_count(**kwargs) return ( - FeatureCollection(features=items.get("features") or []), - items["total_count"], + FeatureCollection(features=features or []), + count, ) async def features( From 9e342762f2bb53c30b3d7e6c040d0449ac7676cf Mon Sep 17 00:00:00 2001 From: David Bitner Date: Mon, 25 Jul 2022 13:28:06 -0500 Subject: [PATCH 2/4] Stream features rather than creating output all in memory first --- tifeatures/dbmodel.py | 16 +- tifeatures/factory.py | 414 +++++++++++++++++++++--------------------- tifeatures/layer.py | 155 +++++++--------- 3 files changed, 279 insertions(+), 306 deletions(-) diff --git a/tifeatures/dbmodel.py b/tifeatures/dbmodel.py index f9c54a2..200d10e 100644 --- a/tifeatures/dbmodel.py +++ b/tifeatures/dbmodel.py @@ -90,16 +90,22 @@ def id_column_info(self) -> Column: # type: ignore if col.name == self.id_column: return col - def columns(self, properties: Optional[List[str]] = None) -> List[str]: + def columns( + self, properties: Optional[List[str]] = None, nogeo: bool = False + ) -> List[str]: """Return table columns optionally filtered to only include columns from properties.""" - cols = [c.name for c in self.properties] + if nogeo: + cols = [c.name for c in self.properties if not c.type.startswith("geo")] + else: + cols = [c.name for c in self.properties] if properties is not None: if self.id_column not in properties: properties.append(self.id_column) - geom_col = self.geometry_column() - if geom_col: - properties.append(geom_col.name) + if not nogeo: + geom_col = self.geometry_column() + if geom_col: + properties.append(geom_col.name) cols = [col for col in cols if col in properties] diff --git a/tifeatures/factory.py b/tifeatures/factory.py index 5f97aa6..7bb7e2e 100644 --- a/tifeatures/factory.py +++ b/tifeatures/factory.py @@ -5,6 +5,7 @@ from dataclasses import dataclass, field from typing import Any, Callable, Dict, Generator, Iterable, List, Optional +import buildpg import jinja2 from pygeofilter.ast import AstType @@ -564,7 +565,7 @@ async def items( description="Limits the number of features in the response.", ), offset: Optional[int] = Query( - None, + 0, ge=0, description="Starts the response at an offset.", ), @@ -578,9 +579,12 @@ async def items( description="Simplify the output geometry to given threshold in decimal degrees.", ), output_type: Optional[MediaType] = Depends(ItemsOutputType), + count_exact: Optional[bool] = Query( + False, + description="Return an exact count of features rather than an estimate.", + ), ): - offset = offset or 0 - + pool = request.app.state.pool # =VALUE - filter features for a property having a value. Multiple property filters are ANDed together. exclude = [ "f", @@ -603,9 +607,15 @@ async def items( for (key, value) in request.query_params.items() if key.lower() not in exclude ] - _select = collection.select(properties) - _from = collection._from() + _select = collection._select(properties) + _select += buildpg.logic.Empty().comma( + buildpg.V(collection.id_column).as_("itemid") + ) _geom = collection._geom(geom_column, bbox_only, simplify) + if _geom: + _select += buildpg.logic.Empty().comma(_geom) + + _from = collection._from() _where = collection._where( ids=ids_filter, datetime=datetime_filter, @@ -613,220 +623,202 @@ async def items( properties=properties_filter, cql=cql_filter, geom=geom_column, - dt=datetime_column - ) - _sortby = collection._sortby(sort=sortby) - - if next or prev: - # Get columns used in sort from table using pk to create offset - """ - SELECT sortcols FROM table WHERE pk = pk - """ - # get first record - """SELECT pk FROM table WHERE LIMIT 1;""" - # get last - limit record if limit set - """SELECT pk FROM table WHERE OFFSET LIMIT 1;""" - # get previous record - if next is None and offset is None: - # no paging parameters, so this is first record - prev = None - elif next is not None: - """ - SELECT sortcols FROM table WHERE pk = pk - """ - SELECT pk FROM table WHERE AND - """ - # get next record - """SELECT pk FROM table WHERE OFFSET LIMIT 1;""" - - items, matched_items = await collection.features( - request.app.state.pool, - ids_filter=ids_filter, - bbox_filter=bbox_filter, - datetime_filter=datetime_filter, - properties_filter=properties_filter, - cql_filter=cql_filter, - sortby=sortby, - properties=properties, - limit=limit, - offset=offset, - geom=geom_column, dt=datetime_column, - bbox_only=bbox_only, - simplify=simplify, - ) - - if output_type in ( - MediaType.csv, - MediaType.json, - MediaType.ndjson, - ): - if items and items[0].geometry is not None: - rows = ( - { - "collectionId": collection.id, - "itemId": f.id, - **f.properties, - "geometry": f.geometry.wkt, - } - for f in items - ) - - else: - rows = ( - { - "collectionId": collection.id, - "itemId": f.id, - **f.properties, - } - for f in items - ) - - # CSV Response - if output_type == MediaType.csv: - return StreamingResponse( - create_csv_rows(rows), - media_type=MediaType.csv, - headers={ - "Content-Disposition": "attachment;filename=items.csv" - }, - ) - - # JSON Response - if output_type == MediaType.json: - return JSONResponse([row for row in rows]) - - # NDJSON Response - if output_type == MediaType.ndjson: - return StreamingResponse( - (json.dumps(row) + "\n" for row in rows), - media_type=MediaType.ndjson, - headers={ - "Content-Disposition": "attachment;filename=items.ndjson" - }, - ) - - qs = "?" + str(request.query_params) if request.query_params else "" - links = [ - model.Link( - title="Collection", - href=self.url_for( - request, "collection", collectionId=collection.id - ), - rel="collection", - type=MediaType.json, - ), - model.Link( - title="Items", - href=self.url_for(request, "items", collectionId=collection.id) - + qs, - rel="self", - type=MediaType.geojson, - ), - ] - - items_returned = len(items) - - if (matched_items - items_returned) > offset: - next_offset = offset + items_returned - query_params = QueryParams( - {**request.query_params, "offset": next_offset} - ) - url = ( - self.url_for(request, "items", collectionId=collection.id) - + f"?{query_params}" - ) - links.append( - model.Link( - href=url, - rel="next", - type=MediaType.geojson, - title="Next page", - ), - ) - - if offset: - query_params = dict(request.query_params) - query_params.pop("offset") - prev_offset = max(offset - items_returned, 0) - if prev_offset: - query_params = QueryParams({**query_params, "offset": prev_offset}) - else: - query_params = QueryParams({**query_params}) - - url = self.url_for(request, "items", collectionId=collection.id) - if query_params: - url += f"?{query_params}" - - links.append( - model.Link( - href=url, - rel="prev", - type=MediaType.geojson, - title="Previous page", - ), - ) - - data = model.Items( - id=collection.id, - title=collection.title or collection.id, - description=collection.description or collection.title or collection.id, - numberMatched=matched_items, - numberReturned=items_returned, - links=links, - features=[ - model.Item( - **{ - **feature.dict(), - "links": [ - model.Link( - title="Collection", - href=self.url_for( - request, - "collection", - collectionId=collection.id, - ), - rel="collection", - type=MediaType.json, - ), - model.Link( - title="Item", - href=self.url_for( - request, - "item", - collectionId=collection.id, - itemId=feature.properties[collection.id_column], - ), - rel="item", - type=MediaType.json, - ), - ], - } - ) - for feature in items - ], ) - - # HTML Response - if output_type == MediaType.html: - return self._create_html_response( - request, - data.json(exclude_none=True), - template_name="items", - ) - - # GeoJSONSeq Response - elif output_type == MediaType.geojsonseq: + _sortby = collection._sortby(sortby, limit, offset) + _features = _select + _from + _where + _sortby + total_count = await collection.query_count(pool, _from, _where, count_exact) + if total_count == 0: + raise NotFound + print(total_count) + print(output_type) + + if output_type == MediaType.geojsonseq: return StreamingResponse( - data.json_seq(exclude_none=True), + collection.query_geojson_rows(pool, _features), media_type=MediaType.geojsonseq, headers={ - "Content-Disposition": "attachment;filename=items.geojson" + "Content-Disposition": "attachment;filename=items.geojsonseq" }, ) - # Default to GeoJSON Response - return data + if output_type == MediaType.geojson: + return StreamingResponse( + collection.query_geojson(pool, _features, total_count), + media_type=MediaType.geojson, + ) + + # if output_type in ( + # MediaType.csv, + # MediaType.json, + # MediaType.ndjson, + # ): + # if items and items[0].geometry is not None: + # rows = ( + # { + # "collectionId": collection.id, + # "itemId": f.id, + # **f.properties, + # "geometry": f.geometry.wkt, + # } + # for f in items + # ) + + # else: + # rows = ( + # { + # "collectionId": collection.id, + # "itemId": f.id, + # **f.properties, + # } + # for f in items + # ) + + # # CSV Response + # if output_type == MediaType.csv: + # return StreamingResponse( + # create_csv_rows(rows), + # media_type=MediaType.csv, + # headers={ + # "Content-Disposition": "attachment;filename=items.csv" + # }, + # ) + + # # JSON Response + # if output_type == MediaType.json: + # return JSONResponse([row for row in rows]) + + # # NDJSON Response + # if output_type == MediaType.ndjson: + # return StreamingResponse( + # (json.dumps(row) + "\n" for row in rows), + # media_type=MediaType.ndjson, + # headers={ + # "Content-Disposition": "attachment;filename=items.ndjson" + # }, + # ) + + # qs = "?" + str(request.query_params) if request.query_params else "" + # links = [ + # model.Link( + # title="Collection", + # href=self.url_for( + # request, "collection", collectionId=collection.id + # ), + # rel="collection", + # type=MediaType.json, + # ), + # model.Link( + # title="Items", + # href=self.url_for(request, "items", collectionId=collection.id) + # + qs, + # rel="self", + # type=MediaType.geojson, + # ), + # ] + + # items_returned = len(items) + + # if (matched_items - items_returned) > offset: + # next_offset = offset + items_returned + # query_params = QueryParams( + # {**request.query_params, "offset": next_offset} + # ) + # url = ( + # self.url_for(request, "items", collectionId=collection.id) + # + f"?{query_params}" + # ) + # links.append( + # model.Link( + # href=url, + # rel="next", + # type=MediaType.geojson, + # title="Next page", + # ), + # ) + + # if offset: + # query_params = dict(request.query_params) + # query_params.pop("offset") + # prev_offset = max(offset - items_returned, 0) + # if prev_offset: + # query_params = QueryParams({**query_params, "offset": prev_offset}) + # else: + # query_params = QueryParams({**query_params}) + + # url = self.url_for(request, "items", collectionId=collection.id) + # if query_params: + # url += f"?{query_params}" + + # links.append( + # model.Link( + # href=url, + # rel="prev", + # type=MediaType.geojson, + # title="Previous page", + # ), + # ) + + # data = model.Items( + # id=collection.id, + # title=collection.title or collection.id, + # description=collection.description or collection.title or collection.id, + # numberMatched=matched_items, + # numberReturned=items_returned, + # links=links, + # features=[ + # model.Item( + # **{ + # **feature.dict(), + # "links": [ + # model.Link( + # title="Collection", + # href=self.url_for( + # request, + # "collection", + # collectionId=collection.id, + # ), + # rel="collection", + # type=MediaType.json, + # ), + # model.Link( + # title="Item", + # href=self.url_for( + # request, + # "item", + # collectionId=collection.id, + # itemId=feature.properties[collection.id_column], + # ), + # rel="item", + # type=MediaType.json, + # ), + # ], + # } + # ) + # for feature in items + # ], + # ) + + # # HTML Response + # if output_type == MediaType.html: + # return self._create_html_response( + # request, + # data.json(exclude_none=True), + # template_name="items", + # ) + + # # GeoJSONSeq Response + # elif output_type == MediaType.geojsonseq: + # return StreamingResponse( + # data.json_seq(exclude_none=True), + # media_type=MediaType.geojsonseq, + # headers={ + # "Content-Disposition": "attachment;filename=items.geojson" + # }, + # ) + + # # Default to GeoJSON Response + # return data @self.router.get( "/collections/{collectionId}/items/{itemId}", diff --git a/tifeatures/layer.py b/tifeatures/layer.py index 81684df..4616fc6 100644 --- a/tifeatures/layer.py +++ b/tifeatures/layer.py @@ -3,6 +3,7 @@ import abc import re from dataclasses import dataclass +from time import time from typing import Any, ClassVar, Dict, List, Optional, Tuple from buildpg import asyncpg, clauses @@ -114,7 +115,7 @@ def bounds_default(cls, values): return values def _select(self, properties: Optional[List[str]]): - return clauses.Select(self.columns(properties)) + return clauses.Select(self.columns(properties, True)) def _select_count(self): return clauses.Select(pg_funcs.count("*")) @@ -124,12 +125,17 @@ def _from(self): def _geom( self, - geometry_column: Optional[GeometryColumn], + geom_column: Optional[str], bbox_only: Optional[bool], simplify: Optional[float], ): - if geometry_column is None: - return pg_funcs.cast(None, "json") + if geom_column and geom_column.lower() == "none": + return None + + geometry_column = self.geometry_column(geom_column) + + if not geometry_column: + raise InvalidGeometryColumnName(f"Invalid Geometry Column: {geom_column}.") g = logic.V(geometry_column.name) g = pg_funcs.cast(g, "geometry") @@ -146,7 +152,7 @@ def _geom( simplify, ) - g = logic.Func("ST_AsGeoJson", g) + g = g.as_("geom") return g @@ -157,8 +163,8 @@ def _where( bbox: Optional[List[float]] = None, properties: Optional[List[Tuple[str, Any]]] = None, cql: Optional[AstType] = None, - geom: str = None, - dt: str = None, + geom: Optional[str] = None, + dt: Optional[str] = None, ): """Construct WHERE query.""" wheres = [logic.S(True)] @@ -262,50 +268,34 @@ def _datetime_filter_to_sql(self, interval: List[str], dt_name: str): logic.V(dt_name) < logic.S(pg_funcs.cast(end, "timestamptz")), ) - def _sorting(self, sortby: Optional[str]): + def _sortby(self, sortby: Optional[str], limit: int, offset: int): pk = self.id_column - sortcols=[] sorts = [] - reversesorts = [] - prevwheres = [] - nextwheres = [] - sortby = sortby.strip() - if sortby is None: - sortby = pk - if pk not in sortby: - sortby += ',' + pk - sortbyarr = sortby.split(",") pkseen = False - q,p = render( - for s in sortbyarr: - parts = re.match( - "^(?P[+-]?)(?P.*)$", s - ).groupdict() # type:ignore - - direction = parts["direction"] - column = parts["column"].strip() - if column == pk: - pkseen = True - if self.get_column(column): - sortcols.append(column) - if direction == "-": - sorts.append(logic.V(column).desc()) - reversesorts.append(logic.V(column).desc()) - + if sortby: + for s in sortby.strip().split(","): + parts = re.match( + "^(?P[+-]?)(?P.*)$", s + ).groupdict() # type:ignore + + direction = parts["direction"] + column = parts["column"].strip() + if column == pk: + pkseen = True + if self.get_column(column): + if direction == "-": + sorts.append(logic.V(column).desc()) + + else: + sorts.append(logic.V(column)) else: - sorts.append(logic.V(column)) - reversesorts.append(logic.V(column).desc()) - else: - raise InvalidPropertyName(f"Property {column} does not exist.") + raise InvalidPropertyName(f"Property {column} does not exist.") if not pkseen: sorts.append(logic.V(pk)) - reversesorts.append(logic.V(pk).desc()) - - return clauses.OrderBy(*sorts) - + return clauses.OrderBy(*sorts) + clauses.Limit(limit) + clauses.Offset(offset) def _features_query( self, @@ -316,8 +306,8 @@ def _features_query( cql_filter: Optional[AstType] = None, sortby: Optional[str] = None, properties: Optional[List[str]] = None, - geom: str = None, - dt: str = None, + geom: Optional[str] = None, + dt: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, ): @@ -365,73 +355,58 @@ def _features_count_query( ) ) - async def query_count(self, pool, **kwargs): + async def query_count(self, pool, _from_clause, _where_clause, count_exact): """Get the estimated count/cost from query.""" - if 'count_exact' in kwargs: + from_where_clause = _from_clause + _where_clause + if count_exact: async with pool.acquire() as conn: - q,p = render( - """ - SELECT count(*) - :f - """, - f=self._from(), - w = self._where(**kwargs) - ) - return conn.fetchval(q,*p) - q,p = render( + q, p = (self._select_count() + from_where_clause).render() + return conn.fetchval(q, *p) + q, p = render( """ EXPLAIN (FORMAT JSON) SELECT 1 - :f - :o + :from_where_clause """, - f=self._from(), - w = self._where(**kwargs) + from_where_clause=from_where_clause, ) async with pool.acquire() as conn: explain = await conn.fetchval(q, *p) - return explain[0]['Plan']['Plan Rows'] - - async def query( - self, - pool: asyncpg.BuildPgPool, - **kwargs - ) -> Tuple[FeatureCollection, int]: - """Build and run Pg query.""" - geom = kwargs['geom'] - if geom and geom.lower() != "none" and not self.geometry_column(geom): - raise InvalidGeometryColumnName(f"Invalid Geometry Column: {geom}.") + return explain[0]["Plan"]["Plan Rows"] - q,p = render( + async def query_geojson_rows(self, pool, features_query): + """Build and run Pg query to get json rows.""" + st = time() + q, p = render( """ WITH features AS ( - :features + :features_query ) SELECT json_build_object( 'type', 'Feature', - 'id', :id_column, - 'geometry', :geometry_q, - 'properties', to_jsonb( features.* ) - :geom_columns::text[] - ) + 'id', itemid, + 'geometry', ST_ASGeoJson(geom)::json, + 'properties', to_jsonb( features.* ) - '{pk,geom}'::text[] + )::text FROM features ; """, - id_column=logic.V(self.id_column), - geometry_q=self._geom(**kwargs), - geom_columns=[g.name for g in self.geometry_columns], - features=self._features(**kwargs), + features_query=features_query, ) async with pool.acquire() as conn: - items = await conn.fetchval(q, *p) - features = (item for item in items) - - count = await self.query_count(**kwargs) - - return ( - FeatureCollection(features=features or []), - count, - ) + async with conn.transaction(): + async for record in conn.cursor(q, *p, prefetch=50, timeout=120): + yield record[0] + "\n" + + async def query_geojson(self, pool, features_query, total): + """Build and run Pg query to get json rows.""" + cnt = 0 + yield '{"type":"FeatureCollection","features":[' + async for rec in self.query_geojson_rows(pool, features_query): + cnt += 1 + yield rec + yield f'],"numberMatched":{total},"numberReturned":{cnt}}}' async def features( self, From eaba0af840c8fc489910ee2663c359f46e9d88b5 Mon Sep 17 00:00:00 2001 From: David Bitner Date: Mon, 25 Jul 2022 15:23:54 -0500 Subject: [PATCH 3/4] add links, fix geojson output --- tifeatures/factory.py | 14 ++++++++++++-- tifeatures/layer.py | 44 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/tifeatures/factory.py b/tifeatures/factory.py index 7bb7e2e..f886c44 100644 --- a/tifeatures/factory.py +++ b/tifeatures/factory.py @@ -633,9 +633,17 @@ async def items( print(total_count) print(output_type) + collection_href = str( + self.url_for( + request, + "collection", + collectionId=collection.id, + ) + ) + if output_type == MediaType.geojsonseq: return StreamingResponse( - collection.query_geojson_rows(pool, _features), + collection.query_geojson_rows(pool, _features, collection_href), media_type=MediaType.geojsonseq, headers={ "Content-Disposition": "attachment;filename=items.geojsonseq" @@ -644,7 +652,9 @@ async def items( if output_type == MediaType.geojson: return StreamingResponse( - collection.query_geojson(pool, _features, total_count), + collection.query_geojson( + pool, _features, collection_href, total_count + ), media_type=MediaType.geojson, ) diff --git a/tifeatures/layer.py b/tifeatures/layer.py index 4616fc6..0748ebe 100644 --- a/tifeatures/layer.py +++ b/tifeatures/layer.py @@ -6,6 +6,7 @@ from time import time from typing import Any, ClassVar, Dict, List, Optional, Tuple +import orjson from buildpg import asyncpg, clauses from buildpg import funcs as pg_funcs from buildpg import logic, render @@ -374,7 +375,7 @@ async def query_count(self, pool, _from_clause, _where_clause, count_exact): explain = await conn.fetchval(q, *p) return explain[0]["Plan"]["Plan Rows"] - async def query_geojson_rows(self, pool, features_query): + async def query_geojson_rows(self, pool, features_query, collection_href): """Build and run Pg query to get json rows.""" st = time() q, p = render( @@ -383,30 +384,59 @@ async def query_geojson_rows(self, pool, features_query): :features_query ) SELECT - json_build_object( + jsonb_build_object( 'type', 'Feature', 'id', itemid, 'geometry', ST_ASGeoJson(geom)::json, - 'properties', to_jsonb( features.* ) - '{pk,geom}'::text[] + 'properties', to_jsonb( features.* ) - '{itemid,geom}'::text[], + 'links', jsonb_build_array( + jsonb_build_object( + 'title', 'Collection', + 'href', :collection_href::text, + 'rel', 'collection', + 'type', 'application/json' + ), + jsonb_build_object( + 'title', 'Item', + 'href', format('%s/items/%s',:collection_href::text,itemid), + 'rel', 'item', + 'type', 'application/json' + ) + ) )::text FROM features ; """, features_query=features_query, + collection_href=collection_href, ) + print(q, p) async with pool.acquire() as conn: async with conn.transaction(): async for record in conn.cursor(q, *p, prefetch=50, timeout=120): yield record[0] + "\n" - async def query_geojson(self, pool, features_query, total): + async def query_geojson(self, pool, features_query, collection_href, total): """Build and run Pg query to get json rows.""" cnt = 0 yield '{"type":"FeatureCollection","features":[' - async for rec in self.query_geojson_rows(pool, features_query): + async for rec in self.query_geojson_rows(pool, features_query, collection_href): cnt += 1 - yield rec - yield f'],"numberMatched":{total},"numberReturned":{cnt}}}' + if cnt > 1: + yield "," + rec + else: + yield rec + links = orjson.dumps( + [ + { + "title": "Collection", + "href": collection_href, + "rel": "collection", + "type": "application/json", + } + ] + ).decode() + yield f'],"numberMatched":{total},"numberReturned":{cnt}, "links":{links}}}' async def features( self, From eda1f472d1521164c13988556a40b23c385c353d Mon Sep 17 00:00:00 2001 From: David Bitner Date: Mon, 25 Jul 2022 15:48:33 -0500 Subject: [PATCH 4/4] paging links --- tifeatures/factory.py | 8 +++++- tifeatures/layer.py | 62 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/tifeatures/factory.py b/tifeatures/factory.py index f886c44..06521c5 100644 --- a/tifeatures/factory.py +++ b/tifeatures/factory.py @@ -641,6 +641,8 @@ async def items( ) ) + qs = "?" + str(request.query_params) if request.query_params else "" + if output_type == MediaType.geojsonseq: return StreamingResponse( collection.query_geojson_rows(pool, _features, collection_href), @@ -653,7 +655,11 @@ async def items( if output_type == MediaType.geojson: return StreamingResponse( collection.query_geojson( - pool, _features, collection_href, total_count + pool, + _features, + collection_href, + total_count, + query_params=request.query_params, ), media_type=MediaType.geojson, ) diff --git a/tifeatures/layer.py b/tifeatures/layer.py index 0748ebe..62259cc 100644 --- a/tifeatures/layer.py +++ b/tifeatures/layer.py @@ -1,12 +1,12 @@ """tifeatures.layers.""" import abc +import json import re from dataclasses import dataclass from time import time from typing import Any, ClassVar, Dict, List, Optional, Tuple -import orjson from buildpg import asyncpg, clauses from buildpg import funcs as pg_funcs from buildpg import logic, render @@ -27,6 +27,8 @@ from tifeatures.filter.evaluate import to_filter from tifeatures.filter.filters import bbox_to_wkt +from starlette.datastructures import QueryParams + # Links to geojson schema geojson_schema = { "GEOMETRY": "https://geojson.org/schema/Geometry.json", @@ -416,7 +418,9 @@ async def query_geojson_rows(self, pool, features_query, collection_href): async for record in conn.cursor(q, *p, prefetch=50, timeout=120): yield record[0] + "\n" - async def query_geojson(self, pool, features_query, collection_href, total): + async def query_geojson( + self, pool, features_query, collection_href, total, query_params=None + ): """Build and run Pg query to get json rows.""" cnt = 0 yield '{"type":"FeatureCollection","features":[' @@ -426,17 +430,55 @@ async def query_geojson(self, pool, features_query, collection_href, total): yield "," + rec else: yield rec - links = orjson.dumps( - [ + links = [ + { + "title": "Collection", + "href": collection_href, + "rel": "collection", + "type": "application/json", + }, + { + "title": "Items", + "href": f"{collection_href}/items?{str(query_params)}", + "rel": "self", + "type": "application/json", + }, + ] + + if query_params: + offset = int(query_params.get("offset", 0)) + else: + offset = 0 + + if (total - cnt) > offset: + next_offset = offset + cnt + query_params = QueryParams({**query_params, "offset": next_offset}) + + links.append( + { + "title": "Items", + "href": f"{collection_href}/items?{str(query_params)}", + "rel": "next", + "type": "application/json", + } + ) + + if offset > 0: + prev_offset = max(offset - cnt, 0) + query_params = QueryParams({**query_params, "offset": prev_offset}) + + links.append( { - "title": "Collection", - "href": collection_href, - "rel": "collection", + "title": "Items", + "href": f"{collection_href}/items?{str(query_params)}", + "rel": "prev", "type": "application/json", } - ] - ).decode() - yield f'],"numberMatched":{total},"numberReturned":{cnt}, "links":{links}}}' + ) + + links_str = json.dumps(links) + + yield f'],"numberMatched":{total},"numberReturned":{cnt}, "links":{links_str}}}' async def features( self,