Make locking behaviour more robust #205

Merged 2 commits on Feb 1, 2024
30 changes: 25 additions & 5 deletions ckanext/xloader/jobs.py
@@ -10,16 +10,16 @@
import traceback
import sys

from psycopg2 import errors
from six.moves.urllib.parse import urlsplit
import requests
from rq import get_current_job
import sqlalchemy as sa

from ckan import model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config
from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config

from . import loader
from . import db
from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .utils import set_resource_metadata

@@ -28,6 +28,8 @@
except ImportError:
get_user_from_token = None

log = logging.getLogger(__name__)

SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True))
if not SSL_VERIFY:
requests.packages.urllib3.disable_warnings()
@@ -37,6 +39,13 @@
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
errors.ObjectInUse,
)
RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600')


# input = {
# 'api_key': user['apikey'],
@@ -80,15 +89,26 @@ def xloader_data_into_datastore(input):
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries == 0:
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries + 1
enqueue_job(
xloader_data_into_datastore,
[input],
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None

db.mark_job_as_errored(
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
errored = True
finally:
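For reference, a minimal standalone sketch of the retry path the jobs.py changes above add: the first time one of the RETRYABLE_ERRORS is raised, the job is re-enqueued instead of being marked as errored. The `requeue` callable and the shape of `job_dict` below are simplified stand-ins, not ckanext-xloader's actual API.

from psycopg2 import errors

# Lock-related errors worth a single retry (mirrors RETRYABLE_ERRORS in the diff).
RETRYABLE_ERRORS = (
    errors.DeadlockDetected,
    errors.LockNotAvailable,
    errors.ObjectInUse,
)

def maybe_retry(e, job_dict, requeue):
    """Return True if the job was re-enqueued because of a transient lock error."""
    tries = job_dict['metadata'].get('tries', 0)
    if isinstance(e, RETRYABLE_ERRORS) and tries == 0:
        job_dict['status'] = 'pending'
        job_dict['metadata']['tries'] = tries + 1
        requeue()  # e.g. enqueue_job(...) with a longer job timeout
        return True
    return False  # fall through to the usual error handling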
127 changes: 91 additions & 36 deletions ckanext/xloader/loader.py
@@ -31,6 +31,49 @@
tabulator_config.CSV_SAMPLE_LINES = CSV_SAMPLE_LINES


def _fields_match(fields, existing_fields, logger):
''' Check whether all columns have the same names and types as previously,
independent of ordering.
'''
# drop the generated '_id' field
for index in range(len(existing_fields)):
if existing_fields[index]['id'] == '_id':
existing_fields.pop(index)
break

# fail fast if number of fields doesn't match
field_count = len(fields)
if field_count != len(existing_fields):
logger.info("Fields do not match; there are now %s fields but previously %s", field_count, len(existing_fields))
return False

# ensure each field is present in both collections with the same type
for index in range(field_count):
field_id = fields[index]['id']
for existing_index in range(field_count):
existing_field_id = existing_fields[existing_index]['id']
if field_id == existing_field_id:
if fields[index]['type'] == existing_fields[existing_index]['type']:
break
else:
logger.info("Fields do not match; new type for %s field is %s but existing type is %s",
field_id, fields[index]["type"], existing_fields[existing_index]['type'])
return False
else:
logger.info("Fields do not match; no existing entry found for %s", field_id)
return False
return True


def _clear_datastore_resource(resource_id):
''' Delete all records from the datastore table, without dropping the table itself.
'''
engine = get_write_engine()
with engine.begin() as conn:
conn.execute("SET LOCAL lock_timeout = '5s'")
conn.execute('TRUNCATE TABLE "{}"'.format(resource_id))


def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
'''Loads a CSV into DataStore. Does not create the indexes.'''

@@ -85,34 +128,43 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
existing = datastore_resource_exists(resource_id)
existing_info = {}
if existing:
existing_fields = existing.get('fields', [])
existing_info = dict((f['id'], f['info'])
for f in existing.get('fields', [])
for f in existing_fields
if 'info' in f)

'''
Delete existing datastore table before proceeding. Otherwise
the COPY will append to the existing table. And if
the fields have significantly changed, it may also fail.
'''
logger.info('Deleting "{res_id}" from DataStore.'.format(
res_id=resource_id))
delete_datastore_resource(resource_id)

# Columns types are either set (overridden) in the Data Dictionary page
# or default to text type (which is robust)
fields = [
{'id': header_name,
'type': existing_info.get(header_name, {})
.get('type_override') or 'text',
}
for header_name in headers]
# Column types are either set (overridden) in the Data Dictionary page
# or default to text type (which is robust)
fields = [
{'id': header_name,
'type': existing_info.get(header_name, {})
.get('type_override') or 'text',
}
for header_name in headers]

# Maintain data dictionaries from matching column names
if existing_info:
# Maintain data dictionaries from matching column names
for f in fields:
if f['id'] in existing_info:
f['info'] = existing_info[f['id']]

'''
Delete or truncate existing datastore table before proceeding,
depending on whether any fields have changed.
Otherwise the COPY will append to the existing table.
And if the fields have significantly changed, it may also fail.
'''
if _fields_match(fields, existing_fields, logger):
logger.info('Clearing records for "%s" from DataStore.', resource_id)
_clear_datastore_resource(resource_id)
else:
logger.info('Deleting "%s" from DataStore.', resource_id)
delete_datastore_resource(resource_id)
else:
fields = [
{'id': header_name,
'type': 'text'}
for header_name in headers]

logger.info('Fields: %s', fields)

# Create table
@@ -254,9 +306,10 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
existing = datastore_resource_exists(resource_id)
existing_info = None
if existing:
existing_fields = existing.get('fields', [])
existing_info = dict(
(f['id'], f['info'])
for f in existing.get('fields', []) if 'info' in f)
for f in existing_fields if 'info' in f)

# Some headers might have been converted from strings to floats and such.
headers = encode_headers(headers)
@@ -290,16 +343,6 @@ def row_iterator():
yield data_row
result = row_iterator()

'''
Delete existing datstore resource before proceeding. Otherwise
'datastore_create' will append to the existing datastore. And if
the fields have significantly changed, it may also fail.
'''
if existing:
logger.info('Deleting "{res_id}" from datastore.'.format(
res_id=resource_id))
delete_datastore_resource(resource_id)

headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
for field in zip(headers, types)]

@@ -313,8 +356,21 @@ def row_iterator():
if type_override in list(_TYPE_MAPPING.values()):
h['type'] = type_override

logger.info('Determined headers and types: {headers}'.format(
headers=headers_dicts))
logger.info('Determined headers and types: %s', headers_dicts)

'''
Delete or truncate existing datastore table before proceeding,
depending on whether any fields have changed.
Otherwise 'datastore_create' will append to the existing datastore.
And if the fields have significantly changed, it may also fail.
'''
if existing:
if _fields_match(headers_dicts, existing_fields, logger):
logger.info('Clearing records for "%s" from DataStore.', resource_id)
_clear_datastore_resource(resource_id)
else:
logger.info('Deleting "%s" from datastore.', resource_id)
delete_datastore_resource(resource_id)

logger.info('Copying to database...')
count = 0
@@ -323,7 +379,7 @@ def row_iterator():
non_empty_types = ['timestamp', 'numeric']
for i, records in enumerate(chunky(result, 250)):
count += len(records)
logger.info('Saving chunk {number}'.format(number=i))
logger.info('Saving chunk %s', i)
for row in records:
for column_index, column_name in enumerate(row):
if headers_dicts[column_index]['type'] in non_empty_types and row[column_name] == '':
@@ -332,8 +388,7 @@ def row_iterator():
logger.info('...copying done')

if count:
logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
n=count, res_id=resource_id))
logger.info('Successfully pushed %s entries to "%s".', count, resource_id)
else:
# no datastore table is created
raise LoaderError('No entries found - nothing to load')
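A rough, self-contained sketch of the truncate-with-lock-timeout behaviour that `_clear_datastore_resource` relies on above. The connection URL and resource id are placeholders, and `sa.text()` is used only so the snippet runs on recent SQLAlchemy versions; it is not a copy of the merged code.

import sqlalchemy as sa
from psycopg2 import errors

engine = sa.create_engine('postgresql:///datastore_default')  # placeholder URL

def clear_table(engine, resource_id):
    # TRUNCATE takes an ACCESS EXCLUSIVE lock, so cap the wait at 5 seconds;
    # a busy table then raises LockNotAvailable instead of blocking the worker.
    with engine.begin() as conn:
        conn.execute(sa.text("SET LOCAL lock_timeout = '5s'"))
        conn.execute(sa.text('TRUNCATE TABLE "{}"'.format(resource_id)))

try:
    clear_table(engine, 'placeholder-resource-id')
except sa.exc.OperationalError as e:
    if isinstance(e.orig, errors.LockNotAvailable):
        pass  # jobs.py treats this as retryable and re-enqueues the job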
1 change: 1 addition & 0 deletions ckanext/xloader/utils.py
@@ -83,6 +83,7 @@ def set_resource_metadata(update_dict):
# better fix

q = model.Session.query(model.Resource). \
with_for_update(of=model.Resource). \
filter(model.Resource.id == update_dict['resource_id'])
resource = q.one()

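The one-line utils.py change above makes the resource query emit `SELECT ... FOR UPDATE OF resource`, so the row stays locked until the surrounding transaction commits or rolls back. A minimal sketch with a generic SQLAlchemy model; the model, session setup and ids here are illustrative, not CKAN's own classes.

import sqlalchemy as sa
from sqlalchemy import orm

Base = orm.declarative_base()

class Resource(Base):
    __tablename__ = 'resource'
    id = sa.Column(sa.String, primary_key=True)
    state = sa.Column(sa.String)

engine = sa.create_engine('postgresql:///ckan_default')  # placeholder URL
Session = orm.sessionmaker(bind=engine)
session = Session()

# Row-lock the single resource row so concurrent metadata updates serialise
# instead of clobbering each other; other transactions block on this row
# until commit() or rollback() releases the lock.
resource = (session.query(Resource)
            .with_for_update(of=Resource)
            .filter(Resource.id == 'placeholder-resource-id')
            .one())
resource.state = 'active'
session.commit()  # releases the row lock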