Replies: 3 comments 1 reply
-
If anyone is looking for an answer:
from lxml import etree
import sys, os, logging
import multiprocessing as mp
from oaipmh_scythe import Scythe
from oaipmh_scythe.models import Record
from httpx import ConnectTimeout
from termcolor import cprint
# Log INFO and above to ../harvester.log (relative path); each line carries the
# timestamp, the process id, the level name and the message.
logging.basicConfig(filename='../harvester.log', format='%(asctime)s %(process)s %(levelname)s %(message)s', level=logging.INFO)
# Metadata format requested for every record; also embedded in the output file names.
metadata_prefix = 'lido'
class XML(Record):
    """Record that additionally exposes its metadata payload as an lxml tree.

    Attributes:
        metadata_xml: ``etree.ElementTree`` rooted at the first child of the
            record's OAI ``metadata`` element.
        identifier: The identifier taken from the record header.
    """

    # Namespace prefix -> URI map; not referenced elsewhere in the visible code.
    namespaces = {
        'lido': 'http://www.lido-schema.org',
        'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
        'oai': 'http://www.openarchives.org/OAI/2.0/',
    }

    def __init__(self, record_element: etree._Element, strip_ns: bool = False) -> None:
        """Initialise the base record, then extract the metadata tree.

        Args:
            record_element: The record element handed to ``Record.__init__``.
            strip_ns: Forwarded unchanged to ``Record.__init__``.

        Raises:
            TypeError: If the record has no ``metadata`` element.
            IndexError: If the ``metadata`` element has no children.
        """
        super().__init__(record_element, strip_ns=strip_ns)
        # ``self.xml`` and ``self._oai_namespace`` are presumably set by the
        # base class initialiser -- verify against the library version in use.
        metadata = self.xml.find(".//" + self._oai_namespace + "metadata")
        # Index the element directly: ``getchildren()`` is deprecated in lxml.
        self.metadata_xml = etree.ElementTree(metadata[0])
        self.identifier = self.header.identifier
def worker(q):
    """Fetch records for the identifiers read from *q* and save them as XML files.

    Loops until a ``None`` sentinel is read from the queue. For every other
    item the record is requested with the module-level ``metadata_prefix`` and
    its metadata tree is written to ``<identifier>.<metadata_prefix>.xml`` in
    the current working directory.

    Args:
        q: Queue of record identifiers; ``None`` tells the worker to stop.
    """
    # The harvester does not depend on the item, so build it once per worker
    # instead of once per record.
    oai = init_harvester()
    while True:
        item = q.get(block=True)
        if item is None:
            break
        try:
            record = oai.get_record(identifier=item, metadata_prefix=metadata_prefix)
        except ConnectTimeout:
            # Put the identifier back so it is retried later, then move on:
            # there is no record to save for this attempt.
            q.put(item, block=True)
            # No lock around logging/printing: a lock created on the spot is
            # private to this call and cannot serialise anything across
            # processes.
            logging.info(f"Timeout getting {item}, requeueing")
            cprint('X', 'red', end='', flush=True)
            continue
        filename = f"{record.identifier}.{metadata_prefix}.xml"
        record.metadata_xml.write(filename, encoding="utf-8")
        logging.info(f"Sucessfully got {record.identifier}, saved to {filename}")
        cprint('.', 'green', end='', flush=True)
def init_harvester() -> Scythe:
    """Create a Scythe harvester for ``URL`` that authenticates with ``USER``/``PASSWORD``.

    ``GetRecord`` responses are mapped to the :class:`XML` record class. The
    harvester is created with a timeout of 120 and up to 5 retries.

    Returns:
        The configured ``Scythe`` instance.
    """
    harvester = Scythe(URL, timeout=120, max_retries=5)
    harvester.class_mapping['GetRecord'] = XML

    # Set the credentials on the harvester's client object and store that same
    # object back on the harvester.
    http_client = harvester.client
    http_client.auth = (USER, PASSWORD)
    harvester._client = http_client

    return harvester
def main(num_workers: int = 8) -> int:
    """Harvest all non-deleted records using a pool of worker processes.

    Starts ``num_workers`` processes that each run :func:`worker` on a shared
    queue, fills the queue with every listed identifier, then tells the
    workers to stop and waits for them to exit.

    Args:
        num_workers: Number of worker processes to start.

    Returns:
        ``0`` once all workers have exited.
    """
    logging.info('Intializing Harvester')
    oai = init_harvester()
    q = mp.Queue()
    # ``worker`` is passed as the pool initializer, so every pool process
    # starts consuming the queue as soon as it is spawned.
    pool = mp.Pool(num_workers, worker, (q,))
    # List identifiers for the same format the workers request, so every
    # queued identifier can actually be fetched in that format.
    records = oai.list_identifiers(metadata_prefix=metadata_prefix, ignore_deleted=True)
    for record in records:
        q.put(record.identifier)
    logging.info('Queue filled')
    # One stop sentinel per worker; without them the workers block forever on
    # the empty queue and ``pool.join()`` never returns.
    # NOTE(review): an identifier requeued by a worker after this point sits
    # behind the sentinels and may be left unprocessed -- confirm whether
    # that is acceptable.
    for _ in range(num_workers):
        q.put(None)
    q.close()
    q.join_thread()
    pool.close()
    pool.join()
    return 0
if __name__ == '__main__':
    sys.exit(main())

Notes:
|
Beta Was this translation helpful? Give feedback.
-
Thanks for opening this. What is your use case? The client needs to iterate over the resumption tokens to get the whole set of identifiers/records. |
Beta Was this translation helpful? Give feedback.
-
The example you provided should help anybody who needs more speed right now. I am thinking about adding an async version of the oaipmh verbs. The httpx client is able to request in async. |
Beta Was this translation helpful? Give feedback.
-
Is it possible to use parallelisation? I would like to speed up the harvesting process using multiple threads.
Beta Was this translation helpful? Give feedback.
All reactions