Skip to content

Commit

Permalink
Merge branch 'rc/v1.7.0' into extend-miniblocks-and-operations-index
Browse files Browse the repository at this point in the history
  • Loading branch information
miiu96 authored Sep 13, 2024
2 parents 07e198d + 9091e78 commit ec4b31d
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 68 deletions.
41 changes: 40 additions & 1 deletion client/elasticClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,14 @@ func (ec *elasticClient) DoQueryRemove(ctx context.Context, index string, body *
log.Warn("elasticClient.doRefresh", "cannot do refresh", err)
}

writeIndex, err := ec.getWriteIndex(index)
if err != nil {
log.Warn("elasticClient.getWriteIndex", "cannot do get write index", err)
return err
}

res, err := ec.client.DeleteByQuery(
[]string{index},
[]string{writeIndex},
body,
ec.client.DeleteByQuery.WithIgnoreUnavailable(true),
ec.client.DeleteByQuery.WithConflicts(esConflictsPolicy),
Expand Down Expand Up @@ -323,6 +329,39 @@ func (ec *elasticClient) createAlias(alias string, index string) error {
return parseResponse(res, nil, elasticDefaultErrorResponseHandler)
}

func (ec *elasticClient) getWriteIndex(alias string) (string, error) {
res, err := ec.client.Indices.GetAlias(
ec.client.Indices.GetAlias.WithIndex(alias),
)
if err != nil {
return "", err
}

var indexData map[string]struct {
Aliases map[string]struct {
IsWriteIndex bool `json:"is_write_index"`
} `json:"aliases"`
}
err = parseResponse(res, &indexData, elasticDefaultErrorResponseHandler)
if err != nil {
return "", err
}

for index, details := range indexData {
if len(indexData) == 1 {
return index, nil
}

for _, indexAlias := range details.Aliases {
if indexAlias.IsWriteIndex {
return index, nil
}
}
}

return alias, nil
}

// UpdateByQuery will update all the documents that match the provided query from the provided index
func (ec *elasticClient) UpdateByQuery(ctx context.Context, index string, buff *bytes.Buffer) error {
reader := bytes.NewReader(buff.Bytes())
Expand Down
52 changes: 50 additions & 2 deletions client/elasticClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package client

import (
"context"
"io/ioutil"
"io"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
jsonFile, err := os.Open("./testsData/response-multi-get.json")
require.Nil(t, err)

byteValue, _ := ioutil.ReadAll(jsonFile)
byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

Expand All @@ -75,3 +75,51 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
_, ok := resMap["docs"]
require.True(t, ok)
}

func TestElasticClient_GetWriteIndexMultipleIndicesBehind(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()

handler = func(w http.ResponseWriter, r *http.Request) {
jsonFile, err := os.Open("./testsData/response-get-alias.json")
require.Nil(t, err)

byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

esClient, _ := NewElasticClient(elasticsearch.Config{
Addresses: []string{ts.URL},
Logger: &logging.CustomLogger{},
})
res, err := esClient.getWriteIndex("blocks")
require.Nil(t, err)
require.Equal(t, "blocks-000004", res)
}

func TestElasticClient_GetWriteIndexOneIndex(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()

handler = func(w http.ResponseWriter, r *http.Request) {
jsonFile, err := os.Open("./testsData/response-get-alias-only-one-index.json")
require.Nil(t, err)

byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

esClient, _ := NewElasticClient(elasticsearch.Config{
Addresses: []string{ts.URL},
Logger: &logging.CustomLogger{},
})
res, err := esClient.getWriteIndex("delegators")
require.Nil(t, err)
require.Equal(t, "delegators-000001", res)
}
7 changes: 7 additions & 0 deletions client/testsData/response-get-alias-only-one-index.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"delegators-000001" : {
"aliases" : {
"delegators" : { }
}
}
}
30 changes: 30 additions & 0 deletions client/testsData/response-get-alias.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"blocks-000003": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
},
"blocks-000004": {
"aliases": {
"blocks": {
"is_write_index": true
}
}
},
"blocks-000002": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
},
"blocks-000001": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
}
}
2 changes: 2 additions & 0 deletions data/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ type PreparedLogsResults struct {
TokensInfo []*TokenInfo
NFTsDataUpdates []*NFTDataUpdate
TokenRolesAndProperties *tokeninfo.TokenRolesAndProperties
DBLogs []*Logs
DBEvents []*LogEvent
}
2 changes: 1 addition & 1 deletion process/elasticproc/block/blockProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func appendBlockDetailsFromHeaders(block *data.Block, header coreData.HeaderHand

func appendBlockDetailsFromIntraShardMbs(block *data.Block, intraShardMbs []*block.MiniBlock, pool *outport.TransactionPool, offset int) {
for idx, intraMB := range intraShardMbs {
if intraMB.Type == nodeBlock.PeerBlock {
if intraMB.Type == nodeBlock.PeerBlock || intraMB.Type == nodeBlock.ReceiptBlock {
continue
}

Expand Down
15 changes: 7 additions & 8 deletions process/elasticproc/elasticProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,12 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader
return err
}

err = ei.prepareAndIndexLogs(obh.TransactionPool.Logs, headerTimestamp, buffers, obh.ShardID)
err = ei.indexLogs(logsData.DBLogs, buffers)
if err != nil {
return err
}

err = ei.indexEvents(logsData.DBEvents, buffers)
if err != nil {
return err
}
Expand Down Expand Up @@ -517,13 +522,7 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]*
return ei.transactionsProc.SerializeTransactionsFeeData(txsHashFeeData, buffSlice, elasticIndexer.OperationsIndex)
}

func (ei *elasticProcessor) prepareAndIndexLogs(logsAndEvents []*outport.LogData, timestamp uint64, buffSlice *data.BufferSlice, shardID uint32) error {
logsDB, eventsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp, shardID)
err := ei.indexEvents(eventsDB, buffSlice)
if err != nil {
return err
}

func (ei *elasticProcessor) indexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error {
if !ei.isIndexEnabled(elasticIndexer.LogsIndex) {
return nil
}
Expand Down
1 change: 0 additions & 1 deletion process/elasticproc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ type DBValidatorsHandler interface {

// DBLogsAndEventsHandler defines the actions that a logs and events handler should do
type DBLogsAndEventsHandler interface {
PrepareLogsForDB(logsAndEvents []*outport.LogData, timestamp uint64, shardID uint32) ([]*data.Logs, []*data.LogEvent)
ExtractDataFromLogs(
logsAndEvents []*outport.LogData,
preparedResults *data.PreparedResults,
Expand Down
Loading

0 comments on commit ec4b31d

Please sign in to comment.