Skip to content

Commit

Permalink
M117 - Moved parameter metadata to coverage metadata table.
Browse files Browse the repository at this point in the history
  • Loading branch information
caseybryant committed Jul 3, 2014
1 parent ff79e69 commit 71f0c99
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 81 deletions.
10 changes: 0 additions & 10 deletions coverage_model/coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,6 @@ def __init__(self, root_dir, persistence_guid, name=None, reference_coverage_loc

def _doload(self):
# Make sure the coverage directory exists
#cjb if not os.path.exists(pth):
if not is_persisted(root_dir, persistence_guid):
raise SystemError('Cannot find specified coverage: {0}'.format(pth))

Expand All @@ -1013,7 +1012,6 @@ def _doload(self):

self.__setup(self._persistence_layer.param_dict)

#cjb if os.path.exists(pth):
if is_persisted(root_dir, persistence_guid):
# if reference_coverage_location is None or name is None or parameter_dictionary is None:
# This appears to be a load
Expand All @@ -1026,7 +1024,6 @@ def _doload(self):

# If the coverage directory exists, load it instead!!
if is_persisted(root_dir, persistence_guid):
#cjb if os.path.exists(pth):
log.warn('The specified coverage already exists - performing load of \'{0}\''.format(pth))
_doload(self)
return
Expand Down Expand Up @@ -2148,7 +2145,6 @@ def __init__(self, root_dir, persistence_guid, name=None, parameter_dictionary=N
def _doload(self):
# Make sure the coverage directory exists
if not is_persisted(root_dir, persistence_guid):
#cjb if not os.path.exists(pth):
raise SystemError('Cannot find specified coverage: {0}'.format(pth))

# All appears well - load it up!
Expand Down Expand Up @@ -2178,9 +2174,6 @@ def _doload(self):
pc._pval_callback = self.get_parameter_values
pc._pctxt_callback = self.get_parameter_context
self._range_dictionary.add_context(pc)
# if pc.param_type._value_class == 'SparseConstantValue':
# s = SparsePersistedStorage(md, mm, self._persistence_layer.brick_dispatcher, dtype=pc.param_type.storage_encoding, fill_value=pc.param_type.fill_value, mode=self.mode, inline_data_writes=inline_data_writes, auto_flush=auto_flush_values)
# else:
s = PostgresPersistedStorage(md, metadata_manager=mm, parameter_context=pc, dtype=pc.param_type.storage_encoding, fill_value=pc.param_type.fill_value, mode=self._persistence_layer.mode)
self._persistence_layer.value_list[parameter_name] = s
self._range_value[parameter_name] = get_value_class(param_type=pc.param_type, domain_set=pc.dom, storage=s)
Expand All @@ -2192,8 +2185,6 @@ def _doload(self):
# TODO: Why do this, just see if the directory is there no?
# if name is None or parameter_dictionary is None:
if is_persisted(root_dir, persistence_guid):
#cjb if os.path.exists(pth):
# This appears to be a load
_doload(self)

else:
Expand All @@ -2208,7 +2199,6 @@ def _doload(self):

# If the coverage directory exists, load it instead!!
if is_persisted(root_dir, persistence_guid):
#cjb if os.path.exists(pth):
log.warn('The specified coverage already exists - performing load of \'{0}\''.format(pth))
_doload(self)
return
Expand Down
9 changes: 4 additions & 5 deletions coverage_model/coverages/aggregate_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from coverage_model.parameter_data import NumpyDictParameterData
from coverage_model.parameter_values import get_value_class
from coverage_model.persistence import is_persisted
from coverage_model.storage.parameter_persisted_storage import PostgresPersistenceLayer
from coverage_model.storage.parameter_persisted_storage import PostgresPersistenceLayer, PostgresPersistedStorage
from coverage_model.util.numpy_utils import sort_flat_arrays
from coverage_model.utils import Interval


Expand Down Expand Up @@ -117,7 +118,6 @@ def _do_build(self):
self._range_value = RangeValues()
self._reference_covs = self._build_ordered_coverage_dict()

from coverage_model.storage.parameter_persisted_storage import PostgresPersistedStorage
for parameter_name in self._persistence_layer.parameter_metadata:
md = self._persistence_layer.parameter_metadata[parameter_name]
mm = self._persistence_layer.master_manager
Expand Down Expand Up @@ -283,7 +283,6 @@ def get_time_values(self, time_segement=None, stride_length=None, return_value=N
cov_value_list.append((cov_dict, coverage))

combined_data = self._merge_value_dicts(cov_value_list, override_temporal_key=dummy_key, stride_length=stride_length)
from coverage_model.util.numpy_utils import sort_flat_arrays
if dummy_key in combined_data:
combined_data = sort_flat_arrays(combined_data, dummy_key)
return combined_data[dummy_key] #TODO: Handle case where 'time' may not be temporal parameter name of all sub-coverages
Expand Down Expand Up @@ -398,11 +397,11 @@ def _value_dict_unique(cls, value_dict, axis):

def _verify_rcovs(self, rcovs):
for cpth in rcovs:
if not os.path.exists(cpth):
pth, uuid = get_dir_and_id_from_path(cpth)
if not MetadataManagerFactory.is_persisted(uuid):
log.warn('Cannot find coverage \'%s\'; ignoring', cpth)
continue

pth, uuid = get_dir_and_id_from_path(cpth)
if uuid in self._reference_covs:
yield uuid, self._reference_covs[uuid]
continue
Expand Down
93 changes: 89 additions & 4 deletions coverage_model/db_backed_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

from ooi.logging import log
import os
import msgpack
from coverage_model.basic_types import Dictable
from coverage_model.coverages.coverage_extents import ReferenceCoverageExtents
from coverage_model.data_span import SpanStatsCollection, SpanCollectionByFile, SpanStats
from coverage_model.data_span import SpanCollectionByFile
from coverage_model.db_connectors import DBFactory
from coverage_model.metadata import MetadataManager
from coverage_model.config import CoverageConfig
from coverage_model.persistence_helpers import RTreeProxy, pack, unpack, MasterManager, BaseManager
from coverage_model.util.jsonable import Jsonable
from coverage_model.utils import hash_any


Expand All @@ -37,7 +37,6 @@ def is_persisted_in_db(guid):

@staticmethod
def get_coverage_class(directory, guid):
from coverage_model.config import CoverageConfig
config = CoverageConfig()
return config.get_coverage_class(DbBackedMetadataManager.getCoverageType(directory, guid))

Expand Down Expand Up @@ -107,6 +106,8 @@ def flush(self, deep=True):
if val != '':
insert_dict['brick_tree'] = val
continue
elif k == 'parameter_metadata':
value = pack_parameter_manager_dict(v)
else:
value = pack(v)

Expand Down Expand Up @@ -159,6 +160,8 @@ def _load(self):
unpacked = unpack(val)
value = SpanCollectionByFile.from_str(unpacked)
log.trace("Reconstructed SpanCollection for %s: %s", self.guid, str(value))
elif key == 'parameter_metadata':
value = unpack_parameter_manager_dict(val)
else:
value = unpack(val)

Expand Down Expand Up @@ -225,3 +228,85 @@ def add_span(self, span):
#
#def __ne__(self, other):
# return not self.__eq__(other)


def pack_parameter_manager_dict(pm_dict):
pack_dict = {}
for k, v in pm_dict.iteritems():
if not isinstance(v, ParameterContextWrapper):
raise RuntimeError('Dictionary values must be type %s' % ParameterContextWrapper.__name__)
pack_dict[k] = v.pack()

return msgpack.packb(pack_dict)


def unpack_parameter_manager_dict(text):
pm_dict = {}
pack_dict = msgpack.unpackb(text)
for k, v in pack_dict.iteritems():
pm_dict[k] = ParameterContextWrapper.from_pack(v)

return pm_dict


class ParameterContextWrapper(MetadataManager):
''' This class is meant to override the persistence behavior of ParameterManager.
Instead, it provides packing and unpacking methods to allow other objects to persist it.
It exists to minimize interface change ripple effects. Consider changing the interface in the future
if there is time.
'''

def __init__(self, guid, parameter_name, read_only=True, **kwargs):
super(ParameterContextWrapper, self).__init__(**kwargs)
for k, v in kwargs.iteritems():
# Don't overwrite with None
if hasattr(self, k) and v is None:
continue

setattr(self, k, v)

self.guid = guid
self.parameter_name = parameter_name
self.read_only = read_only

# Add attributes that should NEVER be flushed
self._ignore.update(['read_only'])

def pack(self):
pack_dict = {}
for k, v in self.__dict__.iteritems():
if k in self._ignore or k.startswith('_'):
continue
if isinstance(v, Dictable):
prefix='DICTABLE|{0}:{1}|'.format(v.__module__, v.__class__.__name__)
v = prefix + pack(v.dump())
pack_dict[k] = v

return msgpack.packb(pack_dict)

@staticmethod
def from_pack(text):
pack_dict = msgpack.unpackb(text)
guid = pack_dict.pop('guid')
parameter_name = pack_dict.pop('parameter_name')
pm = ParameterContextWrapper(guid, parameter_name, read_only=True)
for k, val in pack_dict.iteritems():
if isinstance(val, basestring) and val.startswith('DICTABLE'):
i = val.index('|', 9)
smod, sclass = val[9:i].split(':')
val = unpack(val[i+1:])
module = __import__(smod, fromlist=[sclass])
classobj = getattr(module, sclass)
val = classobj._fromdict(val)
pm.__setattr__(k,val)
return pm

def thin_origins(self, origins):
pass

def flush(self):
if not self.read_only and self.is_dirty(True):
self._dirty.clear()

def _load(self):
raise NotImplementedError('This object does not load itself')
27 changes: 27 additions & 0 deletions coverage_model/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from coverage_model.data_span import SpanCollectionByFile, SpanStats
from coverage_model.address import BrickFileAddress
from coverage_model import utils
import numpy


Expand Down Expand Up @@ -94,7 +95,33 @@ def hdf_conversion_key_diffs(self, other):
continue # This is an HDF artifact. Ignored flush values are different between versions
if key in ['sdom', 'tdom', 'brick_domains', 'brick_list', '_dirty', '_is_dirty']:
continue # Tuple/List conversion by Postgres prevents comparison
elif key in ['parameter_metadata']:
continue # parameter context compare not available
elif self.__dict__[key] != other.__dict__[key]:
key_diffs.add(key)

return key_diffs

def is_dirty(self, force_deep=False):
"""
Tells if the object has attributes that have changed since the last flush
@return: True if the BaseMananager object is dirty and should be flushed
"""
if not force_deep and len(self._dirty) > 0: # Something new was set, easy-peasy
return True
else: # Nothing new has been set, need to check hashes
self._dirty.difference_update(self._ignore) # Ensure any ignored attrs are gone...
for k, v in [(k,v) for k, v in self.__dict__.iteritems() if not k in self._ignore and not k.startswith('_')]:
chv = utils.hash_any(v)
# log.trace('key=%s: cached hash value=%s current hash value=%s', k, self._hmap[k], chv)
if self._hmap[k] != chv:
self._dirty.add(k)
return len(self._dirty) != 0

def __setattr__(self, key, value):
super(MetadataManager, self).__setattr__(key, value)
if not key in self._ignore and not key.startswith('_'):
self._hmap[key] = utils.hash_any(value)
self._dirty.add(key)
super(MetadataManager, self).__setattr__('_is_dirty',True)
14 changes: 12 additions & 2 deletions coverage_model/metadata_factory.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
#!/usr/bin/env python

from coverage_model.persistence_helpers import MasterManager
from coverage_model.db_backed_metadata import DbBackedMetadataManager
from coverage_model.persistence_helpers import MasterManager, ParameterManager
from coverage_model.db_backed_metadata import DbBackedMetadataManager, ParameterContextWrapper


class MetadataManagerFactory(object):

mmm = DbBackedMetadataManager
pm = ParameterContextWrapper
# mmm = MasterManager

@staticmethod
def buildMetadataManager(directory, guid, **kwargs):
manager = MetadataManagerFactory.mmm(directory, guid, **kwargs)
return manager

@staticmethod
def buildParameterManager(identifier, param_name, read_only=True, **kwargs):
manager = MetadataManagerFactory.pm(identifier, param_name, read_only, **kwargs)
return manager

@staticmethod
def get_coverage_class(directory, guid):
return MetadataManagerFactory.mmm.get_coverage_class(directory, guid)
Expand All @@ -26,6 +32,10 @@ def getCoverageType(directory, guid):
def isPersisted(directory, guid):
return MetadataManagerFactory.mmm.isPersisted(directory, guid)

@staticmethod
def is_persisted(guid):
return MetadataManagerFactory.mmm.is_persisted_in_db(guid)

@staticmethod
def dirExists(directory):
return MetadataManagerFactory.mmm.dirExists(directory)
Loading

0 comments on commit 71f0c99

Please sign in to comment.