Fix reorgs permanently for reorgs up to 6 blocks #111

Closed
wants to merge 1 commit into from
2 changes: 1 addition & 1 deletion src/elements/asset.rs
@@ -516,7 +516,7 @@ where

// save updated stats to cache
if let Some(lastblock) = lastblock {
- chain.store().cache_db().write(
chain.store().cache_db().write_nocache(
vec![asset_cache_row(asset_id, &newstats, &lastblock)],
DBFlush::Enable,
);
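The switch from `write` to `write_nocache` here (and at the `schema.rs` call sites below) is the point of the split this PR introduces: only rows that must be undone on a reorg go through the tracked path. A minimal sketch of the intended call-site pattern; the helper name is made up, while `DB`, `DBRow`, `DBFlush`, and `write_nocache` come from this PR:

```rust
// Derived caches (asset/utxo/stats rows) can always be rebuilt from the
// indexed data, so there is no point recording their keys for rollback.
fn save_asset_stats(cache_db: &DB, row: DBRow) {
    // write_nocache(): persist the row without touching the rollback cache
    cache_db.write_nocache(vec![row], DBFlush::Enable);
}
```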
149 changes: 144 additions & 5 deletions src/new_index/db.rs
@@ -1,6 +1,10 @@
use bounded_vec_deque::BoundedVecDeque;
use rocksdb;

use std::collections::HashSet;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::RwLock;

use crate::config::Config;
use crate::util::{bincode_util, Bytes};
@@ -134,11 +138,23 @@ impl<'a> Iterator for ReverseScanGroupIterator<'a> {
}
}

type SingleBlockCache = HashSet<Vec<u8>>;
type TipsCache = BoundedVecDeque<SingleBlockCache>;

#[derive(Debug)]
pub struct DB {
db: rocksdb::DB,
// One HashSet of rocksdb keys per recent block.
// Each set records the keys written while that block was current, so
// a reorg can delete exactly those rows again.
// The deque automatically drops the oldest set once the bound is hit.
rollback_cache: RwLock<TipsCache>,
rollback_active: AtomicBool,
}

// A window of 6 blocks should be enough: deeper reorgs are vanishingly rare on mainnet
const CACHE_CAPACITY: usize = 6;
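To see why this bound is what caps the recoverable reorg depth, here is a small self-contained sketch of the eviction behaviour, assuming the `bounded-vec-deque` crate imported above (the constructor mirrors the `with_capacity(CACHE_CAPACITY, CACHE_CAPACITY)` call in `open` below):

```rust
use bounded_vec_deque::BoundedVecDeque;
use std::collections::HashSet;

fn main() {
    // Bound of 3 for brevity; the PR uses CACHE_CAPACITY = 6.
    let mut cache: BoundedVecDeque<HashSet<Vec<u8>>> =
        BoundedVecDeque::with_capacity(3, 3);
    for _ in 0..4 {
        // The 4th push evicts the oldest set at the front; keys recorded
        // there can no longer be rolled back, so a reorg deeper than the
        // bound cannot be fully undone.
        cache.push_back(HashSet::new());
    }
    assert_eq!(cache.len(), 3);
}
```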

#[derive(Copy, Clone, Debug)]
pub enum DBFlush {
Disable,
Enable,
}

impl DB {
pub fn open(path: &Path, config: &Config) -> DB {
let mut rollback_cache = BoundedVecDeque::with_capacity(CACHE_CAPACITY, CACHE_CAPACITY);
rollback_cache.push_back(HashSet::new()); // last HashSet is "current block"
let db = DB {
db: open_raw_db(path),
// TODO: Make the number of blocks configurable? 6 should be fine for mainnet
rollback_cache: RwLock::new(rollback_cache),
rollback_active: AtomicBool::new(false),
};
db.verify_compatibility(config);
db
@@ -220,17 +241,123 @@ impl DB {
ReverseScanGroupIterator::new(iters, value_offset)
}

- pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
fn fill_cache(&self, key: &[u8]) {
// Single-letter keys tend to hold versioning and tip metadata that
// must survive a reorg, so don't record them for rollback
if key.len() < 2 {
return;
}
self.with_cache(|cache| {
cache.insert(key.to_owned());
});
}

fn with_cache<F>(&self, func: F)
where
F: FnOnce(&mut SingleBlockCache),
{
func(
self.rollback_cache
.write()
.unwrap()
.back_mut()
.expect("Always one block"),
)
}

pub fn tick_next_block(&self) {
// Start tracking a new block's worth of keys.
// Pushing past the bound automatically evicts the oldest block's HashSet.
self.rollback_cache
.write()
.unwrap()
.push_back(HashSet::new());
}

/// Rolls back up to `count` blocks (newest first), then ticks one block forward.
/// Returns how many of the requested blocks could not be rolled back.
pub fn rollback(&self, mut count: usize) -> usize {
if count == 0 {
return 0;
}
let mut cache = self.rollback_cache.write().unwrap();
while count > 0 {
if let Some(block) = cache.pop_back() {
debug!(
"Rolling back DB cached block with {} entries @ {:?}",
block.len(),
self.db.path()
);
for key in block {
// Ignore rocksdb errors, but log them
let _ = self.db.delete(key).inspect_err(|err| {
warn!("Error when deleting rocksdb rollback cache: {err}");
});
}
count -= 1;
} else {
break;
}
}
cache.push_back(HashSet::new());
count
}
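The contract is easiest to see in isolation. Below is a self-contained toy that mirrors this loop, with a `HashMap` standing in for rocksdb; all names are illustrative. Note that the newest set, the empty "current block" pushed by the last tick, is the first thing popped:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Pop up to `count` per-block key sets off the back, delete their keys,
// then push a fresh set to act as the new "current block".
fn rollback_toy(
    db: &mut HashMap<Vec<u8>, Vec<u8>>,
    cache: &mut VecDeque<HashSet<Vec<u8>>>,
    mut count: usize,
) -> usize {
    while count > 0 {
        match cache.pop_back() {
            Some(block) => {
                for key in block {
                    db.remove(&key); // `self.db.delete(key)` in the real code
                }
                count -= 1;
            }
            None => break, // reorg deeper than the cached window
        }
    }
    cache.push_back(HashSet::new());
    count // leftover: blocks that could not be rolled back
}

fn main() {
    let mut db = HashMap::from([(b"Ha".to_vec(), vec![1u8])]);
    let mut cache = VecDeque::from([HashSet::from([b"Ha".to_vec()]), HashSet::new()]);
    // The empty "current block" set at the back is popped first.
    assert_eq!(rollback_toy(&mut db, &mut cache, 2), 0);
    assert!(db.is_empty());
}
```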

pub fn rollbacks_enabled(&self) -> bool {
self.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
}

pub fn disable_rollbacks(&self) {
self.rollback_active
.store(false, std::sync::atomic::Ordering::Release);
}

pub fn enable_rollbacks(&self) {
self.rollback_active
.store(true, std::sync::atomic::Ordering::Release);
}

pub fn write_nocache(&self, rows: Vec<DBRow>, flush: DBFlush) {
self.write_blocks_nocache(vec![rows], flush);
}

pub fn write_blocks(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
self.write_blocks_inner(blocks, flush, false)
}

pub fn write_blocks_nocache(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
self.write_blocks_inner(blocks, flush, true)
}

#[inline]
fn write_blocks_inner(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush, skip_cache: bool) {
debug!(
"writing {} rows to {:?}, flush={:?}",
- rows.len(),
blocks.iter().map(|b| b.len()).sum::<usize>(),
self.db,
flush
);
- rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
let mut batch = rocksdb::WriteBatch::default();
- for row in rows {
- batch.put(&row.key, &row.value);
for mut rows in blocks {
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
if !skip_cache
&& self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.with_cache(|cache| {
for row in &rows {
cache.insert(row.key.clone());
batch.put(&row.key, &row.value);
}
});
// One block's rows recorded: tick so the next block gets its own set
self.tick_next_block();
} else {
for row in &rows {
batch.put(&row.key, &row.value);
}
}
}
let do_flush = match flush {
DBFlush::Enable => true,
}

pub fn put(&self, key: &[u8], value: &[u8]) {
if self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.fill_cache(key);
}
self.db.put(key, value).unwrap();
}

pub fn put_sync(&self, key: &[u8], value: &[u8]) {
if self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.fill_cache(key);
}
let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(true);
self.db.put_opt(key, value, &opts).unwrap();
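Since `write_blocks_inner` ticks the deque after each inner `Vec` when rollbacks are active, grouping rows per block is what lines each block's keys up with its own rollback set. A hedged usage sketch (`db` and the row contents are assumed; the API is this PR's):

```rust
// Deque after open():            [ {} ]
// after writing block N:         [ keys(N), {} ]
// after writing block N + 1:     [ keys(N), keys(N+1), {} ]
db.write_blocks(
    vec![
        vec![DBRow { key: b"HN".to_vec(), value: vec![] }], // block N
        vec![DBRow { key: b"HM".to_vec(), value: vec![] }], // block N + 1
    ],
    DBFlush::Enable,
);
// rollback(2) pops the empty current set plus keys(N+1), deletes those
// keys from rocksdb, and pushes a fresh current set: [ keys(N), {} ].
let leftover = db.rollback(2); // 0; non-zero would mean the deque ran dry
```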
61 changes: 48 additions & 13 deletions src/new_index/schema.rs
@@ -86,6 +86,27 @@ impl Store {
}
}

pub fn tick_next_block(&self) {
self.txstore_db.tick_next_block();
self.history_db.tick_next_block();
self.cache_db.tick_next_block();
}

pub fn enable_rollback_cache(&self) {
self.txstore_db.enable_rollbacks();
self.history_db.enable_rollbacks();
self.cache_db.enable_rollbacks();
}

pub fn rollback(&self, count: usize) {
let mut leftover = 0;
leftover += self.txstore_db.rollback(count);
leftover += self.history_db.rollback(count);
leftover += self.cache_db.rollback(count);
if leftover > 0 {
warn!("DB rollback fell short by {leftover} block(s) summed across the three DBs; re-orged rows may remain.")
}
}

pub fn txstore_db(&self) -> &DB {
&self.txstore_db
}
@@ -273,6 +294,18 @@ impl Indexer {
let tip = daemon.getbestblockhash()?;
let new_headers = self.get_new_headers(&daemon, &tip)?;

// Deal with re-orgs before indexing
let headers_len = {
let mut headers = self.store.indexed_headers.write().unwrap();
let reorged = headers.apply(&new_headers);
assert_eq!(tip, *headers.tip());
// A non-empty result means those headers were re-orged out; undo their rows
if !reorged.is_empty() {
self.store.rollback(reorged.len());
}
headers.len()
};
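The ordering here matters: rows shared by both branches (for instance a transaction confirmed on both sides of the fork, stored under the same `T{txid}` key) are deleted by the rollback and then rewritten when the replacement blocks are indexed below. A condensed sketch of the hand-off, using this PR's types (`headers`, `store`, and `new_headers` stand for the locals above):

```rust
// `apply` splices the new branch into the header list and returns the
// evicted suffix; its length is the reorg depth to undo in each DB.
let reorged: Vec<HeaderEntry> = headers.apply(&new_headers);
if !reorged.is_empty() {
    store.rollback(reorged.len()); // txstore_db + history_db + cache_db
}
```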

let to_add = self.headers_to_add(&new_headers);
debug!(
"adding transactions from {} blocks using {:?}",
@@ -301,16 +334,20 @@ impl Indexer {
// update the synced tip *after* the new data is flushed to disk
debug!("updating synced tip to {:?}", tip);
self.store.txstore_db.put_sync(b"t", &serialize(&tip));

- let mut headers = self.store.indexed_headers.write().unwrap();
- headers.apply(new_headers);
- assert_eq!(tip, *headers.tip());
// Tick the cache DB one "block" every time update() runs.
// Each cache "block" therefore holds the cache writes made between
// two update() calls, so a rollback may discard more cache_db entries
// than strictly necessary; but it is only a cache, and we would
// rather drop good entries than keep stale post-reorg data.
self.store.cache_db.tick_next_block();

if let FetchFrom::BlkFiles = self.from {
self.store.enable_rollback_cache();
self.from = FetchFrom::Bitcoind;
}

- self.tip_metric.set(headers.len() as i64 - 1);
self.tip_metric.set(headers_len as i64 - 1);

Ok(tip)
}
@@ -324,7 +361,7 @@ impl Indexer {
};
{
let _timer = self.start_timer("add_write");
- self.store.txstore_db.write(rows, self.flush);
self.store.txstore_db.write_blocks(rows, self.flush);
}

self.store
@@ -360,7 +397,7 @@ impl Indexer {
}
index_blocks(blocks, &previous_txos_map, &self.iconfig)
};
- self.store.history_db.write(rows, self.flush);
self.store.history_db.write_blocks(rows, self.flush);
}
}

@@ -820,7 +857,7 @@ impl ChainQuery {
// save updated utxo set to cache
if let Some(lastblock) = lastblock {
if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE {
- self.store.cache_db.write(
self.store.cache_db.write_nocache(
vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()],
flush,
);
@@ -928,7 +965,7 @@ impl ChainQuery {
// save updated stats to cache
if let Some(lastblock) = lastblock {
if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE {
- self.store.cache_db.write(
self.store.cache_db.write_nocache(
vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()],
flush,
);
@@ -1256,7 +1293,7 @@ fn load_blockheaders(db: &DB) -> HashMap<BlockHash, BlockHeader> {
.collect()
}

- fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRow> {
fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<Vec<DBRow>> {
// persist individual transactions:
// T{txid} → {rawtx}
// C{txid}{blockhash}{height} →
@@ -1284,7 +1321,6 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRo
rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added"
rows
})
- .flatten()
.collect()
}

@@ -1370,7 +1406,7 @@
block_entries: &[BlockEntry],
previous_txos_map: &HashMap<OutPoint, TxOut>,
iconfig: &IndexerConfig,
- ) -> Vec<DBRow> {
) -> Vec<Vec<DBRow>> {
block_entries
.par_iter() // serialization is CPU-intensive
.map(|b| {
Expand All @@ -1389,7 +1425,6 @@ fn index_blocks(
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
rows
})
- .flatten()
.collect()
}

12 changes: 7 additions & 5 deletions src/util/block.rs
@@ -111,7 +111,7 @@ impl HeaderList {
);

let mut headers = HeaderList::empty();
- headers.apply(headers.order(headers_chain));
headers.apply(&headers.order(headers_chain));
headers
}

@@ -155,7 +155,8 @@ impl HeaderList {
.collect()
}

- pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
/// Returns any re-orged headers
pub fn apply(&mut self, new_headers: &[HeaderEntry]) -> Vec<HeaderEntry> {
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
for i in 1..new_headers.len() {
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
@@ -175,21 +176,22 @@ impl HeaderList {
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
height
}
- None => return,
None => return vec![],
};
debug!(
"applying {} new headers from height {}",
new_headers.len(),
new_height
);
- let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
let removed = self.headers.split_off(new_height); // keep [0..new_height) entries
for new_header in new_headers {
let height = new_header.height();
assert_eq!(height, self.headers.len());
self.tip = *new_header.hash();
- self.headers.push(new_header);
self.headers.push(new_header.clone());
self.heights.insert(self.tip, height);
}
removed
}

pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {
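The splice-and-return shape of `apply` is the whole reorg detector. A self-contained toy of the same `split_off` logic on plain labels shows what `Indexer::update` receives:

```rust
// Keep [0..fork_height), return the removed suffix, append the new branch.
fn splice<'a>(
    chain: &mut Vec<&'a str>,
    fork_height: usize,
    new_branch: &[&'a str],
) -> Vec<&'a str> {
    let removed = chain.split_off(fork_height); // the re-orged entries
    chain.extend_from_slice(new_branch);
    removed
}

fn main() {
    // Chain A-B-C re-orged to A-B'-C'-D' (fork at height 1):
    let mut chain = vec!["A", "B", "C"];
    let removed = splice(&mut chain, 1, &["B'", "C'", "D'"]);
    assert_eq!(removed, vec!["B", "C"]); // reorg depth = 2
    assert_eq!(chain, vec!["A", "B'", "C'", "D'"]);
}
```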