diff --git a/coverage_model/config.py b/coverage_model/config.py index 0d42b7a..7f18191 100644 --- a/coverage_model/config.py +++ b/coverage_model/config.py @@ -28,6 +28,7 @@ class CoverageConfig(object): _default_span_id_db_key = 'span_address' _default_span_coverage_id_db_key = 'coverage_id' _default_storage_location = None + _default_ideal_span_size = 8*1024*1024 # in bytes def __init__(self): self.ordered_time_key_preferences = self._default_ordered_time_key_preferences @@ -44,6 +45,7 @@ def __init__(self): self.config_time = 0 self.read_and_set_config() self.ingest_time_key = self._default_ingest_time_key + self.ideal_span_size = self._default_ideal_span_size def read_and_set_config(self): one_from_config = False diff --git a/coverage_model/data_span.py b/coverage_model/data_span.py index 72cfb12..29f5e70 100644 --- a/coverage_model/data_span.py +++ b/coverage_model/data_span.py @@ -20,7 +20,8 @@ class Span(object): ingest_time_str = CoverageConfig().ingest_time_key - def __init__(self, span_uuid, coverage_id, param_dict, ingest_time=None, compressors=None, mutable=False): + def __init__(self, span_uuid, coverage_id, param_dict, ingest_time=None, compressors=None, mutable=False, + fill_mask=None, sort_parameter=None): self.param_dict = param_dict self.ingest_time = ingest_time self.ingest_times = [] @@ -32,6 +33,7 @@ def __init__(self, span_uuid, coverage_id, param_dict, ingest_time=None, compres self.coverage_id = coverage_id self.compressors = compressors self.mutable = mutable + self.sort_parameter = sort_parameter def _set_ingest_times(self, ingest_times=None): if self.ingest_time_str in self.param_dict: @@ -58,6 +60,15 @@ def get_param_data_length(self): raise IndexError("Parameter arrays are different sizes in the 0th dimmension.") return size + def get_numpy_bytes(self): + size = 0 + for k, param_data in self.param_dict.iteritems(): + if not isinstance(param_data, NumpyParameterData): + continue + if size == 0: + size += param_data.get_data().nbytes + 
return size + @property def ingest_time_array(self, dtype=np.float64): arr = np.empty(self.get_param_data_length(), dtype=dtype) @@ -73,6 +84,8 @@ def get_span_stats(self, params=None): param_stat_dict = {} if params is None: params = self.param_dict.keys() + elif isinstance(params, basestring): + params = [params] for param in params: if param in self.param_dict: if isinstance(self.param_dict[param], ConstantOverTime): @@ -155,6 +168,7 @@ def from_json(cls, js_str, decompressors=None): span = Span(str(json_dict['id']), str(json_dict['coverage_id']), uncompressed_params, ingest_time=json_dict['ingest_time'], mutable=json_dict['mutable']) span.ingest_times = json_dict['ingest_time_dict'] span.param_dict[cls.ingest_time_str] = NumpyParameterData(cls.ingest_time_str, span.ingest_time_array) + span.compressors = decompressors return span def get_hash(self): @@ -198,11 +212,66 @@ def __eq__(self, other): return False def __gt__(self, other): - return self.ingest_time > other.ingest_time + if self.sort_parameter != other.sort_parameter: + raise RuntimeError('Sort parameters for objects do not match [%s vs. 
%s]' % (self.sort_parameter, other.sort_parameter)) + if self.sort_parameter is None: + return self.ingest_time > other.ingest_time + else: + my_stats = self.get_span_stats(self.sort_parameter) + other_stats = other.get_span_stats(self.sort_parameter) + return my_stats.params[self.sort_parameter][0] > other_stats.params[self.sort_parameter][0] def __lt__(self, other): return not self.__gt__(other) + @classmethod + def merge_spans(cls, spans, sort_param=None, fill_value_dict=None): + num_observations = 0 + cov_id = None + param_key_set = set() + compressors = {} + for span in spans: + param_key_set.update(span.param_dict.keys()) + num_observations += span.get_param_data_length() + if cov_id is None: + cov_id = span.coverage_id + elif cov_id != span.coverage_id: + raise RuntimeError('coverages ids do not match across spans') + if span.compressors is not None: + for k, v in span.compressors.iteritems(): + compressors[k] = v + + new_pdict = {} + current_pos = 0 + for span in spans: + span_size = span.get_param_data_length() + for key in param_key_set: + if key not in span.param_dict.keys(): + continue + data = span.param_dict[key].get_data() + if key not in new_pdict and key in span.param_dict.keys(): + new_pdict[key] = np.empty(num_observations, dtype=data.dtype) + if fill_value_dict is not None and key in fill_value_dict: + new_pdict[key][:] = fill_value_dict[key] + else: + new_pdict[key][:] = 0 + new_pdict[key][current_pos:current_pos+span_size] = data + current_pos += span_size + + from coverage_model.util.numpy_utils import NumpyUtils + if sort_param is not None: + new_pdict = NumpyUtils.sort_flat_arrays(new_pdict, sort_param) + + replace_dict = new_pdict + for k, v in new_pdict.iteritems(): + replace_dict[k] = NumpyParameterData(k, v) + + from coverage_model.utils import create_guid + new_span_id = create_guid() + merged_span = Span(new_span_id, coverage_id=cov_id, param_dict=replace_dict, compressors=compressors) + return merged_span + + class SpanStats(object): 
diff --git a/coverage_model/storage/parameter_persisted_storage.py b/coverage_model/storage/parameter_persisted_storage.py index f8f3463..75f1d47 100644 --- a/coverage_model/storage/parameter_persisted_storage.py +++ b/coverage_model/storage/parameter_persisted_storage.py @@ -459,7 +459,6 @@ def _create_param_dict_from_spans_dict(self, params, span_dict): def _sort_flat_arrays(cls, np_dict, sort_parameter=None): sorted_array_dict = {} if sort_parameter is None or sort_parameter not in np_dict.keys(): - # sort_parameter = self.alignment_parameter sort_parameter = 'time' sort_array = np_dict[sort_parameter] sorted_indexes = np.argsort(sort_array) diff --git a/coverage_model/storage/postgres_span_storage.py b/coverage_model/storage/postgres_span_storage.py index 5716097..b561305 100644 --- a/coverage_model/storage/postgres_span_storage.py +++ b/coverage_model/storage/postgres_span_storage.py @@ -57,6 +57,14 @@ def write_span(self, span): with self.span_store.pool.cursor(**self.span_store.cursor_args) as cur: cur.execute(sql_str, [SpanJsonDumper(span) for i in range(data_times)]) + def replace_spans(self, new_span, old_spans): + stats_sql, bin_sql = self.get_span_stats_and_bin_insert_sql(new_span) + span_sql, data_times = self.get_span_insert_sql(new_span) + delete_sql = self.get_span_delete_sql(old_spans) + sql_str = "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; %s %s %s %s COMMIT;" % (span_sql, stats_sql, bin_sql, delete_sql) + with self.span_store.pool.cursor(**self.span_store.cursor_args) as cur: + cur.execute(sql_str, [SpanJsonDumper(new_span) for i in range(data_times)]) + def get_span_insert_sql(self, span): data_times = 1 if True: @@ -122,6 +130,22 @@ def get_span_stats_and_bin_insert_sql(self, span): bin_sql = "" return stats_sql, bin_sql + def get_span_delete_sql(self, spans): + cov_id = None + span_ids = set() + for span in spans: + if cov_id is None: + cov_id = span.coverage_id + elif cov_id != span.coverage_id: + raise RuntimeError('Cannot delete spans 
from multiple coverages in one call') + span_ids.add("'" + span.id + "'") + span_ids = ", ".join(span_ids) + statement = """DELETE FROM %s where coverage_id = '%s' AND id IN (%s); \ + DELETE FROM %s WHERE coverage_id = '%s' AND span_address IN (%s);""" \ + % (self.span_table_name, cov_id, span_ids, self.span_stats_table_name, cov_id, span_ids) + return statement + + def get_spans(self, span_ids=None, coverage_ids=None, params=None, start_time=None, stop_time=None, decompressors=None): statement = """SELECT data::text from %s where coverage_id = '%s'""" % (self.span_table_name, coverage_ids) if span_ids is not None: @@ -140,6 +164,19 @@ def get_spans(self, span_ids=None, coverage_ids=None, params=None, start_time=No return spans + def get_stored_coverage_ids(self): + statement = """SELECT coverage_id from %s""" % (self.span_stats_table_name) + with self.span_store.pool.cursor(**self.span_store.cursor_args) as cur: + cur.execute(statement) + results = cur.fetchall() + + cov_ids = set() + for row in results: + cov_id, = row + cov_ids.add(cov_id) + + return cov_ids + def has_data(self, coverage_id): statement = """SELECT coverage_id FROM %s WHERE coverage_id = '%s'""" % (self.span_stats_table_name, coverage_id) results = [] diff --git a/coverage_model/storage/span_storage.py b/coverage_model/storage/span_storage.py index a162d77..81c6ac2 100644 --- a/coverage_model/storage/span_storage.py +++ b/coverage_model/storage/span_storage.py @@ -15,4 +15,10 @@ def search(self, search_criteria, limit=None): raise NotImplementedError('Not implemented in base class') def has_data(self, coverage_id): - raise NotImplementedError('Not implemented in base class') \ No newline at end of file + raise NotImplementedError('Not implemented in base class') + + def get_stored_coverage_ids(self): + raise NotImplementedError('Not implemented in base class') + + def replace_spans(self, new_spans, old_spans): + raise NotImplementedError('Not implemented in base class') diff --git 
a/coverage_model/storage/storage_respan_task.py b/coverage_model/storage/storage_respan_task.py new file mode 100644 index 0000000..5d0c91e --- /dev/null +++ b/coverage_model/storage/storage_respan_task.py @@ -0,0 +1,97 @@ +__author__ = 'casey' + +import sys +import threading +from ooi.logging import log +from threading import Thread +from coverage_model.config import CoverageConfig +from coverage_model.coverage import AbstractCoverage, SimplexCoverage +from coverage_model.data_span import Span +from coverage_model.storage.span_storage_factory import SpanStorageFactory +from coverage_model.storage.span_storage import SpanStorage +from coverage_model.metadata_factory import MetadataManagerFactory + + +class StorageRespanTask(object): + """ + Repackage data writes into contiguous memory packages for more optimal data access. + Each individual data write is stored in its own location to support asynchronous writes. + Repackaging accounts for the possibility of out of observation chronological order writes as well as + data write sizes that could hurt read performance (e.g. lots of small-ish writes). + Repackaging attempts to merge many small 'files' into one optimally-sized 'file'. + The current implementation does not split larger 'files' into multiple smaller files, though that + functionality should be considered in the future. + """ + + def __init__(self, storage_name=None, coverage_ids=None, time_segment=None, + sort_parameter=None): + store = SpanStorageFactory.get_span_storage_obj(storage_name) + if not isinstance(store, SpanStorage): + raise TypeError("Retrieved storage object must implement %s type. Found %s." 
% (SpanStorage.__name__, self.store.__class__.__name__)) + else: + self.store = store + + if coverage_ids is None: + self.coverage_ids = set() + coverage_ids = self.store.get_stored_coverage_ids() + for cov_id in coverage_ids: + if MetadataManagerFactory.is_persisted(cov_id): + self.coverage_ids.add(cov_id) + + elif isinstance(coverage_ids, (list,set)): + self.coverage_ids = set(coverage_ids) + elif isinstance(coverage_ids, basestring): + self.coverage_ids = [coverage_ids] + else: + raise TypeError("Unhandled coverage_ids type - %s", type(coverage_ids)) + + if time_segment is not None and not isinstance(time_segment, tuple) and len(time_segment) != 2: + raise TypeError() + self.time_segment = time_segment + self.sort_parameter_name = sort_parameter + + def do_respan(self, asynchronous=False): + if asynchronous: + thread = Thread(target=self.do_respan, args=(False,)) + thread.start() + return thread + + for id in self.coverage_ids: + self.respan_coverage(id) + + def respan_coverage(self, cov_id): + cov = AbstractCoverage.resurrect(cov_id, 'r') + if not isinstance(cov, SimplexCoverage): + return + log.info('Respanning coverage %s' % cov_id) + decompressors = cov._persistence_layer.value_list + fill_value_dict = {} + for k in decompressors: + fill_value_dict[k] = cov.get_parameter_context(k).fill_value + if fill_value_dict[k] is None: + fill_value_dict[k] = -9999.0 + spans = self.store.get_spans(coverage_ids=cov_id, decompressors=decompressors) + for span in spans: + span.sort_parameter = cov.temporal_parameter_name + starting_num_spans = len(spans) + + ideal_span_size = CoverageConfig().ideal_span_size + span_sets = [[]] + current_size = 0 + spans = sorted(spans) + for span in spans: + span_size = span.get_numpy_bytes() + if (current_size > ideal_span_size and len(span_sets[-1]) > 0) or abs(ideal_span_size - current_size) < abs(ideal_span_size - span_size): + span_sets.append([]) + current_size = 0 + span_sets[-1].append(span) + current_size += span.get_numpy_bytes() 
+ + new_span_ids = [] + for span_set in span_sets: + new_span = Span.merge_spans(span_set, sort_param=self.sort_parameter_name, fill_value_dict=fill_value_dict) + self.store.replace_spans(new_span, span_set) + new_span_ids.append(new_span.id) + + log.info('Respaned coverage %s from %s spans to %s spans' % (cov_id, starting_num_spans, len(new_span_ids))) + return new_span_ids \ No newline at end of file diff --git a/coverage_model/test/test_postgres_storage.py b/coverage_model/test/test_postgres_storage.py index 0da4e00..c4f4ac4 100644 --- a/coverage_model/test/test_postgres_storage.py +++ b/coverage_model/test/test_postgres_storage.py @@ -17,6 +17,7 @@ from coverage_model.parameter_data import * from coverage_test_base import get_parameter_dict from coverage_model.parameter_types import * +from coverage_model.storage.storage_respan_task import StorageRespanTask @attr('UNIT',group='cov') @@ -266,7 +267,6 @@ def test_add_constant(self): expected_array[1:4] = 4096 scov.set_parameter_values({'sparseness': ConstantOverTime('sparseness', 4096, time_start=10000, time_end=10002)}) returned_array = scov.get_parameter_values([scov.temporal_parameter_name, 'sparseness'], as_record_array=True).get_data() - print returned_array np.testing.assert_array_equal(expected_array, returned_array['sparseness']) expected_array[-3:] = 17 @@ -1045,6 +1045,67 @@ def test_order_by_ingest(self): print return_values['ingestion_timestamp'][x+1], sorted_values[x+1] np.testing.assert_array_equal(sorted_values, return_values['ingestion_timestamp']) + def test_respan(self): + data_ctx = ParameterContext('data', param_type=RecordType(), fill_value=-999.0) + cov = _make_cov(self.working_dir, ['conductivity',data_ctx], nt=10) + + cov.set_parameter_values({'time': np.arange(50,55), 'data': np.arange(150,155)}) + cov.set_parameter_values({'time': np.arange(250,253), 'data': np.arange(200,203)}) + cov.set_parameter_values({'time': np.arange(1500,1610), 'data': np.arange(13000,13110), 'conductivity': 
np.arange(100,210)}) + cov.set_parameter_values({'time': np.arange(100,110), 'data': np.arange(2000,2010), 'conductivity': np.arange(0,10)}) + cov.refresh() + + orig_cov_data = cov.get_parameter_values().get_data() + from coverage_model.storage.storage_respan_task import StorageRespanTask + respan_task = StorageRespanTask(coverage_ids=cov.persistence_guid, sort_parameter='time') + respan_task.do_respan() + + respan_cov = AbstractCoverage.resurrect(cov.persistence_guid, 'r') + respan_cov_data = respan_cov.get_parameter_values().get_data() + self.assertEqual(respan_cov_data.keys(), orig_cov_data.keys()) + for key in orig_cov_data.keys(): + np.testing.assert_array_equal(orig_cov_data[key], respan_cov_data[key]) + + @unittest.skip("Test has the potential to take considerable time only run if you're testing respan of all coverages") + def test_respan_all_coverages(self): + data_ctx = ParameterContext('data', param_type=RecordType(), fill_value=-999.0) + cov = _make_cov(self.working_dir, ['conductivity',data_ctx], nt=10) + + cov.set_parameter_values({'time': np.arange(50,55), 'data': np.arange(150,155)}) + import time + time.sleep(1) + cov.set_parameter_values({'time': np.arange(250,253), 'data': np.arange(200,203)}) + time.sleep(1) + cov.set_parameter_values({'time': np.arange(100,110), 'data': np.arange(2000,2010), 'conductivity': np.arange(0,10)}) + cov.refresh() + + cov2 = _make_cov(self.working_dir, ['conductivity',data_ctx], nt=10) + + cov2.set_parameter_values({'time': np.arange(1050,1055), 'data': np.arange(1150,1155)}) + cov2.set_parameter_values({'time': np.arange(1250,1253), 'data': np.arange(1200,1203)}) + cov2.set_parameter_values({'time': np.arange(1500,1610), 'data': np.arange(13000,13110), 'conductivity': np.arange(100,210)}) + cov2.set_parameter_values({'time': np.arange(1100,1110), 'data': np.arange(12000,12010), 'conductivity': np.arange(0,10)}) + cov2.refresh() + + orig_cov_data = cov.get_parameter_values().get_data() + orig_cov2_data = 
cov2.get_parameter_values().get_data() + + from coverage_model.storage.storage_respan_task import StorageRespanTask + respan_task = StorageRespanTask(sort_parameter='time') + respan_task.do_respan() + + respan_cov = AbstractCoverage.resurrect(cov.persistence_guid, 'r') + respan_cov_data = respan_cov.get_parameter_values().get_data() + self.assertEqual(respan_cov_data.keys(), orig_cov_data.keys()) + for key in orig_cov_data.keys(): + np.testing.assert_array_equal(orig_cov_data[key], respan_cov_data[key]) + + respan_cov2 = AbstractCoverage.resurrect(cov2.persistence_guid, 'r') + respan_cov2_data = respan_cov2.get_parameter_values().get_data() + self.assertEqual(respan_cov2_data.keys(), orig_cov2_data.keys()) + for key in orig_cov2_data.keys(): + np.testing.assert_array_equal(orig_cov2_data[key], respan_cov2_data[key]) + def identity(x): return np.copy(x)*3