diff --git a/app/routes.py b/app/routes.py
index c722e35b..9e460fae 100644
--- a/app/routes.py
+++ b/app/routes.py
@@ -207,9 +207,7 @@ def edit_harvest_source(source_id=None):
 
 @mod.route("/harvest_source/harvest/<source_id>", methods=["GET"])
 def trigger_harvest_source(source_id):
-    job = db.add_harvest_job(
-        {"harvest_source_id": source_id, "status": "pending_manual"}
-    )
+    job = db.add_harvest_job({"harvest_source_id": source_id, "status": "manual"})
     if job:
         flash(f"Triggered harvest of source with ID: {source_id}")
     else:
@@ -232,24 +230,12 @@ def delete_harvest_source(source_id):
 
 ## Harvest Job
-@mod.route("/harvest_job/add", methods=["POST"])
-def add_harvest_job():
-    if request.is_json:
-        job = db.add_harvest_job(request.json)
-        if job:
-            return jsonify({"message": f"Added new harvest job with ID: {job.id}"})
-        else:
-            return jsonify({"error": "Failed to add harvest job."}), 400
-    else:
-        return jsonify({"Please provide harvest job with json format."})
-
-
 @mod.route("/harvest_job/", methods=["GET"])
 @mod.route("/harvest_job/<job_id>", methods=["GET"])
 def get_harvest_job(job_id=None):
     try:
         if job_id:
-            job = db.get_harvest_job(job_id)
+            job = HarvesterDBInterface._to_dict(db.get_harvest_job(job_id))
             return jsonify(job) if job else ("Not Found", 404)
 
         source_id = request.args.get("harvest_source_id")
@@ -265,6 +251,22 @@ def get_harvest_job(job_id=None):
         return "Please provide correct job_id or harvest_source_id"
 
 
+# all errors associated with a job ( job & records )
+@mod.route("/harvest_job/<job_id>/errors", methods=["GET"])
+@mod.route("/harvest_job/<job_id>/errors/<error_type>", methods=["GET"])
+def get_all_errors_of_harvest_job(job_id, error_type="all"):
+    try:
+        all_errors = db.get_all_errors_of_job(job_id)
+        if error_type == "job":
+            return all_errors[0]
+        elif error_type == "record":
+            return all_errors[1]
+        else:
+            return all_errors
+    except Exception:
+        return "Please provide correct job_id"
+
+
 @mod.route("/harvest_job/<job_id>", methods=["PUT"])
 def update_harvest_job(job_id):
     result = db.update_harvest_job(job_id, request.json)
@@ -277,54 +279,19 @@ def delete_harvest_job(job_id):
     return result
 
 
-## Harvest Error
-@mod.route("/harvest_error/add", methods=["POST", "GET"])
-def add_harvest_error():
+@mod.route("/harvest_job/add", methods=["POST"])
+def add_harvest_job():
     if request.is_json:
-        error = db.add_harvest_error(request.json)
-        if error:
-            return jsonify({"message": f"Added new harvest error with ID: {error.id}"})
+        job = db.add_harvest_job(request.json)
+        if job:
+            return jsonify({"message": f"Added new harvest job with ID: {job.id}"})
         else:
-            return jsonify({"error": "Failed to add harvest error."}), 400
+            return jsonify({"error": "Failed to add harvest job."}), 400
     else:
-        return jsonify({"Please provide harvest error with json format."})
-
-
-@mod.route("/harvest_error/", methods=["GET"])
-@mod.route("/harvest_error/<error_id>", methods=["GET"])
-def get_harvest_error(error_id=None):
-    try:
-        if error_id:
-            error = db.get_harvest_error(error_id)
-            return jsonify(error) if error else ("Not Found", 404)
-
-        job_id = request.args.get("harvest_job_id")
-        if job_id:
-            error = db.get_harvest_errors_by_job(job_id)
-            if not error:
-                return "No harvest errors found for this harvest job", 404
-        else:
-            # for test, will remove later
-            error = db.get_all_harvest_errors()
-
-        return jsonify(error)
-    except Exception:
-        return "Please provide correct error_id or harvest_job_id"
+        return jsonify({"Please provide harvest job with json format."})
 
 
 ## Harvest Record
-@mod.route("/harvest_record/add", methods=["POST", "GET"])
-def add_harvest_record():
-    if request.is_json:
-        record = db.add_harvest_record(request.json)
-        if record:
-            return jsonify({"message": f"Added new record with ID: {record.id}"})
-        else:
-            return jsonify({"error": "Failed to add harvest record."}), 400
-    else:
-        return jsonify({"Please provide harvest record with json format."})
-
-
 @mod.route("/harvest_record/", methods=["GET"])
 @mod.route("/harvest_record/<record_id>", methods=["GET"])
 def get_harvest_record(record_id=None):
@@ -352,6 +319,42 @@ def get_harvest_record(record_id=None):
         return "Please provide correct record_id or harvest_job_id"
 
 
+@mod.route("/harvest_record/<record_id>/errors", methods=["GET"])
+def get_all_harvest_record_errors(record_id: str) -> list:
+    try:
+        record_errors = db.get_harvest_record_errors(record_id)
+        return record_errors if record_errors else ("Not Found", 404)
+    except Exception:
+        return "Please provide correct record_id"
+
+
+@mod.route("/harvest_error/<error_id>", methods=["GET"])
+def get_harvest_record_error(error_id: str) -> dict:
+    # retrieves the given error ( either job or record )
+    try:
+        job_error = db.get_harvest_job_error(error_id)
+        if job_error:
+            return job_error
+        record_error = db.get_harvest_record_error(error_id)
+        if record_error:
+            return record_error
+        return ("Not Found", 404)
+    except Exception:
+        return "Please provide correct record_id and record_error_id"
+
+
+@mod.route("/harvest_record/add", methods=["POST", "GET"])
+def add_harvest_record():
+    if request.is_json:
+        record = db.add_harvest_record(request.json)
+        if record:
+            return jsonify({"message": f"Added new record with ID: {record.id}"})
+        else:
+            return jsonify({"error": "Failed to add harvest record."}), 400
+    else:
+        return jsonify({"Please provide harvest record with json format."})
+
+
 ## Test interface, will remove later
 # TODO: remove / improve
 @mod.route("/get_data_sources", methods=["GET"])
diff --git a/app/scripts/load_manager.py b/app/scripts/load_manager.py
index c288b5ed..a109c13b 100644
--- a/app/scripts/load_manager.py
+++ b/app/scripts/load_manager.py
@@ -24,7 +24,7 @@ def create_task(jobId):
 
 
 def sort_jobs(jobs):
-    return sorted(jobs, key=lambda x: x["status"], reverse=True)
+    return sorted(jobs, key=lambda x: x["status"])
 
 
 def load_manager():
@@ -39,10 +39,8 @@ def load_manager():
         print("CF_INSTANCE_INDEX is not set or not equal to zero")
         return
 
-    # filter harvestjobs by pending / pending_manual
-    jobs = interface.get_harvest_jobs_by_faceted_filter(
-        "status", ["pending", "pending_manual"]
-    )
+    # filter harvestjobs by new (automated) & manual
+    jobs = interface.get_harvest_jobs_by_faceted_filter("status", ["new", "manual"])
 
     # get current list of all tasks
     current_tasks = cf_handler.get_all_app_tasks(LM_RUNNER_APP_GUID)
@@ -56,7 +54,7 @@ def load_manager():
     else:
         slots = LM_MAX_TASKS_COUNT - running_tasks
 
-    # sort jobs by pending_manual first
+    # sort jobs by manual first
     sorted_jobs = sort_jobs(jobs)
 
     # slice off jobs to invoke
diff --git a/database/interface.py b/database/interface.py
index 23ca1f51..5ce8fe4b 100644
--- a/database/interface.py
+++ b/database/interface.py
@@ -1,11 +1,19 @@
 import os
 import uuid
+import itertools
 
 from sqlalchemy import create_engine, inspect, or_, text
 from sqlalchemy.exc import NoResultFound
 from sqlalchemy.orm import scoped_session, sessionmaker
 
-from .models import HarvestError, HarvestJob, HarvestRecord, HarvestSource, Organization
+from .models import (
+    HarvestJobError,
+    HarvestRecordError,
+    HarvestJob,
+    HarvestRecord,
+    HarvestSource,
+    Organization,
+)
 
 DATABASE_URI = os.getenv("DATABASE_URI")
@@ -142,8 +150,7 @@ def add_harvest_job(self, job_data):
             return None
 
     def get_harvest_job(self, job_id):
-        result = self.db.query(HarvestJob).filter_by(id=job_id).first()
-        return HarvesterDBInterface._to_dict(result)
+        return self.db.query(HarvestJob).filter_by(id=job_id).first()
 
     def get_harvest_jobs_by_filter(self, filter):
         harvest_jobs = self.db.query(HarvestJob).filter_by(**filter).all()
@@ -187,9 +194,14 @@ def delete_harvest_job(self, job_id):
         self.db.commit()
         return "Harvest job deleted successfully"
 
-    def add_harvest_error(self, error_data):
+    def add_harvest_error(self, error_data: dict, error_type: str):
         try:
-            new_error = HarvestError(**error_data)
+            if error_type is None:
+                return "Must indicate what type of error to add"
+            if error_type == "job":
+                new_error = HarvestJobError(**error_data)
+            else:
+                new_error = HarvestRecordError(**error_data)
             self.db.add(new_error)
             self.db.commit()
             self.db.refresh(new_error)
@@ -199,19 +211,36 @@ def add_harvest_error(self, error_data):
             self.db.rollback()
             return None
 
-    def get_harvest_error(self, error_id):
-        result = self.db.query(HarvestError).filter_by(id=error_id).first()
-        return HarvesterDBInterface._to_dict(result)
+    def get_all_errors_of_job(self, job_id: str) -> [[dict], [dict]]:
+        job = self.get_harvest_job(job_id)
+        job_errors = list(map(HarvesterDBInterface._to_dict, job.errors))
+
+        # itertools.chain flattens a list of lists into a 1 dimensional list
+        record_errors = itertools.chain(*[record.errors for record in job.records])
+        record_errors = list(map(HarvesterDBInterface._to_dict, record_errors))
 
-    def get_harvest_errors_by_job(self, job_id: str) -> list:
-        harvest_errors = self.db.query(HarvestError).filter_by(harvest_job_id=job_id)
-        return [HarvesterDBInterface._to_dict(err) for err in harvest_errors]
+        return [job_errors, record_errors]
+
+    def get_harvest_job_error(self, job_id: str) -> dict:
+        result = self.db.query(HarvestJobError).filter_by(harvest_job_id=job_id).first()
+        return HarvesterDBInterface._to_dict(result)
 
-    def get_harvest_errors_by_record_id(self, record_id: str) -> list:
-        harvest_errors = self.db.query(HarvestError).filter_by(
+    def get_harvest_record_errors(self, record_id: str):
+        # TODO: paginate this
+        errors = self.db.query(HarvestRecordError).filter_by(
             harvest_record_id=record_id
         )
-        return [HarvesterDBInterface._to_dict(err) for err in harvest_errors]
+        return [HarvesterDBInterface._to_dict(err) for err in errors]
+
+    def get_harvest_record_error(self, error_id: str) -> dict:
+        error = (
+            self.db.query(HarvestRecordError)
+            .filter_by(
+                id=error_id,
+            )
+            .first()
+        )
+        return HarvesterDBInterface._to_dict(error)
 
     def add_harvest_record(self, record_data):
         try:
@@ -323,10 +352,3 @@ def get_all_harvest_records(self):
             HarvesterDBInterface._to_dict(err) for err in harvest_records
         ]
         return harvest_records_data
-
-    def get_all_harvest_errors(self):
-        harvest_errors = self.db.query(HarvestError).all()
-        harvest_errors_data = [
-            HarvesterDBInterface._to_dict(err) for err in harvest_errors
-        ]
-        return harvest_errors_data
diff --git a/database/models.py b/database/models.py
index a3611b85..38510357 100644
--- a/database/models.py
+++ b/database/models.py
@@ -1,17 +1,26 @@
 import uuid
 
 from flask_sqlalchemy import SQLAlchemy
-from sqlalchemy import Enum, func
+from sqlalchemy import Enum, func, String, Column
+from sqlalchemy.orm import DeclarativeBase
 
-db = SQLAlchemy()
-
 
-class Base(db.Model):
+class Base(DeclarativeBase):
     __abstract__ = True  # Indicates that this class should not be created as a table
-    id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
+    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
+
+
+db = SQLAlchemy(model_class=Base)
 
 
-class Organization(Base):
+class Error(db.Model):
+    __abstract__ = True
+    date_created = db.Column(db.DateTime, default=func.now())
+    type = db.Column(db.String)
+    message = db.Column(db.String)
+
+
+class Organization(db.Model):
     __tablename__ = "organization"
 
     name = db.Column(db.String, nullable=False, index=True)
@@ -21,7 +30,7 @@ class Organization(Base):
     )
 
 
-class HarvestSource(Base):
+class HarvestSource(db.Model):
     __tablename__ = "harvest_source"
 
     name = db.Column(db.String, nullable=False)
@@ -40,7 +49,7 @@ class HarvestSource(Base):
     )
 
 
-class HarvestJob(Base):
+class HarvestJob(db.Model):
     __tablename__ = "harvest_job"
 
     harvest_source_id = db.Column(
@@ -50,8 +59,9 @@ class HarvestJob(Base):
         Enum(
             "in_progress",
             "complete",
-            "pending",
-            "pending_manual",
+            "new",
+            "manual",
+            "error",
             name="job_status",
         ),
         nullable=False,
@@ -65,43 +75,46 @@ class HarvestJob(Base):
     records_errored = db.Column(db.Integer)
     records_ignored = db.Column(db.Integer)
     errors = db.relationship(
-        "HarvestError", backref="job", cascade="all, delete-orphan", lazy=True
+        "HarvestJobError", backref="job", cascade="all, delete-orphan", lazy=True
     )
+    records = db.relationship("HarvestRecord", backref="job", lazy=True)
 
 
-class HarvestError(Base):
-    __tablename__ = "harvest_error"
+class HarvestRecord(db.Model):
+    __tablename__ = "harvest_record"
 
+    identifier = db.Column(db.String, nullable=False)
     harvest_job_id = db.Column(
         db.String(36), db.ForeignKey("harvest_job.id"), nullable=False
     )
-    harvest_record_id = db.Column(db.String, db.ForeignKey("harvest_record.id"))
-    date_created = db.Column(db.DateTime, default=func.now())
-    type = db.Column(db.String)
-    severity = db.Column(
-        Enum("CRITICAL", "ERROR", "WARN", name="error_serverity"),
-        nullable=False,
-        index=True,
+    harvest_source_id = db.Column(
+        db.String(36), db.ForeignKey("harvest_source.id"), nullable=False
+    )
+    source_hash = db.Column(db.String)
+    source_raw = db.Column(db.String)
+    date_created = db.Column(db.DateTime, index=True, default=func.now())
+    date_finished = db.Column(db.DateTime, index=True)
+    ckan_id = db.Column(db.String, index=True)
+    action = db.Column(
+        Enum("create", "update", "delete", name="record_action"), index=True
+    )
+    status = db.Column(Enum("error", "success", name="record_status"), index=True)
+    errors = db.relationship(
+        "HarvestRecordError", backref="record", cascade="all, delete-orphan", lazy=True
     )
-    message = db.Column(db.String)
-    reference = db.Column(db.String)
 
 
-class HarvestRecord(Base):
-    __tablename__ = "harvest_record"
+class HarvestJobError(Error):
+    __tablename__ = "harvest_job_error"
 
-    identifier = db.Column(db.String)
     harvest_job_id = db.Column(
-        db.String(36), db.ForeignKey("harvest_job.id"), nullable=True
+        db.String(36), db.ForeignKey("harvest_job.id"), nullable=False
     )
-    harvest_source_id = db.Column(
-        db.String(36), db.ForeignKey("harvest_source.id"), nullable=True
+
+
+class HarvestRecordError(Error):
+    __tablename__ = "harvest_record_error"
+
+    harvest_record_id = db.Column(
+        db.String, db.ForeignKey("harvest_record.id"), nullable=False
     )
-    source_hash = db.Column(db.String)
-    source_raw = db.Column(db.String)
-    date_created = db.Column(db.DateTime, index=True, default=func.now())
-    date_finished = db.Column(db.DateTime)
-    ckan_id = db.Column(db.String, index=True)
-    type = db.Column(db.String)
-    action = db.Column(Enum("create", "update", "delete", name="record_action"))
-    status = db.Column(Enum("error", "success", name="record_status"))
diff --git a/harvester/exceptions.py b/harvester/exceptions.py
index 7d610842..5d855443 100644
--- a/harvester/exceptions.py
+++ b/harvester/exceptions.py
@@ -11,7 +11,6 @@ def __init__(self, msg, harvest_job_id):
 
         self.msg = msg
         self.harvest_job_id = harvest_job_id
-        self.severity = "CRITICAL"
 
         self.db_interface: HarvesterDBInterface = db_interface
         self.logger = logging.getLogger("harvest_runner")
@@ -19,12 +18,11 @@ def __init__(self, msg, harvest_job_id):
         error_data = {
             "harvest_job_id": self.harvest_job_id,
             "message": self.msg,
-            "severity": self.severity,
             "type": self.__class__.__name__,
             "date_created": datetime.now(timezone.utc),
         }
 
-        self.db_interface.add_harvest_error(error_data)
+        self.db_interface.add_harvest_error(error_data, "job")
 
         self.logger.critical(self.msg, exc_info=True)
 
@@ -47,22 +45,19 @@ def __init__(self, msg, harvest_job_id, record_id):
 
         self.msg = msg
         self.harvest_job_id = harvest_job_id
-        self.severity = "ERROR"
         self.harvest_record_id = record_id
 
         self.db_interface: HarvesterDBInterface = db_interface
         self.logger = logging.getLogger("harvest_runner")
 
         error_data = {
-            "harvest_job_id": self.harvest_job_id,
             "message": self.msg,
-            "severity": self.severity,
             "type": self.__class__.__name__,
             "date_created": datetime.now(timezone.utc),
             "harvest_record_id": record_id,  # to-do
         }
 
-        self.db_interface.add_harvest_error(error_data)
+        self.db_interface.add_harvest_error(error_data, "record")
         self.db_interface.update_harvest_record(record_id, {"status": "error"})
         self.logger.error(self.msg, exc_info=True)
diff --git a/harvester/harvest.py b/harvester/harvest.py
index dd445843..ffd20a96 100644
--- a/harvester/harvest.py
+++ b/harvester/harvest.py
@@ -231,7 +231,6 @@ def write_compare_to_db(self) -> dict:
                     "harvest_source_id": record.harvest_source.id,
                     "source_hash": record.metadata_hash,
                     "source_raw": str(record.metadata),
-                    "type": self.source_type,
                     "action": action,
                 }
             )
diff --git a/pyproject.toml b/pyproject.toml
index e4de1050..b37aa892 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "datagov-harvesting-logic"
-version = "0.4.6"
+version = "0.5.0"
 description = ""
 # authors = [
 #     {name = "Jin Sun", email = "jin.sun@gsa.gov"},
diff --git a/tests/conftest.py b/tests/conftest.py
index ceab064d..56675f79 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -145,7 +145,16 @@ def source_data_dcatus_invalid(organization_data: dict) -> dict:
 def job_data_dcatus(source_data_dcatus: dict) -> dict:
     return {
         "id": "6bce761c-7a39-41c1-ac73-94234c139c76",
-        "status": "pending",
+        "status": "new",
+        "harvest_source_id": source_data_dcatus["id"],
+    }
+
+
+@pytest.fixture
+def job_data_dcatus_2(source_data_dcatus: dict) -> dict:
+    return {
+        "id": "392ac4b3-79a6-414b-a2b3-d6c607d3b8d4",
+        "status": "new",
         "harvest_source_id": source_data_dcatus["id"],
     }
 
@@ -154,16 +163,25 @@ def job_data_dcatus(source_data_dcatus: dict) -> dict:
 def job_data_waf(source_data_waf: dict) -> dict:
     return {
         "id": "963cdc51-94d5-425d-a688-e0a57e0c5dd2",
-        "status": "pending",
+        "status": "new",
         "harvest_source_id": source_data_waf["id"],
     }
 
 
+@pytest.fixture
+def job_error_data(job_data_dcatus) -> dict:
+    return {
+        "harvest_job_id": job_data_dcatus["id"],
+        "message": "error reading records from harvest database",
+        "type": "ExtractInternalException",
+    }
+
+
 @pytest.fixture
 def job_data_dcatus_invalid(source_data_dcatus_invalid: dict) -> dict:
     return {
         "id": "59df7ba5-102d-4ae3-abd6-01b7eb26a338",
-        "status": "pending",
+        "status": "new",
         "harvest_source_id": source_data_dcatus_invalid["id"],
     }
 
@@ -171,6 +189,7 @@ def job_data_dcatus_invalid(source_data_dcatus_invalid: dict) -> dict:
 @pytest.fixture
 def record_data_dcatus(job_data_dcatus: dict) -> dict:
     return {
+        "id": "0779c855-df20-49c8-9108-66359d82b77c",
         "identifier": "test_identifier",
         "harvest_job_id": job_data_dcatus["id"],
         "harvest_source_id": job_data_dcatus["harvest_source_id"],
@@ -180,6 +199,15 @@ def record_data_dcatus(job_data_dcatus: dict) -> dict:
     }
 
 
+@pytest.fixture
+def record_error_data(record_data_dcatus) -> dict:
+    return {
+        "harvest_record_id": record_data_dcatus["id"],
+        "message": "record is invalid",
+        "type": "ValidationException",
+    }
+
+
 @pytest.fixture
 def source_data_dcatus_bad_url(organization_data: dict) -> dict:
     return {
@@ -199,7 +227,7 @@ def source_data_dcatus_bad_url(organization_data: dict) -> dict:
 def job_data_dcatus_bad_url(source_data_dcatus_bad_url: dict) -> dict:
     return {
         "id": "707aee7b-bf72-4e07-a5fc-68980765b214",
-        "status": "pending",
+        "status": "new",
         "harvest_source_id": source_data_dcatus_bad_url["id"],
     }
 
@@ -234,7 +262,7 @@ def source_data_dcatus_invalid_records_job(
 def interface_with_multiple_jobs(
     interface, organization_data, source_data_dcatus, source_data_waf
 ):
-    statuses = ["pending", "pending_manual", "in_progress", "complete"]
+    statuses = ["new", "manual", "in_progress", "complete", "error"]
     source_ids = [source_data_dcatus["id"], source_data_waf["id"]]
     jobs = [
         {"status": status, "harvest_source_id": source}
@@ -252,7 +280,9 @@ def interface_with_multiple_jobs(
 
 
 @pytest.fixture
-def latest_records(source_data_dcatus, source_data_dcatus_2):
+def latest_records(
+    source_data_dcatus, source_data_dcatus_2, job_data_dcatus, job_data_dcatus_2
+):
     return [
         {
             "identifier": "a",
@@ -261,6 +291,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "create",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "a",
@@ -269,6 +300,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "update",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "b",
@@ -277,6 +309,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "create",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "b",
@@ -285,6 +318,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "error",
             "action": "update",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "c",
@@ -293,6 +327,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "create",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "d",
@@ -301,6 +336,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "delete",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "d",
@@ -309,6 +345,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "create",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "e",
@@ -317,6 +354,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "create",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "e",
@@ -325,6 +363,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "delete",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "e",
@@ -333,6 +372,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "create",
             "harvest_source_id": source_data_dcatus["id"],
+            "harvest_job_id": job_data_dcatus["id"],
         },
         {
             "identifier": "f",
@@ -341,6 +381,7 @@ def latest_records(source_data_dcatus, source_data_dcatus_2):
             "status": "success",
             "action": "create",
             "harvest_source_id": source_data_dcatus_2["id"],
+            "harvest_job_id": job_data_dcatus_2["id"],
         },
     ]
diff --git a/tests/integration/cf/test_load_manager.py b/tests/integration/cf/test_load_manager.py
index 5bebddc4..99778837 100644
--- a/tests/integration/cf/test_load_manager.py
+++ b/tests/integration/cf/test_load_manager.py
@@ -142,15 +142,15 @@ def test_load_manager_bails_on_incorrect_index(
 
     def test_sort_jobs(self):
         jobs = [
-            {"status": "pending"},
-            {"status": "pending"},
-            {"status": "pending_manual"},
+            {"status": "new"},
+            {"status": "new"},
+            {"status": "manual"},
         ]
 
         sorted_jobs = sort_jobs(jobs)
 
         assert sorted_jobs == [
-            {"status": "pending_manual"},
-            {"status": "pending"},
-            {"status": "pending"},
+            {"status": "manual"},
+            {"status": "new"},
+            {"status": "new"},
         ]
diff --git a/tests/integration/database/test_db.py b/tests/integration/database/test_db.py
index 25c91925..e3546117 100644
--- a/tests/integration/database/test_db.py
+++ b/tests/integration/database/test_db.py
@@ -1,5 +1,6 @@
 import datetime
 
+from database.models import HarvestJobError, HarvestRecordError
 from harvester.harvest import HarvestSource
 from harvester.utils import dataset_to_hash, sort_dataset
 
@@ -101,6 +102,43 @@ def test_harvest_source_by_jobid(
 
         assert source.id == harvest_source["id"]
 
+    def test_add_harvest_job_error(
+        self,
+        interface,
+        organization_data,
+        source_data_dcatus,
+        job_data_dcatus,
+        job_error_data,
+    ):
+        interface.add_organization(organization_data)
+        interface.add_harvest_source(source_data_dcatus)
+        interface.add_harvest_job(job_data_dcatus)
+
+        harvest_job_error = interface.add_harvest_error(job_error_data, "job")
+        assert isinstance(harvest_job_error, HarvestJobError)
+        assert harvest_job_error.message == job_error_data["message"]
+
+    def test_get_all_errors_of_job(
+        self,
+        interface,
+        organization_data,
+        source_data_dcatus,
+        job_data_dcatus,
+        job_error_data,
+    ):
+        interface.add_organization(organization_data)
+        interface.add_harvest_source(source_data_dcatus)
+        interface.add_harvest_job(job_data_dcatus)
+        interface.add_harvest_error(job_error_data, "job")
+
+        all_errors = interface.get_all_errors_of_job(job_data_dcatus["id"])
+
+        assert len(all_errors) == 2
+        assert len(all_errors[0]) == 1  # job error
+        assert len(all_errors[1]) == 0  # record errors
+        assert all_errors[0][0]["type"] == "ExtractInternalException"
+        assert len(all_errors[1]) == 0
+
     def test_add_harvest_record(
         self,
         interface,
@@ -118,6 +156,29 @@ def test_add_harvest_record(
         assert record.harvest_source_id == source.id
         assert record.harvest_job_id == harvest_job.id
 
+    def test_add_harvest_record_error(
+        self,
+        interface,
+        organization_data,
+        source_data_dcatus,
+        job_data_dcatus,
+        record_data_dcatus,
+        record_error_data,
+    ):
+        interface.add_organization(organization_data)
+        interface.add_harvest_source(source_data_dcatus)
+        interface.add_harvest_job(job_data_dcatus)
+        interface.add_harvest_record(record_data_dcatus)
+
+        harvest_record_error = interface.add_harvest_error(record_error_data, "record")
+        assert isinstance(harvest_record_error, HarvestRecordError)
+        assert harvest_record_error.message == record_error_data["message"]
+
+        harvest_record_error_from_db = interface.get_harvest_record_error(
+            harvest_record_error.id
+        )
+        assert harvest_record_error.id == harvest_record_error_from_db["id"]
+
     def test_add_harvest_records(
         self,
         interface,
@@ -133,6 +194,7 @@ def test_add_harvest_records(
         records = []
         for i in range(10):
             new_record = record_data_dcatus.copy()
+            del new_record["id"]
             new_record["identifier"] = f"test-identifier-{i}"
             records.append(new_record)
 
@@ -170,22 +232,22 @@ def test_get_harvest_jobs_by_filter(
         self, source_data_dcatus, interface_with_multiple_jobs
     ):
         filters = {
-            "status": "pending",
+            "status": "new",
             "harvest_source_id": f"{source_data_dcatus['id']}",
         }
 
         filtered_list = interface_with_multiple_jobs.get_harvest_jobs_by_filter(filters)
         assert len(filtered_list) == 1
-        assert filtered_list[0]["status"] == "pending"
+        assert filtered_list[0]["status"] == "new"
         assert filtered_list[0]["harvest_source_id"] == source_data_dcatus["id"]
 
     def test_filter_jobs_by_faceted_filter(
         self, source_data_dcatus, interface_with_multiple_jobs
     ):
         faceted_list = interface_with_multiple_jobs.get_harvest_jobs_by_faceted_filter(
-            "status", ["pending", "pending_manual"]
+            "status", ["new", "manual"]
         )
         assert len(faceted_list) == 4
-        assert len([x for x in faceted_list if x["status"] == "pending"]) == 2
+        assert len([x for x in faceted_list if x["status"] == "new"]) == 2
         assert (
             len(
                 [
@@ -204,6 +266,7 @@ def test_get_latest_harvest_records(
         source_data_dcatus,
         source_data_dcatus_2,
         job_data_dcatus,
+        job_data_dcatus_2,
         latest_records,
     ):
         interface.add_organization(organization_data)
@@ -212,6 +275,7 @@ def test_get_latest_harvest_records(
         # `latest_records` fixture
         interface.add_harvest_source(source_data_dcatus_2)
         interface.add_harvest_job(job_data_dcatus)
+        interface.add_harvest_job(job_data_dcatus_2)
 
         interface.add_harvest_records(latest_records)
 
@@ -225,53 +289,49 @@ def test_get_latest_harvest_records(
         expected_records = [
             {
                 "identifier": "a",
-                "harvest_job_id": None,
+                "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76",
                 "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
                 "source_hash": None,
                 "source_raw": "data_1",
                 "date_created": datetime.datetime(2024, 3, 1, 0, 0, 0, 1000),
                 "date_finished": None,
                 "ckan_id": None,
-                "type": None,
                 "action": "update",
                 "status": "success",
             },
             {
                 "identifier": "b",
-                "harvest_job_id": None,
+                "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76",
                 "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
                 "source_hash": None,
                 "source_raw": "data_10",
                 "date_created": datetime.datetime(2024, 3, 1, 0, 0, 0, 1000),
                 "date_finished": None,
                 "ckan_id": None,
-                "type": None,
                 "action": "create",
                 "status": "success",
             },
             {
                 "identifier": "c",
-                "harvest_job_id": None,
+                "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76",
                 "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
                 "source_hash": None,
                 "source_raw": "data_12",
                 "date_created": datetime.datetime(2024, 5, 1, 0, 0, 0, 1000),
                 "date_finished": None,
                 "ckan_id": None,
-                "type": None,
                 "action": "create",
                 "status": "success",
             },
             {
                 "identifier": "e",
-                "harvest_job_id": None,
+                "harvest_job_id": "6bce761c-7a39-41c1-ac73-94234c139c76",
                 "harvest_source_id": "2f2652de-91df-4c63-8b53-bfced20b276b",
                 "source_hash": None,
                 "source_raw": "data_123",
                 "date_created": datetime.datetime(2024, 4, 3, 0, 0, 0, 1000),
                 "date_finished": None,
                 "ckan_id": None,
-                "type": None,
                 "action": "create",
                 "status": "success",
             },
@@ -296,17 +356,19 @@ def test_write_compare_to_db(
         interface.add_harvest_job(job_data_dcatus)
 
         # prefill with records
+        records = []
         for record in internal_compare_data["records"]:
-            data = {
-                "identifier": record["identifier"],
-                "harvest_job_id": job_data_dcatus["id"],
-                "harvest_source_id": job_data_dcatus["harvest_source_id"],
-                "source_hash": dataset_to_hash(sort_dataset(record)),
-                "source_raw": str(record),
-                "type": "dcatus",
-            }
-
-            interface.add_harvest_record(data)
+            records.append(
+                {
+                    "identifier": record["identifier"],
+                    "harvest_job_id": job_data_dcatus["id"],
+                    "harvest_source_id": job_data_dcatus["harvest_source_id"],
+                    "source_hash": dataset_to_hash(sort_dataset(record)),
+                    "source_raw": str(record),
+                }
+            )
+
+        interface.add_harvest_records(records)
 
         harvest_source = HarvestSource(job_data_dcatus["id"])
         harvest_source.get_record_changes()
diff --git a/tests/integration/exception/test_exception_handling.py b/tests/integration/exception/test_exception_handling.py
index edb13435..e6e42613 100644
--- a/tests/integration/exception/test_exception_handling.py
+++ b/tests/integration/exception/test_exception_handling.py
@@ -62,7 +62,7 @@ def test_delete_exception(
                 single_internal_record["identifier"]
             ]
         )
-        interface_errors = interface.get_harvest_errors_by_record_id(
+        interface_errors = interface.get_harvest_record_errors(
             harvest_source.internal_records_lookup_table[
                 single_internal_record["identifier"]
             ]
@@ -96,7 +96,7 @@ def test_validation_exception(
         interface_record = interface.get_harvest_record(
             harvest_source.internal_records_lookup_table[test_record.identifier]
         )
-        interface_errors = interface.get_harvest_errors_by_record_id(
+        interface_errors = interface.get_harvest_record_errors(
             harvest_source.internal_records_lookup_table[test_record.identifier]
         )
         assert (
@@ -129,7 +129,7 @@ def test_dcatus_to_ckan_exception(
         interface_record = interface.get_harvest_record(
             harvest_source.internal_records_lookup_table[test_record.identifier]
         )
-        interface_errors = interface.get_harvest_errors_by_record_id(
+        interface_errors = interface.get_harvest_record_errors(
            harvest_source.internal_records_lookup_table[test_record.identifier]
         )
 
@@ -165,7 +165,7 @@ def test_ckan_sync_exception(
             harvest_source.internal_records_lookup_table[test_record.identifier]
         )
 
-        interface_errors = interface.get_harvest_errors_by_record_id(
+        interface_errors = interface.get_harvest_record_errors(
             harvest_source.internal_records_lookup_table[test_record.identifier]
         )