
Commit

Merge pull request #68 from nansencenter/issue66-local-harvester
Issue66 local harvester
opsdep authored Feb 15, 2021
2 parents 6596e71 + 5d812c7 commit b810d12
Showing 9 changed files with 296 additions and 166 deletions.
51 changes: 21 additions & 30 deletions geospaas_harvesting/crawlers.py
@@ -99,17 +99,17 @@ class WebDirectoryCrawler(Crawler):
f'^.*/{YEAR_PATTERN}/{MONTH_PATTERN}/{DAY_OF_MONTH_PATTERN}/.*$')
DAY_OF_YEAR_MATCHER = re.compile(f'^.*/{YEAR_PATTERN}/{DAY_OF_YEAR_PATTERN}(/.*)?$')

def __init__(self, root_url, time_range=(None, None), excludes=None):
def __init__(self, root_url, time_range=(None, None), include=None):
"""
`root_url` is the URL of the data repository to explore.
`time_range` is a 2-tuple of datetime.datetime objects defining the time range
of the datasets returned by the crawler.
`excludes` is the list of string that are the associated url is ignored during
the harvesting process if these strings are found in the crawled url.
`include` is a regular expression string used to filter the crawler's output.
Only URLs matching it are returned.
"""
self.root_url = urlparse(root_url)
self.time_range = time_range
self.excludes = (self.EXCLUDE or []) + (excludes or [])
self.include = re.compile(include) if include else None
self.set_initial_state()

@property
@@ -209,10 +209,6 @@ def _is_folder(self, path):
"""Returns True if path points to a folder"""
raise NotImplementedError("_is_folder is abstract in WebDirectoryCrawler")

def _is_file(self, path):
"""Returns True if path points to a file"""
raise NotImplementedError("_is_file is abstract in WebDirectoryCrawler")

def _add_url_to_return(self, path):
"""
Add a URL to the list of URLs returned by the crawler after
@@ -233,15 +229,20 @@ def _add_folder_to_process(self, path):
self._to_process.append(path)

def _process_folder(self, folder_path):
"""Get the contents of a folder and feed the _urls and _to_process attributes"""
"""
Get the contents of a folder and feed the _urls (based on includes) and _to_process
attributes
"""
self.LOGGER.info("Looking for resources in '%s'...", folder_path)
for path in self._list_folder_contents(folder_path):
# Select paths which do not contain any of the self.excludes strings
if all(excluded_string not in path for excluded_string in self.excludes):
if self._is_folder(path):
self._add_folder_to_process(path)
elif self._is_file(path):
self._add_url_to_return(path)
# deselect paths which contains any of the excludes strings
if self.EXCLUDE and self.EXCLUDE.search(path):
continue
if self._is_folder(path):
self._add_folder_to_process(path)
# select paths which are matched based on input config file
if self.include and self.include.search(path):
self._add_url_to_return(path)

def get_download_url(self, resource_url):
"""
@@ -264,9 +265,6 @@ def _list_folder_contents(self, folder_path):
def _is_folder(self, path):
return os.path.isdir(path)

def _is_file(self, path):
return os.path.isfile(path)


class HTMLDirectoryCrawler(WebDirectoryCrawler):
"""Implementation of WebDirectoryCrawler for repositories exposed as HTML pages."""
@@ -285,9 +283,6 @@ def _strip_folder_page(folder_path):
def _is_folder(self, path):
return path.endswith(self.FOLDERS_SUFFIXES)

def _is_file(self, path):
return path.endswith(self.FILES_SUFFIXES)

@classmethod
def _get_links(cls, html):
"""Returns the list of links contained in an HTML page, passed as a string"""
@@ -325,7 +320,7 @@ class OpenDAPCrawler(HTMLDirectoryCrawler):
LOGGER = logging.getLogger(__name__ + '.OpenDAPCrawler')
FOLDERS_SUFFIXES = ('/contents.html',)
FILES_SUFFIXES = ('.nc', '.nc.gz')
EXCLUDE = ['?']
EXCLUDE = re.compile(r'\?')


class ThreddsCrawler(HTMLDirectoryCrawler):
@@ -335,7 +330,7 @@ class ThreddsCrawler(HTMLDirectoryCrawler):
LOGGER = logging.getLogger(__name__ + '.ThreddsCrawler')
FOLDERS_SUFFIXES = ('/catalog.html',)
FILES_SUFFIXES = ('.nc',)
EXCLUDE = ['/thredds/catalog.html']
EXCLUDE = re.compile(r'/thredds/catalog.html$')

def get_download_url(self, resource_url):
result = None
@@ -354,18 +349,17 @@ class FTPCrawler(WebDirectoryCrawler):
"""
LOGGER = logging.getLogger(__name__ + '.FTPCrawler')

def __init__(self, root_url, time_range=(None, None), excludes=None,
username='anonymous', password='anonymous', files_suffixes=''):
def __init__(self, root_url, time_range=(None, None), include=None,
username='anonymous', password='anonymous'):

if not root_url.startswith('ftp://'):
raise ValueError("The root url must start with 'ftp://'")

self.username = username
self.password = password
self.files_suffixes = files_suffixes
self.ftp = None

super().__init__(root_url, time_range, excludes)
super().__init__(root_url, time_range, include)

def set_initial_state(self):
"""
@@ -433,9 +427,6 @@ def _is_folder(self, path):
else:
return True

def _is_file(self, path):
return path.endswith(self.files_suffixes)


class HTTPPaginatedAPICrawler(Crawler):
"""Base class for crawlers used on repositories exposing a paginated API over HTTP"""
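The crawler change above replaces the per-instance `excludes` list with a single `include` regular expression plus a class-level `EXCLUDE` pattern, and drops the `_is_file` checks. A minimal, self-contained sketch of the new selection rule in `_process_folder`, using hypothetical patterns and paths rather than the library classes:

```python
import re

# Stand-ins for the patterns used in the diff (hypothetical values):
# EXCLUDE is a class attribute (e.g. OpenDAPCrawler skips '?' URLs),
# include comes from the harvester configuration.
EXCLUDE = re.compile(r'\?')
include = re.compile(r'\.nc$')

def process_path(path, is_folder):
    """Mirror the decision order of the new _process_folder():
    EXCLUDE drops the path outright, folders are queued for listing,
    and only paths matching the include regex are returned as URLs."""
    actions = []
    if EXCLUDE and EXCLUDE.search(path):
        return actions                      # skipped entirely
    if is_folder:
        actions.append('queue_folder')      # _add_folder_to_process(path)
    if include and include.search(path):
        actions.append('return_url')        # _add_url_to_return(path)
    return actions

assert process_path('/data/contents.html?dataset', False) == []
assert process_path('/data/2014/005/contents.html', True) == ['queue_folder']
assert process_path('/data/2014/005/sst.nc', False) == ['return_url']
```

Note that, unlike the old `elif self._is_file(path)` branch, the two checks are now independent `if` statements, so a folder that also matches `include` is both queued and returned.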
96 changes: 57 additions & 39 deletions geospaas_harvesting/harvest.yml
@@ -3,43 +3,61 @@
# dump_on_interruption: False
# poll_interval: 600
# harvesters:
# OSISAF:
# class: 'OSISAFHarvester'
# max_fetcher_threads: 30
# # We exclude "EASE-Grid map projections" and "southern hemispheres" from harvesting process
# excludes: ['ease', '_sh_polstere',]
# max_db_threads: 1
# urls:
# - 'https://thredds.met.no/thredds/catalog/osisaf/met.no/ice/amsr2_conc/catalog.html'
# - 'https://thredds.met.no/thredds/catalog/osisaf/met.no/ice/conc/catalog.html'
# podaac:
# class: 'PODAACHarvester'
# max_fetcher_threads: 30
# max_db_threads: 1
# urls:
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/VIIRS_NPP/NAVO/v1/2014/005/contents.html'
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/VIIRS_N20/'
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/VIIRS_NPP/'
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/MODIS_A/'
# copernicus_sentinel:
# class: 'CopernicusSentinelHarvester'
# max_fetcher_threads: 30
# max_db_threads: 1
# url: 'https://scihub.copernicus.eu/apihub/search'
# search_terms:
# - 'platformname:Sentinel-1 AND NOT L0'
# - 'platformname:Sentinel-2 AND NOT L0'
# - 'platformname:Sentinel-3 AND NOT L0'
# username: 'username'
# # Environment variable name
# password: !ENV 'COPERNICUS_OPEN_HUB_PASSWORD'
# FTP_jaxa:
# class: 'FTPHarvester'
# max_fetcher_threads: 30
# max_db_threads: 1
# username: username
# password: !ENV 'JAXA_PASSWORD'
# fileformat: '.h5'
# urls:
# - 'ftp://ftp.gportal.jaxa.jp/standard/GCOM-W/GCOM-W.AMSR2/L3.SST_25/3/2012/07/'
# radarsat_local:
# class: 'LOCALHarvester'
# include: 'RS2_\w+(?!.)'
# max_fetcher_threads: 1
# max_db_threads: 1
# paths:
# - "/src/sample/test_multi_nansat"
# FTP_jaxa:
# class: 'FTPHarvester'
# max_fetcher_threads: 1
# max_db_threads: 1
# username: username
# password: !ENV 'JAXA_PASSWORD'
# include: '\.h5$'
# urls:
# - 'ftp://ftp.gportal.jaxa.jp/standard/GCOM-W/GCOM-W.AMSR2/L3.SST_25/3/2012/07/'
# OSISAF:
# class: 'OSISAFHarvester'
# max_fetcher_threads: 1
# # We include "_nh_polstere" in order to only harvest the northern-hemisphere data
# include: '_nh_polstere'
# max_db_threads: 1
# #time_range:
# # - !ENV HARVESTING_START_TIME
# # - !ENV HARVESTING_END_TIME
# urls:
# - 'https://thredds.met.no/thredds/catalog/osisaf/met.no/ice/amsr2_conc/catalog.html'
# - 'https://thredds.met.no/thredds/catalog/osisaf/met.no/ice/conc/catalog.html'
# - 'https://thredds.met.no/thredds/catalog/osisaf/met.no/ice/type/catalog.html'
# - 'https://thredds.met.no/thredds/catalog/osisaf/met.no/ice/drift_mr/catalog.html'
# - 'https://thredds.met.no/thredds/catalog/osisaf/met.no/ice/drift_lr/merged/catalog.html'
#
# podaac:
# class: 'PODAACHarvester'
# max_fetcher_threads: 1
# max_db_threads: 1
# include: '\.nc$|\.h5$'
# urls:
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/VIIRS_NPP/NAVO/v1/2014/005/contents.html'
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/VIIRS_N20/'
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/VIIRS_NPP/'
# - 'https://opendap.jpl.nasa.gov/opendap/allData/ghrsst/data/GDS2/L2P/MODIS_A/'
# copernicus_sentinel:
# class: 'CopernicusSentinelHarvester'
# max_fetcher_threads: 30
# max_db_threads: 1
# include: '.*'
# #time_range:
# # - !ENV HARVESTING_START_TIME
# # - !ENV HARVESTING_END_TIME
# url: 'https://scihub.copernicus.eu/apihub/search'
# search_terms:
# - 'platformname:Sentinel-1 AND NOT L0'
# - 'platformname:Sentinel-2 AND NOT L0'
# - 'platformname:Sentinel-3 AND NOT L0'
# username: "username"
# password: !ENV COPERNICUS_OPEN_HUB_PASSWORD
...
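The sample configuration now filters crawled paths with `include` regular expressions instead of the old `excludes` and `fileformat` fields. A short sketch of how the example patterns above behave; the patterns are copied from the configuration, but the file names are hypothetical:

```python
import re

# Patterns copied from the sample configuration above; the file names are made up.
rs2_pattern = re.compile(r'RS2_\w+(?!.)')      # RS2_* names with nothing after them
jaxa_pattern = re.compile(r'\.h5$')            # HDF5 files only
osisaf_pattern = re.compile(r'_nh_polstere')   # northern-hemisphere products only

assert rs2_pattern.search('/src/sample/test_multi_nansat/RS2_20131012_example')
assert not rs2_pattern.search('/src/sample/test_multi_nansat/RS2_20131012_example.zip')
assert jaxa_pattern.search('GW1AM2_201207_SST_example.h5')
assert not jaxa_pattern.search('GW1AM2_201207_SST_example.h5.md5')
assert osisaf_pattern.search('ice_conc_nh_polstere-100_multi_example.nc')
assert not osisaf_pattern.search('ice_conc_sh_polstere-100_multi_example.nc')
```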
25 changes: 19 additions & 6 deletions geospaas_harvesting/harvesters.py
@@ -95,10 +95,10 @@ class for harvesting online data sources that rely on webpages (and most of the

def __init__(self, **config):
super().__init__(**config)
if 'excludes' in config:
if not isinstance(config['excludes'], list):
if 'include' in config:
if not isinstance(config['include'], str):
raise HarvesterConfigurationError(
"'excludes' field must be fed with a python list of excluded names ")
"The 'include' field must be fed with a regex matching URLs to include")

def _create_crawlers(self):
if self.crawler is None:
@@ -107,7 +107,7 @@ def _create_crawlers(self):
try:
return [
self.crawler(url, time_range=(self.get_time_range()),
excludes=self.config.get('excludes', None))
include=self.config.get('include', None))
for url in self.config['urls']
]
except TypeError as error:
@@ -150,9 +150,8 @@ def _create_crawlers(self):
root_url=url,
username=self.config.get('username', None),
password=self.config.get('password'),
files_suffixes=self.config.get('fileformat', None),
time_range=(self.get_time_range()),
excludes=self.config.get('excludes', None)
include=self.config.get('include', None)
)
for url in self.config['urls']
]
@@ -200,3 +199,17 @@ def _create_ingester(self):
if parameter_name in self.config:
parameters[parameter_name] = self.config[parameter_name]
return ingesters.CreodiasEOFinderIngester(**parameters)


class LOCALHarvester(WebDirectoryHarvester):
""" Harvester class for some specific local files """
def _create_crawlers(self):
return [
crawlers.LocalDirectoryCrawler(
url,
include = self.config.get('include', None),
time_range = self.get_time_range()
)
for url in self.config['paths']
]
ingester = ingesters.NansatIngester
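On the harvester side, the old `excludes` list is rejected in favour of a single `include` regex string, and the new `LOCALHarvester` wires `LocalDirectoryCrawler` paths to `NansatIngester`. A self-contained sketch of the new configuration check, using a stand-in for the `HarvesterConfigurationError` defined in `geospaas_harvesting`:

```python
class HarvesterConfigurationError(Exception):
    """Stand-in for the geospaas_harvesting configuration error."""

def check_include(config):
    """'include' must now be a single regex string, not the old list of excluded names."""
    if 'include' in config and not isinstance(config['include'], str):
        raise HarvesterConfigurationError(
            "The 'include' field must be fed with a regex matching URLs to include")

check_include({'include': r'\.h5$'})                       # accepted: a regex string
try:
    check_include({'include': ['ease', '_sh_polstere']})   # the old list style is rejected
except HarvesterConfigurationError as error:
    print(error)
```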
49 changes: 33 additions & 16 deletions geospaas_harvesting/ingesters.py
@@ -11,6 +11,7 @@
import uuid
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
from dateutil.tz import tzutc
import dateutil.parser
import django.db
import django.db.utils
@@ -28,7 +29,7 @@
ISOTopicCategory, Location, Parameter, Platform)
from nansat import Nansat
from metanorm.handlers import GeospatialMetadataHandler

from metanorm.utils import get_cf_or_wkv_standard_name
logging.getLogger(__name__).addHandler(logging.NullHandler())


@@ -520,29 +521,30 @@ def _get_normalized_attributes(self, dataset_info, *args, **kwargs):
normalized_attributes = {}
n_points = int(kwargs.get('n_points', 10))
nansat_options = kwargs.get('nansat_options', {})
url_scheme = urlparse(dataset_info).scheme
if not 'http' in url_scheme and not 'ftp' in url_scheme:
normalized_attributes['geospaas_service_name'] = FILE_SERVICE_NAME
normalized_attributes['geospaas_service'] = LOCAL_FILE_SERVICE
elif 'http' in url_scheme and not 'ftp' in url_scheme:
normalized_attributes['geospaas_service_name'] = DAP_SERVICE_NAME
normalized_attributes['geospaas_service'] = OPENDAP_SERVICE
elif 'ftp' in url_scheme:
raise ValueError("LOCALHarvester (which uses NansatIngester) is only for local file"
" addresses or http addresses, not for ftp protocol")

# Open file with Nansat
nansat_object = Nansat(nansat_filename(dataset_info), **nansat_options)

# get metadata from Nansat and get objects from vocabularies
n_metadata = nansat_object.get_metadata()

# set service info attributes
url_scheme = urlparse(dataset_info).scheme
if 'http' in url_scheme:
normalized_attributes['geospaas_service_name'] = DAP_SERVICE_NAME
normalized_attributes['geospaas_service'] = OPENDAP_SERVICE
else:
normalized_attributes['geospaas_service_name'] = FILE_SERVICE_NAME
normalized_attributes['geospaas_service'] = LOCAL_FILE_SERVICE

# set compulsory metadata (source)
normalized_attributes['entry_title'] = n_metadata.get('entry_title', 'NONE')
normalized_attributes['summary'] = n_metadata.get('summary', 'NONE')
normalized_attributes['time_coverage_start'] = dateutil.parser.parse(
n_metadata['time_coverage_start'])
n_metadata['time_coverage_start']).replace(tzinfo=tzutc())
normalized_attributes['time_coverage_end'] = dateutil.parser.parse(
n_metadata['time_coverage_end'])
n_metadata['time_coverage_end']).replace(tzinfo=tzutc())
normalized_attributes['platform'] = json.loads(n_metadata['platform'])
normalized_attributes['instrument'] = json.loads(n_metadata['instrument'])
normalized_attributes['specs'] = n_metadata.get('specs', '')
@@ -551,15 +553,30 @@ def _get_normalized_attributes(self, dataset_info, *args, **kwargs):
# set optional ForeignKey metadata from Nansat or from defaults
normalized_attributes['gcmd_location'] = n_metadata.get(
'gcmd_location', pti.get_gcmd_location('SEA SURFACE'))
normalized_attributes['provider'] = n_metadata.get(
'data_center', pti.get_gcmd_provider('NERSC'))
normalized_attributes['provider'] = pti.get_gcmd_provider(
n_metadata.get('provider', 'NERSC'))
normalized_attributes['iso_topic_category'] = n_metadata.get(
'ISO_topic_category', pti.get_iso19115_topic_category('Oceans'))

# Find coverage to set number of points in the geolocation
if len(nansat_object.vrt.dataset.GetGCPs()) > 0:
if nansat_object.vrt.dataset.GetGCPs():
nansat_object.reproject_gcps()
normalized_attributes['location_geometry'] = GEOSGeometry(
nansat_object.get_border_wkt(nPoints=n_points), srid=4326)
nansat_object.get_border_wkt(n_points=n_points), srid=4326)

json_dumped_dataset_parameters = n_metadata.get('dataset_parameters', None)
if json_dumped_dataset_parameters:
json_loads_result = json.loads(json_dumped_dataset_parameters)
if isinstance(json_loads_result, list):
normalized_attributes['dataset_parameters'] = [
get_cf_or_wkv_standard_name(dataset_param)
for dataset_param in json_loads_result
]
else:
self.LOGGER.error(
"'dataset_parameters' section of metadata is not a json-dumped python list",
exc_info=True)
raise TypeError(
"'dataset_parameters' section of metadata is not a json-dumped python list")

return normalized_attributes
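Two behaviour changes in `NansatIngester` are easy to miss in the diff: time coverage values are now forced to UTC-aware datetimes, and `dataset_parameters` must be a JSON-dumped list (each entry is then mapped through metanorm's `get_cf_or_wkv_standard_name`). A small sketch of both checks, using a hypothetical metadata dictionary in place of `nansat_object.get_metadata()`:

```python
import json

import dateutil.parser
from dateutil.tz import tzutc

# Hypothetical metadata, shaped like the dict returned by nansat_object.get_metadata().
n_metadata = {
    'time_coverage_start': '2014-01-05T12:00:00',
    'time_coverage_end': '2014-01-05T12:05:00',
    'dataset_parameters': json.dumps(['sea_surface_temperature']),
}

# Naive timestamps are made timezone-aware by pinning them to UTC.
start = dateutil.parser.parse(n_metadata['time_coverage_start']).replace(tzinfo=tzutc())
assert start.tzinfo is not None and start.utcoffset().total_seconds() == 0

# 'dataset_parameters' must deserialize to a Python list; anything else raises TypeError.
parameters = json.loads(n_metadata['dataset_parameters'])
if not isinstance(parameters, list):
    raise TypeError(
        "'dataset_parameters' section of metadata is not a json-dumped python list")
```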
2 changes: 1 addition & 1 deletion runtests.py
@@ -18,6 +18,6 @@
test_module = f".{sys.argv[1]}" if len(sys.argv) >= 2 else ''

TestRunner = get_runner(settings)
test_runner = TestRunner()
test_runner = TestRunner(interactive=False)
failures = test_runner.run_tests(["tests" + test_module])
sys.exit(bool(failures))
Binary file removed tests/data/nansat/arc_metno_dataset.nc

