Skip to content

Commit

Permalink
further cleanup
Browse files Browse the repository at this point in the history
- Recognise 0 as a valid numeric value
- Fix whitespace and unused import
- Extract maximum retry count to a constant
- Use context managers to automatically close streams
- Add README note about configuring PostgreSQL date style
- Add titles to queued jobs so they are more easily administered
  • Loading branch information
ThrawnCA committed Feb 2, 2024
1 parent 35e1a9b commit 6eb5658
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 24 deletions.
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ This setting is shared with other plugins that download resource files, such as

ckan.download_proxy = http://my-proxy:1234/

You may also wish to configure the database to use your preferred date input style on COPY.
For example, to make `PostgreSQL <https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT>`_
expect European (day-first) dates, you could add to ``postgresql.conf``:

datestyle = 'ISO, DMY'

------------------------
Developer installation
------------------------
Expand Down
4 changes: 3 additions & 1 deletion ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def xloader_submit(context, data_dict):

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
jobs.xloader_data_into_datastore, [data],
title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id),
rq_kwargs=dict(timeout=timeout)
)
except Exception:
log.exception('Unable to enqueued xloader res_id=%s', res_id)
Expand Down
10 changes: 1 addition & 9 deletions ckanext/xloader/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ def add_pending_job(job_id, job_type, api_key,
if not metadata:
metadata = {}

conn = ENGINE.connect()
trans = conn.begin()
try:
with ENGINE.begin() as conn:
conn.execute(JOBS_TABLE.insert().values(
job_id=job_id,
job_type=job_type,
Expand Down Expand Up @@ -225,12 +223,6 @@ def add_pending_job(job_id, job_type, api_key,
)
if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
except Exception:
trans.rollback()
raise
finally:
conn.close()


class InvalidErrorObjectError(Exception):
Expand Down
17 changes: 9 additions & 8 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

MAX_RETRIES = 1
RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
Expand Down Expand Up @@ -92,18 +93,21 @@ def xloader_data_into_datastore(input):
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries == 0:
if tries < MAX_RETRIES:
tries = tries + 1
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries + 1
job_dict['metadata']['tries'] = tries
enqueue_job(
xloader_data_into_datastore,
[input],
title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
job_dict['metadata']['resource_id'], tries),
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None
Expand All @@ -112,7 +116,7 @@ def xloader_data_into_datastore(input):
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
finally:
# job_dict is defined in xloader_hook's docstring
Expand Down Expand Up @@ -562,8 +566,7 @@ def __init__(self, task_id, input):
self.input = input

def emit(self, record):
conn = db.ENGINE.connect()
try:
with db.ENGINE.connect() as conn:
# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
message = str(record.getMessage())
Expand All @@ -579,8 +582,6 @@ def emit(self, record):
module=module,
funcName=funcName,
lineno=record.lineno))
finally:
conn.close()


class DatetimeJsonEncoder(json.JSONEncoder):
Expand Down
4 changes: 3 additions & 1 deletion ckanext/xloader/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def convert_types(self, extended_rows):
cell_type = self.types[cell_index] if self.types else None
if cell_type in [Decimal, None]:
converted_value = to_number(cell_value)
if converted_value:
# Can't do a simple truthiness check,
# because 0 is a valid numeric result.
if converted_value is not None:
row[cell_index] = converted_value
continue
if cell_type in [datetime.datetime, None]:
Expand Down
4 changes: 2 additions & 2 deletions ckanext/xloader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def notify(self, entity, operation):
See: ckan/model/modification.py.DomainObjectModificationExtension
"""
if operation != DomainObjectOperation.changed \
or not isinstance(entity, Resource) \
or not getattr(entity, 'url_changed', False):
or not isinstance(entity, Resource) \
or not getattr(entity, 'url_changed', False):
return
context = {
"ignore_auth": True,
Expand Down
2 changes: 1 addition & 1 deletion ckanext/xloader/templates/xloader/resource_data.html
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
{% set delete_action = h.url_for('xloader.delete_datastore_table', id=pkg.id, resource_id=res.id) %}
<form method="post" action="{{ delete_action }}" class="mb-3 d-inline-block">
{{ h.csrf_input() if 'csrf_input' in h }}
<a href="{{ h.url_for('xloader.delete_datastore_table', id=pkg.id, resource_id=res.id) }}"
<a href="{{ delete_action }}"
class="btn btn-danger"
type="submit"
data-module="confirm-action"
Expand Down
3 changes: 2 additions & 1 deletion ckanext/xloader/tests/samples/simple-large.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
id,text
0,-
1,a
2,b
3,c
Expand Down Expand Up @@ -49997,4 +49998,4 @@ id,text
49996,x
49997,y
49998,z
49999,a
49999,a
17 changes: 17 additions & 0 deletions ckanext/xloader/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,23 @@ def test_simple(self, Session):
u"text",
]

def test_simple_large_file(self, Session):
    """Load ``simple-large.csv`` and check the resulting column types.

    The sample file is loaded through ``loader.load_table`` into a freshly
    created resource, and the column types reported by
    ``self._get_column_types`` are compared against the expected list.
    """
    sample_path = get_sample_filepath("simple-large.csv")
    res_id = factories.Resource()['id']

    loader.load_table(
        sample_path,
        resource_id=res_id,
        mimetype="text/csv",
        logger=logger,
    )

    expected_types = [u"int4", u"tsvector", u"numeric", u"text"]
    actual_types = self._get_column_types(Session, res_id)
    assert actual_types == expected_types

# test disabled by default to avoid adding large file to repo and slow test
@pytest.mark.skip
def test_boston_311_complete(self):
Expand Down
2 changes: 1 addition & 1 deletion ckanext/xloader/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flask import Blueprint, request
from flask import Blueprint

from ckan.plugins.toolkit import _, h, g, render, request, abort, NotAuthorized, get_action, ObjectNotFound

Expand Down

0 comments on commit 6eb5658

Please sign in to comment.