Skip to content

Commit

Permalink
[improve](mow) merge and remove old version of delete bitmap when cum…
Browse files Browse the repository at this point in the history
…ulative compaction is done on local mode (#41636)

pr #40204 remove delete bitmap on
cloud mode, this pr is used for local mode
  • Loading branch information
hust-hhb authored Dec 10, 2024
1 parent 82d799a commit cebf648
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 123 deletions.
28 changes: 5 additions & 23 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,28 +383,10 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
auto pre_max_version = _output_rowset->version().second;
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
DBUG_EXECUTE_IF("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed",
Expand All @@ -424,9 +406,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
{ static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets();
});
}
}
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "cloud_delete_bitmap_action.h"
#include "delete_bitmap_action.h"

#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
Expand All @@ -34,8 +34,10 @@
#include <utility>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
Expand All @@ -44,7 +46,6 @@
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
Expand All @@ -59,10 +60,9 @@ constexpr std::string_view HEADER_JSON = "application/json";

} // namespace

CloudDeleteBitmapAction::CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine,
TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
DeleteBitmapAction::DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
BaseStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
: HttpHandlerWithAuth(exec_env, hier, ptype),
_engine(engine),
_delete_bitmap_action_type(ctype) {}
Expand All @@ -80,20 +80,24 @@ static Status _check_param(HttpRequest* req, uint64_t* tablet_id) {
return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
Status DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}

CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
BaseTabletSPtr tablet = nullptr;
if (config::is_cloud_mode()) {
tablet = DORIS_TRY(_engine.to_cloud().tablet_mgr().get_tablet(tablet_id));
} else {
tablet = _engine.to_local().tablet_manager()->get_tablet(tablet_id);
}
if (tablet == nullptr) {
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}

auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
auto size = tablet->tablet_meta()->delete_bitmap().get_size();
Expand All @@ -115,23 +119,23 @@ Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpReque
return Status::OK();
}

Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
std::string* json_result) {
uint64_t tablet_id = 0;
// check & retrieve tablet_id from req if it contains
RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed");
if (tablet_id == 0) {
return Status::InternalError("check param failed: missing tablet_id");
}
TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
auto st = _engine.to_cloud().meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
if (!st.ok()) {
LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id
<< ", st=" << st.to_string();
return st;
}
auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true);
auto tablet = std::make_shared<CloudTablet>(_engine.to_cloud(), std::move(tablet_meta));
st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
return st;
Expand All @@ -157,7 +161,7 @@ Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest*
return Status::OK();
}

void CloudDeleteBitmapAction::handle(HttpRequest* req) {
void DeleteBitmapAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) {
std::string json_result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

#include <string>

#include "cloud/cloud_storage_engine.h"
#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"

namespace doris {
Expand All @@ -35,13 +35,12 @@ class ExecEnv;
enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 };

/// This action is used for viewing the delete bitmap status
class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
class DeleteBitmapAction : public HttpHandlerWithAuth {
public:
CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
CloudStorageEngine& engine, TPrivilegeHier::type hier,
TPrivilegeType::type ptype);
DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env, BaseStorageEngine& engine,
TPrivilegeHier::type hier, TPrivilegeType::type ptype);

~CloudDeleteBitmapAction() override = default;
~DeleteBitmapAction() override = default;

void handle(HttpRequest* req) override;

Expand All @@ -50,7 +49,7 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result);

private:
CloudStorageEngine& _engine;
BaseStorageEngine& _engine;
DeleteBitmapActionType _delete_bitmap_action_type;
};
#include "common/compile_check_end.h"
Expand Down
64 changes: 64 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,33 @@ Status CloudCompactionMixin::update_delete_bitmap() {
return Status::OK();
}

void Compaction::agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap) {
// agg previously rowset old version delete bitmap
auto pre_max_version = _output_rowset->version().second;
new_delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
for (auto& rowset : pre_rowsets) {
if (rowset->rowset_meta()->total_disk_size() == 0) {
continue;
}
for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version};
auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
continue;
}
new_delete_bitmap->set(end, *d);
}
}
}

Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
// only do index compaction for dup_keys and unique_keys with mow enabled
if (config::inverted_index_compaction_enable &&
Expand Down Expand Up @@ -1103,6 +1130,13 @@ Status CompactionMixin::modify_rowsets() {
tablet()->delete_expired_stale_rowset();
}

if (config::enable_delete_bitmap_merge_on_compaction &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() && _input_rowsets.size() != 1) {
process_old_version_delete_bitmap();
}

int64_t cur_max_version = 0;
{
std::shared_lock rlock(_tablet->get_header_lock());
Expand All @@ -1121,6 +1155,36 @@ Status CompactionMixin::modify_rowsets() {
return Status::OK();
}

void CompactionMixin::process_old_version_delete_bitmap() {
std::vector<RowsetSharedPtr> pre_rowsets {};
for (const auto& it : tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
if (!pre_rowsets.empty()) {
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>
to_remove_vec;
DeleteBitmapPtr new_delete_bitmap = nullptr;
agg_and_remove_old_version_delete_bitmap(pre_rowsets, to_remove_vec, new_delete_bitmap);
if (!new_delete_bitmap->empty()) {
// store agg delete bitmap
Version version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
_tablet->tablet_meta()->delete_bitmap().set(it->first, it->second);
}
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
to_remove_vec);
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets", {
static_cast<Tablet*>(_tablet.get())->delete_expired_stale_rowset();
});
}
}
}

bool CompactionMixin::_check_if_includes_input_rowsets(
const RowsetIdUnorderedSet& commit_rowset_ids_set) const {
std::vector<RowsetId> commit_rowset_ids {};
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ class Compaction {

virtual Status update_delete_bitmap() = 0;

void agg_and_remove_old_version_delete_bitmap(
std::vector<RowsetSharedPtr>& pre_rowsets,
std::vector<std::tuple<int64_t, DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>&
to_remove_vec,
DeleteBitmapPtr& new_delete_bitmap);

// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;

Expand Down Expand Up @@ -162,6 +168,8 @@ class CompactionMixin : public Compaction {

Status do_compact_ordered_rowsets();

void process_old_version_delete_bitmap();

bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;

PendingRowsetGuard _pending_rs_guard;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << _context.tablet->tablet_id()
<< ", rowset_ids: " << _context.mow_context->rowset_ids.size()
<< ", cur max_version: " << _context.mow_context->max_version
<< ", transaction_id: " << _context.mow_context->txn_id
<< ", transaction_id: " << _context.mow_context->txn_id << ", delete_bitmap_count: "
<< _context.tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count()
<< ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
return Status::OK();
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -817,10 +817,13 @@ void Tablet::delete_expired_stale_rowset() {
auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();

// do delete operation
std::vector<std::string> version_to_delete;
auto to_delete_iter = stale_version_path_map.begin();
while (to_delete_iter != stale_version_path_map.end()) {
std::vector<TimestampedVersionSharedPtr>& to_delete_version =
to_delete_iter->second->timestamped_versions();
int64_t start_version = -1;
int64_t end_version = -1;
for (auto& timestampedVersion : to_delete_version) {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
Expand All @@ -841,10 +844,17 @@ void Tablet::delete_expired_stale_rowset() {
<< timestampedVersion->version().second
<< "] not find in stale rs version map";
}
if (start_version < 0) {
start_version = timestampedVersion->version().first;
}
end_version = timestampedVersion->version().second;
_delete_stale_rowset_by_version(timestampedVersion->version());
}
Version version(start_version, end_version);
version_to_delete.emplace_back(version.to_string());
to_delete_iter++;
}
_tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);

bool reconstructed = _reconstruct_version_tracker_if_necessary();

Expand Down
18 changes: 14 additions & 4 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,9 @@ uint64_t DeleteBitmap::cardinality() const {
std::shared_lock l(lock);
uint64_t res = 0;
for (auto entry : delete_bitmap) {
res += entry.second.cardinality();
if (std::get<1>(entry.first) != DeleteBitmap::INVALID_SEGMENT_ID) {
res += entry.second.cardinality();
}
}
return res;
}
Expand All @@ -1100,7 +1102,9 @@ uint64_t DeleteBitmap::get_size() const {
std::shared_lock l(lock);
uint64_t charge = 0;
for (auto& [k, v] : delete_bitmap) {
charge += v.getSizeInBytes();
if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
charge += v.getSizeInBytes();
}
}
return charge;
}
Expand Down Expand Up @@ -1219,7 +1223,7 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector<std::
_stale_delete_bitmap.erase(version_str);
}
}
if (tablet_id == -1 || to_delete.empty()) {
if (tablet_id == -1 || to_delete.empty() || !config::is_cloud_mode()) {
return;
}
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Expand All @@ -1232,7 +1236,13 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector<std::

uint64_t DeleteBitmap::get_delete_bitmap_count() {
std::shared_lock l(lock);
return delete_bitmap.size();
uint64_t count = 0;
for (auto it = delete_bitmap.begin(); it != delete_bitmap.end(); it++) {
if (std::get<1>(it->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
count++;
}
}
return count;
}

// We cannot just copy the underlying memory to construct a string
Expand Down
Loading

0 comments on commit cebf648

Please sign in to comment.