feat: Add max records limit (#393)
Closes #24

I wanted to get rid of the `get_records` function altogether, but with
this addition we have to keep the override.
visch authored Mar 22, 2024
1 parent c087f37 commit f35ff84
Showing 3 changed files with 91 additions and 51 deletions.
73 changes: 40 additions & 33 deletions README.md
@@ -15,39 +15,46 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).

## Settings

| Setting | Required | Default | Description |
|:-----------------------------|:--------:|:-------:|:------------|
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. |
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. |
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
| dates_as_string | False | 0 | Defaults to false; if true, date and timestamp fields will be strings. If you see ValueError: Year is out of range, try setting this to True. [More Information](https://github.com/MeltanoLabs/tap-postgres/issues/171) |
| ssh_tunnel | False | None | SSH Tunnel Configuration; this is a JSON object |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details |
| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion server, this is the host we'll connect to via ssh |
| ssh_tunnel.username | True (if ssh_tunnel set) | False | Username to connect to bastion server |
| ssh_tunnel.port | True (if ssh_tunnel set) | 22 | Port to connect to bastion server |
| ssh_tunnel.private_key | True (if ssh_tunnel set) | None | Private Key for authentication to the bastion server |
| ssh_tunnel.private_key_password | False | None | Private Key Password, leave None if no password is set |
| ssl_enable | False | 0 | Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_certificate_enable| False | 0 | Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| ssl_mode | False | verify-full | SSL Protection method, see [postgres documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION) for more information. Must be one of disable, allow, prefer, require, verify-ca, or verify-full. Note if sqlalchemy_url is set this will be ignored. |
| ssl_certificate_authority | False | ~/.postgresql/root.crl | The certificate authority that should be used to verify the server's identity. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_certificate | False | ~/.postgresql/postgresql.crt | The certificate that should be used to verify your identity to the server. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_private_key | False | ~/.postgresql/postgresql.key | The private key for the certificate you provided. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_storage_directory | False | .secrets | The folder in which to store SSL certificates provided as raw values. When a certificate/key is provided as a raw value instead of as a filepath, it must be written to a file before it can be used. This configuration option determines where that file is created. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth| False | None | The max depth to flatten schemas. |
| batch_config| False | None | JSON object containing information on data batching. |
| batch_config.encoding.compression| False | None | Compression format to use for batch files. |
| batch_config.encoding.format| False | None | Format to use for batch files. |
| batch_config.storage.prefix| False | None | Prefix to use when writing batch files. |
| batch_config.storage.root| False | None | Root path to use when writing batch files. |
| Setting | Required | Default | Description |
|:--------|:--------:|:-------:|:------------|
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. |
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. |
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| max_record_count | False | None | Optional. The maximum number of records to return in a single stream. |
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
| dates_as_string | False | 0 | Defaults to false; if true, date and timestamp fields will be strings. If you see ValueError: Year is out of range, try setting this to True. |
| ssh_tunnel | False | None | SSH Tunnel Configuration; this is a JSON object |
| ssh_tunnel.enable | False | 0 | Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details |
| ssh_tunnel.host | False | None | Host of the bastion server, this is the host we'll connect to via ssh |
| ssh_tunnel.username | False | None | Username to connect to bastion server |
| ssh_tunnel.port | False | 22 | Port to connect to bastion server |
| ssh_tunnel.private_key | False | None | Private Key for authentication to the bastion server |
| ssh_tunnel.private_key_password | False | None | Private Key Password, leave None if no password is set |
| ssl_enable | False | 0 | Whether or not to use ssl to verify the server's identity. Use ssl_certificate_authority and ssl_mode for further customization. To use a client certificate to authenticate yourself to the server, use ssl_client_certificate_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_certificate_enable | False | 0 | Whether or not to provide client-side certificates as a method of authentication to the server. Use ssl_client_certificate and ssl_client_private_key for further customization. To use SSL to verify the server's identity, use ssl_enable instead. Note if sqlalchemy_url is set this will be ignored. |
| ssl_mode | False | verify-full | SSL Protection method, see [postgres documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION) for more information. Must be one of disable, allow, prefer, require, verify-ca, or verify-full. Note if sqlalchemy_url is set this will be ignored. |
| ssl_certificate_authority | False | ~/.postgresql/root.crl | The certificate authority that should be used to verify the server's identity. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_certificate | False | ~/.postgresql/postgresql.crt | The certificate that should be used to verify your identity to the server. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_client_private_key | False | ~/.postgresql/postgresql.key | The private key for the certificate you provided. Can be provided either as the certificate itself (in .env) or as a filepath to the certificate. Note if sqlalchemy_url is set this will be ignored. |
| ssl_storage_directory | False | .secrets | The folder in which to store SSL certificates provided as raw values. When a certificate/key is provided as a raw value instead of as a filepath, it must be written to a file before it can be used. This configuration option determines where that file is created. |
| default_replication_method | False | FULL_TABLE | Replication method to use if there is not a catalog entry to override this choice. One of `FULL_TABLE`, `INCREMENTAL`, or `LOG_BASED`. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| faker_config | False | None | Config for the [`Faker`](https://faker.readthedocs.io/en/master/) instance variable `fake` used within map expressions. Only applicable if the plugin specifies `faker` as an additional dependency (through the `singer-sdk` `faker` extra or directly). |
| faker_config.seed | False | None | Value to seed the Faker generator for deterministic output: https://faker.readthedocs.io/en/master/#seeding-the-generator |
| faker_config.locale | False | None | One or more LCID locale strings to produce localized output for: https://faker.readthedocs.io/en/master/#localization |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| batch_config | False | None | |
| batch_config.encoding | False | None | Specifies the format and compression of the batch files. |
| batch_config.encoding.format | False | None | Format to use for batch files. |
| batch_config.encoding.compression | False | None | Compression format to use for batch files. |
| batch_config.storage | False | None | Defines the storage layer to use when writing batch files |
| batch_config.storage.root | False | None | Root path to use when writing batch files. |
| batch_config.storage.prefix | False | None | Prefix to use when writing batch files. |

A full list of supported settings and capabilities is available by running: `tap-postgres --about`
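For instance, the new `max_record_count` setting slots in alongside the connection settings in a JSON config file. A minimal sketch (all values illustrative):

```json
{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "password": "example",
  "database": "mydb",
  "max_record_count": 10000
}
```

With this config, each stream stops after at most 10000 records per sync.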

60 changes: 42 additions & 18 deletions tap_postgres/client.py
@@ -8,7 +8,7 @@
import datetime
import json
import select
import typing
import typing as t
from functools import cached_property
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Iterable, Mapping
@@ -258,12 +258,18 @@ class PostgresStream(SQLStream):
"""Stream class for Postgres streams."""

connector_class = PostgresConnector
supports_nulls_first = True

# JSONB Objects won't be selected without type_conformance_level to ROOT_ONLY
TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
"""Return a generator of row-type dictionary objects.
def max_record_count(self) -> int | None:
"""Return the maximum number of records to fetch in a single query."""
return self.config.get("max_record_count")

# Get records from stream
def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Return a generator of record-type dictionary objects.
If the stream has a replication_key value defined, records will be sorted by the
incremental key. If the stream also has an available starting bookmark, the
@@ -281,30 +287,48 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
not support partitioning.
"""
if context:
raise NotImplementedError(
f"Stream '{self.name}' does not support partitioning."
)
msg = f"Stream '{self.name}' does not support partitioning."
raise NotImplementedError(msg)

# pulling rows with only selected columns from stream
selected_column_names = [k for k in self.get_selected_schema()["properties"]]
selected_column_names = self.get_selected_schema()["properties"].keys()
table = self.connector.get_table(
self.fully_qualified_name, column_names=selected_column_names
full_table_name=self.fully_qualified_name,
column_names=selected_column_names,
)
query = table.select()

if self.replication_key:
replication_key_col = table.columns[self.replication_key]

# Nulls first because the default is to have nulls as the "highest" value
# which incorrectly causes the tap to attempt to store null state.
query = query.order_by(sa.nullsfirst(replication_key_col))
order_by = (
sa.nulls_first(replication_key_col.asc())
if self.supports_nulls_first
else replication_key_col.asc()
)
query = query.order_by(order_by)

start_val = self.get_starting_replication_key_value(context)
if start_val:
query = query.filter(replication_key_col >= start_val)
query = query.where(replication_key_col >= start_val)

if self.ABORT_AT_RECORD_COUNT is not None:
# Limit record count to one greater than the abort threshold. This ensures
# `MaxRecordsLimitException` exception is properly raised by caller
# `Stream._sync_records()` if more records are available than can be
# processed.
query = query.limit(self.ABORT_AT_RECORD_COUNT + 1)

if self.max_record_count():
query = query.limit(self.max_record_count())

with self.connector._connect() as con:
for row in con.execute(query).mappings():
yield dict(row)
with self.connector._connect() as conn:
for record in conn.execute(query).mappings():
# TODO: Standardize record mapping type
# https://github.com/meltano/sdk/issues/2096
transformed_record = self.post_process(dict(record))
if transformed_record is None:
# Record filtered out during post_process()
continue
yield transformed_record


class PostgresLogBasedStream(SQLStream):
Expand All @@ -325,7 +349,7 @@ def config(self) -> Mapping[str, Any]:
@cached_property
def schema(self) -> dict:
"""Override schema for log-based replication adding _sdc columns."""
schema_dict = typing.cast(dict, self._singer_catalog_entry.schema.to_dict())
schema_dict = t.cast(dict, self._singer_catalog_entry.schema.to_dict())
for property in schema_dict["properties"].values():
if isinstance(property["type"], list):
property["type"].append("null")
9 changes: 9 additions & 0 deletions tap_postgres/tap.py
@@ -140,6 +140,15 @@ def __init__(
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"max_record_count",
th.IntegerType,
default=None,
description=(
"Optional. The maximum number of records to return in a "
"single stream."
),
),
th.Property(
"sqlalchemy_url",
th.StringType,
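The `th.Property` added in tap.py corresponds roughly to this JSON-schema fragment for the tap's config (a hand-written sketch, not generated output):

```python
# Rough JSON-schema equivalent of the added max_record_count property
# (hand-written sketch of what singer-sdk renders for an optional IntegerType).
max_record_count_schema = {
    "max_record_count": {
        "type": ["integer", "null"],
        "description": (
            "Optional. The maximum number of records to return in a "
            "single stream."
        ),
    }
}
```

Because the property is optional with a `None` default, the schema type admits null as well as integer.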
