Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: disk cache deduped get_ranges #1218

Merged
merged 5 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,36 @@ impl DiskCache {
}

async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> {
let file_path = std::path::Path::new(&self.root_dir)
.join(filename)
// When write payload to file, the cache lock is released, so when one thread is
// reading, another thread may update it, so we write to tmp file first,
// then rename to expected filename to avoid other threads see partial
// content.
let tmp_filename = format!("{filename}.tmp");
let tmp_filepath = std::path::Path::new(&self.root_dir)
.join(&tmp_filename)
.into_os_string()
.into_string()
.unwrap();
jiacai2050 marked this conversation as resolved.
Show resolved Hide resolved

let mut file = File::create(&file_path)
.await
.context(Io { file: &file_path })?;
let mut file = File::create(&tmp_filepath).await.context(Io {
file: &tmp_filepath,
})?;

let encoding = PageFileEncoder { payload };
encoding.encode_and_persist(&mut file, filename).await
encoding
.encode_and_persist(&mut file, &tmp_filename)
.await?;

let dest_filepath = std::path::Path::new(&self.root_dir)
.join(filename)
.into_os_string()
.into_string()
.unwrap();
tokio::fs::rename(tmp_filepath, dest_filepath)
.await
.context(Io { file: filename })?;

Ok(())
}

/// Read the bytes from the cached file.
Expand Down Expand Up @@ -940,6 +958,7 @@ mod test {
use upstream::local::LocalFileSystem;

use super::*;
use crate::test_util::MemoryStore;

struct StoreWithCacheDir {
inner: DiskCacheStore,
Expand All @@ -951,8 +970,7 @@ mod test {
cap: usize,
partition_bits: usize,
) -> StoreWithCacheDir {
let local_path = tempdir().unwrap();
let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
let local_store = Arc::new(MemoryStore::default());

let cache_dir = tempdir().unwrap();
let store = DiskCacheStore::try_new(
Expand Down
2 changes: 2 additions & 0 deletions components/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ pub mod multipart;
pub mod obkv;
pub mod prefix;
pub mod s3;
#[cfg(test)]
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
pub mod test_util;

pub type ObjectStoreRef = Arc<dyn ObjectStore>;
172 changes: 172 additions & 0 deletions components/object_store/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, fmt::Display, ops::Range, sync::RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{self, BoxStream};
use tokio::io::AsyncWrite;
use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};

#[derive(Debug)]
struct StoreError {
path: Path,
msg: String,
}

impl Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StoreError")
.field("path", &self.path)
.field("msg", &self.msg)
.finish()
}
}

impl std::error::Error for StoreError {}

/// A memory based object store implementation, mainly used for testing.
#[derive(Debug, Default)]
pub struct MemoryStore {
files: RwLock<HashMap<Path, Bytes>>,
get_range_counts: RwLock<HashMap<Path, usize>>,
}

impl Display for MemoryStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MemoryStore")
.field("counts", &self.get_counts())
.finish()
}
}

impl MemoryStore {
pub fn get_counts(&self) -> HashMap<Path, usize> {
let counts = self.get_range_counts.read().unwrap();
counts.clone().into_iter().collect()
}
}

#[async_trait]
impl ObjectStore for MemoryStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let mut files = self.files.write().unwrap();
files.insert(location.clone(), bytes);
Ok(())
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let files = self.files.read().unwrap();
if let Some(bs) = files.get(location) {
let bs = bs.clone();
Ok(GetResult::Stream(Box::pin(stream::once(
async move { Ok(bs) },
))))
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "get",
source,
})
}
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
{
let mut counts = self.get_range_counts.write().unwrap();
counts
.entry(location.clone())
.and_modify(|c| *c += 1)
.or_insert(1);
}

let files = self.files.read().unwrap();
if let Some(bs) = files.get(location) {
Ok(bs.slice(range))
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "get_range",
source,
})
}
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let files = self.files.read().unwrap();

if let Some(bs) = files.get(location) {
Ok(ObjectMeta {
location: location.clone(),
size: bs.len(),
last_modified: Default::default(),
})
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "head",
source,
})
}
}

async fn put_multipart(
&self,
_location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
unimplemented!()
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
unimplemented!()
}

async fn delete(&self, _location: &Path) -> Result<()> {
unimplemented!()
}

async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
unimplemented!()
}

async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
unimplemented!()
}

async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}
}