Merge pull request #168 from OCHA-DAP/HDXDSYS-843-add-dtm
HDXDSYS-843 Add DTM data
turnerm authored Sep 19, 2024
2 parents 8dd2e14 + 155a9aa commit 27eda92
Showing 15 changed files with 447,575 additions and 2 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.10.0] - 2024-09-19

### Added

- IDP scraper

## [0.9.58] - 2024-09-18

### Fixed
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -34,7 +34,7 @@ classifiers = [
requires-python = ">=3.8"

dependencies = [
"hapi-schema>=0.8.15",
"hapi-schema>=0.8.17",
"hdx-python-api>= 6.3.4",
"hdx-python-country>= 3.7.8",
"hdx-python-database[postgresql]>= 1.3.1",
2 changes: 1 addition & 1 deletion requirements.txt
@@ -54,7 +54,7 @@ greenlet==3.1.0
# via sqlalchemy
gspread==6.1.2
# via hdx-python-scraper
hapi-schema==0.8.16
hapi-schema==0.8.17
# via hapi-pipelines (pyproject.toml)
hdx-python-api==6.3.4
# via
1 change: 1 addition & 0 deletions src/hapi/pipelines/app/__main__.py
@@ -221,6 +221,7 @@ def main(
"core.yaml",
"food_security.yaml",
"funding.yaml",
"idps.yaml",
"national_risk.yaml",
"operational_presence.yaml",
"population.yaml",
22 changes: 22 additions & 0 deletions src/hapi/pipelines/app/pipelines.py
@@ -17,6 +17,7 @@
from hapi.pipelines.database.food_security import FoodSecurity
from hapi.pipelines.database.funding import Funding
from hapi.pipelines.database.humanitarian_needs import HumanitarianNeeds
from hapi.pipelines.database.idps import IDPs
from hapi.pipelines.database.locations import Locations
from hapi.pipelines.database.metadata import Metadata
from hapi.pipelines.database.national_risk import NationalRisk
@@ -174,6 +175,13 @@ def _create_configurable_scrapers(
_create_configurable_scrapers("national_risk", "national")
_create_configurable_scrapers("funding", "national")
_create_configurable_scrapers("refugees", "national")
_create_configurable_scrapers("idps", "national")
_create_configurable_scrapers(
"idps", "adminone", adminlevel=self.adminone
)
_create_configurable_scrapers(
"idps", "admintwo", adminlevel=self.admintwo
)
_create_configurable_scrapers("poverty_rate", "national")
_create_configurable_scrapers("conflict_event", "national")
_create_configurable_scrapers(
@@ -270,6 +278,19 @@ def output_refugees(self):
)
refugees.populate()

def output_idps(self):
if not self.themes_to_run or "idps" in self.themes_to_run:
results = self.runner.get_hapi_results(
self.configurable_scrapers["idps"]
)
idps = IDPs(
session=self.session,
metadata=self.metadata,
admins=self.admins,
results=results,
)
idps.populate()

def output_funding(self):
if not self.themes_to_run or "funding" in self.themes_to_run:
results = self.runner.get_hapi_results(
@@ -352,6 +373,7 @@ def output(self):
self.output_humanitarian_needs()
self.output_national_risk()
self.output_refugees()
self.output_idps()
self.output_funding()
self.output_poverty_rate()
self.output_conflict_event()
66 changes: 66 additions & 0 deletions src/hapi/pipelines/configs/idps.yaml
@@ -0,0 +1,66 @@
#IDPs config file

idps_default:
scrapers_with_defaults:
- "dtm"
format: "csv"
use_hxl: True
admin_exact: True
input:
- "#affected+idps"
- "#date+reported"
- "#round+code"
- "#assessment+type"
- "#operation+name"
list:
- "#affected+idps"
- "#date+reported"
- "#round+code"
- "#assessment+type"
- "#operation+name"
output:
- "number_idps"
- "reporting_date"
- "round_number"
- "asessment_type"
- "operation"
output_hxl:
- "#affected+idps"
- "#date+reported"
- "#round+code"
- "#assessment+type"
- "#operation+name"

idps_national:
dtm:
dataset: "global-iom-dtm-from-api"
resource: "Global IOM DTM data for admin levels 0-2"
filter_cols:
- "#adm1+code"
prefilter: "#adm1+code is None"
admin:
- "#country+code"

idps_adminone:
dtm:
dataset: "global-iom-dtm-from-api"
resource: "Global IOM DTM data for admin levels 0-2"
filter_cols:
- "#adm1+code"
- "#adm2+code"
prefilter: "#adm1+code is not None and #adm2+code is None"
admin:
- "#country+code"
- "#adm1+code"

idps_admintwo:
dtm:
dataset: "global-iom-dtm-from-api"
resource: "Global IOM DTM data for admin levels 0-2"
filter_cols:
- "#adm1+code"
- "#adm2+code"
prefilter: "#adm1+code is not None and #adm2+code is not None"
admin:
- "#country+code"
- "#adm2+code"
99 changes: 99 additions & 0 deletions src/hapi/pipelines/database/idps.py
@@ -0,0 +1,99 @@
"""Functions specific to the refugees theme."""

from logging import getLogger
from typing import Dict

from hapi_schema.db_idps import DBIDPs
from sqlalchemy.orm import Session

from ..utilities.logging_helpers import add_message
from . import admins
from .base_uploader import BaseUploader
from .metadata import Metadata

logger = getLogger(__name__)


class IDPs(BaseUploader):
def __init__(
self,
session: Session,
metadata: Metadata,
admins: admins.Admins,
results: Dict,
):
super().__init__(session)
self._metadata = metadata
self._admins = admins
self._results = results

def populate(self) -> None:
# TODO: This might be better suited to just work with the DTM resource
# directly as done with HNO, rather than using a configurable scraper
logger.info("Populating IDPs table")
errors = set()
# self._results is a dictionary where the keys are the HDX dataset ID and the
# values are a dictionary with keys containing HDX metadata plus a "results" key
# containing the results, stored in a dictionary with admin levels as keys.
# There is only one dataset in the results dictionary for now; popitem
# returns a (key, value) tuple, so take the value
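# A hypothetical illustration of the shape (ID and level names assumed):
# {"1234-abcd": {"hdx_stub": "global-iom-dtm-from-api",
#                "results": {"national": {...}, "adminone": {...}, ...}}}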
dataset = self._results.popitem()[1]
dataset_name = dataset["hdx_stub"]
for admin_level, admin_results in dataset["results"].items():
# admin_results contains the keys "headers", "values", and "hapi_resource_metadata".
# admin_results["values"] is a list of dictionaries of the format:
# [{AFG: [1, 2], BFA: [3, 4]}, {AFG: [A, B], BFA: [C, D]}, ...]
# So values are accessed as values[i_hxl_tag][pcode][i], where
# i indexes the rows for that particular p-code
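# For example, values[hxl_tags.index("#affected+idps")]["AFG"][0] would be
# the first IDP figure reported under the AFG code (values illustrative)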
resource_id = admin_results["hapi_resource_metadata"]["hdx_id"]
hxl_tags = admin_results["headers"][1]
values = admin_results["values"]
admin_codes = values[0].keys()
for admin_code in admin_codes:
admin2_code = admins.get_admin2_code_based_on_level(
admin_code=admin_code, admin_level=admin_level
)
duplicate_rows = set()
for row in zip(
*[
values[hxl_tags.index(tag)][admin_code]
for tag in hxl_tags
]
):
# These values are defined outside of the row construction
# because they are also needed for the duplicate check
admin2_ref = self._admins.admin2_data[admin2_code]
assessment_type = row[hxl_tags.index("#assessment+type")]
date_reported = row[hxl_tags.index("#date+reported")]
reporting_round = row[hxl_tags.index("#round+code")]
operation = row[hxl_tags.index("#operation+name")]
duplicate_row_check = (
admin2_ref,
assessment_type,
date_reported,
reporting_round,
operation,
)
if duplicate_row_check in duplicate_rows:
text = (
f"Duplicate row for admin code {admin2_code}, assessment type {assessment_type}, "
f"date reported {date_reported}, reporting round {reporting_round}, "
f"operation {operation}"
)
add_message(errors, dataset_name, text)
continue
idps_row = DBIDPs(
resource_hdx_id=resource_id,
admin2_ref=admin2_ref,
assessment_type=assessment_type,
reporting_round=reporting_round,
operation=operation,
population=row[hxl_tags.index("#affected+idps")],
reference_period_start=date_reported,
reference_period_end=date_reported,
)
self._session.add(idps_row)
duplicate_rows.add(duplicate_row_check)
self._session.commit()
for error in sorted(errors):
logger.error(error)