diff --git a/src/elements/asset.rs b/src/elements/asset.rs index 1a3bd24d..a1ae5218 100644 --- a/src/elements/asset.rs +++ b/src/elements/asset.rs @@ -516,7 +516,7 @@ where // save updated stats to cache if let Some(lastblock) = lastblock { - chain.store().cache_db().write( + chain.store().cache_db().write_nocache( vec![asset_cache_row(asset_id, &newstats, &lastblock)], DBFlush::Enable, ); diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 98de2d30..185dfcaa 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -1,6 +1,10 @@ +use bounded_vec_deque::BoundedVecDeque; use rocksdb; +use std::collections::HashSet; use std::path::Path; +use std::sync::atomic::AtomicBool; +use std::sync::RwLock; use crate::config::Config; use crate::util::{bincode_util, Bytes}; @@ -134,11 +138,23 @@ impl<'a> Iterator for ReverseScanGroupIterator<'a> { } } +type SingleBlockCache = HashSet>; +type TipsCache = BoundedVecDeque; + #[derive(Debug)] pub struct DB { db: rocksdb::DB, + // BoundedVecDeque of most recent blocks + // Outer Vec is a list of rocksdb keys to remove when reorged + // Inner Vec is the key (a key is Vec) + // It will automatically drop "blocks" that go over the bound. + rollback_cache: RwLock, + rollback_active: AtomicBool, } +// 6 blocks should be enough +const CACHE_CAPACITY: usize = 6; + #[derive(Copy, Clone, Debug)] pub enum DBFlush { Disable, @@ -147,8 +163,13 @@ pub enum DBFlush { impl DB { pub fn open(path: &Path, config: &Config) -> DB { + let mut rollback_cache = BoundedVecDeque::with_capacity(CACHE_CAPACITY, CACHE_CAPACITY); + rollback_cache.push_back(HashSet::new()); // last HashSet is "current block" let db = DB { db: open_raw_db(path), + // TODO: Make the number of blocks configurable? 6 should be fine for mainnet + rollback_cache: RwLock::new(rollback_cache), + rollback_active: AtomicBool::new(false), }; db.verify_compatibility(config); db @@ -220,17 +241,123 @@ impl DB { ReverseScanGroupIterator::new(iters, value_offset) } - pub fn write(&self, mut rows: Vec, flush: DBFlush) { + fn fill_cache(&self, key: &[u8]) { + // Single letter keys tend to be related to versioning and tips + // So do not cache as they don't need to be rolled back + if key.len() < 2 { + return; + } + self.with_cache(|cache| { + cache.insert(key.to_owned()); + }); + } + + fn with_cache(&self, func: F) + where + F: FnOnce(&mut SingleBlockCache), + { + func( + self.rollback_cache + .write() + .unwrap() + .back_mut() + .expect("Always one block"), + ) + } + + pub fn tick_next_block(&self) { + // Adding a new block's worth of cache + // This will automatically drop the oldest block (HashSet) + self.rollback_cache + .write() + .unwrap() + .push_back(HashSet::new()); + } + + /// Performs a rollback of `count` blocks, then ticks one block forward + pub fn rollback(&self, mut count: usize) -> usize { + if count == 0 { + return 0; + } + let mut cache = self.rollback_cache.write().unwrap(); + while count > 0 { + if let Some(block) = cache.pop_back() { + debug!( + "Rolling back DB cached block with {} entries @ {:?}", + block.len(), + self.db.path() + ); + for key in block { + // Ignore rocksdb errors, but log them + let _ = self.db.delete(key).inspect_err(|err| { + warn!("Error when deleting rocksdb rollback cache: {err}"); + }); + } + count -= 1; + } else { + break; + } + } + cache.push_back(HashSet::new()); + count + } + + pub fn rollbacks_enabled(&self) -> bool { + self.rollback_active + .load(std::sync::atomic::Ordering::Acquire) + } + + pub fn disable_rollbacks(&self) { + self.rollback_active + .store(false, std::sync::atomic::Ordering::Release); + } + + pub fn enable_rollbacks(&self) { + self.rollback_active + .store(true, std::sync::atomic::Ordering::Release); + } + + pub fn write_nocache(&self, rows: Vec, flush: DBFlush) { + self.write_blocks_nocache(vec![rows], flush); + } + + pub fn write_blocks(&self, blocks: Vec>, flush: DBFlush) { + self.write_blocks_inner(blocks, flush, false) + } + + pub fn write_blocks_nocache(&self, blocks: Vec>, flush: DBFlush) { + self.write_blocks_inner(blocks, flush, true) + } + + #[inline] + fn write_blocks_inner(&self, blocks: Vec>, flush: DBFlush, skip_cache: bool) { debug!( "writing {} rows to {:?}, flush={:?}", - rows.len(), + blocks.iter().map(|b| b.len()).sum::(), self.db, flush ); - rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); let mut batch = rocksdb::WriteBatch::default(); - for row in rows { - batch.put(&row.key, &row.value); + for mut rows in blocks { + rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + if !skip_cache + && self + .rollback_active + .load(std::sync::atomic::Ordering::Acquire) + { + self.with_cache(|cache| { + for row in &rows { + cache.insert(row.key.clone()); + batch.put(&row.key, &row.value); + } + }); + // Special case: we should tick forward blocks + self.tick_next_block(); + } else { + for row in &rows { + batch.put(&row.key, &row.value); + } + } } let do_flush = match flush { DBFlush::Enable => true, @@ -247,10 +374,22 @@ impl DB { } pub fn put(&self, key: &[u8], value: &[u8]) { + if self + .rollback_active + .load(std::sync::atomic::Ordering::Acquire) + { + self.fill_cache(key); + } self.db.put(key, value).unwrap(); } pub fn put_sync(&self, key: &[u8], value: &[u8]) { + if self + .rollback_active + .load(std::sync::atomic::Ordering::Acquire) + { + self.fill_cache(key); + } let mut opts = rocksdb::WriteOptions::new(); opts.set_sync(true); self.db.put_opt(key, value, &opts).unwrap(); diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index d09605a8..a6537c3e 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -86,6 +86,27 @@ impl Store { } } + pub fn tick_next_block(&self) { + self.txstore_db.tick_next_block(); + self.history_db.tick_next_block(); + self.cache_db.tick_next_block(); + } + + pub fn enable_rollback_cache(&self) { + self.txstore_db.enable_rollbacks(); + self.history_db.enable_rollbacks(); + self.cache_db.enable_rollbacks(); + } + + pub fn rollback(&self, count: usize) { + let mut leftover = 0; + leftover += self.txstore_db.rollback(count); + leftover += self.history_db.rollback(count); + leftover += self.cache_db.rollback(count); + if leftover > 0 { + warn!("Rolling back all DB caches missed {count} blocks. Re-orged duplicates might still be active in the DB.") + } + } pub fn txstore_db(&self) -> &DB { &self.txstore_db } @@ -273,6 +294,18 @@ impl Indexer { let tip = daemon.getbestblockhash()?; let new_headers = self.get_new_headers(&daemon, &tip)?; + // Deal with re-orgs before indexing + let headers_len = { + let mut headers = self.store.indexed_headers.write().unwrap(); + let reorged = headers.apply(&new_headers); + assert_eq!(tip, *headers.tip()); + // reorg happened + if !reorged.is_empty() { + self.store.rollback(reorged.len()); + } + headers.len() + }; + let to_add = self.headers_to_add(&new_headers); debug!( "adding transactions from {} blocks using {:?}", @@ -301,16 +334,20 @@ impl Indexer { // update the synced tip *after* the new data is flushed to disk debug!("updating synced tip to {:?}", tip); self.store.txstore_db.put_sync(b"t", &serialize(&tip)); - - let mut headers = self.store.indexed_headers.write().unwrap(); - headers.apply(new_headers); - assert_eq!(tip, *headers.tip()); + // Ticking cache DB "block every time we update" + // This means that each "block" essentially contains the + // cache updates between each update() call, and we will + // rollback more cache_db than other DBs when rolling back + // but this is just a cache anyway. + // We'd rather not have bad data in the cache. + self.store.cache_db.tick_next_block(); if let FetchFrom::BlkFiles = self.from { + self.store.enable_rollback_cache(); self.from = FetchFrom::Bitcoind; } - self.tip_metric.set(headers.len() as i64 - 1); + self.tip_metric.set(headers_len as i64 - 1); Ok(tip) } @@ -324,7 +361,7 @@ impl Indexer { }; { let _timer = self.start_timer("add_write"); - self.store.txstore_db.write(rows, self.flush); + self.store.txstore_db.write_blocks(rows, self.flush); } self.store @@ -360,7 +397,7 @@ impl Indexer { } index_blocks(blocks, &previous_txos_map, &self.iconfig) }; - self.store.history_db.write(rows, self.flush); + self.store.history_db.write_blocks(rows, self.flush); } } @@ -820,7 +857,7 @@ impl ChainQuery { // save updated utxo set to cache if let Some(lastblock) = lastblock { if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE { - self.store.cache_db.write( + self.store.cache_db.write_nocache( vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()], flush, ); @@ -928,7 +965,7 @@ impl ChainQuery { // save updated stats to cache if let Some(lastblock) = lastblock { if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE { - self.store.cache_db.write( + self.store.cache_db.write_nocache( vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()], flush, ); @@ -1256,7 +1293,7 @@ fn load_blockheaders(db: &DB) -> HashMap { .collect() } -fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec { +fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec> { // persist individual transactions: // T{txid} → {rawtx} // C{txid}{blockhash}{height} → @@ -1284,7 +1321,6 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec, iconfig: &IndexerConfig, -) -> Vec { +) -> Vec> { block_entries .par_iter() // serialization is CPU-intensive .map(|b| { @@ -1389,7 +1425,6 @@ fn index_blocks( rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed" rows }) - .flatten() .collect() } diff --git a/src/util/block.rs b/src/util/block.rs index 66d5a5c5..44431e47 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -111,7 +111,7 @@ impl HeaderList { ); let mut headers = HeaderList::empty(); - headers.apply(headers.order(headers_chain)); + headers.apply(&headers.order(headers_chain)); headers } @@ -155,7 +155,8 @@ impl HeaderList { .collect() } - pub fn apply(&mut self, new_headers: Vec) { + /// Returns any re-orged headers + pub fn apply(&mut self, new_headers: &[HeaderEntry]) -> Vec { // new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip) for i in 1..new_headers.len() { assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height()); @@ -175,21 +176,22 @@ impl HeaderList { assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash); height } - None => return, + None => return vec![], }; debug!( "applying {} new headers from height {}", new_headers.len(), new_height ); - let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries + let removed = self.headers.split_off(new_height); // keep [0..new_height) entries for new_header in new_headers { let height = new_header.height(); assert_eq!(height, self.headers.len()); self.tip = *new_header.hash(); - self.headers.push(new_header); + self.headers.push(new_header.clone()); self.heights.insert(self.tip, height); } + removed } pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {