From 798cedaaa1c82669bd1347b8f8d7271b141ce572 Mon Sep 17 00:00:00 2001 From: Eric Duong Date: Fri, 27 Sep 2024 22:13:28 -0400 Subject: [PATCH] fix(data-warehouse): salesforce custom fields update (#25261) --- .../pipelines/salesforce/__init__.py | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/posthog/temporal/data_imports/pipelines/salesforce/__init__.py b/posthog/temporal/data_imports/pipelines/salesforce/__init__.py index 2a62967fb68d4..2b603b60d4df0 100644 --- a/posthog/temporal/data_imports/pipelines/salesforce/__init__.py +++ b/posthog/temporal/data_imports/pipelines/salesforce/__init__.py @@ -6,6 +6,8 @@ from posthog.temporal.data_imports.pipelines.rest_source import RESTAPIConfig, rest_api_resources from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource from posthog.temporal.data_imports.pipelines.salesforce.auth import SalseforceAuth +import pendulum +import re # Note: When pulling all fields, salesforce requires a 200 limit. We circumvent the pagination by using Id ordering. @@ -170,10 +172,10 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: class SalesforceEndpointPaginator(BasePaginator): - def __init__(self, instance_url): + def __init__(self, instance_url, is_incremental: bool): super().__init__() self.instance_url = instance_url - self.first_system_modstamp = None + self.is_incremental = is_incremental def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None: res = response.json() @@ -184,21 +186,28 @@ def update_state(self, response: Response, data: Optional[list[Any]] = None) -> self._has_next_page = False return - if not self.first_system_modstamp: - self.first_system_modstamp = res["records"][0]["SystemModstamp"] - last_record = res["records"][-1] model_name = res["records"][0]["attributes"]["type"] - params = { - "q": f"SELECT FIELDS(ALL) FROM {model_name} WHERE Id > '{last_record['Id']}' AND SystemModstamp >= {self.first_system_modstamp} ORDER BY Id ASC LIMIT 200" - } - self._has_next_page = True - self._next_page = f"/services/data/v61.0/query" + "?" + urlencode(params) + self._last_record_id = last_record["Id"] + self._model_name = model_name def update_request(self, request: Request) -> None: - request.url = f"{self.instance_url}{self._next_page}" + if self.is_incremental: + # Cludge: Need to get initial value for date filter + query = request.params.get("q", "") + date_match = re.search(r"SystemModstamp >= (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+\d{4})", query) + if date_match: + date_filter = date_match.group(1) + query = f"SELECT FIELDS(ALL) FROM {self._model_name} WHERE Id > '{self._last_record_id}' AND SystemModstamp >= {date_filter} ORDER BY Id ASC LIMIT 200" + else: + raise ValueError("No date filter found in initial query. Incremental loading requires a date filter.") + else: + query = f"SELECT FIELDS(ALL) FROM {self._model_name} WHERE Id > '{self._last_record_id}' ORDER BY Id ASC LIMIT 200" + + _next_page = f"/services/data/v61.0/query" + "?" + urlencode({"q": query}) + request.url = f"{self.instance_url}{_next_page}" @dlt.source(max_table_nesting=0) @@ -215,7 +224,7 @@ def salesforce_source( "client": { "base_url": instance_url, "auth": SalseforceAuth(refresh_token, access_token), - "paginator": SalesforceEndpointPaginator(instance_url=instance_url), + "paginator": SalesforceEndpointPaginator(instance_url=instance_url, is_incremental=is_incremental), }, "resource_defaults": { **({"primary_key": "id"} if is_incremental else {}),