Revert blocks during reorg #112

Open · wants to merge 5 commits into base: mempool
Changes from 2 commits
10 changes: 8 additions & 2 deletions src/elements/asset.rs
@@ -11,7 +11,7 @@ use crate::chain::{BNetwork, BlockHash, Network, Txid};
use crate::elements::peg::{get_pegin_data, get_pegout_data, PeginInfo, PegoutInfo};
use crate::elements::registry::{AssetMeta, AssetRegistry};
use crate::errors::*;
use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::schema::{Operation, TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query};
use crate::util::{bincode_util, full_hash, Bytes, FullHash, TransactionStatus, TxInput};

@@ -178,11 +178,17 @@ pub fn index_confirmed_tx_assets(
network: Network,
parent_network: BNetwork,
rows: &mut Vec<DBRow>,
op: &Operation,
) {
let (history, issuances) = index_tx_assets(tx, network, parent_network);

rows.extend(history.into_iter().map(|(asset_id, info)| {
asset_history_row(&asset_id, confirmed_height, tx_position, info).into_row()
let history_row = asset_history_row(&asset_id, confirmed_height, tx_position, info);
if let Operation::DeleteBlocksWithHistory(tx) = op {
tx.send(history_row.key.hash)
.expect("unbounded channel won't fail");
}
history_row.into_row()
}));

// the initial issuance is kept twice: once in the history index under I<asset><height><txid:vin>,
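For context on the new `op: &Operation` parameter above: when the operation is `Operation::DeleteBlocksWithHistory`, every history key hash produced while deleting rows is pushed into an unbounded crossbeam channel so the reorg code can later invalidate the per-script caches. A minimal, standalone sketch of that collection pattern (only `crossbeam_channel` is a real dependency; the hashes and worker loop are made up for illustration and are not the PR's code):

```rust
use std::collections::HashSet;
use std::thread;

fn main() {
    // Unbounded channel: senders never block, so indexing threads can
    // report affected hashes without back-pressure.
    let (tx, rx) = crossbeam_channel::unbounded::<[u8; 32]>();

    // Two pretend "indexing" workers, each reporting (possibly duplicate) hashes.
    let workers: Vec<_> = (0..2u8)
        .map(|worker| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send([worker; 32]).expect("unbounded channel won't fail");
                tx.send([0u8; 32]).expect("unbounded channel won't fail");
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    // Every sender must be dropped before the receiver iterator can finish.
    drop(tx);

    // Drain and deduplicate, mirroring rx.into_iter().collect::<HashSet<_>>()
    // in this PR's Indexer::reorg().
    let unique: HashSet<[u8; 32]> = rx.into_iter().collect();
    assert_eq!(unique.len(), 2);
}
```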
9 changes: 9 additions & 0 deletions src/new_index/db.rs
@@ -242,6 +242,15 @@ impl DB {
self.db.write_opt(batch, &opts).unwrap();
}

pub fn delete(&self, keys: Vec<Vec<u8>>) {
debug!("deleting {} rows from {:?}", keys.len(), self.db);
for key in keys {
let _ = self.db.delete(key).inspect_err(|err| {
warn!("Error while deleting DB row: {err}");
});
}
}

pub fn flush(&self) {
self.db.flush().unwrap();
}
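The new `delete` method removes keys one at a time and only logs failures, unlike the surrounding `write_opt(...).unwrap()` and `flush().unwrap()` calls, which panic. A rough standalone equivalent against the `rocksdb` crate, which the `DB` wrapper is built on (the path and `eprintln!` logging are illustrative stand-ins, not the PR's code):

```rust
use rocksdb::DB;

// Delete a batch of keys, tolerating individual failures the way the PR's
// DB::delete does with inspect_err + warn!.
fn delete_keys(db: &DB, keys: Vec<Vec<u8>>) {
    eprintln!("deleting {} rows", keys.len());
    for key in keys {
        if let Err(err) = db.delete(&key) {
            eprintln!("Error while deleting DB row: {err}");
        }
    }
}

fn main() -> Result<(), rocksdb::Error> {
    let db = DB::open_default("/tmp/reorg-delete-example")?; // illustrative path
    db.put(b"H-some-history-key", b"")?;
    delete_keys(&db, vec![b"H-some-history-key".to_vec()]);
    assert!(db.get(b"H-some-history-key")?.is_none());
    Ok(())
}
```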
168 changes: 134 additions & 34 deletions src/new_index/schema.rs
@@ -209,6 +209,22 @@ pub struct ChainQuery {
network: Network,
}

#[derive(Debug, Clone)]
pub enum Operation {
AddBlocks,
DeleteBlocks,
DeleteBlocksWithHistory(crossbeam_channel::Sender<[u8; 32]>),
}

impl std::fmt::Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Operation::AddBlocks => "Adding",
Operation::DeleteBlocks | Operation::DeleteBlocksWithHistory(_) => "Deleting",
})
}
}

// TODO: &[Block] should be an iterator / a queue.
impl Indexer {
pub fn open(store: Arc<Store>, from: FetchFrom, config: &Config, metrics: &Metrics) -> Self {
@@ -268,18 +284,68 @@ impl Indexer {
Ok(result)
}

fn reorg(&self, reorged: Vec<HeaderEntry>, daemon: &Daemon) -> Result<()> {
if reorged.len() > 10 {
warn!(
"reorg of over 10 blocks ({}) detected! Wonky stuff might happen!",
reorged.len()
);
}
// This channel holds the [u8; 32] script hashes found in the blocks (with duplicates);
// if we reorged the whole mainnet chain, it would come to about 145 GB of memory.
let (tx, rx) = crossbeam_channel::unbounded();
// Delete txstore
start_fetcher(self.from, daemon, reorged.clone())?
.map(|blocks| self.add(&blocks, Operation::DeleteBlocks));
// Delete history_db
start_fetcher(self.from, daemon, reorged)?
.map(|blocks| self.index(&blocks, Operation::DeleteBlocksWithHistory(tx.clone())));
// All senders must be dropped for the receiver iterator to finish
drop(tx);

// All senders are dropped by now, so the receiver will iterate until the
// end of the unbounded queue.
let scripts = rx.into_iter().collect::<HashSet<_>>();
for script in scripts {
// Invalidate the script cache DB entries for these scripts; they might get incorrect data mixed in.
self.store.cache_db.delete(vec![
StatsCacheRow::key(&script),
UtxoCacheRow::key(&script),
#[cfg(feature = "liquid")]
[b"z", &script[..]].concat(), // asset cache key
]);
}
Ok(())
}

pub fn update(&mut self, daemon: &Daemon) -> Result<BlockHash> {
let daemon = daemon.reconnect()?;
let tip = daemon.getbestblockhash()?;
let new_headers = self.get_new_headers(&daemon, &tip)?;

// Must roll back blocks before rolling forward
let headers_len = {
let mut headers = self.store.indexed_headers.write().unwrap();
let reorged = headers.apply(new_headers.clone());
assert_eq!(tip, *headers.tip());
let headers_len = headers.len();
drop(headers);

if !reorged.is_empty() {
self.reorg(reorged, &daemon)?;
}

headers_len
};

let to_add = self.headers_to_add(&new_headers);
debug!(
"adding transactions from {} blocks using {:?}",
to_add.len(),
self.from
);
start_fetcher(self.from, &daemon, to_add)?.map(|blocks| self.add(&blocks));
start_fetcher(self.from, &daemon, to_add)?
.map(|blocks| self.add(&blocks, Operation::AddBlocks));
self.start_auto_compactions(&self.store.txstore_db);

let to_index = self.headers_to_index(&new_headers);
@@ -288,7 +354,8 @@ impl Indexer {
to_index.len(),
self.from
);
start_fetcher(self.from, &daemon, to_index)?.map(|blocks| self.index(&blocks));
start_fetcher(self.from, &daemon, to_index)?
.map(|blocks| self.index(&blocks, Operation::AddBlocks));
self.start_auto_compactions(&self.store.history_db);

if let DBFlush::Disable = self.flush {
@@ -302,65 +369,87 @@
debug!("updating synced tip to {:?}", tip);
self.store.txstore_db.put_sync(b"t", &serialize(&tip));

let mut headers = self.store.indexed_headers.write().unwrap();
headers.apply(new_headers);
assert_eq!(tip, *headers.tip());

if let FetchFrom::BlkFiles = self.from {
self.from = FetchFrom::Bitcoind;
}

self.tip_metric.set(headers.len() as i64 - 1);
self.tip_metric.set(headers_len as i64 - 1);

Ok(tip)
}

fn add(&self, blocks: &[BlockEntry]) {
debug!("Adding {} blocks to Indexer", blocks.len());
fn add(&self, blocks: &[BlockEntry], op: Operation) {
debug!("{} {} blocks to Indexer", op, blocks.len());
let write_label = match &op {
Operation::AddBlocks => "add_write",
_ => "delete_write",
};

// TODO: skip orphaned blocks?
let rows = {
let _timer = self.start_timer("add_process");
add_blocks(blocks, &self.iconfig)
};
{
let _timer = self.start_timer("add_write");
self.store.txstore_db.write(rows, self.flush);
let _timer = self.start_timer(write_label);
if let Operation::AddBlocks = op {
self.store.txstore_db.write(rows, self.flush);
} else {
self.store
.txstore_db
.delete(rows.into_iter().map(|r| r.key).collect());
}
}

self.store
.added_blockhashes
.write()
.unwrap()
.extend(blocks.iter().map(|b| {
if b.entry.height() % 10_000 == 0 {
info!("Tx indexing is up to height={}", b.entry.height());
}
b.entry.hash()
}));
if let Operation::AddBlocks = op {
self.store
.added_blockhashes
.write()
.unwrap()
.extend(blocks.iter().map(|b| {
if b.entry.height() % 10_000 == 0 {
info!("Tx indexing is up to height={}", b.entry.height());
}
b.entry.hash()
}));
} else {
let mut added_blockhashes = self.store.added_blockhashes.write().unwrap();
for b in blocks {
added_blockhashes.remove(b.entry.hash());
}
}
}

fn index(&self, blocks: &[BlockEntry]) {
debug!("Indexing {} blocks with Indexer", blocks.len());
fn index(&self, blocks: &[BlockEntry], op: Operation) {
debug!("Indexing ({}) {} blocks with Indexer", op, blocks.len());
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
};
let rows = {
let _timer = self.start_timer("index_process");
let added_blockhashes = self.store.added_blockhashes.read().unwrap();
for b in blocks {
if b.entry.height() % 10_000 == 0 {
info!("History indexing is up to height={}", b.entry.height());
}
let blockhash = b.entry.hash();
// TODO: replace by lookup into txstore_db?
if !added_blockhashes.contains(blockhash) {
panic!("cannot index block {} (missing from store)", blockhash);
if let Operation::AddBlocks = op {
let added_blockhashes = self.store.added_blockhashes.read().unwrap();
for b in blocks {
if b.entry.height() % 10_000 == 0 {
info!("History indexing is up to height={}", b.entry.height());
}
let blockhash = b.entry.hash();
// TODO: replace by lookup into txstore_db?
if !added_blockhashes.contains(blockhash) {
panic!("cannot index block {} (missing from store)", blockhash);
}
}
}
index_blocks(blocks, &previous_txos_map, &self.iconfig)
index_blocks(blocks, &previous_txos_map, &self.iconfig, &op)
};
self.store.history_db.write(rows, self.flush);
if let Operation::AddBlocks = op {
self.store.history_db.write(rows, self.flush);
} else {
self.store
.history_db
.delete(rows.into_iter().map(|r| r.key).collect());
}
}
}

@@ -1370,6 +1459,7 @@ fn index_blocks(
block_entries: &[BlockEntry],
previous_txos_map: &HashMap<OutPoint, TxOut>,
iconfig: &IndexerConfig,
op: &Operation,
) -> Vec<DBRow> {
block_entries
.par_iter() // serialization is CPU-intensive
@@ -1384,6 +1474,7 @@
previous_txos_map,
&mut rows,
iconfig,
op,
);
}
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
@@ -1401,13 +1492,19 @@ fn index_transaction(
previous_txos_map: &HashMap<OutPoint, TxOut>,
rows: &mut Vec<DBRow>,
iconfig: &IndexerConfig,
op: &Operation,
) {
// persist history index:
// H{funding-scripthash}{spending-height}{spending-block-pos}S{spending-txid:vin}{funding-txid:vout} → ""
// H{funding-scripthash}{funding-height}{funding-block-pos}F{funding-txid:vout} → ""
// persist "edges" for fast is-this-TXO-spent check
// S{funding-txid:vout}{spending-txid:vin} → ""
let txid = full_hash(&tx.txid()[..]);
let script_callback = |script_hash| {
if let Operation::DeleteBlocksWithHistory(tx) = op {
tx.send(script_hash).expect("unbounded channel won't fail");
}
};
for (txo_index, txo) in tx.output.iter().enumerate() {
if is_spendable(txo) || iconfig.index_unspendables {
let history = TxHistoryRow::new(
@@ -1420,6 +1517,7 @@
value: txo.value,
}),
);
script_callback(history.key.hash);
rows.push(history.into_row());

if iconfig.address_search {
@@ -1449,6 +1547,7 @@
value: prev_txo.value,
}),
);
script_callback(history.key.hash);
rows.push(history.into_row());

let edge = TxEdgeRow::new(
@@ -1469,6 +1568,7 @@
iconfig.network,
iconfig.parent_network,
rows,
op,
);
}

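The central idea in the schema.rs changes is that rollback reuses the forward indexing code: `add_blocks`/`index_blocks` regenerate the same `DBRow`s for the reorged blocks, and the delete path then discards the values and removes by key. A minimal sketch of that symmetry, with an in-memory map standing in for RocksDB (only the `DBRow` shape and the `Operation` names are taken from the diff; the `Store` type is illustrative):

```rust
use std::collections::BTreeMap;

struct DBRow {
    key: Vec<u8>,
    value: Vec<u8>,
}

enum Operation {
    AddBlocks,
    DeleteBlocks,
}

// Illustrative stand-in for the PR's RocksDB wrapper.
#[derive(Default)]
struct Store(BTreeMap<Vec<u8>, Vec<u8>>);

impl Store {
    fn write(&mut self, rows: Vec<DBRow>) {
        for r in rows {
            self.0.insert(r.key, r.value);
        }
    }
    fn delete(&mut self, keys: Vec<Vec<u8>>) {
        for k in keys {
            self.0.remove(&k);
        }
    }
}

fn apply(store: &mut Store, rows: Vec<DBRow>, op: Operation) {
    match op {
        Operation::AddBlocks => store.write(rows),
        // Rollback: the rows were regenerated deterministically from the block
        // contents, so their keys match exactly what was written earlier.
        Operation::DeleteBlocks => store.delete(rows.into_iter().map(|r| r.key).collect()),
    }
}

fn main() {
    let row = || DBRow { key: b"H:scripthash:height".to_vec(), value: vec![] };
    let mut store = Store::default();
    apply(&mut store, vec![row()], Operation::AddBlocks);
    assert_eq!(store.0.len(), 1);
    apply(&mut store, vec![row()], Operation::DeleteBlocks);
    assert!(store.0.is_empty());
}
```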
9 changes: 6 additions & 3 deletions src/util/block.rs
@@ -155,7 +155,8 @@ impl HeaderList {
.collect()
}

pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
/// Returns the rolled-back blocks, ordered with the old tip first and the first block of the fork last
pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) -> Vec<HeaderEntry> {
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
for i in 1..new_headers.len() {
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
@@ -175,21 +176,23 @@
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
height
}
None => return,
None => return vec![],
};
debug!(
"applying {} new headers from height {}",
new_headers.len(),
new_height
);
let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
let mut removed = self.headers.split_off(new_height); // keep [0..new_height) entries
for new_header in new_headers {
let height = new_header.height();
assert_eq!(height, self.headers.len());
self.tip = *new_header.hash();
self.headers.push(new_header);
self.heights.insert(self.tip, height);
}
removed.reverse();
removed
}

pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {
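The ordering contract documented on `apply` falls out of `Vec::split_off` plus `reverse`: `split_off(new_height)` keeps the heights below the fork point and returns the replaced entries in ascending height, and reversing them puts the old tip first, which is the order `reorg()` unwinds them in. A tiny illustration with integers standing in for `HeaderEntry` heights:

```rust
fn main() {
    // Heights 0..=5 are indexed; a reorg replaces everything from height 3 up.
    let mut headers: Vec<u32> = (0..=5).collect();
    let new_height: usize = 3;

    // Keep [0, new_height); get back [new_height, old_tip] in ascending order.
    let mut removed = headers.split_off(new_height);
    assert_eq!(headers, vec![0, 1, 2]);
    assert_eq!(removed, vec![3, 4, 5]);

    // Reverse so the old tip comes first, matching the doc comment on apply().
    removed.reverse();
    assert_eq!(removed, vec![5, 4, 3]);
}
```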