From 46df97583318c61ce36c44f0d9c74ce74d9566c2 Mon Sep 17 00:00:00 2001 From: ReubenFrankel <60552974+ReubenFrankel@users.noreply.github.com> Date: Fri, 4 Oct 2024 03:08:40 +0100 Subject: [PATCH] feat: Output record count metric from batch files insert (#267) Bit of an opinionated change since it might be conflating batches/records, but it has been helpful for us to know how many rows (records) were created/updated from batch processing. --- target_snowflake/connector.py | 6 ++++-- target_snowflake/sinks.py | 13 +++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py index 01910c4..74f3e73 100644 --- a/target_snowflake/connector.py +++ b/target_snowflake/connector.py @@ -553,7 +553,8 @@ def merge_from_stage( key_properties=key_properties, ) self.logger.debug("Merging with SQL: %s", merge_statement) - conn.execute(merge_statement, **kwargs) + result = conn.execute(merge_statement, **kwargs) + return result.rowcount def copy_from_stage( self, @@ -578,7 +579,8 @@ def copy_from_stage( file_format=file_format, ) self.logger.debug("Copying with SQL: %s", copy_statement) - conn.execute(copy_statement, **kwargs) + result = conn.execute(copy_statement, **kwargs) + return result.rowcount def drop_file_format(self, file_format: str) -> None: """Drop a file format in the schema. diff --git a/target_snowflake/sinks.py b/target_snowflake/sinks.py index 4c2313b..96772f6 100644 --- a/target_snowflake/sinks.py +++ b/target_snowflake/sinks.py @@ -171,7 +171,7 @@ def insert_batch_files_via_internal_stage( self, full_table_name: str, files: t.Sequence[str], - ) -> None: + ) -> int: """Process a batch file with the given batch context. Args: @@ -190,7 +190,7 @@ def insert_batch_files_via_internal_stage( if self.key_properties: # merge into destination table - self.connector.merge_from_stage( + record_count = self.connector.merge_from_stage( full_table_name=full_table_name, schema=self.schema, sync_id=sync_id, @@ -199,7 +199,7 @@ def insert_batch_files_via_internal_stage( ) else: - self.connector.copy_from_stage( + record_count = self.connector.copy_from_stage( full_table_name=full_table_name, schema=self.schema, sync_id=sync_id, @@ -217,6 +217,8 @@ def insert_batch_files_via_internal_stage( if os.path.exists(file_path): # noqa: PTH110 os.remove(file_path) # noqa: PTH107 + return record_count + def process_batch_files( self, encoding: BaseBatchFileEncoding, @@ -232,7 +234,7 @@ def process_batch_files( NotImplementedError: If the batch file encoding is not supported. """ if encoding.format == BatchFileFormat.JSONL: - self.insert_batch_files_via_internal_stage( + record_count = self.insert_batch_files_via_internal_stage( full_table_name=self.full_table_name, files=files, ) @@ -242,6 +244,9 @@ def process_batch_files( msg, ) + with self.record_counter_metric as counter: + counter.increment(record_count) + # TODO: remove after https://github.com/meltano/sdk/issues/1819 is fixed def _singer_validate_message(self, record: dict) -> None: """Ensure record conforms to Singer Spec.