source-intercom-native: reduce logging frequency
The connector was frequently logging `Processing page 1 of 1` when streams
were caught up to the present and capturing incrementally. This connector
has had enough usage that I feel comfortable removing these logs once a
stream is caught up, so I limited the page number logging to only happen
while a stream is backfilling (i.e. it has multiple pages to process or
more than an hour of results to page through).
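
For reference, here is a minimal standalone sketch of the new window check. It assumes, consistent with its use in api.py, that start and end are epoch seconds and that `_s_to_dt` converts them to datetimes (the `_s_to_dt` below is an illustrative stand-in, not the connector's actual helper):

```python
from datetime import datetime, timedelta, timezone


def _s_to_dt(s: int) -> datetime:
    # Stand-in for the connector's helper: epoch seconds to an aware datetime.
    return datetime.fromtimestamp(s, tz=timezone.utc)


def _is_large_date_window(start: int, end: int) -> bool:
    # Windows wider than an hour indicate a backfill; caught-up incremental
    # polls use narrow windows and stay silent.
    return _s_to_dt(end) - _s_to_dt(start) > timedelta(hours=1)


# A caught-up poll covering 5 minutes is not logged...
assert not _is_large_date_window(1_700_000_000, 1_700_000_300)
# ...while a day-wide backfill window is.
assert _is_large_date_window(1_700_000_000, 1_700_086_400)
```

Since start and end are fixed for the lifetime of a pagination loop, the flag is computed once per date window rather than once per page.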
Alex-Bair committed Jan 17, 2025
1 parent 17120d1 commit 43a7e18
Showing 1 changed file with 21 additions and 5 deletions.
source-intercom-native/source_intercom_native/api.py (21 additions, 5 deletions)
@@ -170,6 +170,20 @@ def _generate_conversations_or_tickets_search_body(
     }
 
 
+def _is_page_number_to_log(page_number: int, total_pages: int) -> bool:
+    return total_pages > 1 and (
+        page_number == 1 or
+        page_number % 25 == 0 or
+        page_number == total_pages
+    )
+
+
+def _is_large_date_window(start: int, end: int) -> bool:
+    delta = _s_to_dt(end) - _s_to_dt(start)
+    return delta > timedelta(hours=1)
+
+
 async def fetch_contacts(
     http: HTTPSession,
     window_size: int,
@@ -187,6 +201,7 @@ async def fetch_contacts(
     url = f"{API}/contacts/search"
     body = _generate_contacts_search_body(start, end)
 
+    should_log_progress = _is_large_date_window(start, end)
     pagination_ended_early = False
     while True:
         response = ContactsSearchResponse.model_validate_json(
@@ -199,7 +214,7 @@
         if total_pages == 0:
             break
 
-        if page_num == 1 or page_num % 25 == 0 or page_num == total_pages:
+        if should_log_progress and _is_page_number_to_log(page_num, total_pages):
             log.info(f"Processing page {page_num} of {total_pages}.", {
                 'window_start': _s_to_dt(start),
                 'window_end': _s_to_dt(end)
@@ -210,7 +225,7 @@
             if updated_at < start:
                 # We request that results are returned in descending order of updated_at,
                 # so we stop processing results once we see one before the current date window.
-                if page_num > 1:
+                if should_log_progress and page_num > 1:
                     log.info(f"Ending pagination early after processing {page_num} pages. Remaining pages contain already replicated data.")
 
                 pagination_ended_early = True
@@ -259,7 +274,7 @@ async def fetch_tickets(
         if total_pages == 0:
             break
 
-        if page_num == 1 or page_num % 25 == 0 or page_num == total_pages:
+        if _is_page_number_to_log(page_num, total_pages):
             log.info(f"Processing page {page_num} of {total_pages}.", {
                 'start': _s_to_dt(start),
             })
@@ -309,7 +324,7 @@ async def fetch_conversations(
         if total_pages == 0:
             break
 
-        if page_num == 1 or page_num % 25 == 0 or page_num == total_pages:
+        if _is_page_number_to_log(page_num, total_pages):
             log.info(f"Processing page {page_num} of {total_pages}.", {
                 'start': _s_to_dt(start),
             })
@@ -350,6 +365,7 @@ async def fetch_conversations_parts(
     url = f"{API}/conversations/search"
     body = _generate_conversations_or_tickets_search_body(start, end)
 
+    should_log_progress = _is_large_date_window(start, end)
     while True:
         response = ConversationsSearchResponse.model_validate_json(
             await http.request(log, url, "POST", json=body)
@@ -361,7 +377,7 @@
         if total_pages == 0:
             break
 
-        if page_num == 1 or page_num % 25 == 0 or page_num == total_pages:
+        if should_log_progress and _is_page_number_to_log(page_num, total_pages):
             log.info(f"Processing page {page_num} of {total_pages}.", {
                 'window_start': _s_to_dt(start),
                 'window_end': _s_to_dt(end)
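
As a quick check on the page-number predicate introduced above, the snippet below (reusing the predicate's definition from the diff) lists which pages of a backfill actually produce a progress line:

```python
def _is_page_number_to_log(page_number: int, total_pages: int) -> bool:
    # Same predicate as the diff: first page, every 25th page, last page,
    # and only when there is more than one page.
    return total_pages > 1 and (
        page_number == 1 or
        page_number % 25 == 0 or
        page_number == total_pages
    )


# A 120-page backfill emits 6 progress lines instead of 120.
print([p for p in range(1, 121) if _is_page_number_to_log(p, 120)])
# -> [1, 25, 50, 75, 100, 120]

# A caught-up stream with a single page emits none.
print(_is_page_number_to_log(1, 1))
# -> False
```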
