Skip to content

Commit

Permalink
rough sketch
Browse files: browse the repository at this point in the history
  • Loading branch information
rshewitt committed Jan 5, 2024
1 parent 9a278fa commit d1b8698
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 23 deletions.
48 changes: 38 additions & 10 deletions harvester/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,50 @@
# Module-level logger registered under the "harvester" name.
logger = logging.getLogger("harvester")


def convert_datasets_to_id_hash(ckan_source):
    """Map each CKAN dataset's identifier to the hash of its stored metadata.

    Every item of ``ckan_source`` must carry an ``"extras"`` list whose
    entries look like ``{"key": ..., "value": ...}``. The ``"identifier"``
    extra supplies the dataset id and the ``"dcat_metadata"`` extra supplies
    the metadata, which is hashed with ``dataset_to_hash``.

    A dataset without an ``"identifier"`` extra is skipped with a warning;
    storing it under a ``None`` key would surface later as a bogus id.

    Returns:
        dict: ``{identifier: metadata_hash}``.
    """
    ckan_datasets = {}

    for dataset in ckan_source:
        oid, meta = None, None
        for extra in dataset["extras"]:
            if extra["key"] == "dcat_metadata":
                # SECURITY NOTE(review): this eval()s a string read back from
                # CKAN. Emptying __builtins__ is not a sandbox; if the stored
                # value is a plain literal, ast.literal_eval or json.loads
                # would be the safe replacement. Left as-is pending
                # confirmation of the stored format.
                meta = eval(extra["value"], {"__builtins__": {}})
            if extra["key"] == "identifier":
                oid = extra["value"]

        if oid is None:
            logger.warning("skipping ckan dataset without an identifier extra")
            continue

        # ckan datasets will always be stored sorted
        ckan_datasets[oid] = dataset_to_hash(meta)

    return ckan_datasets


def compare(harvest_source, ckan_source):
    """Work out which dataset ids must be created, updated or deleted.

    Args:
        harvest_source: mapping of dataset id to a comparable value for the
            freshly harvested records (presumably a metadata hash).
        ckan_source: mapping of dataset id to the matching value for the
            records already in CKAN.

    Returns:
        dict: ``{"create": [...], "update": [...], "delete": [...]}`` where
        "create" holds ids only in ``harvest_source``, "delete" holds ids
        only in ``ckan_source``, and "update" holds shared ids whose values
        differ. List order is not guaranteed because ids pass through sets.
    """
    logger.info("Hello from harvester.compare()")

    harvest_ids = set(harvest_source.keys())
    ckan_ids = set(ckan_source.keys())
    same_ids = harvest_ids & ckan_ids

    # The page interleaved a second, unfinished variant here: it augmented
    # unbound `create`/`delete`/`update` names, called compare() on itself
    # with an undefined `data_sources`, and had a second unreachable return.
    # Only the coherent dict-returning variant is kept.
    return {
        "create": list(harvest_ids - ckan_ids),
        "update": [i for i in same_ids if harvest_source[i] != ckan_source[i]],
        "delete": list(ckan_ids - harvest_ids),
    }
21 changes: 8 additions & 13 deletions harvester/validate/dcat_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@ def parse_errors(errors):
return error_message


def validate_json_schema(record, dataset_schema):
    """Validate ``record.record`` against ``dataset_schema`` and flag the result.

    Args:
        record: object exposing the dataset under ``record.record``. Its
            ``valid`` attribute is written by this function.
        dataset_schema: JSON schema passed to ``Draft202012Validator``.

    Returns:
        The same ``record`` object, with ``record.valid`` set to ``True``
        when validation passes and ``False`` when it raises.
    """
    validator = Draft202012Validator(dataset_schema)

    try:
        validator.validate(record.record)
    except Exception:
        # Kept broad on purpose, matching the sketch: any failure marks the
        # record invalid instead of aborting the run. The sketch wrote
        # `record.value` here, which left `valid` unset on failure.
        record.valid = False
        # TODO: do something with the exception (e.g. keep its message)
    else:
        record.valid = True

    return record
11 changes: 11 additions & 0 deletions tests/integration/pipeline/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import pytest


@pytest.fixture
def harvest_source_example():
    """Provide the four settings that describe a sample harvest source."""
    example_settings = dict(
        name="test_harvest_source_name",
        url="http://localhost/dcatus/dcatus_compare.json",
        source_type="dcatus",
        config={},
    )
    return example_settings
133 changes: 133 additions & 0 deletions tests/integration/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from harvester.validate.dcat_us import validate_json_schema


class HarvestSource:
    """Plain holder for the settings that describe one harvest source."""

    def __init__(self, name, url, source_type, config) -> None:
        """Store the four settings on the instance unchanged."""
        self.name, self.url = name, url
        self.source_type, self.config = source_type, config


class HarvestRecord:
    """One harvested dataset together with its identifier and metadata hash."""

    def __init__(self, identifier, record, data_hash) -> None:
        """Store the identifier, the raw record and its hash.

        Args:
            identifier: id of the dataset.
            record: the dataset itself.
            data_hash: hash of the dataset, compared against the CKAN copy.
        """
        self.identifier = identifier
        self.record = record
        # The sketch read an undefined name `record_hash` here, so every
        # construction raised NameError. The parameter is stored under the
        # attribute name that compare() reads.
        self.data_hash = data_hash
        # Filled in later by the comparison and validation steps.
        self.operation = None
        self.valid = None

    @property
    def record_hash(self):
        """Alias of ``data_hash`` under the attribute name the sketch used."""
        return self.data_hash


def assign_operation_to_record(record, operation):
    """Tag ``record`` with ``operation`` by writing its ``operation`` attribute."""
    setattr(record, "operation", operation)


def compare(harvest_source, ckan_source=None):
    """Tag each harvested record with the operation needed to sync it to CKAN.

    Args:
        harvest_source: object whose ``records`` attribute maps dataset id
            to a record exposing ``identifier`` and ``data_hash``.
        ckan_source: mapping of dataset id to the hash stored in CKAN.
            ``None`` (the default) is treated as an empty mapping, so every
            harvested record is tagged "create".

    Returns:
        The same ``harvest_source``. Records get ``operation`` set to
        "create" (id absent from CKAN) or "update" (id present, hash
        differs). Unchanged records are left untouched.
    """
    # Local import: this module defines no module-level logger.
    import logging

    logging.getLogger("harvester").info("Hello from harvester.compare()")

    if ckan_source is None:
        ckan_source = {}
    records = harvest_source.records

    harvest_ids = set(records.keys())
    ckan_ids = set(ckan_source.keys())
    same_ids = harvest_ids & ckan_ids

    create = harvest_ids - ckan_ids
    delete = ckan_ids - harvest_ids
    update = {i for i in same_ids if records[i].data_hash != ckan_source[i]}

    # "delete" ids exist only in CKAN, so no harvested record matches them;
    # the pass is kept so the three operations are handled uniformly.
    for operation, ids in (("create", create), ("delete", delete), ("update", update)):
        for record in records.values():
            if record.identifier in ids:
                record.operation = operation

    return harvest_source


def extract(harvest_source):
    """Download a harvest source, build its records and compare them to CKAN.

    Args:
        harvest_source: object exposing ``url`` and ``source_type``. Its
            ``extracted_data`` and ``records`` attributes are written here.

    Returns:
        Whatever ``compare`` returns for the populated ``harvest_source``.
    """
    # NOTE(review): the sketch called bare `extract(url, source_type)`, which
    # resolves to this one-parameter function and fails with TypeError.
    # Assumed the library downloader `harvester.extract` was meant - confirm.
    harvest_source.extracted_data = harvester.extract(
        harvest_source.url, harvest_source.source_type
    )

    # Key every record by its identifier; the hash is taken over the sorted
    # dataset.
    harvest_source.records = {
        r["identifier"]: HarvestRecord(
            r["identifier"], r, dataset_to_hash(sort_dataset(r))
        )
        for r in harvest_source.extracted_data["dataset"]
    }

    ckan_source = harvester.search_ckan()["results"]
    ckan_source = convert_datasets_to_id_hash(ckan_source)

    # Hand the CKAN mapping over explicitly: it is local to this function,
    # so compare() cannot see it otherwise.
    harvest_source = compare(harvest_source, ckan_source)

    return harvest_source


def validate(harvest_record):
    """Flag ``harvest_record`` as valid or invalid against the DCAT-US schema.

    The sketch called ``validator.validate``, which raises on an invalid
    record and never set the ``valid`` flag described in its own comment.
    ``is_valid`` gives the boolean without raising.

    Returns:
        The same ``harvest_record`` with ``valid`` set to ``True`` or
        ``False``.
    """
    validator = Draft202012Validator(dcatus_dataset_schema)
    harvest_record.valid = validator.is_valid(harvest_record.record)
    return harvest_record


def load(harvest_record):
    """Apply the record's pending operation to CKAN.

    Looks up the handler for ``harvest_record.operation`` and calls it with
    a freshly created CKAN entrypoint and ``harvest_record.record``.
    """
    ckan_client = harvester.create_ckan_entrypoint(ckan_url, api_key)
    handlers = {
        "delete": harvester.purge_ckan_package,
        "create": harvester.create_ckan_package,
        "update": harvester.update_ckan_package,
    }
    handler = handlers[harvest_record.operation]
    handler(ckan_client, harvest_record.record)


def test_pipeline(harvest_source_example):
    """Walk one harvest source through extract, compare, validate and load.

    NOTE(review): several names used below (``harvester``, ``search_ckan``,
    ``ckan_entrypoint``, ``ckan_url``, ``api_key``, ``dataset_to_hash``,
    ``sort_dataset``, ``convert_datasets_to_id_hash``, ``dcatus_to_ckan``,
    ``dcatus_dataset_schema``) are not imported in this module as shown and
    must be brought into scope before the test can run.
    """
    # harvest source setup
    harvest_source = HarvestSource(**harvest_source_example)

    # EXTRACTION
    # download the data
    # NOTE(review): bare `extract` resolves to this module's one-parameter
    # extract(harvest_source); assumed `harvester.extract` was meant - confirm.
    harvest_source.extracted_data = harvester.extract(
        harvest_source.url, harvest_source.source_type
    )["dataset"]

    # format and store the records
    # (the sketch closed the subscript bracket after the last argument,
    # indexing `r` with a tuple instead of passing three arguments)
    harvest_source.records = {
        r["identifier"]: HarvestRecord(
            r["identifier"], r, dataset_to_hash(sort_dataset(r))
        )
        for r in harvest_source.extracted_data
    }

    # COMPARISON
    # get the associated records on ckan
    ckan_source_datasets = search_ckan(
        ckan_entrypoint,
        {
            "q": 'harvest_source_name:"test_harvest_source_name"',
            "fl": [
                "extras_harvest_source_name",
                "extras_dcat_metadata",
                "extras_identifier",
            ],
        },
    )["results"]

    # format the ckan records for comparison
    ckan_source = convert_datasets_to_id_hash(ckan_source_datasets)

    # run our comparison; it tags the records in place
    compare(harvest_source, ckan_source)

    ckan = harvester.create_ckan_entrypoint(ckan_url, api_key)
    operations = {
        "delete": harvester.purge_ckan_package,
        "create": harvester.create_ckan_package,
        "update": harvester.update_ckan_package,
    }

    # VALIDATE AND LOAD
    for record in harvest_source.records.values():
        # validate_json_schema takes the record object plus the schema
        validate_json_schema(record, dcatus_dataset_schema)
        record.record_as_ckan = dcatus_to_ckan(record.record)

        # Unchanged records carry no operation; there is nothing to load.
        operation = getattr(record, "operation", None)
        if operation is None:
            continue
        operations[operation](ckan, record.record_as_ckan)

0 comments on commit d1b8698

Please sign in to comment.