diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 967fc08129..4e44f95041 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -518,6 +518,7 @@ set(arcticdb_srcs version/key_block.cpp util/gil_safe_py_none.cpp version/local_versioned_engine.cpp + version/schema_checks.cpp version/op_log.cpp version/snapshot.cpp version/symbol_list.cpp diff --git a/cpp/arcticdb/column_store/memory_segment_impl.cpp b/cpp/arcticdb/column_store/memory_segment_impl.cpp index 36d6f17e20..7b5aad213c 100644 --- a/cpp/arcticdb/column_store/memory_segment_impl.cpp +++ b/cpp/arcticdb/column_store/memory_segment_impl.cpp @@ -650,7 +650,7 @@ size_t SegmentInMemoryImpl::num_bytes() const { void SegmentInMemoryImpl::sort(const std::string& column_name) { init_column_map(); auto idx = column_index(std::string_view(column_name)); - user_input::check(static_cast(idx), "Column {} not found in sort", column_name); + schema::check(static_cast(idx), "Column {} not found in sort", column_name); sort(static_cast(idx.value())); } @@ -659,7 +659,7 @@ void SegmentInMemoryImpl::sort(const std::vector& column_names) { std::vector positions; for(const auto& column_name : column_names) { auto idx = column_index(std::string_view(column_name)); - user_input::check(static_cast(idx), "Column {} not found in multi-sort", column_name); + schema::check(static_cast(idx), "Column {} not found in multi-sort", column_name); positions.emplace_back(static_cast(*idx)); } sort(positions); diff --git a/cpp/arcticdb/version/schema_checks.cpp b/cpp/arcticdb/version/schema_checks.cpp new file mode 100644 index 0000000000..c6354f7da8 --- /dev/null +++ b/cpp/arcticdb/version/schema_checks.cpp @@ -0,0 +1,154 @@ +#include +#include + +namespace arcticdb { + +std::string_view normalization_operation_str(NormalizationOperation operation) { + switch (operation) { + case APPEND: + return "APPEND"; + case UPDATE: + return "UPDATE"; + default: + util::raise_rte("Unknown operation type {}", static_cast(operation)); + } +} + +IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type& left, const IndexDescriptor::Type& right) { + if (left == right) { + return left; + } + if (left == IndexDescriptor::Type::EMPTY) { + return right; + } + if (right == IndexDescriptor::Type::EMPTY) { + return left; + } + return IndexDescriptor::Type::UNKNOWN; +} + +void check_normalization_index_match( + NormalizationOperation operation, + const StreamDescriptor& old_descriptor, + const pipelines::InputTensorFrame& frame, + bool empty_types +) { + const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type(); + const IndexDescriptor::Type new_idx_kind = frame.desc.index().type(); + if (operation == UPDATE) { + const bool new_is_timeseries = std::holds_alternative(frame.index); + util::check_rte( + (old_idx_kind == IndexDescriptor::Type::TIMESTAMP || old_idx_kind == IndexDescriptor::Type::EMPTY) && new_is_timeseries, + "Update will not work as expected with a non-timeseries index" + ); + } else { + const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind); + if (empty_types) { + normalization::check( + common_index_type != IndexDescriptor::Type::UNKNOWN, + "Cannot append {} index to {} index", + index_type_to_str(new_idx_kind), + index_type_to_str(old_idx_kind) + ); + } else { + // (old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT) is left to preserve + // pre-empty index behavior with pandas 2, see 
test_empty_writes.py::test_append_empty_series. Empty pd.Series + // have a RowRange index, but due to: https://github.com/man-group/ArcticDB/blob/bd1776291fe402d8b18af9fea865324ebd7705f1/python/arcticdb/version_store/_normalization.py#L545 + // it gets converted to DatetimeIndex (all empty indexes except categorical and multiindex are converted to datetime index + // in pandas 2 if empty index type is disabled); however, we still want to be able to append pd.Series to empty pd.Series. + // Having this will not allow appending RowCount-indexed pd.DataFrames to DateTime-indexed pd.DataFrames because they would + // have a different field count (the rowcount index is not stored as a field). This logic is bug-prone and will improve + // once we enable the empty index. + const bool input_frame_is_series = frame.norm_meta.has_series(); + normalization::check( + common_index_type != IndexDescriptor::Type::UNKNOWN || + (input_frame_is_series && old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT), + "Cannot append {} index to {} index", + index_type_to_str(new_idx_kind), + index_type_to_str(old_idx_kind) + ); + } + } +} + +bool index_names_match( + const StreamDescriptor& df_in_store_descriptor, + const StreamDescriptor& new_df_descriptor +) { + auto df_in_store_index_field_count = df_in_store_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? 0 : df_in_store_descriptor.index().field_count(); + auto new_df_field_index_count = new_df_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? 0 : new_df_descriptor.index().field_count(); + + // If either index is empty, we consider them to match + if (df_in_store_index_field_count == 0 || new_df_field_index_count == 0) { + return true; + } + + if (df_in_store_index_field_count != new_df_field_index_count) { + return false; + } + + for (auto i = 0; i < int(df_in_store_index_field_count); ++i) { + if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i).name()) { + return false; + } + } + + return true; +} + +bool columns_match( + const StreamDescriptor& df_in_store_descriptor, + const StreamDescriptor& new_df_descriptor +) { + const int index_field_size = + df_in_store_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? new_df_descriptor.index().field_count() : 0; + // The empty index is compatible with all other index types. Differences in the index fields in this case are + // allowed. The index fields are always the first in the list. + if (df_in_store_descriptor.fields().size() + index_field_size != new_df_descriptor.fields().size()) { + return false; + } + // In case the left index is the empty index, we want to skip name/type checking of the index fields, which are always + // the first fields.
+ for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) { + if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + index_field_size).name()) + return false; + + const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type(); + const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type(); + + if (!trivially_compatible_types(left_type, right_type) && + !(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type()))) + return false; + } + return true; +} + +void fix_descriptor_mismatch_or_throw( + NormalizationOperation operation, + bool dynamic_schema, + const pipelines::index::IndexSegmentReader &existing_isr, + const pipelines::InputTensorFrame &new_frame, + bool empty_types) { + const auto &old_sd = existing_isr.tsd().as_stream_descriptor(); + check_normalization_index_match(operation, old_sd, new_frame, empty_types); + + fix_normalization_or_throw(operation == APPEND, existing_isr, new_frame); + + // We need to check that the index names match regardless of the dynamic schema setting + if(!index_names_match(old_sd, new_frame.desc)) { + throw StreamDescriptorMismatch( + "The index names in the argument are not identical to that of the existing version", + old_sd, + new_frame.desc, + operation); + } + + if (!dynamic_schema && !columns_match(old_sd, new_frame.desc)) { + throw StreamDescriptorMismatch( + "The columns (names and types) in the argument are not identical to that of the existing version", + old_sd, + new_frame.desc, + operation); + } +} +} // namespace arcticdb diff --git a/cpp/arcticdb/version/schema_checks.hpp b/cpp/arcticdb/version/schema_checks.hpp index 831341b45f..2a81e12ac4 100644 --- a/cpp/arcticdb/version/schema_checks.hpp +++ b/cpp/arcticdb/version/schema_checks.hpp @@ -11,17 +11,7 @@ enum NormalizationOperation : uint8_t { UPDATE, }; -inline std::string_view normalization_operation_str(NormalizationOperation operation) { - switch (operation) { - case APPEND: - return "APPEND"; - case UPDATE: - return "UPDATE"; - default: - util::raise_rte("Unknown operation type {}", static_cast(operation)); - } -} - +std::string_view normalization_operation_str(NormalizationOperation operation); struct StreamDescriptorMismatch : ArcticSpecificException { StreamDescriptorMismatch(const char* preamble, const StreamDescriptor& existing, const StreamDescriptor& new_val, NormalizationOperation operation) : @@ -29,107 +19,30 @@ struct StreamDescriptorMismatch : ArcticSpecificException(frame.index); - util::check_rte( - (old_idx_kind == IndexDescriptor::Type::TIMESTAMP || old_idx_kind == IndexDescriptor::Type::EMPTY) && new_is_timeseries, - "Update will not work as expected with a non-timeseries index" - ); - } else { - const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind); - if (empty_types) { - normalization::check( - common_index_type != IndexDescriptor::Type::UNKNOWN, - "Cannot append {} index to {} index", - index_type_to_str(new_idx_kind), - index_type_to_str(old_idx_kind) - ); - } else { - // (old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT) is left to preserve - // pre-empty index behavior with pandas 2, see test_empty_writes.py::test_append_empty_series. 
Empty pd.Series - // have Rowrange index, but due to: https://github.com/man-group/ArcticDB/blob/bd1776291fe402d8b18af9fea865324ebd7705f1/python/arcticdb/version_store/_normalization.py#L545 - // it gets converted to DatetimeIndex (all empty indexes except categorical and multiindex are converted to datetime index - // in pandas 2 if empty index type is disabled), however we still want to be able to append pd.Series to empty pd.Series. - // Having this will not allow appending RowCont indexed pd.DataFrames to DateTime indexed pd.DataFrames because they would - // have different field size (the rowcount index is not stored as a field). This logic is bug prone and will become better - // after we enable the empty index. - const bool input_frame_is_series = frame.norm_meta.has_series(); - normalization::check( - common_index_type != IndexDescriptor::Type::UNKNOWN || - (input_frame_is_series && old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT), - "Cannot append {} index to {} index", - index_type_to_str(new_idx_kind), - index_type_to_str(old_idx_kind) - ); - } - } -} +); -inline bool columns_match( +bool index_names_match( const StreamDescriptor& df_in_store_descriptor, const StreamDescriptor& new_df_descriptor -) { - const int index_field_size = - df_in_store_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? new_df_descriptor.index().field_count() : 0; - // The empty index is compatible with all other index types. Differences in the index fields in this case is - // allowed. The index fields are always the first in the list. - if (df_in_store_descriptor.fields().size() + index_field_size != new_df_descriptor.fields().size()) { - return false; - } - // In case the left index is empty index we want to skip name/type checking of the index fields which are always - // the first fields. 
- for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) { - if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + index_field_size).name()) - return false; - - const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type(); - const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type(); +); - if (!trivially_compatible_types(left_type, right_type) && - !(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type()))) - return false; - } - return true; -} +bool columns_match( + const StreamDescriptor& df_in_store_descriptor, + const StreamDescriptor& new_df_descriptor +); -inline void fix_descriptor_mismatch_or_throw( +void fix_descriptor_mismatch_or_throw( NormalizationOperation operation, bool dynamic_schema, const pipelines::index::IndexSegmentReader &existing_isr, const pipelines::InputTensorFrame &new_frame, - bool empty_types) { - const auto &old_sd = existing_isr.tsd().as_stream_descriptor(); - check_normalization_index_match(operation, old_sd, new_frame, empty_types); - - fix_normalization_or_throw(operation == APPEND, existing_isr, new_frame); - - if (!dynamic_schema && !columns_match(old_sd, new_frame.desc)) { - throw StreamDescriptorMismatch( - "The columns (names and types) in the argument are not identical to that of the existing version", - old_sd, - new_frame.desc, - operation); - } -} + bool empty_types +); } // namespace arcticdb diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 728c4713f8..e81118a16b 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -1078,6 +1078,20 @@ bool read_incompletes_to_pipeline( ensure_timeseries_norm_meta(*pipeline_context->norm_meta_, pipeline_context->stream_id_, sparsify); } + const StreamDescriptor &staged_desc = incomplete_segments[0].segment(store).descriptor(); + + + // We need to check that the index names match regardless of the dynamic schema setting + // A more detailed check is done later in the do_compact function + if (pipeline_context->desc_) { + schema::check( + index_names_match(staged_desc, *pipeline_context->desc_), + "The index names in the staged stream descriptor {} are not identical to that of the stream descriptor on storage {}", + staged_desc, + *pipeline_context->desc_ + ); + } + if (dynamic_schema) { pipeline_context->staged_descriptor_ = merge_descriptors(seg.descriptor(), incomplete_segments, read_query.columns); @@ -1089,7 +1103,6 @@ bool read_incompletes_to_pipeline( pipeline_context->desc_ = pipeline_context->staged_descriptor_; } } else { - const StreamDescriptor &staged_desc = incomplete_segments[0].segment(store).descriptor(); if (pipeline_context->desc_) { schema::check( columns_match(staged_desc, *pipeline_context->desc_), @@ -2037,6 +2050,17 @@ bool is_segment_unsorted(const SegmentInMemory& segment) { } CheckOutcome check_schema_matches_incomplete(const StreamDescriptor& stream_descriptor_incomplete, const StreamDescriptor& pipeline_desc) { + // We need to check that the index names match regardless of the dynamic schema setting + if(!index_names_match(stream_descriptor_incomplete, pipeline_desc)) { + return Error{ + throw_error, + fmt::format("{} All staged segments must have the same index names." 
+ "{} is different than {}", + error_code_data.name_, + stream_descriptor_incomplete, + pipeline_desc) + }; + } if (!columns_match(stream_descriptor_incomplete, pipeline_desc)) { return Error{ throw_error, diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index cbc8b4724f..f9e0ae9294 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -307,6 +308,12 @@ template descriptor())) { + auto written_keys = folly::collect(write_futures).get(); + remove_written_keys(store.get(), std::move(written_keys)); + return Error{throw_error, fmt::format("Index names in segment {} and pipeline context {} do not match", segment.descriptor(), pipeline_context->descriptor())}; + } + if(validate_index && is_segment_unsorted(segment)) { auto written_keys = folly::collect(write_futures).get(); remove_written_keys(store.get(), std::move(written_keys)); diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index fe6add73ec..ddb070f09b 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -1241,6 +1241,7 @@ def finalize_staged_data( is not the same as the schema of the existing data - If dynamic schema is used and different segments have the same column names but their dtypes don't have a common type (e.g string and any numeric type) + - If a different index name is encountered in the staged data, regardless of the schema mode See Also -------- @@ -1351,6 +1352,7 @@ def sort_and_finalize_staged_data( staged segment is not the same as the schema of the existing data - If dynamic schema is used and different segments have the same column names but their dtypes don't have a common type (e.g string and any numeric type) + - If a different index name is encountered in the staged data, regardless of the schema mode See Also -------- diff --git a/python/tests/hypothesis/arcticdb/test_append.py b/python/tests/hypothesis/arcticdb/test_append.py index 031479ee2b..daa7c6a151 100644 --- a/python/tests/hypothesis/arcticdb/test_append.py +++ b/python/tests/hypothesis/arcticdb/test_append.py @@ -3,7 +3,11 @@ import numpy as np import pytest from arcticdb.version_store import NativeVersionStore -from arcticdb_ext.exceptions import InternalException, NormalizationException, SortingException +from arcticdb_ext.exceptions import ( + InternalException, + NormalizationException, +) +from arcticdb_ext.version_store import StreamDescriptorMismatch from arcticdb_ext import set_config_int from hypothesis import given, assume, settings, strategies as st from itertools import chain, product, combinations @@ -11,7 +15,10 @@ from arcticdb.version_store._common import TimeFrame from arcticdb.util.test import assert_frame_equal, random_seed_context -from arcticdb.util.hypothesis import InputFactories, use_of_function_scoped_fixtures_in_hypothesis_checked +from arcticdb.util.hypothesis import ( + InputFactories, + use_of_function_scoped_fixtures_in_hypothesis_checked, +) def gen_params_append(): @@ -24,7 +31,11 @@ def gen_params_append(): # rownums p.append([1, 4, periods + 2]) # cols - p.append(list(chain(*[list(combinations(["a", "b", "c"], c)) for c in range(1, 4, 2)]))) + p.append( + list( + chain(*[list(combinations(["a", "b", "c"], c)) for c in range(1, 4, 2)]) + ) + ) # tsbounds p.append([(j, i) for i in [1, periods - 1] for j in range(i)]) # append_point @@ -50,13 +61,19 @@ def 
gen_params_append_single(): return list(product(*p)) -@pytest.mark.parametrize("colnum,periods,rownum,cols,tsbounds,append_point", gen_params_append()) -def test_append_partial_read(version_store_factory, colnum, periods, rownum, cols, tsbounds, append_point): +@pytest.mark.parametrize( + "colnum,periods,rownum,cols,tsbounds,append_point", gen_params_append() +) +def test_append_partial_read( + version_store_factory, colnum, periods, rownum, cols, tsbounds, append_point +): tz = "America/New_York" version_store = version_store_factory(col_per_group=colnum, row_per_segment=rownum) dtidx = pd.date_range("2019-02-06 11:43", periods=6).tz_localize(tz) a = np.arange(dtidx.shape[0]) - tf = TimeFrame(dtidx.values, columns_names=["a", "b", "c"], columns_values=[a, a + a, a * 10]) + tf = TimeFrame( + dtidx.values, columns_names=["a", "b", "c"], columns_values=[a, a + a, a * 10] + ) c1 = dtidx[append_point] c2 = dtidx[append_point + 1] tf1 = tf.tsloc[:c1] @@ -68,19 +85,27 @@ def test_append_partial_read(version_store_factory, colnum, periods, rownum, col dtr = (dtidx[tsbounds[0]], dtidx[tsbounds[1]]) vit = version_store.read(sid, date_range=dtr, columns=list(cols)) rtf = tf.tsloc[dtr[0] : dtr[1]] - col_names, col_values = zip(*[(c, v) for c, v in zip(rtf.columns_names, rtf.columns_values) if c in cols]) + col_names, col_values = zip( + *[(c, v) for c, v in zip(rtf.columns_names, rtf.columns_values) if c in cols] + ) rtf = TimeFrame(rtf.times, list(col_names), list(col_values)) assert rtf == vit.data -@pytest.mark.parametrize("colnum,periods,rownum,cols,tsbounds,append_point", gen_params_append()) -def test_incomplete_append_partial_read(version_store_factory, colnum, periods, rownum, cols, tsbounds, append_point): +@pytest.mark.parametrize( + "colnum,periods,rownum,cols,tsbounds,append_point", gen_params_append() +) +def test_incomplete_append_partial_read( + version_store_factory, colnum, periods, rownum, cols, tsbounds, append_point +): tz = "America/New_York" version_store = version_store_factory(col_per_group=colnum, row_per_segment=rownum) lib_tool = version_store.library_tool() dtidx = pd.date_range("2019-02-06 11:43", periods=6).tz_localize(tz) a = np.arange(dtidx.shape[0]) - tf = TimeFrame(dtidx.values, columns_names=["a", "b", "c"], columns_values=[a, a + a, a * 10]) + tf = TimeFrame( + dtidx.values, columns_names=["a", "b", "c"], columns_values=[a, a + a, a * 10] + ) c1 = dtidx[append_point] c2 = dtidx[append_point + 1] tf1 = tf.tsloc[:c1] @@ -92,7 +117,9 @@ def test_incomplete_append_partial_read(version_store_factory, colnum, periods, dtr = (dtidx[tsbounds[0]], dtidx[tsbounds[1]]) vit = version_store.read(sid, date_range=dtr, columns=list(cols), incomplete=True) rtf = tf.tsloc[dtr[0] : dtr[1]] - col_names, col_values = zip(*[(c, v) for c, v in zip(rtf.columns_names, rtf.columns_values) if c in cols]) + col_names, col_values = zip( + *[(c, v) for c, v in zip(rtf.columns_names, rtf.columns_values) if c in cols] + ) rtf = TimeFrame(rtf.times, list(col_names), list(col_values)) assert rtf == vit.data @@ -105,12 +132,22 @@ def test_incomplete_append_partial_read(version_store_factory, colnum, periods, # (InputFactories.DF_RC_NON_RANGE, InputFactories.DF_DTI, "TODO(AN-722)"), (InputFactories.DF_RC, InputFactories.ND_ARRAY_1D, "(Pandas|ndarray)"), (InputFactories.DF_RC, InputFactories.DF_MULTI_RC, "index type incompatible"), - (InputFactories.DF_RC, InputFactories.DF_RC_NON_RANGE, "range.*index which is incompatible"), + ( + InputFactories.DF_RC, + InputFactories.DF_RC_NON_RANGE, + 
"range.*index which is incompatible", + ), (InputFactories.DF_RC, InputFactories.DF_RC_STEP, "different.*step"), ], ) @pytest.mark.parametrize("swap", ["swap", ""]) -def test_(initial: InputFactories, append: InputFactories, match, swap, lmdb_version_store: NativeVersionStore): +def test_( + initial: InputFactories, + append: InputFactories, + match, + swap, + lmdb_version_store: NativeVersionStore, +): lib = lmdb_version_store if swap: initial, append = append, initial @@ -127,7 +164,9 @@ def test_(initial: InputFactories, append: InputFactories, match, swap, lmdb_ver @given( col_per_append_df=st.integers(2, 100), col_name_set=st.integers(1, 10000), - num_rows_per_test_cycle=st.lists(st.lists(st.integers(1, 20), min_size=1, max_size=10), max_size=2), + num_rows_per_test_cycle=st.lists( + st.lists(st.integers(1, 20), min_size=1, max_size=10), max_size=2 + ), column_group_size=st.integers(2, 100), segment_row_size=st.integers(2, 100), dynamic_schema=st.booleans(), @@ -149,19 +188,27 @@ def test_append_with_defragmentation( df_in_str, basic_store_factory, ): - def get_wide_and_long_df(start_idx, end_idx, col_per_append_df, col_name_set, df_in_str): + def get_wide_and_long_df( + start_idx, end_idx, col_per_append_df, col_name_set, df_in_str + ): df = pd.DataFrame() for idx in range(start_idx, end_idx): df = pd.concat([df, get_wide_df(idx, col_per_append_df, col_name_set)]) - if col_per_append_df == col_name_set: # manually sort them for static schema, for newer version of panda + if ( + col_per_append_df == col_name_set + ): # manually sort them for static schema, for newer version of panda df = df.reindex(sorted(list(df.columns)), axis=1) df = df.astype(str if df_in_str else np.float64) return df def get_no_of_segments_after_defragmentation(df, merged_segment_row_size): new_segment_row_size = no_of_segments = 0 - for start_row, end_row in pd.Series(df.end_row.values, index=df.start_row).to_dict().items(): - no_of_segments = no_of_segments + 1 if new_segment_row_size == 0 else no_of_segments + for start_row, end_row in ( + pd.Series(df.end_row.values, index=df.start_row).to_dict().items() + ): + no_of_segments = ( + no_of_segments + 1 if new_segment_row_size == 0 else no_of_segments + ) new_segment_row_size += end_row - start_row if new_segment_row_size >= merged_segment_row_size: new_segment_row_size = 0 @@ -184,7 +231,9 @@ def run_test( start_index = index_offset index_offset += num_of_row end_index = index_offset - df = get_wide_and_long_df(start_index, end_index, col_per_append_df, col_name_set, df_in_str) + df = get_wide_and_long_df( + start_index, end_index, col_per_append_df, col_name_set, df_in_str + ) before_compact = pd.concat([before_compact, df]) if start_index == 0: lib.write(sym, df) @@ -192,7 +241,9 @@ def run_test( lib.append(sym, df) segment_details = lib.read_index(sym) assert lib.is_symbol_fragmented(sym, None) is ( - get_no_of_segments_after_defragmentation(segment_details, merged_segment_row_size) + get_no_of_segments_after_defragmentation( + segment_details, merged_segment_row_size + ) != get_no_of_column_merged_segments(segment_details) ) if get_no_of_segments_after_defragmentation( @@ -206,7 +257,9 @@ def run_test( res = res.reindex(sorted(list(res.columns)), axis=1) res = res.replace("", 0.0) res = res.fillna(0.0) - before_compact = before_compact.reindex(sorted(list(before_compact.columns)), axis=1) + before_compact = before_compact.reindex( + sorted(list(before_compact.columns)), axis=1 + ) before_compact = before_compact.fillna(0.0) seg_details = 
lib.read_index(sym) @@ -217,9 +270,16 @@ def run_test( seg_details_before_compaction, merged_segment_row_size ) indexs = ( - seg_details["end_index"].astype(str).str.rsplit(" ", n=2).agg(" ".join).reset_index() + seg_details["end_index"] + .astype(str) + .str.rsplit(" ", n=2) + .agg(" ".join) + .reset_index() ) # start_index and end_index got merged into one column - assert np.array_equal(indexs.iloc[1:, 0].astype(str).values, indexs.iloc[:-1, 1].astype(str).values) + assert np.array_equal( + indexs.iloc[1:, 0].astype(str).values, + indexs.iloc[:-1, 1].astype(str).values, + ) else: with pytest.raises(InternalException): lib.defragment_symbol_data(sym, None) @@ -227,7 +287,9 @@ def run_test( assume(col_per_append_df <= col_name_set) assume( - num_of_row % 2 != 0 for num_of_rows in num_rows_per_test_cycle for num_of_row in num_of_rows + num_of_row % 2 != 0 + for num_of_rows in num_rows_per_test_cycle + for num_of_row in num_of_rows ) # Make sure at least one successful compaction run per cycle set_config_int("SymbolDataCompact.SegmentCount", 1) @@ -251,3 +313,20 @@ def run_test( index_offset, num_of_rows, ) + + +def test_regular_append_dynamic_schema_named_index( + lmdb_version_store_tiny_segment_dynamic, +): + lib = lmdb_version_store_tiny_segment_dynamic + sym = "test_parallel_append_dynamic_schema_named_index" + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) + df_0.index.name = "date" + df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) + lib.write(sym, df_0) + with pytest.raises(StreamDescriptorMismatch) as exception_info: + lib.append(sym, df_1) + + assert "date" in str(exception_info.value) diff --git a/python/tests/unit/arcticdb/version_store/test_parallel.py b/python/tests/unit/arcticdb/version_store/test_parallel.py index 71601bb980..e005eb49bb 100644 --- a/python/tests/unit/arcticdb/version_store/test_parallel.py +++ b/python/tests/unit/arcticdb/version_store/test_parallel.py @@ -5,13 +5,19 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
""" + import numpy as np import pandas as pd import random import datetime import pytest -from arcticdb.exceptions import SortingException, SchemaException, UserInputException + +from arcticdb.exceptions import ( + SortingException, + SchemaException, + UserInputException, +) from arcticdb.util.test import ( assert_frame_equal, random_strings_of_length, @@ -65,8 +71,13 @@ def test_remove_incomplete(basic_store): sym1 = "test_remove_incomplete_1" sym2 = "test_remove_incomplete_2" num_chunks = 10 - df1 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2000-01-01", periods=num_chunks)) - df2 = pd.DataFrame({"col": np.arange(100, 110)}, index=pd.date_range("2001-01-01", periods=num_chunks)) + df1 = pd.DataFrame( + {"col": np.arange(10)}, index=pd.date_range("2000-01-01", periods=num_chunks) + ) + df2 = pd.DataFrame( + {"col": np.arange(100, 110)}, + index=pd.date_range("2001-01-01", periods=num_chunks), + ) for idx in range(num_chunks): lib.write(sym1, df1.iloc[idx : idx + 1, :], parallel=True) lib.write(sym2, df2.iloc[idx : idx + 1, :], parallel=True) @@ -98,7 +109,11 @@ def test_parallel_write(basic_store): num_rows = 1111 dtidx = pd.date_range("1970-01-01", periods=num_rows) test = pd.DataFrame( - {"uint8": random_integers(num_rows, np.uint8), "uint32": random_integers(num_rows, np.uint32)}, index=dtidx + { + "uint8": random_integers(num_rows, np.uint8), + "uint32": random_integers(num_rows, np.uint32), + }, + index=dtidx, ) chunk_size = 100 list_df = [test[i : i + chunk_size] for i in range(0, test.shape[0], chunk_size)] @@ -142,7 +157,9 @@ def test_floats_to_nans(lmdb_version_store_dynamic_schema): for _ in range(num_days): cols = random.sample(columns, 4) - index = pd.Index([dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)]) + index = pd.Index( + [dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)] + ) vals = {c: random_floats(num_rows_per_day) for c in cols} new_df = pd.DataFrame(data=vals, index=index) @@ -154,7 +171,9 @@ def test_floats_to_nans(lmdb_version_store_dynamic_schema): for d in dataframes: lmdb_version_store_dynamic_schema.write(symbol, d, parallel=True) - lmdb_version_store_dynamic_schema.version_store.compact_incomplete(symbol, False, False) + lmdb_version_store_dynamic_schema.version_store.compact_incomplete( + symbol, False, False + ) vit = lmdb_version_store_dynamic_schema.read(symbol) df.sort_index(axis=1, inplace=True) result = vit.data @@ -177,7 +196,9 @@ def test_write_parallel_sort_merge(basic_arctic_library, prune_previous_versions df = pd.DataFrame() for _ in range(num_days): - index = pd.Index([dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)]) + index = pd.Index( + [dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)] + ) vals = {c: random_floats(num_rows_per_day) for c in cols} new_df = pd.DataFrame(data=vals, index=index) @@ -189,14 +210,18 @@ def test_write_parallel_sort_merge(basic_arctic_library, prune_previous_versions lib.write(symbol, dataframes[0]) for d in dataframes: lib.write(symbol, d, staged=True) - lib.sort_and_finalize_staged_data(symbol, prune_previous_versions=prune_previous_versions) + lib.sort_and_finalize_staged_data( + symbol, prune_previous_versions=prune_previous_versions + ) vit = lib.read(symbol) df.sort_index(axis=1, inplace=True) result = vit.data result.sort_index(axis=1, inplace=True) assert_frame_equal(vit.data, df) if prune_previous_versions: - assert 0 not in [version["version"] for version in lib._nvs.list_versions(symbol)] + assert 0 not in [ + 
version["version"] for version in lib._nvs.list_versions(symbol) + ] else: assert_frame_equal(lib.read(symbol, as_of=0).data, dataframes[0]) @@ -215,7 +240,9 @@ def test_sort_merge_append(basic_store_dynamic_schema, prune_previous_versions): df = pd.DataFrame() for _ in range(num_days): cols = random.sample(columns, 4) - index = pd.Index([dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)]) + index = pd.Index( + [dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)] + ) vals = {c: random_floats(num_rows_per_day) for c in cols} new_df = pd.DataFrame(data=vals, index=index) dataframes.append(new_df) @@ -239,7 +266,9 @@ def test_sort_merge_append(basic_store_dynamic_schema, prune_previous_versions): for d in dataframes: lib.write(symbol, d, parallel=True) - lib.version_store.sort_merge(symbol, None, True, prune_previous_versions=prune_previous_versions) + lib.version_store.sort_merge( + symbol, None, True, prune_previous_versions=prune_previous_versions + ) vit = lib.read(symbol) df.sort_index(axis=1, inplace=True) result = vit.data @@ -254,7 +283,7 @@ def test_sort_merge_append(basic_store_dynamic_schema, prune_previous_versions): for version in range(int(half_way)): result = lib.read(symbol, as_of=version).data result.sort_index(axis=1, inplace=True) - df = pd.concat(old_dataframes[0 : version+1]) + df = pd.concat(old_dataframes[0 : version + 1]) df.sort_index(axis=1, inplace=True) assert_frame_equal(result, df) @@ -272,7 +301,9 @@ def test_datetimes_to_nats(lmdb_version_store_dynamic_schema): for _ in range(num_days): cols = random.sample(columns, 4) - index = pd.Index([dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)]) + index = pd.Index( + [dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)] + ) vals = {c: random_dates(num_rows_per_day) for c in cols} new_df = pd.DataFrame(data=vals, index=index) dataframes.append(new_df) @@ -283,7 +314,9 @@ def test_datetimes_to_nats(lmdb_version_store_dynamic_schema): for d in dataframes: lmdb_version_store_dynamic_schema.write(symbol, d, parallel=True) - lmdb_version_store_dynamic_schema.version_store.compact_incomplete(symbol, False, True) + lmdb_version_store_dynamic_schema.version_store.compact_incomplete( + symbol, False, True + ) vit = lmdb_version_store_dynamic_schema.read(symbol) df.sort_index(axis=1, inplace=True) result = vit.data @@ -302,12 +335,18 @@ def test_datetimes_to_nats(lmdb_version_store_dynamic_schema): @pytest.mark.parametrize("append", (True, False)) @pytest.mark.parametrize("arg", (True, False, None)) @pytest.mark.parametrize("lib_config", (True, False)) -def test_compact_incomplete_prune_previous(lib_config, arg, append, version_store_factory): +def test_compact_incomplete_prune_previous( + lib_config, arg, append, version_store_factory +): lib = version_store_factory(prune_previous_version=lib_config) lib.write("sym", pd.DataFrame({"col": [3]}, index=pd.DatetimeIndex([0]))) - lib.append("sym", pd.DataFrame({"col": [4]}, index=pd.DatetimeIndex([1])), incomplete=True) + lib.append( + "sym", pd.DataFrame({"col": [4]}, index=pd.DatetimeIndex([1])), incomplete=True + ) - lib.compact_incomplete("sym", append, convert_int_to_float=False, prune_previous_version=arg) + lib.compact_incomplete( + "sym", append, convert_int_to_float=False, prune_previous_version=arg + ) assert lib.read_metadata("sym").version == 1 should_prune = lib_config if arg is None else arg @@ -317,15 +356,23 @@ def test_compact_incomplete_prune_previous(lib_config, arg, append, version_stor def 
test_compact_incomplete_sets_sortedness(lmdb_version_store): lib = lmdb_version_store sym = "test_compact_incomplete_sets_sortedness" - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")]) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")] + ) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")] + ) lib.write(sym, df_1, parallel=True) lib.write(sym, df_0, parallel=True) lib.compact_incomplete(sym, False, False) assert lib.get_info(sym)["sorted"] == "ASCENDING" - df_2 = pd.DataFrame({"col": [5, 6]}, index=[pd.Timestamp("2024-01-05"), pd.Timestamp("2024-01-06")]) - df_3 = pd.DataFrame({"col": [7, 8]}, index=[pd.Timestamp("2024-01-07"), pd.Timestamp("2024-01-08")]) + df_2 = pd.DataFrame( + {"col": [5, 6]}, index=[pd.Timestamp("2024-01-05"), pd.Timestamp("2024-01-06")] + ) + df_3 = pd.DataFrame( + {"col": [7, 8]}, index=[pd.Timestamp("2024-01-07"), pd.Timestamp("2024-01-08")] + ) lib.append(sym, df_3, incomplete=True) lib.append(sym, df_2, incomplete=True) lib.compact_incomplete(sym, True, False) @@ -334,13 +381,20 @@ def test_compact_incomplete_sets_sortedness(lmdb_version_store): @pytest.mark.parametrize("append", (True, False)) @pytest.mark.parametrize("validate_index", (True, False, None)) -def test_parallel_sortedness_checks_unsorted_data(lmdb_version_store, append, validate_index): +def test_parallel_sortedness_checks_unsorted_data( + lmdb_version_store, append, validate_index +): lib = lmdb_version_store sym = "test_parallel_sortedness_checks_unsorted_data" if append: - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, + index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")], + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-04"), pd.Timestamp("2024-01-03")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-04"), pd.Timestamp("2024-01-03")] + ) if validate_index: with pytest.raises(SortingException): if append: @@ -363,14 +417,23 @@ def test_parallel_sortedness_checks_unsorted_data(lmdb_version_store, append, va @pytest.mark.parametrize("append", (True, False)) @pytest.mark.parametrize("validate_index", (True, False, None)) -def test_parallel_sortedness_checks_sorted_data(lmdb_version_store, append, validate_index): +def test_parallel_sortedness_checks_sorted_data( + lmdb_version_store, append, validate_index +): lib = lmdb_version_store sym = "test_parallel_sortedness_checks_unsorted_data" if append: - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, + index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")], + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")]) - df_2 = pd.DataFrame({"col": [5, 6]}, index=[pd.Timestamp("2024-01-05"), pd.Timestamp("2024-01-06")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")] + ) + df_2 = pd.DataFrame( + {"col": [5, 6]}, index=[pd.Timestamp("2024-01-05"), pd.Timestamp("2024-01-06")] + ) if append: lib.append(sym, df_1, incomplete=True, validate_index=validate_index) lib.append(sym, df_2, incomplete=True, 
validate_index=validate_index) @@ -404,9 +467,17 @@ def test_parallel_non_timestamp_index(lmdb_version_store, append): read_df = lib.read(sym).data if append: - expected_df = pd.concat([df_0, df_1, df_2]) if read_df["col"].iloc[-1] == 6 else pd.concat([df_0, df_2, df_1]) + expected_df = ( + pd.concat([df_0, df_1, df_2]) + if read_df["col"].iloc[-1] == 6 + else pd.concat([df_0, df_2, df_1]) + ) else: - expected_df = pd.concat([df_1, df_2]) if read_df["col"].iloc[-1] == 6 else pd.concat([df_2, df_1]) + expected_df = ( + pd.concat([df_1, df_2]) + if read_df["col"].iloc[-1] == 6 + else pd.concat([df_2, df_1]) + ) assert_frame_equal(expected_df, read_df) @@ -415,10 +486,17 @@ def test_parallel_all_same_index_values(lmdb_version_store, append): lib = lmdb_version_store sym = "test_parallel_all_same_index_values" if append: - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-01")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, + index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-01")], + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-02")]) - df_2 = pd.DataFrame({"col": [5, 6]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-02")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-02")] + ) + df_2 = pd.DataFrame( + {"col": [5, 6]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-02")] + ) if append: lib.append(sym, df_2, incomplete=True) lib.append(sym, df_1, incomplete=True) @@ -429,10 +507,18 @@ def test_parallel_all_same_index_values(lmdb_version_store, append): received = lib.read(sym).data # Index values in incompletes all the same, so order of values in col could be [3, 4, 5, 6] or [5, 6, 3, 4] if append: - expected = pd.concat([df_0, df_1, df_2]) if received["col"][2] == 3 else pd.concat([df_0, df_2, df_1]) + expected = ( + pd.concat([df_0, df_1, df_2]) + if received["col"][2] == 3 + else pd.concat([df_0, df_2, df_1]) + ) assert_frame_equal(expected, received) else: - expected = pd.concat([df_1, df_2]) if received["col"][0] == 3 else pd.concat([df_2, df_1]) + expected = ( + pd.concat([df_1, df_2]) + if received["col"][0] == 3 + else pd.concat([df_2, df_1]) + ) assert_frame_equal(expected, received) assert lib.get_info(sym)["sorted"] == "ASCENDING" @@ -440,14 +526,24 @@ def test_parallel_all_same_index_values(lmdb_version_store, append): @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) @pytest.mark.parametrize("append", (True, False)) @pytest.mark.parametrize("validate_index", (True, False, None)) -def test_parallel_overlapping_incomplete_segments(lmdb_version_store, append, validate_index, delete_staged_data_on_failure): +def test_parallel_overlapping_incomplete_segments( + lmdb_version_store, append, validate_index, delete_staged_data_on_failure +): lib = lmdb_version_store sym = "test_parallel_overlapping_incomplete_segments" if append: - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, + index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")], + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")]) - df_2 = pd.DataFrame({"col": [5, 6]}, index=[pd.Timestamp("2024-01-03T12"), pd.Timestamp("2024-01-05")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")] + ) + 
df_2 = pd.DataFrame( + {"col": [5, 6]}, + index=[pd.Timestamp("2024-01-03T12"), pd.Timestamp("2024-01-05")], + ) if append: lib.append(sym, df_2, incomplete=True) lib.append(sym, df_1, incomplete=True) @@ -456,7 +552,13 @@ def test_parallel_overlapping_incomplete_segments(lmdb_version_store, append, va lib.write(sym, df_1, parallel=True) if validate_index: with pytest.raises(SortingException): - lib.compact_incomplete(sym, append, False, validate_index=True, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + sym, + append, + False, + validate_index=True, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) expected_key_count = 0 if delete_staged_data_on_failure else 2 assert len(get_append_keys(lib, sym)) == expected_key_count else: @@ -473,9 +575,13 @@ def test_parallel_overlapping_incomplete_segments(lmdb_version_store, append, va def test_parallel_append_exactly_matches_existing(lmdb_version_store): lib = lmdb_version_store sym = "test_parallel_append_exactly_matches_existing" - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")] + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03")] + ) lib.append(sym, df_1, incomplete=True) lib.compact_incomplete(sym, True, False) expected = pd.concat([df_0, df_1]) @@ -483,17 +589,27 @@ def test_parallel_append_exactly_matches_existing(lmdb_version_store): assert_frame_equal(expected, received) assert lib.get_info(sym)["sorted"] == "ASCENDING" + @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) @pytest.mark.parametrize("append", (True, False)) @pytest.mark.parametrize("validate_index", (True, False, None)) -def test_parallel_all_incomplete_segments_same_index(lmdb_version_store_v1, append, validate_index, delete_staged_data_on_failure): +def test_parallel_all_incomplete_segments_same_index( + lmdb_version_store_v1, append, validate_index, delete_staged_data_on_failure +): lib = lmdb_version_store_v1 sym = "test_parallel_all_incomplete_segments_same_index" if append: - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, + index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")], + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")]) - df_2 = pd.DataFrame({"col": [5, 6]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")] + ) + df_2 = pd.DataFrame( + {"col": [5, 6]}, index=[pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-04")] + ) if append: lib.append(sym, df_2, incomplete=True) lib.append(sym, df_1, incomplete=True) @@ -502,7 +618,13 @@ def test_parallel_all_incomplete_segments_same_index(lmdb_version_store_v1, appe lib.write(sym, df_1, parallel=True) if validate_index: with pytest.raises(SortingException): - lib.compact_incomplete(sym, append, False, validate_index=True, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + sym, + append, + False, + validate_index=True, + delete_staged_data_on_failure=delete_staged_data_on_failure, 
+ ) expected_key_count = 0 if delete_staged_data_on_failure else 2 assert len(get_append_keys(lib, sym)) == expected_key_count else: @@ -514,24 +636,41 @@ def test_parallel_all_incomplete_segments_same_index(lmdb_version_store_v1, appe received = lib.read(sym).data # Order is arbitrary if all index values are the same if received["col"].iloc[-1] == 6: - expected = pd.concat([df_0, df_1, df_2]) if append else pd.concat([df_1, df_2]) + expected = ( + pd.concat([df_0, df_1, df_2]) if append else pd.concat([df_1, df_2]) + ) else: - expected = pd.concat([df_0, df_2, df_1]) if append else pd.concat([df_2, df_1]) + expected = ( + pd.concat([df_0, df_2, df_1]) if append else pd.concat([df_2, df_1]) + ) assert_frame_equal(received, expected) @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) @pytest.mark.parametrize("validate_index", (True, False, None)) -def test_parallel_append_overlapping_with_existing(lmdb_version_store, validate_index, delete_staged_data_on_failure): +def test_parallel_append_overlapping_with_existing( + lmdb_version_store, validate_index, delete_staged_data_on_failure +): lib = lmdb_version_store sym = "test_parallel_append_overlapping_with_existing" - df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")]) + df_0 = pd.DataFrame( + {"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")] + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-01T12"), pd.Timestamp("2024-01-03")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, + index=[pd.Timestamp("2024-01-01T12"), pd.Timestamp("2024-01-03")], + ) lib.append(sym, df_1, incomplete=True) if validate_index: with pytest.raises(SortingException): - lib.compact_incomplete(sym, True, False, validate_index=validate_index, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + sym, + True, + False, + validate_index=validate_index, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) expected_key_count = 0 if delete_staged_data_on_failure else 1 assert len(get_append_keys(lib, sym)) == expected_key_count else: @@ -547,21 +686,35 @@ def test_parallel_append_overlapping_with_existing(lmdb_version_store, validate_ @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) @pytest.mark.parametrize("sortedness", ("DESCENDING", "UNSORTED")) @pytest.mark.parametrize("validate_index", (True, False, None)) -def test_parallel_append_existing_data_unsorted(lmdb_version_store, sortedness, validate_index, delete_staged_data_on_failure): +def test_parallel_append_existing_data_unsorted( + lmdb_version_store, sortedness, validate_index, delete_staged_data_on_failure +): lib = lmdb_version_store sym = "test_parallel_append_existing_data_unsorted" last_index_date = "2024-01-01" if sortedness == "DESCENDING" else "2024-01-03" df_0 = pd.DataFrame( {"col": [1, 2, 3]}, - index=[pd.Timestamp("2024-01-04"), pd.Timestamp("2024-01-02"), pd.Timestamp(last_index_date)] + index=[ + pd.Timestamp("2024-01-04"), + pd.Timestamp("2024-01-02"), + pd.Timestamp(last_index_date), + ], ) lib.write(sym, df_0) assert lib.get_info(sym)["sorted"] == sortedness - df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-05"), pd.Timestamp("2024-01-06")]) + df_1 = pd.DataFrame( + {"col": [3, 4]}, index=[pd.Timestamp("2024-01-05"), pd.Timestamp("2024-01-06")] + ) lib.append(sym, df_1, incomplete=True) if validate_index: with pytest.raises(SortingException): - lib.compact_incomplete(sym, 
True, False, validate_index=True, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + sym, + True, + False, + validate_index=True, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) expected_key_count = 0 if delete_staged_data_on_failure else 1 assert len(get_append_keys(lib, sym)) == expected_key_count else: @@ -590,25 +743,54 @@ def test_parallel_no_column_slicing(lmdb_version_store_tiny_segment): @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) @pytest.mark.parametrize("rows_per_incomplete", (1, 2)) -def test_parallel_write_static_schema_type_changing(lmdb_version_store_tiny_segment, rows_per_incomplete, delete_staged_data_on_failure): +def test_parallel_write_static_schema_type_changing( + lmdb_version_store_tiny_segment, rows_per_incomplete, delete_staged_data_on_failure +): lib = lmdb_version_store_tiny_segment sym = "test_parallel_write_static_schema_type_changing" - df_0 = pd.DataFrame({"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, index=pd.date_range("2024-01-01", periods=rows_per_incomplete)) - df_1 = pd.DataFrame({"col": np.arange(rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16)}, index=pd.date_range("2024-01-03", periods=rows_per_incomplete)) + df_0 = pd.DataFrame( + {"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, + index=pd.date_range("2024-01-01", periods=rows_per_incomplete), + ) + df_1 = pd.DataFrame( + { + "col": np.arange( + rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16 + ) + }, + index=pd.date_range("2024-01-03", periods=rows_per_incomplete), + ) lib.write(sym, df_0, parallel=True) lib.write(sym, df_1, parallel=True) with pytest.raises(SchemaException): - lib.compact_incomplete(sym, False, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + sym, + False, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) expected_key_count = 0 if delete_staged_data_on_failure else 2 assert len(get_append_keys(lib, sym)) == expected_key_count @pytest.mark.parametrize("rows_per_incomplete", (1, 2)) -def test_parallel_write_dynamic_schema_type_changing(lmdb_version_store_tiny_segment_dynamic, rows_per_incomplete): +def test_parallel_write_dynamic_schema_type_changing( + lmdb_version_store_tiny_segment_dynamic, rows_per_incomplete +): lib = lmdb_version_store_tiny_segment_dynamic sym = "test_parallel_write_dynamic_schema_type_changing" - df_0 = pd.DataFrame({"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, index=pd.date_range("2024-01-01", periods=rows_per_incomplete)) - df_1 = pd.DataFrame({"col": np.arange(rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16)}, index=pd.date_range("2024-01-02", periods=rows_per_incomplete)) + df_0 = pd.DataFrame( + {"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, + index=pd.date_range("2024-01-01", periods=rows_per_incomplete), + ) + df_1 = pd.DataFrame( + { + "col": np.arange( + rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16 + ) + }, + index=pd.date_range("2024-01-02", periods=rows_per_incomplete), + ) lib.write(sym, df_0, parallel=True) lib.write(sym, df_1, parallel=True) lib.compact_incomplete(sym, False, False) @@ -621,20 +803,42 @@ def test_parallel_write_dynamic_schema_type_changing(lmdb_version_store_tiny_seg @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) @pytest.mark.parametrize("symbol_already_exists", [True, False]) -def 
test_parallel_write_static_schema_type_changing_cleans_up_data_keys(lmdb_version_store_tiny_segment, delete_staged_data_on_failure, symbol_already_exists): +def test_parallel_write_static_schema_type_changing_cleans_up_data_keys( + lmdb_version_store_tiny_segment, + delete_staged_data_on_failure, + symbol_already_exists, +): lib = lmdb_version_store_tiny_segment sym = "test_parallel_write_static_schema_type_changing" rows_per_incomplete = 2 # tiny segment store uses 2 rows per segment if symbol_already_exists: - df_0 = pd.DataFrame({"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, index=pd.date_range("2024-01-01", periods=rows_per_incomplete)) + df_0 = pd.DataFrame( + {"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, + index=pd.date_range("2024-01-01", periods=rows_per_incomplete), + ) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, index=pd.date_range("2024-01-03", periods=rows_per_incomplete)) - df_2 = pd.DataFrame({"col": np.arange(rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16)}, index=pd.date_range("2024-01-05", periods=rows_per_incomplete)) + df_1 = pd.DataFrame( + {"col": np.arange(rows_per_incomplete, dtype=np.uint8)}, + index=pd.date_range("2024-01-03", periods=rows_per_incomplete), + ) + df_2 = pd.DataFrame( + { + "col": np.arange( + rows_per_incomplete, 2 * rows_per_incomplete, dtype=np.uint16 + ) + }, + index=pd.date_range("2024-01-05", periods=rows_per_incomplete), + ) lib.write(sym, df_1, parallel=True) lib.write(sym, df_2, parallel=True) with pytest.raises(SchemaException): - lib.compact_incomplete(sym, False, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + sym, + False, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) expected_key_count = 0 if delete_staged_data_on_failure else 2 assert len(get_append_keys(lib, sym)) == expected_key_count @@ -644,23 +848,36 @@ def test_parallel_write_static_schema_type_changing_cleans_up_data_keys(lmdb_ver @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) -def test_parallel_write_static_schema_missing_column(lmdb_version_store_tiny_segment, delete_staged_data_on_failure): +def test_parallel_write_static_schema_missing_column( + lmdb_version_store_tiny_segment, delete_staged_data_on_failure +): lib = lmdb_version_store_tiny_segment sym = "test_parallel_write_static_schema_missing_column" - df_0 = pd.DataFrame({"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1)) + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) lib.write(sym, df_0, parallel=True) lib.write(sym, df_1, parallel=True) with pytest.raises(SchemaException): - lib.compact_incomplete(sym, False, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + sym, + False, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) expected_key_count = 0 if delete_staged_data_on_failure else 2 assert len(get_append_keys(lib, sym)) == expected_key_count -def test_parallel_write_dynamic_schema_missing_column(lmdb_version_store_tiny_segment_dynamic): +def test_parallel_write_dynamic_schema_missing_column( + lmdb_version_store_tiny_segment_dynamic, +): lib = lmdb_version_store_tiny_segment_dynamic sym = "test_parallel_write_dynamic_schema_missing_column" - df_0 = pd.DataFrame({"col_0": [0], "col_1": 
[0.5]}, index=pd.date_range("2024-01-01", periods=1)) + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) lib.write(sym, df_0, parallel=True) lib.write(sym, df_1, parallel=True) @@ -670,10 +887,14 @@ def test_parallel_write_dynamic_schema_missing_column(lmdb_version_store_tiny_se assert_frame_equal(expected, received) -def test_parallel_append_dynamic_schema_missing_column(lmdb_version_store_tiny_segment_dynamic): +def test_parallel_append_dynamic_schema_missing_column( + lmdb_version_store_tiny_segment_dynamic, +): lib = lmdb_version_store_tiny_segment_dynamic sym = "test_parallel_append_dynamic_schema_missing_column" - df_0 = pd.DataFrame({"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1)) + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) lib.write(sym, df_0) lib.append(sym, df_1, incomplete=True) @@ -683,84 +904,228 @@ def test_parallel_append_dynamic_schema_missing_column(lmdb_version_store_tiny_s assert_frame_equal(expected, received) +@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) +def test_parallel_write_dynamic_schema_named_index( + lmdb_version_store_tiny_segment_dynamic, delete_staged_data_on_failure +): + lib = lmdb_version_store_tiny_segment_dynamic + sym = "test_parallel_append_dynamic_schema_named_index" + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) + df_0.index.name = "date" + df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) + lib.write(sym, df_0, parallel=True) + lib.write(sym, df_1, parallel=True) + + with pytest.raises(SchemaException) as exception_info: + lib.compact_incomplete( + sym, + False, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) + + assert "date" in str(exception_info.value) + expected_key_count = 0 if delete_staged_data_on_failure else 2 + assert len(get_append_keys(lib, sym)) == expected_key_count + + +@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) +def test_parallel_append_dynamic_schema_named_index( + lmdb_version_store_tiny_segment_dynamic, delete_staged_data_on_failure +): + lib = lmdb_version_store_tiny_segment_dynamic + sym = "test_parallel_append_dynamic_schema_named_index" + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) + df_0.index.name = "date" + df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) + lib.write(sym, df_0) + lib.append(sym, df_1, incomplete=True) + with pytest.raises(SchemaException) as exception_info: + lib.compact_incomplete( + sym, + True, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) + + assert "date" in str(exception_info.value) + expected_key_count = 0 if delete_staged_data_on_failure else 1 + assert len(get_append_keys(lib, sym)) == expected_key_count + + @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) class TestFinalizeStagedDataStaticSchemaMismatch: - def test_append_throws_with_missmatched_column_set(self, lmdb_version_store_v1, delete_staged_data_on_failure): + def test_append_throws_with_missmatched_column_set( + self, lmdb_version_store_v1, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 - initial_df 
= pd.DataFrame({"col_0": [1]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)])) + initial_df = pd.DataFrame( + {"col_0": [1]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]) + ) lib.write("sym", initial_df) - appended_df = pd.DataFrame({"col_1": [1]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])) + appended_df = pd.DataFrame( + {"col_1": [1]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]) + ) lib.write("sym", appended_df, parallel=True) with pytest.raises(SchemaException) as exception_info: - lib.compact_incomplete("sym", True, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + True, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert "col_1" in str(exception_info.value) expected_key_count = 0 if delete_staged_data_on_failure else 1 assert len(get_append_keys(lib, "sym")) == expected_key_count - def test_append_throws_column_subset(self, lmdb_version_store_v1, delete_staged_data_on_failure): + def test_append_throws_column_subset( + self, lmdb_version_store_v1, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 df1 = pd.DataFrame( {"a": np.array([1.1], dtype="float"), "b": np.array([2], dtype="int64")}, - index=pd.DatetimeIndex([pd.Timestamp("2024-01-01")]) + index=pd.DatetimeIndex([pd.Timestamp("2024-01-01")]), ) lib.write("sym", df1) - df2 = pd.DataFrame({"b": [1]}, index=pd.DatetimeIndex([pd.Timestamp("2024-01-02")])) + df2 = pd.DataFrame( + {"b": [1]}, index=pd.DatetimeIndex([pd.Timestamp("2024-01-02")]) + ) lib.write("sym", df2, parallel=True) - with pytest.raises(SchemaException) as exception_info: - lib.compact_incomplete("sym", True, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + with pytest.raises(SchemaException) as exception_info: + lib.compact_incomplete( + "sym", + True, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert "a" in str(exception_info.value) assert "b" in str(exception_info.value) expected_key_count = 0 if delete_staged_data_on_failure else 1 assert len(get_append_keys(lib, "sym")) == expected_key_count - def test_append_throws_on_incompatible_dtype(self, lmdb_version_store_v1, delete_staged_data_on_failure): + def test_append_throws_on_incompatible_dtype( + self, lmdb_version_store_v1, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 - initial_df = pd.DataFrame({"col_0": np.array([1], dtype="int64")}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)])) + initial_df = pd.DataFrame( + {"col_0": np.array([1], dtype="int64")}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]), + ) lib.write("sym", initial_df) - appended_df = pd.DataFrame({"col_0": ["asd"]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])) + appended_df = pd.DataFrame( + {"col_0": ["asd"]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]) + ) lib.write("sym", appended_df, parallel=True) with pytest.raises(SchemaException) as exception_info: - lib.compact_incomplete("sym", True, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + True, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert "col_0" in str(exception_info.value) assert "INT64" in str(exception_info.value) expected_key_count = 0 if delete_staged_data_on_failure else 1 assert len(get_append_keys(lib, "sym")) == expected_key_count - def test_type_mismatch_in_staged_segments_throws(self, lmdb_version_store_v1, delete_staged_data_on_failure): + def 
test_type_mismatch_in_staged_segments_throws( + self, lmdb_version_store_v1, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 - lib.write("sym", pd.DataFrame({"col": [1]}, index=pd.DatetimeIndex([np.datetime64('2023-01-01')])), parallel=True) - lib.write("sym", pd.DataFrame({"col": ["a"]}, index=pd.DatetimeIndex([np.datetime64('2023-01-02')])), parallel=True) + lib.write( + "sym", + pd.DataFrame( + {"col": [1]}, index=pd.DatetimeIndex([np.datetime64("2023-01-01")]) + ), + parallel=True, + ) + lib.write( + "sym", + pd.DataFrame( + {"col": ["a"]}, index=pd.DatetimeIndex([np.datetime64("2023-01-02")]) + ), + parallel=True, + ) with pytest.raises(Exception) as exception_info: - lib.compact_incomplete("sym", False, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + False, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert all(x in str(exception_info.value) for x in ["INT64", "type"]) expected_key_count = 0 if delete_staged_data_on_failure else 2 assert len(get_append_keys(lib, "sym")) == expected_key_count - def test_types_cant_be_promoted(self, lmdb_version_store_v1, delete_staged_data_on_failure): + def test_types_cant_be_promoted( + self, lmdb_version_store_v1, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 - lib.write("sym", pd.DataFrame({"col": np.array([1], dtype="int64")}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]))) + lib.write( + "sym", + pd.DataFrame( + {"col": np.array([1], dtype="int64")}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]), + ), + ) - lib.write("sym", pd.DataFrame({"col": np.array([1], dtype="int32")}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])), parallel=True) + lib.write( + "sym", + pd.DataFrame( + {"col": np.array([1], dtype="int32")}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]), + ), + parallel=True, + ) with pytest.raises(SchemaException) as exception_info: - lib.compact_incomplete("sym", True, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + True, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert "INT32" in str(exception_info.value) assert "INT64" in str(exception_info.value) expected_key_count = 0 if delete_staged_data_on_failure else 1 assert len(get_append_keys(lib, "sym")) == expected_key_count - def test_appending_reordered_column_set_throws(self, lmdb_version_store_v1, delete_staged_data_on_failure): + def test_appending_reordered_column_set_throws( + self, lmdb_version_store_v1, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 - lib.write("sym", pd.DataFrame({"col_0": [1], "col_1": ["test"], "col_2": [1.2]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]))) + lib.write( + "sym", + pd.DataFrame( + {"col_0": [1], "col_1": ["test"], "col_2": [1.2]}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]), + ), + ) - lib.write("sym", pd.DataFrame({"col_1": ["asd"], "col_2": [2.5], "col_0": [2]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])), parallel=True) + lib.write( + "sym", + pd.DataFrame( + {"col_1": ["asd"], "col_2": [2.5], "col_0": [2]}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]), + ), + parallel=True, + ) with pytest.raises(SchemaException) as exception_info: - lib.compact_incomplete("sym", True, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + True, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert 
"col_0" in str(exception_info.value) assert "col_1" in str(exception_info.value) assert "col_2" in str(exception_info.value) @@ -768,43 +1133,76 @@ def test_appending_reordered_column_set_throws(self, lmdb_version_store_v1, dele assert len(get_append_keys(lib, "sym")) == expected_key_count @pytest.mark.parametrize("mode", [True, False]) - def test_staged_segments_can_be_reordered(self, lmdb_version_store_v1, mode, delete_staged_data_on_failure): + def test_staged_segments_can_be_reordered( + self, lmdb_version_store_v1, mode, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 - df1 = pd.DataFrame({"col_0": [1], "col_1": ["test"], "col_2": [1.2]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)])) - df2 = pd.DataFrame({"col_1": ["asd"], "col_2": [2.5], "col_0": [2]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])) + df1 = pd.DataFrame( + {"col_0": [1], "col_1": ["test"], "col_2": [1.2]}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]), + ) + df2 = pd.DataFrame( + {"col_1": ["asd"], "col_2": [2.5], "col_0": [2]}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]), + ) lib.write("sym", df1, parallel=True) lib.write("sym", df2, parallel=True) expected = pd.concat([df1, df2]).sort_index() with pytest.raises(SchemaException) as exception_info: - lib.compact_incomplete("sym", mode, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + mode, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert "col_0" in str(exception_info.value) assert "col_1" in str(exception_info.value) assert "col_2" in str(exception_info.value) expected_key_count = 0 if delete_staged_data_on_failure else 2 assert len(get_append_keys(lib, "sym")) == expected_key_count + # finalize_method True -> append, finalize_method False -> write @pytest.mark.parametrize("finalize_method", (True, False)) class TestFinalizeWithEmptySegments: - def test_staged_segment_is_only_empty_dfs(self, lmdb_version_store_v1, finalize_method): + def test_staged_segment_is_only_empty_dfs( + self, lmdb_version_store_v1, finalize_method + ): lib = lmdb_version_store_v1 lib.write("sym", pd.DataFrame([]), parallel=True) lib.write("sym", pd.DataFrame([]), parallel=True) lib.compact_incomplete("sym", finalize_method, False) - assert_frame_equal(lib.read("sym").data, pd.DataFrame([], index=pd.DatetimeIndex([]))) + assert_frame_equal( + lib.read("sym").data, pd.DataFrame([], index=pd.DatetimeIndex([])) + ) @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) - def test_staged_segment_has_empty_df(self, lmdb_version_store_v1, finalize_method, delete_staged_data_on_failure): + def test_staged_segment_has_empty_df( + self, lmdb_version_store_v1, finalize_method, delete_staged_data_on_failure + ): lib = lmdb_version_store_v1 - index = pd.DatetimeIndex([pd.Timestamp(2024, 1, 1), pd.Timestamp(2024, 1, 3), pd.Timestamp(2024, 1, 4)]) + index = pd.DatetimeIndex( + [ + pd.Timestamp(2024, 1, 1), + pd.Timestamp(2024, 1, 3), + pd.Timestamp(2024, 1, 4), + ] + ) df1 = pd.DataFrame({"col": [1, 2, 3]}, index=index) df2 = pd.DataFrame({}) - df3 = pd.DataFrame({"col": [4]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 5)])) + df3 = pd.DataFrame( + {"col": [4]}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 5)]) + ) lib.write("sym", df1, parallel=True) lib.write("sym", df2, parallel=True) lib.write("sym", df3, parallel=True) with pytest.raises(SchemaException): - lib.compact_incomplete("sym", finalize_method, False, 
delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + finalize_method, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) expected_key_count = 0 if delete_staged_data_on_failure else 3 assert len(get_append_keys(lib, "sym")) == expected_key_count @@ -815,17 +1213,20 @@ def test_df_without_rows(self, lmdb_version_store_v1, finalize_method): lib.compact_incomplete("sym", finalize_method, False) assert_frame_equal(lib.read("sym").data, df) + class TestSlicing: def test_append_long_segment(self, lmdb_version_store_tiny_segment): lib = lmdb_version_store_tiny_segment - df_0 = pd.DataFrame({"col_0": [1, 2, 3]}, index=pd.date_range("2024-01-01", "2024-01-03")) + df_0 = pd.DataFrame( + {"col_0": [1, 2, 3]}, index=pd.date_range("2024-01-01", "2024-01-03") + ) lib.write("sym", df_0) index = pd.date_range("2024-01-05", "2024-01-15") df_1 = pd.DataFrame({"col_0": range(0, len(index))}, index=index) lib.write("sym", df_1, parallel=True) lib.compact_incomplete("sym", True, False) - + assert_frame_equal(lib.read("sym").data, pd.concat([df_0, df_1])) def test_write_long_segment(self, lmdb_version_store_tiny_segment): @@ -836,48 +1237,81 @@ def test_write_long_segment(self, lmdb_version_store_tiny_segment): lib.compact_incomplete("sym", False, False) assert_frame_equal(lib.read("sym").data, df) - def test_write_several_segments_triggering_slicing(self, lmdb_version_store_tiny_segment): + def test_write_several_segments_triggering_slicing( + self, lmdb_version_store_tiny_segment + ): lib = lmdb_version_store_tiny_segment - combined_staged_index = pd.date_range(pd.Timestamp(2024, 1, 1), pd.Timestamp(2024, 1, 15)) + combined_staged_index = pd.date_range( + pd.Timestamp(2024, 1, 1), pd.Timestamp(2024, 1, 15) + ) staged_values = range(0, len(combined_staged_index)) - for (value, date) in zip(staged_values, combined_staged_index): + for value, date in zip(staged_values, combined_staged_index): df = pd.DataFrame({"a": [value]}, index=pd.DatetimeIndex([date])) lib.write("sym", df, parallel=True) lib.compact_incomplete("sym", False, False) expected = pd.DataFrame({"a": staged_values}, index=combined_staged_index) assert_frame_equal(lib.read("sym").data, expected) - def test_append_several_segments_trigger_slicing(self, lmdb_version_store_tiny_segment): + def test_append_several_segments_trigger_slicing( + self, lmdb_version_store_tiny_segment + ): lib = lmdb_version_store_tiny_segment - df_0 = pd.DataFrame({"a": [1, 2, 3]}, index=pd.date_range(pd.Timestamp(2024, 1, 1), pd.Timestamp(2024, 1, 3))) + df_0 = pd.DataFrame( + {"a": [1, 2, 3]}, + index=pd.date_range(pd.Timestamp(2024, 1, 1), pd.Timestamp(2024, 1, 3)), + ) lib.write("sym", df_0) - combined_staged_index = pd.date_range(pd.Timestamp(2024, 1, 5), pd.Timestamp(2024, 1, 20)) + combined_staged_index = pd.date_range( + pd.Timestamp(2024, 1, 5), pd.Timestamp(2024, 1, 20) + ) staged_values = range(0, len(combined_staged_index)) - for (value, date) in zip(staged_values, combined_staged_index): + for value, date in zip(staged_values, combined_staged_index): df = pd.DataFrame({"a": [value]}, index=pd.DatetimeIndex([date])) lib.write("sym", df, parallel=True) lib.compact_incomplete("sym", True, False) - expected = pd.concat([df_0, pd.DataFrame({"a": staged_values}, index=combined_staged_index)]) + expected = pd.concat( + [df_0, pd.DataFrame({"a": staged_values}, index=combined_staged_index)] + ) assert_frame_equal(lib.read("sym").data, expected) 
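Note (editorial aside, not part of the diff): the TestFinalizeStagedDataStaticSchemaMismatch cases above all fail compaction for the same reason — a staged segment whose column set, column order, or column dtypes differ from what is already stored for the symbol, or whose index name disagrees with the stored one (compare the named-index tests earlier in this file that also expect a SchemaException). The snippet below is a minimal, self-contained, pandas-only sketch of that compatibility rule; the helper name and its exact logic are illustrative assumptions, not ArcticDB's implementation.

import numpy as np
import pandas as pd

def staged_frame_matches_stored_schema(stored: pd.DataFrame, staged: pd.DataFrame) -> bool:
    # Hypothetical helper for illustration only.
    # Index names must agree; an unnamed index does not match a named one.
    if stored.index.name != staged.index.name:
        return False
    # Under static schema the column names must match exactly and in the same order
    # (compare test_appending_reordered_column_set_throws above).
    if list(stored.columns) != list(staged.columns):
        return False
    # Dtypes must also match; no promotion such as int32 -> int64 is attempted here
    # (compare test_types_cant_be_promoted above).
    return all(stored.dtypes[col] == staged.dtypes[col] for col in stored.columns)

# Example mirroring test_types_cant_be_promoted: an int32 staged segment is
# rejected against an int64 stored column.
stored = pd.DataFrame(
    {"col": np.array([1], dtype="int64")},
    index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]),
)
staged = pd.DataFrame(
    {"col": np.array([1], dtype="int32")},
    index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]),
)
assert not staged_frame_matches_stored_schema(stored, staged)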
@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) @pytest.mark.parametrize("mode", [True, False]) - def test_wide_segment_with_no_prior_slicing(self, lmdb_version_store_tiny_segment, mode, delete_staged_data_on_failure): + def test_wide_segment_with_no_prior_slicing( + self, lmdb_version_store_tiny_segment, mode, delete_staged_data_on_failure + ): lib = lmdb_version_store_tiny_segment - df_0 = pd.DataFrame({f"col_{i}": [i] for i in range(0, 10)}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)])) + df_0 = pd.DataFrame( + {f"col_{i}": [i] for i in range(0, 10)}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]), + ) lib.write("sym", df_0, parallel=True) lib.compact_incomplete("sym", mode, False) assert_frame_equal(lib.read("sym").data, df_0) - - df_1 = pd.DataFrame({f"col_{i}": [i] for i in range(0, 10)}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])) + + df_1 = pd.DataFrame( + {f"col_{i}": [i] for i in range(0, 10)}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]), + ) lib.append("sym", df_1) assert_frame_equal(lib.read("sym").data, pd.concat([df_0, df_1])) # Cannot perform another sort and finalize append when column sliced data has been written even though the first # write is done using sort and finalize with pytest.raises(UserInputException) as exception_info: - lib.write("sym", pd.DataFrame({f"col_{i}": [i] for i in range(0, 10)}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 3)])), parallel=True) - lib.compact_incomplete("sym", True, False, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.write( + "sym", + pd.DataFrame( + {f"col_{i}": [i] for i in range(0, 10)}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 3)]), + ), + parallel=True, + ) + lib.compact_incomplete( + "sym", + True, + False, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert "append" in str(exception_info.value).lower() assert "column" in str(exception_info.value).lower() assert "sliced" in str(exception_info.value).lower() @@ -885,29 +1319,50 @@ def test_wide_segment_with_no_prior_slicing(self, lmdb_version_store_tiny_segmen assert len(get_append_keys(lib, "sym")) == expected_key_count @pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) - def test_appending_wide_segment_throws_with_prior_slicing(self, lmdb_version_store_tiny_segment, lib_name, delete_staged_data_on_failure): + def test_appending_wide_segment_throws_with_prior_slicing( + self, lmdb_version_store_tiny_segment, lib_name, delete_staged_data_on_failure + ): lib = lmdb_version_store_tiny_segment - df_0 = pd.DataFrame({f"col_{i}": [i] for i in range(0, 10)}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)])) + df_0 = pd.DataFrame( + {f"col_{i}": [i] for i in range(0, 10)}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]), + ) lib.write("sym", df_0) - - df_1 = pd.DataFrame({f"col_{i}": [i] for i in range(0, 10)}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])) + + df_1 = pd.DataFrame( + {f"col_{i}": [i] for i in range(0, 10)}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]), + ) lib.write("sym", df_1, parallel=True) with pytest.raises(UserInputException) as exception_info: - lib.compact_incomplete("sym", True, True, delete_staged_data_on_failure=delete_staged_data_on_failure) + lib.compact_incomplete( + "sym", + True, + True, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) assert "append" in str(exception_info.value).lower() assert "column" in str(exception_info.value).lower() assert "sliced" in 
str(exception_info.value).lower() expected_key_count = 0 if delete_staged_data_on_failure else 1 assert len(get_append_keys(lib, "sym")) == expected_key_count - def test_writing_wide_segment_over_sliced_data(self, lmdb_version_store_tiny_segment): + def test_writing_wide_segment_over_sliced_data( + self, lmdb_version_store_tiny_segment + ): lib = lmdb_version_store_tiny_segment - df_0 = pd.DataFrame({f"col_{i}": [i] for i in range(0, 10)}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)])) + df_0 = pd.DataFrame( + {f"col_{i}": [i] for i in range(0, 10)}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 1)]), + ) lib.write("sym", df_0) - - df_1 = pd.DataFrame({f"col_{i}": [i] for i in range(0, 10)}, index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)])) + + df_1 = pd.DataFrame( + {f"col_{i}": [i] for i in range(0, 10)}, + index=pd.DatetimeIndex([pd.Timestamp(2024, 1, 2)]), + ) lib.write("sym", df_1, parallel=True) lib.compact_incomplete("sym", False, False) - assert_frame_equal(lib.read("sym").data, df_1) \ No newline at end of file + assert_frame_equal(lib.read("sym").data, df_1) diff --git a/python/tests/unit/arcticdb/version_store/test_sort_merge.py b/python/tests/unit/arcticdb/version_store/test_sort_merge.py index c1e5b661c3..0d01e0cd06 100644 --- a/python/tests/unit/arcticdb/version_store/test_sort_merge.py +++ b/python/tests/unit/arcticdb/version_store/test_sort_merge.py @@ -868,3 +868,56 @@ def test_writing_wide_segment_over_sliced_data(self, lmdb_storage, lib_name): lib.sort_and_finalize_staged_data("sym", mode=StagedDataFinalizeMethod.WRITE) assert_frame_equal(lib.read("sym").data, df_1) + + +@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) +def test_sort_and_finalize_staged_data_write_dynamic_schema_named_index( + lmdb_library_static_dynamic, delete_staged_data_on_failure +): + lib = lmdb_library_static_dynamic + sym = "test_sort_and_finalize_staged_data_append_dynamic_schema_named_index" + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) + df_0.index.name = "date" + df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) + lib.write(sym, df_0, staged=True) + lib.write(sym, df_1, staged=True) + + with pytest.raises(SchemaException) as exception_info: + lib.sort_and_finalize_staged_data( + sym, + mode=StagedDataFinalizeMethod.WRITE, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) + + # Make sure that the name of the problematic index column appears in the error message + assert "date" in str(exception_info.value) + expected_key_count = 0 if delete_staged_data_on_failure else 2 + assert len(get_append_keys(lib, sym)) == expected_key_count + + +@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False]) +def test_sort_and_finalize_staged_data_append_dynamic_schema_named_index( + lmdb_library_static_dynamic, delete_staged_data_on_failure +): + lib = lmdb_library_static_dynamic + sym = "test_sort_and_finalize_staged_data_append_dynamic_schema_named_index" + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) + df_0.index.name = "date" + df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) + lib.write(sym, df_0) + lib.write(sym, df_1, staged=True) + with pytest.raises(SchemaException) as exception_info: + lib.sort_and_finalize_staged_data( + sym, + mode=StagedDataFinalizeMethod.APPEND, + delete_staged_data_on_failure=delete_staged_data_on_failure, + ) + + # Make sure that the name of the problematic index 
column appears in the error message + assert "date" in str(exception_info.value) + expected_key_count = 0 if delete_staged_data_on_failure else 1 + assert len(get_append_keys(lib, sym)) == expected_key_count diff --git a/python/tests/unit/arcticdb/version_store/test_update.py b/python/tests/unit/arcticdb/version_store/test_update.py index e55f1aa968..4bc92a7f4d 100644 --- a/python/tests/unit/arcticdb/version_store/test_update.py +++ b/python/tests/unit/arcticdb/version_store/test_update.py @@ -13,8 +13,17 @@ import datetime import random -from arcticdb.util.test import random_strings_of_length, random_string, random_floats, assert_frame_equal -from arcticdb.exceptions import InternalException, SortingException +from arcticdb.util.test import ( + random_strings_of_length, + random_string, + random_floats, + assert_frame_equal, +) +from arcticdb.exceptions import ( + InternalException, + SortingException, +) +from arcticdb_ext.version_store import StreamDescriptorMismatch from tests.util.date import DateRange from pandas import MultiIndex @@ -23,7 +32,9 @@ def test_update_single_dates(lmdb_version_store_dynamic_schema): lib = lmdb_version_store_dynamic_schema df1 = pd.DataFrame(index=[pd.Timestamp(2022, 1, 3)], data=2220103.0, columns=["a"]) df2 = pd.DataFrame(index=[pd.Timestamp(2021, 12, 22)], data=211222.0, columns=["a"]) - df3 = pd.DataFrame(index=[pd.Timestamp(2021, 12, 29)], data=2211229.0, columns=["a"]) + df3 = pd.DataFrame( + index=[pd.Timestamp(2021, 12, 29)], data=2211229.0, columns=["a"] + ) sym = "data6" lib.update(sym, df1, upsert=True) lib.update(sym, df2, upsert=True) @@ -42,7 +53,9 @@ def test_update(version_store_factory): lmdb_version_store.write(symbol, df) idx2 = pd.date_range("1970-01-12", periods=10, freq="D") - df2 = pd.DataFrame({"a": np.arange(1000, 1000 + len(idx2), dtype="float")}, index=idx2) + df2 = pd.DataFrame( + {"a": np.arange(1000, 1000 + len(idx2), dtype="float")}, index=idx2 + ) lmdb_version_store.update(symbol, df2) vit = lmdb_version_store.read(symbol) @@ -87,18 +100,27 @@ def test_update_long_strides(s3_version_store): lib = s3_version_store symbol = "test_update_long_strides" - write_df = pd.DataFrame({"A": 7 * [1]}, index=pd.date_range("2023-02-01", periods=7)) + write_df = pd.DataFrame( + {"A": 7 * [1]}, index=pd.date_range("2023-02-01", periods=7) + ) assert write_df.index.values.strides[0] == 8 lib.write(symbol, write_df) - update_df = write_df[write_df.index.isin([pd.Timestamp(2023, 2, 1), pd.Timestamp(2023, 2, 6)])].copy() + update_df = write_df[ + write_df.index.isin([pd.Timestamp(2023, 2, 1), pd.Timestamp(2023, 2, 6)]) + ].copy() update_df["A"] = 999 assert update_df.index.values.strides[0] in (8, 40) lib.update(symbol, update_df) expected = pd.DataFrame( - {"A": [999, 999, 1]}, index=[pd.Timestamp(2023, 2, 1), pd.Timestamp(2023, 2, 6), pd.Timestamp(2023, 2, 7)] + {"A": [999, 999, 1]}, + index=[ + pd.Timestamp(2023, 2, 1), + pd.Timestamp(2023, 2, 6), + pd.Timestamp(2023, 2, 7), + ], ) received = lib.read(symbol).data pd.testing.assert_frame_equal(expected, received) @@ -117,12 +139,21 @@ def gen_params(): @pytest.mark.parametrize( - "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", gen_params() + "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", + gen_params(), ) def test_update_repeatedly_dynamic_schema( - version_store_factory, col_per_group, start_increment, end_increment, update_start, iterations, start_dist + version_store_factory, + col_per_group, + start_increment, + end_increment, + 
update_start, + iterations, + start_dist, ): - lmdb_version_store = version_store_factory(col_per_group=col_per_group, row_per_segment=2, dynamic_schema=True) + lmdb_version_store = version_store_factory( + col_per_group=col_per_group, row_per_segment=2, dynamic_schema=True + ) symbol = "update_dynamic_schema" @@ -140,7 +171,9 @@ def test_update_repeatedly_dynamic_schema( continue idx2 = pd.date_range(update_date, periods=periods, freq="D") - df2 = pd.DataFrame({"a": np.arange(1000 + x, 1000 + x + len(idx2), dtype="float")}, index=idx2) + df2 = pd.DataFrame( + {"a": np.arange(1000 + x, 1000 + x + len(idx2), dtype="float")}, index=idx2 + ) lmdb_version_store.update(symbol, df2) vit = lmdb_version_store.read(symbol) @@ -149,12 +182,21 @@ def test_update_repeatedly_dynamic_schema( @pytest.mark.parametrize( - "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", gen_params() + "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", + gen_params(), ) def test_update_repeatedly_dynamic_schema_hashed( - version_store_factory, col_per_group, start_increment, end_increment, update_start, iterations, start_dist + version_store_factory, + col_per_group, + start_increment, + end_increment, + update_start, + iterations, + start_dist, ): - lmdb_version_store = version_store_factory(col_per_group=col_per_group, row_per_segment=2, dynamic_schema=True) + lmdb_version_store = version_store_factory( + col_per_group=col_per_group, row_per_segment=2, dynamic_schema=True + ) symbol = "update_dynamic_schema" @@ -203,12 +245,21 @@ def test_update_repeatedly_dynamic_schema_hashed( @pytest.mark.parametrize( - "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", gen_params() + "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", + gen_params(), ) def test_update_repeatedly( - version_store_factory, col_per_group, start_increment, end_increment, update_start, iterations, start_dist + version_store_factory, + col_per_group, + start_increment, + end_increment, + update_start, + iterations, + start_dist, ): - lmdb_version_store = version_store_factory(col_per_group=col_per_group, row_per_segment=2) + lmdb_version_store = version_store_factory( + col_per_group=col_per_group, row_per_segment=2 + ) symbol = "update_no_daterange" @@ -226,7 +277,9 @@ def test_update_repeatedly( continue idx2 = pd.date_range(update_date, periods=periods, freq="D") - df2 = pd.DataFrame({"a": np.arange(1000 + x, 1000 + x + len(idx2), dtype="float")}, index=idx2) + df2 = pd.DataFrame( + {"a": np.arange(1000 + x, 1000 + x + len(idx2), dtype="float")}, index=idx2 + ) lmdb_version_store.update(symbol, df2) vit = lmdb_version_store.read(symbol) @@ -235,12 +288,21 @@ def test_update_repeatedly( @pytest.mark.parametrize( - "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", gen_params() + "col_per_group, start_increment, end_increment, update_start, iterations, start_dist", + gen_params(), ) def test_update_repeatedly_with_strings( - version_store_factory, col_per_group, start_increment, end_increment, update_start, iterations, start_dist + version_store_factory, + col_per_group, + start_increment, + end_increment, + update_start, + iterations, + start_dist, ): - lmdb_version_store = version_store_factory(col_per_group=col_per_group, row_per_segment=2) + lmdb_version_store = version_store_factory( + col_per_group=col_per_group, row_per_segment=2 + ) symbol = "update_no_daterange" @@ 
-258,7 +320,9 @@ def test_update_repeatedly_with_strings( continue idx2 = pd.date_range(update_date, periods=periods, freq="D") - df2 = pd.DataFrame({"a": [random_string(10) for _ in range(len(idx2))]}, index=idx2) + df2 = pd.DataFrame( + {"a": [random_string(10) for _ in range(len(idx2))]}, index=idx2 + ) lmdb_version_store.update(symbol, df2) vit = lmdb_version_store.read(symbol) @@ -279,29 +343,39 @@ def test_update_with_snapshot(version_store_factory): lmdb_version_store.snapshot("my_snap") idx2 = pd.date_range("1970-01-12", periods=10, freq="D") - df2 = pd.DataFrame({"a": np.arange(1000, 1000 + len(idx2), dtype="float")}, index=idx2) + df2 = pd.DataFrame( + {"a": np.arange(1000, 1000 + len(idx2), dtype="float")}, index=idx2 + ) lmdb_version_store.update(symbol, df2) assert_frame_equal(lmdb_version_store.read(symbol, as_of=0).data, original_df) - assert_frame_equal(lmdb_version_store.read(symbol, as_of="my_snap").data, original_df) + assert_frame_equal( + lmdb_version_store.read(symbol, as_of="my_snap").data, original_df + ) df.update(df2) vit = lmdb_version_store.read(symbol) assert_frame_equal(vit.data, df) assert_frame_equal(lmdb_version_store.read(symbol, as_of=1).data, df) - assert_frame_equal(lmdb_version_store.read(symbol, as_of="my_snap").data, original_df) + assert_frame_equal( + lmdb_version_store.read(symbol, as_of="my_snap").data, original_df + ) lmdb_version_store.delete(symbol) assert lmdb_version_store.list_versions() == [] - assert_frame_equal(lmdb_version_store.read(symbol, as_of="my_snap").data, original_df) + assert_frame_equal( + lmdb_version_store.read(symbol, as_of="my_snap").data, original_df + ) def generate_dataframe(columns, dt, num_days, num_rows_per_day): dataframes = [] for _ in range(num_days): - index = pd.Index([dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)]) + index = pd.Index( + [dt + datetime.timedelta(seconds=s) for s in range(num_rows_per_day)] + ) vals = {c: random_floats(num_rows_per_day) for c in columns} new_df = pd.DataFrame(data=vals, index=index) dataframes.append(new_df) @@ -473,7 +547,9 @@ def test_non_cstyle_numpy_update(lmdb_version_store): def _create_product_candles_df(arr): timestamps = [pd.to_datetime(t[0], unit="s") for t in arr] - sorted_df = pd.DataFrame(data=arr, index=timestamps, columns=["time_start", "volume"]) + sorted_df = pd.DataFrame( + data=arr, index=timestamps, columns=["time_start", "volume"] + ) return sorted_df.sort_index() sorted_df_1 = _create_product_candles_df(not_sorted_arr_1) @@ -486,8 +562,12 @@ def _create_product_candles_df(arr): assert_frame_equal(after_arctic, before_arctic) -@pytest.mark.parametrize("existing_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED")) -@pytest.mark.parametrize("update_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED")) +@pytest.mark.parametrize( + "existing_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED") +) +@pytest.mark.parametrize( + "update_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED") +) @pytest.mark.parametrize("date_range_arg_provided", (True, False)) def test_update_sortedness_checks( lmdb_version_store, @@ -501,10 +581,18 @@ def test_update_sortedness_checks( data = np.arange(num_rows) ascending_idx = pd.date_range("2024-01-15", periods=num_rows) ascending_df = pd.DataFrame({"col": data}, index=ascending_idx) - descending_df = pd.DataFrame({"col": data}, index=pd.DatetimeIndex(reversed(ascending_idx))) - unsorted_df = pd.DataFrame({"col": data}, index=pd.DatetimeIndex(np.roll(ascending_idx, num_rows // 2))) + descending_df 
= pd.DataFrame( + {"col": data}, index=pd.DatetimeIndex(reversed(ascending_idx)) + ) + unsorted_df = pd.DataFrame( + {"col": data}, index=pd.DatetimeIndex(np.roll(ascending_idx, num_rows // 2)) + ) - date_range = (pd.Timestamp("2024-01-13"), pd.Timestamp("2024-01-17")) if date_range_arg_provided else None + date_range = ( + (pd.Timestamp("2024-01-13"), pd.Timestamp("2024-01-17")) + if date_range_arg_provided + else None + ) if existing_df_sortedness == "ASCENDING": write_df = ascending_df @@ -607,3 +695,20 @@ def test_update_not_sorted_range_index_exception(lmdb_version_store): assert df.index.is_monotonic_increasing == True with pytest.raises(InternalException): lmdb_version_store.update(symbol, df) + + +def test_regular_update_dynamic_schema_named_index( + lmdb_version_store_tiny_segment_dynamic, +): + lib = lmdb_version_store_tiny_segment_dynamic + sym = "test_parallel_update_dynamic_schema_named_index" + df_0 = pd.DataFrame( + {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1) + ) + df_0.index.name = "date" + df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1)) + lib.write(sym, df_0) + with pytest.raises(StreamDescriptorMismatch) as exception_info: + lib.update(sym, df_1, upsert=True) + + assert "date" in str(exception_info.value)
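Note (editorial aside, not part of the diff): the named-index failures exercised above (the parallel and sort_and_finalize named-index tests, and test_regular_update_dynamic_schema_named_index just before this note) all stem from the same pandas-level difference between the two frames: the stored frame's index is named "date" while the update/append frame's index is unnamed. A small illustrative sketch, independent of ArcticDB, of the mismatch and of the obvious way to make the frames consistent before writing; it does not claim anything about whether the subsequent ArcticDB call would then succeed.

import pandas as pd

df_0 = pd.DataFrame(
    {"col_0": [0], "col_1": [0.5]}, index=pd.date_range("2024-01-01", periods=1)
)
df_0.index.name = "date"

# The second frame reproduces the mismatch used by the tests: its index is unnamed.
df_1 = pd.DataFrame({"col_0": [1]}, index=pd.date_range("2024-01-02", periods=1))
assert df_0.index.name == "date" and df_1.index.name is None

# Naming the index consistently removes the difference at the pandas level; the
# tests above deliberately leave it unnamed so that the error path is exercised.
df_1.index.name = "date"
assert df_0.index.name == df_1.index.name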