diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index a0fb6874..847ee4bd 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -6,7 +6,7 @@ from singer import metadata, metrics import tap_salesforce.salesforce from tap_salesforce.sync import (sync_stream, resume_syncing_bulk_query, get_stream_version) -from tap_salesforce.salesforce import Salesforce +from tap_salesforce.salesforce import Salesforce, DEFAULT_CHUNK_SIZE , MAX_CHUNK_SIZE from tap_salesforce.salesforce.bulk import Bulk from tap_salesforce.salesforce.exceptions import ( TapSalesforceException, TapSalesforceQuotaExceededException, TapSalesforceBulkAPIDisabledException) @@ -373,6 +373,16 @@ def main_impl(): args = singer_utils.parse_args(REQUIRED_CONFIG_KEYS) CONFIG.update(args.config) + chunk_size = CONFIG.get('chunk_size') + #if chunk_size is other than 0, "0", "" then use chunk_size + if chunk_size and int(chunk_size): + chunk_size = int(chunk_size) + if chunk_size > MAX_CHUNK_SIZE: #If chunk_size is greater than MAX_CHUNK_SIZE, then use MAX_CHUNK_SIZE + LOGGER.info("The provided chunk_size value is greater than 250k hence tap will use 250k which is the maximum chunk size the API supports.") + chunk_size = MAX_CHUNK_SIZE + else: #if chunk_size is 0, "0", "" then use DEFAULT_CHUNK_SIZE + chunk_size = DEFAULT_CHUNK_SIZE + sf = None try: # get lookback window from config @@ -389,7 +399,8 @@ def main_impl(): select_fields_by_default=CONFIG.get('select_fields_by_default'), default_start_date=CONFIG.get('start_date'), api_type=CONFIG.get('api_type'), - lookback_window=lookback_window) + lookback_window=lookback_window, + chunk_size=chunk_size) sf.login() if args.discover: diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 493f1ba6..b7d659dd 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -22,6 +22,8 @@ BULK_API_TYPE = "BULK" REST_API_TYPE = "REST" +DEFAULT_CHUNK_SIZE = 
100000 +MAX_CHUNK_SIZE = 250000 STRING_TYPES = set([ 'id', @@ -212,7 +214,8 @@ def __init__(self, select_fields_by_default=None, default_start_date=None, api_type=None, - lookback_window=None): + lookback_window=None, + chunk_size=None): self.api_type = api_type.upper() if api_type else None self.refresh_token = refresh_token self.token = token @@ -221,6 +224,7 @@ def __init__(self, self.session = requests.Session() self.access_token = None self.instance_url = None + self.chunk_size = chunk_size if isinstance(quota_percent_per_run, str) and quota_percent_per_run.strip() == '': quota_percent_per_run = None if isinstance(quota_percent_total, str) and quota_percent_total.strip() == '': diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index 081db803..d677a4bd 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -172,7 +172,7 @@ def _create_job(self, catalog_entry, pk_chunking=False): if pk_chunking: LOGGER.info("ADDING PK CHUNKING HEADER") - headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(DEFAULT_CHUNK_SIZE) + headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(self.sf.chunk_size) # If the stream ends with 'CleanInfo' or 'History', we can PK Chunk on the object's parent if any(catalog_entry['stream'].endswith(suffix) for suffix in ["CleanInfo", "History"]): diff --git a/tests/unittests/test_chunk_size.py b/tests/unittests/test_chunk_size.py new file mode 100644 index 00000000..39515f24 --- /dev/null +++ b/tests/unittests/test_chunk_size.py @@ -0,0 +1,140 @@ +import unittest +from tap_salesforce.salesforce import DEFAULT_CHUNK_SIZE +import tap_salesforce +from unittest import mock + + +class MockSalesforce: + def __init__(self, *args, **kwargs): + self.rest_requests_attempted = 0 + self.jobs_completed = 0 + self.login_timer = None + + def login(self): + pass + + +class MockParseArgs: + """Mock the parsed_args() in main""" + + def __init__(self, config): + self.config = 
config + self.discover = None + self.properties = None + self.state = None + + +def get_args(chunk_size=None): + """Return the MockParseArgs object""" + mock_config = { + "refresh_token": None, + "client_id": None, + "client_secret": None, + "start_date": "2021-01-02T00:00:00Z", + } + mock_config["chunk_size"] = chunk_size + return MockParseArgs(mock_config) + + +@mock.patch("tap_salesforce.Salesforce", side_effect=MockSalesforce) +@mock.patch("singer.utils.parse_args") +class TestChunkSize(unittest.TestCase): + """Test cases to verify the chunk_size value is set as expected according to config""" + + def test_default_value_in_chunk_size(self, mocked_parsed_args, mocked_Salesforce_class): + """ + Unit test to ensure that "DEFAULT_CHUNK_SIZE" value is used when "chunk_size" is not passed in config + """ + mocked_parsed_args.return_value = get_args() + + # function call + tap_salesforce.main_impl() + + # get arguments passed during calling 'Salesforce' class + args, kwargs = mocked_Salesforce_class.call_args + chunk_size = kwargs.get("chunk_size") + self.assertEqual(chunk_size, DEFAULT_CHUNK_SIZE) + + def test_empty_string_value_in_chunk_size(self, mocked_parse_args, mocked_Salesforce_class): + """ + Unit test to ensure that "DEFAULT_CHUNK_SIZE" value is used when the "chunk_size" value passed in config is an empty string + """ + + # mock parse args + mocked_parse_args.return_value = get_args("") + + # function call + tap_salesforce.main_impl() + + # get arguments passed during calling 'Salesforce' class + args, kwargs = mocked_Salesforce_class.call_args + chunk_size = kwargs.get("chunk_size") + self.assertEqual(chunk_size, DEFAULT_CHUNK_SIZE) + + def test_zero_value_in_chunk_size(self, mocked_parse_args, mocked_Salesforce_class): + """ + Unit test to ensure that "DEFAULT_CHUNK_SIZE" value is used when "chunk_size" value is zero in config + """ + + # mock parse args + mocked_parse_args.return_value = get_args(0) + + # function call + tap_salesforce.main_impl() + + # get 
arguments passed during calling 'Salesforce' class + args, kwargs = mocked_Salesforce_class.call_args + chunk_size = kwargs.get("chunk_size") + self.assertEqual(chunk_size, DEFAULT_CHUNK_SIZE) + + def test_string_zero_value_in_chunk_size(self, mocked_parse_args, mocked_Salesforce_class): + """ + Unit test to ensure that "DEFAULT_CHUNK_SIZE" value is used when "chunk_size" value is zero string in config + """ + + # mock parse args + mocked_parse_args.return_value = get_args("0") + + # function call + tap_salesforce.main_impl() + + # get arguments passed during calling 'Salesforce' class + args, kwargs = mocked_Salesforce_class.call_args + chunk_size = kwargs.get("chunk_size") + self.assertEqual(chunk_size, DEFAULT_CHUNK_SIZE) + + def test_float_value_in_chunk_size(self, mocked_parse_args, mocked_Salesforce_class): + """ + Unit test to ensure that int "chunk_size" value is used when float "chunk_size" is passed in config + """ + + # mock parse args + mocked_parse_args.return_value = get_args(2000.200) + + # function call + tap_salesforce.main_impl() + + # get arguments passed during calling 'Salesforce' class + args, kwargs = mocked_Salesforce_class.call_args + chunk_size = kwargs.get("chunk_size") + self.assertEqual(chunk_size, 2000) + + @mock.patch("tap_salesforce.LOGGER.info") + def test_max_value_in_chunk_size(self, mocked_logger, mocked_parse_args, mocked_Salesforce_class): + """ + Unit test to ensure that "MAX_CHUNK_SIZE" value is used when "chunk_size" passed in config is greater than the maximum chunk size that the API supports. 
+ """ + + # mock parse args + mocked_parse_args.return_value = get_args(260000) + + # function call + tap_salesforce.main_impl() + + # get arguments passed during calling 'Salesforce' class + args, kwargs = mocked_Salesforce_class.call_args + chunk_size = kwargs.get("chunk_size") + self.assertEqual(chunk_size, 250000) + # check if the logger is called with correct logger message + mocked_logger.assert_called_with('The provided chunk_size value is greater than 250k hence tap will use 250k which is the maximum chunk size the API supports.') +