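# Build script for the QLIT thesaurus: reads Turtle term files from the
# directories listed in INDIR, validates them, assigns random identifiers to
# new terms, completes SKOS relations, stamps issued/modified dates against
# the previous build, and writes the result to THESAURUSFILE as sorted
# N-Triples.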
from datetime import datetime, timezone
from itertools import filterfalse
import os
from os.path import join
import re
from dotenv import load_dotenv
from rdflib import DCTERMS, SKOS, XSD, Literal
from qlit.identifier import generate_identifier, validate_identifier
from qlit.simple import name_to_ref, ref_to_name
from qlit.thesaurus import Termset, Thesaurus
from qlit.skos import skos_validate_partial, skos_validate_graph, skos_complete_graph
from qlit.qlit import qlit_validate_partial

load_dotenv()

THESAURUSFILE = os.environ.get('THESAURUSFILE')
if not THESAURUSFILE:
    raise EnvironmentError('Error: THESAURUSFILE missing from env')

INDIR = os.environ.get('INDIR')
if not INDIR:
    raise EnvironmentError('Error: INDIR missing from env')

rdf_now = Literal(
    datetime.now(timezone.utc).isoformat().split('.')[0],
    datatype=XSD.dateTime)

# Term properties tracked for changes between builds.
P_TRACKED = [SKOS.altLabel, SKOS.broader, SKOS.broadMatch, SKOS.exactMatch,
             SKOS.narrower, SKOS.prefLabel, SKOS.related, SKOS.scopeNote]


def randomize_ids(thesaurus: Thesaurus):
    """Replace all non-randomized ids with new, randomized ids."""
    uris = thesaurus.refs()
    ids = [thesaurus.value(uri, DCTERMS.identifier) for uri in uris]
    bad_ids = filterfalse(validate_identifier, ids)
    for bad_id in bad_ids:
        new_id = generate_identifier(ids)
        print(f'New id {new_id} for {bad_id}')
        replace_identifier(thesaurus, bad_id, new_id)


def replace_identifier(thesaurus: Thesaurus, old_id: str, new_id: str):
    """Replace all statements about the term `old_id` with statements about `new_id`."""
    old_ref = name_to_ref(old_id)
    new_ref = name_to_ref(new_id)
    # Update outgoing statements. Materialize the triples first, to avoid
    # mutating the graph while iterating over it.
    for p, o in list(thesaurus.predicate_objects(old_ref)):
        thesaurus.remove((old_ref, p, o))
        thesaurus.add((new_ref, p, o))
    # Update incoming relations.
    for s, p in list(thesaurus.subject_predicates(old_ref)):
        thesaurus.remove((s, p, old_ref))
        thesaurus.add((s, p, new_ref))
    # Update identifier literal.
    thesaurus.set((new_ref, DCTERMS.identifier, Literal(new_id)))


def check_changes(thesaurus: Thesaurus, thesaurus_prev: Thesaurus):
    """Set dcterms:issued/modified dates by comparing with the previous thesaurus."""
    uris = thesaurus.refs()
    uris_prev = thesaurus_prev.refs()
    count_new = 0
    count_changed = 0
    count_removed = sum(1 for uri in uris_prev if uri not in uris)
    for term_uri in uris:
        try:
            if term_uri not in uris_prev:
                # This term is a new addition.
                thesaurus.set((term_uri, DCTERMS.issued, rdf_now))
                thesaurus.set((term_uri, DCTERMS.modified, rdf_now))
                count_new += 1
            else:
                # Are there any changes in the term?
                changed_ps = []
                for p in P_TRACKED:
                    a = sorted(thesaurus.objects(term_uri, p))
                    b = sorted(thesaurus_prev.objects(term_uri, p))
                    if a != b:
                        changed_ps.append(p)
                if changed_ps:
                    # The term has changes.
                    p_names = [re.sub(r'.*[/#]', '', p) for p in changed_ps]
                    print(f'Changes for {ref_to_name(term_uri)} in {", ".join(p_names)}')
                    thesaurus.set((term_uri, DCTERMS.modified, rdf_now))
                    count_changed += 1
                else:
                    # No changes. Copy old dates.
                    thesaurus.set((term_uri, DCTERMS.issued,
                                   thesaurus_prev.value(term_uri, DCTERMS.issued)))
                    thesaurus.set((term_uri, DCTERMS.modified,
                                   thesaurus_prev.value(term_uri, DCTERMS.modified)))
        except Exception:
            # On any error, re-raise it after mentioning the faulty term uri.
            print(f'\nError when checking changes for {term_uri}:')
            raise
    return count_changed, count_new, count_removed


def is_infile(fn):
    """Match input term files: a plain name (no dash, underscore or dot) plus a .ttl extension."""
    return re.match(r'[^-_.]+\.ttl', fn, re.IGNORECASE)


if __name__ == '__main__':
    # Load current state.
    thesaurus = Thesaurus()

    # Prepare parsing. INDIR is a colon-separated list of directories.
    indirs = INDIR.split(':')
    for indir in indirs:
        print(f'Reading files from {indir}')
    fns = [join(indir, fn)
           for indir in indirs
           for fn in os.listdir(indir) if is_infile(fn)]
    print(f'Parsing {len(fns)} files...')
    skipped = []

    # Parse input files.
    for fn in fns:
        try:
            with open(fn) as f:
                data = f.read()
            termset = Termset().parse(data=data)
            for error in skos_validate_partial(termset):
                raise SyntaxError(error)
            for error in qlit_validate_partial(termset):
                raise SyntaxError(error)
            thesaurus += termset
        except Exception as err:
            # Report error and skip this input file.
            print(f'{fn}: {type(err)} {err}')
            skipped.append(fn)

    # Check for thesaurus-wide errors.
    for error in skos_validate_graph(thesaurus):
        raise SyntaxError(error)

    # Done parsing.
    if skipped:
        print(f'WARNING: Skipped {len(skipped)} files')
    print(f'Parsed {len(fns) - len(skipped)} files')

    # Randomize new ids.
    print('Creating new identifiers...')
    randomize_ids(thesaurus)

    # Complete relations.
    print('Completing relations...')
    skos_complete_graph(thesaurus)

    # Load another copy to track changes.
    print('Checking changes...')
    thesaurus_prev = Thesaurus().parse(THESAURUSFILE)
    count_changed, count_new, count_removed = check_changes(thesaurus, thesaurus_prev)
    print(f'{count_changed} changed, {count_new} new, {count_removed} removed')

    # Write result as sorted N-Triples.
    terms = thesaurus.refs()
    print(f'Writing {len(terms)} terms...')
    thesaurus.base = None
    nt_data = thesaurus.serialize(format='nt')
    nt_lines = sorted(nt_data.splitlines(True))
    with open(THESAURUSFILE, 'w') as f:
        f.writelines(nt_lines)
    print(f'Wrote {THESAURUSFILE}')
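
# Typical invocation (assuming THESAURUSFILE and INDIR are defined in a .env
# file or in the environment):
#   python build.py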