From e3b5f537bae293ac11f73d84ac861ab5ce01e718 Mon Sep 17 00:00:00 2001
From: Georgi Petrov <32372905+G-D-Petrov@users.noreply.github.com>
Date: Tue, 14 Jan 2025 17:32:57 +0200
Subject: [PATCH] NotImplemented handling (#2108)
#### Reference Issues/PRs
#203 in enterprise
#### What does this implement or fix?
Add a way to differentiate between a NotImplemented error and a regular
StorageException.
This way we can properly handle S3 storages that haven't implemented the
write_if_none operation.
Also updated the ReliableStorageLock to use the new behaviour.
#### Any other comments?
#### Checklist
Checklist for code changes...
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
---
cpp/arcticdb/storage/mock/s3_mock_client.cpp | 6 ++
cpp/arcticdb/storage/s3/detail-inl.hpp | 4 +
.../storage/s3/nfs_backed_storage.hpp | 4 +-
cpp/arcticdb/util/error_code.hpp | 7 ++
.../util/reliable_storage_lock-inl.hpp | 65 ++++++++++----
cpp/arcticdb/util/reliable_storage_lock.hpp | 12 ++-
.../util/test/test_reliable_storage_lock.cpp | 85 +++++++++++++++----
7 files changed, 145 insertions(+), 38 deletions(-)
diff --git a/cpp/arcticdb/storage/mock/s3_mock_client.cpp b/cpp/arcticdb/storage/mock/s3_mock_client.cpp
index bbb999f315..5c2db79e0d 100644
--- a/cpp/arcticdb/storage/mock/s3_mock_client.cpp
+++ b/cpp/arcticdb/storage/mock/s3_mock_client.cpp
@@ -48,6 +48,7 @@ std::optional has_failure_trigger(const std::string& s3_object
const auto not_found_error = Aws::S3::S3Error(Aws::Client::AWSError(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false));
const auto precondition_failed_error = Aws::S3::S3Error(Aws::Client::AWSError(Aws::S3::S3Errors::UNKNOWN, "Precondition failed", "Precondition failed", false));
+const auto not_implemented_error = Aws::S3::S3Error(Aws::Client::AWSError(Aws::S3::S3Errors::UNKNOWN, "NotImplemented", "A header you provided implies functionality that is not implemented", false));
S3Result MockS3Client::head_object(
const std::string& s3_object_name,
@@ -91,6 +92,11 @@ S3Result MockS3Client::put_object(
const std::string &bucket_name,
PutHeader header) {
auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE);
+
+ if (maybe_error.has_value() && header == PutHeader::IF_NONE_MATCH) {
+ return {not_implemented_error};
+ }
+
if (maybe_error.has_value()) {
return {*maybe_error};
}
diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp
index 3ba5282c5d..b7bf244521 100644
--- a/cpp/arcticdb/storage/s3/detail-inl.hpp
+++ b/cpp/arcticdb/storage/s3/detail-inl.hpp
@@ -68,6 +68,10 @@ using Range = folly::Range;
error_message_suffix));
}
+ if(err.GetExceptionName().find("NotImplemented") != std::string::npos) {
+ raise(fmt::format("Operation is not implemented for storage: {}", error_message_suffix));
+ }
+
if (err.ShouldRetry()) {
raise(fmt::format("Retry-able error: {}",
error_message_suffix));
diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
index 1b3af5839c..fd1020975b 100644
--- a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
+++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
@@ -36,7 +36,7 @@ class NfsBackedStorage final : public Storage {
void do_write(KeySegmentPair&& key_seg) final;
void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
- storage::raise("Atomic operations are only supported for s3 backend");
+ storage::raise("do_write_if_none not implemented for NFS backed storage");
};
void do_update(KeySegmentPair&& key_seg, UpdateOpts opts) final;
@@ -58,7 +58,7 @@ class NfsBackedStorage final : public Storage {
}
bool do_supports_atomic_writes() const final {
- return false;
+ return ConfigsMap::instance()->get_int("NfsStorage.SupportsAtomicWrites", 0) == 1;
}
bool do_fast_delete() final {
diff --git a/cpp/arcticdb/util/error_code.hpp b/cpp/arcticdb/util/error_code.hpp
index 9a229e5d66..5967a0cf82 100644
--- a/cpp/arcticdb/util/error_code.hpp
+++ b/cpp/arcticdb/util/error_code.hpp
@@ -86,6 +86,7 @@ inline std::unordered_map get_error_category_names()
ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \
ERROR_CODE(5021, E_S3_RETRYABLE) \
ERROR_CODE(5022, E_ATOMIC_OPERATION_FAILED) \
+ ERROR_CODE(5023, E_NOT_IMPLEMENTED_BY_STORAGE) \
ERROR_CODE(5030, E_UNEXPECTED_AZURE_ERROR) \
ERROR_CODE(5050, E_MONGO_BULK_OP_NO_REPLY) \
ERROR_CODE(5051, E_UNEXPECTED_MONGO_ERROR) \
@@ -181,6 +182,7 @@ using UnsortedDataException = ArcticSpecificException;
using CompatibilityException = ArcticCategorizedException;
using CodecException = ArcticCategorizedException;
+using NotImplementedException = ArcticSpecificException;
template
[[noreturn]] void throw_error(const std::string& msg) {
@@ -242,6 +244,11 @@ template<>
throw ArcticSpecificException(msg);
}
+template<>
+[[noreturn]] inline void throw_error(const std::string& msg) {
+ throw ArcticSpecificException(msg);
+}
+
}
namespace fmt {
diff --git a/cpp/arcticdb/util/reliable_storage_lock-inl.hpp b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp
index cd564b4bfc..b7f3423843 100644
--- a/cpp/arcticdb/util/reliable_storage_lock-inl.hpp
+++ b/cpp/arcticdb/util/reliable_storage_lock-inl.hpp
@@ -4,6 +4,7 @@
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
+#pragma once
#include
@@ -107,18 +108,34 @@ void ReliableStorageLock::clear_old_locks(const std::vector
-std::optional ReliableStorageLock::try_take_lock() const {
+ReliableLockResult ReliableStorageLock::try_take_lock() const {
auto [existing_locks, latest] = get_all_locks();
if (latest.has_value()) {
auto expires = get_expiration(RefKey{get_stream_id(latest.value()), KeyType::ATOMIC_LOCK});
if (expires > ClockType::nanos_since_epoch()) {
// An unexpired lock exists
- return std::nullopt;
+ return LockInUse{};
}
}
return try_take_next_id(existing_locks, latest);
}
+inline std::optional parse_reliable_storage_lock_result(const ReliableLockResult& result) {
+ return util::variant_match(
+ result,
+ [&](const AcquiredLock &acquired_lock) -> std::optional {
+ return acquired_lock;
+ },
+ [&](const LockInUse &) -> std::optional {
+ return std::nullopt;
+ },
+ [&](const UnsupportedOperation &err) -> std::optional {
+ log::lock().error("Unsupported operation while taking lock: {}", err);
+ throw LostReliableLock();
+ }
+ );
+}
+
template
AcquiredLockId ReliableStorageLock::retry_until_take_lock() const {
// We don't use the ExponentialBackoff because we want to be able to wait indefinitely
@@ -132,24 +149,25 @@ AcquiredLockId ReliableStorageLock::retry_until_take_lock() const {
return current_wait * factor;
};
- auto acquired_lock = try_take_lock();
+ std::optional acquired_lock = parse_reliable_storage_lock_result(try_take_lock());
+
while (!acquired_lock.has_value()) {
std::this_thread::sleep_for(jittered_wait());
current_wait = std::min(current_wait * 2, max_wait);
- acquired_lock = try_take_lock();
+ acquired_lock = parse_reliable_storage_lock_result(try_take_lock());
}
return acquired_lock.value();
}
template
-std::optional ReliableStorageLock::try_extend_lock(AcquiredLockId acquired_lock) const {
+ReliableLockResult ReliableStorageLock::try_extend_lock(AcquiredLockId acquired_lock) const {
auto [existing_locks, latest] = get_all_locks();
util::check(latest.has_value() && latest.value() >= acquired_lock,
"We are trying to extend a newer lock_id than the existing one in storage. Extend lock_id: {}",
acquired_lock);
if (latest.value() != acquired_lock) {
// We have lost the lock while holding it (most likely due to timeout).
- return std::nullopt;
+ return LockInUse{};
}
return try_take_next_id(existing_locks, latest);
}
@@ -170,23 +188,27 @@ void ReliableStorageLock::free_lock(AcquiredLockId acquired_lock) con
}
template
-std::optional ReliableStorageLock::try_take_next_id(const std::vector& existing_locks, std::optional latest) const {
+ReliableLockResult ReliableStorageLock::try_take_next_id(const std::vector& existing_locks, std::optional latest) const {
AcquiredLockId lock_id = get_next_id(latest);
auto lock_stream_id = get_stream_id(lock_id);
auto expiration = ClockType::nanos_since_epoch() + timeout_;
try {
store_->write_if_none_sync(KeyType::ATOMIC_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration));
+ } catch (const NotImplementedException& e) {
+ auto err = fmt::format("Failed to acquire lock (storage does not support atomic writes): {}", e.what());
+ log::lock().debug(err);
+ return UnsupportedOperation{err};
} catch (const StorageException& e) {
// There is no specific Aws::S3::S3Errors for the failed atomic operation, so we catch any StorageException.
// Either way it's safe to assume we have failed to acquire the lock in case of transient S3 error.
- // If error persists we'll approprieately raise in the next attempt to LIST/GET the existing lock and propagate
+ // If error persists we'll appropriately raise in the next attempt to LIST/GET the existing lock and propagate
// the transient error.
log::lock().debug("Failed to acquire lock (likely someone acquired it before us): {}", e.what());
- return std::nullopt;
+ return LockInUse{};
}
- // We clear old locks only after aquiring the lock to avoid duplicating the deletion work
+ // We clear old locks only after acquiring the lock to avoid duplicating the deletion work
clear_old_locks(existing_locks);
- return lock_id;
+ return AcquiredLock{lock_id};
}
inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, std::optional&& on_lost_lock) :
@@ -198,11 +220,22 @@ inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageL
extend_lock_heartbeat_.addFunction(
[that=this](){
if (that->acquired_lock_.has_value()) {
- that->acquired_lock_ = that->lock_.try_extend_lock(that->acquired_lock_.value());
- if (!that->acquired_lock_.has_value()) {
- // Clean up if we have lost the lock.
- that->cleanup_on_lost_lock();
- }
+ auto result = that->lock_.try_extend_lock(that->acquired_lock_.value());
+ util::variant_match(
+ result,
+ [&](AcquiredLock &acquired_lock) {
+ that->acquired_lock_ = acquired_lock;
+ },
+ [&](LockInUse &) {
+ // Clean up if we have lost the lock.
+ that->cleanup_on_lost_lock();
+ },
+ [&](UnsupportedOperation &err) {
+ // This should never happen
+ log::lock().error("Unsupported operation while extending lock {}: {}", that->acquired_lock_.value(), err);
+ that->cleanup_on_lost_lock();
+ }
+ );
}
}, hearbeat_frequency, "Extend lock", hearbeat_frequency);
extend_lock_heartbeat_.start();
diff --git a/cpp/arcticdb/util/reliable_storage_lock.hpp b/cpp/arcticdb/util/reliable_storage_lock.hpp
index 4628a67b2b..f90a3a7a0f 100644
--- a/cpp/arcticdb/util/reliable_storage_lock.hpp
+++ b/cpp/arcticdb/util/reliable_storage_lock.hpp
@@ -21,6 +21,12 @@ namespace lock {
using AcquiredLockId = uint64_t;
+using AcquiredLock = AcquiredLockId;
+using LockInUse = std::monostate;
+using UnsupportedOperation = std::string;
+
+using ReliableLockResult = std::variant;
+
// The ReliableStorageLock is a storage lock which relies on atomic If-None-Match Put and ListObject operations to
// provide a more reliable lock than the StorageLock but it requires the backend to support atomic operations. It should
// be completely consistent unless a process holding a lock gets paused for times comparable to the lock timeout.
@@ -36,12 +42,12 @@ class ReliableStorageLock {
ReliableStorageLock(const ReliableStorageLock& other) = default;
AcquiredLockId retry_until_take_lock() const;
- std::optional try_take_lock() const;
- std::optional try_extend_lock(AcquiredLockId acquired_lock) const;
+ ReliableLockResult try_take_lock() const;
+ ReliableLockResult try_extend_lock(AcquiredLockId acquired_lock) const;
void free_lock(AcquiredLockId acquired_lock) const;
timestamp timeout() const;
private:
- std::optional try_take_next_id(const std::vector& existing_locks, std::optional latest) const;
+ ReliableLockResult try_take_next_id(const std::vector& existing_locks, std::optional latest) const;
std::pair, std::optional> get_all_locks() const;
timestamp get_expiration(RefKey lock_key) const;
void clear_old_locks(const std::vector& acquired_locks) const;
diff --git a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp
index 4fa555a0b6..83f74afe72 100644
--- a/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp
+++ b/cpp/arcticdb/util/test/test_reliable_storage_lock.cpp
@@ -13,8 +13,23 @@
#include
#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
using namespace arcticdb;
using namespace lock;
+namespace aa = arcticdb::async;
+namespace as = arcticdb::storage;
// These tests test the actual implementation
@@ -33,45 +48,45 @@ TEST(ReliableStorageLock, SingleThreaded) {
// We take the first lock at 0 and it should not expire until 20
Clock::time_ = 0;
- ASSERT_EQ(lock1.try_take_lock(), std::optional{0});
- ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::optional{0});
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 5;
- ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
- ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 10;
- ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
- ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 19;
- ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
- ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
// Once the first lock has expired we can take a new lock with lock_id=1
Clock::time_ = 20;
- ASSERT_EQ(lock2.try_take_lock(), std::optional{1});
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional{1});
ASSERT_EQ(count_locks(), 2);
// We can extend the lock timeout at 25 to 35 and get an lock_id=2
Clock::time_ = 25;
- ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
- ASSERT_EQ(lock2.try_extend_lock(1), std::optional{2});
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_extend_lock(1)), std::optional{2});
ASSERT_EQ(count_locks(), 3);
Clock::time_ = 34;
- ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
// At time 35 the lock with lock_id=2 has expired and we can re-acquire the lock
Clock::time_ = 35;
- ASSERT_EQ(lock1.try_take_lock(), std::optional{3});
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::optional{3});
ASSERT_EQ(count_locks(), 4);
- ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
// And we can free the lock immediately to allow re-aquiring without waiting for timeout
lock1.free_lock(3);
- ASSERT_EQ(lock2.try_take_lock(), std::optional{4});
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional{4});
ASSERT_EQ(count_locks(), 5);
// But if we take a lock at 1000 all locks would have expired a 10xtimeout=100 ago, and we should clear all apart from latest lock_id=5
Clock::time_ = 1000;
- ASSERT_EQ(lock2.try_take_lock(), std::optional{5});
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional{5});
ASSERT_EQ(count_locks(), 1);
}
@@ -124,5 +139,41 @@ TEST(ReliableStorageLock, StressMultiThreaded) {
ASSERT_EQ(counter, threads);
// Also the lock should be free by the end (i.e. we can take a new lock)
- ASSERT_EQ(lock.try_take_lock().has_value(), true);
+ ASSERT_EQ(parse_reliable_storage_lock_result(lock.try_take_lock()).has_value(), true);
+}
+
+
+TEST(ReliableStorageLock, NotImplementedException) {
+ using namespace arcticdb::async;
+
+ // Given
+ as::EnvironmentName environment_name{"research"};
+ as::StorageName storage_name("storage_name");
+ as::LibraryPath library_path{"a", "b"};
+ namespace ap = arcticdb::pipelines;
+
+
+ auto failed_config = proto::s3_storage::Config();
+ failed_config.set_use_mock_storage_for_testing(true);
+
+ auto failed_env_config = arcticdb::get_test_environment_config(
+ library_path, storage_name, environment_name, std::make_optional(failed_config));
+ auto failed_config_resolver = as::create_in_memory_resolver(failed_env_config);
+ as::LibraryIndex failed_library_index{environment_name, failed_config_resolver};
+
+ as::UserAuth user_auth{"abc"};
+ auto codec_opt = std::make_shared();
+
+ auto lib = failed_library_index.get_library(library_path, as::OpenMode::WRITE, user_auth, storage::NativeVariantStorage());
+ auto store = std::make_shared>(aa::AsyncStore(lib, *codec_opt, EncodingVersion::V1));
+
+ std::string sym = "test_lock";
+ std::string failureSymbol = storage::s3::MockS3Client::get_failure_trigger(sym, storage::StorageOperation::WRITE, Aws::S3::S3Errors::UNKNOWN);
+
+ ReliableStorageLock<> lock{StringId{failureSymbol}, store, ONE_SECOND};
+
+ // parse_reliable_storage_lock_result throws when we encounter an UnsupportedOperation
+ EXPECT_THROW({
+ parse_reliable_storage_lock_result(lock.try_take_lock());
+ }, LostReliableLock);
}