Skip to content

Commit

Permalink
Various iNaturalist updates (#3846)
Browse files Browse the repository at this point in the history
* Use static Catalog of Life URL for iNaturalist

* Add media type when upserting data

Previously the media type was omitted from the upsert call, so records were not upserted correctly.

* Add more logging

* Update iNaturalist schema

* Start with minimum photo ID and create range from there
  • Loading branch information
AetherUnbound authored Mar 8, 2024
1 parent 154109c commit bbea9e6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 43 deletions.
28 changes: 19 additions & 9 deletions catalog/dags/providers/provider_api_scripts/inaturalist.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"media_type": IMAGE,
}
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/tmp/"))
COL_URL = "https://download.checklistbank.org/col/latest_coldp.zip"


class INaturalistDataIngester(ProviderDataIngester):
Expand Down Expand Up @@ -86,16 +87,19 @@ def get_batches(
postgres_conn_id=POSTGRES_CONN_ID,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)
max_id = pg.get_records("SELECT max(photo_id) FROM inaturalist.photos")[0][0]
if max_id is None:
min_id, max_id = pg.get_records(
"SELECT min(photo_id), max(photo_id) FROM inaturalist.photos"
)[0]
if min_id is None or max_id is None:
# This would only happen if there were no data loaded to inaturalist.photos
# yet, but just in case.
return
else:
# Return the list of batch starts and ends, which will be passed to op_args,
# which expects each arg to be a list. So, it's a list of lists, not a list
# of tuples.
return [[(x, x + batch_length - 1)] for x in range(0, max_id, batch_length)]
# Return the list of batch starts and ends, which will be passed to op_args,
# which expects each arg to be a list. So, it's a list of lists, not a list
# of tuples.
return [
[(x, x + batch_length - 1)] for x in range(min_id, max_id, batch_length)
]

@staticmethod
def load_transformed_data(
Expand Down Expand Up @@ -139,7 +143,10 @@ def load_transformed_data(
# TO DO: Would it be better to use loader.upsert_records here? Would need to
# trace back the parameters that need to be passed in for different stats.
upserted_records = sql.upsert_records_to_db_table(
postgres_conn_id=POSTGRES_CONN_ID, identifier=identifier, task=task
postgres_conn_id=POSTGRES_CONN_ID,
identifier=identifier,
task=task,
media_type=IMAGE,
)
logger.info(f"Upserted {upserted_records} records, from batch {batch_number}.")
# Truncate the temp table
Expand Down Expand Up @@ -217,7 +224,6 @@ def compare_update_dates(

@staticmethod
def load_catalog_of_life_names(task: PythonOperator, remove_api_files: bool):
COL_URL = "https://api.checklistbank.org/dataset/9840/export.zip?format=ColDP"
local_zip_file = "COL_archive.zip"
name_usage_file = "NameUsage.tsv"
vernacular_file = "VernacularName.tsv"
Expand All @@ -230,6 +236,10 @@ def load_catalog_of_life_names(task: PythonOperator, remove_api_files: bool):
# This is a static method so that it can be used to create preingestion
# tasks for airflow. Unfortunately, that means it does not have access to
# the delayed requester. So, we are just using requests for now.
logger.info(
f"Downloading Catalog of Life from "
f"{COL_URL} to {OUTPUT_DIR}/{local_zip_file}."
)
with requests.get(COL_URL, stream=True) as response:
response.raise_for_status()
with open(OUTPUT_DIR / local_zip_file, "wb") as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,58 +54,80 @@ https://www.itis.gov/dwca_format.html which also has vernacular names / synonyms
DROP TABLE IF EXISTS inaturalist.col_vernacular;
COMMIT;

/* Staging table for the Catalog of Life vernacular-name export (ColDP).
   Column definitions can be found here:
   https://github.com/CatalogueOfLife/coldp/blob/master/README.md#vernacularname
   NOTE(review): the scraped diff showed a duplicate `referenceID` line (the
   removed pre-commit line without a trailing comma); only the post-commit
   column list is kept here — duplicate column names are invalid SQL. */
CREATE TABLE inaturalist.col_vernacular (
    taxonID varchar(5),
    sourceID decimal,
    taxon_name varchar(2000),
    transliteration text,
    name_language varchar(3),
    preferred boolean,
    country varchar(3),
    area varchar(2000),
    sex decimal,
    referenceID decimal,
    remarks text
);
COMMIT;

DROP TABLE IF EXISTS inaturalist.col_name_usage;
COMMIT;

/* Table definition can be found here:
https://github.com/CatalogueOfLife/coldp/blob/master/README.md#nameusage
*/
CREATE TABLE inaturalist.col_name_usage (
ID varchar(50),
ID text,
alternativeID decimal,
nameAlternativeID decimal,
sourceID decimal,
parentID varchar(5),
basionymID varchar(5),
status varchar(22),
scientificName varchar(76),
authorship varchar(255),
rank varchar(21),
notho varchar(13),
uninomial varchar(50),
genericName varchar(50),
infragenericEpithet varchar(25),
specificEpithet varchar(50),
infraspecificEpithet varchar(50),
cultivarEpithet varchar(50),
namePhrase varchar(80),
nameReferenceID varchar(36),
parentID text,
basionymID text,
status text,
scientificName text,
authorship text,
rank text,
notho text,
originalSpelling boolean,
uninomial text,
genericName text,
infragenericEpithet text,
specificEpithet text,
infraspecificEpithet text,
cultivarEpithet text,
combinationAuthorship text,
combinationAuthorshipID text,
combinationExAuthorship text,
combinationExAuthorshipID text,
combinationAuthorshipYear text,
basionymAuthorship text,
basionymAuthorshipID text,
basionymExAuthorship text,
basionymExAuthorshipID text,
basionymAuthorshipYear text,
namePhrase text,
nameReferenceID text,
publishedInYear decimal,
publishedInPage varchar(255),
publishedInPageLink varchar(255),
code varchar(10),
nameStatus varchar(15),
accordingToID varchar(36),
publishedInPage text,
publishedInPageLink text,
gender text,
genderAgreement boolean,
etymology text,
code text,
nameStatus text,
accordingToID text,
accordingToPage decimal,
accordingToPageLink decimal,
referenceID text,
scrutinizer varchar(149),
scrutinizer text,
scrutinizerID decimal,
scrutinizerDate varchar(10),
scrutinizerDate text,
extinct boolean,
temporalRangeStart varchar(15),
temporalRangeEnd varchar(15),
environment varchar(38),
temporalRangeStart text,
temporalRangeEnd text,
environment text,
species decimal,
section decimal,
subgenus decimal,
Expand All @@ -124,7 +146,7 @@ CREATE TABLE inaturalist.col_name_usage (
kingdom decimal,
sequenceIndex decimal,
branchLength decimal,
link varchar(240),
link text,
nameRemarks decimal,
remarks text
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,18 @@ def test_consolidate_load_statistics(all_results, expected):


@pytest.mark.parametrize(
    "batch_length, min_and_max, expected",
    [
        # min=0: batches start at zero, last batch padded out to batch_length.
        pytest.param(10, (0, 22), [[(0, 9)], [(10, 19)], [(20, 29)]], id="happy_path"),
        # max id smaller than one batch still yields a single full-width batch.
        pytest.param(10, (0, 2), [[(0, 9)]], id="bigger_batch_than_id"),
        # Empty photos table -> SELECT min/max returns (None, None) -> no batches.
        pytest.param(10, (None, None), None, id="no_data"),
        # Batching must start at the actual minimum photo id, not at zero.
        pytest.param(10, (8, 22), [[(8, 17)], [(18, 27)]], id="min_not_zero"),
    ],
)
def test_get_batches(batch_length, min_and_max, expected):
    """get_batches turns the (min, max) photo-id range into [(start, end)] batches.

    NOTE(review): the scraped diff interleaved the removed pre-commit lines
    (a second ``def`` using ``max_id`` and a second ``get_records`` patch)
    with the added ones; this is the reconstructed post-commit test, which
    feeds the hook a single ``[(min, max)]`` record.
    """
    task = mock.Mock()
    with mock.patch.object(PostgresHook, "get_execution_timeout", return_value=60):
        # get_records returns one row containing the (min, max) tuple.
        with mock.patch.object(PostgresHook, "get_records", return_value=[min_and_max]):
            actual = INAT.get_batches(batch_length, task)
    assert actual == expected

Expand Down

0 comments on commit bbea9e6

Please sign in to comment.