From 523f716e912cceddb369d8cc27b01400d0bc08dc Mon Sep 17 00:00:00 2001
From: Reid Hewitt
Date: Mon, 8 Jan 2024 09:15:18 -0700
Subject: [PATCH] save progress.

---
 harvester/compare.py                        |  39 ++++---
 harvester/extract.py                        |  26 ++--
 tests/integration/pipeline/conftest.py      |  12 +++
 tests/integration/pipeline/test_pipeline.py | 106 +++++++++++---------
 4 files changed, 100 insertions(+), 83 deletions(-)

diff --git a/harvester/compare.py b/harvester/compare.py
index 9d529159..18681c3b 100644
--- a/harvester/compare.py
+++ b/harvester/compare.py
@@ -1,15 +1,18 @@
 import logging
 
+from harvester.utils.util import dataset_to_hash, sort_dataset
+
 logger = logging.getLogger("harvester")
 
 
-def convert_datasets_to_id_hash(ckan_source):
+def convert_datasets_to_id_hash(
+    harvest_source: list[dict], ckan_source: list[dict]
+) -> tuple[dict, dict]:
     """converts the input catalogs into [{ dataset_id: metadata_hash },...] format.
     the harvest source metadata is sorted"""
 
-    # harvest_datasets = {
-    #     d["identifier"]: dataset_to_hash(sort_dataset(d)) for d in harvest_source
-    # }
+    harvest_datasets = {
+        d["identifier"]: dataset_to_hash(sort_dataset(d)) for d in harvest_source
+    }
 
     ckan_datasets = {}
 
@@ -24,29 +27,25 @@ def convert_datasets_to_id_hash(ckan_source):
         meta
     )  # ckan datasets will always be stored sorted
 
-    return ckan_datasets
+    return harvest_datasets, ckan_datasets
 
 
-def compare(harvest_source, ckan_source):
+def compare(harvest_source: dict, ckan_source: dict) -> dict[str, list]:
     """Compares records"""
     logger.info("Hello from harvester.compare()")
 
+    output = {
+        "create": [],
+        "update": [],
+        "delete": [],
+    }
+
     harvest_ids = set(harvest_source.keys())
     ckan_ids = set(ckan_source.keys())
     same_ids = harvest_ids & ckan_ids
 
-    create += list(harvest_ids - ckan_ids)
-    delete += list(ckan_ids - harvest_ids)
-    update += [i for i in same_ids if harvest_source[i] != ckan_source[i]]
-
-    compare_res = compare(*data_sources)
-
-    # for record in harvest_source.records:
-    #     if record.identifier in create:
-    #         record.operation = "create"
-    #     if record.identifier in delete:
-    #         record.operation = "delete"
-    #     if record.identifier in update:
-    #         record.operation = "update"
+    output["create"] += list(harvest_ids - ckan_ids)
+    output["delete"] += list(ckan_ids - harvest_ids)
+    output["update"] += [i for i in same_ids if harvest_source[i] != ckan_source[i]]
 
-    return harvest_source
+    return output
diff --git a/harvester/extract.py b/harvester/extract.py
index d9af537d..de95f986 100644
--- a/harvester/extract.py
+++ b/harvester/extract.py
@@ -13,19 +13,9 @@ def download_dcatus_catalog(url):
         url (str) : path to the file to be downloaded.
""" try: - resp = requests.get(url) - except RequestException as e: - return Exception(e) - except JSONDecodeError as e: - return Exception(e) - - if resp.status_code != 200: - return Exception("non-200 status code") - - try: - return resp.json() - except JSONDecodeError as e: - return Exception(e) + return requests.get(url).json() + except Exception as e: + pass # do something with the exception def traverse_waf(url, files=[], file_ext=".xml", folder="/", filters=[]): @@ -74,12 +64,18 @@ def download_waf(files): return output -def extract(harvest_source) -> list: +def extract(harvest_source: dict, waf_options: dict = {}) -> list: """Extracts all records from a harvest_source""" logger.info("Hello from harvester.extract()") datasets = [] - # stub + if harvest_source.source_type == "dcatus": + datasets += download_dcatus_catalog(harvest_source.url)["dataset"] + elif harvest_source.source_type == "waf": + files = traverse_waf(harvest_source.url, **waf_options) + datasets += [f["content"] for f in download_waf(files)] + else: # whatever else we need? + pass return datasets diff --git a/tests/integration/pipeline/conftest.py b/tests/integration/pipeline/conftest.py index 87ca0451..49f2e6ba 100644 --- a/tests/integration/pipeline/conftest.py +++ b/tests/integration/pipeline/conftest.py @@ -1,4 +1,16 @@ import pytest +from harvester.utils.json import open_json +from pathlib import Path + +BASE_DIR = Path(__file__).parents[3] +DATA_DIR = BASE_DIR / "data" / "dcatus" +SCHEMA_DIR = DATA_DIR / "schemas" + + +@pytest.fixture +def open_dataset_schema(): + dataset_schema = SCHEMA_DIR / "dataset.json" + return open_json(dataset_schema) @pytest.fixture diff --git a/tests/integration/pipeline/test_pipeline.py b/tests/integration/pipeline/test_pipeline.py index 07a03ba4..4875d9db 100644 --- a/tests/integration/pipeline/test_pipeline.py +++ b/tests/integration/pipeline/test_pipeline.py @@ -1,64 +1,68 @@ from harvester.validate.dcat_us import validate_json_schema +from harvester.extract import extract +from harvester.utils.util import sort_dataset, dataset_to_hash +from harvester.load import create_ckan_entrypoint, search_ckan +from harvester.compare import convert_datasets_to_id_hash +from harvester.load import dcatus_to_ckan + +def extract_catalog(): + pass + +def extract_ckan(): + pass + +def compare(): + pass + +def extract(): + + extract_catalog() + extract_ckan() + compare() + + class HarvestSource: def __init__(self, name, url, source_type, config) -> None: self.name = name self.url = url self.source_type = source_type self.config = config - + + self.records = {} class HarvestRecord: - def __init__(self, identifier, record, data_hash) -> None: + def __init__(self, hs, identifier, record, record_hash) -> None: + self.hs = hs self.identifier = identifier self.record = record self.record_hash = record_hash + self.harvest_source def assign_operation_to_record(record, operation): record.operation = operation + return record -def compare(harvest_source) -> {str: list}: +def compare(harvest_source, ckan_source) -> {str: list}: """Compares records""" - logger.info("Hello from harvester.compare()") harvest_ids = set(harvest_source.records.keys()) ckan_ids = set(ckan_source.keys()) same_ids = harvest_ids & ckan_ids - create += list(harvest_ids - ckan_ids) - delete += list(ckan_ids - harvest_ids) - update += [ - i for i in same_ids if harvest_source.records[i].data_hash != ckan_source[i] + create = list(harvest_ids - ckan_ids) + delete = list(ckan_ids - harvest_ids) + update = [ + i for i in same_ids if 
+        i for i in same_ids if harvest_source.records[i].record_hash != ckan_source[i]
     ]
 
     for operation, ids in [("create", create), ("delete", delete), ("update", update)]:
-        map(
-            assign_operation_to_record(operation),
-            filter(lambda r: r.identifier in ids, harvest_source.records),
-        )
-
-    return harvest_source
-
-
-def extract(harvest_source):
-    harvest_source.extracted_data = extract(
-        harvest_source.url, harvest_source.source_type
-    )
-
-    harvest_source.records = {
-        r["identifier"]: HarvestRecord(
-            r["identifier"], r, dataset_to_hash(sort_dataset(r))
-        )
-        for r in harvest_source.extracted_data["dataset"]
-    }
-
-    ckan_source = harvester.search_ckan()["results"]
-    ckan_source = convert_datasets_to_id_hash(ckan_source)
-
-    harvest_source = compare(harvest_source)
+        for record_id, record in harvest_source.records.items():
+            if record_id in ids:
+                record.operation = operation
 
     return harvest_source
 
@@ -81,28 +85,30 @@ def load(harvest_record):
     operations[harvest_record.operation](ckan, harvest_record.record)
 
 
-def test_pipeline(harvest_source_example):
+def test_pipeline(harvest_source_example, open_dataset_schema):
     # harvest source setup
     harvest_source = HarvestSource(**harvest_source_example)
 
     # EXTRACTION
     # download the data
-    harvest_source.extracted_data = extract(
-        harvest_source.url, harvest_source.source_type
-    )["dataset"]
+    harvest_source.extracted_data = extract(harvest_source)
 
     # format and store the records
     harvest_source.records = {
         r["identifier"]: HarvestRecord(
-            r["identifier", r, dataset_to_hash(sort_dataset(r))]
+            r["identifier"], r, dataset_to_hash(sort_dataset(r))
         )
         for r in harvest_source.extracted_data
     }
 
+    # TODO: pull the ckan url and api token from config instead of hardcoding them
+    ckan = create_ckan_entrypoint(
+        "https://catalog-dev.data.gov/",
+        "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiJtQjRSX0pSU1hsaFlmRDN2WnN6NWRpRXF3dF83UE10TE1JVFRaRU1zSDhjIiwiaWF0IjoxNjk1MjMxMDkyfQ.Z8BeUk36zUGuiHWJCIMuVLwlHjz2e-yfXe-zMEOpV8k",
+    )
+
     # COMPARISON
     # get the associated records on ckan
     ckan_source_datasets = search_ckan(
-        ckan_entrypoint,
+        ckan,
         {
             "q": 'harvest_source_name:"test_harvest_source_name"',
             "fl": [
@@ -117,17 +123,21 @@
-    ckan_source = convert_datasets_to_id_hash(ckan_source_datasets)
+    # convert_datasets_to_id_hash now returns both mappings; only the ckan half
+    # is needed here since the harvest records were already hashed above
+    _, ckan_source = convert_datasets_to_id_hash([], ckan_source_datasets)
 
     # run our comparison
-    compare_result = compare(harvest_source, ckan_source)
+    harvest_source = compare(harvest_source, ckan_source)
 
-    ckan = harvester.create_ckan_entrypoint(ckan_url, api_key)
-    operations = {
-        "delete": harvester.purge_ckan_package,
-        "create": harvester.create_ckan_package,
-        "update": harvester.update_ckan_package,
-    }
+    # ckan = create_ckan_entrypoint(ckan_url, api_key)
+    # operations = {
+    #     "delete": harvester.purge_ckan_package,
+    #     "create": harvester.create_ckan_package,
+    #     "update": harvester.update_ckan_package,
+    # }
 
     # VALIDATE AND LOAD
     for record_id, record in harvest_source.records.items():
-        validate_json_schema(record.record)
-        record.record_as_ckan = dcatus_to_ckan(record.record)
-        operations[record.operation](ckan, record.record_as_ckan)
+        validate_json_schema(record.record, open_dataset_schema)
+        record.record_as_ckan = dcatus_to_ckan(
+            record.record, "test_harvest_source_name"
+        )
+        # operations[record.operation](ckan, record.record_as_ckan)
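
Note on the still-commented load step: the operations mapping in the comments is the intended dispatch for the final load. A minimal sketch of that wiring, assuming purge_ckan_package, create_ckan_package, and update_ckan_package are importable from harvester.load (the patch only references them as harvester.* attributes, so the exact import path is an assumption):

    # hypothetical wiring; import path assumed, call signature taken from the
    # commented-out line `operations[record.operation](ckan, record.record_as_ckan)`
    from harvester.load import (
        create_ckan_package,
        purge_ckan_package,
        update_ckan_package,
    )

    operations = {
        "create": create_ckan_package,
        "delete": purge_ckan_package,
        "update": update_ckan_package,
    }

    for record in harvest_source.records.values():
        # compare() only tags records that need work; unchanged records never
        # receive an `operation` attribute, so skip them
        if not hasattr(record, "operation"):
            continue
        operations[record.operation](ckan, record.record_as_ckan)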