From ee966dcb3b242b4b268042b89e2026791c28bc22 Mon Sep 17 00:00:00 2001 From: David Huggins-Daines Date: Mon, 30 Dec 2024 13:02:14 -0500 Subject: [PATCH] Support parallel operations over pages (#36) * doc: example of parallel pdfminer.six (we will do better) * fix: do not submit a gratuitous extra job * feat: crude example of parallelizing PLAYA * feat: use layout in playa parallel bench * chore: format * feat: basic support for parallel execution across pages * chore: mypy * docs: deprecate eager api in readme * test: test parallel execution * fix(types): complete callable annotation --- README.md | 74 ++++++++++-------------------------- benchmarks/converter.py | 6 +-- benchmarks/parallel.py | 48 +++++++++++++++++++++++ benchmarks/parallel_miner.py | 74 ++++++++++++++++++++++++++++++++++++ benchmarks/parser.py | 3 +- playa/__init__.py | 22 ++++++++++- playa/document.py | 27 +++++++++++++ tests/data.py | 3 -- tests/test_parallel.py | 35 +++++++++++++++++ 9 files changed, 230 insertions(+), 62 deletions(-) create mode 100644 benchmarks/parallel.py create mode 100644 benchmarks/parallel_miner.py create mode 100644 tests/test_parallel.py diff --git a/README.md b/README.md index 44605e1f..fee57476 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# **P**LAYA-PDF is a **LA**z**Y** **A**nalyzer for **PDF** 🏖️ +# **P**arallel and **LA**z**Y** **A**nalyzer for **PDF** 🏖️ ## About @@ -20,9 +20,13 @@ benchmarks](https://github.com/py-pdf/benchmarks) for a summary (TL;DR pypdfium2 is probably what you want, but pdfplumber does a nice job of converting PDF to ASCII art). -The purpose of PLAYA is to provide an efficent, pure-Python and -Pythonic (for its author's definition of the term), lazy interface to -the internals of PDF files. +Soon you will also be able to use +[PAVÉS](https://github.com/dhdaines/paves) for this and other +higher-level tasks. 
+ +The purpose of PLAYA is to provide an efficient, parallel and +parallelizable, pure-Python and Pythonic (for its author's definition +of the term), lazy interface to the internals of PDF files. ## Installation @@ -31,7 +35,10 @@ or newer: pipx install playa-pdf -Yes it's not just "playa". Sorry about that. +Yes it's not just "playa". Sorry about that. If you wish to read +certain encrypted PDFs then you will need the `crypto` add-on: + + pipx install playa-pdf[crypto] ## Usage @@ -81,16 +88,16 @@ a_particular_object = pdf[42] ``` Your PDF document probably has some pages. How many? What are their -numbers/labels? (they could be things like "xviii", 'a", or "42", for -instance) +numbers/labels? They could be things like "xvi" (pronounced +"gzvee"), "a", or "42", for instance! ```python npages = len(pdf.pages) page_numbers = [page.label for page in pdf.pages] ``` -What's in the table of contents? (NOTE: this API will likely change -in PLAYA 0.3 as it is not Lazy nor does it properly represent the +What's in the table of contents? (NOTE: this API is deprecated and +will change soon as it is not Lazy nor does it properly represent the hierarchy of the document outline) ```python @@ -103,8 +110,8 @@ for entry in pdf.outlines: If you are lucky it has a "logical structure tree". The elements here might even be referenced from the table of contents! (or, they might -not... with PDF you never know). (NOTE: this API will definitely -change in PLAYA 0.3 as it is not the least bit Lazy) +not... with PDF you never know). (NOTE: this API is deprecated and +will change soon as it is not Lazy at all) ```python structure = pdf.structtree @@ -152,47 +159,8 @@ involves some more work on the user's part. 
## Dictionary-based API -If, on the other hand, **you** are lazy, then you can just use -`page.layout`, which will flatten everything for you into a friendly -dictionary representation (but it is a -[`TypedDict`](https://typing.readthedocs.io/en/latest/spec/typeddict.html#typeddict)) -which, um, looks a lot like what `pdfplumber` gives you, except possibly in -a different -coordinate space, as defined [below](#an-important-note-about-coordinate-spaces). - -```python -for dic in page.layout: - print("it is a {dic['object_type']} at ({dic['x0']}", {dic['y0']})) - print(" the color is {dic['stroking_color']}") - print(" the text is {dic['text']}") - print(" it is in MCS {dic['mcid']} which is a {dic['tag']}") - print(" it is also in Form XObject {dic['xobjid']}") -``` - -This is for instance quite useful for doing "Artificial Intelligence", -or if you like wasting time and energy for no good reason, but I -repeat myself. For instance, you can write `page.layout` to a CSV file: - -```python -writer = DictWriter(outfh, fieldnames=playa.fieldnames) -writer.writeheader() -for dic in pdf.layout: - writer.writerow(dic) -``` - -you can also create a Pandas DataFrame: - -```python -df = pandas.DataFrame.from_records(pdf.layout) -``` - -or a Polars DataFrame or LazyFrame: - -```python -df = polars.DataFrame(pdf.layout, schema=playa.schema) -``` - -If you have more specific needs or want better performance, then read on. +There used to be a "dictionary-based" API here. You can now find it +in [PAVÉS](https://github.com/dhdaines/paves). ## An important note about coordinate spaces @@ -318,7 +286,7 @@ for obj in page: other_stuff.append(my_stuff) # it's safe there ``` -For compatbility with `pdfminer.six`, PLAYA, even though it is not a +For compatibility with `pdfminer.six`, PLAYA, even though it is not a layout analyzer, can do some basic interpretation of paths. Again, this is lazy. 
If you don't care about them, you just get objects with `object_type` of `"path"`, which you can ignore. PLAYA won't even diff --git a/benchmarks/converter.py b/benchmarks/converter.py index c18689c0..20ba8779 100644 --- a/benchmarks/converter.py +++ b/benchmarks/converter.py @@ -45,7 +45,7 @@ def benchmark_one_lazy(path: Path): def benchmark_one_pdfminer(path: Path): """Open one of the documents""" - from pdfminer.converter import PDFPageAggregator + from pdfminer.converter import PDFLayoutAnalyzer from pdfminer.pdfdocument import PDFDocument from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager from pdfminer.pdfpage import PDFPage @@ -58,8 +58,8 @@ def benchmark_one_pdfminer(path: Path): with open(path, "rb") as infh: LOG.debug("Reading %s", path) rsrc = PDFResourceManager() - agg = PDFPageAggregator(rsrc, pageno=1) - interp = PDFPageInterpreter(rsrc, agg) + analyzer = PDFLayoutAnalyzer(rsrc) + interp = PDFPageInterpreter(rsrc, analyzer) pdf = PDFDocument(PDFParser(infh), password=password) for page in PDFPage.create_pages(pdf): interp.process_page(page) diff --git a/benchmarks/parallel.py b/benchmarks/parallel.py new file mode 100644 index 00000000..b8d2b584 --- /dev/null +++ b/benchmarks/parallel.py @@ -0,0 +1,48 @@ +""" +Attempt to scale. 
+""" + +import time +from pathlib import Path + +import playa +from playa.page import Page + + +def process_page(page: Page) -> str: + return " ".join(x.chars for x in page.texts) + + +def benchmark_single(path: Path): + with playa.open(path) as pdf: + return list(pdf.pages.map(process_page)) + + +def benchmark_multi(path: Path, ncpu: int): + with playa.open(path, max_workers=ncpu) as pdf: + return list(pdf.pages.map(process_page)) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("-n", "--ncpu", type=int, default=4) + parser.add_argument("pdf", type=Path) + args = parser.parse_args() + + start = time.time() + benchmark_multi(args.pdf, args.ncpu) + multi_time = time.time() - start + print( + "PLAYA (%d CPUs) took %.2fs" + % ( + args.ncpu, + multi_time, + ) + ) + + start = time.time() + benchmark_single(args.pdf) + single_time = time.time() - start + print("PLAYA (single) took %.2fs" % (single_time,)) diff --git a/benchmarks/parallel_miner.py b/benchmarks/parallel_miner.py new file mode 100644 index 00000000..9e6c82ff --- /dev/null +++ b/benchmarks/parallel_miner.py @@ -0,0 +1,74 @@ +"""Demonstrate paralle extraction with pdfminer.six""" + +import time +from pdfminer.high_level import extract_pages +from concurrent.futures import ProcessPoolExecutor +from pathlib import Path +from pdfminer.pdfpage import PDFPage +from pdfminer.layout import LTImage +from pdfminer.pdftypes import PDFObjRef + + +def benchmark_single(path: Path): + for page in extract_pages(path): + pass + + +def remove_references(item): + try: + for child in item: + remove_references(child) + except TypeError: + if isinstance(item, LTImage): + for key, val in item.stream.attrs.items(): + if isinstance(val, PDFObjRef): + val.doc = None + + +def extract_batch(path, page_numbers): + batch = list(extract_pages(path, page_numbers=page_numbers)) + remove_references(batch) + return batch + + +def benchmark_multi(path: Path, ncpu: 
int): + with open(path, "rb") as fp: + npages = sum(1 for _ in PDFPage.get_pages(fp)) + pages = [None] * npages + batches = [] + + with ProcessPoolExecutor(max_workers=ncpu) as pool: + step = max(1, round(npages / ncpu)) + for start in range(0, npages, step): + end = min(npages, start + step) + batch = list(range(start, end)) + print(f"Submitting pages {start} to {end - 1}") + batches.append((batch, pool.submit(extract_batch, path, batch))) + for batch, future in batches: + for idx, page in zip(batch, future.result()): + pages[idx] = page + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("-n", "--ncpu", type=int, default=4) + parser.add_argument("pdf", type=Path) + args = parser.parse_args() + + start = time.time() + benchmark_multi(args.pdf, args.ncpu) + multi_time = time.time() - start + print( + "pdfminer.six (%d CPUs) took %.2fs" + % ( + args.ncpu, + multi_time, + ) + ) + + start = time.time() + benchmark_single(args.pdf) + single_time = time.time() - start + print("pdfminer.six (single) took %.2fs" % (single_time,)) diff --git a/benchmarks/parser.py b/benchmarks/parser.py index a8ee5f27..e3213fcb 100644 --- a/benchmarks/parser.py +++ b/benchmarks/parser.py @@ -298,8 +298,7 @@ def bench_mmap(): parser = Lexer(mapping) _ = list(parser) print( - "PLAYA Lexer (mmap): %fms / run" - % ((time.time() - start) / runs * 1000), + "PLAYA Lexer (mmap): %fms / run" % ((time.time() - start) / runs * 1000), ) diff --git a/playa/__init__.py b/playa/__init__.py index c1a50b4a..41498c24 100644 --- a/playa/__init__.py +++ b/playa/__init__.py @@ -14,9 +14,13 @@ """ import builtins +from concurrent.futures import ProcessPoolExecutor from os import PathLike +from multiprocessing.context import BaseContext +from pathlib import Path from typing import Union +import playa.document from playa.document import Document, LayoutDict, schema as schema # noqa: F401 from playa.page import DeviceSpace from 
playa._version import __version__ # noqa: F401 @@ -24,11 +28,27 @@ fieldnames = LayoutDict.__annotations__.keys() +def init_worker(path: Path, password: str = "", space: DeviceSpace = "screen") -> None: + playa.document.__pdf = open(path, password=password, space=space) + + def open( - path: Union[PathLike, str], password: str = "", space: DeviceSpace = "screen" + path: Union[PathLike, str], + *, + password: str = "", + space: DeviceSpace = "screen", + max_workers: int = 1, + mp_context: Union[BaseContext, None] = None, ) -> Document: """Open a PDF document from a path on the filesystem.""" fp = builtins.open(path, "rb") pdf = Document(fp, password=password, space=space) pdf._fp = fp + if max_workers > 1: + pdf._pool = ProcessPoolExecutor( + max_workers=max_workers, + mp_context=mp_context, + initializer=init_worker, # type: ignore[arg-type] + initargs=(path, password, space), # type: ignore[arg-type] + ) return pdf diff --git a/playa/document.py b/playa/document.py index e92ac273..9fdf68c1 100644 --- a/playa/document.py +++ b/playa/document.py @@ -8,6 +8,8 @@ import mmap import re import struct +import weakref +from concurrent.futures import Executor from hashlib import md5, sha256, sha384, sha512 from typing import ( Any, @@ -810,6 +812,7 @@ class Document: _fp: Union[BinaryIO, None] = None _pages: Union["PageList", None] = None + _pool: Union[Executor, None] = None def __enter__(self) -> "Document": return self @@ -819,6 +822,9 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: if self._fp: self._fp.close() self._fp = None + if self._pool: + self._pool.shutdown() + self._pool = None def __init__( self, @@ -1369,10 +1375,20 @@ def _read_xref_from( self._read_xref_from(pos, xrefs) +__pdf: Union[Document, None] = None + + +def call_page(func: Callable[[Page], Any], idx: int) -> Any: + """Call a function on a page in a worker process.""" + assert __pdf is not None + return func(__pdf.pages[idx]) + + class PageList: """List of pages indexable by 0-based 
index or string label.""" def __init__(self, doc: Document): + self.doc = weakref.ref(doc) try: page_labels: Iterable[Optional[str]] = doc.page_labels except (KeyError, ValueError): @@ -1406,6 +1422,17 @@ def __getitem__(self, key: Union[int, str]) -> Page: else: return self._labels[key] + def map(self, func: Callable[[Page], Any]) -> Iterator: + doc = self.doc() + if doc is None: + raise RuntimeError("Document no longer exists") + if doc._pool is not None: + return doc._pool.map( + call_page, itertools.repeat(func), (page.page_idx for page in self) + ) + else: + return (func(page) for page in self) + class PageLabels(NumberTree): """PageLabels from the document catalog. diff --git a/tests/data.py b/tests/data.py index 1cdc2279..2b6d9e61 100644 --- a/tests/data.py +++ b/tests/data.py @@ -46,15 +46,12 @@ # really rather broken. "issue9418.pdf", "bug1250079.pdf", - # pdf.js doesn't extract text correctly here but it is possible # "issue9915_reduced.pdf", # ToUnicode points to the same place as Encoding - # We "accept" these but our handling of ToUnicode mappings is very # incorrect, so no text is produced for the glyphs. 
Leaving them # here as the tests should be updated to verify text extraction # works once we figure out how to support them - # "issue2931.pdf", # ToUnicode maps input characters not CIDs (ASCII) # "issue9534_reduced.pdf", # ToUnicode maps input characters not CIDs (UTF-16BE) # "issue18117.pdf", # ToUnicode maps input characters not CIDs (UTF-8) diff --git a/tests/test_parallel.py b/tests/test_parallel.py new file mode 100644 index 00000000..bb06e145 --- /dev/null +++ b/tests/test_parallel.py @@ -0,0 +1,35 @@ +"""Test parallel analysis.""" + +import pytest + +import playa +import playa.document +from playa.page import Page +from tests.data import TESTDIR, CONTRIB + + +def has_one_true_pdf() -> int: + assert playa.document.__pdf is not None + assert playa.document.__pdf.space == "default" + return len(playa.document.__pdf.pages) + + +def test_open_parallel(): + with playa.open( + TESTDIR / "pdf_structure.pdf", space="default", max_workers=4 + ) as pdf: + future = pdf._pool.submit(has_one_true_pdf) + assert future.result() == 1 + + +def get_text(page: Page) -> str: + return " ".join(x.chars for x in page.texts) + + +@pytest.mark.skipif(not CONTRIB.exists(), reason="contrib samples not present") +def test_map_parallel(): + with playa.open(CONTRIB / "PSC_Station.pdf", space="default", max_workers=2) as pdf: + parallel_texts = list(pdf.pages.map(get_text)) + with playa.open(CONTRIB / "PSC_Station.pdf", space="default") as pdf: + texts = list(pdf.pages.map(get_text)) + assert texts == parallel_texts