Skip to content

Commit

Permalink
Merge pull request #112 from mempool/junderw/fix-reorg-take-2
Browse files Browse the repository at this point in the history
Revert blocks during reorg
  • Loading branch information
wiz authored Jan 19, 2025
2 parents 0ad5a87 + a454994 commit c2992a6
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 41 deletions.
10 changes: 8 additions & 2 deletions src/elements/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::chain::{BNetwork, BlockHash, Network, Txid};
use crate::elements::peg::{get_pegin_data, get_pegout_data, PeginInfo, PegoutInfo};
use crate::elements::registry::{AssetMeta, AssetRegistry};
use crate::errors::*;
use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::schema::{Operation, TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query};
use crate::util::{bincode_util, full_hash, Bytes, FullHash, TransactionStatus, TxInput};

Expand Down Expand Up @@ -178,11 +178,17 @@ pub fn index_confirmed_tx_assets(
network: Network,
parent_network: BNetwork,
rows: &mut Vec<DBRow>,
op: &Operation,
) {
let (history, issuances) = index_tx_assets(tx, network, parent_network);

rows.extend(history.into_iter().map(|(asset_id, info)| {
asset_history_row(&asset_id, confirmed_height, tx_position, info).into_row()
let history_row = asset_history_row(&asset_id, confirmed_height, tx_position, info);
if let Operation::DeleteBlocksWithHistory(tx) = op {
tx.send(history_row.key.hash)
.expect("unbounded channel won't fail");
}
history_row.into_row()
}));

// the initial issuance is kept twice: once in the history index under I<asset><height><txid:vin>,
Expand Down
9 changes: 9 additions & 0 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ impl DB {
self.db.write_opt(batch, &opts).unwrap();
}

/// Delete the given keys from the database, one at a time.
///
/// Best-effort: a failed delete is logged at `warn` level and does not
/// abort the remaining deletions or propagate an error to the caller.
pub fn delete(&self, keys: Vec<Vec<u8>>) {
    debug!("deleting {} rows from {:?}", keys.len(), self.db);
    for key in keys.into_iter() {
        // Log and keep going: one bad key must not stop the batch.
        if let Err(err) = self.db.delete(key) {
            warn!("Error while deleting DB row: {err}");
        }
    }
}

/// Flush the underlying database's in-memory buffers to disk.
///
/// Panics if the flush fails — callers treat an unflushable DB as fatal.
pub fn flush(&self) {
    self.db.flush().unwrap();
}
Expand Down
51 changes: 51 additions & 0 deletions src/new_index/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,57 @@ pub struct BlockEntry {

type SizedBlock = (Block, u32);

/// A fetcher that yields batches of `T` strictly in order, on the caller's
/// thread — unlike the channel-based `Fetcher`, nothing runs concurrently.
pub struct SequentialFetcher<T> {
    // Deferred producer of all batches; invoked exactly once by `map`.
    fetcher: Box<dyn FnOnce() -> Vec<Vec<T>>>,
}

impl<T> SequentialFetcher<T> {
    /// Wrap a batch-producing closure without running it yet.
    fn from<F>(pre_func: F) -> Self
    where
        F: FnOnce() -> Vec<Vec<T>> + 'static,
    {
        Self {
            fetcher: Box::new(pre_func),
        }
    }

    /// Run the producer and hand each batch to `func`, in order.
    /// Consumes `self`; the producer runs exactly once.
    pub fn map<FN>(self, func: FN)
    where
        FN: FnMut(Vec<T>),
    {
        (self.fetcher)().into_iter().for_each(func);
    }
}

/// Build a `SequentialFetcher` that pulls the blocks for `new_headers`
/// from bitcoind in chunks of 100, preserving header order.
///
/// The returned fetcher is lazy: no blocks are requested until `map` runs.
/// Inside the deferred closure, a failed `getblocks` call panics
/// (the closure has no way to return an error to the caller).
pub fn bitcoind_sequential_fetcher(
    daemon: &Daemon,
    new_headers: Vec<HeaderEntry>,
) -> Result<SequentialFetcher<BlockEntry>> {
    // Use a dedicated connection so the fetch doesn't contend with
    // other users of the shared daemon handle.
    let daemon = daemon.reconnect()?;
    Ok(SequentialFetcher::from(move || {
        let mut batches = Vec::new();
        for entries in new_headers.chunks(100) {
            let blockhashes: Vec<BlockHash> = entries.iter().map(|e| *e.hash()).collect();
            let blocks = daemon
                .getblocks(&blockhashes)
                .expect("failed to get blocks from bitcoind");
            // bitcoind must return exactly one block per requested hash.
            assert_eq!(blocks.len(), entries.len());
            let mut batch = Vec::with_capacity(entries.len());
            for (block, entry) in blocks.into_iter().zip(entries) {
                batch.push(BlockEntry {
                    entry: entry.clone(), // TODO: remove this clone()
                    size: block.size() as u32,
                    block,
                });
            }
            assert_eq!(batch.len(), entries.len());
            batches.push(batch);
        }
        batches
    }))
}

pub struct Fetcher<T> {
receiver: crossbeam_channel::Receiver<T>,
thread: thread::JoinHandle<()>,
Expand Down
Loading

0 comments on commit c2992a6

Please sign in to comment.