diff --git a/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectTransactionsSyncRunner.cs b/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectTransactionsSyncRunner.cs index 2900423..72ddccf 100644 --- a/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectTransactionsSyncRunner.cs +++ b/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectTransactionsSyncRunner.cs @@ -25,74 +25,24 @@ public ProjectInvestmentsSyncRunner(IOptions configuration, IL Delay = TimeSpan.FromMinutes(1); } + private bool CanRunInvestmentsSync() + { + return !( //sync with other runners + !Runner.GlobalState.IndexModeCompleted || + Runner.GlobalState.Blocked || + Runner.GlobalState.ReorgMode || + Runner.GlobalState.StoreTip == null || + Runner.GlobalState.IndexMode); + } public override async Task OnExecute() { - var investmentsInProjectOutputs = await angorMongoDb.ProjectTable.Aggregate(PipelineDefinition.Create( - new[] - { - new BsonDocument("$lookup", - new BsonDocument - { - { "from", "Investment" }, - { "let", new BsonDocument("angorProjectId", "_id") }, - { - "pipeline", new BsonArray - { - new BsonDocument("$match", - new BsonDocument("$expr", - new BsonDocument("$eq", - new BsonArray { "$AngorKey", "$$angorProjectId" }))), - new BsonDocument("$group", - new BsonDocument - { - { "_id", "$AngorKey" }, - { "projectMaxBlockScanned", new BsonDocument("$max", "$BlockIndex") } - }), - new BsonDocument("$project", - new BsonDocument("projectMaxBlockScanned", 1)) - } - }, - { "as", "joinedData" } - }), - new BsonDocument("$unwind", - new BsonDocument { { "path", "$joinedData" }, { "preserveNullAndEmptyArrays", true } }), - new BsonDocument("$project", - new BsonDocument - { - { "AddressOnFeeOutput", 1 }, { "TransactionId", 1 }, { "projectMaxBlockScanned", new BsonDocument("$ifNull", new BsonArray { "$joinedData.projectMaxBlockScanned", 0 }) } - }), - new BsonDocument("$lookup", - new BsonDocument - { - { "from", "Output" }, - { "let", new BsonDocument{{"address" , "$AddressOnFeeOutput" },{"trx", "$TransactionId"},{"projectMaxBlockScanned", "$projectMaxBlockScanned"}}}, - { "pipeline", new BsonArray - { - new BsonDocument("$match", - new BsonDocument("$expr", - new BsonDocument("$eq", - new BsonArray { "$Address", "$$address" }))), - new BsonDocument("$match", - new BsonDocument("$expr", - new BsonDocument("$and", new BsonArray - { - new BsonDocument("$gt", - new BsonArray { "$BlockIndex","$$projectMaxBlockScanned"}), - new BsonDocument("$ne", - new BsonArray { "$$trx","$Outpoint.TransactionId"}) - }))) - } }, - { "as", "o" } - }), - new BsonDocument("$unwind", "$o"), - new BsonDocument("$project", - new BsonDocument - { - { "OutputTransactionId", "$o.Outpoint.TransactionId" }, - { "OutputBlockIndex", "$o.BlockIndex" } - }) - })) + if (!CanRunInvestmentsSync()) + return false; + + var investmentsInProjectOutputs = await angorMongoDb.ProjectTable + .Aggregate(PipelineDefinition + .Create(MongoDbLookupForInvestments())) .ToListAsync(); var investmentTasks = investmentsInProjectOutputs.Select(ValidateAndCreateInvestmentAsync).ToList(); @@ -166,4 +116,90 @@ public override async Task OnExecute() StageOutpoint = stages.Select(x => x.Outpoint).ToList() }; } + + private BsonDocument[] MongoDbLookupForInvestments() + { + return new[] + { + //Left join to investment table on Angor key get max block index to only look an new blocks + new BsonDocument("$lookup", + new BsonDocument + { + { "from", "Investment" }, + { "let", new BsonDocument("angorProjectId", "_id") }, + { + "pipeline", new BsonArray + { + new BsonDocument("$match", + new BsonDocument("$expr", + new BsonDocument("$eq", + new BsonArray { "$AngorKey", "$$angorProjectId" }))), + new BsonDocument("$group", + new BsonDocument + { + { "_id", "$AngorKey" }, + { "projectMaxBlockScanned", new BsonDocument("$max", "$BlockIndex") } + }), + new BsonDocument("$project", + new BsonDocument("projectMaxBlockScanned", 1)) + } + }, + { "as", "joinedData" } + }), + new BsonDocument("$unwind", + new BsonDocument { { "path", "$joinedData" }, { "preserveNullAndEmptyArrays", true } }), + //Only take address transaction id and max block + new BsonDocument("$project", + new BsonDocument + { + { "AddressOnFeeOutput", 1 }, + { "TransactionId", 1 }, + { + "projectMaxBlockScanned", + new BsonDocument("$ifNull", new BsonArray { "$joinedData.projectMaxBlockScanned", 0 }) + } + }), + //Inner join with output on the indexed address and greater than block index and filter by trensaction id + new BsonDocument("$lookup", + new BsonDocument + { + { "from", "Output" }, + { + "let", + new BsonDocument + { + { "address", "$AddressOnFeeOutput" }, + { "trx", "$TransactionId" }, + { "projectMaxBlockScanned", "$projectMaxBlockScanned" } + } + }, + { + "pipeline", new BsonArray + { + new BsonDocument("$match", + new BsonDocument("$expr", + new BsonDocument("$eq", + new BsonArray { "$Address", "$$address" }))), + new BsonDocument("$match", + new BsonDocument("$expr", + new BsonDocument("$and", new BsonArray + { + new BsonDocument("$gt", + new BsonArray { "$BlockIndex", "$$projectMaxBlockScanned" }), + new BsonDocument("$ne", + new BsonArray { "$$trx", "$Outpoint.TransactionId" }) + }))) + } + }, + { "as", "o" } + }), + new BsonDocument("$unwind", "$o"), + // Only take fields needed for performance + new BsonDocument("$project", + new BsonDocument + { + { "OutputTransactionId", "$o.Outpoint.TransactionId" }, { "OutputBlockIndex", "$o.BlockIndex" } + }) + }; + } } diff --git a/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectsSyncRunner.cs b/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectsSyncRunner.cs index 173a1c5..d46e162 100644 --- a/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectsSyncRunner.cs +++ b/src/Blockcore.Indexer.Angor/Sync/SyncTasks/ProjectsSyncRunner.cs @@ -37,11 +37,26 @@ public ProjectsSyncRunner(IOptions configuration, ILogger OnExecute() { + if (!CanRunProjectSync()) + return false; + var blockIndexed = await AngorMongoDb.ProjectTable.EstimatedDocumentCountAsync() > 0 ? AngorMongoDb.ProjectTable.AsQueryable().Max(p => p.BlockIndex) : 0;