From e293c2c9109ef76b2ebe076b39c8870ab1003e04 Mon Sep 17 00:00:00 2001
From: Wojciech Chmiel
Date: Tue, 23 Jul 2024 12:53:52 +0200
Subject: [PATCH] Add batching, to not go over 10MB size

---
Review notes (free text in this section is ignored by `git am`):

* The un-batched `client.insert_rows_json(table_ref, rows)` call is now
  removed. Leaving it in place after the batch loop re-sent every row in a
  single request, which duplicated the data and defeated the batching.
* The summary log line reports `len(total_errors)` (errors accumulated over
  all batches) instead of the errors of the last call only.
* The test stubs `json.load` with `return_value=...`; `new_callable` replaced
  `json.load` with the list itself, which is not callable.
* Hunk headers and the diffstat were recomputed for the changes above; the
  post-image blob hashes on the `index` lines were not regenerated.

 plugin_scripts/insert_rows.py | 28 +++++++++++++++++++++++-----
 tests/test_insert_rows.py     |  2 +-
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/plugin_scripts/insert_rows.py b/plugin_scripts/insert_rows.py
index c155a7b..41a8f5a 100755
--- a/plugin_scripts/insert_rows.py
+++ b/plugin_scripts/insert_rows.py
@@ -1,13 +1,25 @@
 import json
 import logging
 import sys
+from itertools import islice
 
 from google.cloud import bigquery
 
+
 from .config import Config, read_config
 
 sys.tracebacklimit = 0
 
+BATCH_SIZE = 20000
+
+def batched(iterable, n):
+    # batched('ABCDEFG', 3) → ABC DEF G
+    if n < 1:
+        raise ValueError('n must be at least one')
+    iterator = iter(iterable)
+    while batch := tuple(islice(iterator, n)):
+        yield batch
+
 
 def insert_rows(config: Config) -> None:
     """
@@ -31,14 +43,20 @@ def insert_rows(config: Config) -> None:
     with open(config.bq_rows_as_json_path, "r") as row_file:
         rows = json.load(row_file)
 
-    logging.info(f"Loaded {len(rows)} rows. Inserting...")
+    if not isinstance(rows, list):
+        raise ValueError(f"Expected JSON file to be a list of rows, was: {type(rows)}")
+
+    logging.info(f"Loaded {len(rows)} rows. Inserting in batches {BATCH_SIZE}...")
 
-    errors = client.insert_rows_json(table_ref, rows)
+    total_errors = []
+    for batch in batched(rows, BATCH_SIZE):
+        errors = client.insert_rows_json(table_ref, batch)
+        total_errors.extend(errors)
 
-    logging.info(f"Inserted rows with {len(errors)} errors")
-    for e in errors:
+    logging.info(f"Inserted rows with {len(total_errors)} errors")
+    for e in total_errors:
         logging.error(e)
-    if len(errors) > 0:
+    if len(total_errors) > 0:
         raise Exception("Got exceptions on returning rows, see above.")
 
 
diff --git a/tests/test_insert_rows.py b/tests/test_insert_rows.py
index c29d211..2de3143 100644
--- a/tests/test_insert_rows.py
+++ b/tests/test_insert_rows.py
@@ -8,7 +8,7 @@ def test__main_true(
 ):
     mocker.patch("json.loads")
     mocker.patch("plugin_scripts.insert_rows.bigquery")
-    mocker.patch("json.load")
+    mocker.patch("json.load", return_value=[{"a": 1}, {"b": 2}])
     mocker.patch("builtins.open")
 
     insert_rows.main()