Skip to content

Commit

Permalink
HDXDSYS-985 HNO is now one dataset with potentially multiple resources (
Browse files Browse the repository at this point in the history
#148)

* HNO is now one dataset with potentially multiple resources

* Use only first resource (assumes that is latest)
  • Loading branch information
mcarans authored Aug 21, 2024
1 parent be6583e commit ef165a7
Show file tree
Hide file tree
Showing 23 changed files with 135 additions and 109 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.9.47] - 2024-08-20

### Fixed

- Read HNO data from global annual dataset with multiple resources

## [0.9.46] - 2024-08-19

### Fixed
Expand Down
10 changes: 5 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ attrs==24.2.0
# jsonlines
# jsonschema
# referencing
cachetools==5.4.0
cachetools==5.5.0
# via google-auth
certifi==2024.7.4
# via requests
Expand Down Expand Up @@ -48,7 +48,7 @@ filelock==3.15.4
# via virtualenv
frictionless==5.17.0
# via hdx-python-utilities
google-auth==2.33.0
google-auth==2.34.0
# via
# google-auth-oauthlib
# gspread
Expand Down Expand Up @@ -242,13 +242,13 @@ ruamel-yaml==0.18.6
# via hdx-python-utilities
ruamel-yaml-clib==0.2.8
# via ruamel-yaml
setuptools==72.2.0
setuptools==73.0.1
# via ckanapi
shellingham==1.5.4
# via typer
simpleeval==0.9.13
# via frictionless
simplejson==3.19.2
simplejson==3.19.3
# via ckanapi
six==1.16.0
# via
Expand Down Expand Up @@ -276,7 +276,7 @@ text-unidecode==1.3
# via python-slugify
typeguard==4.3.0
# via inflect
typer==0.12.3
typer==0.12.4
# via frictionless
typing-extensions==4.12.2
# via
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/admins.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(
self.admin1_data = {}
self.admin2_data = {}

def populate(self):
def populate(self) -> None:
logger.info("Populating admin1 table")
self._update_admin_table(
desired_admin_level="1",
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/conflict_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
self._results = results
self._config = config

def populate(self):
def populate(self) -> None:
logger.info("Populating conflict event table")
errors = set()
for dataset in self._results.values():
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/currency.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(
super().__init__(session)
self._configuration = configuration

def populate(self):
def populate(self) -> None:
logger.info("Populating currencies table")
reader = Read.get_reader("wfp_token")
bearer_json = reader.download_json(
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/food_price.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(
self._commodity = commodity
self._market = market

def populate(self):
def populate(self) -> None:
logger.info("Populating WFP price table")
reader = Read.get_reader("hdx")
headers, country_iterator = reader.read(datasetinfo=self._datasetinfo)
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/food_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(
self._admins = admins
self._results = results

def populate(self):
def populate(self) -> None:
logger.info("Populating food security table")
for dataset in self._results.values():
for admin_level, admin_results in dataset["results"].items():
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/funding.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(
self._locations = locations
self._results = results

def populate(self):
def populate(self) -> None:
logger.info("Populating funding table")
errors = set()
for dataset in self._results.values():
Expand Down
149 changes: 74 additions & 75 deletions src/hapi/pipelines/database/humanitarian_needs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Functions specific to the humanitarian needs theme."""

from datetime import datetime
from logging import getLogger

from hapi_schema.db_humanitarian_needs import DBHumanitarianNeeds
Expand Down Expand Up @@ -58,86 +59,84 @@ def get_admin2_ref(self, row, dataset_name, errors):
)
return ref

def populate(self):
def populate(self) -> None:
logger.info("Populating humanitarian needs table")
reader = Read.get_reader("hdx")
datasets = reader.search_datasets(
filename="Global HPC HNO*",
fq="name:global-hpc-hno-*",
configuration=self._configuration,
)
warnings = set()
errors = set()
for dataset in datasets:
negative_values = []
rounded_values = []
dataset_name = dataset["name"]
self._metadata.add_dataset(dataset)
time_period = dataset.get_time_period()
time_period_start = time_period["startdate_str"]
time_period_end = time_period["enddate_str"]
resource = dataset.get_resource()
resource_id = resource["id"]
url = resource["url"]
headers, rows = reader.get_tabular_rows(url, dict_form=True)
# Admin 1 PCode,Admin 2 PCode,Sector,Gender,Age Group,Disabled,Population Group,Population,In Need,Targeted,Affected,Reached
for row in rows:
admin2_ref = self.get_admin2_ref(row, dataset_name, errors)
if not admin2_ref:
continue
population_group = row["Population Group"]
if population_group == "ALL":
population_group = "all"
sector = row["Sector"]
sector_code = self._sector.get_sector_code(sector)
if not sector_code:
add_missing_value_message(
errors, dataset_name, "sector", sector
)
continue
gender = row["Gender"]
if gender == "a":
gender = "all"
age_range = row["Age Range"]
min_age = row["Min Age"]
max_age = row["Max Age"]
disabled_marker = row["Disabled"]
if disabled_marker == "a":
disabled_marker = "all"
reader = Read.get_reader("hdx")
dataset = reader.read_dataset("global-hpc-hno", self._configuration)
self._metadata.add_dataset(dataset)
dataset_id = dataset["id"]
dataset_name = dataset["name"]
resource = dataset.get_resource() # assumes first resource is latest!
self._metadata.add_resource(dataset_id, resource)
negative_values = []
rounded_values = []
resource_id = resource["id"]
resource_name = resource["name"]
year = int(resource_name[-4:])
time_period_start = datetime(year, 1, 1)
time_period_end = datetime(year, 12, 31, 23, 59, 59)
url = resource["url"]
headers, rows = reader.get_tabular_rows(url, dict_form=True)
# Admin 1 PCode,Admin 2 PCode,Sector,Gender,Age Group,Disabled,Population Group,Population,In Need,Targeted,Affected,Reached
for row in rows:
admin2_ref = self.get_admin2_ref(row, dataset_name, errors)
if not admin2_ref:
continue
population_group = row["Population Group"]
if population_group == "ALL":
population_group = "all"
sector = row["Sector"]
sector_code = self._sector.get_sector_code(sector)
if not sector_code:
add_missing_value_message(
errors, dataset_name, "sector", sector
)
continue
gender = row["Gender"]
if gender == "a":
gender = "all"
age_range = row["Age Range"]
min_age = row["Min Age"]
max_age = row["Max Age"]
disabled_marker = row["Disabled"]
if disabled_marker == "a":
disabled_marker = "all"

def create_row(in_col, population_status):
value = row[in_col]
if value is None:
return
value = get_numeric_if_possible(value)
if value < 0:
negative_values.append(str(value))
return
if isinstance(value, float):
rounded_values.append(str(value))
value = round(value)
humanitarian_needs_row = DBHumanitarianNeeds(
resource_hdx_id=resource_id,
admin2_ref=admin2_ref,
gender=gender,
age_range=age_range,
min_age=min_age,
max_age=max_age,
sector_code=sector_code,
population_group=population_group,
population_status=population_status,
disabled_marker=disabled_marker,
population=value,
reference_period_start=time_period_start,
reference_period_end=time_period_end,
)
self._session.add(humanitarian_needs_row)
def create_row(in_col, population_status):
value = row[in_col]
if value is None:
return
value = get_numeric_if_possible(value)
if value < 0:
negative_values.append(str(value))
return
if isinstance(value, float):
rounded_values.append(str(value))
value = round(value)
humanitarian_needs_row = DBHumanitarianNeeds(
resource_hdx_id=resource_id,
admin2_ref=admin2_ref,
gender=gender,
age_range=age_range,
min_age=min_age,
max_age=max_age,
sector_code=sector_code,
population_group=population_group,
population_status=population_status,
disabled_marker=disabled_marker,
population=value,
reference_period_start=time_period_start,
reference_period_end=time_period_end,
)
self._session.add(humanitarian_needs_row)

create_row("Population", "all")
create_row("Affected", "AFF")
create_row("In Need", "INN")
create_row("Targeted", "TGT")
create_row("Reached", "REA")
create_row("Population", "all")
create_row("Affected", "AFF")
create_row("In Need", "INN")
create_row("Targeted", "TGT")
create_row("Reached", "REA")

self._session.commit()
add_multi_valued_message(
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
self.data = {}
self._datasetinfo = configuration["locations_hrp_gho"]

def populate(self):
def populate(self) -> None:
has_hrp, in_gho = self.read_hrp_gho_data()
for country in Country.countriesdata()["countries"].values():
code = country["#country+code+v_iso3"]
Expand Down
43 changes: 32 additions & 11 deletions src/hapi/pipelines/database/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from hapi_schema.db_dataset import DBDataset
from hapi_schema.db_resource import DBResource
from hdx.data.dataset import Dataset
from hdx.data.resource import Resource
from hdx.scraper.runner import Runner
from hdx.scraper.utilities.reader import Read
from sqlalchemy.orm import Session
Expand All @@ -15,13 +16,15 @@


class Metadata(BaseUploader):
def __init__(self, runner: Runner, session: Session, today: datetime):
    def __init__(
        self, runner: Runner, session: Session, today: datetime
    ) -> None:
        """Set up the metadata uploader.

        Args:
            runner: Scraper runner used later to fetch HAPI metadata.
            session: Database session (passed to the BaseUploader).
            today: Timestamp stamped onto resources as hapi_updated_date.
        """
        super().__init__(session)
        self.runner = runner
        self.today = today
        # HDX ids of datasets already written, used to avoid duplicates
        self.dataset_data = []

def populate(self):
def populate(self) -> None:
logger.info("Populating metadata")
datasets = self.runner.get_hapi_metadata()
for dataset_id, dataset in datasets.items():
Expand Down Expand Up @@ -59,9 +62,7 @@ def populate(self):
self._session.add(resource_row)
self._session.commit()

def add_hapi_metadata(
self, hapi_dataset_metadata: Dict, hapi_resource_metadata: Dict
):
def add_hapi_dataset_metadata(self, hapi_dataset_metadata: Dict) -> str:
dataset_id = hapi_dataset_metadata["hdx_id"]
dataset_row = DBDataset(
hdx_id=dataset_id,
Expand All @@ -71,27 +72,47 @@ def add_hapi_metadata(
hdx_provider_name=hapi_dataset_metadata["hdx_provider_name"],
)
self._session.add(dataset_row)

self.dataset_data.append(dataset_id)
return dataset_id

    def add_hapi_resource_metadata(
        self, dataset_id: str, hapi_resource_metadata: Dict
    ) -> None:
        """Insert one resource metadata row linked to a dataset and commit.

        Mutates *hapi_resource_metadata* in place: stamps the parent
        dataset id, sets the HXL flag and records today's date, then adds
        a DBResource row built from the mapping.

        Args:
            dataset_id: HDX id of the parent dataset.
            hapi_resource_metadata: Mapping used as DBResource kwargs.
        """
        hapi_resource_metadata["dataset_hdx_id"] = dataset_id
        # NOTE(review): is_hxl is hard-coded True — confirm every resource
        # routed through here is actually HXLated
        hapi_resource_metadata["is_hxl"] = True
        hapi_resource_metadata["hapi_updated_date"] = self.today

        resource_row = DBResource(**hapi_resource_metadata)
        self._session.add(resource_row)
        self._session.commit()

self.dataset_data.append(dataset_id)
    def add_hapi_metadata(
        self, hapi_dataset_metadata: Dict, hapi_resource_metadata: Dict
    ) -> None:
        """Add a dataset row plus its resource row, then commit.

        Convenience wrapper that chains add_hapi_dataset_metadata and
        add_hapi_resource_metadata for the common one-dataset/one-resource
        case.

        Args:
            hapi_dataset_metadata: Dataset metadata mapping (must contain
                "hdx_id").
            hapi_resource_metadata: Resource metadata mapping.
        """
        dataset_id = self.add_hapi_dataset_metadata(hapi_dataset_metadata)
        self.add_hapi_resource_metadata(dataset_id, hapi_resource_metadata)
        self._session.commit()

def add_dataset(self, dataset: Dataset):
def get_hapi_dataset_metadata(self, dataset: Dataset) -> Dict:
time_period = dataset.get_time_period()
hapi_time_period = {
"time_period": {
"start": time_period["startdate"],
"end": time_period["enddate"],
}
}
hapi_dataset_metadata = Read.get_hapi_dataset_metadata(
dataset, hapi_time_period
)
return Read.get_hapi_dataset_metadata(dataset, hapi_time_period)

    def add_dataset(self, dataset: Dataset) -> None:
        """Add only the dataset-level metadata row for an HDX dataset.

        Resources are added separately (see add_resource), allowing a
        single dataset to carry multiple resources.

        Args:
            dataset: HDX Dataset whose metadata should be recorded.
        """
        hapi_dataset_metadata = self.get_hapi_dataset_metadata(dataset)
        self.add_hapi_dataset_metadata(hapi_dataset_metadata)

    def add_resource(self, dataset_id: str, resource: Resource) -> None:
        """Add a resource metadata row under an already-added dataset.

        Args:
            dataset_id: HDX id of the parent dataset (must already have a
                dataset row).
            resource: HDX Resource to extract metadata from.
        """
        hapi_resource_metadata = Read.get_hapi_resource_metadata(resource)
        self.add_hapi_resource_metadata(dataset_id, hapi_resource_metadata)

def add_dataset_first_resource(self, dataset: Dataset) -> None:
hapi_dataset_metadata = self.get_hapi_dataset_metadata(dataset)
hapi_resource_metadata = Read.get_hapi_resource_metadata(
dataset.get_resource()
)
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/national_risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(
self._locations = locations
self._results = results

def populate(self):
def populate(self) -> None:
logger.info("Populating national risk table")
for dataset in self._results.values():
time_period_start = dataset["time_period"]["start"]
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/operational_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def complete_org_info(
# * Org matching
self._org.add_or_match_org(org_info)

def populate(self):
def populate(self) -> None:
logger.info("Populating operational presence table")
operational_presence_rows = []
errors = set()
Expand Down
Loading

0 comments on commit ef165a7

Please sign in to comment.