From 71f0c99bcebfb8bc837cbcf5741fc8a849dfbd6d Mon Sep 17 00:00:00 2001
From: Casey Bryant
Date: Wed, 2 Jul 2014 19:55:12 -0700
Subject: [PATCH] M117 - Moved parameter metadata to coverage metadata table.

---
 coverage_model/coverage.py                  | 10 --
 .../coverages/aggregate_coverage.py         |  9 +-
 coverage_model/db_backed_metadata.py        | 93 ++++++++++++++++++-
 coverage_model/metadata.py                  | 27 ++++++
 coverage_model/metadata_factory.py          | 14 ++-
 .../storage/parameter_persisted_storage.py  | 90 ++++++------------
 6 files changed, 162 insertions(+), 81 deletions(-)

diff --git a/coverage_model/coverage.py b/coverage_model/coverage.py
index 33ed9f8..c795f4e 100644
--- a/coverage_model/coverage.py
+++ b/coverage_model/coverage.py
@@ -992,7 +992,6 @@ def __init__(self, root_dir, persistence_guid, name=None, reference_coverage_loc
 
         def _doload(self):
             # Make sure the coverage directory exists
-#cjb            if not os.path.exists(pth):
             if not is_persisted(root_dir, persistence_guid):
                 raise SystemError('Cannot find specified coverage: {0}'.format(pth))
 
@@ -1013,7 +1012,6 @@ def _doload(self):
 
             self.__setup(self._persistence_layer.param_dict)
 
-#cjb        if os.path.exists(pth):
         if is_persisted(root_dir, persistence_guid):
         # if reference_coverage_location is None or name is None or parameter_dictionary is None:
             # This appears to be a load
@@ -1026,7 +1024,6 @@ def _doload(self):
 
                 # If the coverage directory exists, load it instead!!
                 if is_persisted(root_dir, persistence_guid):
-#cjb                if os.path.exists(pth):
                     log.warn('The specified coverage already exists - performing load of \'{0}\''.format(pth))
                     _doload(self)
                     return
@@ -2148,7 +2145,6 @@ def __init__(self, root_dir, persistence_guid, name=None, parameter_dictionary=N
         def _doload(self):
             # Make sure the coverage directory exists
             if not is_persisted(root_dir, persistence_guid):
-#cjb            if not os.path.exists(pth):
                 raise SystemError('Cannot find specified coverage: {0}'.format(pth))
 
             # All appears well - load it up!
@@ -2178,9 +2174,6 @@ def _doload(self):
                 pc._pval_callback = self.get_parameter_values
                 pc._pctxt_callback = self.get_parameter_context
                 self._range_dictionary.add_context(pc)
-                # if pc.param_type._value_class == 'SparseConstantValue':
-                #     s = SparsePersistedStorage(md, mm, self._persistence_layer.brick_dispatcher, dtype=pc.param_type.storage_encoding, fill_value=pc.param_type.fill_value, mode=self.mode, inline_data_writes=inline_data_writes, auto_flush=auto_flush_values)
-                # else:
                 s = PostgresPersistedStorage(md, metadata_manager=mm, parameter_context=pc, dtype=pc.param_type.storage_encoding, fill_value=pc.param_type.fill_value, mode=self._persistence_layer.mode)
                 self._persistence_layer.value_list[parameter_name] = s
                 self._range_value[parameter_name] = get_value_class(param_type=pc.param_type, domain_set=pc.dom, storage=s)
@@ -2192,8 +2185,6 @@ def _doload(self):
         # TODO: Why do this, just see if the directory is there no?
         # if name is None or parameter_dictionary is None:
         if is_persisted(root_dir, persistence_guid):
-#cjb        if os.path.exists(pth):
-
             # This appears to be a load
             _doload(self)
         else:
@@ -2208,7 +2199,6 @@ def _doload(self):
 
                 # If the coverage directory exists, load it instead!!
                 if is_persisted(root_dir, persistence_guid):
-#cjb                if os.path.exists(pth):
                     log.warn('The specified coverage already exists - performing load of \'{0}\''.format(pth))
                     _doload(self)
                     return
diff --git a/coverage_model/coverages/aggregate_coverage.py b/coverage_model/coverages/aggregate_coverage.py
index c48c40e..5f89103 100644
--- a/coverage_model/coverages/aggregate_coverage.py
+++ b/coverage_model/coverages/aggregate_coverage.py
@@ -6,7 +6,8 @@
 from coverage_model.parameter_data import NumpyDictParameterData
 from coverage_model.parameter_values import get_value_class
 from coverage_model.persistence import is_persisted
-from coverage_model.storage.parameter_persisted_storage import PostgresPersistenceLayer
+from coverage_model.storage.parameter_persisted_storage import PostgresPersistenceLayer, PostgresPersistedStorage
+from coverage_model.util.numpy_utils import sort_flat_arrays
 from coverage_model.utils import Interval
 
 
@@ -117,7 +118,6 @@ def _do_build(self):
         self._range_value = RangeValues()
         self._reference_covs = self._build_ordered_coverage_dict()
 
-        from coverage_model.storage.parameter_persisted_storage import PostgresPersistedStorage
         for parameter_name in self._persistence_layer.parameter_metadata:
             md = self._persistence_layer.parameter_metadata[parameter_name]
             mm = self._persistence_layer.master_manager
@@ -283,7 +283,6 @@ def get_time_values(self, time_segement=None, stride_length=None, return_value=N
             cov_value_list.append((cov_dict, coverage))
 
         combined_data = self._merge_value_dicts(cov_value_list, override_temporal_key=dummy_key, stride_length=stride_length)
-        from coverage_model.util.numpy_utils import sort_flat_arrays
         if dummy_key in combined_data:
             combined_data = sort_flat_arrays(combined_data, dummy_key)
             return combined_data[dummy_key] #TODO: Handle case where 'time' may not be temporal parameter name of all sub-coverages
@@ -398,11 +397,11 @@ def _value_dict_unique(cls, value_dict, axis):
 
     def _verify_rcovs(self, rcovs):
         for cpth in rcovs:
-            if not os.path.exists(cpth):
+            pth, uuid = get_dir_and_id_from_path(cpth)
+            if not MetadataManagerFactory.is_persisted(uuid):
                 log.warn('Cannot find coverage \'%s\'; ignoring', cpth)
                 continue
 
-            pth, uuid = get_dir_and_id_from_path(cpth)
             if uuid in self._reference_covs:
                 yield uuid, self._reference_covs[uuid]
                 continue
diff --git a/coverage_model/db_backed_metadata.py b/coverage_model/db_backed_metadata.py
index f6affa6..e407b0a 100644
--- a/coverage_model/db_backed_metadata.py
+++ b/coverage_model/db_backed_metadata.py
@@ -9,13 +9,13 @@
 from ooi.logging import log
 
 import os
+import msgpack
 from coverage_model.basic_types import Dictable
-from coverage_model.coverages.coverage_extents import ReferenceCoverageExtents
-from coverage_model.data_span import SpanStatsCollection, SpanCollectionByFile, SpanStats
+from coverage_model.data_span import SpanCollectionByFile
 from coverage_model.db_connectors import DBFactory
 from coverage_model.metadata import MetadataManager
+from coverage_model.config import CoverageConfig
 from coverage_model.persistence_helpers import RTreeProxy, pack, unpack, MasterManager, BaseManager
-from coverage_model.util.jsonable import Jsonable
 from coverage_model.utils import hash_any
 
@@ -37,7 +37,6 @@ def is_persisted_in_db(guid):
 
     @staticmethod
     def get_coverage_class(directory, guid):
-        from coverage_model.config import CoverageConfig
         config = CoverageConfig()
         return config.get_coverage_class(DbBackedMetadataManager.getCoverageType(directory, guid))
 
@@ -107,6 +106,8 @@ def flush(self, deep=True):
                         if val != '':
                             insert_dict['brick_tree'] = val
                         continue
+                    elif k == 'parameter_metadata':
+                        value = pack_parameter_manager_dict(v)
                     else:
                         value = pack(v)
 
@@ -159,6 +160,8 @@ def _load(self):
                     unpacked = unpack(val)
                     value = SpanCollectionByFile.from_str(unpacked)
                     log.trace("Reconstructed SpanCollection for %s: %s", self.guid, str(value))
+                elif key == 'parameter_metadata':
+                    value = unpack_parameter_manager_dict(val)
                 else:
                     value = unpack(val)
 
@@ -225,3 +228,85 @@ def add_span(self, span):
 #
 #    def __ne__(self, other):
 #        return not self.__eq__(other)
+
+
+def pack_parameter_manager_dict(pm_dict):
+    pack_dict = {}
+    for k, v in pm_dict.iteritems():
+        if not isinstance(v, ParameterContextWrapper):
+            raise RuntimeError('Dictionary values must be type %s' % ParameterContextWrapper.__name__)
+        pack_dict[k] = v.pack()
+
+    return msgpack.packb(pack_dict)
+
+
+def unpack_parameter_manager_dict(text):
+    pm_dict = {}
+    pack_dict = msgpack.unpackb(text)
+    for k, v in pack_dict.iteritems():
+        pm_dict[k] = ParameterContextWrapper.from_pack(v)
+
+    return pm_dict
+
+
+class ParameterContextWrapper(MetadataManager):
+    ''' This class is meant to override the persistence behavior of ParameterManager.
+        Instead, it provides packing and unpacking methods to allow other objects to persist it.
+        It exists to minimize interface change ripple effects.  Consider changing the interface in the future
+        if there is time.
+    '''
+
+    def __init__(self, guid, parameter_name, read_only=True, **kwargs):
+        super(ParameterContextWrapper, self).__init__(**kwargs)
+        for k, v in kwargs.iteritems():
+            # Don't overwrite with None
+            if hasattr(self, k) and v is None:
+                continue
+
+            setattr(self, k, v)
+
+        self.guid = guid
+        self.parameter_name = parameter_name
+        self.read_only = read_only
+
+        # Add attributes that should NEVER be flushed
+        self._ignore.update(['read_only'])
+
+    def pack(self):
+        pack_dict = {}
+        for k, v in self.__dict__.iteritems():
+            if k in self._ignore or k.startswith('_'):
+                continue
+            if isinstance(v, Dictable):
+                prefix='DICTABLE|{0}:{1}|'.format(v.__module__, v.__class__.__name__)
+                v = prefix + pack(v.dump())
+            pack_dict[k] = v
+
+        return msgpack.packb(pack_dict)
+
+    @staticmethod
+    def from_pack(text):
+        pack_dict = msgpack.unpackb(text)
+        guid = pack_dict.pop('guid')
+        parameter_name = pack_dict.pop('parameter_name')
+        pm = ParameterContextWrapper(guid, parameter_name, read_only=True)
+        for k, val in pack_dict.iteritems():
+            if isinstance(val, basestring) and val.startswith('DICTABLE'):
+                i = val.index('|', 9)
+                smod, sclass = val[9:i].split(':')
+                val = unpack(val[i+1:])
+                module = __import__(smod, fromlist=[sclass])
+                classobj = getattr(module, sclass)
+                val = classobj._fromdict(val)
+            pm.__setattr__(k,val)
+        return pm
+
+    def thin_origins(self, origins):
+        pass
+
+    def flush(self):
+        if not self.read_only and self.is_dirty(True):
+            self._dirty.clear()
+
+    def _load(self):
+        raise NotImplementedError('This object does not load itself')
diff --git a/coverage_model/metadata.py b/coverage_model/metadata.py
index 35dfe13..8018b2b 100644
--- a/coverage_model/metadata.py
+++ b/coverage_model/metadata.py
@@ -4,6 +4,7 @@
 
 from coverage_model.data_span import SpanCollectionByFile, SpanStats
 from coverage_model.address import BrickFileAddress
+from coverage_model import utils
 
 import numpy
 
@@ -94,7 +95,33 @@ def hdf_conversion_key_diffs(self, other):
                 continue  # This is an HDF artifact.  Ignored flush values are different between versions
             if key in ['sdom', 'tdom', 'brick_domains', 'brick_list', '_dirty', '_is_dirty']:
                 continue  # Tuple/List conversion by Postgres prevents comparison
+            elif key in ['parameter_metadata']:
+                continue  # parameter context compare not available
             elif self.__dict__[key] != other.__dict__[key]:
                 key_diffs.add(key)
 
         return key_diffs
+
+    def is_dirty(self, force_deep=False):
+        """
+        Tells if the object has attributes that have changed since the last flush
+
+        @return: True if the BaseMananager object is dirty and should be flushed
+        """
+        if not force_deep and len(self._dirty) > 0:  # Something new was set, easy-peasy
+            return True
+        else:  # Nothing new has been set, need to check hashes
+            self._dirty.difference_update(self._ignore)  # Ensure any ignored attrs are gone...
+            for k, v in [(k,v) for k, v in self.__dict__.iteritems() if not k in self._ignore and not k.startswith('_')]:
+                chv = utils.hash_any(v)
+                # log.trace('key=%s: cached hash value=%s current hash value=%s', k, self._hmap[k], chv)
+                if self._hmap[k] != chv:
+                    self._dirty.add(k)
+
+            return len(self._dirty) != 0
+
+    def __setattr__(self, key, value):
+        super(MetadataManager, self).__setattr__(key, value)
+        if not key in self._ignore and not key.startswith('_'):
+            self._hmap[key] = utils.hash_any(value)
+            self._dirty.add(key)
+            super(MetadataManager, self).__setattr__('_is_dirty',True)
diff --git a/coverage_model/metadata_factory.py b/coverage_model/metadata_factory.py
index 1d5312d..cd2f6e5 100644
--- a/coverage_model/metadata_factory.py
+++ b/coverage_model/metadata_factory.py
@@ -1,12 +1,13 @@
 #!/usr/bin/env python
 
-from coverage_model.persistence_helpers import MasterManager
-from coverage_model.db_backed_metadata import DbBackedMetadataManager
+from coverage_model.persistence_helpers import MasterManager, ParameterManager
+from coverage_model.db_backed_metadata import DbBackedMetadataManager, ParameterContextWrapper
 
 
 class MetadataManagerFactory(object):
 
     mmm = DbBackedMetadataManager
+    pm = ParameterContextWrapper
 #    mmm = MasterManager
 
     @staticmethod
@@ -14,6 +15,11 @@ def buildMetadataManager(directory, guid, **kwargs):
         manager = MetadataManagerFactory.mmm(directory, guid, **kwargs)
         return manager
 
+    @staticmethod
+    def buildParameterManager(identifier, param_name, read_only=True, **kwargs):
+        manager = MetadataManagerFactory.pm(identifier, param_name, read_only, **kwargs)
+        return manager
+
     @staticmethod
     def get_coverage_class(directory, guid):
         return MetadataManagerFactory.mmm.get_coverage_class(directory, guid)
@@ -26,6 +32,10 @@ def getCoverageType(directory, guid):
     def isPersisted(directory, guid):
         return MetadataManagerFactory.mmm.isPersisted(directory, guid)
 
+    @staticmethod
+    def is_persisted(guid):
+        return MetadataManagerFactory.mmm.is_persisted_in_db(guid)
+
     @staticmethod
     def dirExists(directory):
         return MetadataManagerFactory.mmm.dirExists(directory)
\ No newline at end of file
diff --git a/coverage_model/storage/parameter_persisted_storage.py b/coverage_model/storage/parameter_persisted_storage.py
index 173b904..57c4e77 100644
--- a/coverage_model/storage/parameter_persisted_storage.py
+++ b/coverage_model/storage/parameter_persisted_storage.py
@@ -15,14 +15,15 @@
 
 import numpy as np
 from ooi.logging import log
 
-from coverage_model.metadata_factory import MetadataManagerFactory
 from coverage_model.basic_types import AbstractStorage, AxisTypeEnum
-from coverage_model.persistence_helpers import ParameterManager, pack, unpack
-from coverage_model.parameter_data import ParameterData, NumpyParameterData, ConstantOverTime, NumpyDictParameterData
 from coverage_model.data_span import Span
-from coverage_model.storage.span_storage_factory import SpanStorageFactory
+from coverage_model.metadata_factory import MetadataManagerFactory
+from coverage_model.parameter_data import ParameterData, NumpyParameterData, ConstantOverTime, NumpyDictParameterData, RepeatOverTime
+from coverage_model.parameter_functions import ExternalFunction
+from coverage_model.parameter_types import QuantityType, ParameterFunctionType
 from coverage_model.persistence import SimplePersistenceLayer
-from coverage_model.parameter_types import QuantityType
+from coverage_model.persistence_helpers import unpack
+from coverage_model.storage.span_storage_factory import SpanStorageFactory
 from coverage_model.util.numpy_utils import NumpyUtils
 
@@ -62,17 +63,19 @@ def __init__(self, root, guid, name=None, mode=None, inline_data_writes=True, au
             self.master_manager.value_caching = value_caching
         if not hasattr(self.master_manager, 'coverage_type'):
             self.master_manager.coverage_type = coverage_type
+        if not hasattr(self.master_manager, 'parameter_metadata'):
+            self.master_manager.parameter_metadata = {}
 
         self.value_list = {}
 
-        self.parameter_metadata = {}  # {parameter_name: [brick_list, parameter_domains, rtree]}
         self.spans = {}
         self.span_list = []
         self.storage_name = storage_name
 
         for pname in self.param_groups:
             log.debug('parameter group: %s', pname)
-            self.parameter_metadata[pname] = ParameterManager(os.path.join(self.root_dir, self.guid, pname), pname)
+            if pname not in self.master_manager.parameter_metadata:
+                self.master_manager.parameter_metadata[pname] = MetadataManagerFactory.buildParameterManager(os.path.join(self.root_dir, self.guid, pname), pname)
 
         if self.mode != 'r':
             if self.master_manager.is_dirty():
@@ -100,7 +103,7 @@ def __setattr__(self, key, value):
 
     def update_parameter_bounds(self, parameter_name, bounds):
         dmin, dmax = bounds
-        if isinstance(self.parameter_metadata[parameter_name].parameter_context.param_type, QuantityType):  # TODO should we store bounds for non quantity types?
+        if isinstance(self.master_manager.parameter_metadata[parameter_name].parameter_context.param_type, QuantityType):  # TODO should we store bounds for non quantity types?
             if parameter_name in self.parameter_bounds:
                 pmin, pmax = self.parameter_bounds[parameter_name]
                 dmin = min(dmin, pmin)
@@ -131,8 +134,8 @@ def init_parameter(self, parameter_context, bricking_scheme):
 
         parameter_name = parameter_context.name
 
-        pm = ParameterManager(os.path.join(self.root_dir, self.guid, parameter_name), parameter_name, read_only=False)
-        self.parameter_metadata[parameter_name] = pm
+        pm = MetadataManagerFactory.buildParameterManager(self.guid, parameter_name, read_only=False)
+        self.master_manager.parameter_metadata[parameter_name] = pm
 
         pm.parameter_context = parameter_context
 
@@ -140,13 +143,6 @@ def init_parameter(self, parameter_context, bricking_scheme):
 
         self.master_manager.create_group(parameter_name)
 
-        # if parameter_context.param_type._value_class == 'SparseConstantValue':
-        #     v = SparsePersistedStorage(pm, self.master_manager, self.brick_dispatcher,
-        #                                dtype=parameter_context.param_type.storage_encoding,
-        #                                fill_value=parameter_context.param_type.fill_value,
-        #                                mode=self.mode, inline_data_writes=self.inline_data_writes,
-        #                                auto_flush=self.auto_flush_values)
-        # else:
         v = PostgresPersistedStorage(pm, metadata_manager=self.master_manager,
                                      parameter_context=parameter_context,
                                      dtype=parameter_context.param_type.storage_encoding,
@@ -213,6 +209,8 @@ def has_dirty_values(self):
 
         @return True if master file metadata has been modified
         """
+        if self.master_manager.is_dirty():
+            return True
         for v in self.value_list.itervalues():
             if v.has_dirty_values():
                 return True
@@ -250,8 +248,7 @@ def write_parameters(self, write_id, values):
             if key not in self.value_list:
                 raise KeyError("Parameter, %s, has not been initialized" % (key))
 
-            param_type = self.parameter_metadata[key].parameter_context.param_type
-            from coverage_model.parameter_types import ParameterFunctionType
+            param_type = self.master_manager.parameter_metadata[key].parameter_context.param_type
             if isinstance(param_type, ParameterFunctionType):
                 bad_keys.append(key)
                 continue #TODO: throw error instead
@@ -263,10 +260,10 @@
                 raise TypeError("Value for %s must implement <%s>, found <%s>" % (key, ParameterData.__name__, arr.__class__.__name__))
 
             if not isinstance(arr, ConstantOverTime):
-                if self.parameter_metadata[key].parameter_context.param_type.validate_value_set(arr.get_data()):
-                    self.parameter_metadata[key].read_only = False
-                    self.parameter_metadata[key].flush()
-                    self.parameter_metadata[key].read_only = True
+                if self.master_manager.parameter_metadata[key].parameter_context.param_type.validate_value_set(arr.get_data()):
+                    self.master_manager.parameter_metadata[key].read_only = False
+                    self.master_manager.parameter_metadata[key].flush()
+                    self.master_manager.parameter_metadata[key].read_only = True
                 all_values_constant_over_time = False
 
             if arr_len == -1 and isinstance(arr, NumpyParameterData):
@@ -318,7 +315,7 @@ def get_data_products(self, params, time_range=None, time=None, sort_parameter=N
                 np_dict = {key:value[0::stride_length] for key, value in np_dict.items()}
 
         if len(np_dict) == 0:
-            dt = np.dtype(self.parameter_metadata[self.alignment_parameter].parameter_context.param_type.value_encoding)
+            dt = np.dtype(self.master_manager.parameter_metadata[self.alignment_parameter].parameter_context.param_type.value_encoding)
             np_dict = {self.alignment_parameter: np.empty(0, dtype=dt)}
         rec_arr = self._convert_to_numpy_dict_parameter(np_dict, sort_parameter=sort_parameter, as_rec_array=create_record_array)
         return np_dict, function_params, rec_arr
@@ -410,11 +407,9 @@ def _append_parameter_fuction_data(self, params, param_dict, fill_dict, time_seg
         if time is not None and time_segment is None:
             time_segment = (time,time)
         for param in list(set(params)-set(param_dict.keys())):
-            if param in self.parameter_metadata:
-                param_type = self.parameter_metadata[param].parameter_context.param_type
-                from coverage_model.parameter_types import ParameterFunctionType
+            if param in self.master_manager.parameter_metadata:
+                param_type = self.master_manager.parameter_metadata[param].parameter_context.param_type
                 if isinstance(param_type, ParameterFunctionType):
-                    from coverage_model.parameter_functions import ExternalFunction
                     if isinstance(param_type.function, ExternalFunction):
                         data = param_type.function.evaluate(param_type.callback, self.master_manager.root_dir, time_segment, time)
                     else:
@@ -436,7 +431,7 @@ def _create_parameter_dictionary_of_numpy_arrays(self, numpy_params, function_pa
                 span_order.append(id)
         span_order.sort()
         t_dict = numpy_params[self.alignment_parameter]
-        dt = np.dtype(self.parameter_metadata[self.alignment_parameter].parameter_context.param_type.value_encoding)
+        dt = np.dtype(self.master_manager.parameter_metadata[self.alignment_parameter].parameter_context.param_type.value_encoding)
         arr = np.empty(shape_outer_dimmension, dtype=dt)
 
         insert_index = 0
@@ -455,7 +450,7 @@ def _create_parameter_dictionary_of_numpy_arrays(self, numpy_params, function_pa
             mask_list = []
             for span_name in span_order:
                 if span_name not in span_data:
-                    npa = self.parameter_metadata[id].parameter_context.param_type.create_filled_array(span_size_dict[span_name])
+                    npa = self.master_manager.parameter_metadata[id].parameter_context.param_type.create_filled_array(span_size_dict[span_name])
                     npa_list.append(npa)
                     mask_list.append(NumpyUtils.create_filled_array(npa.shape[0], False, dtype=np.bool))
                     continue
@@ -463,13 +458,13 @@ def _create_parameter_dictionary_of_numpy_arrays(self, numpy_params, function_pa
                 this_data = span_data[span_name].get_data()
                 npa_list.append(this_data)
                 mask_list.append(NumpyUtils.create_filled_array(this_data.shape[0], True, dtype=np.bool))
-            return_dict[id] = self.parameter_metadata[id].parameter_context.param_type.create_merged_value_array(npa_list)
-            mask_dict[id] = self.parameter_metadata[id].parameter_context.param_type.create_merged_value_array(mask_list)
+            return_dict[id] = self.master_manager.parameter_metadata[id].parameter_context.param_type.create_merged_value_array(npa_list)
+            mask_dict[id] = self.master_manager.parameter_metadata[id].parameter_context.param_type.create_merged_value_array(mask_list)
 
         for param_name, param_dict in function_params.iteritems():
             arr = ConstantOverTime.merge_data_as_numpy_array(return_dict[self.alignment_parameter],
                                                              param_dict,
-                                                             param_type=self.parameter_metadata[param_name].parameter_context.param_type)
+                                                             param_type=self.master_manager.parameter_metadata[param_name].parameter_context.param_type)
             return_dict[param_name] = arr
             mask_dict[param_name] = NumpyUtils.create_filled_array(arr.shape[0], False, dtype=np.bool)
@@ -481,7 +476,7 @@ def _fill_empty_params(self, params, np_dict):
         unset_params = set(params) - set(np_dict.keys())
         if len(unset_params) > 0:
             for param in unset_params:
-                filled_params[param] = self.parameter_metadata[param].parameter_context.param_type.create_filled_array(len(np_dict[self.alignment_parameter]))
+                filled_params[param] = self.master_manager.parameter_metadata[param].parameter_context.param_type.create_filled_array(len(np_dict[self.alignment_parameter]))
 
             np_dict.update(filled_params)
         return np_dict
@@ -496,18 +491,6 @@ def num_timesteps(self):
             ts += span.param_dict[self.alignment_parameter].get_data().size
         return ts
 
-    def has_dirty_values(self):
-        """
-        Checks if the master file values have been modified
-
-        @return True if master file metadata has been modified
-        """
-        for v in self.value_list.values():
-            if v.has_dirty_values():
-                return True
-
-        return False
-
     def read_parameters_as_dense_array(self, params, time_range=None, time=None, sort_parameter=None):
         return_dict = {}
         arr_size = 0
@@ -528,12 +511,6 @@ def read_parameters_as_dense_array(self, params, time_range=None, time=None, sor
             return None
 
         for key, d in self.spans.iteritems():
-            # if self.alignment_parameter not in d.keys():
-            #     for param, vals in d.iteritems():
-            #         if param not in params:
-            #             continue
-            #         if
-
             if self.alignment_parameter not in d:
                 continue
             alignment_array = self.value_list[self.alignment_parameter].decompress(d[self.alignment_parameter][0]).get_data()
@@ -592,12 +569,6 @@ def read_parameters_as_dense_array(self, params, time_range=None, time=None, sor
             sort_parameter = self.alignment_parameter
         data.get_data().sort(order=sort_parameter)
         return data
-        # if sort_parameter is not None:
-        #     # sort_parameter = self.alignment_parameter
-        #     idx_arr = np.argsort(return_dict[sort_parameter])
-        #     for param, arr in return_dict.iteritems():
-        #         return_dict[param] = arr[idx_arr]
-        #     return return_dict
 
     def flush_values(self):
         if self.mode == 'r':
@@ -617,7 +588,7 @@ def flush(self):
             self.flush_values()
             log.debug('Flushing MasterManager...')
             self.master_manager.flush()
-            for pk, pm in self.parameter_metadata.iteritems():
+            for pk, pm in self.master_manager.parameter_metadata.iteritems():
                 log.debug('Flushing ParameterManager for \'%s\'...', pk)
                 pm.flush()
@@ -773,7 +744,6 @@ def decompress(self, obj):
             return NumpyParameterData(self.parameter_manager.parameter_name, vals)
 
         elif isinstance(vals, tuple):
-            from coverage_model.parameter_data import RepeatOverTime
             if vals[4] == ConstantOverTime.__name__:
                 return ConstantOverTime(self.parameter_manager.parameter_name, vals[1], time_start=vals[2], time_end=vals[3])
            elif vals[4] == RepeatOverTime.__name__:
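
Illustration (reviewer sketch, not part of the patch): the snippet below mirrors the msgpack
round-trip that pack_parameter_manager_dict() / unpack_parameter_manager_dict() perform when
per-parameter metadata is folded into the coverage metadata table. ToyWrapper, pack_wrapper_dict
and unpack_wrapper_dict are hypothetical stand-ins for ParameterContextWrapper and the real
helpers, not coverage_model API.

    import msgpack

    class ToyWrapper(object):
        """Stand-in for ParameterContextWrapper: it only knows how to pack
        and unpack its own public attributes with msgpack."""

        def __init__(self, guid, parameter_name):
            self.guid = guid
            self.parameter_name = parameter_name

        def pack(self):
            # Mirror ParameterContextWrapper.pack(): serialize public attributes
            return msgpack.packb({'guid': self.guid,
                                  'parameter_name': self.parameter_name})

        @staticmethod
        def from_pack(blob):
            d = msgpack.unpackb(blob)
            return ToyWrapper(d['guid'], d['parameter_name'])

    def pack_wrapper_dict(pm_dict):
        # One outer msgpack blob holding each wrapper's own packed blob,
        # the same nesting pack_parameter_manager_dict() uses
        return msgpack.packb({k: v.pack() for k, v in pm_dict.items()})

    def unpack_wrapper_dict(blob):
        return {k: ToyWrapper.from_pack(v)
                for k, v in msgpack.unpackb(blob).items()}

    blob = pack_wrapper_dict({'temp': ToyWrapper('cov-guid', 'temp')})
    restored = unpack_wrapper_dict(blob)
    assert restored['temp'].parameter_name == 'temp'

The design point is that each wrapper packs only its own public, non-ignored attributes, so the
master manager can persist the whole parameter dictionary as a single opaque value under the
'parameter_metadata' key instead of writing one ParameterManager file per parameter.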
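
Illustration (reviewer sketch, not part of the patch): a minimal stand-alone version of the
hash-based dirty tracking added to MetadataManager, where __setattr__ records a content hash for
every public attribute and is_dirty(force_deep=True) re-hashes to catch in-place mutations.
hashlib.sha1 over repr() stands in for coverage_model.utils.hash_any(); DirtyTracker is illustrative.

    import hashlib

    class DirtyTracker(object):
        """Toy version of the change tracking in MetadataManager."""

        def __init__(self):
            # Bypass __setattr__ while the bookkeeping structures are created
            object.__setattr__(self, '_hmap', {})
            object.__setattr__(self, '_dirty', set())
            object.__setattr__(self, '_ignore', set(['read_only']))

        @staticmethod
        def _hash(value):
            # Stand-in for coverage_model.utils.hash_any()
            return hashlib.sha1(repr(value).encode('utf-8')).hexdigest()

        def __setattr__(self, key, value):
            object.__setattr__(self, key, value)
            if not key.startswith('_') and key not in self._ignore:
                self._hmap[key] = self._hash(value)
                self._dirty.add(key)

        def is_dirty(self, force_deep=False):
            if not force_deep and self._dirty:
                return True
            self._dirty.difference_update(self._ignore)
            for k, v in self.__dict__.items():
                if k.startswith('_') or k in self._ignore:
                    continue
                if self._hmap.get(k) != self._hash(v):
                    self._dirty.add(k)
            return len(self._dirty) != 0

    t = DirtyTracker()
    t.bounds = [0, 10]
    assert t.is_dirty()
    t._dirty.clear()             # what flush() does after persisting
    t.bounds.append(20)          # in-place change, no __setattr__ call
    assert t.is_dirty(force_deep=True)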