2113 using compact incomplete on a library with dynamic schema with a named index can result in an unreadable index #2116

Open
wants to merge 7 commits into base: master
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
@@ -518,6 +518,7 @@ set(arcticdb_srcs
version/key_block.cpp
util/gil_safe_py_none.cpp
version/local_versioned_engine.cpp
version/schema_checks.cpp
version/op_log.cpp
version/snapshot.cpp
version/symbol_list.cpp
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/memory_segment_impl.cpp
@@ -650,7 +650,7 @@ size_t SegmentInMemoryImpl::num_bytes() const {
void SegmentInMemoryImpl::sort(const std::string& column_name) {
init_column_map();
auto idx = column_index(std::string_view(column_name));
user_input::check<ErrorCode::E_COLUMN_NOT_FOUND>(static_cast<bool>(idx), "Column {} not found in sort", column_name);
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(static_cast<bool>(idx), "Column {} not found in sort", column_name);
Collaborator (Author) comment: Note: I've changed this so it is more consistent with the other similar exceptions.

sort(static_cast<position_t>(idx.value()));
}

@@ -659,7 +659,7 @@ void SegmentInMemoryImpl::sort(const std::vector<std::string>& column_names) {
std::vector<position_t> positions;
for(const auto& column_name : column_names) {
auto idx = column_index(std::string_view(column_name));
user_input::check<ErrorCode::E_COLUMN_NOT_FOUND>(static_cast<bool>(idx), "Column {} not found in multi-sort", column_name);
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(static_cast<bool>(idx), "Column {} not found in multi-sort", column_name);
positions.emplace_back(static_cast<position_t>(*idx));
}
sort(positions);
154 changes: 154 additions & 0 deletions cpp/arcticdb/version/schema_checks.cpp
@@ -0,0 +1,154 @@
#include <arcticdb/version/schema_checks.hpp>
#include <arcticdb/pipeline/index_segment_reader.hpp>

namespace arcticdb {

std::string_view normalization_operation_str(NormalizationOperation operation) {
switch (operation) {
case APPEND:
return "APPEND";
case UPDATE:
return "UPDATE";
default:
util::raise_rte("Unknown operation type {}", static_cast<uint8_t>(operation));
}
}

IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type& left, const IndexDescriptor::Type& right) {
if (left == right) {
return left;
}
if (left == IndexDescriptor::Type::EMPTY) {
return right;
}
if (right == IndexDescriptor::Type::EMPTY) {
return left;
}
return IndexDescriptor::Type::UNKNOWN;
}
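
For reference, the intent of get_common_index_type is a small truth table: identical kinds pass through, EMPTY is compatible with either side, and everything else has no common type. A minimal sketch, assuming only the IndexDescriptor::Type values already used elsewhere in this PR:

#include <cassert>

// Illustrative only; not part of the PR.
void sketch_get_common_index_type() {
    using T = IndexDescriptor::Type;
    assert(get_common_index_type(T::TIMESTAMP, T::TIMESTAMP) == T::TIMESTAMP); // identical kinds pass through
    assert(get_common_index_type(T::EMPTY, T::ROWCOUNT) == T::ROWCOUNT);       // EMPTY defers to the other side
    assert(get_common_index_type(T::TIMESTAMP, T::EMPTY) == T::TIMESTAMP);
    assert(get_common_index_type(T::TIMESTAMP, T::ROWCOUNT) == T::UNKNOWN);    // no common type otherwise
}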

void check_normalization_index_match(
NormalizationOperation operation,
const StreamDescriptor& old_descriptor,
const pipelines::InputTensorFrame& frame,
bool empty_types
) {
const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type();
const IndexDescriptor::Type new_idx_kind = frame.desc.index().type();
if (operation == UPDATE) {
const bool new_is_timeseries = std::holds_alternative<arcticdb::stream::TimeseriesIndex>(frame.index);
util::check_rte(
(old_idx_kind == IndexDescriptor::Type::TIMESTAMP || old_idx_kind == IndexDescriptor::Type::EMPTY) && new_is_timeseries,
"Update will not work as expected with a non-timeseries index"
);
} else {
const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind);
if (empty_types) {
normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
common_index_type != IndexDescriptor::Type::UNKNOWN,
"Cannot append {} index to {} index",
index_type_to_str(new_idx_kind),
index_type_to_str(old_idx_kind)
);
} else {
// The (old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT) case is
// kept to preserve pre-empty-index behavior with pandas 2, see test_empty_writes.py::test_append_empty_series. An empty
// pd.Series has a row-range index, but due to: https://github.com/man-group/ArcticDB/blob/bd1776291fe402d8b18af9fea865324ebd7705f1/python/arcticdb/version_store/_normalization.py#L545
// it gets converted to a DatetimeIndex (in pandas 2, all empty indexes except categorical and multiindex are converted
// to a datetime index if the empty index type is disabled), yet we still want to be able to append a pd.Series to an
// empty pd.Series. This check does not allow appending RowCount-indexed pd.DataFrames to DatetimeIndex-indexed
// pd.DataFrames, because they would have different field counts (the row-count index is not stored as a field). This
// logic is bug-prone and will improve once the empty index type is enabled.
const bool input_frame_is_series = frame.norm_meta.has_series();
normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
common_index_type != IndexDescriptor::Type::UNKNOWN ||
(input_frame_is_series && old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT),
"Cannot append {} index to {} index",
index_type_to_str(new_idx_kind),
index_type_to_str(old_idx_kind)
);
}
}
}
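
Condensed, the append branch with empty_types disabled enforces the following predicate (an illustrative restatement, not code from the PR; the helper name is hypothetical):

// Returns true if an append of new_kind onto old_kind should be allowed.
bool append_index_compatible(IndexDescriptor::Type old_kind, IndexDescriptor::Type new_kind, bool input_is_series) {
    using T = IndexDescriptor::Type;
    if (get_common_index_type(old_kind, new_kind) != T::UNKNOWN)
        return true;
    // pandas 2 special case: an empty pd.Series normalizes to a DatetimeIndex, so a
    // row-count-indexed Series may still be appended onto a timestamp-indexed one.
    return input_is_series && old_kind == T::TIMESTAMP && new_kind == T::ROWCOUNT;
}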

bool index_names_match(
const StreamDescriptor& df_in_store_descriptor,
const StreamDescriptor& new_df_descriptor
) {
auto df_in_store_index_field_count = df_in_store_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? 0 : df_in_store_descriptor.index().field_count();
auto new_df_index_field_count = new_df_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? 0 : new_df_descriptor.index().field_count();

// If either index is empty, we consider them to match
if (df_in_store_index_field_count == 0 || new_df_index_field_count == 0) {
return true;
}

if (df_in_store_index_field_count != new_df_index_field_count) {
return false;
}

for (auto i = 0; i < int(df_in_store_index_field_count); ++i) {
if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i).name()) {
return false;
}
}

return true;
}
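
Stripped of the descriptor plumbing, the matching rule is simple enough to state over plain name lists. A dependency-free sketch (hypothetical helper, for illustration only):

#include <string>
#include <vector>

// An empty index (zero index fields) matches anything; otherwise the index
// field names must agree in count and in position.
bool index_names_match_simplified(const std::vector<std::string>& existing, const std::vector<std::string>& incoming) {
    if (existing.empty() || incoming.empty())
        return true;
    return existing == incoming;
}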

bool columns_match(
const StreamDescriptor& df_in_store_descriptor,
const StreamDescriptor& new_df_descriptor
) {
const int index_field_size =
df_in_store_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? new_df_descriptor.index().field_count() : 0;
// The empty index is compatible with all other index types. Differences in the index fields are allowed in this
// case. The index fields are always the first in the list.
if (df_in_store_descriptor.fields().size() + index_field_size != new_df_descriptor.fields().size()) {
return false;
}
// When the left index is the empty index, we skip name/type checking of the index fields, which are always
// the first fields.
for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) {
if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + index_field_size).name())
return false;

const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type();
const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type();

if (!trivially_compatible_types(left_type, right_type) &&
!(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type())))
return false;
}
return true;
}
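
The offset arithmetic is the subtle part: when the stored descriptor has an EMPTY index, the incoming frame's index fields (always the leading fields) are skipped before comparing columns position by position. A simplified sketch over plain field-name lists (hypothetical helper; the type-compatibility check is omitted):

bool columns_match_simplified(const std::vector<std::string>& stored, const std::vector<std::string>& incoming,
                              size_t incoming_index_fields, bool stored_index_is_empty) {
    // Skip the incoming index fields only when the stored index is EMPTY.
    const size_t offset = stored_index_is_empty ? incoming_index_fields : 0;
    if (stored.size() + offset != incoming.size())
        return false;
    for (size_t i = 0; i < stored.size(); ++i) {
        if (stored[i] != incoming[i + offset])
            return false;
    }
    return true;
}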

void fix_descriptor_mismatch_or_throw(
NormalizationOperation operation,
bool dynamic_schema,
const pipelines::index::IndexSegmentReader &existing_isr,
const pipelines::InputTensorFrame &new_frame,
bool empty_types) {
const auto &old_sd = existing_isr.tsd().as_stream_descriptor();
check_normalization_index_match(operation, old_sd, new_frame, empty_types);

fix_normalization_or_throw(operation == APPEND, existing_isr, new_frame);

// We need to check that the index names match regardless of the dynamic schema setting
if(!index_names_match(old_sd, new_frame.desc)) {
throw StreamDescriptorMismatch(
"The index names in the argument are not identical to that of the existing version",
old_sd,
new_frame.desc,
operation);
}

if (!dynamic_schema && !columns_match(old_sd, new_frame.desc)) {
throw StreamDescriptorMismatch(
"The columns (names and types) in the argument are not identical to that of the existing version",
old_sd,
new_frame.desc,
operation);
}
}
} // namespace arcticdb
113 changes: 13 additions & 100 deletions cpp/arcticdb/version/schema_checks.hpp
@@ -11,125 +11,38 @@ enum NormalizationOperation : uint8_t {
UPDATE,
};

inline std::string_view normalization_operation_str(NormalizationOperation operation) {
switch (operation) {
case APPEND:
return "APPEND";
case UPDATE:
return "UPDATE";
default:
util::raise_rte("Unknown operation type {}", static_cast<uint8_t>(operation));
}
}

std::string_view normalization_operation_str(NormalizationOperation operation);

struct StreamDescriptorMismatch : ArcticSpecificException<ErrorCode::E_DESCRIPTOR_MISMATCH> {
StreamDescriptorMismatch(const char* preamble, const StreamDescriptor& existing, const StreamDescriptor& new_val, NormalizationOperation operation) :
ArcticSpecificException(fmt::format("{}: {} \nexisting={}\n new_val={}", preamble, normalization_operation_str(operation),
existing.fields(), new_val.fields())) {}
};

inline IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type& left, const IndexDescriptor::Type& right) {
if (left == right) {
return left;
}
if (left == IndexDescriptor::Type::EMPTY) {
return right;
}
if (right == IndexDescriptor::Type::EMPTY) {
return left;
}
return IndexDescriptor::Type::UNKNOWN;
}
IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type& left, const IndexDescriptor::Type& right);

inline void check_normalization_index_match(
void check_normalization_index_match(
NormalizationOperation operation,
const StreamDescriptor& old_descriptor,
const pipelines::InputTensorFrame& frame,
bool empty_types
) {
const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type();
const IndexDescriptor::Type new_idx_kind = frame.desc.index().type();
if (operation == UPDATE) {
const bool new_is_timeseries = std::holds_alternative<TimeseriesIndex>(frame.index);
util::check_rte(
(old_idx_kind == IndexDescriptor::Type::TIMESTAMP || old_idx_kind == IndexDescriptor::Type::EMPTY) && new_is_timeseries,
"Update will not work as expected with a non-timeseries index"
);
} else {
const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind);
if (empty_types) {
normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
common_index_type != IndexDescriptor::Type::UNKNOWN,
"Cannot append {} index to {} index",
index_type_to_str(new_idx_kind),
index_type_to_str(old_idx_kind)
);
} else {
// (old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT) is left to preserve
// pre-empty index behavior with pandas 2, see test_empty_writes.py::test_append_empty_series. Empty pd.Series
// have Rowrange index, but due to: https://github.com/man-group/ArcticDB/blob/bd1776291fe402d8b18af9fea865324ebd7705f1/python/arcticdb/version_store/_normalization.py#L545
// it gets converted to DatetimeIndex (all empty indexes except categorical and multiindex are converted to datetime index
// in pandas 2 if empty index type is disabled), however we still want to be able to append pd.Series to empty pd.Series.
// Having this will not allow appending RowCont indexed pd.DataFrames to DateTime indexed pd.DataFrames because they would
// have different field size (the rowcount index is not stored as a field). This logic is bug prone and will become better
// after we enable the empty index.
const bool input_frame_is_series = frame.norm_meta.has_series();
normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
common_index_type != IndexDescriptor::Type::UNKNOWN ||
(input_frame_is_series && old_idx_kind == IndexDescriptor::Type::TIMESTAMP && new_idx_kind == IndexDescriptor::Type::ROWCOUNT),
"Cannot append {} index to {} index",
index_type_to_str(new_idx_kind),
index_type_to_str(old_idx_kind)
);
}
}
}
);

inline bool columns_match(
bool index_names_match(
const StreamDescriptor& df_in_store_descriptor,
const StreamDescriptor& new_df_descriptor
) {
const int index_field_size =
df_in_store_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? new_df_descriptor.index().field_count() : 0;
// The empty index is compatible with all other index types. Differences in the index fields in this case is
// allowed. The index fields are always the first in the list.
if (df_in_store_descriptor.fields().size() + index_field_size != new_df_descriptor.fields().size()) {
return false;
}
// In case the left index is empty index we want to skip name/type checking of the index fields which are always
// the first fields.
for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) {
if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + index_field_size).name())
return false;

const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type();
const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type();
);

if (!trivially_compatible_types(left_type, right_type) &&
!(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type())))
return false;
}
return true;
}
bool columns_match(
const StreamDescriptor& df_in_store_descriptor,
const StreamDescriptor& new_df_descriptor
);

inline void fix_descriptor_mismatch_or_throw(
void fix_descriptor_mismatch_or_throw(
NormalizationOperation operation,
bool dynamic_schema,
const pipelines::index::IndexSegmentReader &existing_isr,
const pipelines::InputTensorFrame &new_frame,
bool empty_types) {
const auto &old_sd = existing_isr.tsd().as_stream_descriptor();
check_normalization_index_match(operation, old_sd, new_frame, empty_types);

fix_normalization_or_throw(operation == APPEND, existing_isr, new_frame);

if (!dynamic_schema && !columns_match(old_sd, new_frame.desc)) {
throw StreamDescriptorMismatch(
"The columns (names and types) in the argument are not identical to that of the existing version",
old_sd,
new_frame.desc,
operation);
}
}
bool empty_types
);
} // namespace arcticdb
26 changes: 25 additions & 1 deletion cpp/arcticdb/version/version_core.cpp
@@ -1078,6 +1078,20 @@ bool read_incompletes_to_pipeline(
ensure_timeseries_norm_meta(*pipeline_context->norm_meta_, pipeline_context->stream_id_, sparsify);
}

const StreamDescriptor &staged_desc = incomplete_segments[0].segment(store).descriptor();

// We need to check that the index names match regardless of the dynamic schema setting
// A more detailed check is done later in the do_compact function
if (pipeline_context->desc_) {
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
index_names_match(staged_desc, *pipeline_context->desc_),
"The index names in the staged stream descriptor {} are not identical to that of the stream descriptor on storage {}",
staged_desc,
*pipeline_context->desc_
);
}

if (dynamic_schema) {
pipeline_context->staged_descriptor_ =
merge_descriptors(seg.descriptor(), incomplete_segments, read_query.columns);
@@ -1089,7 +1103,6 @@
pipeline_context->desc_ = pipeline_context->staged_descriptor_;
}
} else {
const StreamDescriptor &staged_desc = incomplete_segments[0].segment(store).descriptor();
if (pipeline_context->desc_) {
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
columns_match(staged_desc, *pipeline_context->desc_),
@@ -2037,6 +2050,17 @@ bool is_segment_unsorted(const SegmentInMemory& segment) {
}

CheckOutcome check_schema_matches_incomplete(const StreamDescriptor& stream_descriptor_incomplete, const StreamDescriptor& pipeline_desc) {
// We need to check that the index names match regardless of the dynamic schema setting
if(!index_names_match(stream_descriptor_incomplete, pipeline_desc)) {
return Error{
throw_error<ErrorCode::E_DESCRIPTOR_MISMATCH>,
fmt::format("{} All staged segments must have the same index names."
"{} is different than {}",
error_code_data<ErrorCode::E_DESCRIPTOR_MISMATCH>.name_,
stream_descriptor_incomplete,
pipeline_desc)
};
}
if (!columns_match(stream_descriptor_incomplete, pipeline_desc)) {
return Error{
throw_error<ErrorCode::E_DESCRIPTOR_MISMATCH>,
Expand Down
7 changes: 7 additions & 0 deletions cpp/arcticdb/version/version_core.hpp
@@ -22,6 +22,7 @@
#include <arcticdb/stream/segment_aggregator.hpp>
#include <arcticdb/entity/frame_and_descriptor.hpp>
#include <arcticdb/version/version_store_objects.hpp>
#include <arcticdb/version/schema_checks.hpp>

#include <string>

@@ -307,6 +308,12 @@ template <typename IndexType, typename SchemaType, typename SegmentationPolicy,

const SegmentInMemory& segment = sk.segment(store);

if(!index_names_match(segment.descriptor(), pipeline_context->descriptor())) {
auto written_keys = folly::collect(write_futures).get();
remove_written_keys(store.get(), std::move(written_keys));
return Error{
throw_error<ErrorCode::E_DESCRIPTOR_MISMATCH>,
fmt::format("Index names in segment {} and pipeline context {} do not match", segment.descriptor(), pipeline_context->descriptor())
};
}

if(validate_index && is_segment_unsorted(segment)) {
auto written_keys = folly::collect(write_futures).get();
remove_written_keys(store.get(), std::move(written_keys));
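
Both new failure paths in do_compact follow the same discipline as the existing validate_index check: await all in-flight segment writes, delete whatever keys they produced, and only then surface the error, so a failed compaction does not leak keys into storage. A condensed sketch of that rollback pattern (helper name and exact types are hypothetical):

template <typename Validate>
std::optional<Error> validate_or_rollback(Store* store, std::vector<folly::Future<VariantKey>>& write_futures, Validate&& validate) {
    if (std::optional<Error> error = validate()) {
        // Await outstanding writes so we know exactly which keys to clean up.
        auto written_keys = folly::collect(write_futures).get();
        remove_written_keys(store, std::move(written_keys));
        return error;
    }
    return std::nullopt;
}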