From 2f7f87b696a1778236927af08aa015d63073a93f Mon Sep 17 00:00:00 2001 From: toge Date: Mon, 9 Sep 2024 12:45:11 +0900 Subject: [PATCH] fix compilation error on apple-clang 15 --- .../block_based/block_based_table_builder.cc | 1238 ++++++++--------- 1 file changed, 619 insertions(+), 619 deletions(-) diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index ec24721b7aa..cae44eee669 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -267,711 +267,711 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector bool decoupled_partitioned_filters_; }; -struct BlockBasedTableBuilder::Rep { - const ImmutableOptions ioptions; - // BEGIN from MutableCFOptions - std::shared_ptr prefix_extractor; - // END from MutableCFOptions - const WriteOptions write_options; - const BlockBasedTableOptions table_options; - const InternalKeyComparator& internal_comparator; - // Size in bytes for the user-defined timestamps. - size_t ts_sz; - // When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the - // user key will be stripped when creating the block based table. This - // stripping happens for all user keys, including the keys in data block, - // index block for data block, index block for index block (if index type is - // `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned - // filters), the `first_internal_key` in `IndexValue`, the `end_key` for range - // deletion entries. - // As long as the user keys are sorted when added via `Add` API, their logic - // ordering won't change after timestamps are stripped. However, for each user - // key to be logically equivalent before and after timestamp is stripped, the - // user key should contain the minimum timestamp. - bool persist_user_defined_timestamps; - WritableFileWriter* file; - std::atomic offset; - size_t alignment; - BlockBuilder data_block; - // Buffers uncompressed data blocks to replay later. Needed when - // compression dictionary is enabled so we can finalize the dictionary before - // compressing any data blocks. - std::vector data_block_buffers; - BlockBuilder range_del_block; +struct BlockBasedTableBuilder::ParallelCompressionRep { + // TODO: consider replacing with autovector or similar + // Keys is a wrapper of vector of strings avoiding + // releasing string memories during vector clear() + // in order to save memory allocation overhead + class Keys { + public: + Keys() : keys_(kKeysInitSize), size_(0) {} + void PushBack(const Slice& key) { + if (size_ == keys_.size()) { + keys_.emplace_back(key.data(), key.size()); + } else { + keys_[size_].assign(key.data(), key.size()); + } + size_++; + } + void SwapAssign(std::vector& keys) { + size_ = keys.size(); + std::swap(keys_, keys); + } + void Clear() { size_ = 0; } + size_t Size() { return size_; } + std::string& Back() { return keys_[size_ - 1]; } + std::string& operator[](size_t idx) { + assert(idx < size_); + return keys_[idx]; + } - InternalKeySliceTransform internal_prefix_transform; - std::unique_ptr index_builder; - std::string index_separator_scratch; - PartitionedIndexBuilder* p_index_builder_ = nullptr; + private: + const size_t kKeysInitSize = 32; + std::vector keys_; + size_t size_; + }; + std::unique_ptr curr_block_keys; - std::string last_ikey; // Internal key or empty (unset) - const Slice* first_key_in_next_block = nullptr; - CompressionType compression_type; - uint64_t sample_for_compression; - std::atomic compressible_input_data_bytes; - std::atomic uncompressible_input_data_bytes; - std::atomic sampled_input_data_bytes; - std::atomic sampled_output_slow_data_bytes; - std::atomic sampled_output_fast_data_bytes; - CompressionOptions compression_opts; - std::unique_ptr compression_dict; - std::vector> compression_ctxs; - std::vector> verify_ctxs; - std::unique_ptr verify_dict; + class BlockRepSlot; - size_t data_begin_offset = 0; + // BlockRep instances are fetched from and recycled to + // block_rep_pool during parallel compression. + struct BlockRep { + Slice contents; + Slice compressed_contents; + std::unique_ptr data; + std::unique_ptr compressed_data; + CompressionType compression_type; + std::unique_ptr first_key_in_next_block; + std::unique_ptr keys; + std::unique_ptr slot; + Status status; + }; + // Use a vector of BlockRep as a buffer for a determined number + // of BlockRep structures. All data referenced by pointers in + // BlockRep will be freed when this vector is destructed. + using BlockRepBuffer = std::vector; + BlockRepBuffer block_rep_buf; + // Use a thread-safe queue for concurrent access from block + // building thread and writer thread. + using BlockRepPool = WorkQueue; + BlockRepPool block_rep_pool; - TableProperties props; + // Use BlockRepSlot to keep block order in write thread. + // slot_ will pass references to BlockRep + class BlockRepSlot { + public: + BlockRepSlot() : slot_(1) {} + template + void Fill(T&& rep) { + slot_.push(std::forward(rep)); + } + void Take(BlockRep*& rep) { slot_.pop(rep); } - // States of the builder. - // - // - `kBuffered`: This is the initial state where zero or more data blocks are - // accumulated uncompressed in-memory. From this state, call - // `EnterUnbuffered()` to finalize the compression dictionary if enabled, - // compress/write out any buffered blocks, and proceed to the `kUnbuffered` - // state. - // - // - `kUnbuffered`: This is the state when compression dictionary is finalized - // either because it wasn't enabled in the first place or it's been created - // from sampling previously buffered data. In this state, blocks are simply - // compressed/written out as they fill up. From this state, call `Finish()` - // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete - // the partially created file. - // - // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been - // called, so the table builder is no longer usable. We must be in this - // state by the time the destructor runs. - enum class State { - kBuffered, - kUnbuffered, - kClosed, + private: + // slot_ will pass references to BlockRep in block_rep_buf, + // and those references are always valid before the destruction of + // block_rep_buf. + WorkQueue slot_; }; - State state; - // `kBuffered` state is allowed only as long as the buffering of uncompressed - // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`. - uint64_t buffer_limit; - std::shared_ptr - compression_dict_buffer_cache_res_mgr; - const bool use_delta_encoding_for_index_values; - std::unique_ptr filter_builder; - OffsetableCacheKey base_cache_key; - const TableFileCreationReason reason; - BlockHandle pending_handle; // Handle to add to index block + // Compression queue will pass references to BlockRep in block_rep_buf, + // and those references are always valid before the destruction of + // block_rep_buf. + using CompressQueue = WorkQueue; + CompressQueue compress_queue; + std::vector compress_thread_pool; - std::string compressed_output; - std::unique_ptr flush_block_policy; + // Write queue will pass references to BlockRep::slot in block_rep_buf, + // and those references are always valid before the corresponding + // BlockRep::slot is destructed, which is before the destruction of + // block_rep_buf. + using WriteQueue = WorkQueue; + WriteQueue write_queue; + std::unique_ptr write_thread; - std::vector> table_properties_collectors; + // Estimate output file size when parallel compression is enabled. This is + // necessary because compression & flush are no longer synchronized, + // and BlockBasedTableBuilder::FileSize() is no longer accurate. + // memory_order_relaxed suffices because accurate statistics is not required. + class FileSizeEstimator { + public: + explicit FileSizeEstimator() + : uncomp_bytes_compressed(0), + uncomp_bytes_curr_block(0), + uncomp_bytes_curr_block_set(false), + uncomp_bytes_inflight(0), + blocks_inflight(0), + curr_compression_ratio(0), + estimated_file_size(0) {} - std::unique_ptr pc_rep; - BlockCreateContext create_context; + // Estimate file size when a block is about to be emitted to + // compression thread + void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { + uint64_t new_uncomp_bytes_inflight = + uncomp_bytes_inflight.fetch_add(uncomp_block_size, + std::memory_order_relaxed) + + uncomp_block_size; - // The size of the "tail" part of a SST file. "Tail" refers to - // all blocks after data blocks till the end of the SST file. - uint64_t tail_size; + uint64_t new_blocks_inflight = + blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; - // See class Footer - uint32_t base_context_checksum; + estimated_file_size.store( + curr_file_size + + static_cast( + static_cast(new_uncomp_bytes_inflight) * + curr_compression_ratio.load(std::memory_order_relaxed)) + + new_blocks_inflight * kBlockTrailerSize, + std::memory_order_relaxed); + } - uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } - void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } + // Estimate file size when a block is already reaped from + // compression thread + void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { + assert(uncomp_bytes_curr_block_set); - bool IsParallelCompressionEnabled() const { - return compression_opts.parallel_threads > 1; - } + uint64_t new_uncomp_bytes_compressed = + uncomp_bytes_compressed + uncomp_bytes_curr_block; + assert(new_uncomp_bytes_compressed > 0); - Status GetStatus() { - // We need to make modifications of status visible when status_ok is set - // to false, and this is ensured by status_mutex, so no special memory - // order for status_ok is required. - if (status_ok.load(std::memory_order_relaxed)) { - return Status::OK(); - } else { - return CopyStatus(); - } - } + curr_compression_ratio.store( + (curr_compression_ratio.load(std::memory_order_relaxed) * + uncomp_bytes_compressed + + compressed_block_size) / + static_cast(new_uncomp_bytes_compressed), + std::memory_order_relaxed); + uncomp_bytes_compressed = new_uncomp_bytes_compressed; - Status CopyStatus() { - std::lock_guard lock(status_mutex); - return status; - } + uint64_t new_uncomp_bytes_inflight = + uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, + std::memory_order_relaxed) - + uncomp_bytes_curr_block; - IOStatus GetIOStatus() { - // We need to make modifications of io_status visible when status_ok is set - // to false, and this is ensured by io_status_mutex, so no special memory - // order for io_status_ok is required. - if (io_status_ok.load(std::memory_order_relaxed)) { -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition - auto ios = CopyIOStatus(); - ios.PermitUncheckedError(); - // Assume no races in unit tests - assert(ios.ok()); -#endif // ROCKSDB_ASSERT_STATUS_CHECKED - return IOStatus::OK(); - } else { - return CopyIOStatus(); - } - } - - IOStatus CopyIOStatus() { - std::lock_guard lock(io_status_mutex); - return io_status; - } + uint64_t new_blocks_inflight = + blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; - // Never erase an existing status that is not OK. - void SetStatus(Status s) { - if (!s.ok() && status_ok.load(std::memory_order_relaxed)) { - // Locking is an overkill for non compression_opts.parallel_threads - // case but since it's unlikely that s is not OK, we take this cost - // to be simplicity. - std::lock_guard lock(status_mutex); - status = s; - status_ok.store(false, std::memory_order_relaxed); - } - } + estimated_file_size.store( + curr_file_size + + static_cast( + static_cast(new_uncomp_bytes_inflight) * + curr_compression_ratio.load(std::memory_order_relaxed)) + + new_blocks_inflight * kBlockTrailerSize, + std::memory_order_relaxed); - // Never erase an existing I/O status that is not OK. - // Calling this will also SetStatus(ios) - void SetIOStatus(IOStatus ios) { - if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) { - // Locking is an overkill for non compression_opts.parallel_threads - // case but since it's unlikely that s is not OK, we take this cost - // to be simplicity. - std::lock_guard lock(io_status_mutex); - io_status = ios; - io_status_ok.store(false, std::memory_order_relaxed); + uncomp_bytes_curr_block_set = false; } - SetStatus(ios); - } - Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo, - WritableFileWriter* f) - : ioptions(tbo.ioptions), - prefix_extractor(tbo.moptions.prefix_extractor), - write_options(tbo.write_options), - table_options(table_opt), - internal_comparator(tbo.internal_comparator), - ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()), - persist_user_defined_timestamps( - tbo.ioptions.persist_user_defined_timestamps), - file(f), - offset(0), - alignment(table_options.block_align - ? std::min(static_cast(table_options.block_size), - kDefaultPageSize) - : 0), - data_block(table_options.block_restart_interval, - table_options.use_delta_encoding, - false /* use_value_delta_encoding */, - tbo.internal_comparator.user_comparator() - ->CanKeysWithDifferentByteContentsBeEqual() - ? BlockBasedTableOptions::kDataBlockBinarySearch - : table_options.data_block_index_type, - table_options.data_block_hash_table_util_ratio, ts_sz, - persist_user_defined_timestamps), - range_del_block( - 1 /* block_restart_interval */, true /* use_delta_encoding */, - false /* use_value_delta_encoding */, - BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */, - 0.75 /* data_block_hash_table_util_ratio */, ts_sz, - persist_user_defined_timestamps), - internal_prefix_transform(prefix_extractor.get()), - compression_type(tbo.compression_type), - sample_for_compression(tbo.moptions.sample_for_compression), - compressible_input_data_bytes(0), - uncompressible_input_data_bytes(0), - sampled_input_data_bytes(0), - sampled_output_slow_data_bytes(0), - sampled_output_fast_data_bytes(0), - compression_opts(tbo.compression_opts), - compression_dict(), - compression_ctxs(tbo.compression_opts.parallel_threads), - verify_ctxs(tbo.compression_opts.parallel_threads), - verify_dict(), - state((tbo.compression_opts.max_dict_bytes > 0 && - tbo.compression_type != kNoCompression) - ? State::kBuffered - : State::kUnbuffered), - use_delta_encoding_for_index_values(table_opt.format_version >= 4 && - !table_opt.block_align), - reason(tbo.reason), - flush_block_policy( - table_options.flush_block_policy_factory->NewFlushBlockPolicy( - table_options, data_block)), - create_context(&table_options, &ioptions, ioptions.stats, - compression_type == kZSTD || - compression_type == kZSTDNotFinalCompression, - tbo.moptions.block_protection_bytes_per_key, - tbo.internal_comparator.user_comparator(), - !use_delta_encoding_for_index_values, - table_opt.index_type == - BlockBasedTableOptions::kBinarySearchWithFirstKey), - tail_size(0), - status_ok(true), - io_status_ok(true) { - if (tbo.target_file_size == 0) { - buffer_limit = compression_opts.max_dict_buffer_bytes; - } else if (compression_opts.max_dict_buffer_bytes == 0) { - buffer_limit = tbo.target_file_size; - } else { - buffer_limit = std::min(tbo.target_file_size, - compression_opts.max_dict_buffer_bytes); + void SetEstimatedFileSize(uint64_t size) { + estimated_file_size.store(size, std::memory_order_relaxed); } - const auto compress_dict_build_buffer_charged = - table_options.cache_usage_options.options_overrides - .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer) - .charged; - if (table_options.block_cache && - (compress_dict_build_buffer_charged == - CacheEntryRoleOptions::Decision::kEnabled || - compress_dict_build_buffer_charged == - CacheEntryRoleOptions::Decision::kFallback)) { - compression_dict_buffer_cache_res_mgr = - std::make_shared>( - table_options.block_cache); - } else { - compression_dict_buffer_cache_res_mgr = nullptr; + uint64_t GetEstimatedFileSize() { + return estimated_file_size.load(std::memory_order_relaxed); } - assert(compression_ctxs.size() >= compression_opts.parallel_threads); - for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { - compression_ctxs[i].reset( - new CompressionContext(compression_type, compression_opts)); - } - if (table_options.index_type == - BlockBasedTableOptions::kTwoLevelIndexSearch) { - p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( - &internal_comparator, use_delta_encoding_for_index_values, - table_options, ts_sz, persist_user_defined_timestamps); - index_builder.reset(p_index_builder_); - } else { - index_builder.reset(IndexBuilder::CreateIndexBuilder( - table_options.index_type, &internal_comparator, - &this->internal_prefix_transform, use_delta_encoding_for_index_values, - table_options, ts_sz, persist_user_defined_timestamps)); + void SetCurrBlockUncompSize(uint64_t size) { + uncomp_bytes_curr_block = size; + uncomp_bytes_curr_block_set = true; } - if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) { - // Apply optimize_filters_for_hits setting here when applicable by - // skipping filter generation - filter_builder.reset(); - } else if (tbo.skip_filters) { - // For SstFileWriter skip_filters - filter_builder.reset(); - } else if (!table_options.filter_policy) { - // Null filter_policy -> no filter - filter_builder.reset(); - } else { - FilterBuildingContext filter_context(table_options); - filter_context.info_log = ioptions.logger; - filter_context.column_family_name = tbo.column_family_name; - filter_context.reason = reason; + private: + // Input bytes compressed so far. + uint64_t uncomp_bytes_compressed; + // Size of current block being appended. + uint64_t uncomp_bytes_curr_block; + // Whether uncomp_bytes_curr_block has been set for next + // ReapBlock call. + bool uncomp_bytes_curr_block_set; + // Input bytes under compression and not appended yet. + std::atomic uncomp_bytes_inflight; + // Number of blocks under compression and not appended yet. + std::atomic blocks_inflight; + // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. + std::atomic curr_compression_ratio; + // Estimated SST file size. + std::atomic estimated_file_size; + }; + FileSizeEstimator file_size_estimator; - // Only populate other fields if known to be in LSM rather than - // generating external SST file - if (reason != TableFileCreationReason::kMisc) { - filter_context.compaction_style = ioptions.compaction_style; - filter_context.num_levels = ioptions.num_levels; - filter_context.level_at_creation = tbo.level_at_creation; - filter_context.is_bottommost = tbo.is_bottommost; - assert(filter_context.level_at_creation < filter_context.num_levels); - } + // Facilities used for waiting first block completion. Need to Wait for + // the completion of first block compression and flush to get a non-zero + // compression ratio. + std::atomic first_block_processed; + std::condition_variable first_block_cond; + std::mutex first_block_mutex; - filter_builder.reset(CreateFilterBlockBuilder( - ioptions, tbo.moptions, filter_context, - use_delta_encoding_for_index_values, p_index_builder_, ts_sz, - persist_user_defined_timestamps)); + explicit ParallelCompressionRep(uint32_t parallel_threads) + : curr_block_keys(new Keys()), + block_rep_buf(parallel_threads), + block_rep_pool(parallel_threads), + compress_queue(parallel_threads), + write_queue(parallel_threads), + first_block_processed(false) { + for (uint32_t i = 0; i < parallel_threads; i++) { + block_rep_buf[i].contents = Slice(); + block_rep_buf[i].compressed_contents = Slice(); + block_rep_buf[i].data.reset(new std::string()); + block_rep_buf[i].compressed_data.reset(new std::string()); + block_rep_buf[i].compression_type = CompressionType(); + block_rep_buf[i].first_key_in_next_block.reset(new std::string()); + block_rep_buf[i].keys.reset(new Keys()); + block_rep_buf[i].slot.reset(new BlockRepSlot()); + block_rep_buf[i].status = Status::OK(); + block_rep_pool.push(&block_rep_buf[i]); } + } - assert(tbo.internal_tbl_prop_coll_factories); - for (auto& factory : *tbo.internal_tbl_prop_coll_factories) { - assert(factory); + ~ParallelCompressionRep() { block_rep_pool.finish(); } - std::unique_ptr collector{ - factory->CreateInternalTblPropColl( - tbo.column_family_id, tbo.level_at_creation, - tbo.ioptions.num_levels, - tbo.last_level_inclusive_max_seqno_threshold)}; - if (collector) { - table_properties_collectors.emplace_back(std::move(collector)); - } + // Make a block prepared to be emitted to compression thread + // Used in non-buffered mode + BlockRep* PrepareBlock(CompressionType compression_type, + const Slice* first_key_in_next_block, + BlockBuilder* data_block) { + BlockRep* block_rep = + PrepareBlockInternal(compression_type, first_key_in_next_block); + assert(block_rep != nullptr); + data_block->SwapAndReset(*(block_rep->data)); + block_rep->contents = *(block_rep->data); + std::swap(block_rep->keys, curr_block_keys); + curr_block_keys->Clear(); + return block_rep; + } + + // Used in EnterUnbuffered + BlockRep* PrepareBlock(CompressionType compression_type, + const Slice* first_key_in_next_block, + std::string* data_block, + std::vector* keys) { + BlockRep* block_rep = + PrepareBlockInternal(compression_type, first_key_in_next_block); + assert(block_rep != nullptr); + std::swap(*(block_rep->data), *data_block); + block_rep->contents = *(block_rep->data); + block_rep->keys->SwapAssign(*keys); + return block_rep; + } + + // Emit a block to compression thread + void EmitBlock(BlockRep* block_rep) { + assert(block_rep != nullptr); + assert(block_rep->status.ok()); + if (!write_queue.push(block_rep->slot.get())) { + return; } - table_properties_collectors.emplace_back( - new BlockBasedTablePropertiesCollector( - table_options.index_type, table_options.whole_key_filtering, - prefix_extractor != nullptr, - table_options.decouple_partitioned_filters)); - if (ts_sz > 0 && persist_user_defined_timestamps) { - table_properties_collectors.emplace_back( - new TimestampTablePropertiesCollector( - tbo.internal_comparator.user_comparator())); + if (!compress_queue.push(block_rep)) { + return; } - if (table_options.verify_compression) { - for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { - verify_ctxs[i].reset(new UncompressionContext(compression_type)); - } + + if (!first_block_processed.load(std::memory_order_relaxed)) { + std::unique_lock lock(first_block_mutex); + first_block_cond.wait(lock, [this] { + return first_block_processed.load(std::memory_order_relaxed); + }); } + } - // These are only needed for populating table properties - props.column_family_id = tbo.column_family_id; - props.column_family_name = tbo.column_family_name; - props.oldest_key_time = tbo.oldest_key_time; - props.file_creation_time = tbo.file_creation_time; - props.orig_file_number = tbo.cur_file_num; - props.db_id = tbo.db_id; - props.db_session_id = tbo.db_session_id; - props.db_host_id = ioptions.db_host_id; - if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { - ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); + // Reap a block from compression thread + void ReapBlock(BlockRep* block_rep) { + assert(block_rep != nullptr); + block_rep->compressed_data->clear(); + block_rep_pool.push(block_rep); + + if (!first_block_processed.load(std::memory_order_relaxed)) { + std::lock_guard lock(first_block_mutex); + first_block_processed.store(true, std::memory_order_relaxed); + first_block_cond.notify_one(); } - // Default is UINT64_MAX for unknown. Setting it to 0 here - // to allow updating it by taking max in BlockBasedTableBuilder::Add(). - props.key_largest_seqno = 0; + } - if (FormatVersionUsesContextChecksum(table_options.format_version)) { - // Must be non-zero and semi- or quasi-random - // TODO: ideally guaranteed different for related files (e.g. use file - // number and db_session, for benefit of SstFileWriter) - do { - base_context_checksum = Random::GetTLSInstance()->Next(); - } while (UNLIKELY(base_context_checksum == 0)); + private: + BlockRep* PrepareBlockInternal(CompressionType compression_type, + const Slice* first_key_in_next_block) { + BlockRep* block_rep = nullptr; + block_rep_pool.pop(block_rep); + assert(block_rep != nullptr); + + assert(block_rep->data); + + block_rep->compression_type = compression_type; + + if (first_key_in_next_block == nullptr) { + block_rep->first_key_in_next_block.reset(nullptr); } else { - base_context_checksum = 0; + block_rep->first_key_in_next_block->assign( + first_key_in_next_block->data(), first_key_in_next_block->size()); } - if (alignment > 0 && compression_type != kNoCompression) { - // With better sanitization in `CompactionPicker::CompactFiles()`, we - // would not need to handle this case here and could change it to an - // assertion instead. - SetStatus(Status::InvalidArgument( - "Enable block_align, but compression enabled")); - } + return block_rep; } +}; - Rep(const Rep&) = delete; - Rep& operator=(const Rep&) = delete; +struct BlockBasedTableBuilder::Rep { + const ImmutableOptions ioptions; + // BEGIN from MutableCFOptions + std::shared_ptr prefix_extractor; + // END from MutableCFOptions + const WriteOptions write_options; + const BlockBasedTableOptions table_options; + const InternalKeyComparator& internal_comparator; + // Size in bytes for the user-defined timestamps. + size_t ts_sz; + // When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the + // user key will be stripped when creating the block based table. This + // stripping happens for all user keys, including the keys in data block, + // index block for data block, index block for index block (if index type is + // `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned + // filters), the `first_internal_key` in `IndexValue`, the `end_key` for range + // deletion entries. + // As long as the user keys are sorted when added via `Add` API, their logic + // ordering won't change after timestamps are stripped. However, for each user + // key to be logically equivalent before and after timestamp is stripped, the + // user key should contain the minimum timestamp. + bool persist_user_defined_timestamps; + WritableFileWriter* file; + std::atomic offset; + size_t alignment; + BlockBuilder data_block; + // Buffers uncompressed data blocks to replay later. Needed when + // compression dictionary is enabled so we can finalize the dictionary before + // compressing any data blocks. + std::vector data_block_buffers; + BlockBuilder range_del_block; - private: - // Synchronize status & io_status accesses across threads from main thread, - // compression thread and write thread in parallel compression. - std::mutex status_mutex; - std::atomic status_ok; - Status status; - std::mutex io_status_mutex; - std::atomic io_status_ok; - IOStatus io_status; -}; + InternalKeySliceTransform internal_prefix_transform; + std::unique_ptr index_builder; + std::string index_separator_scratch; + PartitionedIndexBuilder* p_index_builder_ = nullptr; -struct BlockBasedTableBuilder::ParallelCompressionRep { - // TODO: consider replacing with autovector or similar - // Keys is a wrapper of vector of strings avoiding - // releasing string memories during vector clear() - // in order to save memory allocation overhead - class Keys { - public: - Keys() : keys_(kKeysInitSize), size_(0) {} - void PushBack(const Slice& key) { - if (size_ == keys_.size()) { - keys_.emplace_back(key.data(), key.size()); - } else { - keys_[size_].assign(key.data(), key.size()); - } - size_++; - } - void SwapAssign(std::vector& keys) { - size_ = keys.size(); - std::swap(keys_, keys); - } - void Clear() { size_ = 0; } - size_t Size() { return size_; } - std::string& Back() { return keys_[size_ - 1]; } - std::string& operator[](size_t idx) { - assert(idx < size_); - return keys_[idx]; - } + std::string last_ikey; // Internal key or empty (unset) + const Slice* first_key_in_next_block = nullptr; + CompressionType compression_type; + uint64_t sample_for_compression; + std::atomic compressible_input_data_bytes; + std::atomic uncompressible_input_data_bytes; + std::atomic sampled_input_data_bytes; + std::atomic sampled_output_slow_data_bytes; + std::atomic sampled_output_fast_data_bytes; + CompressionOptions compression_opts; + std::unique_ptr compression_dict; + std::vector> compression_ctxs; + std::vector> verify_ctxs; + std::unique_ptr verify_dict; - private: - const size_t kKeysInitSize = 32; - std::vector keys_; - size_t size_; - }; - std::unique_ptr curr_block_keys; + size_t data_begin_offset = 0; - class BlockRepSlot; + TableProperties props; - // BlockRep instances are fetched from and recycled to - // block_rep_pool during parallel compression. - struct BlockRep { - Slice contents; - Slice compressed_contents; - std::unique_ptr data; - std::unique_ptr compressed_data; - CompressionType compression_type; - std::unique_ptr first_key_in_next_block; - std::unique_ptr keys; - std::unique_ptr slot; - Status status; + // States of the builder. + // + // - `kBuffered`: This is the initial state where zero or more data blocks are + // accumulated uncompressed in-memory. From this state, call + // `EnterUnbuffered()` to finalize the compression dictionary if enabled, + // compress/write out any buffered blocks, and proceed to the `kUnbuffered` + // state. + // + // - `kUnbuffered`: This is the state when compression dictionary is finalized + // either because it wasn't enabled in the first place or it's been created + // from sampling previously buffered data. In this state, blocks are simply + // compressed/written out as they fill up. From this state, call `Finish()` + // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete + // the partially created file. + // + // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been + // called, so the table builder is no longer usable. We must be in this + // state by the time the destructor runs. + enum class State { + kBuffered, + kUnbuffered, + kClosed, }; - // Use a vector of BlockRep as a buffer for a determined number - // of BlockRep structures. All data referenced by pointers in - // BlockRep will be freed when this vector is destructed. - using BlockRepBuffer = std::vector; - BlockRepBuffer block_rep_buf; - // Use a thread-safe queue for concurrent access from block - // building thread and writer thread. - using BlockRepPool = WorkQueue; - BlockRepPool block_rep_pool; + State state; + // `kBuffered` state is allowed only as long as the buffering of uncompressed + // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`. + uint64_t buffer_limit; + std::shared_ptr + compression_dict_buffer_cache_res_mgr; + const bool use_delta_encoding_for_index_values; + std::unique_ptr filter_builder; + OffsetableCacheKey base_cache_key; + const TableFileCreationReason reason; - // Use BlockRepSlot to keep block order in write thread. - // slot_ will pass references to BlockRep - class BlockRepSlot { - public: - BlockRepSlot() : slot_(1) {} - template - void Fill(T&& rep) { - slot_.push(std::forward(rep)); - } - void Take(BlockRep*& rep) { slot_.pop(rep); } + BlockHandle pending_handle; // Handle to add to index block - private: - // slot_ will pass references to BlockRep in block_rep_buf, - // and those references are always valid before the destruction of - // block_rep_buf. - WorkQueue slot_; - }; + std::string compressed_output; + std::unique_ptr flush_block_policy; - // Compression queue will pass references to BlockRep in block_rep_buf, - // and those references are always valid before the destruction of - // block_rep_buf. - using CompressQueue = WorkQueue; - CompressQueue compress_queue; - std::vector compress_thread_pool; + std::vector> table_properties_collectors; - // Write queue will pass references to BlockRep::slot in block_rep_buf, - // and those references are always valid before the corresponding - // BlockRep::slot is destructed, which is before the destruction of - // block_rep_buf. - using WriteQueue = WorkQueue; - WriteQueue write_queue; - std::unique_ptr write_thread; + std::unique_ptr pc_rep; + BlockCreateContext create_context; - // Estimate output file size when parallel compression is enabled. This is - // necessary because compression & flush are no longer synchronized, - // and BlockBasedTableBuilder::FileSize() is no longer accurate. - // memory_order_relaxed suffices because accurate statistics is not required. - class FileSizeEstimator { - public: - explicit FileSizeEstimator() - : uncomp_bytes_compressed(0), - uncomp_bytes_curr_block(0), - uncomp_bytes_curr_block_set(false), - uncomp_bytes_inflight(0), - blocks_inflight(0), - curr_compression_ratio(0), - estimated_file_size(0) {} + // The size of the "tail" part of a SST file. "Tail" refers to + // all blocks after data blocks till the end of the SST file. + uint64_t tail_size; - // Estimate file size when a block is about to be emitted to - // compression thread - void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { - uint64_t new_uncomp_bytes_inflight = - uncomp_bytes_inflight.fetch_add(uncomp_block_size, - std::memory_order_relaxed) + - uncomp_block_size; + // See class Footer + uint32_t base_context_checksum; - uint64_t new_blocks_inflight = - blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; + uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } + void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } - estimated_file_size.store( - curr_file_size + - static_cast( - static_cast(new_uncomp_bytes_inflight) * - curr_compression_ratio.load(std::memory_order_relaxed)) + - new_blocks_inflight * kBlockTrailerSize, - std::memory_order_relaxed); - } + bool IsParallelCompressionEnabled() const { + return compression_opts.parallel_threads > 1; + } - // Estimate file size when a block is already reaped from - // compression thread - void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { - assert(uncomp_bytes_curr_block_set); + Status GetStatus() { + // We need to make modifications of status visible when status_ok is set + // to false, and this is ensured by status_mutex, so no special memory + // order for status_ok is required. + if (status_ok.load(std::memory_order_relaxed)) { + return Status::OK(); + } else { + return CopyStatus(); + } + } - uint64_t new_uncomp_bytes_compressed = - uncomp_bytes_compressed + uncomp_bytes_curr_block; - assert(new_uncomp_bytes_compressed > 0); + Status CopyStatus() { + std::lock_guard lock(status_mutex); + return status; + } - curr_compression_ratio.store( - (curr_compression_ratio.load(std::memory_order_relaxed) * - uncomp_bytes_compressed + - compressed_block_size) / - static_cast(new_uncomp_bytes_compressed), - std::memory_order_relaxed); - uncomp_bytes_compressed = new_uncomp_bytes_compressed; + IOStatus GetIOStatus() { + // We need to make modifications of io_status visible when status_ok is set + // to false, and this is ensured by io_status_mutex, so no special memory + // order for io_status_ok is required. + if (io_status_ok.load(std::memory_order_relaxed)) { +#ifdef ROCKSDB_ASSERT_STATUS_CHECKED // Avoid unnecessary lock acquisition + auto ios = CopyIOStatus(); + ios.PermitUncheckedError(); + // Assume no races in unit tests + assert(ios.ok()); +#endif // ROCKSDB_ASSERT_STATUS_CHECKED + return IOStatus::OK(); + } else { + return CopyIOStatus(); + } + } - uint64_t new_uncomp_bytes_inflight = - uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, - std::memory_order_relaxed) - - uncomp_bytes_curr_block; + IOStatus CopyIOStatus() { + std::lock_guard lock(io_status_mutex); + return io_status; + } - uint64_t new_blocks_inflight = - blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; + // Never erase an existing status that is not OK. + void SetStatus(Status s) { + if (!s.ok() && status_ok.load(std::memory_order_relaxed)) { + // Locking is an overkill for non compression_opts.parallel_threads + // case but since it's unlikely that s is not OK, we take this cost + // to be simplicity. + std::lock_guard lock(status_mutex); + status = s; + status_ok.store(false, std::memory_order_relaxed); + } + } - estimated_file_size.store( - curr_file_size + - static_cast( - static_cast(new_uncomp_bytes_inflight) * - curr_compression_ratio.load(std::memory_order_relaxed)) + - new_blocks_inflight * kBlockTrailerSize, - std::memory_order_relaxed); + // Never erase an existing I/O status that is not OK. + // Calling this will also SetStatus(ios) + void SetIOStatus(IOStatus ios) { + if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) { + // Locking is an overkill for non compression_opts.parallel_threads + // case but since it's unlikely that s is not OK, we take this cost + // to be simplicity. + std::lock_guard lock(io_status_mutex); + io_status = ios; + io_status_ok.store(false, std::memory_order_relaxed); + } + SetStatus(ios); + } - uncomp_bytes_curr_block_set = false; + Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo, + WritableFileWriter* f) + : ioptions(tbo.ioptions), + prefix_extractor(tbo.moptions.prefix_extractor), + write_options(tbo.write_options), + table_options(table_opt), + internal_comparator(tbo.internal_comparator), + ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()), + persist_user_defined_timestamps( + tbo.ioptions.persist_user_defined_timestamps), + file(f), + offset(0), + alignment(table_options.block_align + ? std::min(static_cast(table_options.block_size), + kDefaultPageSize) + : 0), + data_block(table_options.block_restart_interval, + table_options.use_delta_encoding, + false /* use_value_delta_encoding */, + tbo.internal_comparator.user_comparator() + ->CanKeysWithDifferentByteContentsBeEqual() + ? BlockBasedTableOptions::kDataBlockBinarySearch + : table_options.data_block_index_type, + table_options.data_block_hash_table_util_ratio, ts_sz, + persist_user_defined_timestamps), + range_del_block( + 1 /* block_restart_interval */, true /* use_delta_encoding */, + false /* use_value_delta_encoding */, + BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */, + 0.75 /* data_block_hash_table_util_ratio */, ts_sz, + persist_user_defined_timestamps), + internal_prefix_transform(prefix_extractor.get()), + compression_type(tbo.compression_type), + sample_for_compression(tbo.moptions.sample_for_compression), + compressible_input_data_bytes(0), + uncompressible_input_data_bytes(0), + sampled_input_data_bytes(0), + sampled_output_slow_data_bytes(0), + sampled_output_fast_data_bytes(0), + compression_opts(tbo.compression_opts), + compression_dict(), + compression_ctxs(tbo.compression_opts.parallel_threads), + verify_ctxs(tbo.compression_opts.parallel_threads), + verify_dict(), + state((tbo.compression_opts.max_dict_bytes > 0 && + tbo.compression_type != kNoCompression) + ? State::kBuffered + : State::kUnbuffered), + use_delta_encoding_for_index_values(table_opt.format_version >= 4 && + !table_opt.block_align), + reason(tbo.reason), + flush_block_policy( + table_options.flush_block_policy_factory->NewFlushBlockPolicy( + table_options, data_block)), + create_context(&table_options, &ioptions, ioptions.stats, + compression_type == kZSTD || + compression_type == kZSTDNotFinalCompression, + tbo.moptions.block_protection_bytes_per_key, + tbo.internal_comparator.user_comparator(), + !use_delta_encoding_for_index_values, + table_opt.index_type == + BlockBasedTableOptions::kBinarySearchWithFirstKey), + tail_size(0), + status_ok(true), + io_status_ok(true) { + if (tbo.target_file_size == 0) { + buffer_limit = compression_opts.max_dict_buffer_bytes; + } else if (compression_opts.max_dict_buffer_bytes == 0) { + buffer_limit = tbo.target_file_size; + } else { + buffer_limit = std::min(tbo.target_file_size, + compression_opts.max_dict_buffer_bytes); } - void SetEstimatedFileSize(uint64_t size) { - estimated_file_size.store(size, std::memory_order_relaxed); + const auto compress_dict_build_buffer_charged = + table_options.cache_usage_options.options_overrides + .at(CacheEntryRole::kCompressionDictionaryBuildingBuffer) + .charged; + if (table_options.block_cache && + (compress_dict_build_buffer_charged == + CacheEntryRoleOptions::Decision::kEnabled || + compress_dict_build_buffer_charged == + CacheEntryRoleOptions::Decision::kFallback)) { + compression_dict_buffer_cache_res_mgr = + std::make_shared>( + table_options.block_cache); + } else { + compression_dict_buffer_cache_res_mgr = nullptr; } - uint64_t GetEstimatedFileSize() { - return estimated_file_size.load(std::memory_order_relaxed); + assert(compression_ctxs.size() >= compression_opts.parallel_threads); + for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { + compression_ctxs[i].reset( + new CompressionContext(compression_type, compression_opts)); } - - void SetCurrBlockUncompSize(uint64_t size) { - uncomp_bytes_curr_block = size; - uncomp_bytes_curr_block_set = true; + if (table_options.index_type == + BlockBasedTableOptions::kTwoLevelIndexSearch) { + p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( + &internal_comparator, use_delta_encoding_for_index_values, + table_options, ts_sz, persist_user_defined_timestamps); + index_builder.reset(p_index_builder_); + } else { + index_builder.reset(IndexBuilder::CreateIndexBuilder( + table_options.index_type, &internal_comparator, + &this->internal_prefix_transform, use_delta_encoding_for_index_values, + table_options, ts_sz, persist_user_defined_timestamps)); } + if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) { + // Apply optimize_filters_for_hits setting here when applicable by + // skipping filter generation + filter_builder.reset(); + } else if (tbo.skip_filters) { + // For SstFileWriter skip_filters + filter_builder.reset(); + } else if (!table_options.filter_policy) { + // Null filter_policy -> no filter + filter_builder.reset(); + } else { + FilterBuildingContext filter_context(table_options); - private: - // Input bytes compressed so far. - uint64_t uncomp_bytes_compressed; - // Size of current block being appended. - uint64_t uncomp_bytes_curr_block; - // Whether uncomp_bytes_curr_block has been set for next - // ReapBlock call. - bool uncomp_bytes_curr_block_set; - // Input bytes under compression and not appended yet. - std::atomic uncomp_bytes_inflight; - // Number of blocks under compression and not appended yet. - std::atomic blocks_inflight; - // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. - std::atomic curr_compression_ratio; - // Estimated SST file size. - std::atomic estimated_file_size; - }; - FileSizeEstimator file_size_estimator; + filter_context.info_log = ioptions.logger; + filter_context.column_family_name = tbo.column_family_name; + filter_context.reason = reason; - // Facilities used for waiting first block completion. Need to Wait for - // the completion of first block compression and flush to get a non-zero - // compression ratio. - std::atomic first_block_processed; - std::condition_variable first_block_cond; - std::mutex first_block_mutex; + // Only populate other fields if known to be in LSM rather than + // generating external SST file + if (reason != TableFileCreationReason::kMisc) { + filter_context.compaction_style = ioptions.compaction_style; + filter_context.num_levels = ioptions.num_levels; + filter_context.level_at_creation = tbo.level_at_creation; + filter_context.is_bottommost = tbo.is_bottommost; + assert(filter_context.level_at_creation < filter_context.num_levels); + } - explicit ParallelCompressionRep(uint32_t parallel_threads) - : curr_block_keys(new Keys()), - block_rep_buf(parallel_threads), - block_rep_pool(parallel_threads), - compress_queue(parallel_threads), - write_queue(parallel_threads), - first_block_processed(false) { - for (uint32_t i = 0; i < parallel_threads; i++) { - block_rep_buf[i].contents = Slice(); - block_rep_buf[i].compressed_contents = Slice(); - block_rep_buf[i].data.reset(new std::string()); - block_rep_buf[i].compressed_data.reset(new std::string()); - block_rep_buf[i].compression_type = CompressionType(); - block_rep_buf[i].first_key_in_next_block.reset(new std::string()); - block_rep_buf[i].keys.reset(new Keys()); - block_rep_buf[i].slot.reset(new BlockRepSlot()); - block_rep_buf[i].status = Status::OK(); - block_rep_pool.push(&block_rep_buf[i]); + filter_builder.reset(CreateFilterBlockBuilder( + ioptions, tbo.moptions, filter_context, + use_delta_encoding_for_index_values, p_index_builder_, ts_sz, + persist_user_defined_timestamps)); } - } - - ~ParallelCompressionRep() { block_rep_pool.finish(); } - - // Make a block prepared to be emitted to compression thread - // Used in non-buffered mode - BlockRep* PrepareBlock(CompressionType compression_type, - const Slice* first_key_in_next_block, - BlockBuilder* data_block) { - BlockRep* block_rep = - PrepareBlockInternal(compression_type, first_key_in_next_block); - assert(block_rep != nullptr); - data_block->SwapAndReset(*(block_rep->data)); - block_rep->contents = *(block_rep->data); - std::swap(block_rep->keys, curr_block_keys); - curr_block_keys->Clear(); - return block_rep; - } - // Used in EnterUnbuffered - BlockRep* PrepareBlock(CompressionType compression_type, - const Slice* first_key_in_next_block, - std::string* data_block, - std::vector* keys) { - BlockRep* block_rep = - PrepareBlockInternal(compression_type, first_key_in_next_block); - assert(block_rep != nullptr); - std::swap(*(block_rep->data), *data_block); - block_rep->contents = *(block_rep->data); - block_rep->keys->SwapAssign(*keys); - return block_rep; - } + assert(tbo.internal_tbl_prop_coll_factories); + for (auto& factory : *tbo.internal_tbl_prop_coll_factories) { + assert(factory); - // Emit a block to compression thread - void EmitBlock(BlockRep* block_rep) { - assert(block_rep != nullptr); - assert(block_rep->status.ok()); - if (!write_queue.push(block_rep->slot.get())) { - return; + std::unique_ptr collector{ + factory->CreateInternalTblPropColl( + tbo.column_family_id, tbo.level_at_creation, + tbo.ioptions.num_levels, + tbo.last_level_inclusive_max_seqno_threshold)}; + if (collector) { + table_properties_collectors.emplace_back(std::move(collector)); + } } - if (!compress_queue.push(block_rep)) { - return; + table_properties_collectors.emplace_back( + new BlockBasedTablePropertiesCollector( + table_options.index_type, table_options.whole_key_filtering, + prefix_extractor != nullptr, + table_options.decouple_partitioned_filters)); + if (ts_sz > 0 && persist_user_defined_timestamps) { + table_properties_collectors.emplace_back( + new TimestampTablePropertiesCollector( + tbo.internal_comparator.user_comparator())); } - - if (!first_block_processed.load(std::memory_order_relaxed)) { - std::unique_lock lock(first_block_mutex); - first_block_cond.wait(lock, [this] { - return first_block_processed.load(std::memory_order_relaxed); - }); + if (table_options.verify_compression) { + for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { + verify_ctxs[i].reset(new UncompressionContext(compression_type)); + } } - } - - // Reap a block from compression thread - void ReapBlock(BlockRep* block_rep) { - assert(block_rep != nullptr); - block_rep->compressed_data->clear(); - block_rep_pool.push(block_rep); - if (!first_block_processed.load(std::memory_order_relaxed)) { - std::lock_guard lock(first_block_mutex); - first_block_processed.store(true, std::memory_order_relaxed); - first_block_cond.notify_one(); + // These are only needed for populating table properties + props.column_family_id = tbo.column_family_id; + props.column_family_name = tbo.column_family_name; + props.oldest_key_time = tbo.oldest_key_time; + props.file_creation_time = tbo.file_creation_time; + props.orig_file_number = tbo.cur_file_num; + props.db_id = tbo.db_id; + props.db_session_id = tbo.db_session_id; + props.db_host_id = ioptions.db_host_id; + if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) { + ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set"); } - } - - private: - BlockRep* PrepareBlockInternal(CompressionType compression_type, - const Slice* first_key_in_next_block) { - BlockRep* block_rep = nullptr; - block_rep_pool.pop(block_rep); - assert(block_rep != nullptr); - - assert(block_rep->data); - - block_rep->compression_type = compression_type; + // Default is UINT64_MAX for unknown. Setting it to 0 here + // to allow updating it by taking max in BlockBasedTableBuilder::Add(). + props.key_largest_seqno = 0; - if (first_key_in_next_block == nullptr) { - block_rep->first_key_in_next_block.reset(nullptr); + if (FormatVersionUsesContextChecksum(table_options.format_version)) { + // Must be non-zero and semi- or quasi-random + // TODO: ideally guaranteed different for related files (e.g. use file + // number and db_session, for benefit of SstFileWriter) + do { + base_context_checksum = Random::GetTLSInstance()->Next(); + } while (UNLIKELY(base_context_checksum == 0)); } else { - block_rep->first_key_in_next_block->assign( - first_key_in_next_block->data(), first_key_in_next_block->size()); + base_context_checksum = 0; } - return block_rep; + if (alignment > 0 && compression_type != kNoCompression) { + // With better sanitization in `CompactionPicker::CompactFiles()`, we + // would not need to handle this case here and could change it to an + // assertion instead. + SetStatus(Status::InvalidArgument( + "Enable block_align, but compression enabled")); + } } + + Rep(const Rep&) = delete; + Rep& operator=(const Rep&) = delete; + + private: + // Synchronize status & io_status accesses across threads from main thread, + // compression thread and write thread in parallel compression. + std::mutex status_mutex; + std::atomic status_ok; + Status status; + std::mutex io_status_mutex; + std::atomic io_status_ok; + IOStatus io_status; }; BlockBasedTableBuilder::BlockBasedTableBuilder(