Merge pull request #12 from nansencenter/time_restrictions
Enable time restrictions on crawlers
akorosov authored May 20, 2020
2 parents ef0ddba + b021429 commit 7276485
Showing 12 changed files with 791 additions and 114 deletions.
9 changes: 7 additions & 2 deletions README.md
@@ -39,8 +39,10 @@ An example can be seen in the [default configuration file](./geospaas_harvesting

**Top-level keys**:

- **poll_interval**: The interval in seconds at which the main process checks if the running
harvester processes have finished executing.
- **endless** (default: False): boolean controlling the endless harvesting mode. If True, the
harvesters will be re-run indefinitely after they finish harvesting.
- **poll_interval** (default: 600): the interval in seconds at which the main process checks if the
running harvester processes have finished executing.
- **harvesters**: dictionary mapping each harvester's name to a dictionary containing its
properties.

@@ -78,6 +80,9 @@ The properties which are common to every harvester are:

The rest depends on the harvester and will be detailed in each harvester's documentation.

- **time_range** (optional): a two-element list containing two date strings which define the time
range to which the crawler will be limited (see the example below).
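
As an illustration, a top-level configuration using these keys might look like the following sketch. The harvester class name and the date values are only examples, and the exact date-string format accepted depends on how the harvester parses its configuration:

```yaml
---
endless: False
poll_interval: 600
harvesters:
  podaac:
    class: 'PODAACHarvester'  # illustrative class name
    time_range: ['2019-01-01', '2020-01-01']
```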

## Design

### Components
235 changes: 168 additions & 67 deletions geospaas_harvesting/crawlers.py
@@ -1,14 +1,20 @@
"""A set of crawlers used to explore data provider interfaces and get resources URLs"""

"""
A set of crawlers used to explore data provider interfaces and get resources URLs. Each crawler
should inherit from the Crawler class and implement the abstract methods defined in Crawler.
"""
import calendar
import logging
import re
from datetime import datetime, timedelta
from html.parser import HTMLParser

import feedparser
import requests

logging.getLogger(__name__).addHandler(logging.NullHandler())

MIN_DATETIME = datetime(1, 1, 1)


class Crawler():
"""Base Crawler class"""
@@ -39,6 +45,40 @@ def _http_get(cls, url, request_parameters=None):
return html_page


class LinkExtractor(HTMLParser):
"""
HTML parser which extracts links from an HTML page
"""

LOGGER = logging.getLogger(__name__ + '.LinkExtractor')

def __init__(self):
"""Constructor with extra attribute definition"""
super().__init__()
self._links = []

def error(self, message):
"""Error behavior"""
self.LOGGER.error(message)

def feed(self, data):
"""Reset links lists when new data is fed to the parser"""
self._links = []
super().feed(data)

@property
def links(self):
"""Getter for the links attribute"""
return self._links

def handle_starttag(self, tag, attrs):
"""Extracts links from the HTML data"""
if tag == 'a':
for attr in attrs:
if attr[0] == 'href':
self._links.append(attr[1])
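
A minimal usage sketch of this parser (the HTML snippet is made up):

```python
from geospaas_harvesting.crawlers import LinkExtractor

parser = LinkExtractor()
parser.feed('<html><body><a href="2020/">2020</a><a href="data_file.nc">data_file.nc</a></body></html>')
print(parser.links)  # ['2020/', 'data_file.nc']
```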


class OpenDAPCrawler(Crawler):
"""Crawler for OpenDAP resources"""

@@ -47,15 +87,36 @@ class OpenDAPCrawler(Crawler):
FILES_SUFFIXES = ('.nc', '.nc.gz')
EXCLUDE = ('?')

def __init__(self, root_url):
YEAR_PATTERN = r'(\d{4})'
MONTH_PATTERN = r'(1[0-2]|0[1-9]|[1-9])'
DAY_OF_MONTH_PATTERN = r'(3[0-1]|[1-2]\d|0[1-9]|[1-9]| [1-9])'
DAY_OF_YEAR_PATTERN = r'(36[0-6]|3[0-5]\d|[1-2]\d\d|0[1-9]\d|00[1-9]|[1-9]\d|0[1-9]|[1-9])'

YEAR_MATCHER = re.compile(f'^.*/{YEAR_PATTERN}/.*$')
MONTH_MATCHER = re.compile(f'^.*/{YEAR_PATTERN}/{MONTH_PATTERN}/.*$')
DAY_OF_MONTH_MATCHER = re.compile(
f'^.*/{YEAR_PATTERN}/{MONTH_PATTERN}/{DAY_OF_MONTH_PATTERN}/.*$')
DAY_OF_YEAR_MATCHER = re.compile(f'^.*/{YEAR_PATTERN}/{DAY_OF_YEAR_PATTERN}/.*$')

TIMESTAMP_MATCHER = re.compile((
r'(\d{4})(1[0-2]|0[1-9]|[1-9])(3[0-1]|[1-2]\d|0[1-9]|[1-9]| [1-9])'
r'(2[0-3]|[0-1]\d|\d)([0-5]\d|\d)(6[0-1]|[0-5]\d|\d)'))

def __init__(self, root_url, time_range=(None, None)):
"""
The _urls attribute contains URLs to the resources which will be returned by the crawler
The _to_process attribute contains URLs to pages which need to be searched for resources
`root_url` is the URL of the data repository to explore.
`time_range` is a tuple of datetime.datetime objects defining the time range of the datasets
returned by the crawler.
"""
self.root_url = root_url
self.time_range = time_range
self.set_initial_state()

def set_initial_state(self):
"""
The `_urls` attribute contains URLs to the resources which will be returned by the crawler.
The `_to_process` attribute contains URLs to pages which need to be searched for resources.
"""
self._urls = []
self._to_process = [self.root_url.rstrip('/')]
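
For example, a crawler limited to May 2020 could be instantiated as sketched below (the root URL and dates are made up):

```python
from datetime import datetime

from geospaas_harvesting.crawlers import OpenDAPCrawler

# Only resources whose folder path or file name dates intersect May 2020 will be yielded
crawler = OpenDAPCrawler(
    'https://opendap.example.com/data',
    time_range=(datetime(2020, 5, 1), datetime(2020, 5, 31)))

for resource_url in crawler:  # iterating triggers HTTP requests to the repository
    print(resource_url)
```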

Expand All @@ -78,10 +139,71 @@ def __next__(self):
raise StopIteration
return result

def _folder_coverage(self, folder_url):
"""
Find out if the folder has date info in its path. The resolution is one day.
For now, it supports the following structures:
- .../year/...
- .../year/month/...
- .../year/month/day/...
- .../year/day_of_year/...
It will need to be updated to support new structures.
"""
match_year = self.YEAR_MATCHER.search(folder_url)
if match_year:
match_month = self.MONTH_MATCHER.search(folder_url)
if match_month:
match_day = self.DAY_OF_MONTH_MATCHER.search(folder_url)
if match_day:
folder_coverage_start = datetime(
int(match_year[1]), int(match_month[2]), int(match_day[3]), 0, 0, 0)
folder_coverage_stop = datetime(
int(match_year[1]), int(match_month[2]), int(match_day[3]), 23, 59, 59)
else:
last_day_of_month = calendar.monthrange(
int(match_year[1]), int(match_month[2]))[1]
folder_coverage_start = datetime(
int(match_year[1]), int(match_month[2]), 1, 0, 0, 0)
folder_coverage_stop = datetime(
int(match_year[1]), int(match_month[2]), last_day_of_month, 23, 59, 59)
else:
match_day_of_year = self.DAY_OF_YEAR_MATCHER.search(folder_url)
if match_day_of_year:
offset = timedelta(int(match_day_of_year[2]) - 1)
folder_coverage_start = (datetime(int(match_year[1]), 1, 1, 0, 0, 0)
+ offset)
folder_coverage_stop = (datetime(int(match_year[1]), 1, 1, 23, 59, 59)
+ offset)
else:
folder_coverage_start = datetime(int(match_year[1]), 1, 1, 0, 0, 0)
folder_coverage_stop = datetime(int(match_year[1]), 12, 31, 23, 59, 59)
else:
folder_coverage_start = folder_coverage_stop = None

return (folder_coverage_start, folder_coverage_stop)
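
As a rough sketch of what this method returns for the supported folder structures (the URLs are made up):

```python
from datetime import datetime

from geospaas_harvesting.crawlers import OpenDAPCrawler

crawler = OpenDAPCrawler('https://opendap.example.com/data')  # illustrative root URL

# .../year/month/day/... resolves to a one-day coverage
assert crawler._folder_coverage('https://opendap.example.com/data/2020/05/20/catalog.html') == (
    datetime(2020, 5, 20, 0, 0, 0), datetime(2020, 5, 20, 23, 59, 59))

# .../year/day_of_year/...: day 141 of 2020 is also the 20th of May
assert crawler._folder_coverage('https://opendap.example.com/data/2020/141/catalog.html') == (
    datetime(2020, 5, 20, 0, 0, 0), datetime(2020, 5, 20, 23, 59, 59))

# No date information in the path
assert crawler._folder_coverage('https://opendap.example.com/data/contents.html') == (None, None)
```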

def _dataset_timestamp(self, dataset_name):
"""Tries to find a timestamp in the dataset's name"""
timestamp_match = self.TIMESTAMP_MATCHER.search(dataset_name)
if timestamp_match:
return datetime.strptime(timestamp_match[0], '%Y%m%d%H%M%S')
else:
return None
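
For instance, with a made-up dataset file name containing a `YYYYMMDDHHMMSS` timestamp:

```python
from datetime import datetime

from geospaas_harvesting.crawlers import OpenDAPCrawler

crawler = OpenDAPCrawler('https://opendap.example.com/data')  # illustrative root URL
assert crawler._dataset_timestamp('dataset_20200520123000.nc') == datetime(2020, 5, 20, 12, 30)
assert crawler._dataset_timestamp('dataset_without_timestamp.nc') is None
```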

def _intersects_time_range(self, start_time=None, stop_time=None):
"""
Return True if:
- a time coverage was extracted from the folder's path or a timestamp from the dataset's
name, and this time coverage intersects with the Crawler's time range
- no time range was defined when instantiating the crawler
- no time coverage was extracted from the folder's URL or dataset's name
"""
return ((not start_time or not self.time_range[1] or start_time <= self.time_range[1]) and
(not stop_time or not self.time_range[0] or stop_time >= self.time_range[0]))
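
A short illustration of the intersection logic, assuming a crawler restricted to May 2020 (the URL and dates are made up):

```python
from datetime import datetime

from geospaas_harvesting.crawlers import OpenDAPCrawler

crawler = OpenDAPCrawler('https://opendap.example.com/data',  # illustrative root URL
                         time_range=(datetime(2020, 5, 1), datetime(2020, 5, 31)))

# A folder covering the 20th of May intersects the crawler's time range
assert crawler._intersects_time_range(datetime(2020, 5, 20, 0, 0, 0),
                                      datetime(2020, 5, 20, 23, 59, 59))
# A folder covering April 2020 does not
assert not crawler._intersects_time_range(datetime(2020, 4, 1, 0, 0, 0),
                                          datetime(2020, 4, 30, 23, 59, 59))
# Without any extracted time information, the folder or dataset is kept
assert crawler._intersects_time_range(None, None)
```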

def _explore_page(self, folder_url):
"""Gets all relevant links from a page and feeds the _urls and _to_process attributes"""
"""Get all relevant links from a page and feeds the _urls and _to_process attributes"""
self.LOGGER.info("Looking for resources in '%s'...", folder_url)

current_location = re.sub(r'/\w+\.\w+$', '', folder_url)
links = self._get_links(self._http_get(folder_url))
for link in links:
@@ -90,59 +212,25 @@ def _explore_page(self, folder_url):
if link.endswith(self.FOLDERS_SUFFIXES):
folder_url = f"{current_location}/{link}"
if folder_url not in self._to_process:
self.LOGGER.debug("Adding '%s' to the list of pages to process.", link)
self._to_process.append(folder_url)
if self._intersects_time_range(*self._folder_coverage(folder_url)):
self.LOGGER.debug("Adding '%s' to the list of pages to process.", link)
self._to_process.append(folder_url)
elif link.endswith(self.FILES_SUFFIXES):
resource_url = f"{current_location}/{link}"
if resource_url not in self._urls:
self.LOGGER.debug("Adding '%s' to the list of resources.", link)
self._urls.append(resource_url)
if self._intersects_time_range(*(self._dataset_timestamp(link),) * 2):
self.LOGGER.debug("Adding '%s' to the list of resources.", link)
self._urls.append(resource_url)

@classmethod
def _get_links(cls, html):
"""Returns the list of links contained in an HTML page, passed as a string"""

parser = LinkExtractor()
cls.LOGGER.debug("Parsing HTML data.")
parser.feed(html)

return parser.links


class LinkExtractor(HTMLParser):
"""
HTML parser which extracts links from an HTML page
"""

LOGGER = logging.getLogger(__name__ + '.LinkExtractor')

def __init__(self):
"""Constructor with extra attribute definition"""
super().__init__()
self._links = []

def error(self, message):
"""Error behavior"""
self.LOGGER.error(message)

def feed(self, data):
"""Reset links lists when new data is fed to the parser"""
self._links = []
super().feed(data)

@property
def links(self):
"""Getter for the links attribute"""
return self._links

def handle_starttag(self, tag, attrs):
"""Extracts links from the HTML data"""
if tag == 'a':
for attr in attrs:
if attr[0] == 'href':
self._links.append(attr[1])


class CopernicusOpenSearchAPICrawler(Crawler):
"""
Crawler which returns the search results of an OpenSearch API, given the URL and search
@@ -151,19 +239,42 @@ class CopernicusOpenSearchAPICrawler(Crawler):

LOGGER = logging.getLogger(__name__ + '.CopernicusOpenSearchAPICrawler')

def __init__(self, url, search_terms='*', username=None, password=None,
def __init__(self, url, search_terms='*', time_range=(None, None),
username=None, password=None,
page_size=100, initial_offset=0):
self.url = url
self.search_terms = search_terms
self._credentials = (username, password) if username and password else None
self.page_size = page_size
self.initial_offset = initial_offset
self.request_parameters = self._build_request_parameters(
search_terms, time_range, username, password, page_size, initial_offset)
self.set_initial_state()

def set_initial_state(self):
self.offset = self.initial_offset
self.request_parameters['params']['start'] = self.initial_offset
self._urls = []

@staticmethod
def _build_request_parameters(search_terms, time_range, username, password, page_size,
initial_offset):
"""Build a dict containing the parameters used to query the Copernicus API"""
if time_range:
api_date_format = '%Y-%m-%dT%H:%M:%SZ'
start = (time_range[0] or MIN_DATETIME).strftime(api_date_format)
end = time_range[1].strftime(api_date_format) if time_range[1] else 'NOW'
time_condition = f"beginposition:[{start} TO {end}]"

request_parameters = {
'params': {
'q': f"({search_terms}) AND ({time_condition})",
'start': initial_offset,
'rows': page_size,
'orderby': 'beginposition asc'
}
}

if username and password:
request_parameters['auth'] = (username, password)
return request_parameters
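
For instance, with a closed time range the resulting query parameters might look like this (the search term and dates are illustrative):

```python
from datetime import datetime

from geospaas_harvesting.crawlers import CopernicusOpenSearchAPICrawler

params = CopernicusOpenSearchAPICrawler._build_request_parameters(
    search_terms='platformname:Sentinel-1',  # illustrative search term
    time_range=(datetime(2020, 5, 1), datetime(2020, 5, 31)),
    username=None, password=None, page_size=100, initial_offset=0)

assert params['params']['q'] == ('(platformname:Sentinel-1) AND '
                                 '(beginposition:[2020-05-01T00:00:00Z TO 2020-05-31T00:00:00Z])')
assert 'auth' not in params  # no credentials given
```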

def __iter__(self):
"""Makes the crawler iterable"""
return self
@@ -177,7 +288,7 @@ def __next__(self):
# If no more URLs from the previously processed page are available, process the next one
if not self._get_resources_urls(self._get_next_page()):
self.LOGGER.debug("No more entries found at '%s' matching '%s'",
self.url, self.search_terms)
self.url, self.request_parameters['params']['q'])
raise StopIteration
result = self.__next__()
return result
@@ -188,21 +299,11 @@ def _get_next_page(self):
if products are added while the harvesting is happening (which will generally be the case)
"""
self.LOGGER.info("Looking for ressources at '%s', matching '%s' with an offset of %s",
self.url, self.search_terms, self.offset)

request_parameters = {
'params': {
'q': self.search_terms,
'start': self.offset,
'rows': self.page_size,
'orderby': 'ingestiondate asc'
}
}
if self._credentials:
request_parameters['auth'] = self._credentials
self.url, self.request_parameters['params']['q'],
self.request_parameters['params']['start'])

current_page = self._http_get(self.url, request_parameters)
self.offset += self.page_size
current_page = self._http_get(self.url, self.request_parameters)
self.request_parameters['params']['start'] += self.request_parameters['params']['rows']

return current_page

4 changes: 3 additions & 1 deletion geospaas_harvesting/harvest.py
@@ -32,7 +32,7 @@ class Configuration(collections.abc.Mapping):
"""Manages harvesting configuration"""

DEFAULT_CONFIGURATION_PATH = os.path.join(os.path.dirname(__file__), 'harvest.yml')
TOP_LEVEL_KEYS = set(['harvesters', 'poll_interval'])
TOP_LEVEL_KEYS = set(['harvesters', 'poll_interval', 'endless'])
HARVESTER_CLASS_KEY = 'class'

def __init__(self, config_path=None):
@@ -228,6 +228,8 @@ def main():
#Start a new process
results[harvester_name] = pool.apply_async(
launch_harvest, (harvester_name, harvester_config))
if not config.get('endless', False):
break
time.sleep(config.get('poll_interval', 600))
LOGGER.error("All harvester processes encountered errors")
pool.close()
1 change: 1 addition & 0 deletions geospaas_harvesting/harvest.yml
@@ -1,4 +1,5 @@
---
endless: False
poll_interval: 600
harvesters:
podaac: