diff --git a/evmrpc/filter.go b/evmrpc/filter.go
index 2ffb77dd49..b0b6de8810 100644
--- a/evmrpc/filter.go
+++ b/evmrpc/filter.go
@@ -5,6 +5,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
+	"sort"
 	"sync"
 	"time"
@@ -22,6 +24,8 @@ import (
 
 const TxSearchPerPage = 10
 
+const MaxNumOfWorkers = 500
+
 type FilterType byte
 
 const (
@@ -281,14 +285,14 @@ type LogFetcher struct {
 	includeSyntheticReceipts bool
 }
 
-func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) ([]*ethtypes.Log, int64, error) {
+func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) (res []*ethtypes.Log, end int64, err error) {
 	bloomIndexes := EncodeFilters(crit.Addresses, crit.Topics)
 	if crit.BlockHash != nil {
 		block, err := blockByHashWithRetry(ctx, f.tmClient, crit.BlockHash[:], 1)
 		if err != nil {
 			return nil, 0, err
 		}
-		return f.GetLogsForBlock(ctx, block, crit, bloomIndexes), block.Block.Height, nil
+		return f.GetLogsForBlock(block, crit, bloomIndexes), block.Block.Height, nil
 	}
 	applyOpenEndedLogLimit := f.filterConfig.maxLog > 0 && (crit.FromBlock == nil || crit.ToBlock == nil)
 	latest := f.ctxProvider(LatestCtxHeight).BlockHeight()
@@ -313,26 +317,84 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr
 	if begin > end {
 		return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end)
 	}
-	blockHeights := f.FindBlockesByBloom(begin, end, bloomIndexes)
-	res := []*ethtypes.Log{}
-	for _, height := range blockHeights {
-		h := height
-		block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
-		if err != nil {
-			return nil, 0, err
+
+	// Parallelize execution
+	numWorkers := int(math.Min(MaxNumOfWorkers, float64(end-begin+1)))
+	var wg sync.WaitGroup
+	tasksChan := make(chan int64, end-begin+1)
+	resultsChan := make(chan *ethtypes.Log, end-begin+1)
+	res = []*ethtypes.Log{}
+	defer func() {
+		if e := recover(); e != nil {
+			err = fmt.Errorf("%s", e)
+		}
-		res = append(res, f.GetLogsForBlock(ctx, block, crit, bloomIndexes)...)
-		if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog {
-			res = res[:int(f.filterConfig.maxLog)]
-			break
+	}()
+	// Send tasks
+	go func() {
+		for height := begin; height <= end; height++ {
+			if height == 0 {
+				continue // Skip genesis height
+			}
+			tasksChan <- height
+		}
+		close(tasksChan) // Close the tasks channel to signal workers
+	}()
+
+	// Worker function. NOTE(review): panic(berr) below fires on a worker goroutine, so the deferred recover in GetLogsByFilters (a different goroutine) cannot catch it and a fetch error will crash the process — propagate berr via an error channel instead; confirm and fix.
+	worker := func() {
+		defer wg.Done()
+		for height := range tasksChan {
+			if len(crit.Addresses) != 0 || len(crit.Topics) != 0 {
+				providerCtx := f.ctxProvider(height)
+				blockBloom := f.k.GetBlockBloom(providerCtx)
+				if !MatchFilters(blockBloom, bloomIndexes) {
+					continue
+				}
+			}
+			h := height
+			block, berr := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
+			if berr != nil {
+				panic(berr)
+			}
+			matchedLogs := f.GetLogsForBlock(block, crit, bloomIndexes)
+			for _, log := range matchedLogs {
+				resultsChan <- log
+			}
+		}
 	}
+	// Start workers
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go worker()
+	}
+
+	// Collect results
+	go func() {
+		wg.Wait()
+		close(resultsChan) // Close the results channel after workers finish
+	}()
+
+	// Aggregate results into the final slice
+	for result := range resultsChan {
+		res = append(res, result)
+	}
+
+	// Sort res into ascending block-number order. NOTE(review): sort.Slice is unstable and intra-block log order (log Index) is not preserved — verify callers do not rely on it.
+	sort.Slice(res, func(i, j int) bool {
+		return res[i].BlockNumber < res[j].BlockNumber
+	})
+
+	// Apply the open-ended log-count limit (maxLog)
+	if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog {
+		res = res[:int(f.filterConfig.maxLog)]
+	}
+
 	return res, end, nil
 }
 
-func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log {
-	possibleLogs := f.FindLogsByBloom(block.Block.Height, filters)
+func (f *LogFetcher) GetLogsForBlock(block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log {
+	possibleLogs := f.FindLogsByBloom(block, filters)
 	matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) })
 	for _, l := range matchedLogs {
 		l.BlockHash = common.Hash(block.BlockID.Hash)
@@ -340,25 +402,9 @@ func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.Resul
 	return matchedLogs
 }
 
-func (f *LogFetcher) FindBlockesByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) {
-	//TODO: parallelize
-	for height := begin; height <= end; height++ {
-		if height == 0 {
-			// no block bloom on genesis height
-			continue
-		}
-		ctx := f.ctxProvider(height)
-		blockBloom := f.k.GetBlockBloom(ctx)
-		if MatchFilters(blockBloom, filters) {
-			res = append(res, height)
-		}
-	}
-	return
-}
-
-func (f *LogFetcher) FindLogsByBloom(height int64, filters [][]bloomIndexes) (res []*ethtypes.Log) {
+func (f *LogFetcher) FindLogsByBloom(block *coretypes.ResultBlock, filters [][]bloomIndexes) (res []*ethtypes.Log) {
 	ctx := f.ctxProvider(LatestCtxHeight)
-	txHashes := f.k.GetTxHashesOnHeight(ctx, height)
+	txHashes := f.k.GetTxHashesOnHeight(ctx, block.Block.Height)
 	for _, hash := range txHashes {
 		receipt, err := f.k.GetReceipt(ctx, hash)
 		if err != nil {