Skip to content

Commit

Permalink
Add extra delete and some comments, also fetch better the last blolck…
Browse files Browse the repository at this point in the history
… query from memory not disk (#106)
  • Loading branch information
dangershony authored Feb 15, 2022
1 parent 4e2c281 commit 37a33c8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
21 changes: 17 additions & 4 deletions src/Blockcore.Indexer.Core/Storage/Mongo/BlockRewindOperation.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -62,18 +63,26 @@ public static async Task RewindBlockAsync(this MongoData storage, uint blockInde
// delete computed history
FilterDefinition<AddressHistoryComputedTable> addrCompHistFilter =
Builders<AddressHistoryComputedTable>.Filter.Eq(addr => addr.BlockIndex, blockIndex);
Task<DeleteResult> addressHistoryComputed =
storage.AddressHistoryComputedTable.DeleteManyAsync(addrCompHistFilter);
Task<DeleteResult> addressHistoryComputed = storage.AddressHistoryComputedTable.DeleteManyAsync(addrCompHistFilter);

await Task.WhenAll( output, transactions, addressComputed, addressHistoryComputed);
// this is an edge case, we delete from the utxo table in case a bath push failed half way and left
// item in the utxo table that where suppose to get deleted, to avoid duplicates in recovery processes
// we delete just in case (the utxo table has a unique key on outputs), there is no harm in deleting twice.
FilterDefinition<UnspentOutputTable> unspentOutputFilter1 =
Builders<UnspentOutputTable>.Filter.Eq(utxo => utxo.BlockIndex, blockIndex);
Task<DeleteResult> unspentOutput1 = storage.UnspentOutputTable.DeleteManyAsync(unspentOutputFilter1);

await Task.WhenAll( output, transactions, addressComputed, addressHistoryComputed, unspentOutput1);

await MergeRewindInputsToUnspentTransactionsAsync(storage, blockIndex);

FilterDefinition<InputTable> inputFilter =
Builders<InputTable>.Filter.Eq(addr => addr.BlockIndex, blockIndex);

var inputs = storage.InputTable.DeleteManyAsync(inputFilter);
Task<DeleteResult> inputs = storage.InputTable.DeleteManyAsync(inputFilter);

// TODO: if we filtered out outputs that where created and spent as part of the same block
// we may not need to delete again, however there is no harm in this extra delete.
FilterDefinition<UnspentOutputTable> unspentOutputFilter =
Builders<UnspentOutputTable>.Filter.Eq(utxo => utxo.BlockIndex, blockIndex);
Task<DeleteResult> unspentOutput = storage.UnspentOutputTable.DeleteManyAsync(unspentOutputFilter);
Expand Down Expand Up @@ -208,6 +217,10 @@ private static async Task MergeRewindInputsToUnspentTransactionsAsync(MongoData
// this is to unsure the values are unique
unspentOutputs.ToDictionary(a => a.Outpoint.ToString());

// TODO: filter out any outputs that belong to the block being reorged.
// this can happen for outputs that are created and spent in the same block.
// if they get pushed now such outputs willjust get deleted in the next step.

if (unspentOutputs.Any())
await storage.UnspentOutputTable.InsertManyAsync(unspentOutputs);
}
Expand Down
11 changes: 9 additions & 2 deletions src/Blockcore.Indexer.Core/Storage/Mongo/MongoData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,15 @@ public QueryResult<SyncBlockInfo> Blocks(int? offset, int limit)

public SyncBlockInfo GetLatestBlock()
{
SyncBlockInfo current = Blocks(null, 1).Items.FirstOrDefault();
return current;
if (globalState.StoreTip != null)
return globalState.StoreTip;

BlockTable recentBlock = BlockTable.AsQueryable().OrderByDescending(a => a.BlockIndex).FirstOrDefault();

if (recentBlock == null)
return null;

return mongoBlockToStorageBlock.Map(recentBlock);
}

public SyncBlockInfo BlockByIndex(long blockIndex)
Expand Down
17 changes: 11 additions & 6 deletions src/Blockcore.Indexer.Core/Storage/Mongo/MongoStorageOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void AddToStorageBatch(StorageBatch storageBatch, SyncBlockTransactionsOp
TransactionId = input.PrevOut.Hash.ToString(), OutputIndex = (int)input.PrevOut.N
};

storageBatch.OutputTable.TryGetValue(outpoint.ToString(), out OutputTable output);
storageBatch.OutputTable.TryGetValue(outpoint.ToString(), out OutputTable output);

storageBatch.InputTable.Add(new InputTable()
{
Expand Down Expand Up @@ -170,15 +170,17 @@ public SyncBlockInfo PushStorageBatch(StorageBatch storageBatch)
}
});



var utxos = new List<UnspentOutputTable>(storageBatch.OutputTable.Values.Count);

foreach (OutputTable outputTable in storageBatch.OutputTable.Values)
{
if (outputTable.Address.Equals(OpReturnAddress))
continue;

// TODO: filter out outputs that are already spent in the storageBatch.InputTable table
// such inputs will get deleted anyway in the next operation of UnspentOutputTable.DeleteMany
// this means we should probably make the storageBatch.InputTable a dictionary as well.

utxos.Add(new UnspentOutputTable
{
Address = outputTable.Address,
Expand Down Expand Up @@ -214,12 +216,15 @@ public SyncBlockInfo PushStorageBatch(StorageBatch storageBatch)

Task.WaitAll(blockTableTask, transactionBlockTableTask, outputTableTask, inputTableTask, transactionTableTask, unspentOutputTableTask);

var outpointsFromNewInput = storageBatch.InputTable
if (storageBatch.InputTable.Any())
{
// TODO: if earlier we filtered out outputs that are already spent and not pushed to the utxo table
// now we do not need to try and delete such outputs becuase they where never pushed to the store.

var outpointsFromNewInput = storageBatch.InputTable
.Select(_ => _.Outpoint)
.ToList();

if (outpointsFromNewInput.Any())
{
var filterToDelete = Builders<UnspentOutputTable>.Filter
.Where(_ => outpointsFromNewInput.Contains(_.Outpoint));

Expand Down

0 comments on commit 37a33c8

Please sign in to comment.