Skip to content

Commit

Permalink
Crest master (#66)
Browse files Browse the repository at this point in the history
* Tdl 19964 add missing tap tester tests (#65)

* TDL-19964 Added missing tap-tester commits

* updated tap-tester tests

* updated bookmarks tap-tester test

* Updated and removed extra comments

* updated start date as per format

* updated pagination test case

* resolved review comments

* added new stream in tap-tester

* fixed the cci issues

* added missing assertion for all fields

* removed get_logger()

* added logger instead of print

* TDL-19957 Update dict based to class based implementation (#64)

* TDL-19957 update dict based to class based

* updated while condition

* updated while condition

* removed incremental_range from REQUIRED_CONFIG_KEYS

* updated discover and schema file

* updated to replication_key instead of keys

* updated schemas and added comments

* added unittests for code coverage

* added unittests for sync.py

* added parameterized in setup.py

* added parameterized in config.yml and updated unittests

* updated the replication key to list instead of a single key

* fixed the issue for Keyerror of form_id in answers stream

* updated unittests

* added configurable page size param

* handled page_size for 0 and updated unittests

* resolved bug fixes for pagination and removed incompleted_forms_only param

* added new stream unsubmitted landings

* resolved PR review comments

* added page_size in example config

* added back incremental_range in the tap-tester

* raised exc instead of fatal error message and updated unittests

* resolved PR comments

* fixed the issue when page_size not passed in config

* TDL-19801: Tap fetch data for sub-questions (#62)

* TDL-19801: Tap does not support fetching data for the questions nested within a Question Group.

* addressed the comments

* add unittest

* modify funciton comment

* formated test_case value in unittest

* formated expected_case

* Updated schema in questions.json

* resolved build fail error

* resolve build fail error

* updated unittest

* updated setup.py

* add parameterized

* change start date in start_date_test

* make change in bookmark test

* Updated unit test case.

* Updated schemas and keyerror.

Co-authored-by: Jay Tilala <[email protected]>
Co-authored-by: prijendev <[email protected]>

* TDL-19959 added missing fields (#63)

* added missing fields

* TDL-19957 update dict based to class based

* updated while condition

* updated while condition

* removed incremental_range from REQUIRED_CONFIG_KEYS

* TDL-19964 Added missing tap-tester commits

* updated discover and schema file

* updated tap-tester tests

* updated to replication_key instead of keys

* updated schemas and added comments

* added unittests for code coverage

* updated bookmarks tap-tester test

* added unittests for sync.py

* added parameterized in setup.py

* added parameterized in config.yml and updated unittests

* updated the replication key to list instead of a single key

* Updated and removed extra comments

* fixed the issue for Keyerror of form_id in answers stream

* updated unittests

* updated start date as per format

* added configurable page size param

* updated pagination test case

* handled page_size for 0 and updated unittests

* resolved review comments

* resolved bug fixes for pagination and removed incompleted_forms_only param

* updated array type schema

* added new stream unsubmitted landings

* added new stream in tap-tester

* updated indentation

* resolved PR review comments

* updated indentation to use 2 spaces

* added page_size in example config

* added back incremental_range in the tap-tester

* fixed the cci issues

* added missing fields to a dict

* added missing assertion for all fields

* updated comment

* raised exc instead of fatal error message and updated unittests

* removed get_logger()

* added logger instead of print

* Updated schema for questions.

* Removed duplicate assertion in all_fields test.

Co-authored-by: prijendev <[email protected]>

* Updated schema for answers stream.

Co-authored-by: namrata270998 <[email protected]>
Co-authored-by: jtilala <[email protected]>
Co-authored-by: Jay Tilala <[email protected]>
  • Loading branch information
4 people authored Sep 20, 2022
1 parent 023c187 commit 29f0ba4
Show file tree
Hide file tree
Showing 35 changed files with 2,423 additions and 1,354 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
pip install nose coverage
pip install nose coverage parameterized
nosetests --with-coverage --cover-erase --cover-package=tap_typeform --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ python3 ./setup.py install

## Configuration

This tap requires a `config.json` which specifies details regarding an [Authentication token](https://developer.typeform.com/get-started/convert-keys-to-access-tokens/), a list of form ids, a start date for syncing historical data (date format of YYYY-MM-DDTHH:MI:SSZ), request_timeout for which request should wait to get response(It is an optional parameter and default request_timeout is 300 seconds) and a time period range [daily,hourly] to control what incremental extract date ranges are. See [example.config.json](example.config.json) for an example.
This tap requires a `config.json` which specifies details regarding an [Authentication token](https://developer.typeform.com/get-started/convert-keys-to-access-tokens/), a list of form ids, a start date for syncing historical data (date format of YYYY-MM-DDTHH:MI:SSZ), request_timeout for which request should wait to get response(It is an optional parameter and default request_timeout is 300 seconds). See [example.config.json](example.config.json) for an example.

Create the catalog:

Expand Down
5 changes: 2 additions & 3 deletions example.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"token": "<myapitoken>",
"start_date": "2018-01-01T00:00:00Z",
"forms": "ZFuC6U,bFPlvG,WFBGBZ,WF0XE6,xFWoCE,OFHRwO,QFh3FI",
"request_timeout": 300,
"incremental_range": "daily"
"page_size": 100,
"request_timeout": 300
}

1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"ratelimit",
"backoff",
"requests",
"parameterized",
],
extras_require={
'dev': [
Expand Down
121 changes: 21 additions & 100 deletions tap_typeform/__init__.py
Original file line number Diff line number Diff line change
@@ -1,131 +1,52 @@
#!/usr/bin/env python3
import singer
from singer import utils
from singer.catalog import Catalog, metadata_module as metadata
from tap_typeform import streams
from tap_typeform.context import Context
from tap_typeform.http import Client
from tap_typeform import schemas
from tap_typeform.discover import discover as _discover
from tap_typeform.sync import _forms_to_list, sync as _sync
from tap_typeform.client import Client
from tap_typeform.streams import Forms

REQUIRED_CONFIG_KEYS = ["token", "forms", "incremental_range"]
REQUIRED_CONFIG_KEYS = ["start_date", "token", "forms"]

LOGGER = singer.get_logger()

#def check_authorization(atx):
# atx.client.get('/settings')
class FormMistmatchError(Exception):
pass

class NoFormsProvidedError(Exception):
pass


# Some taps do discovery dynamically where the catalog is read in from a
# call to the api but with the typeform structure, we won't do that here
# because it's always the same so we just pull it from file we never use
# atx in here since the schema is from file but we would use it if we
# pulled schema from the API def discover(atx):
def discover():
streams = []
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
#print("tap stream id=",tap_stream_id)
key_properties = schemas.PK_FIELDS[tap_stream_id]
schema = schemas.load_schema(tap_stream_id)
replication_method = schemas.REPLICATION_METHODS[tap_stream_id].get("replication_method")
replication_keys = schemas.REPLICATION_METHODS[tap_stream_id].get("replication_keys")
meta = metadata.get_standard_metadata(schema=schema,
key_properties=key_properties,
replication_method=replication_method,
valid_replication_keys=replication_keys)

meta = metadata.to_map(meta)

if replication_keys:
meta = metadata.write(meta, ('properties', replication_keys[0]), 'inclusion', 'automatic')

meta = metadata.to_list(meta)

streams.append({
'stream': tap_stream_id,
'tap_stream_id': tap_stream_id,
'key_properties': key_properties,
'schema': schema,
'metadata': meta,
'replication_method': replication_method,
'replication_key': replication_keys[0] if replication_keys else None
})
return Catalog.from_dict({'streams': streams})


# this is already defined in schemas.py though w/o dependencies. do we keep this for the sync?
def load_schema(tap_stream_id):
path = "schemas/{}.json".format(tap_stream_id)
schema = utils.load_json(get_abs_path(path))
dependencies = schema.pop("tap_schema_dependencies", [])
refs = {}
for sub_stream_id in dependencies:
refs[sub_stream_id] = load_schema(sub_stream_id)
if refs:
singer.resolve_schema_references(schema, refs)
return schema


def sync(atx):

# write schemas for selected streams\
for stream in atx.catalog.streams:
if stream.tap_stream_id in atx.selected_stream_ids:
schemas.load_and_write_schema(stream.tap_stream_id)

# since there is only one set of schemas for all forms, they will always be selected
streams.sync_forms(atx)

LOGGER.info('--------------------')
for stream_name, stream_count in atx.counts.items():
LOGGER.info('%s: %d', stream_name, stream_count)
LOGGER.info('--------------------')


def _compare_forms(config_forms, api_forms):
return config_forms.difference(api_forms)


def _forms_to_list(config, keyword='forms'):
"""Splits entries into a list and strips out surrounding blank spaces"""
return list(map(str.strip, config.get(keyword).split(',')))


def validate_form_ids(config):
def validate_form_ids(client, config):
"""Validate the form ids passed in the config"""
client = Client(config)
form_stream = Forms()

if not config.get('forms'):
LOGGER.fatal("No forms were provided in config")
raise NoFormsProvidedError
raise NoFormsProvidedError("No forms were provided in the config")

config_forms = set(_forms_to_list(config))
api_forms = {form.get('id') for form in client.get_forms()}
config_forms = _forms_to_list(config)
api_forms = {form.get('id') for res in form_stream.get_forms(client) for form in res}

mismatched_forms = _compare_forms(config_forms, api_forms)
mismatched_forms = config_forms.difference(api_forms)

if len(mismatched_forms) > 0:
LOGGER.fatal(f"FormMistmatchError: forms {mismatched_forms} not returned by API")
raise FormMistmatchError
# Raise an error if any form-id from config is not matching
# from ids from API response
raise FormMistmatchError("FormMistmatchError: forms {} not returned by API".format(mismatched_forms))


@utils.handle_top_exception(LOGGER)
def main():
args = utils.parse_args(REQUIRED_CONFIG_KEYS)
atx = Context(args.config, args.state)
config = args.config
client = Client(config)
validate_form_ids(client, config)
if args.discover:
validate_form_ids(args.config)
# the schema is static from file so we don't need to pass in atx for connection info.
catalog = discover()
catalog = _discover()
catalog.dump()
else:
atx.catalog = args.catalog \
if args.catalog else discover()
sync(atx)
catalog = args.catalog \
if args.catalog else _discover()
_sync(client, config, args.state, catalog.to_dict())

if __name__ == "__main__":
main()
164 changes: 73 additions & 91 deletions tap_typeform/http.py → tap_typeform/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
LOGGER = singer.get_logger()

REQUEST_TIMEOUT = 300
MAX_RESPONSES_PAGE_SIZE = 1000
FORMS_PAGE_SIZE = 200

class TypeformError(Exception):
def __init__(self, message=None, response=None):
Expand Down Expand Up @@ -67,95 +69,10 @@ class TypeformNotAvailableError(TypeformError):
}
}

class Client(object):
BASE_URL = 'https://api.typeform.com'

def __init__(self, config):
self.token = 'Bearer ' + config.get('token')
self.metric = config.get('metric')
self.session = requests.Session()
# Set and pass request timeout to config param `request_timeout` value.
config_request_timeout = config.get('request_timeout')
if config_request_timeout and float(config_request_timeout):
self.request_timeout = float(config_request_timeout)
else:
self.request_timeout = REQUEST_TIMEOUT # If value is 0,"0","" or not passed then it set default to 300 seconds.

def build_url(self, endpoint):
return f"{self.BASE_URL}/{endpoint}"

@backoff.on_exception(backoff.expo,
(Timeout, ConnectionError), # Backoff for Timeout and ConnectionError.
max_tries=5,
factor=2)
@backoff.on_exception(backoff.expo,
(TypeformInternalError, TypeformNotAvailableError,
TypeformTooManyError, ChunkedEncodingError),
max_tries=3,
factor=2)
def request(self, method, url, params=None, **kwargs):
# note that typeform response api doesn't return limit headers

if 'headers' not in kwargs:
kwargs['headers'] = {}
if self.token:
kwargs['headers']['Authorization'] = self.token

request = requests.Request(method, url, headers=kwargs['headers'], params=params)

response = self.session.send(request.prepare(), timeout=self.request_timeout)# Pass request timeout

if response.status_code != 200:
raise_for_error(response)
return None

if 'total_items' in response.json():
LOGGER.info('raw data items= {}'.format(response.json()['total_items']))
return response.json()

# Max page size for forms API is 200
def get_forms(self, page_size=200):
url = self.build_url(endpoint='forms')
with singer.metrics.http_request_timer(endpoint=url):
return self._get_forms('get', url, page_size)

def _get_forms(self, method, url, page_size):
page = 1
paginate = True
records = []
params = {'page_size': page_size}

while paginate:
params['page'] = page
response = self.request(method, url, params=params)
page_count = response.get('page_count')
paginate = page_count > page
page += 1

records += response.get('items')

return records

def get_form_definition(self, form_id, **kwargs):
endpoint = f"forms/{form_id}"
url = self.build_url(endpoint=endpoint)
with singer.metrics.http_request_timer(endpoint=url):
try:
return self.request('get', url, **kwargs)
except TypeformForbiddenError as err:
raise RuntimeError("Maybe add the Forms:Read scope to your token") from err

def get_form_responses(self, form_id, **kwargs):
endpoint = f"forms/{form_id}/responses"
url = self.build_url(endpoint)
with singer.metrics.http_request_timer(endpoint=url):
try:
return self.request('get', url, **kwargs)
except TypeformForbiddenError as err:
raise RuntimeError("Maybe add the Responses:Read scope to your token") from err


def raise_for_error(response):
"""
Retrieve the error code and the error message from the response and return custom exceptions accordingly.
"""
try:
response.raise_for_status()
except (requests.HTTPError, requests.ConnectionError) as error:
Expand All @@ -168,20 +85,20 @@ def raise_for_error(response):
api_rate_limit_message = ERROR_CODE_EXCEPTION_MAPPING[429]["message"]
message = "HTTP-error-code: 429, Error: {}. Please retry after {} seconds".format(api_rate_limit_message, resp_headers.get("Retry-After"))

# Handling status code 403 specially since response of API does not contain enough information
# Handling status code 403 specially since the response of API does not contain enough information
elif error_code in (403, 401):
api_message = ERROR_CODE_EXCEPTION_MAPPING[error_code]["message"]
message = "HTTP-error-code: {}, Error: {}".format(error_code, api_message)
else:
# Forming a response message for raising custom exception
# Forming a response message for raising a custom exception
try:
response_json = response.json()
except Exception:
response_json = {}

message = "HTTP-error-code: {}, Error: {}".format(
error_code,
response_json.get("description", "Uknown Error"))
response_json.get("description", "Unknown Error"))

exc = ERROR_CODE_EXCEPTION_MAPPING.get(error_code, {}).get("raise_exception", TypeformError)
message = ERROR_CODE_EXCEPTION_MAPPING.get(error_code, {}).get("message", "")
Expand All @@ -190,3 +107,68 @@ def raise_for_error(response):

except (ValueError, TypeError):
raise TypeformError(error) from None

class Client(object):
"""
The client class is used for making REST calls to the Github API.
"""
BASE_URL = 'https://api.typeform.com'

def __init__(self, config):
self.token = 'Bearer ' + config.get('token')
self.metric = config.get('metric')
self.session = requests.Session()
self.page_size = MAX_RESPONSES_PAGE_SIZE
self.form_page_size = FORMS_PAGE_SIZE
self.get_page_size(config)

# Set and pass request timeout to config param `request_timeout` value.
config_request_timeout = config.get('request_timeout')
if config_request_timeout and float(config_request_timeout):
self.request_timeout = float(config_request_timeout)
else:
self.request_timeout = REQUEST_TIMEOUT # If value is 0,"0","" or not passed then it set default to 300 seconds.

def get_page_size(self, config):
"""
This function will get page size from config,
and will return the default value if invalid page size is given.
"""
page_size = config.get('page_size')
if page_size is None:
return
if ((type(page_size) == int or type(page_size) == float) and (page_size > 0)) or \
(type(page_size) == str and page_size.replace('.', '', 1).isdigit() and (float(page_size) > 0) ):
self.page_size = int(float(page_size))
self.form_page_size = min(self.form_page_size, self.page_size)
else:
raise Exception(f"The entered page size is invalid, it should be a valid integer.")

def build_url(self, endpoint):
"""
Returns full URL for a given endpoint.
"""
return f"{self.BASE_URL}/{endpoint}"

@backoff.on_exception(backoff.expo,(Timeout, ConnectionError), # Backoff for Timeout and ConnectionError.
max_tries=5, factor=2, jitter=None)
@backoff.on_exception(backoff.expo, (TypeformInternalError, TypeformNotAvailableError, TypeformTooManyError, ChunkedEncodingError),
max_tries=3, factor=2)
def request(self, url, params={}, **kwargs):
"""
Call rest API and return the response in case of status code 200.
"""

if 'headers' not in kwargs:
kwargs['headers'] = {}
if self.token:
kwargs['headers']['Authorization'] = self.token

LOGGER.info("URL: %s and Params: %s", url, params)
response = self.session.get(url, params=params, headers=kwargs['headers'], timeout=self.request_timeout)
if response.status_code != 200:
raise_for_error(response)

if 'total_items' in response.json():
LOGGER.info('raw data items= {}'.format(response.json()['total_items']))
return response.json()
Loading

0 comments on commit 29f0ba4

Please sign in to comment.