Skip to content

Commit

Permalink
Check the index names in finalize staged data on demand
Browse files (browse the repository at this point in the history)
  • Loading branch information
G-D-Petrov committed Jan 14, 2025
1 parent 4f8cd7e commit 300ae92
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
9 changes: 1 addition & 8 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,16 +1080,9 @@ bool read_incompletes_to_pipeline(

const StreamDescriptor &staged_desc = incomplete_segments[0].segment(store).descriptor();

// check that the index names of all staged segments match
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
std::all_of(incomplete_segments.begin(), incomplete_segments.end(), [&](const auto& slice) {
return index_names_match(staged_desc, slice.segment(store).descriptor());
}),
"The index names in the staged stream descriptor {} are not identical",
staged_desc
);

// We need to check that the index names match regardless of the dynamic schema setting
// A more detailed check is done later in the do_compact function
if (pipeline_context->desc_) {
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
index_names_match(staged_desc, *pipeline_context->desc_),
Expand Down
7 changes: 7 additions & 0 deletions cpp/arcticdb/version/version_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <arcticdb/stream/segment_aggregator.hpp>
#include <arcticdb/entity/frame_and_descriptor.hpp>
#include <arcticdb/version/version_store_objects.hpp>
#include <arcticdb/version/schema_checks.hpp>

#include <string>

Expand Down Expand Up @@ -307,6 +308,12 @@ template <typename IndexType, typename SchemaType, typename SegmentationPolicy,

const SegmentInMemory& segment = sk.segment(store);

if(!index_names_match(segment.descriptor(), pipeline_context->descriptor())) {
auto written_keys = folly::collect(write_futures).get();
remove_written_keys(store.get(), std::move(written_keys));
return Error{throw_error<ErrorCode::E_DESCRIPTOR_MISMATCH>, fmt::format("Index names in segment {} and pipeline context {} do not match", segment.descriptor(), pipeline_context->descriptor())};
}

if(validate_index && is_segment_unsorted(segment)) {
auto written_keys = folly::collect(write_futures).get();
remove_written_keys(store.get(), std::move(written_keys));
Expand Down
26 changes: 20 additions & 6 deletions python/tests/unit/arcticdb/version_store/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -904,8 +904,9 @@ def test_parallel_append_dynamic_schema_missing_column(
assert_frame_equal(expected, received)


@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False])
def test_parallel_write_dynamic_schema_named_index(
lmdb_version_store_tiny_segment_dynamic,
lmdb_version_store_tiny_segment_dynamic, delete_staged_data_on_failure
):
lib = lmdb_version_store_tiny_segment_dynamic
sym = "test_parallel_append_dynamic_schema_named_index"
Expand All @@ -918,14 +919,21 @@ def test_parallel_write_dynamic_schema_named_index(
lib.write(sym, df_1, parallel=True)

with pytest.raises(SchemaException) as exception_info:
lib.compact_incomplete(sym, False, False)
lib.compact_incomplete(
sym,
False,
False,
delete_staged_data_on_failure=delete_staged_data_on_failure,
)

assert "date" in str(exception_info.value)
assert "E_DESCRIPTOR_MISMATCH" in str(exception_info.value)
expected_key_count = 0 if delete_staged_data_on_failure else 2
assert len(get_append_keys(lib, sym)) == expected_key_count


@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False])
def test_parallel_append_dynamic_schema_named_index(
lmdb_version_store_tiny_segment_dynamic,
lmdb_version_store_tiny_segment_dynamic, delete_staged_data_on_failure
):
lib = lmdb_version_store_tiny_segment_dynamic
sym = "test_parallel_append_dynamic_schema_named_index"
Expand All @@ -937,10 +945,16 @@ def test_parallel_append_dynamic_schema_named_index(
lib.write(sym, df_0)
lib.append(sym, df_1, incomplete=True)
with pytest.raises(SchemaException) as exception_info:
lib.compact_incomplete(sym, True, False)
lib.compact_incomplete(
sym,
True,
False,
delete_staged_data_on_failure=delete_staged_data_on_failure,
)

assert "date" in str(exception_info.value)
assert "E_DESCRIPTOR_MISMATCH" in str(exception_info.value)
expected_key_count = 0 if delete_staged_data_on_failure else 1
assert len(get_append_keys(lib, sym)) == expected_key_count


@pytest.mark.parametrize("delete_staged_data_on_failure", [True, False])
Expand Down

0 comments on commit 300ae92

Please sign in to comment.