Skip to content

Commit

Permalink
Merge pull request #12 from utkdigitalinitiatives/collections
Browse files Browse the repository at this point in the history
Collections
  • Loading branch information
markpbaggett authored May 20, 2024
2 parents 62cfa69 + 3126a15 commit 8e0ca25
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 7 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ If you want to generate a sheet of checksums for files that failed to import, yo
exodus hash_errors --path /path/to/directory --output /path/to/sheet.csv
```
If you want to generate an import sheet for all collections, you can:
```shell
exodus generate_collection_metadata
```
If you want to generate an import sheet for a single collection, you can:
```shell
exodus generate_collection_metadata --collection "namespace:identifier"
```
## What's Missing Here Right Now

* The ability to create pcdm:Collection objects.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "utk-exodus"
version = "0.1.8"
version = "0.1.9"
description = "A tool for building import sheets from UTK legacy systems"
authors = ["Mark Baggett <[email protected]>"]
readme = "README.md"
Expand Down
3 changes: 3 additions & 0 deletions utk_exodus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
from .combine import ImportRefactor
from .template import ImportTemplate
from .restrict import Restrictions, RestrictionsSheet
from .collection import CollectionMetadata, CollectionImporter

__all__ = [
"CollectionMetadata",
"CollectionImporter",
"FedoraObject",
"FileCurator",
"FileOrganizer",
Expand Down
3 changes: 3 additions & 0 deletions utk_exodus/collection/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .collection import CollectionMetadata, CollectionImporter

__all__ = ["CollectionMetadata", "CollectionImporter"]
195 changes: 195 additions & 0 deletions utk_exodus/collection/collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
from lxml import etree
from io import BytesIO
import csv
from utk_exodus.fedora import FedoraObject
from utk_exodus.restrict import Restrictions
import os
from tqdm import tqdm


class CollectionMetadata:
    """Collect the metadata for one collection object stored in Fedora.

    On construction the object's MODS datastream is downloaded and parsed.
    ``grab_all_metadata`` then flattens selected MODS values, plus a
    visibility value derived from the POLICY datastream, into one dict.

    Fedora connection details are read from the ``FEDORA_USERNAME``,
    ``FEDORA_PASSWORD`` and ``FEDORA_URI`` environment variables.
    """

    # Role terms whose names are reported in the ``utk_contributor`` column.
    # NOTE(review): roles are matched with XPath ``contains()``, i.e. as
    # substrings, so a name with role "Editor of Compilation" is matched by
    # both "Editor" and "Editor of Compilation" (likewise "Owner"/"Former
    # Owner", "Printer"/"Printer of Plates") and is emitted twice. Behavior is
    # preserved here; confirm whether exact matching is wanted.
    _CONTRIBUTOR_ROLES = (
        "Contributor",
        "Addressee",
        "Arranger",
        "Associated Name",
        "Autographer",
        "Censor",
        "Choreographer",
        "Client",
        "Contractor",
        "Copyright Holder",
        "Dedicatee",
        "Depicted",
        "Distributor",
        "Donor",
        "Editor",
        "Editor of Compilation",
        "Former Owner",
        "Honoree",
        "Host Institution",
        "Instrumentalist",
        "Interviewer",
        "Issuing Body",
        "Music Copyist",
        "Musical Director",
        "Organizer",
        "Originator",
        "Owner",
        "Performer",
        "Printer",
        "Printer of Plates",
        "Producer",
        "Production Company",
        "Publisher",
        "Restorationist",
        "Set Designer",
        "Signer",
        "Speaker",
        "Stage Director",
        "Stage Manager",
        "Standards Body",
        "Surveyor",
        "Translator",
        "Videographer",
        "Witness",
    )

    # Role terms whose names are reported in the ``utk_creator`` column.
    # NOTE(review): "Designer" is a substring of "Set Designer" (a contributor
    # role above), so such names land in both columns. Behavior preserved.
    _CREATOR_ROLES = (
        "Creator",
        "Architect",
        "Artist",
        "Attributed Name",
        "Author",
        "Binding Designer",
        "Cartographer",
        "Compiler",
        "Composer",
        "Correspondent",
        "Costume Designer",
        "Designer",
        "Engraver",
        "Illustrator",
        "Interviewee",
        "Lithographer",
        "Lyricist",
        "Photographer",
    )

    def __init__(self, pid):
        """Fetch and parse the MODS datastream of the object *pid*.

        Args:
            pid: Fedora identifier, with or without the ``info:fedora/``
                prefix.
        """
        self.pid = pid
        self.namespaces = {
            "mods": "http://www.loc.gov/mods/v3",
            "xlink": "http://www.w3.org/1999/xlink",
        }
        self.mods = self.get_metadata(pid)

    def simplify_xpath(self, xpath):
        """Return the text of every node matching *xpath*, joined by ``" | "``.

        Nodes without text are skipped; no match yields an empty string.
        """
        return self.get_text_from_multiple_xpaths([xpath])

    def get_text_from_multiple_xpaths(self, xpaths):
        """Return the text of all nodes matching any of *xpaths*.

        The expressions are evaluated in the order given and the values are
        joined with ``" | "``.

        Args:
            xpaths: Iterable of XPath expressions using the ``mods`` /
                ``xlink`` prefixes declared in ``self.namespaces``.

        Returns:
            The joined text, or ``""`` when nothing matched.
        """
        all_matches = []
        for xpath in xpaths:
            # An empty element has ``text`` set to None, which ``str.join``
            # rejects with TypeError, so such nodes are left out.
            all_matches.extend(
                node.text
                for node in self.mods.xpath(xpath, namespaces=self.namespaces)
                if node.text is not None
            )
        return " | ".join(all_matches)

    @staticmethod
    def _name_xpaths_for_roles(roles):
        """Build one ``mods:namePart`` XPath per role term, in the given order."""
        return [
            f'mods:name[mods:role/mods:roleTerm[contains(.,"{role}")]]/mods:namePart'
            for role in roles
        ]

    def grab_all_metadata(self):
        """Return the collection's metadata as a flat, ordered dict.

        The key order defines the column order of the sheet written from it.
        Keys mapped to ``""`` are emitted as empty columns. ``visibility`` is
        obtained from ``get_policy`` and therefore triggers a request for the
        POLICY datastream.
        """
        return {
            "source_identifier": self.pid,
            "model": "Collection",
            "parents": "",
            "title": self.simplify_xpath("mods:titleInfo/mods:title"),
            "abstract": self.simplify_xpath("mods:abstract"),
            "contributor": "",
            "utk_contributor": self.get_text_from_multiple_xpaths(
                self._name_xpaths_for_roles(self._CONTRIBUTOR_ROLES)
            ),
            "creator": "",
            "utk_creator": self.get_text_from_multiple_xpaths(
                self._name_xpaths_for_roles(self._CREATOR_ROLES)
            ),
            "date_created": self.simplify_xpath(
                "mods:originInfo/mods:dateCreated[not(@encoding)]"
            ),
            "date_issued": self.simplify_xpath(
                "mods:originInfo/mods:dateIssued[not(@encoding)]"
            ),
            "date_created_d": self.simplify_xpath(
                "mods:originInfo/mods:dateCreated[@encoding]"
            ),
            "date_issued_d": self.simplify_xpath(
                "mods:originInfo/mods:dateIssued[@encoding]"
            ),
            "utk_publisher": self.simplify_xpath("mods:originInfo/mods:publisher"),
            "publisher": "",
            "publication_place": "",
            "extent": self.simplify_xpath("mods:physicalDescription/mods:extent"),
            "form": self.simplify_xpath("mods:physicalDescription/mods:form"),
            "subject": "",
            "keyword": self.simplify_xpath("mods:subject/mods:topic"),
            "spatial": "",
            "resource_type": "",
            "note": self.simplify_xpath("mods:note"),
            "repository": "",
            "visibility": self.get_policy(self.pid),
        }

    @staticmethod
    def _fedora_object(pid):
        """Return a ``FedoraObject`` for *pid* configured from the environment."""
        return FedoraObject(
            auth=(
                os.environ.get("FEDORA_USERNAME"),
                os.environ.get("FEDORA_PASSWORD"),
            ),
            fedora_uri=os.environ.get("FEDORA_URI"),
            # Accept both "namespace:id" and "info:fedora/namespace:id".
            pid=pid.replace("info:fedora/", "").strip(),
        )

    @staticmethod
    def get_metadata(pid):
        """Download the MODS datastream of *pid* and return it parsed by lxml."""
        r = CollectionMetadata._fedora_object(pid).streamDatastream("MODS")
        # @Todo: What if MODS doesn't exist?
        return etree.parse(BytesIO(r.content))

    @staticmethod
    def get_policy(pid):
        """Return ``"restricted"`` or ``"open"`` based on the POLICY datastream.

        The datastream is saved to ``tmp/POLICY.xml`` (overwritten on every
        call) because ``Restrictions`` is given a file path.

        Returns:
            ``"restricted"`` or ``"open"``, or ``None`` when the POLICY
            request does not answer with HTTP 200.
        """
        r = CollectionMetadata._fedora_object(pid).streamDatastream("POLICY")
        if r.status_code != 200:
            return None
        # Make sure the scratch directory exists before writing into it.
        os.makedirs("tmp", exist_ok=True)
        with open("tmp/POLICY.xml", "wb") as f:
            f.write(r.content)
        restrictions = Restrictions("tmp/POLICY.xml").get()
        # NOTE(review): the default "open" is a truthy string, so a result
        # without a "work_restricted" key is reported as "restricted". Left
        # unchanged because the Restrictions contract is not visible here;
        # confirm whether a missing key should mean "open".
        if restrictions.get("work_restricted", "open"):
            return "restricted"
        return "open"


class CollectionImporter:
    """Gather metadata for several collections and write it to a CSV sheet.

    Attributes are populated eagerly: constructing the importer builds a
    ``CollectionMetadata`` for every entry of *collections* and stores the
    resulting dicts in ``collection_metadata``.
    """

    def __init__(self, collections):
        """Collect metadata for every identifier in *collections*.

        Args:
            collections: Iterable of collection identifiers.
        """
        self.collections = collections
        self.collection_metadata = self.__build_collections()
        # Column names come from the first row; with no collections there is
        # no first row, so fall back to an empty header instead of raising
        # IndexError.
        self.headers = (
            list(self.collection_metadata[0]) if self.collection_metadata else []
        )

    def __build_collections(self):
        """Return one metadata dict per collection, showing a progress bar."""
        return [
            CollectionMetadata(collection).grab_all_metadata()
            for collection in tqdm(self.collections)
        ]

    def write_csv(self, filename):
        """Write the header row and one row per collection to *filename*.

        The file is written as UTF-8 so non-ASCII metadata does not depend on
        the platform's default encoding.
        """
        with open(filename, "w", newline="", encoding="utf-8") as bulkrax_sheet:
            writer = csv.DictWriter(bulkrax_sheet, fieldnames=self.headers)
            writer.writeheader()
            writer.writerows(self.collection_metadata)
34 changes: 34 additions & 0 deletions utk_exodus/exodus.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from utk_exodus.template import ImportTemplate
from utk_exodus.combine import ImportRefactor
from utk_exodus.checksum import HashSheet
from utk_exodus.collection import CollectionImporter
from utk_exodus.risearch import ResourceIndexSearch
import click
import requests

Expand Down Expand Up @@ -240,3 +242,35 @@ def hash_errors(
hs = HashSheet(path, output)
hs.write()
print(f"Hash sheet written to {output}.")


@cli.command(
    "generate_collection_metadata",
    help="Generate metadata for a collection.",
)
@click.option(
    "--collection",
    "-l",
    required=False,
    help="Specify the collection you want to download metadata for.",
)
@click.option(
    "--output",
    "-o",
    required=False,
    default="tmp/collections.csv",
    help="Specify where to write output.",
)
def generate_collection_metadata(
    collection: str,
    output: str,
) -> None:
    """Write a collection metadata sheet to *output*.

    When ``--collection`` is given only that collection is described;
    otherwise every collection returned by
    ``ResourceIndexSearch().find_all_collections()`` is included.
    """
    if collection:
        print(f"Generating metadata for {collection}.")
        importer = CollectionImporter([collection])
    else:
        print("Generating metadata for all collections.")
        collections = ResourceIndexSearch().find_all_collections()
        importer = CollectionImporter(collections)
    importer.write_csv(output)
    # Report the path that was actually written rather than a fixed name.
    print(f"Done. Metadata written to {output}.")
9 changes: 9 additions & 0 deletions utk_exodus/fedora/fedora.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,12 @@ def getDatastream(self, dsid, output):
else:
print(f"{r.status_code} on {self.pid}.")
return

def streamDatastream(self, dsid):
    """Request the content of datastream *dsid* and return the response.

    The request goes to ``<fedora_uri>/objects/<pid>/datastreams/<dsid>/content``
    with this object's credentials. The response is returned whatever its
    status code, so callers decide how to treat failures.
    """
    content_url = f"{self.fedora_uri}/objects/{self.pid}/datastreams/{dsid}/content"
    return requests.get(
        content_url,
        auth=self.auth,
        allow_redirects=True,
        stream=True,
    )
3 changes: 2 additions & 1 deletion utk_exodus/metadata/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .base import BaseProperty, StandardProperty, XMLtoDictProperty
__all__ = ['BaseProperty', 'StandardProperty', 'XMLtoDictProperty']

__all__ = ["BaseProperty", "StandardProperty", "XMLtoDictProperty"]
57 changes: 52 additions & 5 deletions utk_exodus/risearch/risearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ def get_works_based_on_type_and_collection(self, work_type, collection):
f"}}"
)
results = requests.get(f"{self.base_url}&query={query}").content.decode("utf-8")
return [result for result in results.split("\n") if result != "" and result != '"pid"']
return [
result
for result in results.split("\n")
if result != "" and result != '"pid"'
]

def get_policies_for_pages_in_book(self, book):
query = quote(
Expand All @@ -203,7 +207,11 @@ def get_policies_for_pages_in_book(self, book):
f"FILTER(REGEX(STR(?o), 'POLICY')).}}"
)
results = requests.get(f"{self.base_url}&query={query}").content.decode("utf-8")
return [result for result in results.split("\n") if result != "" and result != '"pid"']
return [
result
for result in results.split("\n")
if result != "" and result != '"pid"'
]

def get_policies_based_on_type_and_collection(self, work_type, collection):
iri = self.__lookup_work_type(work_type).strip()
Expand All @@ -219,10 +227,18 @@ def get_policies_based_on_type_and_collection(self, work_type, collection):
)
results = requests.get(f"{self.base_url}&query={query}").content.decode("utf-8")
if work_type != "book":
return [result for result in results.split("\n") if result != "" and result != '"pid"']
return [
result
for result in results.split("\n")
if result != "" and result != '"pid"'
]
else:
all_policies_from_book = []
books = [result for result in results.split("\n") if result != "" and result != '"pid"']
books = [
result
for result in results.split("\n")
if result != "" and result != '"pid"'
]
for book in books:
all_policies_from_book.append(book)
all_policies_from_book.extend(self.get_policies_for_pages_in_book(book))
Expand All @@ -246,8 +262,39 @@ def get_page_number(self, pid):
results = requests.get(f"{self.base_url}&query={query}").content.decode("utf-8")
return results.split("\n")[1]

def find_all_collections(self):
    """Return the identifiers of all collection objects, minus an ignore list.

    Queries the resource index for every object whose model is
    ``islandora:collectionCModel``, drops blank lines, the ``"collection"``
    line (presumably the header row of the response) and the listed
    collections, then strips the ``info:fedora/`` prefix from what remains.
    """
    skipped = {
        "info:fedora/islandora:root",
        "info:fedora/islandora:sp_large_image_collection",
        "info:fedora/islandora:sp_basic_image_collection",
        "info:fedora/islandora:manuscriptCollection",
        "info:fedora/islandora:compound_collection",
        "info:fedora/islandora:transformCollection",
        "info:fedora/islandora:bookCollection",
        "info:fedora/islandora:binary_object_collection",
        "info:fedora/islandora:audio_collection",
        "info:fedora/islandora:sp_pdf_collection",
        "info:fedora/islandora:video_collection",
        "info:fedora/digital:collections",
        "info:fedora/ir:citationCollection",
        "info:fedora/islandora:oralhistories_collection",
        "info:fedora/collections:test",
        "info:fedora/collections:rftatest",
    }
    query = quote(
        "SELECT ?collection WHERE { ?collection <info:fedora/fedora-system:def/model#hasModel> <info:fedora/islandora:collectionCModel> . }"
    )
    response = requests.get(f"{self.base_url}&query={query}")
    body = response.content.decode("utf-8")
    found = []
    for line in body.split("\n"):
        if line == "" or line == '"collection"':
            continue
        if line in skipped:
            continue
        found.append(line.replace("info:fedora/", ""))
    return found


if __name__ == "__main__":
risearch = ResourceIndexSearch()
x = risearch.get_policies_based_on_type_and_collection("book", "collections:galston")
x = risearch.get_policies_based_on_type_and_collection(
"book", "collections:galston"
)
print(x)

0 comments on commit 8e0ca25

Please sign in to comment.