Skip to content

Commit

Permalink
Receive data on threads
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Jun 12, 2024
1 parent cf8c8ca commit dc8ccaf
Showing 1 changed file with 83 additions and 47 deletions.
130 changes: 83 additions & 47 deletions pkg/indexer/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,66 +92,102 @@ func (r *Receiver) worker(ctx context.Context, height uint64) {
blockId := starknetData.BlockID{
Number: &height,
}
var result Result
for {
select {
case <-ctx.Done():
return
default:
}

response, err := r.api.GetBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
var (
result Result
wg sync.WaitGroup
mx = new(sync.Mutex)
)

wg.Add(1)
go func(mx *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

for {
select {
case <-ctx.Done():
return
default:
}
r.log.Err(err).Msg("get block request")
time.Sleep(time.Second)
continue
}
result.Block = response
break
}

for {
select {
case <-ctx.Done():
return
default:
response, err := r.api.GetBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Msg("get block request")
time.Sleep(time.Second)
continue
}
mx.Lock()
{
result.Block = response
}
mx.Unlock()
break
}
}(mx, &wg)

wg.Add(1)
go func(mx *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

response, err := r.api.TraceBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
for {
select {
case <-ctx.Done():
return
default:
}
r.log.Err(err).Msg("get block traces request")
time.Sleep(time.Second)
continue
}
result.Traces = response
break
}

for {
select {
case <-ctx.Done():
return
default:
response, err := r.api.TraceBlock(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Msg("get block traces request")
time.Sleep(time.Second)
continue
}

mx.Lock()
{
result.Traces = response
}
mx.Unlock()
break
}
}(mx, &wg)

wg.Add(1)
go func(mx *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

response, err := r.getStateUpdate(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
for {
select {
case <-ctx.Done():
return
default:
}

response, err := r.getStateUpdate(ctx, blockId)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.log.Err(err).Msg("state update request")
time.Sleep(time.Second)
continue
}
r.log.Err(err).Msg("state update request")
time.Sleep(time.Second)
continue

mx.Lock()
{
result.StateUpdate = response
}
mx.Unlock()
break
}
result.StateUpdate = response
break
}
}(mx, &wg)

wg.Wait()

r.log.Info().Uint64("height", height).Int64("ms", time.Since(start).Milliseconds()).Msg("received block data")
r.result <- result
Expand Down

0 comments on commit dc8ccaf

Please sign in to comment.