Skip to content

Commit

Permalink
fix(data-warehouse): salesforce custom fields update (#25261)
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE authored Sep 28, 2024
1 parent abdf7a5 commit 798ceda
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions posthog/temporal/data_imports/pipelines/salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from posthog.temporal.data_imports.pipelines.rest_source import RESTAPIConfig, rest_api_resources
from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource
from posthog.temporal.data_imports.pipelines.salesforce.auth import SalseforceAuth
import pendulum
import re


# Note: When pulling all fields, salesforce requires a 200 limit. We circumvent the pagination by using Id ordering.
Expand Down Expand Up @@ -170,10 +172,10 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:


class SalesforceEndpointPaginator(BasePaginator):
def __init__(self, instance_url):
def __init__(self, instance_url, is_incremental: bool):
super().__init__()
self.instance_url = instance_url
self.first_system_modstamp = None
self.is_incremental = is_incremental

def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
res = response.json()
Expand All @@ -184,21 +186,28 @@ def update_state(self, response: Response, data: Optional[list[Any]] = None) ->
self._has_next_page = False
return

if not self.first_system_modstamp:
self.first_system_modstamp = res["records"][0]["SystemModstamp"]

last_record = res["records"][-1]
model_name = res["records"][0]["attributes"]["type"]

params = {
"q": f"SELECT FIELDS(ALL) FROM {model_name} WHERE Id > '{last_record['Id']}' AND SystemModstamp >= {self.first_system_modstamp} ORDER BY Id ASC LIMIT 200"
}

self._has_next_page = True
self._next_page = f"/services/data/v61.0/query" + "?" + urlencode(params)
self._last_record_id = last_record["Id"]
self._model_name = model_name

def update_request(self, request: Request) -> None:
request.url = f"{self.instance_url}{self._next_page}"
if self.is_incremental:
# Cludge: Need to get initial value for date filter
query = request.params.get("q", "")
date_match = re.search(r"SystemModstamp >= (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+\d{4})", query)
if date_match:
date_filter = date_match.group(1)
query = f"SELECT FIELDS(ALL) FROM {self._model_name} WHERE Id > '{self._last_record_id}' AND SystemModstamp >= {date_filter} ORDER BY Id ASC LIMIT 200"
else:
raise ValueError("No date filter found in initial query. Incremental loading requires a date filter.")
else:
query = f"SELECT FIELDS(ALL) FROM {self._model_name} WHERE Id > '{self._last_record_id}' ORDER BY Id ASC LIMIT 200"

_next_page = f"/services/data/v61.0/query" + "?" + urlencode({"q": query})
request.url = f"{self.instance_url}{_next_page}"


@dlt.source(max_table_nesting=0)
Expand All @@ -215,7 +224,7 @@ def salesforce_source(
"client": {
"base_url": instance_url,
"auth": SalseforceAuth(refresh_token, access_token),
"paginator": SalesforceEndpointPaginator(instance_url=instance_url),
"paginator": SalesforceEndpointPaginator(instance_url=instance_url, is_incremental=is_incremental),
},
"resource_defaults": {
**({"primary_key": "id"} if is_incremental else {}),
Expand Down

0 comments on commit 798ceda

Please sign in to comment.