-
Notifications
You must be signed in to change notification settings - Fork 0
/
harvest_metadata_multiprocessing.py
218 lines (176 loc) · 8.62 KB
/
harvest_metadata_multiprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
""" Script for harvesting metadata in parallel.

Inspired by:
    - harvest-metadata from https://github.com/steingod/mdharvest/tree/master/src
    - code from http://lightonphiri.org/blog/metadata-harvesting-via-oai-pmh-using-python

AUTHOR: Trygve Halsne, 25.01.2017

USAGE:
    - input must have metadataPrefix?

COMMENTS:
    - Implement it object oriented by means of classes
    - Implement hProtocol: OGC-CSW, OpenSearch, ISO 19115
    - Does OGC-CSW metadata have some kind of resumptionToken analog?
"""
# List all recordsets: http://arcticdata.met.no/metamod/oai?verb=ListRecords&set=nmdc&metadataPrefix=dif
# List identifier: http://arcticdata.met.no/metamod/oai?verb=GetRecord&identifier=urn:x-wmo:md:no.met.arcticdata.test3::ADC_svim-oha-monthly&metadataPrefix=dif
# Recordset with resumptionToken: http://union.ndltd.org/OAI-PMH/?verb=ListRecords&metadataPrefix=oai_dc
# Recordset with DIF elements and resumptionToken (Slow server..): http://ws.pangaea.de/oai/provider?verb=ListRecords&metadataPrefix=dif
# Recordset with DIF elements and resumptionToken: https://esg.prototype.ucar.edu/oai/repository.htm?verb=ListRecords&metadataPrefix=dif
# Recordset with gcmd(DIF) elements: http://oai.nerc-bas.ac.uk:8080/oai/provider?verb=ListRecords&metadataPrefix=gcmd
# OGC-CSW recordset: http://metadata.bgs.ac.uk/geonetwork/srv/en/csw?SERVICE=CSW&VERSION=2.0.2&request=GetRecords&constraintLanguage=CQL_TEXT&typeNames=csw:Record&resultType=results&outputSchema=http://www.isotc211.org/2005/gmd
import codecs
import multiprocessing as mp
import os
import sys
import urllib as ul
import urllib2 as ul2
from datetime import datetime
from xml.dom.minidom import parseString
from xml.parsers.expat import ExpatError
class MetadataHarvester(mp.Process):
    """Harvest metadata from a single endpoint in its own process.

    One instance is created per endpoint; ``start()`` (inherited from
    ``multiprocessing.Process``) executes ``run()`` in a child process, which
    in turn calls ``harvest()``.

    Attributes set by ``__init__``:
        baseURL:   endpoint URL without query string.
        records:   query string appended to ``baseURL`` for the first request.
        outputDir: stored, but NOTE: no method reads it yet -- DIF files are
                   written to the current working directory.
        hProtocol: 'OAI-PMH' or 'OGC-CSW'; anything else terminates harvest().
    """

    # Seconds to wait for the remote server before a request is abandoned.
    REQUEST_TIMEOUT = 40

    # Development cap kept from the original "Temporary break": stop writing
    # after this many DIF files for one response page. Set to None to write
    # every record.
    MAX_DIF_FILES_PER_PAGE = 2

    def __init__(self, baseURL, records, outputDir, hProtocol):  # add outputname also
        """Store the endpoint description; no network access happens here."""
        mp.Process.__init__(self)
        self.baseURL = baseURL
        self.records = records
        self.outputDir = outputDir
        self.hProtocol = hProtocol

    def run(self):
        """Process entry point: announce the worker, then harvest."""
        sys.stdout.write('[%s] running ... process id: %s\n'
                         % (self.baseURL, os.getpid()))
        self.harvest()

    def harvest(self):
        """Run the harvesting strategy selected by ``self.hProtocol``.

        Prints a message and raises ``SystemExit`` when the protocol is
        neither 'OAI-PMH' nor 'OGC-CSW'.
        """
        hProtocol = self.hProtocol
        if hProtocol == 'OAI-PMH':
            self._harvestOaiPmh()
        elif hProtocol == 'OGC-CSW':
            self._harvestOgcCsw()
        else:
            print('Protocol %s is not accepted.' % hProtocol)
            # sys.exit() instead of the site-provided exit(), which is not
            # guaranteed to exist (e.g. under ``python -S``).
            sys.exit()

    def _harvestOaiPmh(self):
        """Fetch the first page, then follow resumptionTokens until none is left."""
        baseURL, hProtocol = self.baseURL, self.hProtocol
        # Could/should be more sophisticated by means of deciding url properties
        getRecordsURL = str(baseURL + self.records)
        print("Harvesting metadata from: \n\tURL: %s \n\tprotocol: %s \n"
              % (getRecordsURL, hProtocol))
        start_time = datetime.now()

        # Initial phase. Each page is downloaded once; the token is read from
        # the same DOM the records are written from.
        dom = self.harvestContent(getRecordsURL)
        if dom is not None:
            self.oaipmh_writeDIFtoFile(dom)
        resumptionToken = self._resumptionTokenFromDom(dom)
        pageCounter = 1

        # Truthiness test: a failed fetch (None), "no token" ([]) and an empty
        # token all end the loop instead of requesting a bogus token.
        while resumptionToken:
            print("\n")
            print("Handeling resumptionToken: %.0f \n" % pageCounter)
            # create resumptionToken URL parameter
            tokenParameter = ul.urlencode({'resumptionToken': resumptionToken})
            getRecordsURLLoop = str(baseURL + '?verb=ListRecords&' + tokenParameter)
            dom = self.harvestContent(getRecordsURLLoop)
            if dom is not None:
                self.oaipmh_writeDIFtoFile(dom)
            else:
                print("dom = " + str(dom) + ', for page ' + str(pageCounter))
            resumptionToken = self._resumptionTokenFromDom(dom)
            pageCounter += 1

        print("\n\nHarvesting took: %s [h:mm:ss]" % str(datetime.now() - start_time))

    def _harvestOgcCsw(self):
        """Fetch a single OGC-CSW GetRecords response and process it."""
        getRecordsURL = str(self.baseURL + self.records)
        print("Harvesting metadata from: \n\tURL: %s \n\tprotocol: %s \n"
              % (getRecordsURL, self.hProtocol))
        start_time = datetime.now()
        dom = self.harvestContent(getRecordsURL)
        if dom is not None:
            self.ogccsw_writeCSWISOtoFile(dom)
        print("\n\nHarvesting took: %s [h:mm:ss]" % str(datetime.now() - start_time))

    def ogccsw_writeCSWISOtoFile(self, dom):
        """Report progress over the CSW-ISO elements in ``dom``.

        NOTE: despite the name and the message printed, nothing is written to
        disk yet; the method only counts ``gmd:MD_Metadata`` elements that
        contain a ``gmd:identificationInfo`` child and prints a progress line.
        """
        print("Writing CSW ISO metadata elements to disk... ")
        mD_metadata_elements = dom.getElementsByTagName('gmd:MD_Metadata')
        size_idInfo = dom.getElementsByTagName('gmd:identificationInfo').length
        counter = 1
        for md_element in mD_metadata_elements:
            # Only elements with identification info count as valid metadata.
            if md_element.getElementsByTagName('gmd:identificationInfo'):
                sys.stdout.write('\tWriting CSW-ISO elements %.f / %d \r'
                                 % (counter, size_idInfo))
                sys.stdout.flush()
                counter += 1

    def oaipmh_writeDIFtoFile(self, dom):
        """Write the DIF element of each usable record in ``dom`` to a file.

        Files are named ``dif_test_<n>.xml`` (n starting at 1 for every call)
        and are created in the current working directory. A record is skipped
        when its ``<header>`` carries attributes (presumably
        ``status="deleted"`` -- confirm against the OAI-PMH spec) or when it
        holds no ``DIF`` element. At most ``MAX_DIF_FILES_PER_PAGE`` files are
        written per call unless that constant is None.
        """
        print("Writing DIF elements to disk... ")
        record_elements = dom.getElementsByTagName('record')
        size_dif = dom.getElementsByTagName('DIF').length
        if size_dif == 0:
            print("\trecords did not contain DIF elements")
            return

        limit = self.MAX_DIF_FILES_PER_PAGE
        counter = 1
        for record in record_elements:
            if self._headerHasAttributes(record):
                continue
            dif_elements = record.getElementsByTagName('DIF')
            if not dif_elements:
                # Active record with a non-DIF payload: nothing to write.
                continue
            sys.stdout.write('\tWriting DIF elements %.f / %d \r'
                             % (counter, size_dif))
            sys.stdout.flush()
            tmp_fname = 'dif_test_' + str(counter) + '.xml'
            # ``with`` closes the file even if serialisation fails.
            with codecs.open(tmp_fname, 'w', 'utf-8') as output:
                dif_elements[0].writexml(output)
            counter += 1
            # Temporary break
            if limit is not None and counter > limit:
                break

    def harvestContent(self, URL):
        """Download ``URL`` and return it as a minidom Document.

        Returns None (after printing a message) when the resource cannot be
        opened or read, or when its content is not well-formed XML.
        """
        try:
            return self._fetchDom(URL)
        except (IOError, ExpatError):
            print("There was an error with the URL request. " +
                  "Could not open or parse content from: \t\n %s" % URL)
            return None

    def oaipmh_resumptionToken(self, URL):
        """Download ``URL`` and return the resumptionToken it advertises.

        Returns the token text, ``[]`` when the response has no token (element
        missing or empty), or None (after printing a message) when the
        resource cannot be fetched or parsed.
        """
        try:
            dom = self._fetchDom(URL)
        except (IOError, ExpatError):
            print("There was an error with the URL request")
            return None
        return self._resumptionTokenFromDom(dom)

    def _fetchDom(self, URL):
        """Open ``URL``, read it fully and parse it; errors propagate."""
        response = ul2.urlopen(URL, timeout=self.REQUEST_TIMEOUT)
        try:
            data = response.read()
        finally:
            response.close()
        return parseString(data)

    @staticmethod
    def _resumptionTokenFromDom(dom):
        """Return the resumptionToken text found in ``dom``, or ``[]`` if none."""
        if dom is None:
            return []
        tokenElements = dom.getElementsByTagName('resumptionToken')
        if tokenElements.length == 0:
            return []
        firstChild = tokenElements[0].firstChild
        if firstChild is None:
            # An empty <resumptionToken/> marks the last page.
            return []
        return firstChild.nodeValue

    @staticmethod
    def _headerHasAttributes(record):
        """Return True when the record's ``<header>`` child has attributes.

        A record without a header is treated as having none, so the flag can
        never leak from a previously inspected record.
        """
        for child in record.childNodes:
            if str(child.nodeName) == 'header':
                return child.hasAttributes()
        return False
#baseURL = 'https://esg.prototype.ucar.edu/oai/repository.htm'
#records = '?verb=ListRecords&metadataPrefix=dif'
def main():
    """Build one harvester per configured endpoint, then start them all.

    Every harvester is constructed before the first one is started; each
    started process object is printed right after its ``start()`` call.
    """
    # (baseURL, records, outputDir, hProtocol) for each endpoint.
    endpoints = (
        ('http://oai.nerc-bas.ac.uk:8080/oai/provider',
         '?verb=ListRecords&metadataPrefix=gcmd',
         'tmp',
         'OAI-PMH'),
        ('http://metadata.bgs.ac.uk/geonetwork/srv/en/csw',
         '?SERVICE=CSW&VERSION=2.0.2&request=GetRecords&constraintLanguage=CQL_TEXT&typeNames=csw:Record&resultType=results&outputSchema=http://www.isotc211.org/2005/gmd',
         'tmp',
         'OGC-CSW'),
    )
    harvesters = [MetadataHarvester(*endpoint) for endpoint in endpoints]

    for harvester in harvesters:
        harvester.start()
        print(harvester)


if __name__ == "__main__":
    main()
#baseURL = 'http://dalspace.library.dal.ca:8080/oai/request'
#arguments = '?verb=ListRecords&metadataPrefix=oai_dc'
#baseURL = 'http://union.ndltd.org/OAI-PMH/'
#getRecordsURL = str(baseURL+'?verb=ListRecords&metadataPrefix=oai_dc')