Skip to content

Commit

Permalink
refactoring rest of processes
Browse files Browse the repository at this point in the history
  • Loading branch information
kylevillegas93 committed Nov 25, 2024
1 parent 757bbe9 commit fa97155
Show file tree
Hide file tree
Showing 20 changed files with 135 additions and 292 deletions.
26 changes: 26 additions & 0 deletions managers/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError
from typing import Optional

from model import Base
from logger import create_log
Expand Down Expand Up @@ -98,6 +99,31 @@ def deleteRecordsByQuery(self, query):
logger.error('Deadlock in database layer, retry batch')
logger.debug(oprErr)

def windowedQuery(self, table, query, ingest_limit: Optional[int], windowSize=100):
    """Yield rows of `query` in keyset-paginated windows of `windowSize`.

    Appends `table.date_modified` and `table.id` to the select list so the
    window cursor (`table.id`) is available on every row, then pages through
    the result set by repeatedly filtering `table.id > lastID`.

    :param table: mapped model class providing `date_modified` and `id` columns
    :param query: SQLAlchemy Query to paginate
    :param ingest_limit: stop after yielding this many rows (None = unlimited)
    :param windowSize: number of rows fetched per round trip
    :yields: the bare entity for single-entity queries, otherwise the original
        row tuple with the two appended cursor columns stripped off
    """
    singleEntity = query.is_single_entity

    # Keep date_modified in the select list (it is stripped back off below),
    # but order ONLY by id: the keyset filter below is `id > lastID`, so id
    # must be the primary sort key. Ordering primarily by date_modified made
    # lastID a non-maximal id within the chunk, which could skip rows or
    # re-yield duplicates across windows.
    query = query.add_column(table.date_modified)
    query = query.add_column(table.id).order_by(table.id)

    lastID = None
    totalFetched = 0

    while True:
        subQuery = query

        if lastID is not None:
            subQuery = subQuery.filter(table.id > lastID)

        queryChunk = subQuery.limit(windowSize).all()

        if not queryChunk:
            break

        # Cursor for the next window: id of the last (max-id) row fetched.
        lastID = queryChunk[-1][-1]

        for row in queryChunk:
            yield row[0] if singleEntity else row[0:-2]

            # Count rows actually yielded (adding windowSize over-counted
            # short final chunks) and stop exactly at the ingest limit rather
            # than discarding an already-fetched chunk before yielding it.
            totalFetched += 1
            if ingest_limit and totalFetched >= ingest_limit:
                return

@staticmethod
def decryptEnvVar(envVar):
encrypted = os.environ.get(envVar, None)
Expand Down
54 changes: 1 addition & 53 deletions processes/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,9 @@


class CoreProcess(DBManager):
def __init__(self, process, customFile, ingestPeriod, singleRecord, batchSize=500):
def __init__(self, process, customFile, ingestPeriod, singleRecord):
super(CoreProcess, self).__init__()
self.process = process
self.customFile = customFile
self.ingestPeriod = ingestPeriod
self.singleRecord = singleRecord

self.batchSize = batchSize
self.records = set()

def addDCDWToUpdateList(self, rec):
existing = self.session.query(Record)\
.filter(Record.source_id == rec.record.source_id).first()
if existing:
logger.debug('Existing record: ' + str(existing))
rec.updateExisting(existing)

try:
self.records.remove(existing)
except KeyError:
logger.debug('Record not in current set')

self.records.add(existing)
else:
logger.debug('New record: ' + str(rec.record))
self.records.add(rec.record)

if len(self.records) >= self.batchSize:
self.saveRecords()
self.records = set()

def windowedQuery(self, table, query, windowSize=100):
singleEntity = query.is_single_entity
query = query.add_column(table.date_modified).order_by(table.date_modified)
query = query.add_column(table.id).order_by(table.id)

lastID = None
totalFetched = 0

while True:
subQuery = query

if lastID is not None:
subQuery = subQuery.filter(table.id > lastID)

queryChunk = subQuery.limit(windowSize).all()
totalFetched += windowSize

if not queryChunk or (self.ingestLimit and totalFetched > self.ingestLimit):
break

lastID = queryChunk[-1][-1]

for row in queryChunk:
yield row[0] if singleEntity else row[0:-2]

def saveRecords(self):
self.bulkSaveObjects(self.records)
33 changes: 19 additions & 14 deletions processes/file/covers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os

from ..core import CoreProcess
from managers import CoverManager, RedisManager, S3Manager
from managers import DBManager, CoverManager, RedisManager, S3Manager
from model import Edition, Link
from model.postgres.edition import EDITION_LINKS
from logger import create_log
Expand All @@ -12,10 +12,16 @@

class CoverProcess(CoreProcess):
def __init__(self, *args):
super(CoverProcess, self).__init__(*args[:4], batchSize=25)
super(CoverProcess, self).__init__(*args[:4])

self.generateEngine()
self.createSession()
self.batch_size = 25

self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.editions = set()

self.redis_manager = RedisManager()
self.redis_manager.createRedisClient()
Expand All @@ -32,13 +38,12 @@ def runProcess(self):

self.fetchEditionCovers(coverQuery)

self.saveRecords()
self.commitChanges()
self.db_manager.bulkSaveObjects(self.editions)

def generateQuery(self):
baseQuery = self.session.query(Edition)
baseQuery = self.db_manager.session.query(Edition)

subQuery = self.session.query(EDITION_LINKS.c.edition_id)\
subQuery = self.db_manager.session.query(EDITION_LINKS.c.edition_id)\
.join(Link)\
.distinct('edition_id')\
.filter(Link.flags['cover'] == 'true')
Expand All @@ -56,7 +61,7 @@ def generateQuery(self):
return baseQuery.filter(*filters)

def fetchEditionCovers(self, coverQuery):
for edition in self.windowedQuery(Edition, coverQuery, windowSize=self.batchSize):
for edition in self.db_manager.windowedQuery(Edition, coverQuery, ingest_limit=self.ingestLimit, windowSize=self.batch_size):
coverManager = self.searchForCover(edition)

if coverManager: self.storeFoundCover(coverManager, edition)
Expand All @@ -65,7 +70,7 @@ def fetchEditionCovers(self, coverQuery):

def searchForCover(self, edition):
identifiers = [i for i in self.getEditionIdentifiers(edition)]
manager = CoverManager(identifiers, self.session)
manager = CoverManager(identifiers, self.db_manager.session)

if manager.fetchCover() is True:
manager.fetchCoverFile()
Expand Down Expand Up @@ -97,8 +102,8 @@ def storeFoundCover(self, manager, edition):
)

edition.links.append(coverLink)
self.records.add(edition)
self.editions.add(edition)

if len(self.records) >= self.batchSize:
self.bulkSaveObjects(self.records)
self.records = set()
if len(self.editions) >= self.batch_size:
self.db_manager.bulkSaveObjects(self.editions)
self.editions.clear()
16 changes: 9 additions & 7 deletions processes/file/fulfill_url_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from ..core import CoreProcess
from datetime import datetime, timedelta, timezone
from managers import S3Manager
from managers import DBManager, S3Manager
from model import Link
from logger import create_log

Expand All @@ -19,8 +19,10 @@ def __init__(self, *args):
self.fullImport = self.process == 'complete'
self.start_timestamp = None

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.s3Bucket = os.environ['FILE_BUCKET']
self.host = os.environ['DRB_API_HOST']
Expand Down Expand Up @@ -131,7 +133,7 @@ def toc_fulfill(self, metadata_json, counter):
for toc in metadata_json['toc']:
if 'pdf' in toc['href'] \
or 'epub' in toc['href']:
for link in self.session.query(Link) \
for link in self.db_manager.session.query(Link) \
.filter(Link.url == toc['href'].replace('https://', '')):
counter += 1
toc['href'] = f'https://{self.host}/fulfill/{link.id}'
Expand All @@ -141,7 +143,7 @@ def toc_fulfill(self, metadata_json, counter):
def fulfill_replace(self, metadata, counter):
if metadata['type'] == 'application/pdf' or metadata['type'] == 'application/epub+zip' \
or metadata['type'] == 'application/epub+xml':
for link in self.session.query(Link) \
for link in self.db_manager.session.query(Link) \
.filter(Link.url == metadata['href'].replace('https://', '')):
counter += 1
metadata['href'] = f'https://{self.host}/fulfill/{link.id}'
Expand All @@ -150,7 +152,7 @@ def fulfill_replace(self, metadata, counter):

def fulfill_flag_update(self, metadata):
if metadata['type'] == 'application/webpub+json':
for link in self.session.query(Link) \
for link in self.db_manager.session.query(Link) \
.filter(Link.url == metadata['href'].replace('https://', '')):
if 'fulfill_limited_access' in link.flags.keys():
if link.flags['fulfill_limited_access'] == False:
Expand All @@ -162,7 +164,7 @@ def fulfill_flag_update(self, metadata):
def check_copyright_status(self, metadata_json):
for link in metadata_json['links']:
if link['type'] == 'application/webpub+json':
for psql_link in self.session.query(Link) \
for psql_link in self.db_manager.session.query(Link) \
.filter(Link.url == link['href'].replace('https://', '')):
if 'fulfill_limited_access' not in psql_link.flags.keys():
copyright_status = False
Expand Down
20 changes: 12 additions & 8 deletions processes/frbr/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
from time import sleep

from ..core import CoreProcess
from managers import OCLCCatalogManager, RabbitMQManager
from managers import DBManager, OCLCCatalogManager, RabbitMQManager
from mappings.oclcCatalog import CatalogMapping
from logger import create_log
from ..record_buffer import RecordBuffer


logger = create_log(__name__)


class CatalogProcess(CoreProcess):
def __init__(self, *args):
super(CatalogProcess, self).__init__(*args[:4], batchSize=50)
super(CatalogProcess, self).__init__(*args[:4])

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager, batch_size=50)

self.rabbitmq_manager = RabbitMQManager()
self.rabbitmq_manager.createRabbitConnection()
Expand All @@ -28,10 +33,9 @@ def __init__(self, *args):
def runProcess(self, max_attempts: int=4):
    """Drain OCLC catalog messages from the queue, then flush and log results.

    :param max_attempts: number of polling attempts passed through to
        process_catalog_messages before giving up on an empty queue
    """
    self.process_catalog_messages(max_attempts=max_attempts)

    # Persist any records still sitting in the buffer.
    self.record_buffer.flush()

    # ingest_count is a running total (an int), so wrapping it in len() as
    # before raised TypeError; log the count directly.
    logger.info(f'Saved {self.record_buffer.ingest_count} catalog records')

def process_catalog_messages(self, max_attempts: int=4):
for attempt in range(0, max_attempts):
Expand Down Expand Up @@ -76,6 +80,6 @@ def parse_catalog_record(self, catalog_record, oclc_number, owi_number):
catalog_record_mapping.applyMapping()
catalog_record_mapping.record.identifiers.append('{}|owi'.format(owi_number))

self.addDCDWToUpdateList(catalog_record_mapping)
self.record_buffer.add(catalog_record_mapping)
except Exception:
logger.exception(f'Unable to map OCLC catalog record for OCLC number: {oclc_number}')
24 changes: 14 additions & 10 deletions processes/frbr/classify.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@
import re

from ..core import CoreProcess
from managers import OCLCCatalogManager, RabbitMQManager, RedisManager
from managers import DBManager, OCLCCatalogManager, RabbitMQManager, RedisManager
from mappings.oclc_bib import OCLCBibMapping
from model import Record
from logger import create_log
from ..record_buffer import RecordBuffer


logger = create_log(__name__)


class ClassifyProcess(CoreProcess):
def __init__(self, *args):
super(ClassifyProcess, self).__init__(*args[:4], batchSize=50)
super(ClassifyProcess, self).__init__(*args[:4])

self.ingest_limit = int(args[4]) if len(args) >= 5 and args[4] else None

self.generateEngine()
self.createSession()
self.db_manager = DBManager()

self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager, batch_size=50)

self.redis_manager = RedisManager()
self.redis_manager.createRedisClient()
Expand Down Expand Up @@ -48,8 +53,7 @@ def runProcess(self):
logger.warning(f'Unknown classify process type: {self.process}')
return

self.saveRecords()
self.commitChanges()
self.record_buffer.flush()

logger.info(f'Classified {self.classified_count} records')
except Exception as e:
Expand All @@ -58,7 +62,7 @@ def runProcess(self):

def classify_records(self, full=False, start_date_time=None):
get_unfrbrized_records_query = (
self.session.query(Record)
self.db_manager.session.query(Record)
.filter(Record.source != 'oclcClassify' and Record.source != 'oclcCatalog')
.filter(Record.frbr_status == 'to_do')
)
Expand All @@ -75,8 +79,8 @@ def classify_records(self, full=False, start_date_time=None):
unfrbrized_record.cluster_status = False
unfrbrized_record.frbr_status = 'complete'

self.session.add(unfrbrized_record)
self.session.commit()
self.db_manager.session.add(unfrbrized_record)
self.db_manager.session.commit()
self.classified_count += 1

if self.ingest_limit and self.classified_count >= self.ingest_limit:
Expand Down Expand Up @@ -137,7 +141,7 @@ def classify_record_by_metadata(self, identifier, identifier_type, author, title
related_oclc_numbers=related_oclc_numbers
)

self.addDCDWToUpdateList(oclc_record)
self.record_buffer.add(oclc_record)
self.get_oclc_catalog_records(oclc_record.record.identifiers)

def get_oclc_catalog_records(self, identifiers):
Expand Down
4 changes: 2 additions & 2 deletions processes/ingest/hathi_trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class HathiTrustProcess(CoreProcess):
FIELD_SIZE_LIMIT = 131072 * 2 # 131072 is the default size limit

def __init__(self, *args):
super(HathiTrustProcess, self).__init__(*args[:4], batchSize=1000)
super(HathiTrustProcess, self).__init__(*args[:4])

self.ingest_limit = int(args[4]) if args[4] else None

Expand All @@ -29,7 +29,7 @@ def __init__(self, *args):
self.db_manager.generateEngine()
self.db_manager.createSession()

self.record_buffer = RecordBuffer(db_manager=self.db_manager)
self.record_buffer = RecordBuffer(db_manager=self.db_manager, batch_size=1000)

self.constants = get_constants()

Expand Down
2 changes: 1 addition & 1 deletion processes/ingest/publisher_backlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ def runProcess(self):
logger.exception('Failed to run Publisher Backlist process')
raise e
finally:
self.close_connection()
self.db_manager.close_connection()
Loading

0 comments on commit fa97155

Please sign in to comment.