Support parallel operations over pages (#36)
* doc: example of parallel pdfminer.six (we will do better)

* fix: do not submit a gratuitous extra job

* feat: crude example of parallelizing PLAYA

* feat: use layout in playa parallel bench

* chore: format

* feat: basic support for parallel execution across pages

* chore: mypy

* docs: deprecate eager api in readme

* test: test parallel execution

* fix(types): complete callable annotation
dhdaines authored Dec 30, 2024
1 parent 20db2b1 commit ee966dc
Showing 9 changed files with 230 additions and 62 deletions.
74 changes: 21 additions & 53 deletions README.md
@@ -1,4 +1,4 @@
# **P**LAYA-PDF is a **LA**z**Y** **A**nalyzer for **PDF** 🏖️
# **P**arallel and **LA**z**Y** **A**nalyzer for **PDF** 🏖️

## About

@@ -20,9 +20,13 @@ benchmarks](https://github.com/py-pdf/benchmarks) for a summary (TL;DR
pypdfium2 is probably what you want, but pdfplumber does a nice job of
converting PDF to ASCII art).

The purpose of PLAYA is to provide an efficent, pure-Python and
Pythonic (for its author's definition of the term), lazy interface to
the internals of PDF files.
Soon you will also be able to use
[PAVÉS](https://github.com/dhdaines/paves) for this and other
higher-level tasks.

The purpose of PLAYA is to provide an efficent, parallel and
parallelizable, pure-Python and Pythonic (for its author's definition
of the term), lazy interface to the internals of PDF files.

## Installation

@@ -31,7 +35,10 @@ or newer:

pipx install playa-pdf

Yes it's not just "playa". Sorry about that.
Yes it's not just "playa". Sorry about that. If you wish to read
certain encrypted PDFs then you will need the `crypto` add-on:

pipx install playa-pdf[crypto]

## Usage

@@ -81,16 +88,16 @@ a_particular_object = pdf[42]
```

Your PDF document probably has some pages. How many? What are their
numbers/labels? (they could be things like "xviii", 'a", or "42", for
instance)
numbers/labels? They could be things like "xvi" (pronounced
"gzvee"), 'a", or "42", for instance!

```python
npages = len(pdf.pages)
page_numbers = [page.label for page in pdf.pages]
```

What's in the table of contents? (NOTE: this API will likely change
in PLAYA 0.3 as it is not Lazy nor does it properly represent the
What's in the table of contents? (NOTE: this API is deprecated and
will change soon as it is not Lazy nor does it properly represent the
hierarchy of the document outline)

```python
@@ -103,8 +110,8 @@ for entry in pdf.outlines:

If you are lucky it has a "logical structure tree". The elements here
might even be referenced from the table of contents! (or, they might
not... with PDF you never know). (NOTE: this API will definitely
change in PLAYA 0.3 as it is not the least bit Lazy)
not... with PDF you never know). (NOTE: this API is deprecated and
will change soon as it is not Lazy at all)

```python
structure = pdf.structtree
@@ -152,47 +159,8 @@ involves some more work on the user's part.

## Dictionary-based API

If, on the other hand, **you** are lazy, then you can just use
`page.layout`, which will flatten everything for you into a friendly
dictionary representation (but it is a
[`TypedDict`](https://typing.readthedocs.io/en/latest/spec/typeddict.html#typeddict))
which, um, looks a lot like what `pdfplumber` gives you, except possibly in
a different
coordinate space, as defined [below](#an-important-note-about-coordinate-spaces).

```python
for dic in page.layout:
    print("it is a {dic['object_type']} at ({dic['x0']}", {dic['y0']}))
    print(" the color is {dic['stroking_color']}")
    print(" the text is {dic['text']}")
    print(" it is in MCS {dic['mcid']} which is a {dic['tag']}")
    print(" it is also in Form XObject {dic['xobjid']}")
```

This is for instance quite useful for doing "Artificial Intelligence",
or if you like wasting time and energy for no good reason, but I
repeat myself. For instance, you can write `page.layout` to a CSV file:

```python
writer = DictWriter(outfh, fieldnames=playa.fieldnames)
writer.writeheader()
for dic in pdf.layout:
    writer.writerow(dic)
```

you can also create a Pandas DataFrame:

```python
df = pandas.DataFrame.from_records(pdf.layout)
```

or a Polars DataFrame or LazyFrame:

```python
df = polars.DataFrame(pdf.layout, schema=playa.schema)
```

If you have more specific needs or want better performance, then read on.
There used to be a "dictionary-based" API here. You can now find it
it [PAVÉS](https://github.com/dhdaines/paves).)

## An important note about coordinate spaces

@@ -318,7 +286,7 @@ for obj in page:
other_stuff.append(my_stuff) # it's safe there
```

For compatbility with `pdfminer.six`, PLAYA, even though it is not a
For compatibility with `pdfminer.six`, PLAYA, even though it is not a
layout analyzer, can do some basic interpretation of paths. Again,
this is lazy. If you don't care about them, you just get objects with
`object_type` of `"path"`, which you can ignore. PLAYA won't even
6 changes: 3 additions & 3 deletions benchmarks/converter.py
@@ -45,7 +45,7 @@ def benchmark_one_lazy(path: Path):

def benchmark_one_pdfminer(path: Path):
    """Open one of the documents"""
    from pdfminer.converter import PDFPageAggregator
    from pdfminer.converter import PDFLayoutAnalyzer
    from pdfminer.pdfdocument import PDFDocument
    from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
    from pdfminer.pdfpage import PDFPage
@@ -58,8 +58,8 @@ def benchmark_one_pdfminer(path: Path):
        with open(path, "rb") as infh:
            LOG.debug("Reading %s", path)
            rsrc = PDFResourceManager()
            agg = PDFPageAggregator(rsrc, pageno=1)
            interp = PDFPageInterpreter(rsrc, agg)
            analyzer = PDFLayoutAnalyzer(rsrc)
            interp = PDFPageInterpreter(rsrc, analyzer)
            pdf = PDFDocument(PDFParser(infh), password=password)
            for page in PDFPage.create_pages(pdf):
                interp.process_page(page)
48 changes: 48 additions & 0 deletions benchmarks/parallel.py
@@ -0,0 +1,48 @@
"""
Attempt to scale.
"""

import time
from pathlib import Path

import playa
from playa.page import Page


def process_page(page: Page) -> str:
    return " ".join(x.chars for x in page.texts)


def benchmark_single(path: Path):
    with playa.open(path) as pdf:
        return list(pdf.pages.map(process_page))


def benchmark_multi(path: Path, ncpu: int):
    with playa.open(path, max_workers=ncpu) as pdf:
        return list(pdf.pages.map(process_page))


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("-n", "--ncpu", type=int, default=4)
    parser.add_argument("pdf", type=Path)
    args = parser.parse_args()

    start = time.time()
    benchmark_multi(args.pdf, args.ncpu)
    multi_time = time.time() - start
    print(
        "PLAYA (%d CPUs) took %.2fs"
        % (
            args.ncpu,
            multi_time,
        )
    )

    start = time.time()
    benchmark_single(args.pdf)
    single_time = time.time() - start
    print("PLAYA (single) took %.2fs" % (single_time,))
74 changes: 74 additions & 0 deletions benchmarks/parallel_miner.py
@@ -0,0 +1,74 @@
"""Demonstrate paralle extraction with pdfminer.six"""

import time
from pdfminer.high_level import extract_pages
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from pdfminer.pdfpage import PDFPage
from pdfminer.layout import LTImage
from pdfminer.pdftypes import PDFObjRef


def benchmark_single(path: Path):
    for page in extract_pages(path):
        pass


def remove_references(item):
    try:
        for child in item:
            remove_references(child)
    except TypeError:
        if isinstance(item, LTImage):
            for key, val in item.stream.attrs.items():
                if isinstance(val, PDFObjRef):
                    val.doc = None


def extract_batch(path, page_numbers):
    batch = list(extract_pages(path, page_numbers=page_numbers))
    remove_references(batch)
    return batch


def benchmark_multi(path: Path, ncpu: int):
    with open(path, "rb") as fp:
        npages = sum(1 for _ in PDFPage.get_pages(fp))
    pages = [None] * npages
    batches = []

    with ProcessPoolExecutor(max_workers=ncpu) as pool:
        step = max(1, round(npages / ncpu))
        for start in range(0, npages, step):
            end = min(npages, start + step)
            batch = list(range(start, end))
            print(f"Submitting pages {start} to {end - 1}")
            batches.append((batch, pool.submit(extract_batch, path, batch)))
        for batch, future in batches:
            for idx, page in zip(batch, future.result()):
                pages[idx] = page


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("-n", "--ncpu", type=int, default=4)
    parser.add_argument("pdf", type=Path)
    args = parser.parse_args()

    start = time.time()
    benchmark_multi(args.pdf, args.ncpu)
    multi_time = time.time() - start
    print(
        "pdfminer.six (%d CPUs) took %.2fs"
        % (
            args.ncpu,
            multi_time,
        )
    )

    start = time.time()
    benchmark_single(args.pdf)
    single_time = time.time() - start
    print("pdfminer.six (single) took %.2fs" % (single_time,))
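
Note on the batching in `benchmark_multi` above: the step is computed with `round(npages / ncpu)`, and Python rounds halves to the nearest even integer, so some page counts yield more batches than workers. The sketch below isolates that arithmetic from pdfminer.six. `batches_by_round` mirrors the benchmark's loop; `batches_by_ceil` is a hypothetical alternative that is not part of this commit, shown only for comparison.

```python
import math
from typing import List


def batches_by_round(npages: int, ncpu: int) -> List[List[int]]:
    """Split page indices as the benchmark does: step = round(npages / ncpu)."""
    step = max(1, round(npages / ncpu))
    return [list(range(s, min(npages, s + step))) for s in range(0, npages, step)]


def batches_by_ceil(npages: int, ncpu: int) -> List[List[int]]:
    """Hypothetical variant: ceiling division never yields more than ncpu batches."""
    step = max(1, math.ceil(npages / ncpu))
    return [list(range(s, min(npages, s + step))) for s in range(0, npages, step)]


if __name__ == "__main__":
    for name, split in (("round", batches_by_round), ("ceil", batches_by_ceil)):
        batches = split(10, 4)
        print(f"{name}: {len(batches)} batches: {batches}")
```

For 10 pages and 4 workers, the `round` version produces five batches of two pages, while the `ceil` version produces four batches (3, 3, 3 and 1 pages). Both cover every page exactly once; they differ only in how evenly the work lines up with the worker count.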
3 changes: 1 addition & 2 deletions benchmarks/parser.py
@@ -298,8 +298,7 @@ def bench_mmap():
parser = Lexer(mapping)
_ = list(parser)
print(
"PLAYA Lexer (mmap): %fms / run"
% ((time.time() - start) / runs * 1000),
"PLAYA Lexer (mmap): %fms / run" % ((time.time() - start) / runs * 1000),
)


22 changes: 21 additions & 1 deletion playa/__init__.py
@@ -14,21 +14,41 @@
"""

import builtins
from concurrent.futures import ProcessPoolExecutor
from os import PathLike
from multiprocessing.context import BaseContext
from pathlib import Path
from typing import Union

import playa.document
from playa.document import Document, LayoutDict, schema as schema # noqa: F401
from playa.page import DeviceSpace
from playa._version import __version__ # noqa: F401

fieldnames = LayoutDict.__annotations__.keys()


def init_worker(path: Path, password: str = "", space: DeviceSpace = "screen") -> None:
    playa.document.__pdf = open(path, password=password, space=space)


def open(
    path: Union[PathLike, str], password: str = "", space: DeviceSpace = "screen"
    path: Union[PathLike, str],
    *,
    password: str = "",
    space: DeviceSpace = "screen",
    max_workers: int = 1,
    mp_context: Union[BaseContext, None] = None,
) -> Document:
    """Open a PDF document from a path on the filesystem."""
    fp = builtins.open(path, "rb")
    pdf = Document(fp, password=password, space=space)
    pdf._fp = fp
    if max_workers > 1:
        pdf._pool = ProcessPoolExecutor(
            max_workers=max_workers,
            mp_context=mp_context,
            initializer=init_worker, # type: ignore[arg-type]
            initargs=(path, password, space), # type: ignore[arg-type]
        )
    return pdf
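
Note on the worker setup above: `init_worker` runs once per worker and stores an opened document in a module-level variable, so each task only has to carry a function and a page index. The following is a minimal sketch of that pattern using only the standard library. Everything in it is a stand-in rather than PLAYA code: `_pages` plays the role of `playa.document.__pdf`, a list of strings plays the role of the pages, and `ThreadPoolExecutor` replaces `ProcessPoolExecutor` purely so the sketch runs without pickling or start-method concerns.

```python
import itertools
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Iterator, List, Optional

# Stand-in for the per-worker document kept in a module-level variable.
_pages: Optional[List[str]] = None


def init_worker(pages: List[str]) -> None:
    """Runs once in each worker before it accepts any task."""
    global _pages
    _pages = list(pages)


def call_page(func: Callable[[str], Any], idx: int) -> Any:
    """Look the page up by index in worker-local state, then apply func."""
    assert _pages is not None
    return func(_pages[idx])


def map_pages(pool: Executor, func: Callable[[str], Any], npages: int) -> Iterator[Any]:
    """Submit one task per page; only func and an int are passed per task."""
    # Executor.map zips its iterables, so the infinite repeat() stops with range().
    return pool.map(call_page, itertools.repeat(func), range(npages))


if __name__ == "__main__":
    fake_pages = ["first page", "second", "3"]
    with ThreadPoolExecutor(
        max_workers=2, initializer=init_worker, initargs=(fake_pages,)
    ) as pool:
        print(list(map_pages(pool, len, len(fake_pages))))
```

With a real process pool, `func` must be picklable (a module-level function rather than a lambda), and so must whatever it returns.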
27 changes: 27 additions & 0 deletions playa/document.py
@@ -8,6 +8,8 @@
import mmap
import re
import struct
import weakref
from concurrent.futures import Executor
from hashlib import md5, sha256, sha384, sha512
from typing import (
    Any,
@@ -810,6 +812,7 @@ class Document:

    _fp: Union[BinaryIO, None] = None
    _pages: Union["PageList", None] = None
    _pool: Union[Executor, None] = None

    def __enter__(self) -> "Document":
        return self
@@ -819,6 +822,9 @@ def __exit__(self, exc_type, exc_value, traceback) -> None:
        if self._fp:
            self._fp.close()
            self._fp = None
        if self._pool:
            self._pool.shutdown()
            self._pool = None

    def __init__(
        self,
@@ -1369,10 +1375,20 @@ def _read_xref_from(
self._read_xref_from(pos, xrefs)


__pdf: Union[Document, None] = None


def call_page(func: Callable[[Page], Any], idx: int) -> Any:
    """Call a function on a page in a worker process."""
    assert __pdf is not None
    return func(__pdf.pages[idx])


class PageList:
    """List of pages indexable by 0-based index or string label."""

    def __init__(self, doc: Document):
        self.doc = weakref.ref(doc)
        try:
            page_labels: Iterable[Optional[str]] = doc.page_labels
        except (KeyError, ValueError):
@@ -1406,6 +1422,17 @@ def __getitem__(self, key: Union[int, str]) -> Page:
        else:
            return self._labels[key]

    def map(self, func: Callable[[Page], Any]) -> Iterator:
        doc = self.doc()
        if doc is None:
            raise RuntimeError("Document no longer exists")
        if doc._pool is not None:
            return doc._pool.map(
                call_page, itertools.repeat(func), (page.page_idx for page in self)
            )
        else:
            return (func(page) for page in self)
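
Note on `PageList.map` above: the list holds only a weak reference to its document, fails early if the document is gone, and falls back to a lazy generator when there is no pool. The sketch below reproduces those two behaviours with stand-in classes (`Owner` and `ItemList` are hypothetical names, not PLAYA's). It assumes CPython-style reference counting, with `gc.collect()` called as a precaution.

```python
import gc
import weakref
from typing import Any, Callable, Iterator, List


class Owner:
    """Hypothetical stand-in for Document: owns the items and the list view."""

    def __init__(self, items: List[int]) -> None:
        self.items = items
        self.view = ItemList(self)  # strong reference: owner -> view


class ItemList:
    """Hypothetical stand-in for PageList: points back without a cycle."""

    def __init__(self, owner: Owner) -> None:
        self._owner = weakref.ref(owner)  # weak reference: view -> owner

    def map(self, func: Callable[[int], Any]) -> Iterator[Any]:
        owner = self._owner()
        if owner is None:
            # Raised at call time, because map() is not itself a generator.
            raise RuntimeError("Owner no longer exists")
        # Serial fallback: nothing is computed until the result is iterated.
        return (func(item) for item in owner.items)


if __name__ == "__main__":
    owner = Owner([1, 2, 3])
    view = owner.view
    print(list(view.map(lambda x: x * 2)))
    del owner
    gc.collect()
    try:
        view.map(str)
    except RuntimeError as exc:
        print("error:", exc)
```

One consequence of the lazy fallback is that the work happens when the caller iterates, not when `map` is called, which is presumably why the benchmarks wrap the result in `list(...)` while the document is still open.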


class PageLabels(NumberTree):
    """PageLabels from the document catalog.