diff --git a/harvester/compare.py b/harvester/compare.py index feebcd6b..9d529159 100644 --- a/harvester/compare.py +++ b/harvester/compare.py @@ -3,22 +3,50 @@ logger = logging.getLogger("harvester") +def convert_datasets_to_id_hash(ckan_source): + """converts the input catalogs into [{ dataset_id: metadata_hash },...] format. + the harvest source metadata is sorted""" + + # harvest_datasets = { + # d["identifier"]: dataset_to_hash(sort_dataset(d)) for d in harvest_source + # } + + ckan_datasets = {} + + for d in ckan_source: + oid, meta = None, None + for e in d["extras"]: + if e["key"] == "dcat_metadata": + meta = eval(e["value"], {"__builtins__": {}})  # FIXME(review): eval on stored metadata — prefer ast.literal_eval + if e["key"] == "identifier": + oid = e["value"] + ckan_datasets[oid] = dataset_to_hash( + meta + ) # ckan datasets will always be stored sorted + + return ckan_datasets + + def compare(harvest_source, ckan_source): """Compares records""" logger.info("Hello from harvester.compare()") - output = { - "create": [], - "update": [], - "delete": [], - } - harvest_ids = set(harvest_source.keys()) ckan_ids = set(ckan_source.keys()) same_ids = harvest_ids & ckan_ids - output["create"] += list(harvest_ids - ckan_ids) - output["delete"] += list(ckan_ids - harvest_ids) - output["update"] += [i for i in same_ids if harvest_source[i] != ckan_source[i]] + create = list(harvest_ids - ckan_ids) + delete = list(ckan_ids - harvest_ids) + update = [i for i in same_ids if harvest_source[i] != ckan_source[i]] + + # compare_res = compare(*data_sources)  # FIXME(review): stray recursive call; data_sources is undefined + + # for record in harvest_source.records: + # if record.identifier in create: + # record.operation = "create" + # if record.identifier in delete: + # record.operation = "delete" + # if record.identifier in update: + # record.operation = "update" - return output + return harvest_source diff --git a/harvester/validate/dcat_us.py b/harvester/validate/dcat_us.py index 4fcf554e..ed04543e 100644 --- a/harvester/validate/dcat_us.py +++ b/harvester/validate/dcat_us.py @@ -20,19 +20,14 @@ def 
parse_errors(errors): return error_message -def validate_json_schema(json_data, dataset_schema): - success = None - error_message = "" - +def validate_json_schema(record, dataset_schema): validator = Draft202012Validator(dataset_schema) try: - validator.validate(json_data) - success = True - error_message = "no errors" - except ValidationError: - success = False - errors = validator.iter_errors(json_data) - error_message = parse_errors(errors) - - return success, error_message + validator.validate(record.record) + record.valid = True + except Exception as e: + record.valid = False + # do something with the exception + + return record diff --git a/tests/integration/pipeline/conftest.py b/tests/integration/pipeline/conftest.py new file mode 100644 index 00000000..87ca0451 --- /dev/null +++ b/tests/integration/pipeline/conftest.py @@ -0,0 +1,11 @@ +import pytest + + +@pytest.fixture +def harvest_source_example(): + return { + "name": "test_harvest_source_name", + "url": "http://localhost/dcatus/dcatus_compare.json", + "source_type": "dcatus", + "config": {}, + } diff --git a/tests/integration/pipeline/test_pipeline.py b/tests/integration/pipeline/test_pipeline.py new file mode 100644 index 00000000..07a03ba4 --- /dev/null +++ b/tests/integration/pipeline/test_pipeline.py @@ -0,0 +1,133 @@ +from harvester.validate.dcat_us import validate_json_schema + + +class HarvestSource: + def __init__(self, name, url, source_type, config) -> None: + self.name = name + self.url = url + self.source_type = source_type + self.config = config + + +class HarvestRecord: + def __init__(self, identifier, record, data_hash) -> None: + self.identifier = identifier + self.record = record + self.data_hash = data_hash + + +def assign_operation_to_record(record, operation): + record.operation = operation + + +def compare(harvest_source, ckan_source) -> dict: + """Compares records""" + logger.info("Hello from harvester.compare()") + + harvest_ids = set(harvest_source.records.keys()) + ckan_ids = 
set(ckan_source.keys()) + same_ids = harvest_ids & ckan_ids + + create = list(harvest_ids - ckan_ids) + delete = list(ckan_ids - harvest_ids) + update = [ + i for i in same_ids if harvest_source.records[i].data_hash != ckan_source[i] + ] + + for operation, ids in [("create", create), ("delete", delete), ("update", update)]: + for r in harvest_source.records.values(): + if r.identifier not in ids: + continue + assign_operation_to_record(r, operation) + + return harvest_source + + +def extract(harvest_source): + harvest_source.extracted_data = harvester.extract( + harvest_source.url, harvest_source.source_type + ) + + harvest_source.records = { + r["identifier"]: HarvestRecord( + r["identifier"], r, dataset_to_hash(sort_dataset(r)) + ) + for r in harvest_source.extracted_data["dataset"] + } + + ckan_source = harvester.search_ckan()["results"] + ckan_source = convert_datasets_to_id_hash(ckan_source) + + harvest_source = compare(harvest_source, ckan_source) + + return harvest_source + + +def validate(harvest_record): + validator = Draft202012Validator(dcatus_dataset_schema) + validator.validate(harvest_record.record) + + # harvest_record.valid = True or False + return harvest_record + + +def load(harvest_record): + ckan = harvester.create_ckan_entrypoint(ckan_url, api_key) + operations = { + "delete": harvester.purge_ckan_package, + "create": harvester.create_ckan_package, + "update": harvester.update_ckan_package, + } + operations[harvest_record.operation](ckan, harvest_record.record) + + +def test_pipeline(harvest_source_example): + # harvest source setup + harvest_source = HarvestSource(**harvest_source_example) + + # EXTRACTION + # download the data + harvest_source.extracted_data = harvester.extract( + harvest_source.url, harvest_source.source_type + )["dataset"] + + # format and store the records + harvest_source.records = { + r["identifier"]: HarvestRecord( + r["identifier"], r, dataset_to_hash(sort_dataset(r)) + ) + for r in harvest_source.extracted_data + } + + # COMPARISON + # get the associated 
records on ckan + ckan_source_datasets = search_ckan( + ckan_entrypoint, + { + "q": 'harvest_source_name:"test_harvest_source_name"', + "fl": [ + "extras_harvest_source_name", + "extras_dcat_metadata", + "extras_identifier", + ], + }, + )["results"] + + # format the ckan records for comparison + ckan_source = convert_datasets_to_id_hash(ckan_source_datasets) + + # run our comparison + compare_result = compare(harvest_source, ckan_source) + + ckan = harvester.create_ckan_entrypoint(ckan_url, api_key) + operations = { + "delete": harvester.purge_ckan_package, + "create": harvester.create_ckan_package, + "update": harvester.update_ckan_package, + } + + # VALIDATE AND LOAD + for record_id, record in harvest_source.records.items(): + validate_json_schema(record, dcatus_dataset_schema) + record.record_as_ckan = dcatus_to_ckan(record.record) + operations[record.operation](ckan, record.record_as_ckan)