Skip to content

Commit

Permalink
Merge pull request #73 from pmeulen/add_load_error_options
Browse files Browse the repository at this point in the history
Add fail_on_error and filter_invalid options to load command
  • Loading branch information
leifj committed Nov 17, 2015
2 parents 6b51980 + cc823ef commit 47b025c
Show file tree
Hide file tree
Showing 5 changed files with 663 additions and 20 deletions.
47 changes: 40 additions & 7 deletions src/pyff/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,18 +389,46 @@ def load(req, *opts):
"""
General-purpose resource fetcher.
:param opts:
:param req: The request
:param opts: Options: [qsize <5>] [timeout <30>] [validate <True*|False>]
:param opts: Options: See "Options" below
:return: None
Supports both remote and local resources. Fetching remote resources is done in parallell using threads.
Supports both remote and local resources. Fetching remote resources is done in parallel using threads.
Note: When downloading remote files over HTTPS the TLS server certificate is not validated.
Note: Default behaviour is to ignore metadata files or entities in MD files that cannot be loaded
Options are put directly after "load". E.g:
.. code-block:: yaml
- load fail_on_error True filter_invalid False:
- http://example.com/some_remote_metadata.xml
- local_file.xml
- /opt/directory_containing_md_files/
**Options**
Defaults are marked with (*)
- max_workers <5> : Number of parallel threads to use for loading MD files
- timeout <120> : Socket timeout when downloading files
- validate <True*|False> : When true downloaded metadata files are validated (schema validation)
- fail_on_error <True|False*> : Control whether an error during download, parsing or (optional)validatation of a MD file
does not abort processing of the pipeline. When true a failure aborts and causes pyff
to exit with a non zero exit code. Otherwise errors are logged but ignored.
- filter_invalid <True*|False> : Controls validation behaviour. When true Entities that fail validation are filtered
I.e. are not loaded. When false the entire metadata file is either loaded, or not.
fail_on_error controls whether failure to validating the entire MD file will abort
processing of the pipeline.
"""
opts = dict(zip(opts[::2], opts[1::2]))
opts.setdefault('timeout', 120)
opts.setdefault('max_workers', 5)
opts.setdefault('validate', "True")
opts.setdefault('fail_on_error', "False")
opts.setdefault('filter_invalid', "True")
opts['validate'] = bool(strtobool(opts['validate']))
opts['fail_on_error'] = bool(strtobool(opts['fail_on_error']))
opts['filter_invalid'] = bool(strtobool(opts['filter_invalid']))

remote = []
for x in req.args:
Expand Down Expand Up @@ -438,15 +466,20 @@ def load(req, *opts):
elif os.path.exists(url):
if os.path.isdir(url):
log.debug("directory %s verify %s as %s via %s" % (url, params['verify'], params['as'], params['via']))
req.md.load_dir(url, url=params['as'], validate=opts['validate'], post=post)
req.md.load_dir(url, url=params['as'], validate=opts['validate'], post=post, fail_on_error=opts['fail_on_error'], filter_invalid=opts['filter_invalid'])
elif os.path.isfile(url):
log.debug("file %s verify %s as %s via %s" % (url, params['verify'], params['as'], params['via']))
remote.append(("file://%s" % url, params['verify'], params['as'], post))
else:
log.error("Unknown file type for load: '%s'" % url)
error="Unknown file type for load: '%s'" % url
if opts['fail_on_error']:
raise PipeException(error)
log.error(error)
else:
log.error("Don't know how to load '%s' as %s verify %s via %s" %
(url, params['as'], params['verify'], params['via']))
error="Don't know how to load '%s' as %s verify %s via %s (file does not exist?)" % (url, params['as'], params['verify'], params['via'])
if opts['fail_on_error']:
raise PipeException(error)
log.error(error)

req.md.fetch_metadata(remote, **opts)

Expand Down
38 changes: 27 additions & 11 deletions src/pyff/mdrepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ def expiration(self, t):

return None

def fetch_metadata(self, resources, max_workers=5, timeout=120, max_tries=5, validate=False):
def fetch_metadata(self, resources, max_workers=5, timeout=120, max_tries=5, validate=False, fail_on_error=False, filter_invalid=True):
"""Fetch a series of metadata URLs and optionally verify signatures.
:param resources: A list of triples (url,cert-or-fingerprint,id, post-callback)
Expand All @@ -522,15 +522,22 @@ def fetch_metadata(self, resources, max_workers=5, timeout=120, max_tries=5, val
max_workers=max_workers,
timeout=timeout,
max_tries=max_tries,
validate=validate)
validate=validate,
fail_on_error=fail_on_error,
filter_invalid=filter_invalid )

def _fetch_metadata(self, resources, max_workers=5, timeout=120, max_tries=5, validate=False):
def _fetch_metadata(self, resources, max_workers=5, timeout=120, max_tries=5, validate=False, fail_on_error=False, filter_invalid=True):
tries = dict()

def _process_url(rurl, verifier, tid, post, enable_cache=True):
tries.setdefault(rurl, 0)

resource = load_url(rurl, timeout=timeout, enable_cache=enable_cache)
try:
resource = load_url(rurl, timeout=timeout, enable_cache=enable_cache)
except Exception, ex:
raise MetadataException(ex, "Exception fetching '%s': %s" % (rurl, str(ex)) )
if (not resource.result):
raise MetadataException("error fetching '%s'" % rurl)
xml = resource.result.strip()
retry_resources = []
info = {
Expand Down Expand Up @@ -564,6 +571,8 @@ def _process_url(rurl, verifier, tid, post, enable_cache=True):
t, offset = self.parse_metadata(StringIO(xml),
key=verifier,
base_url=rurl,
fail_on_error=fail_on_error,
filter_invalid=filter_invalid,
validate=validate,
validation_errors=info['Validation Errors'],
expiration=self.expiration,
Expand Down Expand Up @@ -628,7 +637,11 @@ def _process_url(rurl, verifier, tid, post, enable_cache=True):
for future in futures.as_completed(future_to_url):
url = future_to_url[future]
if future.exception() is not None:
log.error('fetching %r generated an exception: %s' % (url, future.exception()))
if fail_on_error:
log.error('fetching %r generated an exception' % url)
raise future.exception()
else:
log.error('fetching %r generated an exception: %s' % (url, future.exception()))
else:
next_resources.extend(future.result())
resources = next_resources
Expand Down Expand Up @@ -711,8 +724,8 @@ def parse_metadata(self,
try:
validate_document(t)
except DocumentInvalid, ex:
raise MetadataException("schema validation failed: '%s': %s" %
(base_url, xml_error(ex.error_log, m=base_url)))
raise MetadataException("schema validation failed: [%s] '%s': %s" %
(base_url, source, xml_error(ex.error_log, m=base_url)))

if t is not None:
if t.tag == "{%s}EntityDescriptor" % NS['md']:
Expand All @@ -722,18 +735,18 @@ def parse_metadata(self,
t = post(t)

except Exception, ex:
traceback.print_exc(ex)
log.error(ex)
if fail_on_error:
raise ex
traceback.print_exc(ex)
log.error(ex)
return None, None

if log.isDebugEnabled():
log.debug("returning %d valid entities" % len(list(iter_entities(t))))

return t, valid_until

def load_dir(self, directory, ext=".xml", url=None, validate=False, post=None, description=None):
def load_dir(self, directory, ext=".xml", url=None, validate=False, post=None, description=None, fail_on_error=True, filter_invalid=True):
"""
:param directory: A directory to walk.
:param ext: Include files with this extension (default .xml)
Expand Down Expand Up @@ -761,14 +774,17 @@ def load_dir(self, directory, ext=".xml", url=None, validate=False, post=None, d
validation_errors = dict()
t, valid_until = self.parse_metadata(fn,
base_url=url,
fail_on_error=True,
fail_on_error=fail_on_error,
filter_invalid=filter_invalid,
validate=validate,
validation_errors=validation_errors,
post=post)
entities.extend(entities_list(t)) # local metadata is assumed to be ok
for (eid, error) in validation_errors.iteritems():
log.error(error)
except Exception, ex:
if fail_on_error:
raise MetadataException('Error parsing "%s": %s' % (fn, str(ex)))
log.error(ex)

if entities:
Expand Down
Loading

0 comments on commit 47b025c

Please sign in to comment.