Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FilesystemFdw: Enhance pattern matching and timestamp support #205

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
81 changes: 73 additions & 8 deletions python/multicorn/fsfdw/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""
Purpose
"""Purpose
-------

This fdw can be used to access data stored in various files, in a filesystem.
Expand Down Expand Up @@ -35,9 +34,25 @@
``filename_column``
If set, defines which column will contain the full filename.

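``mtime_column``
If set, defines which column will contain the file's last modification
time (mtime), as a timestamp.

``ctime_column``
If set, defines which column will contain the file's ctime, as a
timestamp. Note that on Unix, ctime is the time of the last metadata
change, not the file creation time.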

``file_mode`` (default: 700)
The unix permission mask to be used when creating files.

``escape_pattern`` (default: TRUE)
If TRUE, the pattern used to match files is escaped before it is
used for regular expression matching. If FALSE, the pattern used to
match files is used as is and it is assumed to be a valid regular
expression.

``ignore_case`` (default: FALSE)
If FALSE, the pattern used to match files is case sensitive. If
TRUE, the pattern used to match files is case insensitive.

Usage Example
-------------

Expand Down Expand Up @@ -105,13 +120,14 @@

"""

import errno
import os
import stat
from logging import ERROR, WARNING
from multicorn import TransactionAwareForeignDataWrapper
from multicorn.fsfdw.structuredfs import StructuredDirectory
from multicorn.utils import log_to_postgres
from multicorn.compat import unicode_
from logging import ERROR, WARNING
import os
import errno


class FilesystemFdw(TransactionAwareForeignDataWrapper):
Expand All @@ -126,6 +142,8 @@ class FilesystemFdw(TransactionAwareForeignDataWrapper):
content_column -- The column's name which contains the file content.
(defaults to None)
filename_column -- The column's name which contains the full filename.
mtime_column -- The column's name which contains the file mtime.
ctime_column -- The column's name which contains the file ctime.

"""

Expand All @@ -135,10 +153,19 @@ def __init__(self, options, columns):
pattern = options.get('pattern')
self.content_column = options.get('content_column', None)
self.filename_column = options.get('filename_column', None)
self.mtime_column = options.get('mtime_column', None)
self.ctime_column = options.get('ctime_column', None)
self.file_mode = int(options.get('file_mode', '700'), 8)
escape_pattern = (options.get('escape_pattern', 'TRUE').upper() in
('TRUE', 'T'))
ignore_case = (options.get('ignore_case', 'FALSE').upper() in
('TRUE', 'T'))
self.structured_directory = StructuredDirectory(
root_dir, pattern,
file_mode=self.file_mode)
root_dir,
pattern,
file_mode=self.file_mode,
escape_pattern=escape_pattern,
ignore_case=ignore_case)
self.folder_columns = [key[0] for key in
self.structured_directory._path_parts_properties
if key]
Expand Down Expand Up @@ -169,6 +196,26 @@ def __init__(self, options, columns):
"%s bytea" % self.content_column)
else:
columns.pop(self.content_column)
if self.mtime_column:
if self.mtime_column not in columns:
log_to_postgres("The mtime column (%s) does not exist"
"in the column list" % self.mtime_column,
ERROR,
"You should try to create your table with an "
"additional column: \n"
"%s bytea" % self.mtime_column)
else:
columns.pop(self.mtime_column)
if self.ctime_column:
if self.ctime_column not in columns:
log_to_postgres("The ctime column (%s) does not exist"
"in the column list" % self.ctime_column,
ERROR,
"You should try to create your table with an "
"additional column: \n"
"%s bytea" % self.ctime_column)
else:
columns.pop(self.ctime_column)
if len(self.structured_directory.properties) < len(columns):
missing_columns = set(columns.keys()).difference(
self.structured_directory.properties)
Expand Down Expand Up @@ -231,7 +278,9 @@ def get_items(self, quals, columns):
if qual.field_name == filename_column and qual.operator == '=':
item = self.structured_directory.from_filename(
unicode_(qual.value))
if item is not None and os.path.exists(item.full_filename):
if item is not None and os.path.isfile(item.full_filename):
st = os.stat(item.full_filename)
item.set_timestamps(st[stat.ST_MTIME], st[stat.ST_CTIME])
return [item]
else:
return []
Expand All @@ -243,8 +292,12 @@ def get_items(self, quals, columns):
def items_to_dicts(self, items, columns):
content_column = self.content_column
filename_column = self.filename_column
mtime_column = self.mtime_column
ctime_column = self.ctime_column
has_content = content_column and content_column in columns
has_filename = filename_column and filename_column in columns
has_mtime = mtime_column and mtime_column in columns
has_ctime = ctime_column and ctime_column in columns
for item in items:
if item.full_filename in self.invisible_files:
continue
Expand All @@ -256,11 +309,17 @@ def items_to_dicts(self, items, columns):
new_item[content_column] = content
if has_filename:
new_item[filename_column] = item.filename
if has_mtime:
new_item[mtime_column] = item.mtime
if has_ctime:
new_item[ctime_column] = item.ctime
yield new_item

def _item_from_dml(self, values):
content = values.pop(self.content_column, None)
filename = values.pop(self.filename_column, None)
mtime = values.pop(self.mtime_column, None)
ctime = values.pop(self.ctime_column, None)
item_from_filename = None
item_from_values = None
if filename:
Expand Down Expand Up @@ -291,6 +350,8 @@ def _item_from_dml(self, values):
" statement, or ensure they match")
item = item_from_filename or item_from_values
item.content = content
item.mtime = mtime
item.ctime = ctime
return item

def _report_pk_violation(self, item):
Expand Down Expand Up @@ -319,6 +380,8 @@ def insert(self, values):
return_value = dict(item)
return_value[self.filename_column] = item.filename
return_value[self.content_column] = item.content
return_value[self.mtime_column] = item.mtime
return_value[self.ctime_column] = item.ctime
return return_value

def update(self, oldfilename, newvalues):
Expand Down Expand Up @@ -374,6 +437,8 @@ def update(self, oldfilename, newvalues):
return_value = dict(newitem)
return_value[self.filename_column] = newitem.filename
return_value[self.content_column] = newitem.content
return_value[self.mtime_column] = newitem.mtime
return_value[self.ctime_column] = newitem.ctime
return return_value

def delete(self, rowid):
Expand Down
61 changes: 47 additions & 14 deletions python/multicorn/fsfdw/structuredfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
Handle nicely a set of files in a structured directory.

"""
import os
import sys
import collections
import datetime
import errno
import fcntl
import io
import os
import re
import errno
import stat
import string
import collections
import fcntl
import sys
from multicorn.compat import unicode_, basestring_

vformat = string.Formatter().vformat
Expand Down Expand Up @@ -89,7 +91,7 @@ def _tokenize_pattern(pattern):
yield 'path separator', '/'


def _parse_pattern(pattern):
def _parse_pattern(pattern, escape_pattern=True, ignore_case=False):
r"""
Parse a string pattern and return (path_parts_re, path_parts_properties)

Expand Down Expand Up @@ -121,7 +123,9 @@ def _parse_pattern(pattern):
if not next_re:
raise ValueError('A slash-separated part is empty in %r' %
pattern)
path_parts_re.append(re.compile('^%s$' % next_re))
path_parts_re.append(
re.compile('^%s$' % next_re,
re.IGNORECASE if ignore_case else 0))
next_re = ''
path_parts_properties.append(tuple(properties))
properties = []
Expand All @@ -136,7 +140,9 @@ def _parse_pattern(pattern):
properties.append(token)
next_re += '(?P<%s>.*)' % token
elif token_type == 'literal':
next_re += re.escape(token)
if escape_pattern:
token = re.escape(token)
next_re += token
else:
assert False, 'Unexpected token type: ' + token_type

Expand Down Expand Up @@ -167,7 +173,13 @@ class Item(collections.Mapping):
Note that at a given point in time, the actual file for an Item may or
may not exist in the filesystem.
"""
def __init__(self, directory, properties, content=b''):
def __init__(self,
directory,
properties,
content=b'',
actual_filename=None,
mtime=None,
ctime=None):
properties = dict(properties)
keys = set(properties)
missing = directory.properties - keys
Expand All @@ -179,6 +191,8 @@ def __init__(self, directory, properties, content=b''):
self.directory = directory
self._properties = {}
self.content = content
self.actual_filename = actual_filename
self.set_timestamps(mtime, ctime)
# TODO: check for ambiguities.
# eg. with pattern = '{a}_{b}', values {'a': '1_2', 'b': '3'} and
# {'a': '1', 'b': '2_3'} both give the same filename.
Expand All @@ -194,7 +208,10 @@ def filename(self):
Return the normalized (slash-separated) filename for the item,
relative to the root.
"""
return vformat(self.directory.pattern, [], self)
if self.actual_filename:
return self.actual_filename
else:
return vformat(self.directory.pattern, [], self)

@property
def full_filename(self):
Expand Down Expand Up @@ -279,6 +296,10 @@ def write(self, fd=None):
def remove(self):
    """Delete the file backing this item from the filesystem.

    The path removed is ``self.full_filename``. Any ``OSError`` raised by
    the operating system (for instance when the file does not exist) is
    propagated to the caller.
    """
    target_path = self.full_filename
    os.remove(target_path)

def set_timestamps(self, mtime, ctime):
    """Record the file timestamps on this item as ``datetime`` objects.

    :param mtime: Modification time as seconds since the epoch, or None
        when unknown.
    :param ctime: Change time as seconds since the epoch, or None when
        unknown.

    Each value is converted with ``datetime.datetime.fromtimestamp`` (which
    yields a naive datetime in local time) and stored in ``self.mtime`` /
    ``self.ctime``. A None input is stored as None.
    """
    # Compare against None explicitly: 0 is a valid timestamp (the epoch)
    # and must not be silently turned into "no timestamp".
    self.mtime = (datetime.datetime.fromtimestamp(mtime)
                  if mtime is not None else None)
    self.ctime = (datetime.datetime.fromtimestamp(ctime)
                  if ctime is not None else None)

# collections.Mapping interface:

def __len__(self):
Expand All @@ -300,12 +321,19 @@ class StructuredDirectory(object):
:param pattern: Pattern for files in this directory,
eg. '{category}/{number}_{name}.txt'
"""
def __init__(self, root_dir, pattern, file_mode=0o700):
def __init__(self,
root_dir,
pattern,
file_mode=0o700,
escape_pattern=True,
ignore_case=False):
self.root_dir = unicode_(root_dir)
self.pattern = unicode_(pattern)
# Cache for file descriptors.
self.cache = {}
parts_re, parts_properties = _parse_pattern(self.pattern)
parts_re, parts_properties = _parse_pattern(self.pattern,
escape_pattern,
ignore_case)
self.file_mode = file_mode
self._path_parts_re = parts_re
self._path_parts_properties = parts_properties
Expand Down Expand Up @@ -345,7 +373,7 @@ def from_filename(self, filename):
if match is None:
return None
values.update(match.groupdict())
return Item(self, values)
return Item(self, values, actual_filename=filename)

def get_items(self, **fixed_values):
"""
Expand Down Expand Up @@ -406,7 +434,12 @@ def _walk(self, previous_path_parts, previous_values, fixed):
filename = self._join(path_parts)
if is_leaf:
if os.path.isfile(filename):
yield Item(self, values)
st = os.stat(filename)
yield Item(self,
values,
actual_filename=name,
mtime=st[stat.ST_MTIME],
ctime=st[stat.ST_CTIME])
# Do not check if filename is a directory or even exists,
# let listdir() raise later.
else:
Expand Down