diff --git a/catalog/dags/providers/provider_api_scripts/inaturalist.py b/catalog/dags/providers/provider_api_scripts/inaturalist.py
index 81443cf2f6a..3a8c59920c7 100644
--- a/catalog/dags/providers/provider_api_scripts/inaturalist.py
+++ b/catalog/dags/providers/provider_api_scripts/inaturalist.py
@@ -49,6 +49,7 @@
     "media_type": IMAGE,
 }
 OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/tmp/"))
+COL_URL = "https://download.checklistbank.org/col/latest_coldp.zip"
 
 
 class INaturalistDataIngester(ProviderDataIngester):
@@ -86,16 +87,19 @@ def get_batches(
             postgres_conn_id=POSTGRES_CONN_ID,
             default_statement_timeout=PostgresHook.get_execution_timeout(task),
         )
-        max_id = pg.get_records("SELECT max(photo_id) FROM inaturalist.photos")[0][0]
-        if max_id is None:
+        min_id, max_id = pg.get_records(
+            "SELECT min(photo_id), max(photo_id) FROM inaturalist.photos"
+        )[0]
+        if min_id is None or max_id is None:
             # This would only happen if there were no data loaded to inaturalist.photos
             # yet, but just in case.
             return
-        else:
-            # Return the list of batch starts and ends, which will be passed to op_args,
-            # which expects each arg to be a list. So, it's a list of lists, not a list
-            # of tuples.
-            return [[(x, x + batch_length - 1)] for x in range(0, max_id, batch_length)]
+        # Return the list of batch starts and ends, which will be passed to op_args,
+        # which expects each arg to be a list. So, it's a list of lists, not a list
+        # of tuples.
+        return [
+            [(x, x + batch_length - 1)] for x in range(min_id, max_id, batch_length)
+        ]
 
     @staticmethod
     def load_transformed_data(
@@ -139,7 +143,10 @@ def load_transformed_data(
         # TO DO: Would it be better to use loader.upsert_records here? Would need to
         # trace back the parameters that need to be passed in for different stats.
         upserted_records = sql.upsert_records_to_db_table(
-            postgres_conn_id=POSTGRES_CONN_ID, identifier=identifier, task=task
+            postgres_conn_id=POSTGRES_CONN_ID,
+            identifier=identifier,
+            task=task,
+            media_type=IMAGE,
         )
         logger.info(f"Upserted {upserted_records} records, from batch {batch_number}.")
         # Truncate the temp table
@@ -217,7 +224,6 @@ def compare_update_dates(
 
     @staticmethod
     def load_catalog_of_life_names(task: PythonOperator, remove_api_files: bool):
-        COL_URL = "https://api.checklistbank.org/dataset/9840/export.zip?format=ColDP"
         local_zip_file = "COL_archive.zip"
         name_usage_file = "NameUsage.tsv"
         vernacular_file = "VernacularName.tsv"
@@ -230,6 +236,10 @@ def load_catalog_of_life_names(task: PythonOperator, remove_api_files: bool):
         # This is a static method so that it can be used to create preingestion
         # tasks for airflow. Unfortunately, that means it does not have access to
         # the delayed requester. So, we are just using requests for now.
+        logger.info(
+            f"Downloading Catalog of Life from "
+            f"{COL_URL} to {OUTPUT_DIR}/{local_zip_file}."
+        )
         with requests.get(COL_URL, stream=True) as response:
             response.raise_for_status()
             with open(OUTPUT_DIR / local_zip_file, "wb") as f:
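
The get_batches change above anchors batching at the table's minimum photo_id instead of always starting from 0. As a quick reference for reviewers, here is a minimal standalone sketch of that logic; the helper name make_batches and the literal ids are illustrative only, not part of the DAG:

    def make_batches(min_id, max_id, batch_length):
        # Empty inaturalist.photos table: both aggregates come back NULL.
        if min_id is None or max_id is None:
            return None
        # One single-tuple list per batch, because op_args expects each
        # argument to be a list: a list of lists, not a list of tuples.
        return [
            [(x, x + batch_length - 1)] for x in range(min_id, max_id, batch_length)
        ]

    print(make_batches(0, 22, 10))  # [[(0, 9)], [(10, 19)], [(20, 29)]]
    print(make_batches(8, 22, 10))  # [[(8, 17)], [(18, 27)]]

One boundary worth a glance: when max_id - min_id is an exact multiple of batch_length (e.g. min 0, max 20, length 10), range() stops before max_id and the final id falls outside every batch; none of the parametrized test cases below hit that edge.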
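The new module-level COL_URL points at the latest ColDP export, and the download now logs its source and destination before streaming to disk. The hunk ends just after the output file is opened, so for context here is a hedged sketch of the full streaming pattern; the chunked copy loop is a common completion and an assumption on my part, not necessarily the exact body in the DAG:

    from pathlib import Path

    import requests

    COL_URL = "https://download.checklistbank.org/col/latest_coldp.zip"
    OUTPUT_DIR = Path("/tmp")
    local_zip_file = "COL_archive.zip"

    # Stream the (large) zip straight to disk rather than buffering it in memory.
    with requests.get(COL_URL, stream=True) as response:
        response.raise_for_status()
        with open(OUTPUT_DIR / local_zip_file, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)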
diff --git a/catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql b/catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql
index 736e4b8453e..deedeaaafb9 100644
--- a/catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql
+++ b/catalog/dags/providers/provider_csv_load_scripts/inaturalist/create_schema.sql
@@ -54,58 +54,80 @@ https://www.itis.gov/dwca_format.html which also has vernacular names / synonyms
 DROP TABLE IF EXISTS inaturalist.col_vernacular;
 COMMIT;
+/* Table definition can be found here:
+   https://github.com/CatalogueOfLife/coldp/blob/master/README.md#vernacularname
+*/
 CREATE TABLE inaturalist.col_vernacular (
     taxonID varchar(5),
     sourceID decimal,
     taxon_name varchar(2000),
     transliteration text,
     name_language varchar(3),
+    preferred boolean,
     country varchar(3),
     area varchar(2000),
     sex decimal,
-    referenceID decimal
+    referenceID decimal,
+    remarks text
 );
 COMMIT;
 
 DROP TABLE IF EXISTS inaturalist.col_name_usage;
 COMMIT;
+/* Table definition can be found here:
+   https://github.com/CatalogueOfLife/coldp/blob/master/README.md#nameusage
+*/
 CREATE TABLE inaturalist.col_name_usage (
-    ID varchar(50),
+    ID text,
     alternativeID decimal,
     nameAlternativeID decimal,
     sourceID decimal,
-    parentID varchar(5),
-    basionymID varchar(5),
-    status varchar(22),
-    scientificName varchar(76),
-    authorship varchar(255),
-    rank varchar(21),
-    notho varchar(13),
-    uninomial varchar(50),
-    genericName varchar(50),
-    infragenericEpithet varchar(25),
-    specificEpithet varchar(50),
-    infraspecificEpithet varchar(50),
-    cultivarEpithet varchar(50),
-    namePhrase varchar(80),
-    nameReferenceID varchar(36),
+    parentID text,
+    basionymID text,
+    status text,
+    scientificName text,
+    authorship text,
+    rank text,
+    notho text,
+    originalSpelling boolean,
+    uninomial text,
+    genericName text,
+    infragenericEpithet text,
+    specificEpithet text,
+    infraspecificEpithet text,
+    cultivarEpithet text,
+    combinationAuthorship text,
+    combinationAuthorshipID text,
+    combinationExAuthorship text,
+    combinationExAuthorshipID text,
+    combinationAuthorshipYear text,
+    basionymAuthorship text,
+    basionymAuthorshipID text,
+    basionymExAuthorship text,
+    basionymExAuthorshipID text,
+    basionymAuthorshipYear text,
+    namePhrase text,
+    nameReferenceID text,
     publishedInYear decimal,
-    publishedInPage varchar(255),
-    publishedInPageLink varchar(255),
-    code varchar(10),
-    nameStatus varchar(15),
-    accordingToID varchar(36),
+    publishedInPage text,
+    publishedInPageLink text,
+    gender text,
+    genderAgreement boolean,
+    etymology text,
+    code text,
+    nameStatus text,
+    accordingToID text,
     accordingToPage decimal,
     accordingToPageLink decimal,
     referenceID text,
-    scrutinizer varchar(149),
+    scrutinizer text,
     scrutinizerID decimal,
-    scrutinizerDate varchar(10),
+    scrutinizerDate text,
     extinct boolean,
-    temporalRangeStart varchar(15),
-    temporalRangeEnd varchar(15),
-    environment varchar(38),
+    temporalRangeStart text,
+    temporalRangeEnd text,
+    environment text,
     species decimal,
     section decimal,
     subgenus decimal,
@@ -124,7 +146,7 @@ CREATE TABLE inaturalist.col_name_usage (
     kingdom decimal,
     sequenceIndex decimal,
     branchLength decimal,
-    link varchar(240),
+    link text,
     nameRemarks decimal,
     remarks text
 );
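
The schema changes track the ColDP README definitions linked in the new comments: added columns (preferred, remarks, originalSpelling, the combination/basionym authorship breakdown, gender, genderAgreement, etymology) and sized varchar columns widened to text, which Postgres stores and indexes the same way but without a length cap to trip over. A throwaway local check of the widened definition might look like the following; the DSN and sample value are placeholders, not project settings, and it assumes create_schema.sql has already run:

    import psycopg2

    conn = psycopg2.connect("dbname=openledger")  # placeholder DSN
    with conn, conn.cursor() as cur:
        # Under the old definition, scientificName varchar(76) would reject
        # this 200-character value; as text it inserts cleanly.
        cur.execute(
            "INSERT INTO inaturalist.col_name_usage (ID, scientificName) "
            "VALUES (%s, %s)",
            ("sanity-check", "x" * 200),
        )
    conn.close()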
diff --git a/catalog/tests/dags/providers/provider_api_scripts/test_inaturalist.py b/catalog/tests/dags/providers/provider_api_scripts/test_inaturalist.py
index 64682eb01f8..ce4eedc0cb3 100644
--- a/catalog/tests/dags/providers/provider_api_scripts/test_inaturalist.py
+++ b/catalog/tests/dags/providers/provider_api_scripts/test_inaturalist.py
@@ -162,17 +162,18 @@ def test_consolidate_load_statistics(all_results, expected):
 
 
 @pytest.mark.parametrize(
-    "batch_length, max_id, expected",
+    "batch_length, min_and_max, expected",
     [
-        pytest.param(10, [(22,)], [[(0, 9)], [(10, 19)], [(20, 29)]], id="happy_path"),
-        pytest.param(10, [(2,)], [[(0, 9)]], id="bigger_batch_than_id"),
-        pytest.param(10, [(None,)], None, id="no_data"),
+        pytest.param(10, (0, 22), [[(0, 9)], [(10, 19)], [(20, 29)]], id="happy_path"),
+        pytest.param(10, (0, 2), [[(0, 9)]], id="bigger_batch_than_id"),
+        pytest.param(10, (None, None), None, id="no_data"),
+        pytest.param(10, (8, 22), [[(8, 17)], [(18, 27)]], id="min_not_zero"),
     ],
 )
-def test_get_batches(batch_length, max_id, expected):
+def test_get_batches(batch_length, min_and_max, expected):
     task = mock.Mock()
     with mock.patch.object(PostgresHook, "get_execution_timeout", return_value=60):
-        with mock.patch.object(PostgresHook, "get_records", return_value=max_id):
+        with mock.patch.object(PostgresHook, "get_records", return_value=[min_and_max]):
             actual = INAT.get_batches(batch_length, task)
     assert actual == expected
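
The mock's return value changes from max_id to [min_and_max] because PostgresHook.get_records returns a list of row tuples, and the aggregate query yields exactly one row, which get_batches unpacks with [0]. A sketch of that shape, with hypothetical values mirroring the happy_path case:

    rows = [(0, 22)]          # shape of get_records() for the min/max query
    min_id, max_id = rows[0]  # the unpacking done in get_batches
    assert (min_id, max_id) == (0, 22)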