Skip to content

Commit

Permalink
feat: add blocks_per_file field to StaticFileProvider (#11043)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Sep 19, 2024
1 parent 9f23443 commit 69c8ddb
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 41 deletions.
5 changes: 2 additions & 3 deletions crates/cli/commands/src/db/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use reth_db_api::{
};
use reth_node_builder::NodeTypesWithDB;
use reth_provider::{ProviderFactory, StaticFileProviderFactory};
use reth_static_file_types::{find_fixed_range, StaticFileSegment};
use reth_static_file_types::StaticFileSegment;

/// The arguments for the `reth db clear` command
#[derive(Parser, Debug)]
Expand All @@ -32,8 +32,7 @@ impl Command {

if let Some(segment_static_files) = static_files.get(&segment) {
for (block_range, _) in segment_static_files {
static_file_provider
.delete_jar(segment, find_fixed_range(block_range.start()))?;
static_file_provider.delete_jar(segment, block_range.start())?;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/cli/commands/src/db/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_fs_util as fs;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::providers::StaticFileProvider;
use reth_static_file_types::{find_fixed_range, SegmentRangeInclusive};
use reth_static_file_types::SegmentRangeInclusive;
use std::{sync::Arc, time::Duration};

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -191,7 +191,7 @@ impl Command {
) = (0, 0, 0, 0, 0, 0);

for (block_range, tx_range) in &ranges {
let fixed_block_range = find_fixed_range(block_range.start());
let fixed_block_range = static_file_provider.find_fixed_range(block_range.start());
let jar_provider = static_file_provider
.get_segment_provider(segment, || Some(fixed_block_range), None)?
.ok_or_else(|| {
Expand Down
5 changes: 2 additions & 3 deletions crates/cli/commands/src/stage/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_node_builder::NodeTypesWithEngine;
use reth_node_core::args::StageEnum;
use reth_provider::{writer::UnifiedStorageWriter, StaticFileProviderFactory};
use reth_stages::StageId;
use reth_static_file_types::{find_fixed_range, StaticFileSegment};
use reth_static_file_types::StaticFileSegment;

/// `reth drop-stage` command
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -54,8 +54,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
.sorted_by_key(|(block_range, _)| block_range.start())
.rev()
{
static_file_provider
.delete_jar(static_file_segment, find_fixed_range(block_range.start()))?;
static_file_provider.delete_jar(static_file_segment, block_range.start())?;
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions crates/static-file/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use compression::Compression;
pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment};

/// Default static file block count.
pub const BLOCKS_PER_STATIC_FILE: u64 = 500_000;
pub const DEFAULT_BLOCKS_PER_STATIC_FILE: u64 = 500_000;

/// Highest static file block numbers, per data segment.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
Expand Down Expand Up @@ -64,7 +64,10 @@ impl HighestStaticFiles {

/// Each static file has a fixed number of blocks. This gives out the range where the requested
/// block is positioned. Used for segment filename.
pub const fn find_fixed_range(block: BlockNumber) -> SegmentRangeInclusive {
let start = (block / BLOCKS_PER_STATIC_FILE) * BLOCKS_PER_STATIC_FILE;
SegmentRangeInclusive::new(start, start + BLOCKS_PER_STATIC_FILE - 1)
pub const fn find_fixed_range(
block: BlockNumber,
blocks_per_static_file: u64,
) -> SegmentRangeInclusive {
let start = (block / blocks_per_static_file) * blocks_per_static_file;
SegmentRangeInclusive::new(start, start + blocks_per_static_file - 1)
}
53 changes: 35 additions & 18 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::{
metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
BLOCKS_PER_STATIC_FILE,
};
use crate::{
to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
Expand All @@ -27,7 +26,10 @@ use reth_db_api::{
};
use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
use reth_primitives::{
static_file::{find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive},
static_file::{
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive,
DEFAULT_BLOCKS_PER_STATIC_FILE,
},
Block, BlockWithSenders, Header, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader,
StaticFileSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, Withdrawal,
Withdrawals,
Expand Down Expand Up @@ -72,7 +74,7 @@ impl StaticFileAccess {
}

/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
pub struct StaticFileProvider(pub(crate) Arc<StaticFileProviderInner>);

impl StaticFileProvider {
Expand Down Expand Up @@ -194,7 +196,7 @@ impl Deref for StaticFileProvider {
}

/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct StaticFileProviderInner {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
Expand All @@ -210,6 +212,8 @@ pub struct StaticFileProviderInner {
metrics: Option<Arc<StaticFileProviderMetrics>>,
/// Access rights of the provider.
access: StaticFileAccess,
/// Number of blocks per file.
blocks_per_file: u64,
/// Write lock for when access is [`StaticFileAccess::RW`].
_lock_file: Option<StorageLock>,
}
Expand All @@ -231,6 +235,7 @@ impl StaticFileProviderInner {
path: path.as_ref().to_path_buf(),
metrics: None,
access,
blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
_lock_file,
};

Expand All @@ -240,9 +245,24 @@ impl StaticFileProviderInner {
pub const fn is_read_only(&self) -> bool {
self.access.is_read_only()
}

/// Each static file has a fixed number of blocks. This gives out the range where the requested
/// block is positioned.
pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
find_fixed_range(block, self.blocks_per_file)
}
}

impl StaticFileProvider {
/// Set a custom number of blocks per file.
#[cfg(any(test, feature = "test-utils"))]
pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
let mut provider =
Arc::try_unwrap(self.0).expect("should be called when initializing only");
provider.blocks_per_file = blocks_per_file;
Self(Arc::new(provider))
}

/// Enables metrics on the [`StaticFileProvider`].
pub fn with_metrics(self) -> Self {
let mut provider =
Expand All @@ -262,7 +282,7 @@ impl StaticFileProvider {
let mut size = 0;

for (block_range, _) in &ranges {
let fixed_block_range = find_fixed_range(block_range.start());
let fixed_block_range = self.find_fixed_range(block_range.start());
let jar_provider = self
.get_segment_provider(segment, || Some(fixed_block_range), None)?
.ok_or(ProviderError::MissingStaticFileBlock(segment, block_range.start()))?;
Expand Down Expand Up @@ -369,14 +389,11 @@ impl StaticFileProvider {
self.map.remove(&(fixed_block_range_end, segment));
}

/// Given a segment and block range it deletes the jar and all files associated with it.
/// Given a segment and block, it deletes the jar and all files from the respective block range.
///
/// CAUTION: destructive. Deletes files on disk.
pub fn delete_jar(
&self,
segment: StaticFileSegment,
fixed_block_range: SegmentRangeInclusive,
) -> ProviderResult<()> {
pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
let fixed_block_range = self.find_fixed_range(block);
let key = (fixed_block_range.end(), segment);
let jar = if let Some((_, jar)) = self.map.remove(&key) {
jar.jar
Expand Down Expand Up @@ -435,7 +452,7 @@ impl StaticFileProvider {
.read()
.get(&segment)
.filter(|max| **max >= block)
.map(|_| find_fixed_range(block))
.map(|_| self.find_fixed_range(block))
}

/// Gets a static file segment's fixed block range from the provider inner
Expand All @@ -459,7 +476,7 @@ impl StaticFileProvider {
}
let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
if tx_start <= tx {
return Some(find_fixed_range(block_range.end()))
return Some(self.find_fixed_range(block_range.end()))
}
}
None
Expand All @@ -483,7 +500,7 @@ impl StaticFileProvider {
Some(segment_max_block) => {
// Update the max block for the segment
max_block.insert(segment, segment_max_block);
let fixed_range = find_fixed_range(segment_max_block);
let fixed_range = self.find_fixed_range(segment_max_block);

let jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_range)),
Expand Down Expand Up @@ -744,7 +761,7 @@ impl StaticFileProvider {
pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
if let Some(latest_block) = self.get_highest_static_file_block(segment) {
let file_path =
self.directory().join(segment.filename(&find_fixed_range(latest_block)));
self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));

let jar = NippyJar::<SegmentHeader>::load(&file_path)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
Expand Down Expand Up @@ -894,14 +911,14 @@ impl StaticFileProvider {
func: impl Fn(StaticFileJarProvider<'_>) -> ProviderResult<Option<T>>,
) -> ProviderResult<Option<T>> {
if let Some(highest_block) = self.get_highest_static_file_block(segment) {
let mut range = find_fixed_range(highest_block);
let mut range = self.find_fixed_range(highest_block);
while range.end() > 0 {
if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
return Ok(Some(res))
}
range = SegmentRangeInclusive::new(
range.start().saturating_sub(BLOCKS_PER_STATIC_FILE),
range.end().saturating_sub(BLOCKS_PER_STATIC_FILE),
range.start().saturating_sub(self.blocks_per_file),
range.end().saturating_sub(self.blocks_per_file),
);
}
}
Expand Down
11 changes: 5 additions & 6 deletions crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use reth_primitives::{static_file::SegmentHeader, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{ops::Deref, sync::Arc};

const BLOCKS_PER_STATIC_FILE: u64 = 500_000;

/// Alias type for each specific `NippyJar`.
type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;

Expand Down Expand Up @@ -62,7 +60,7 @@ mod tests {
use rand::seq::SliceRandom;
use reth_db::{CanonicalHeaders, HeaderNumbers, HeaderTerminalDifficulties, Headers};
use reth_db_api::transaction::DbTxMut;
use reth_primitives::static_file::find_fixed_range;
use reth_primitives::static_file::{find_fixed_range, DEFAULT_BLOCKS_PER_STATIC_FILE};
use reth_testing_utils::generators::{self, random_header_range};

#[test]
Expand All @@ -74,9 +72,10 @@ mod tests {
// Data sources
let factory = create_test_provider_factory();
let static_files_path = tempfile::tempdir().unwrap();
let static_file = static_files_path
.path()
.join(StaticFileSegment::Headers.filename(&find_fixed_range(*range.end())));
let static_file = static_files_path.path().join(
StaticFileSegment::Headers
.filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
);

// Setup data
let mut headers = random_header_range(
Expand Down
12 changes: 8 additions & 4 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_primitives::{
static_file::{find_fixed_range, SegmentHeader, SegmentRangeInclusive},
static_file::{SegmentHeader, SegmentRangeInclusive},
Header, Receipt, StaticFileSegment, TransactionSignedNoHash,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
Expand Down Expand Up @@ -139,7 +139,7 @@ impl StaticFileProviderRW {

let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

let block_range = find_fixed_range(block);
let block_range = static_file_provider.find_fixed_range(block);
let (jar, path) = match static_file_provider.get_segment_provider_from_block(
segment,
block_range.start(),
Expand Down Expand Up @@ -328,8 +328,12 @@ impl StaticFileProviderRW {
self.writer = writer;
self.data_path = data_path;

*self.writer.user_header_mut() =
SegmentHeader::new(find_fixed_range(last_block + 1), None, None, segment);
*self.writer.user_header_mut() = SegmentHeader::new(
self.reader().find_fixed_range(last_block + 1),
None,
None,
segment,
);
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/storage/provider/src/test_utils/noop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::{HashMap, HashSet},
ops::{RangeBounds, RangeInclusive},
path::PathBuf,
sync::Arc,
};

Expand Down Expand Up @@ -554,7 +555,7 @@ impl PruneCheckpointReader for NoopProvider {

impl StaticFileProviderFactory for NoopProvider {
fn static_file_provider(&self) -> StaticFileProvider {
StaticFileProvider::default()
StaticFileProvider::read_only(PathBuf::default(), false).unwrap()
}
}

Expand Down

0 comments on commit 69c8ddb

Please sign in to comment.