Skip to content

Commit

Permalink
Fix race between list_versions and delete_snapshot on NFS 8104588520 (#…
Browse files Browse the repository at this point in the history
…2092)

See arcticc PR #1170 for more context and discussion.

The keys in the `KeysNotFoundException` were not having
`unencode_object_id` applied to them, so in the snapshot iteration we
were incorrectly thinking they were different to the snapshot key being
iterated over (which has had the unencoding applied).
  • Loading branch information
poodlewars authored Dec 24, 2024
1 parent 6371256 commit 87ea83e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 14 deletions.
14 changes: 8 additions & 6 deletions cpp/arcticdb/storage/s3/detail-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ namespace s3 {
template<class KeyBucketizer>
void do_read_impl(Composite<VariantKey> &&ks,
const ReadVisitor &visitor,
folly::Function<VariantKey(VariantKey&&)> key_decoder,
const std::string &root_folder,
const std::string &bucket_name,
const S3ClientWrapper &s3_client,
Expand All @@ -179,7 +180,7 @@ namespace s3 {

(fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach(
[&s3_client, &bucket_name, &root_folder, b = std::move(bucketizer), &visitor, &keys_not_found,
opts = opts](auto &&group) {
&key_decoder, opts = opts](auto &&group) {

for (auto &k: group.values()) {
auto key_type_dir = key_type_folder(root_folder, variant_key_type(k));
Expand All @@ -189,25 +190,26 @@ namespace s3 {
s3_object_name,
bucket_name);

auto unencoded_key = key_decoder(std::move(k));
if (get_object_result.is_success()) {
ARCTICDB_SUBSAMPLE(S3StorageVisitSegment, 0)

visitor(k, std::move(get_object_result.get_output()));
visitor(unencoded_key, std::move(get_object_result.get_output()));

ARCTICDB_DEBUG(log::storage(), "Read key {}: {}", variant_key_type(k),
variant_key_view(k));
ARCTICDB_DEBUG(log::storage(), "Read key {}: {}", variant_key_type(unencoded_key),
variant_key_view(unencoded_key));
} else {
auto &error = get_object_result.get_error();
raise_if_unexpected_error(error, s3_object_name);

log::storage().log(
opts.dont_warn_about_missing_key ? spdlog::level::debug : spdlog::level::warn,
"Failed to find segment for key '{}' {}: {}",
variant_key_view(k),
variant_key_view(unencoded_key),
error.GetExceptionName().c_str(),
error.GetMessage().c_str());

keys_not_found.push_back(k);
keys_not_found.push_back(unencoded_key);
}
}
});
Expand Down
6 changes: 1 addition & 5 deletions cpp/arcticdb/storage/s3/nfs_backed_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,11 @@ void NfsBackedStorage::do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts) {
}

void NfsBackedStorage::do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) {
auto func = [visitor] (const VariantKey& k, Segment&& seg) mutable {
visitor(unencode_object_id(k), std::move(seg));
};

auto enc = ks.transform([] (auto&& key) {
return encode_object_id(key);
});

s3::detail::do_read_impl(std::move(enc), func, root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, opts);
s3::detail::do_read_impl(std::move(enc), visitor, unencode_object_id, root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, opts);
}

void NfsBackedStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/s3/s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void S3Storage::do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts) {
}

void S3Storage::do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) {
detail::do_read_impl(std::move(ks), visitor, root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}, opts);
detail::do_read_impl(std::move(ks), visitor, folly::identity, root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}, opts);
}

void S3Storage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
Expand Down
51 changes: 50 additions & 1 deletion cpp/arcticdb/storage/test/test_s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <arcticdb/storage/s3/s3_api.hpp>
#include <arcticdb/storage/s3/s3_storage.hpp>
#include <arcticdb/storage/s3/s3_mock_client.hpp>
#include <arcticdb/storage/s3/nfs_backed_storage.hpp>
#include <arcticdb/storage/s3/detail-inl.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/entity/variant_key.hpp>
Expand Down Expand Up @@ -225,7 +226,38 @@ class S3StorageFixture : public testing::Test {
S3Storage store;
};

TEST_F(S3StorageFixture, test_key_exists){
arcticdb::storage::nfs_backed::NfsBackedStorage::Config get_test_nfs_config() {
arcticdb::storage::nfs_backed::NfsBackedStorage::Config cfg;
cfg.set_use_mock_storage_for_testing(true);
return cfg;
}

class NfsStorageFixture : public testing::Test {
protected:
NfsStorageFixture():
store(LibraryPath("lib", '.'), OpenMode::DELETE, get_test_nfs_config())
{}

arcticdb::storage::nfs_backed::NfsBackedStorage store;
};

class S3AndNfsStorageFixture : public testing::TestWithParam<std::string> {
public:
std::unique_ptr<Storage> get_storage() {
LibraryPath lp{"lib"};
if (GetParam() == "nfs") {
return std::make_unique<arcticdb::storage::nfs_backed::NfsBackedStorage>(
lp, OpenMode::DELETE, get_test_nfs_config());
} else if (GetParam() == "s3") {
return std::make_unique<S3Storage>(
lp, OpenMode::DELETE, S3Settings(get_test_s3_config()));
} else {
util::raise_rte("Unexpected fixture type {}", GetParam());
}
}
};

TEST_F(S3StorageFixture, test_key_exists) {
write_in_store(store, "symbol");

ASSERT_TRUE(exists_in_store(store, "symbol"));
Expand All @@ -245,6 +277,23 @@ TEST_F(S3StorageFixture, test_read){
UnexpectedS3ErrorException);
}

TEST_P(S3AndNfsStorageFixture, test_read_missing_key_in_exception){
auto s = get_storage();
auto& store = *s;

try {
read_in_store(store, "snap-not-present", KeyType::SNAPSHOT_REF);
FAIL();
} catch (KeyNotFoundException& e) {
auto keys = e.keys().as_range();
ASSERT_EQ(keys.size(), 1);
const auto& key = keys.at(0);
ASSERT_EQ(variant_key_id(key), StreamId{"snap-not-present"});
}
}

INSTANTIATE_TEST_SUITE_P(S3AndNfs, S3AndNfsStorageFixture, testing::Values("s3", "nfs"));

TEST_F(S3StorageFixture, test_write){
write_in_store(store, "symbol");
ASSERT_THROW(
Expand Down
4 changes: 3 additions & 1 deletion cpp/arcticdb/version/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ void iterate_snapshots(const std::shared_ptr<Store>& store, folly::Function<void
visitor(vk);
} catch (storage::KeyNotFoundException& e) {
e.keys().broadcast([&vk, &e](const VariantKey& key) {
if (key != vk) throw storage::KeyNotFoundException(std::move(e.keys()));
if (key != vk) {
throw storage::KeyNotFoundException(std::move(e.keys()));
}
});
ARCTICDB_DEBUG(log::version(), "Ignored exception due to {} being deleted during iterate_snapshots().");
}
Expand Down
46 changes: 46 additions & 0 deletions python/tests/integration/arcticdb/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import re
import time
from multiprocessing import Queue, Process

import pytest
import pandas as pd
Expand Down Expand Up @@ -78,6 +80,50 @@ def test_nfs_backed_s3_storage(lib_name, nfs_backed_s3_storage):
assert re.match(bucketized_pattern, o.key), f"Object {o.key} does not match pattern {bucketized_pattern}"


def read_repeatedly(version_store, queue: Queue):
while True:
try:
version_store.list_versions("tst")
except Exception as e:
queue.put(e)
raise
time.sleep(0.1)


def write_repeatedly(version_store):
while True:
for i in range(10):
version_store.snapshot(f"snap-{i}")
for i in range(10):
version_store.delete_snapshot(f"snap-{i}")
time.sleep(0.1)


def test_racing_list_and_delete_nfs(nfs_backed_s3_storage, lib_name):
"""This test is for a regression with NFS where iterating snapshots raced with
deleting them, due to a bug in our logic to suppress the KeyNotFoundException."""
lib = nfs_backed_s3_storage.create_version_store_factory(lib_name)()
lib.write("tst", [1, 2, 3])

exceptions_in_reader = Queue()
reader = Process(target=read_repeatedly, args=(lib, exceptions_in_reader))
writer = Process(target=write_repeatedly, args=(lib,))

try:
reader.start()
writer.start()

# Run test for 2 seconds - this was enough for this regression test to reliably fail
# 10 times in a row.
reader.join(2)
writer.join(0.001)
finally:
writer.terminate()
reader.terminate()

assert exceptions_in_reader.empty()


@pytest.fixture(scope="function", params=[MotoNfsBackedS3StorageFixtureFactory, MotoS3StorageFixtureFactory])
def s3_storage_dots_in_path(request):
prefix = "some_path/.thing_with_a_dot/even.more.dots/end"
Expand Down

0 comments on commit 87ea83e

Please sign in to comment.