NO-REF: Refactor NYPL Process (#433)
kylevillegas93 authored Nov 6, 2024
1 parent b5381ce commit 454b342
Showing 9 changed files with 265 additions and 414 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests-unit.yaml
@@ -2,13 +2,14 @@ name: ETL Pipeline Tests (Pull Request)

on:
pull_request:
types: [opened]
types: [opened, synchronize]
paths:
- 'api/**'
- 'managers/**'
- 'mappings/**'
- 'model/**'
- 'processes/**'
- 'services/**'
- 'tests/**'
- 'utils/**'
- 'dev-requirements.txt'
151 changes: 17 additions & 134 deletions processes/ingest/nypl.py
@@ -1,13 +1,6 @@
from datetime import datetime, timedelta, timezone
import os
import requests

from ..core import CoreProcess
from logger import createLog
from managers.db import DBManager
from managers.nyplApi import NyplApiManager
from mappings.nypl import NYPLMapping
from sqlalchemy import text
from services import NYPLBibService

logger = createLog(__name__)

@@ -16,145 +9,35 @@ class NYPLProcess(CoreProcess):
def __init__(self, *args):
super(NYPLProcess, self).__init__(*args[:4])

self.ingest_limit = args[4] or None
self.ingest_offset = args[5] or None

self.generateEngine()
self.createSession()

self.bib_db_connection = DBManager(
user=os.environ['NYPL_BIB_USER'],
pswd=os.environ['NYPL_BIB_PSWD'],
host=os.environ['NYPL_BIB_HOST'],
port=os.environ['NYPL_BIB_PORT'],
db=os.environ['NYPL_BIB_NAME']
)
self.bib_db_connection.generateEngine()
self.limit = (len(args) >= 5 and args[4]) or None
self.offset = (len(args) >= 6 and args[5]) or None

self.nypl_api_manager = NyplApiManager()
self.nypl_api_manager.generateAccessToken()

self.location_codes = self.load_location_codes()
self.cce_api = os.environ['BARDO_CCE_API']
self.nypl_bib_service = NYPLBibService()

def runProcess(self):
try:
self.generateEngine()
self.createSession()

if self.process == 'daily':
self.import_bib_records()
records = self.nypl_bib_service.get_records(offset=self.offset, limit=self.limit)
elif self.process == 'complete':
self.import_bib_records(full_or_partial=True)
records = self.nypl_bib_service.get_records(full_import=True)
elif self.process == 'custom':
self.import_bib_records(start_timestamp=self.ingestPeriod)
records = self.nypl_bib_service.get_records(start_timestamp=self.ingestPeriod, offset=self.offset, limit=self.limit)
else:
logger.warning(f'Unknown NYPL ingestion process type {self.process}')
return

for record in records:
self.addDCDWToUpdateList(record)

self.saveRecords()
self.commitChanges()

logger.info(f'Ingested {len(self.records)} NYPL records')
except Exception:
except Exception as e:
logger.exception(f'Failed to ingest NYPL records')

def import_bib_records(self, full_or_partial=False, start_timestamp=None):
nypl_bib_query = 'SELECT id, nypl_source, publish_year, var_fields FROM bib'

if full_or_partial is False:
nypl_bib_query += ' WHERE updated_date > '
if start_timestamp:
nypl_bib_query += "'{}'".format(start_timestamp)
else:
startDateTime = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24)
nypl_bib_query += "'{}'".format(startDateTime.strftime('%Y-%m-%dT%H:%M:%S%z'))

if self.ingest_offset:
nypl_bib_query += ' OFFSET {}'.format(self.ingest_offset)

if self.ingest_limit:
nypl_bib_query += ' LIMIT {}'.format(self.ingest_limit)

with self.bib_db_connection.engine.connect() as conn:
bib_results = conn.execution_options(stream_results=True).execute(text(nypl_bib_query))

for bib in bib_results:
bib = self._map_bib(bib)

if bib['var_fields'] is None:
continue

self.parse_nypl_bib(bib)

def parse_nypl_bib(self, bib):
try:
if self.is_pd_research_bib(dict(bib)):
bib_items = self.fetch_bib_items(dict(bib))

nypl_record = NYPLMapping(bib, bib_items, self.statics, self.location_codes)
nypl_record.applyMapping()

self.addDCDWToUpdateList(nypl_record)
except Exception:
logger.exception('Failed to parse NYPL bib {}'.format(bib.get('id')))

def fetch_bib_items(self, bib):
bib_endpoint = 'bibs/{}/{}/items'.format(bib['nypl_source'], bib['id'])

return self.nypl_api_manager.queryApi(bib_endpoint).get('data', [])

def _map_bib(self, bib):
try:
id, nypl_source, publish_year, var_fields = bib

return {
'id': id,
'nypl_source': nypl_source,
'publish_year': publish_year,
'var_fields': var_fields
}
except:
return bib

def load_location_codes(self):
return requests.get(os.environ['NYPL_LOCATIONS_BY_CODE']).json()

def is_pd_research_bib(self, bib):
current_year = datetime.today().year

try:
pub_year = int(bib['publish_year'])
except Exception:
pub_year = current_year

if pub_year > 1965:
return False
elif pub_year > current_year - 95:
copyright_status = self.get_copyright_status(bib['var_fields'])

if not copyright_status:
return False

bib_status = self.nypl_api_manager.queryApi('bibs/{}/{}/is-research'.format(bib['nypl_source'], bib['id']))

return bib_status.get('isResearch', False) is True

def get_copyright_status(self, var_fields):
lccn_data = list(filter(lambda field: field.get('marcTag', None) == '010', var_fields))

if not len(lccn_data) == 1:
return False

lccn_no = lccn_data[0]['subfields'][0]['content'].replace('sn', '').strip()

copyright_url = f'{self.cce_api}/lccn/{lccn_no}'

copyright_response = requests.get(copyright_url)

if copyright_response.status_code != 200:
return False

copyright_data = copyright_response.json()

if len(copyright_data['data']['results']) > 0:
return False if len(copyright_data['data']['results'][0]['renewals']) > 0 else True

return False
raise e
finally:
self.close_connection()
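
Note on the refactored constructor: the expression (len(args) >= 5 and args[4]) or None yields args[4] only when that positional argument exists and is truthy, and falls back to None otherwise, so a limit of 0 or an empty string also collapses to None. A minimal standalone sketch of the idiom, using hypothetical argument tuples:

def parse_optional_args(args):
    # Mirrors the idiom in NYPLProcess.__init__: take args[4]/args[5] when
    # present, otherwise fall back to None. Falsy values (0, '') also become
    # None because of the trailing "or None".
    limit = (len(args) >= 5 and args[4]) or None
    offset = (len(args) >= 6 and args[5]) or None
    return limit, offset

# Hypothetical invocations (the first four positional args are placeholders):
parse_optional_args(('daily', None, None, None))            # -> (None, None)
parse_optional_args(('daily', None, None, None, 500))       # -> (500, None)
parse_optional_args(('daily', None, None, None, 500, 50))   # -> (500, 50)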
1 change: 1 addition & 0 deletions services/__init__.py
@@ -0,0 +1 @@
from .sources.nypl_bib_service import NYPLBibService
140 changes: 140 additions & 0 deletions services/sources/nypl_bib_service.py
@@ -0,0 +1,140 @@
from datetime import datetime, timedelta, timezone
import os
import requests
from typing import Optional

from logger import createLog
from managers.db import DBManager
from static.manager import StaticManager
from managers.nyplApi import NyplApiManager
from mappings.nypl import NYPLMapping
from .source_service import SourceService
from sqlalchemy import text


logger = createLog(__name__)


class NYPLBibService(SourceService):
def __init__(self):
self.bib_db_connection = DBManager(
user=os.environ['NYPL_BIB_USER'],
pswd=os.environ['NYPL_BIB_PSWD'],
host=os.environ['NYPL_BIB_HOST'],
port=os.environ['NYPL_BIB_PORT'],
db=os.environ['NYPL_BIB_NAME']
)
self.bib_db_connection.generateEngine()

self.nypl_api_manager = NyplApiManager()
self.nypl_api_manager.generateAccessToken()

self.location_codes = self.load_location_codes()
self.cce_api = os.environ['BARDO_CCE_API']

self.static_manager = StaticManager()

def get_records(
self,
full_import: bool=False,
start_timestamp: datetime=None,
offset: Optional[int]=None,
limit: Optional[int]=None
) -> list[NYPLMapping]:
records = []
nypl_bib_query = 'SELECT * FROM bib WHERE publish_year <= 1965'

if not full_import:
nypl_bib_query += ' and updated_date > '

if start_timestamp:
nypl_bib_query += "'{}'".format(start_timestamp)
else:
start_date_time = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24)
nypl_bib_query += "'{}'".format(start_date_time.strftime('%Y-%m-%dT%H:%M:%S%z'))

if offset:
nypl_bib_query += ' OFFSET {}'.format(offset)

if limit:
nypl_bib_query += ' LIMIT {}'.format(limit)

with self.bib_db_connection.engine.connect() as db_connection:
bib_results = db_connection.execution_options(stream_results=True).execute(text(nypl_bib_query))
bib_result_mappings = [bib_result_mapping for bib_result_mapping in bib_results.mappings()]

for bib_result_mapping in bib_result_mappings:
if bib_result_mapping['var_fields'] is None:
continue

nypl_bib_record = self.parse_nypl_bib(bib_result_mapping)

if nypl_bib_record:
records.append(nypl_bib_record)

return records

def parse_nypl_bib(self, bib) -> Optional[NYPLMapping]:
try:
if self.is_pd_research_bib(dict(bib)):
bib_items = self.fetch_bib_items(dict(bib))

nypl_record = NYPLMapping(bib, bib_items, self.static_manager.statics, self.location_codes)
nypl_record.applyMapping()

return nypl_record

return None
except Exception:
logger.exception('Failed to parse NYPL bib {}'.format(bib.get('id')))
return None

def fetch_bib_items(self, bib):
bib_endpoint = 'bibs/{}/{}/items'.format(bib['nypl_source'], bib['id'])

return self.nypl_api_manager.queryApi(bib_endpoint).get('data', [])

def load_location_codes(self):
return requests.get(os.environ['NYPL_LOCATIONS_BY_CODE']).json()

def is_pd_research_bib(self, bib):
current_year = datetime.today().year

try:
pub_year = int(bib['publish_year'])
except Exception:
pub_year = current_year

if pub_year > 1965:
return False
elif pub_year > current_year - 95:
copyright_status = self.get_copyright_status(bib['var_fields'])

if not copyright_status:
return False

bib_status = self.nypl_api_manager.queryApi('bibs/{}/{}/is-research'.format(bib['nypl_source'], bib['id']))

return bib_status.get('isResearch', False) is True

def get_copyright_status(self, var_fields):
lccn_data = list(filter(lambda field: field.get('marcTag', None) == '010', var_fields))

if not len(lccn_data) == 1:
return False

lccn_no = lccn_data[0]['subfields'][0]['content'].replace('sn', '').strip()

copyright_url = f'{self.cce_api}/lccn/{lccn_no}'

copyright_response = requests.get(copyright_url)

if copyright_response.status_code != 200:
return False

copyright_data = copyright_response.json()

if len(copyright_data['data']['results']) > 0:
return False if len(copyright_data['data']['results'][0]['renewals']) > 0 else True

return False
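
For orientation, a minimal usage sketch of the new service; it mirrors the integration test added later in this commit, and the local config path is an assumption taken from that test rather than a documented entry point:

from datetime import datetime, timedelta, timezone

from load_env import load_env_file
from services import NYPLBibService

# Assumes the same local environment setup the integration test uses.
load_env_file('local', file_string='config/local.yaml')

nypl_bib_service = NYPLBibService()

# Fetch public-domain research bibs updated in the last 24 hours, capped at 100.
records = nypl_bib_service.get_records(
    start_timestamp=datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24),
    limit=100
)

print(f'Fetched {len(records)} NYPL records')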
17 changes: 17 additions & 0 deletions services/sources/source_service.py
@@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional

from mappings.abstractMapping import AbstractMapping

class SourceService(ABC):

@abstractmethod
def get_records(
self,
full_import: bool=False,
start_timestamp: Optional[datetime]=None,
offset: Optional[int]=None,
limit: Optional[int]=None
) -> list[AbstractMapping]:
pass
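
The abstract base gives every source a uniform get_records contract; below is a sketch of a hypothetical subclass, included only to illustrate how a new source would plug in (the class name and body are illustrative, not part of this commit):

from datetime import datetime
from typing import Optional

from mappings.abstractMapping import AbstractMapping
from services.sources.source_service import SourceService


class ExampleSourceService(SourceService):
    # Hypothetical source: a real implementation would query its backend,
    # map each result (as NYPLBibService does with NYPLMapping), and return
    # the mapped records.
    def get_records(
        self,
        full_import: bool=False,
        start_timestamp: Optional[datetime]=None,
        offset: Optional[int]=None,
        limit: Optional[int]=None
    ) -> list[AbstractMapping]:
        return []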
Empty file.
Empty file.
19 changes: 19 additions & 0 deletions tests/integration/services/sources/test_nypl_bib_service.py
@@ -0,0 +1,19 @@
from datetime import datetime, timezone, timedelta
import pytest

from load_env import load_env_file
from services import NYPLBibService

class TestNYPLBibService:
@pytest.fixture
def test_instance(self):
load_env_file('local', file_string='config/local.yaml')
return NYPLBibService()

def test_get_records(self, test_instance: NYPLBibService):
records = test_instance.get_records(
start_timestamp=datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=24),
limit=100
)

assert records is not None