Skip to content

Commit

Permalink
save progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
rshewitt committed Jan 8, 2024
1 parent d1b8698 commit 523f716
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 83 deletions.
39 changes: 19 additions & 20 deletions harvester/compare.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import logging
from harvester.utils.util import dataset_to_hash, sort_dataset

logger = logging.getLogger("harvester")


def convert_datasets_to_id_hash(ckan_source):
def convert_datasets_to_id_hash(
harvest_source: [dict], ckan_source: [dict]
) -> (list, list):
"""converts the input catalogs into [{ dataset_id: metadata_hash },...] format.
the harvest source metadata is sorted"""

# harvest_datasets = {
# d["identifier"]: dataset_to_hash(sort_dataset(d)) for d in harvest_source
# }
harvest_datasets = {
d["identifier"]: dataset_to_hash(sort_dataset(d)) for d in harvest_source
}

ckan_datasets = {}

Expand All @@ -24,29 +27,25 @@ def convert_datasets_to_id_hash(ckan_source):
meta
) # ckan datasets will always be stored sorted

return ckan_datasets
return harvest_datasets, ckan_datasets


def compare(harvest_source: dict, ckan_source: dict) -> dict:
    """Compare two ``{identifier: metadata_hash}`` mappings.

    Args:
        harvest_source: datasets keyed by identifier, valued by hash
            (output of ``convert_datasets_to_id_hash``).
        ckan_source: same shape, for the datasets already in CKAN.

    Returns:
        ``{"create": [...], "update": [...], "delete": [...]}`` where
        ``create`` holds ids only in the harvest source, ``delete`` holds
        ids only in CKAN, and ``update`` holds shared ids whose hashes
        differ.
    """
    logging.getLogger("harvester").info("Hello from harvester.compare()")

    output = {
        "create": [],
        "update": [],
        "delete": [],
    }

    harvest_ids = set(harvest_source.keys())
    ckan_ids = set(ckan_source.keys())
    same_ids = harvest_ids & ckan_ids

    output["create"] += list(harvest_ids - ckan_ids)
    output["delete"] += list(ckan_ids - harvest_ids)
    output["update"] += [i for i in same_ids if harvest_source[i] != ckan_source[i]]

    return output
26 changes: 11 additions & 15 deletions harvester/extract.py
Original file line number Diff line number Diff line change
def download_dcatus_catalog(url):
    """Download a DCAT-US catalog file and return it as parsed JSON.

    Args:
        url (str): path to the file to be downloaded.

    Raises:
        requests.HTTPError: on a non-2xx response.
        Exception: network / JSON-decode failures are logged and re-raised
            so callers fail fast instead of receiving an implicit ``None``
            (the previous ``except Exception: pass`` made the caller crash
            later with a confusing ``TypeError`` on ``None["dataset"]``).
    """
    try:
        resp = requests.get(url)
        # surface non-200 responses as HTTPError instead of returning junk
        resp.raise_for_status()
        return resp.json()
    except Exception:
        logging.getLogger("harvester").exception("failed to download %s", url)
        raise


def traverse_waf(url, files=[], file_ext=".xml", folder="/", filters=[]):
Expand Down Expand Up @@ -74,12 +64,18 @@ def download_waf(files):
return output


def extract(harvest_source, waf_options: dict = None) -> list:
    """Extract all records from a harvest source.

    Args:
        harvest_source: object exposing ``source_type`` ("dcatus" or "waf")
            and ``url`` attributes (e.g. a ``HarvestSource``).
        waf_options: optional keyword arguments forwarded to
            ``traverse_waf`` for "waf" sources (``files``, ``file_ext``,
            ``folder``, ``filters``). Defaults to no options.

    Returns:
        A list of dataset dicts; empty for unrecognized source types.
    """
    logger.info("Hello from harvester.extract()")

    # a fresh dict per call — a `waf_options={}` default would be shared
    # (and potentially mutated) across all invocations
    if waf_options is None:
        waf_options = {}

    datasets = []

    if harvest_source.source_type == "dcatus":
        datasets += download_dcatus_catalog(harvest_source.url)["dataset"]
    elif harvest_source.source_type == "waf":
        files = traverse_waf(harvest_source.url, **waf_options)
        datasets += [f["content"] for f in download_waf(files)]
    else:
        # NOTE(review): unknown source types are silently skipped and yield
        # an empty list — confirm whether this should raise instead
        pass

    return datasets
12 changes: 12 additions & 0 deletions tests/integration/pipeline/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import pytest
from harvester.utils.json import open_json
from pathlib import Path

BASE_DIR = Path(__file__).parents[3]
DATA_DIR = BASE_DIR / "data" / "dcatus"
SCHEMA_DIR = DATA_DIR / "schemas"


@pytest.fixture
def open_dataset_schema():
dataset_schema = SCHEMA_DIR / "dataset.json"
return open_json(dataset_schema)


@pytest.fixture
Expand Down
106 changes: 58 additions & 48 deletions tests/integration/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,68 @@
from harvester.validate.dcat_us import validate_json_schema
from harvester.extract import extract
from harvester.utils.util import sort_dataset, dataset_to_hash
from harvester.load import create_ckan_entrypoint, search_ckan
from harvester.compare import convert_datasets_to_id_hash
from harvester.load import dcatus_to_ckan



def extract_catalog():
pass

def extract_ckan():
pass

def compare():
pass

def extract():

extract_catalog()
extract_ckan()
compare()


class HarvestSource:
    """A source of harvestable metadata records.

    Attributes:
        name: human-readable source name.
        url: location the source is harvested from.
        source_type: e.g. "dcatus" or "waf"; drives extraction strategy.
        config: source-specific configuration, stored as-is.
        records: ``{identifier: HarvestRecord}``, populated by extraction.
    """

    def __init__(self, name, url, source_type, config) -> None:
        self.name = name
        self.url = url
        self.source_type = source_type
        self.config = config
        # filled in later by the extract step
        self.records = {}

class HarvestRecord:
    """One dataset pulled from a harvest source.

    Attributes:
        identifier: the dataset's unique id.
        record: the raw dataset metadata dict.
        record_hash: hash of the sorted metadata, used for change detection.
        hs: back-reference to the owning HarvestSource, or None.
    """

    def __init__(self, identifier, record, record_hash, hs=None) -> None:
        # `hs` is optional and last: the visible call sites construct records
        # with three positional args (identifier, record, hash), so a leading
        # required `hs` parameter would break them.
        self.identifier = identifier
        self.record = record
        self.record_hash = record_hash
        self.hs = hs
        # (removed a leftover bare `self.harvest_source` expression that
        # raised AttributeError on every instantiation)


def assign_operation_to_record(record, operation):
    """Tag *record* with the CRUD *operation* and hand the record back."""
    setattr(record, "operation", operation)
    return record


def compare(harvest_source, ckan_source) -> dict:
    """Diff a HarvestSource against CKAN's ``{identifier: hash}`` view.

    Tags each local record with ``.operation`` ("create" or "update");
    "delete" identifiers exist only on the CKAN side and so have no local
    record to tag.

    Args:
        harvest_source: object with ``records`` mapping identifier ->
            record objects exposing ``identifier`` and ``record_hash``.
        ckan_source: ``{identifier: metadata_hash}`` for CKAN datasets.

    Returns:
        The same ``harvest_source``, mutated in place.
    """
    # logger was undefined in this module — resolve it explicitly
    logging.getLogger("harvester").info("Hello from harvester.compare()")

    harvest_ids = set(harvest_source.records.keys())
    ckan_ids = set(ckan_source.keys())
    same_ids = harvest_ids & ckan_ids

    create = list(harvest_ids - ckan_ids)
    delete = list(ckan_ids - harvest_ids)
    update = [
        i for i in same_ids if harvest_source.records[i].record_hash != ckan_source[i]
    ]

    # The previous map()/filter() version was a no-op: the lazy map was never
    # consumed, the callable had the wrong arity, and iterating the records
    # dict yielded identifier strings (no .identifier attribute). Assign
    # operations with a plain loop instead.
    for operation, ids in (("create", create), ("delete", delete), ("update", update)):
        for identifier in ids:
            record = harvest_source.records.get(identifier)
            if record is not None:  # "delete" ids have no local record
                record.operation = operation

    return harvest_source


def extract(harvest_source):
harvest_source.extracted_data = extract(
harvest_source.url, harvest_source.source_type
)

harvest_source.records = {
r["identifier"]: HarvestRecord(
r["identifier"], r, dataset_to_hash(sort_dataset(r))
)
for r in harvest_source.extracted_data["dataset"]
}

ckan_source = harvester.search_ckan()["results"]
ckan_source = convert_datasets_to_id_hash(ckan_source)

harvest_source = compare(harvest_source)
for record_id, record in harvest_source.records.items():
if record_id in ids:
harvest_source.records[record_id].operation = operation

return harvest_source

Expand All @@ -81,28 +85,30 @@ def load(harvest_record):
operations[harvest_record.operation](ckan, harvest_record.record)


def test_pipeline(harvest_source_example):
def test_pipeline(harvest_source_example, open_dataset_schema):
# harvest source setup
harvest_source = HarvestSource(**harvest_source_example)

# EXTRACTION
# download the data
harvest_source.extracted_data = extract(
harvest_source.url, harvest_source.source_type
)["dataset"]
harvest_source.extracted_data = extract(harvest_source)

# format and store the records
harvest_source.records = {
r["identifier"]: HarvestRecord(
r["identifier", r, dataset_to_hash(sort_dataset(r))]
r["identifier"], r, dataset_to_hash(sort_dataset(r))
)
for r in harvest_source.extracted_data
}

ckan = create_ckan_entrypoint(
"https://catalog-dev.data.gov/",
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiJtQjRSX0pSU1hsaFlmRDN2WnN6NWRpRXF3dF83UE10TE1JVFRaRU1zSDhjIiwiaWF0IjoxNjk1MjMxMDkyfQ.Z8BeUk36zUGuiHWJCIMuVLwlHjz2e-yfXe-zMEOpV8k",
)
# COMPARISON
# get the associated records on ckan
ckan_source_datasets = search_ckan(
ckan_entrypoint,
ckan,
{
"q": 'harvest_source_name:"test_harvest_source_name"',
"fl": [
Expand All @@ -117,17 +123,21 @@ def test_pipeline(harvest_source_example):
ckan_source = convert_datasets_to_id_hash(ckan_source_datasets)

# run our comparison
compare_result = compare(harvest_source, ckan_source)
harvest_source = compare(harvest_source, ckan_source)

ckan = harvester.create_ckan_entrypoint(ckan_url, api_key)
operations = {
"delete": harvester.purge_ckan_package,
"create": harvester.create_ckan_package,
"update": harvester.update_ckan_package,
}
# ckan = create_ckan_entrypoint(ckan_url, api_key)
# operations = {
# "delete": harvester.purge_ckan_package,
# "create": harvester.create_ckan_package,
# "update": harvester.update_ckan_package,
# }

# VALIDATE AND LOAD
for record_id, record in harvest_source.records.items():
validate_json_schema(record.record)
record.record_as_ckan = dcatus_to_ckan(record.record)
operations[record.operation](ckan, record.record_as_ckan)
validate_json_schema(record, open_dataset_schema)
record.record_as_ckan = dcatus_to_ckan(
record.record, "test_harvest_source_name"
)
# operations[record.operation](ckan, record.record_as_ckan)

a = 10

0 comments on commit 523f716

Please sign in to comment.