Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-768 Store config uniqueness enforced by StoreManager #1555

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
57 changes: 57 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,21 @@ pub enum StoreSpec {
noop(NoopSpec),
}

impl StoreSpec {
// To enforce no duplicate connection configs for a store, add it to the matcher and implement
// disallow_duplicates_digest() on it. Returns `None` for stores that are not being enforced unique.
pub fn disallow_duplicates_digest(&self) -> Option<String> {
match self {
Self::memory(spec) => Some(spec.disallow_duplicates_digest()),
Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()),
Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()),
Self::grpc(spec) => Some(spec.disallow_duplicates_digest()),
Self::redis_store(spec) => Some(spec.disallow_duplicates_digest()),
_ => None,
}
}
}

/// Configuration for an individual shard of the store.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -514,6 +529,12 @@ pub struct FilesystemSpec {
pub block_size: u64,
}

impl FilesystemSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.content_path, self.temp_path)
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct FastSlowSpec {
Expand All @@ -535,6 +556,12 @@ pub struct MemorySpec {
pub eviction_policy: Option<EvictionPolicy>,
}

impl MemorySpec {
pub fn disallow_duplicates_digest(&self) -> String {
"InMemoryStore".into()
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct DedupSpec {
Expand Down Expand Up @@ -787,6 +814,14 @@ pub struct S3Spec {
pub disable_http2: bool,
}

impl S3Spec {
pub fn disallow_duplicates_digest(&self) -> String {
let key_prefix = self.key_prefix.as_deref().unwrap_or_default();

format!("{}{}{}", self.region, self.bucket, key_prefix)
}
}

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum StoreType {
Expand Down Expand Up @@ -852,6 +887,22 @@ pub struct GrpcSpec {
pub connections_per_endpoint: usize,
}

impl GrpcSpec {
// todo: could improve duplication detection to individual endpoints to disallow accidental re-use
fn disallow_duplicates_digest(&self) -> String {
format!(
"{}{}",
self.instance_name,
self.endpoints
.clone()
.into_iter()
.map(|endpoint| endpoint.address)
.collect::<Vec<String>>()
.join(",")
)
}
}

/// The possible error codes that might occur on an upstream request.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum ErrorCode {
Expand Down Expand Up @@ -993,6 +1044,12 @@ pub struct RedisSpec {
pub retry: Retry,
}

impl RedisSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.addresses.clone().join(","), self.key_prefix)
}
}

#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RedisMode {
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ rust_test_suite(
"tests/bep_server_test.rs",
"tests/bytestream_server_test.rs",
"tests/cas_server_test.rs",
"tests/store_overlap_rules_test.rs",
"tests/worker_api_server_test.rs",
],
proc_macro_deps = [
Expand Down
54 changes: 36 additions & 18 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::pin::Pin;
use std::sync::Arc;

use bytes::BytesMut;
use maplit::hashmap;
use nativelink_config::stores::{MemorySpec, StoreSpec};
use nativelink_config::stores::{FilesystemSpec, MemorySpec, StoreSpec};
use nativelink_error::Error;
use nativelink_macro::nativelink_test;
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache;
use nativelink_proto::build::bazel::remote::execution::v2::{
digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest,
};
use nativelink_service::ac_server::AcServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreLike;
Expand All @@ -48,29 +49,46 @@ async fn insert_into_store<T: Message>(
let data_len = store_data.len();
let digest = DigestInfo::try_new(hash, action_size)?;
store.update_oneshot(digest, store_data.freeze()).await?;

Ok(data_len.try_into().unwrap())
}

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, what is the reason why you moved away from store_factory rather than modifying store_factory?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still using store_factory to build the Store, but wrapping up in make_and_add_store_to_manager because it makes building and adding a Store to a StoreManager a single step from a public-facing perspective.

"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
store_manager.add_store(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

let current_dir = env::current_dir().expect("Failed to get current directory");

let default_filesystem_spec = FilesystemSpec::default();

make_and_add_store_to_manager(
"main_ac",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::filesystem(FilesystemSpec {
content_path: current_dir
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too complicated. Would be simpler to keep MemorySpec here. I know it isn't pure but we don't have consistent way to test FilesystemStore. We need one.

.join("testing_data/ac/content_path")
.into_os_string()
.into_string()
.unwrap(),
temp_path: current_dir
.join("testing_data/ac/tmp_path")
.into_os_string()
.into_string()
.unwrap(),
read_buffer_size: default_filesystem_spec.read_buffer_size,
eviction_policy: default_filesystem_spec.eviction_policy,
block_size: default_filesystem_spec.block_size,
}),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bep_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use nativelink_proto::google::devtools::build::v1::{
PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId,
};
use nativelink_service::bep_server::BepServer;
use nativelink_store::default_store_factory::store_factory;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems heavy at first read but maybe it is an improvement to both the naming and the functionality. store_factory as a name says nothing about what its function is, but developers could go look at the definition. make_and_add_store_to_manager is definitely clearer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

store_factory is still used but I made private and think that make_and_add_store_to_manager is better as a front-facing API because it makes and associates the Store with a StoreManager -- rather than leaving them as separate steps. The door is still open to manually build any Store instance though and call add_store on StoreManager.

use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::channel_body_for_tests::ChannelBody;
Expand All @@ -52,15 +52,14 @@ const BEP_STORE_NAME: &str = "main_bep";
/// Utility function to construct a [`StoreManager`]
async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
BEP_STORE_NAME,
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use nativelink_proto::google::bytestream::{
QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, WriteRequest, WriteResponse,
};
use nativelink_service::bytestream_server::ByteStreamServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::channel_body_for_tests::ChannelBody;
use nativelink_util::common::{encode_stream_proto, DigestInfo};
Expand All @@ -58,15 +58,14 @@ const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789a

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_service::cas_server::CasServer;
use nativelink_store::ac_utils::serialize_and_upload_message;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
Expand All @@ -47,15 +47,14 @@ const BAD_HASH: &str = "BAD_HASH";

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
69 changes: 69 additions & 0 deletions nativelink-service/tests/store_overlap_rules_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PERFECT!

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use nativelink_config::stores::{MemorySpec, StoreSpec};
use nativelink_error::Error;
use nativelink_macro::nativelink_test;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;

#[nativelink_test]
async fn same_datasource_disallowed_simple() -> Result<(), Error> {
let store_manager = Arc::new(StoreManager::new());
assert!(make_and_add_store_to_manager(
"main_cas",
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await
.is_ok());

assert!(make_and_add_store_to_manager(
"main_ac",
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await
.is_err());

Ok(())
}

#[nativelink_test]
async fn same_datasource_disallowed_complex() -> Result<(), Error> {
let store_manager = Arc::new(StoreManager::new());
assert!(make_and_add_store_to_manager(
"main_cas",
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await
.is_ok());

assert!(make_and_add_store_to_manager(
"main_ac",
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await
.is_err());

Ok(())
}
19 changes: 18 additions & 1 deletion nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,24 @@ use crate::verify_store::VerifyStore;

type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>;

pub fn store_factory<'a>(
pub async fn make_and_add_store_to_manager<'a>(
name: &'a str,
backend: &'a StoreSpec,
store_manager: &'a Arc<StoreManager>,
maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>,
) -> Result<(), Error> {
if let Some(digest) = backend.disallow_duplicates_digest() {
store_manager.digest_not_already_present(&digest)?;
store_manager.config_digest_add(digest);
}

let store = store_factory(backend, store_manager, maybe_health_registry_builder).await?;
store_manager.add_store(name, store)?;

Ok(())
}

fn store_factory<'a>(
backend: &'a StoreSpec,
store_manager: &'a Arc<StoreManager>,
maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>,
Expand Down
Loading
Loading