Skip to content

Commit

Permalink
NotImplemented handling (#2108)
Browse files Browse the repository at this point in the history
#### Reference Issues/PRs
#203 in enterprise

#### What does this implement or fix?
Add a way to differentiate between a NotImplemented error and a regular
StorageException.
This way we can properly handle S3 storages that haven't implemented the
write_if_none operation.
Also updated the ReliableStorageLock to use the new behaviour.

#### Any other comments?

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->
  • Loading branch information
G-D-Petrov authored Jan 14, 2025
1 parent c5554d0 commit e3b5f53
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 38 deletions.
6 changes: 6 additions & 0 deletions cpp/arcticdb/storage/mock/s3_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ std::optional<Aws::S3::S3Error> has_failure_trigger(const std::string& s3_object

// Canned S3 errors that the mock client's failure triggers hand back to callers.
const auto not_found_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false));
const auto precondition_failed_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::UNKNOWN, "Precondition failed", "Precondition failed", false));
// Mirrors real S3's "NotImplemented" response — returned below when a PUT carries
// the If-None-Match header (atomic write_if_none) and a failure trigger is set,
// emulating backends that do not implement conditional writes.
const auto not_implemented_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::UNKNOWN, "NotImplemented", "A header you provided implies functionality that is not implemented", false));

S3Result<std::monostate> MockS3Client::head_object(
const std::string& s3_object_name,
Expand Down Expand Up @@ -91,6 +92,11 @@ S3Result<std::monostate> MockS3Client::put_object(
const std::string &bucket_name,
PutHeader header) {
auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE);

if (maybe_error.has_value() && header == PutHeader::IF_NONE_MATCH) {
return {not_implemented_error};
}

if (maybe_error.has_value()) {
return {*maybe_error};
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/detail-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ using Range = folly::Range<It>;
error_message_suffix));
}

if(err.GetExceptionName().find("NotImplemented") != std::string::npos) {
raise<ErrorCode::E_NOT_IMPLEMENTED_BY_STORAGE>(fmt::format("Operation is not implemented for storage: {}", error_message_suffix));
}

if (err.ShouldRetry()) {
raise<ErrorCode::E_S3_RETRYABLE>(fmt::format("Retry-able error: {}",
error_message_suffix));
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class NfsBackedStorage final : public Storage {
void do_write(KeySegmentPair&& key_seg) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
storage::raise<ErrorCode::E_NOT_IMPLEMENTED_BY_STORAGE>("do_write_if_none not implemented for NFS backed storage");
};

void do_update(KeySegmentPair&& key_seg, UpdateOpts opts) final;
Expand All @@ -58,7 +58,7 @@ class NfsBackedStorage final : public Storage {
}

bool do_supports_atomic_writes() const final {
return false;
return ConfigsMap::instance()->get_int("NfsStorage.SupportsAtomicWrites", 0) == 1;
}

bool do_fast_delete() final {
Expand Down
7 changes: 7 additions & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \
ERROR_CODE(5021, E_S3_RETRYABLE) \
ERROR_CODE(5022, E_ATOMIC_OPERATION_FAILED) \
ERROR_CODE(5023, E_NOT_IMPLEMENTED_BY_STORAGE) \
ERROR_CODE(5030, E_UNEXPECTED_AZURE_ERROR) \
ERROR_CODE(5050, E_MONGO_BULK_OP_NO_REPLY) \
ERROR_CODE(5051, E_UNEXPECTED_MONGO_ERROR) \
Expand Down Expand Up @@ -181,6 +182,7 @@ using UnsortedDataException = ArcticSpecificException<ErrorCode::E_UNSORTED_DATA
using UserInputException = ArcticCategorizedException<ErrorCategory::USER_INPUT>;
using CompatibilityException = ArcticCategorizedException<ErrorCategory::COMPATIBILITY>;
using CodecException = ArcticCategorizedException<ErrorCategory::CODEC>;
using NotImplementedException = ArcticSpecificException<ErrorCode::E_NOT_IMPLEMENTED_BY_STORAGE>;

template<ErrorCode error_code>
[[noreturn]] void throw_error(const std::string& msg) {
Expand Down Expand Up @@ -242,6 +244,11 @@ template<>
throw ArcticSpecificException<ErrorCode::E_UNSORTED_DATA>(msg);
}

// Specialization so raise<E_NOT_IMPLEMENTED_BY_STORAGE> throws the typed
// ArcticSpecificException (aliased as NotImplementedException), letting callers
// distinguish "storage cannot do this" from a generic StorageException.
template<>
[[noreturn]] inline void throw_error<ErrorCode::E_NOT_IMPLEMENTED_BY_STORAGE>(const std::string& msg) {
throw ArcticSpecificException<ErrorCode::E_NOT_IMPLEMENTED_BY_STORAGE>(msg);
}

}

namespace fmt {
Expand Down
65 changes: 49 additions & 16 deletions cpp/arcticdb/util/reliable_storage_lock-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#pragma once

#include <arcticdb/util/reliable_storage_lock.hpp>

Expand Down Expand Up @@ -107,18 +108,34 @@ void ReliableStorageLock<ClockType>::clear_old_locks(const std::vector<AcquiredL
}

template <class ClockType>
std::optional<AcquiredLockId> ReliableStorageLock<ClockType>::try_take_lock() const {
ReliableLockResult ReliableStorageLock<ClockType>::try_take_lock() const {
auto [existing_locks, latest] = get_all_locks();
if (latest.has_value()) {
auto expires = get_expiration(RefKey{get_stream_id(latest.value()), KeyType::ATOMIC_LOCK});
if (expires > ClockType::nanos_since_epoch()) {
// An unexpired lock exists
return std::nullopt;
return LockInUse{};
}
}
return try_take_next_id(existing_locks, latest);
}

inline std::optional<AcquiredLockId> parse_reliable_storage_lock_result(const ReliableLockResult& result) {
return util::variant_match(
result,
[&](const AcquiredLock &acquired_lock) -> std::optional<AcquiredLockId> {
return acquired_lock;
},
[&](const LockInUse &) -> std::optional<AcquiredLockId> {
return std::nullopt;
},
[&](const UnsupportedOperation &err) -> std::optional<AcquiredLockId> {
log::lock().error("Unsupported operation while taking lock: {}", err);
throw LostReliableLock();
}
);
}

template<class ClockType>
AcquiredLockId ReliableStorageLock<ClockType>::retry_until_take_lock() const {
// We don't use the ExponentialBackoff because we want to be able to wait indefinitely
Expand All @@ -132,24 +149,25 @@ AcquiredLockId ReliableStorageLock<ClockType>::retry_until_take_lock() const {
return current_wait * factor;
};

auto acquired_lock = try_take_lock();
std::optional<AcquiredLockId> acquired_lock = parse_reliable_storage_lock_result(try_take_lock());

while (!acquired_lock.has_value()) {
std::this_thread::sleep_for(jittered_wait());
current_wait = std::min(current_wait * 2, max_wait);
acquired_lock = try_take_lock();
acquired_lock = parse_reliable_storage_lock_result(try_take_lock());
}
return acquired_lock.value();
}

template <class ClockType>
std::optional<AcquiredLockId> ReliableStorageLock<ClockType>::try_extend_lock(AcquiredLockId acquired_lock) const {
ReliableLockResult ReliableStorageLock<ClockType>::try_extend_lock(AcquiredLockId acquired_lock) const {
auto [existing_locks, latest] = get_all_locks();
util::check(latest.has_value() && latest.value() >= acquired_lock,
"We are trying to extend a newer lock_id than the existing one in storage. Extend lock_id: {}",
acquired_lock);
if (latest.value() != acquired_lock) {
// We have lost the lock while holding it (most likely due to timeout).
return std::nullopt;
return LockInUse{};
}
return try_take_next_id(existing_locks, latest);
}
Expand All @@ -170,23 +188,27 @@ void ReliableStorageLock<ClockType>::free_lock(AcquiredLockId acquired_lock) con
}

template <class ClockType>
std::optional<AcquiredLockId> ReliableStorageLock<ClockType>::try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const {
ReliableLockResult ReliableStorageLock<ClockType>::try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const {
AcquiredLockId lock_id = get_next_id(latest);
auto lock_stream_id = get_stream_id(lock_id);
auto expiration = ClockType::nanos_since_epoch() + timeout_;
try {
store_->write_if_none_sync(KeyType::ATOMIC_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration));
} catch (const NotImplementedException& e) {
auto err = fmt::format("Failed to acquire lock (storage does not support atomic writes): {}", e.what());
log::lock().debug(err);
return UnsupportedOperation{err};
} catch (const StorageException& e) {
// There is no specific Aws::S3::S3Errors for the failed atomic operation, so we catch any StorageException.
// Either way it's safe to assume we have failed to acquire the lock in case of transient S3 error.
// If error persists we'll approprieately raise in the next attempt to LIST/GET the existing lock and propagate
// If error persists we'll appropriately raise in the next attempt to LIST/GET the existing lock and propagate
// the transient error.
log::lock().debug("Failed to acquire lock (likely someone acquired it before us): {}", e.what());
return std::nullopt;
return LockInUse{};
}
// We clear old locks only after aquiring the lock to avoid duplicating the deletion work
// We clear old locks only after acquiring the lock to avoid duplicating the deletion work
clear_old_locks(existing_locks);
return lock_id;
return AcquiredLock{lock_id};
}

inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, std::optional<folly::Func>&& on_lost_lock) :
Expand All @@ -198,11 +220,22 @@ inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageL
extend_lock_heartbeat_.addFunction(
[that=this](){
if (that->acquired_lock_.has_value()) {
that->acquired_lock_ = that->lock_.try_extend_lock(that->acquired_lock_.value());
if (!that->acquired_lock_.has_value()) {
// Clean up if we have lost the lock.
that->cleanup_on_lost_lock();
}
auto result = that->lock_.try_extend_lock(that->acquired_lock_.value());
util::variant_match(
result,
[&](AcquiredLock &acquired_lock) {
that->acquired_lock_ = acquired_lock;
},
[&](LockInUse &) {
// Clean up if we have lost the lock.
that->cleanup_on_lost_lock();
},
[&](UnsupportedOperation &err) {
// This should never happen
log::lock().error("Unsupported operation while extending lock {}: {}", that->acquired_lock_.value(), err);
that->cleanup_on_lost_lock();
}
);
}
}, hearbeat_frequency, "Extend lock", hearbeat_frequency);
extend_lock_heartbeat_.start();
Expand Down
12 changes: 9 additions & 3 deletions cpp/arcticdb/util/reliable_storage_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ namespace lock {

using AcquiredLockId = uint64_t;

// Outcome alternatives for a lock attempt:
// - AcquiredLock: id of the lock we now hold.
using AcquiredLock = AcquiredLockId;
// - LockInUse: another holder has an unexpired lock (no payload).
using LockInUse = std::monostate;
// - UnsupportedOperation: storage cannot perform the atomic write; carries the error text.
using UnsupportedOperation = std::string;

// Returned by try_take_lock / try_extend_lock / try_take_next_id — exactly one of the above.
using ReliableLockResult = std::variant<AcquiredLock, LockInUse, UnsupportedOperation>;

// The ReliableStorageLock is a storage lock which relies on atomic If-None-Match Put and ListObject operations to
// provide a more reliable lock than the StorageLock but it requires the backend to support atomic operations. It should
// be completely consistent unless a process holding a lock gets paused for times comparable to the lock timeout.
Expand All @@ -36,12 +42,12 @@ class ReliableStorageLock {
ReliableStorageLock(const ReliableStorageLock<ClockType>& other) = default;

AcquiredLockId retry_until_take_lock() const;
std::optional<AcquiredLockId> try_take_lock() const;
std::optional<AcquiredLockId> try_extend_lock(AcquiredLockId acquired_lock) const;
ReliableLockResult try_take_lock() const;
ReliableLockResult try_extend_lock(AcquiredLockId acquired_lock) const;
void free_lock(AcquiredLockId acquired_lock) const;
timestamp timeout() const;
private:
std::optional<AcquiredLockId> try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const;
ReliableLockResult try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const;
std::pair<std::vector<AcquiredLockId>, std::optional<AcquiredLockId>> get_all_locks() const;
timestamp get_expiration(RefKey lock_key) const;
void clear_old_locks(const std::vector<AcquiredLockId>& acquired_locks) const;
Expand Down
85 changes: 68 additions & 17 deletions cpp/arcticdb/util/test/test_reliable_storage_lock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,23 @@
#include <arcticdb/util/test/gtest_utils.hpp>
#include <arcticdb/util/clock.hpp>

#include <arcticdb/storage/common.hpp>
#include <arcticdb/storage/config_resolvers.hpp>
#include <arcticdb/storage/library_index.hpp>
#include <arcticdb/storage/storage_factory.hpp>
#include <arcticdb/util/test/config_common.hpp>

#include <arcticdb/storage/s3/s3_api.hpp>
#include <arcticdb/storage/s3/s3_storage.hpp>
#include <arcticdb/storage/s3/detail-inl.hpp>
#include <arcticdb/storage/mock/s3_mock_client.hpp>
#include <arcticdb/storage/mock/storage_mock_client.hpp>
#include <aws/core/Aws.h>

using namespace arcticdb;
using namespace lock;
namespace aa = arcticdb::async;
namespace as = arcticdb::storage;

// These tests test the actual implementation

Expand All @@ -33,45 +48,45 @@ TEST(ReliableStorageLock, SingleThreaded) {

// We take the first lock at 0 and it should not expire until 20
Clock::time_ = 0;
ASSERT_EQ(lock1.try_take_lock(), std::optional<uint64_t>{0});
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::optional<uint64_t>{0});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 5;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 10;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 19;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);

// Once the first lock has expired we can take a new lock with lock_id=1
Clock::time_ = 20;
ASSERT_EQ(lock2.try_take_lock(), std::optional<uint64_t>{1});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional<uint64_t>{1});
ASSERT_EQ(count_locks(), 2);

// We can extend the lock timeout at 25 to 35 and get an lock_id=2
Clock::time_ = 25;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_extend_lock(1), std::optional<uint64_t>{2});
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_extend_lock(1)), std::optional<uint64_t>{2});
ASSERT_EQ(count_locks(), 3);
Clock::time_ = 34;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);

// At time 35 the lock with lock_id=2 has expired and we can re-acquire the lock
Clock::time_ = 35;
ASSERT_EQ(lock1.try_take_lock(), std::optional<uint64_t>{3});
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::optional<uint64_t>{3});
ASSERT_EQ(count_locks(), 4);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);

// And we can free the lock immediately to allow re-acquiring without waiting for timeout
lock1.free_lock(3);
ASSERT_EQ(lock2.try_take_lock(), std::optional<uint64_t>{4});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional<uint64_t>{4});
ASSERT_EQ(count_locks(), 5);

// But if we take a lock at 1000 all locks would have expired a 10xtimeout=100 ago, and we should clear all apart from latest lock_id=5
Clock::time_ = 1000;
ASSERT_EQ(lock2.try_take_lock(), std::optional<uint64_t>{5});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional<uint64_t>{5});
ASSERT_EQ(count_locks(), 1);
}

Expand Down Expand Up @@ -124,5 +139,41 @@ TEST(ReliableStorageLock, StressMultiThreaded) {
ASSERT_EQ(counter, threads);

// Also the lock should be free by the end (i.e. we can take a new lock)
ASSERT_EQ(lock.try_take_lock().has_value(), true);
ASSERT_EQ(parse_reliable_storage_lock_result(lock.try_take_lock()).has_value(), true);
}


// Verifies that a backend rejecting atomic writes with S3's "NotImplemented"
// error surfaces as an UnsupportedOperation lock result, which
// parse_reliable_storage_lock_result escalates to LostReliableLock.
// Fix vs. original: removed the unused `namespace ap = arcticdb::pipelines;`
// alias and the stray double blank line.
TEST(ReliableStorageLock, NotImplementedException) {
    using namespace arcticdb::async;

    // Given: a library backed by the mock S3 client so failures can be injected.
    as::EnvironmentName environment_name{"research"};
    as::StorageName storage_name("storage_name");
    as::LibraryPath library_path{"a", "b"};

    auto failed_config = proto::s3_storage::Config();
    failed_config.set_use_mock_storage_for_testing(true);

    auto failed_env_config = arcticdb::get_test_environment_config(
        library_path, storage_name, environment_name, std::make_optional(failed_config));
    auto failed_config_resolver = as::create_in_memory_resolver(failed_env_config);
    as::LibraryIndex failed_library_index{environment_name, failed_config_resolver};

    as::UserAuth user_auth{"abc"};
    auto codec_opt = std::make_shared<arcticdb::proto::encoding::VariantCodec>();

    auto lib = failed_library_index.get_library(library_path, as::OpenMode::WRITE, user_auth, storage::NativeVariantStorage());
    auto store = std::make_shared<aa::AsyncStore<>>(aa::AsyncStore(lib, *codec_opt, EncodingVersion::V1));

    // The mock client treats this specially-crafted symbol as a WRITE failure
    // trigger; with an If-None-Match PUT it yields the NotImplemented error.
    std::string sym = "test_lock";
    std::string failureSymbol = storage::s3::MockS3Client::get_failure_trigger(sym, storage::StorageOperation::WRITE, Aws::S3::S3Errors::UNKNOWN);

    ReliableStorageLock<> lock{StringId{failureSymbol}, store, ONE_SECOND};

    // parse_reliable_storage_lock_result throws when we encounter an UnsupportedOperation
    EXPECT_THROW({
        parse_reliable_storage_lock_result(lock.try_take_lock());
    }, LostReliableLock);
}

0 comments on commit e3b5f53

Please sign in to comment.