diff --git a/src/common/container/cuckoo_map.cpp b/src/common/container/cuckoo_map.cpp index b7690754c83..4e59f1d242d 100644 --- a/src/common/container/cuckoo_map.cpp +++ b/src/common/container/cuckoo_map.cpp @@ -14,6 +14,7 @@ #include #include "common/container/cuckoo_map.h" +#include "common/container/lock_free_queue.h" #include "common/internal_types.h" #include "common/item_pointer.h" #include "common/logger.h" @@ -125,4 +126,7 @@ template class CuckooMap, std::shared_ptr>; // Used in StatementCacheManager template class CuckooMap; +// Used in TransactionLevelGCManager +template class CuckooMap>>; + } // namespace peloton diff --git a/src/gc/transaction_level_gc_manager.cpp b/src/gc/transaction_level_gc_manager.cpp index e6630aa6cf6..b58d293f95a 100644 --- a/src/gc/transaction_level_gc_manager.cpp +++ b/src/gc/transaction_level_gc_manager.cpp @@ -14,9 +14,13 @@ #include "brain/query_logger.h" #include "catalog/manager.h" +#include "catalog/catalog.h" #include "common/container_tuple.h" #include "concurrency/epoch_manager_factory.h" #include "concurrency/transaction_manager_factory.h" +#include "executor/executor_context.h" +#include "executor/logical_tile.h" +#include "executor/logical_tile_factory.h" #include "index/index.h" #include "settings/settings_manager.h" #include "storage/database.h" @@ -28,6 +32,91 @@ namespace peloton { namespace gc { +TransactionLevelGCManager::TransactionLevelGCManager( + const uint32_t &thread_count) + : gc_thread_count_(thread_count), + local_unlink_queues_(thread_count), + reclaim_maps_(thread_count) { + unlink_queues_.reserve(thread_count); + + for (uint32_t i = 0; i < gc_thread_count_; ++i) { + unlink_queues_.emplace_back( + std::make_shared>( + QUEUE_LENGTH)); + } + + recycle_queues_ = std::make_shared< + peloton::CuckooMap>>( + INITIAL_MAP_SIZE); +} + +void TransactionLevelGCManager::TransactionLevelGCManager::Reset() { + local_unlink_queues_.clear(); + local_unlink_queues_.resize(gc_thread_count_); + + reclaim_maps_.clear(); + reclaim_maps_.resize(gc_thread_count_); + + unlink_queues_.clear(); + unlink_queues_.reserve(gc_thread_count_); + + for (uint32_t i = 0; i < gc_thread_count_; ++i) { + unlink_queues_.emplace_back( + std::make_shared>( + QUEUE_LENGTH)); + } + + recycle_queues_ = std::make_shared< + peloton::CuckooMap>>( + INITIAL_MAP_SIZE); + + is_running_ = false; +} + +TransactionLevelGCManager &TransactionLevelGCManager::GetInstance( + const uint32_t &thread_count) { + static TransactionLevelGCManager gc_manager(thread_count); + return gc_manager; +} + +void TransactionLevelGCManager::StartGC( + std::vector> &gc_threads) { + LOG_TRACE("Starting GC"); + + is_running_ = true; + gc_threads.resize(gc_thread_count_); + + for (uint32_t i = 0; i < gc_thread_count_; ++i) { + gc_threads[i].reset( + new std::thread(&TransactionLevelGCManager::Running, this, i)); + } +} + +void TransactionLevelGCManager::StartGC() { + LOG_TRACE("Starting GC"); + is_running_ = true; + + for (uint32_t i = 0; i < gc_thread_count_; ++i) { + thread_pool.SubmitDedicatedTask(&TransactionLevelGCManager::Running, this, + std::move(i)); + } +}; + +void TransactionLevelGCManager::RegisterTable(const oid_t &table_id) { + // if table already registered, ignore it + if (recycle_queues_->Contains(table_id)) { + return; + } + // Insert a new entry for the table + auto recycle_queue = std::make_shared(QUEUE_LENGTH); + recycle_queues_->Insert(table_id, recycle_queue); +} + +void TransactionLevelGCManager::DeregisterTable(const oid_t &table_id) { + recycle_queues_->Erase(table_id); +} + +// Assumes that location is a valid ItemPointer bool TransactionLevelGCManager::ResetTuple(const ItemPointer &location) { auto storage_manager = storage::StorageManager::GetInstance(); auto tile_group = storage_manager->GetTileGroup(location.block).get(); @@ -50,8 +139,8 @@ bool TransactionLevelGCManager::ResetTuple(const ItemPointer &location) { return true; } -void TransactionLevelGCManager::Running(const int &thread_id) { - PELOTON_ASSERT(is_running_ == true); +void TransactionLevelGCManager::Running(const uint32_t &thread_id) { + PELOTON_ASSERT(is_running_); uint32_t backoff_shifts = 0; while (true) { auto &epoch_manager = concurrency::EpochManagerFactory::GetInstance(); @@ -64,12 +153,13 @@ void TransactionLevelGCManager::Running(const int &thread_id) { continue; } - int reclaimed_count = Reclaim(thread_id, expired_eid); int unlinked_count = Unlink(thread_id, expired_eid); + int reclaimed_count = Reclaim(thread_id, expired_eid); - if (is_running_ == false) { + if (!is_running_) { return; } + if (reclaimed_count == 0 && unlinked_count == 0) { // sleep at most 0.8192 s if (backoff_shifts < 13) { @@ -90,8 +180,8 @@ void TransactionLevelGCManager::RecycleTransaction( epoch_manager.ExitEpoch(txn->GetThreadId(), txn->GetEpochId()); - if (!txn->IsReadOnly() && \ - txn->GetResult() != ResultType::SUCCESS && txn->IsGCSetEmpty() != true) { + if (!txn->IsReadOnly() && txn->GetResult() != ResultType::SUCCESS && + !txn->IsGCSetEmpty()) { txn->SetEpochId(epoch_manager.GetNextEpochId()); } @@ -99,43 +189,43 @@ void TransactionLevelGCManager::RecycleTransaction( unlink_queues_[HashToThread(txn->GetThreadId())]->Enqueue(txn); } -int TransactionLevelGCManager::Unlink(const int &thread_id, - const eid_t &expired_eid) { - int tuple_counter = 0; +uint32_t TransactionLevelGCManager::Unlink(const uint32_t &thread_id, + const eid_t &expired_eid) { + uint32_t tuple_counter = 0; // check if any garbage can be unlinked from indexes. - // every time we garbage collect at most MAX_ATTEMPT_COUNT tuples. + // every time we garbage collect at most MAX_PROCESSED_COUNT tuples. std::vector garbages; // First iterate the local unlink queue local_unlink_queues_[thread_id].remove_if( - [&garbages, &tuple_counter, expired_eid, - this](concurrency::TransactionContext *txn_ctx) -> bool { - bool res = txn_ctx->GetEpochId() <= expired_eid; - if (res == true) { + [&garbages, &tuple_counter, expired_eid, this]( + concurrency::TransactionContext *txn_ctx) -> bool { + bool result = txn_ctx->GetEpochId() <= expired_eid; + if (result) { // unlink versions from version chain and indexes - UnlinkVersions(txn_ctx); + RemoveVersionsFromIndexes(txn_ctx); // Add to the garbage map garbages.push_back(txn_ctx); tuple_counter++; } - return res; + return result; }); - for (size_t i = 0; i < MAX_ATTEMPT_COUNT; ++i) { + for (size_t i = 0; i < MAX_PROCESSED_COUNT; ++i) { concurrency::TransactionContext *txn_ctx; // if there's no more tuples in the queue, then break. - if (unlink_queues_[thread_id]->Dequeue(txn_ctx) == false) { + if (!unlink_queues_[thread_id]->Dequeue(txn_ctx)) { break; } // Log the query into query_history_catalog if (settings::SettingsManager::GetBool(settings::SettingId::brain)) { std::vector query_strings = txn_ctx->GetQueryStrings(); - if (query_strings.size() != 0) { + if (!query_strings.empty()) { uint64_t timestamp = txn_ctx->GetTimestamp(); auto &pool = threadpool::MonoQueuePool::GetBrainInstance(); - for (auto query_string : query_strings) { + for (const auto &query_string : query_strings) { pool.SubmitTask([query_string, timestamp] { brain::QueryLogger::LogQuery(query_string, timestamp); }); @@ -152,13 +242,12 @@ int TransactionLevelGCManager::Unlink(const int &thread_id, } if (txn_ctx->GetEpochId() <= expired_eid) { - // as the global expired epoch id is no less than the garbage version's - // epoch id, it means that no active transactions can read the version. As - // a result, we can delete all the tuples from the indexes to which it - // belongs. + // since this txn's epochId is <= the global expired epoch id + // no active transactions can read the version. Asa result, + // we can delete remove all of its garbage tuples from the indexes // unlink versions from version chain and indexes - UnlinkVersions(txn_ctx); + RemoveVersionsFromIndexes(txn_ctx); // Add to the garbage map garbages.push_back(txn_ctx); tuple_counter++; @@ -169,7 +258,7 @@ int TransactionLevelGCManager::Unlink(const int &thread_id, } } // end for - // once the current epoch id is expired, then we know all the transactions + // once the current epoch is expired, we know all the transactions // that are active at this time point will be committed/aborted. // at that time point, it is safe to recycle the version. eid_t safe_expired_eid = @@ -183,9 +272,9 @@ int TransactionLevelGCManager::Unlink(const int &thread_id, } // executed by a single thread. so no synchronization is required. -int TransactionLevelGCManager::Reclaim(const int &thread_id, - const eid_t &expired_eid) { - int gc_counter = 0; +uint32_t TransactionLevelGCManager::Reclaim(const uint32_t &thread_id, + const eid_t &expired_eid) { + uint32_t gc_counter = 0; // we delete garbage in the free list auto garbage_ctx_entry = reclaim_maps_[thread_id].begin(); @@ -193,10 +282,11 @@ int TransactionLevelGCManager::Reclaim(const int &thread_id, const eid_t garbage_eid = garbage_ctx_entry->first; auto txn_ctx = garbage_ctx_entry->second; - // if the global expired epoch id is no less than the garbage version's - // epoch id, then recycle the garbage version + // if the the garbage version's epoch id is expired + // then recycle the garbage version if (garbage_eid <= expired_eid) { - AddToRecycleMap(txn_ctx); + RecycleTupleSlots(txn_ctx); + RemoveObjectLevelGarbage(txn_ctx); // Remove from the original map garbage_ctx_entry = reclaim_maps_[thread_id].erase(garbage_ctx_entry); @@ -210,48 +300,69 @@ int TransactionLevelGCManager::Reclaim(const int &thread_id, return gc_counter; } -// Multiple GC thread share the same recycle map -void TransactionLevelGCManager::AddToRecycleMap( +// Multiple GC threads share the same recycle map +void TransactionLevelGCManager::RecycleTupleSlots( concurrency::TransactionContext *txn_ctx) { + // for each tile group that this txn created garbage tuples in for (auto &entry : *(txn_ctx->GetGCSetPtr().get())) { - auto storage_manager = storage::StorageManager::GetInstance(); - auto tile_group = storage_manager->GetTileGroup(entry.first); + auto tile_group_id = entry.first; - // During the resetting, a table may be deconstructed because of the DROP - // TABLE request - if (tile_group == nullptr) { - delete txn_ctx; - return; + // recycle each garbage tuple in the tile group + for (auto &element : entry.second) { + auto offset = element.first; + RecycleTupleSlot(ItemPointer(tile_group_id, offset)); } + } +} - PELOTON_ASSERT(tile_group != nullptr); +void TransactionLevelGCManager::RecycleTupleSlot(const ItemPointer &location) { + auto tile_group_id = location.block; + auto tile_group = storage::StorageManager::GetInstance()->GetTileGroup(tile_group_id); - storage::DataTable *table = - dynamic_cast(tile_group->GetAbstractTable()); - PELOTON_ASSERT(table != nullptr); + // During the resetting, + // a table may be deconstructed because of a DROP TABLE request + if (tile_group == nullptr) { + return; + } - oid_t table_id = table->GetOid(); - auto tile_group_header = tile_group->GetHeader(); - PELOTON_ASSERT(tile_group_header != nullptr); - bool immutable = tile_group_header->GetImmutability(); + oid_t table_id = tile_group->GetTableId(); + auto table = storage::StorageManager::GetInstance()->GetTableWithOid( + tile_group->GetDatabaseId(), table_id); + if (table == nullptr) { + // Guard against the table being dropped out from under us + return; + } - for (auto &element : entry.second) { - // as this transaction has been committed, we should reclaim older - // versions. - ItemPointer location(entry.first, element.first); + auto recycle_queue = GetTableRecycleQueue(table_id); + if (recycle_queue == nullptr) { + return; + } - // If the tuple being reset no longer exists, just skip it - if (ResetTuple(location) == false) { - continue; - } - // if immutable is false and the entry for table_id exists. - if ((!immutable) && - recycle_queue_map_.find(table_id) != recycle_queue_map_.end()) { - recycle_queue_map_[table_id]->Enqueue(location); - } - } + // If the tuple being reset no longer exists, just skip it + if (!ResetTuple(location)) { + return; + } + + auto tile_group_header = tile_group->GetHeader(); + PELOTON_ASSERT(tile_group_header != nullptr); + if (tile_group_header == nullptr) { + return; + } + + bool immutable = tile_group_header->GetImmutability(); + + if (!immutable) { + // this slot should be recycled, add it to the recycle stack + recycle_queue->Enqueue(location); } + LOG_TRACE("Recycled tuple slot count for tile_group %u is %zu", tile_group_id, + tile_group_header->GetNumRecycled()); +} + +void TransactionLevelGCManager::RemoveObjectLevelGarbage( + concurrency::TransactionContext *txn_ctx) { + // Perform object-level GC (e.g. dropped tables, indexes, databases) auto storage_manager = storage::StorageManager::GetInstance(); for (auto &entry : *(txn_ctx->GetGCObjectSetPtr().get())) { oid_t database_oid = std::get<0>(entry); @@ -262,35 +373,37 @@ void TransactionLevelGCManager::AddToRecycleMap( PELOTON_ASSERT(database != nullptr); if (table_oid == INVALID_OID) { storage_manager->RemoveDatabaseFromStorageManager(database_oid); + LOG_TRACE("GCing database %u", database_oid); continue; } auto table = database->GetTableWithOid(table_oid); PELOTON_ASSERT(table != nullptr); if (index_oid == INVALID_OID) { database->DropTableWithOid(table_oid); - LOG_DEBUG("GCing table %u", table_oid); + LOG_TRACE("GCing table %u", table_oid); continue; } auto index = table->GetIndexWithOid(index_oid); PELOTON_ASSERT(index != nullptr); table->DropIndexWithOid(index_oid); - LOG_DEBUG("GCing index %u", index_oid); + LOG_TRACE("GCing index %u", index_oid); } delete txn_ctx; } -// this function returns a free tuple slot, if one exists -// called by data_table. -ItemPointer TransactionLevelGCManager::ReturnFreeSlot(const oid_t &table_id) { - // for catalog tables, we directly return invalid item pointer. - if (recycle_queue_map_.find(table_id) == recycle_queue_map_.end()) { +// looks for a free tuple slot that can now be reused +// called by data_table, which passes in a pointer to itself +ItemPointer TransactionLevelGCManager::GetRecycledTupleSlot( + storage::DataTable *table) { + + auto recycle_queue = GetTableRecycleQueue(table->GetOid()); + + if (recycle_queue == nullptr) { return INVALID_ITEMPOINTER; } - ItemPointer location; - PELOTON_ASSERT(recycle_queue_map_.find(table_id) != recycle_queue_map_.end()); - auto recycle_queue = recycle_queue_map_[table_id]; + ItemPointer location; if (recycle_queue->Dequeue(location) == true) { LOG_TRACE("Reuse tuple(%u, %u) in table %u", location.block, location.offset, table_id); @@ -299,40 +412,46 @@ ItemPointer TransactionLevelGCManager::ReturnFreeSlot(const oid_t &table_id) { return INVALID_ITEMPOINTER; } -void TransactionLevelGCManager::ClearGarbage(int thread_id) { +void TransactionLevelGCManager::ClearGarbage(const uint32_t &thread_id) { + while (!unlink_queues_[thread_id]->IsEmpty() || - !local_unlink_queues_[thread_id].empty()) { + !local_unlink_queues_[thread_id].empty()) { Unlink(thread_id, MAX_CID); } - while (reclaim_maps_[thread_id].size() != 0) { + while (!reclaim_maps_[thread_id].empty()) { Reclaim(thread_id, MAX_CID); } - - return; } void TransactionLevelGCManager::StopGC() { LOG_TRACE("Stopping GC"); this->is_running_ = false; // clear the garbage in each GC thread - for (int thread_id = 0; thread_id < gc_thread_count_; ++thread_id) { + for (uint32_t thread_id = 0; thread_id < gc_thread_count_; ++thread_id) { ClearGarbage(thread_id); } } -void TransactionLevelGCManager::UnlinkVersions( +void TransactionLevelGCManager::RemoveVersionsFromIndexes( concurrency::TransactionContext *txn_ctx) { + // for each tile group that this txn created garbage tuples in for (auto entry : *(txn_ctx->GetGCSetPtr().get())) { - for (auto &element : entry.second) { - UnlinkVersion(ItemPointer(entry.first, element.first), element.second); + auto tile_group_id = entry.first; + auto garbage_tuples = entry.second; + + // for each garbage tuple in the tile group + for (auto &element : garbage_tuples) { + auto offset = element.first; + auto gc_type = element.second; + RemoveVersionFromIndexes(ItemPointer(tile_group_id, offset), gc_type); } } } -// delete a tuple from all its indexes it belongs to. -void TransactionLevelGCManager::UnlinkVersion(const ItemPointer location, - GCVersionType type) { +// unlink garbage tuples and update indexes appropriately (according to gc type) +void TransactionLevelGCManager::RemoveVersionFromIndexes( + const ItemPointer &location, const GCVersionType &type) { // get indirection from the indirection array. auto tile_group = storage::StorageManager::GetInstance()->GetTileGroup(location.block); @@ -343,9 +462,7 @@ void TransactionLevelGCManager::UnlinkVersion(const ItemPointer location, return; } - auto tile_group_header = - storage::StorageManager::GetInstance()->GetTileGroup(location.block)->GetHeader(); - + auto tile_group_header = tile_group->GetHeader(); ItemPointer *indirection = tile_group_header->GetIndirection(location.offset); // do nothing if indirection is null @@ -356,40 +473,107 @@ void TransactionLevelGCManager::UnlinkVersion(const ItemPointer location, ContainerTuple current_tuple(tile_group.get(), location.offset); - storage::DataTable *table = + auto table = dynamic_cast(tile_group->GetAbstractTable()); - PELOTON_ASSERT(table != nullptr); + if (table == nullptr) { + // guard against table being GC'd by another GC thread + return; + } - // NOTE: for now, we only consider unlinking tuple versions from primary - // indexes. if (type == GCVersionType::COMMIT_UPDATE) { // the gc'd version is an old version. - // this version needs to be reclaimed by the GC. - // if the version differs from the previous one in some columns where - // secondary indexes are built on, then we need to unlink the previous - // version from the secondary index. - } else if (type == GCVersionType::COMMIT_DELETE) { - // the gc'd version is an old version. - // need to recycle this version as well as its newer (empty) version. - // we also need to delete the tuple from the primary and secondary - // indexes. + // this old version needs to be reclaimed by the GC. + // if this old version differs from the newest version in some columns that + // secondary indexes are built on, then we need to delete this old version + // from those secondary indexes + ContainerTuple older_tuple(tile_group.get(), + location.offset); + + ItemPointer newer_location = + tile_group_header->GetPrevItemPointer(location.offset); + + if (newer_location == INVALID_ITEMPOINTER) { + return; + } + + auto newer_tile_group = + storage::StorageManager::GetInstance()->GetTileGroup(newer_location.block); + ContainerTuple newer_tuple(newer_tile_group.get(), + newer_location.offset); + // remove the older version from all the indexes + // where it no longer matches the newer version + for (uint32_t idx = 0; idx < table->GetIndexCount(); ++idx) { + auto index = table->GetIndex(idx); + if (index == nullptr) continue; + auto index_schema = index->GetKeySchema(); + auto indexed_columns = index_schema->GetIndexedColumns(); + + // build keys + std::unique_ptr older_key( + new storage::Tuple(index_schema, true)); + older_key->SetFromTuple(&older_tuple, indexed_columns, index->GetPool()); + std::unique_ptr newer_key( + new storage::Tuple(index_schema, true)); + newer_key->SetFromTuple(&newer_tuple, indexed_columns, index->GetPool()); + + // if older_key is different, delete it from index + if (newer_key->Compare(*older_key) != 0) { + index->DeleteEntry(older_key.get(), indirection); + } + } } else if (type == GCVersionType::ABORT_UPDATE) { // the gc'd version is a newly created version. // if the version differs from the previous one in some columns where // secondary indexes are built on, then we need to unlink this version // from the secondary index. + ContainerTuple newer_tuple(tile_group.get(), + location.offset); + + ItemPointer older_location = + tile_group_header->GetNextItemPointer(location.offset); - } else if (type == GCVersionType::ABORT_DELETE) { + if (older_location == INVALID_ITEMPOINTER) { + return; + } + + auto older_tile_group = + storage::StorageManager::GetInstance()->GetTileGroup(older_location.block); + ContainerTuple older_tuple(older_tile_group.get(), + older_location.offset); + // remove the newer version from all the indexes + // where it no longer matches the older version + for (uint32_t idx = 0; idx < table->GetIndexCount(); ++idx) { + auto index = table->GetIndex(idx); + if (index == nullptr) continue; + auto index_schema = index->GetKeySchema(); + auto indexed_columns = index_schema->GetIndexedColumns(); + + // build keys + std::unique_ptr older_key( + new storage::Tuple(index_schema, true)); + older_key->SetFromTuple(&older_tuple, indexed_columns, index->GetPool()); + std::unique_ptr newer_key( + new storage::Tuple(index_schema, true)); + newer_key->SetFromTuple(&newer_tuple, indexed_columns, index->GetPool()); + + // if newer_key is different, delete it from index + if (newer_key->Compare(*older_key) != 0) { + index->DeleteEntry(newer_key.get(), indirection); + } + } + + } else if (type == GCVersionType::TOMBSTONE) { // the gc'd version is a newly created empty version. // need to recycle this version. // no index manipulation needs to be made. } else { PELOTON_ASSERT(type == GCVersionType::ABORT_INSERT || - type == GCVersionType::COMMIT_INS_DEL || - type == GCVersionType::ABORT_INS_DEL); + type == GCVersionType::COMMIT_INS_DEL || + type == GCVersionType::ABORT_INS_DEL || + type == GCVersionType::COMMIT_DELETE); // attempt to unlink the version from all the indexes. - for (size_t idx = 0; idx < table->GetIndexCount(); ++idx) { + for (uint32_t idx = 0; idx < table->GetIndexCount(); ++idx) { auto index = table->GetIndex(idx); if (index == nullptr) continue; auto index_schema = index->GetKeySchema(); @@ -406,5 +590,20 @@ void TransactionLevelGCManager::UnlinkVersion(const ItemPointer location, } } +inline unsigned int TransactionLevelGCManager::HashToThread( + const size_t &thread_id) { + return (unsigned int)thread_id % gc_thread_count_; +} + +std::shared_ptr TransactionLevelGCManager::GetTableRecycleQueue( + const oid_t &table_id) const { + std::shared_ptr recycle_queue; + if (recycle_queues_->Find(table_id, recycle_queue)) { + return recycle_queue; + } else { + return nullptr; + } +} + } // namespace gc -} // namespace peloton +} // namespace peloton \ No newline at end of file diff --git a/src/include/gc/transaction_level_gc_manager.h b/src/include/gc/transaction_level_gc_manager.h index 89704685d39..8fa0ba4398f 100644 --- a/src/include/gc/transaction_level_gc_manager.h +++ b/src/include/gc/transaction_level_gc_manager.h @@ -6,7 +6,7 @@ // // Identification: src/include/gc/transaction_level_gc_manager.h // -// Copyright (c) 2015-16, Carnegie Mellon University Database Group +// Copyright (c) 2015-18, Carnegie Mellon University Database Group // //===----------------------------------------------------------------------===// @@ -18,150 +18,199 @@ #include #include +#include "common/container/cuckoo_map.h" +#include "common/container/lock_free_queue.h" #include "common/init.h" #include "common/logger.h" #include "common/thread_pool.h" +#include "common/internal_types.h" #include "concurrency/transaction_context.h" #include "gc/gc_manager.h" -#include "common/internal_types.h" - -#include "common/container/lock_free_queue.h" namespace peloton { + namespace gc { -#define MAX_QUEUE_LENGTH 100000 -#define MAX_ATTEMPT_COUNT 100000 +using RecycleQueue = peloton::LockFreeQueue; + +static constexpr size_t QUEUE_LENGTH = 100000; +static constexpr size_t INITIAL_MAP_SIZE = 256; +static constexpr size_t MAX_PROCESSED_COUNT = 100000; class TransactionLevelGCManager : public GCManager { public: - TransactionLevelGCManager(const int thread_count) - : gc_thread_count_(thread_count), reclaim_maps_(thread_count) { - unlink_queues_.reserve(thread_count); - for (int i = 0; i < gc_thread_count_; ++i) { - std::shared_ptr> - unlink_queue(new LockFreeQueue( - MAX_QUEUE_LENGTH)); - unlink_queues_.push_back(unlink_queue); - local_unlink_queues_.emplace_back(); - } - } + /** + * @brief TransactionLevelGCManager should be created with GetInstance() + */ + TransactionLevelGCManager() = delete; - virtual ~TransactionLevelGCManager() {} + /** + * @brief Resets member variables and data structures to defaults. + * + * Intended for testing purposes only. + * + * @warning This leaks tuple slots, txns, etc. if StopGC() not called first! + */ + virtual void Reset() override; - // this function cleans up all the member variables in the class object. - virtual void Reset() override { - unlink_queues_.clear(); - local_unlink_queues_.clear(); + /** + * + * @param[in] thread_count Number of Garbage Collector threads + * @return Singleton instance of the TransactionLevelGCManager + */ + static TransactionLevelGCManager &GetInstance( + const uint32_t &thread_count = 1); - unlink_queues_.reserve(gc_thread_count_); - for (int i = 0; i < gc_thread_count_; ++i) { - std::shared_ptr> - unlink_queue(new LockFreeQueue( - MAX_QUEUE_LENGTH)); - unlink_queues_.push_back(unlink_queue); - local_unlink_queues_.emplace_back(); - } + virtual void StartGC( + std::vector> &gc_threads) override; - reclaim_maps_.clear(); - reclaim_maps_.resize(gc_thread_count_); - recycle_queue_map_.clear(); + /** + * @brief Launches GC threads + */ + virtual void StartGC() override; - is_running_ = false; - } + /** + * @brief Clears garbage for each GC thread and then ends the threads + */ + virtual void StopGC() override; - static TransactionLevelGCManager &GetInstance(const int thread_count = 1) { - static TransactionLevelGCManager gc_manager(thread_count); - return gc_manager; - } + /** + * @brief Registers the provided table with the GC to recycle its + * tuple slots + * @param[in] table_id Global oid of the table to start recycling + * slots for + */ + virtual void RegisterTable(const oid_t &table_id) override; - virtual void StartGC( - std::vector> &gc_threads) override { - LOG_TRACE("Starting GC"); - this->is_running_ = true; - gc_threads.resize(gc_thread_count_); - for (int i = 0; i < gc_thread_count_; ++i) { - gc_threads[i].reset( - new std::thread(&TransactionLevelGCManager::Running, this, i)); - } - } - - virtual void StartGC() override { - LOG_TRACE("Starting GC"); - this->is_running_ = true; - for (int i = 0; i < gc_thread_count_; ++i) { - thread_pool.SubmitDedicatedTask(&TransactionLevelGCManager::Running, this, - std::move(i)); - } - }; - - /** - * @brief This stops the Garbage Collector when Peloton shuts down - * - * @return No return value. + /** + * @brief Deregisters the provided table with the GC to recycle its + * tuple slots + * @param[in] table_id Global oid of the table to stop recycling + * slots for */ - virtual void StopGC() override; + virtual void DeregisterTable(const oid_t &table_id) override; + /** + * @brief Passes a transaction's context to the GC for freeing and + * possible recycling + * @param[id] txn TransactionContext pointer for the GC to process. + * @warning txn will be freed by the GC, so do not dereference it + * after calling this function with txn + */ virtual void RecycleTransaction( concurrency::TransactionContext *txn) override; - virtual ItemPointer ReturnFreeSlot(const oid_t &table_id) override; + /** + * @brief Attempt to get a recycled ItemPointer for this table from the GC + * @param[in] table Pointer of the table to return a recycled ItemPointer for + * @return ItemPointer to a recycled tuple slot on success, + * INVALID_ITEMPOINTER + * otherwise + */ + virtual ItemPointer GetRecycledTupleSlot(storage::DataTable *table) override; - virtual void RegisterTable(const oid_t &table_id) override { - // Insert a new entry for the table - if (recycle_queue_map_.find(table_id) == recycle_queue_map_.end()) { - std::shared_ptr> recycle_queue( - new LockFreeQueue(MAX_QUEUE_LENGTH)); - recycle_queue_map_[table_id] = recycle_queue; - } - } + /** + * @brief Recycle the provided tuple slot. May trigger TileGroup compaction or + * TileGroup freeing if enabled + * @param[id] location ItemPointer of the tuple slot to be recycled + */ + virtual void RecycleTupleSlot(const ItemPointer &location) override; - virtual void DeregisterTable(const oid_t &table_id) override { - // Remove dropped tables - if (recycle_queue_map_.find(table_id) != recycle_queue_map_.end()) { - recycle_queue_map_.erase(table_id); - } - } + /** + * + * @return Number of tables currently registered with the GC for recycling + */ + virtual size_t GetTableCount() override { return recycle_queues_->GetSize(); } - virtual size_t GetTableCount() override { return recycle_queue_map_.size(); } + /** + * @brief Process unlink queue for provided thread + * @param[id] thread_id Zero-indexed thread id to access unlink queue + * @param[id] expired_eid Expired epoch from the EpochManager + * @return Number of processed tuples + */ + uint32_t Unlink(const uint32_t &thread_id, const eid_t &expired_eid); - int Unlink(const int &thread_id, const eid_t &expired_eid); + /** + * @brief Process reclaim queue for provided thread + * @param[id] thread_id Zero-indexed thread id to access reclaim queue + * @param[id] expired_eid Expired epoch from the EpochManager + * @return Number of processed objects + */ + uint32_t Reclaim(const uint32_t &thread_id, const eid_t &expired_eid); - int Reclaim(const int &thread_id, const eid_t &expired_eid); + /** + * @brief Unlink and reclaim the objects currently in queues + * + * Meant to be used primarily internally by GC and in tests, not + * by outside classes + * + * @param[in] thread_id + */ + void ClearGarbage(const uint32_t &thread_id); private: - inline unsigned int HashToThread(const size_t &thread_id) { - return (unsigned int)thread_id % gc_thread_count_; - } + TransactionLevelGCManager(const uint32_t &thread_count); + + virtual ~TransactionLevelGCManager() {} + + inline unsigned int HashToThread(const size_t &thread_id); /** - * @brief Unlink and reclaim the tuples remained in a garbage collection - * thread when the Garbage Collector stops. - * - * @return No return value. + * @brief Primary function for GC threads: wakes up, runs GC, has + * exponential backoff if queues are empty + * @param[id] thread_id Zero-indexed thread id for queue access */ - void ClearGarbage(int thread_id); + void Running(const uint32_t &thread_id); - void Running(const int &thread_id); + /** + * @brief Recycles all of the tuple slots in transaction context's GCSet + * @param[in] txn_ctx TransactionConext pointer containing GCSet to be + * processed + */ + void RecycleTupleSlots(concurrency::TransactionContext *txn_ctx); - void AddToRecycleMap(concurrency::TransactionContext *txn_ctx); + /** + * @brief Recycles all of the objects in transaction context's GCObjectSet + * @param[in] txn_ctx TransactionConext pointer containing GCObjectSet + * to be processed + */ + void RemoveObjectLevelGarbage(concurrency::TransactionContext *txn_ctx); + /** + * @brief Resets a tuple slot's version chain info and varlen pool + * @return True on success, false if TileGroup no longer exists + */ bool ResetTuple(const ItemPointer &); - // this function iterates the gc context and unlinks every version - // from the indexes. - // this function will call the UnlinkVersion() function. - void UnlinkVersions(concurrency::TransactionContext *txn_ctx); + /** + * @brief Helper function to easily look up a table's RecycleStack + * @param[id] table_id Global oid of the table + * @return Smart pointer to the RecycleStack for the provided table. + * May be nullptr if the table is not registered with the GC + */ + std::shared_ptr GetTableRecycleQueue( + const oid_t &table_id) const; + + /** + * @brief Unlinks all tuples in GCSet from indexes. + * @param[in] txn_ctx TransactionConext pointer containing GCSet to be + * processed + */ + void RemoveVersionsFromIndexes(concurrency::TransactionContext *txn_ctx); // this function unlinks a specified version from the index. - void UnlinkVersion(const ItemPointer location, const GCVersionType type); + /** + * @brief Unlinks provided tuple from indexes + * @param[in] location ItemPointer to garbage tuple to be processed + * @param[in] type GCVersionType for the provided garbage tuple + */ + void RemoveVersionFromIndexes(const ItemPointer &location, + const GCVersionType &type); - private: //===--------------------------------------------------------------------===// // Data members //===--------------------------------------------------------------------===// - - int gc_thread_count_; + uint32_t gc_thread_count_; // queues for to-be-unlinked tuples. // # unlink_queues == # gc_threads @@ -182,10 +231,9 @@ class TransactionLevelGCManager : public GCManager { reclaim_maps_; // queues for to-be-reused tuples. - // # recycle_queue_maps == # tables - std::unordered_map>> - recycle_queue_map_; + // oid_t here is global DataTable oid + std::shared_ptr>> + recycle_queues_; }; } } // namespace peloton diff --git a/src/storage/data_table.cpp b/src/storage/data_table.cpp index 85240e196e1..498061c4551 100644 --- a/src/storage/data_table.cpp +++ b/src/storage/data_table.cpp @@ -210,7 +210,7 @@ ItemPointer DataTable::GetEmptyTupleSlot(const storage::Tuple *tuple) { //=============== garbage collection================== // check if there are recycled tuple slots auto &gc_manager = gc::GCManagerFactory::GetInstance(); - auto free_item_pointer = gc_manager.ReturnFreeSlot(this->table_oid); + auto free_item_pointer = gc_manager.GetRecycledTupleSlot(this); if (free_item_pointer.IsNull() == false) { // when inserting a tuple if (tuple != nullptr) { @@ -319,6 +319,12 @@ ItemPointer DataTable::InsertTuple(const storage::Tuple *tuple, auto result = InsertTuple(tuple, location, transaction, index_entry_ptr, check_fk); if (result == false) { + // Insertion failed due to some constraint (indexes, etc.) but tuple + // is in the table already, need to give the ItemPointer back to the + // GCManager + auto &gc_manager = gc::GCManagerFactory::GetInstance(); + gc_manager.RecycleTupleSlot(location); + return INVALID_ITEMPOINTER; } return location;