diff --git a/README.md b/README.md
index 66ddf1fc..f994b361 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ This tap:
   - [Email Events](http://developers.hubspot.com/docs/methods/email/get_events)
   - [Engagements](https://developers.hubspot.com/docs/methods/engagements/get-all-engagements)
   - [Forms](http://developers.hubspot.com/docs/methods/forms/v2/get_forms)
+  - [Forms Submissions](https://developers.hubspot.com/docs/methods/forms/get-submissions-for-a-form)
   - [Keywords](http://developers.hubspot.com/docs/methods/keywords/get_keywords)
   - [Owners](http://developers.hubspot.com/docs/methods/owners/get_owners)
   - [Subscription Changes](http://developers.hubspot.com/docs/methods/email/get_subscriptions_timeline)
diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py
index 4d0c9c3f..bb7a9bbe 100644
--- a/tap_hubspot/__init__.py
+++ b/tap_hubspot/__init__.py
@@ -88,6 +88,7 @@ class StateFields:
     "email_events": "/email/public/v1/events",
     "contact_lists": "/contacts/v1/lists",
     "forms": "/forms/v2/forms",
+    "form_submissions": "/form-integrations/v1/submissions/forms/{form_guid}",
     "workflows": "/automation/v3/workflows",
     "owners": "/owners/v2/owners",
 }
@@ -767,6 +768,92 @@ def sync_deal_pipelines(STATE, ctx):
     singer.write_state(STATE)
     return STATE
 
+def sync_form_submissions(STATE, ctx):
+    """Sync the submissions of every form returned by the "forms" endpoint.
+
+    ``ctx`` is not used here; it is kept so the signature matches
+    ``sync_deal_pipelines(STATE, ctx)``.
+
+    Returns the updated STATE.
+    """
+    forms = request(get_url("forms")).json()
+
+    for form in forms:
+        STATE = _sync_form_submissions_by_form_id(STATE, form['guid'])
+        # Checkpoint after every form so its bookmark is emitted promptly.
+        singer.write_state(STATE)
+
+    return STATE
+
+def _sync_form_submissions_by_form_id(STATE, form_guid):
+    """Emit the submissions of one form that are newer than its bookmark.
+
+    Both the bookmark and the paging offset are kept in STATE under
+    ``form_guid``. Returns the updated STATE.
+    """
+    schema = load_schema("form_submissions")
+    bookmark_key = 'last_max_submitted_at'
+
+    singer.write_schema("form_submissions", schema, ['guid', 'submittedAt', 'pageUrl'], [bookmark_key])
+    end = utils.strptime_to_utc(get_start(STATE, form_guid, bookmark_key))
+    max_bk_value = end
+    up_to_date = False
+
+    LOGGER.info("_sync_form_submissions_by_form_id for guid %s ending at %s", form_guid, end)
+
+    url = get_url("form_submissions", form_guid=form_guid)
+    path = 'results'
+    params = {
+        'limit': 50
+    }
+    with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
+        while not up_to_date:
+            form_offset = singer.get_offset(STATE, form_guid)
+
+            if form_offset and form_offset.get('after') is not None:
+                params['after'] = form_offset.get('after')
+            data = request(url, params).json()
+            for row in data[path]:
+                if not row:
+                    continue
+
+                submitted_at = utils.strptime_with_tz(
+                    _transform_datetime(row['submittedAt'], UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING))
+
+                if submitted_at > max_bk_value:
+                    max_bk_value = submitted_at
+
+                # since this stream returns in reverse order check to see if we've reached the data already loaded
+                if submitted_at <= end:
+                    up_to_date = True
+                    LOGGER.info("Reached the end of new form submissions")
+                    break
+
+                record = {
+                    'guid': form_guid,
+                    'submittedAt': row['submittedAt'],
+                    'pageUrl': row['pageUrl'],
+                    'values': row['values']
+                }
+                record = bumble_bee.transform(record, schema)
+                singer.write_record("form_submissions", record, 'form_submissions', time_extracted=utils.now())
+
+            if not up_to_date and 'paging' in data:
+                # More pages to read: remember where to resume.
+                STATE = singer.set_offset(STATE, form_guid, 'after', data['paging']['next']['after'])
+                singer.write_state(STATE)
+            else:
+                # Finished with this form. Drop the offset so the next run
+                # starts from the newest submission instead of resuming
+                # part-way through the (reverse-ordered) results.
+                STATE = singer.clear_offset(STATE, form_guid)
+                singer.write_state(STATE)
+                if 'paging' not in data:
+                    LOGGER.info("No more submissions for this form")
+                break
+    STATE = singer.write_bookmark(STATE, form_guid, bookmark_key, max_bk_value.strftime("%Y-%m-%d %H:%M:%S"))
+    return STATE
+
 @attr.s
 class Stream(object):
     tap_stream_id = attr.ib()
@@ -779,6 +866,7 @@ class Stream(object):
     # Do these first as they are incremental
     Stream('subscription_changes', sync_subscription_changes, ['timestamp', 'portalId', 'recipient'], 'startTimestamp', 'INCREMENTAL'),
     Stream('email_events', sync_email_events, ['id'], 'startTimestamp', 'INCREMENTAL'),
+    Stream('form_submissions', sync_form_submissions, ['guid', 'submittedAt', 'pageUrl'], 'submittedAt', 'INCREMENTAL'),
 
     # Do these last as they are full table
     Stream('forms', sync_forms, ['guid'], 'updatedAt', 'FULL_TABLE'),
diff --git a/tap_hubspot/schemas/form_submissions.json b/tap_hubspot/schemas/form_submissions.json
new file mode 100644
index 00000000..9800763c
--- /dev/null
+++ b/tap_hubspot/schemas/form_submissions.json
@@ -0,0 +1,29 @@
+{
+  "type": "object",
+  "properties": {
+    "guid": {
+      "type": ["null", "string"]
+    },
+    "submittedAt": {
+      "type": ["null", "string"],
+      "format": "date-time"
+    },
+    "values": {
+      "type": "array",
+      "items": {
+        "type": "object",
+        "properties": {
+          "name": {
+            "type": ["null", "string"]
+          },
+          "value": {
+            "type": ["null", "string"]
+          }
+        }
+      }
+    },
+    "pageUrl": {
+      "type": ["null", "string"]
+    }
+  }
+}