Skip to content

Commit

Permalink
Add support for new events introduced at v0.6.2
Browse files Browse the repository at this point in the history
  • Loading branch information
fernandofcampos committed Nov 8, 2024
1 parent 03bd135 commit 89894ec
Show file tree
Hide file tree
Showing 5 changed files with 386 additions and 25 deletions.
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ RUN mkdir -p /usr/local/bin/previous/v2 && \
mkdir -p /usr/local/bin/previous/v3 && \
mkdir -p /usr/local/bin/previous/v4 && \
mkdir -p /usr/local/bin/previous/v5 && \
mkdir -p /usr/local/bin/previous/v6 && \
curl -L https://github.com/allora-network/allora-chain/releases/download/v0.2.14/allorad_linux_${TARGETARCH} -o /usr/local/bin/previous/v2/allorad; \
curl -L https://github.com/allora-network/allora-chain/releases/download/v0.3.0/allorad_linux_${TARGETARCH} -o /usr/local/bin/previous/v3/allorad; \
curl -L https://github.com/allora-network/allora-chain/releases/download/v0.4.0/allorad_linux_${TARGETARCH} -o /usr/local/bin/previous/v4/allorad; \
curl -L https://github.com/allora-network/allora-chain/releases/download/v0.5.0/allorad_linux_${TARGETARCH} -o /usr/local/bin/previous/v5/allorad; \
curl -L https://github.com/allora-network/allora-chain/releases/download/v0.6.0/allorad_linux_${TARGETARCH} -o /usr/local/bin/allorad; \
curl -L https://github.com/allora-network/allora-chain/releases/download/v0.6.0/allorad_linux_${TARGETARCH} -o /usr/local/bin/previous/v6/allorad && \
curl -L https://github.com/allora-network/allora-chain/releases/download/v0.6.2/allorad_linux_${TARGETARCH} -o /usr/local/bin/allorad && \
chmod -R 777 /usr/local/bin/allorad && \
chmod -R 777 /usr/local/bin/previous/v2/allorad && \
chmod -R 777 /usr/local/bin/previous/v3/allorad && \
chmod -R 777 /usr/local/bin/previous/v4/allorad && \
chmod -R 777 /usr/local/bin/previous/v5/allorad
chmod -R 777 /usr/local/bin/previous/v5/allorad && \
chmod -R 777 /usr/local/bin/previous/v6/allorad

COPY --from=gobuilder /src/allora-indexer /usr/local/bin/allora-indexer
# EXPOSE 8080
Expand Down
322 changes: 322 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ const (
TB_TOPIC_FORECASTING_SCORES = "topic_forecasting_scores"
TB_ECOSYSTEM_TOKEN_MINT = "ecosystem_token_mint"
TB_REWARD_CURRENT_BLOCK_EMISSION = "reward_current_block_emission"
TB_LISTENING_COEFFICIENTS = "listening_coefficients"
TB_INFERER_NETWORK_REGRET = "inferer_network_regret"
TB_FORECASTER_NETWORK_REGRET = "forecaster_network_regret"
TB_NAIVE_INFERER_NETWORK_REGRET = "naive_inferer_network_regret"
TB_TOPIC_INITIAL_REGRET = "topic_initial_regret"
)

var dbPool *pgxpool.Pool //*pgx.Conn
Expand Down Expand Up @@ -521,6 +526,42 @@ func createEventsTablesSQL() string {
block_height BIGINT,
token_amount NUMERIC(72,18)
);
CREATE TABLE IF NOT EXISTS ` + TB_LISTENING_COEFFICIENTS + ` (
id SERIAL PRIMARY KEY,
actor_type INT,
topic_id BIGINT,
block_height BIGINT,
addresses TEXT[],
coefficients NUMERIC(72,18)[]
);
CREATE TABLE IF NOT EXISTS ` + TB_INFERER_NETWORK_REGRET + ` (
id SERIAL PRIMARY KEY,
height_tx BIGINT,
addresses TEXT[],
regrets NUMERIC(72,18)[]
);
CREATE TABLE IF NOT EXISTS ` + TB_FORECASTER_NETWORK_REGRET + ` (
id SERIAL PRIMARY KEY,
height_tx BIGINT,
addresses TEXT[],
regrets NUMERIC(72,18)[]
);
CREATE TABLE IF NOT EXISTS ` + TB_NAIVE_INFERER_NETWORK_REGRET + ` (
id SERIAL PRIMARY KEY,
height_tx BIGINT,
addresses TEXT[],
regrets NUMERIC(72,18)[]
);
CREATE TABLE IF NOT EXISTS ` + TB_TOPIC_INITIAL_REGRET + ` (
id SERIAL PRIMARY KEY,
height_tx BIGINT,
regret NUMERIC(72,18)
);
`
}

Expand Down Expand Up @@ -688,6 +729,26 @@ func isRewardCurrentBlockEmissionEvent(event EventRecord) bool {
return isEventType(event.Type, "mint.v", "EventRewardCurrentBlockEmission")
}

func isListeningCoefficientsEvent(event EventRecord) bool {
return isEventType(event.Type, "emissions.v", "EventListeningCoefficientsSet")
}

func isInfererNetworkRegretEvent(event EventRecord) bool {
return isEventType(event.Type, "emissions.v", "EventInfererNetworkRegretSet")
}

func isForecasterNetworkRegretEvent(event EventRecord) bool {
return isEventType(event.Type, "emissions.v", "EventForecasterNetworkRegretSet")
}

func isNaiveInfererNetworkRegretEvent(event EventRecord) bool {
return isEventType(event.Type, "emissions.v", "EventNaiveInfererNetworkRegretSet")
}

func isTopicInitialRegretEvent(event EventRecord) bool {
return isEventType(event.Type, "emissions.v", "EventTopicInitialRegretSet")
}

func insertEvents(events []EventRecord) error {
var scoreEvents []EventRecord
var rewardEvents []EventRecord
Expand All @@ -699,6 +760,11 @@ func insertEvents(events []EventRecord) error {
var tokenomicsEvents []EventRecord
var ecosystemTokenMintEvents []EventRecord
var rewardCurrentBlockEmissionEvents []EventRecord
var listeningCoefficientsEvents []EventRecord
var infererNetworkRegretEvents []EventRecord
var forecasterNetworkRegretEvents []EventRecord
var naiveInfererNetworkRegretEvents []EventRecord
var topicInitialRegretEvents []EventRecord
// For inserting events in batch:
var insertStatements []string
var values []interface{}
Expand Down Expand Up @@ -726,6 +792,16 @@ func insertEvents(events []EventRecord) error {
ecosystemTokenMintEvents = append(ecosystemTokenMintEvents, event) // Function to check if it's an ecosystem token mint event
} else if isRewardCurrentBlockEmissionEvent(event) {
rewardCurrentBlockEmissionEvents = append(rewardCurrentBlockEmissionEvents, event) // Function to check if it's a reward current block emission event
} else if isListeningCoefficientsEvent(event) {
listeningCoefficientsEvents = append(listeningCoefficientsEvents, event) // Function to check if it's a listening coefficients event
} else if isInfererNetworkRegretEvent(event) {
infererNetworkRegretEvents = append(infererNetworkRegretEvents, event) // Function to check if it's an inferer network regret event
} else if isForecasterNetworkRegretEvent(event) {
forecasterNetworkRegretEvents = append(forecasterNetworkRegretEvents, event) // Function to check if it's a forecaster network regret event
} else if isNaiveInfererNetworkRegretEvent(event) {
naiveInfererNetworkRegretEvents = append(naiveInfererNetworkRegretEvents, event) // Function to check if it's a naive inferer network regret event
} else if isTopicInitialRegretEvent(event) {
topicInitialRegretEvents = append(topicInitialRegretEvents, event) // Function to check if it's a topic initial regret event
} else {
log.Info().Msg("Unrecognized event, ignoring")
continue
Expand Down Expand Up @@ -836,6 +912,46 @@ func insertEvents(events []EventRecord) error {
log.Error().Err(err).Msg("failed to insert reward current block emission")
}
}

// Insert listening coefficients if any
if len(listeningCoefficientsEvents) > 0 {
err := insertListeningCoefficients(listeningCoefficientsEvents)
if err != nil {
log.Error().Err(err).Msg("failed to insert listening coefficients")
}
}

// Insert inferer network regret if any
if len(infererNetworkRegretEvents) > 0 {
err := insertInfererNetworkRegret(infererNetworkRegretEvents)
if err != nil {
log.Error().Err(err).Msg("failed to insert inferer network regret")
}
}

// Insert forecaster network regret if any
if len(forecasterNetworkRegretEvents) > 0 {
err := insertForecasterNetworkRegret(forecasterNetworkRegretEvents)
if err != nil {
log.Error().Err(err).Msg("failed to insert forecaster network regret")
}
}

// Insert naive inferer network regret if any
if len(naiveInfererNetworkRegretEvents) > 0 {
err := insertNaiveInfererNetworkRegret(naiveInfererNetworkRegretEvents)
if err != nil {
log.Error().Err(err).Msg("failed to insert naive inferer network regret")
}
}

// Insert topic initial regret if any
if len(topicInitialRegretEvents) > 0 {
err := insertTopicInitialRegret(topicInitialRegretEvents)
if err != nil {
log.Error().Err(err).Msg("failed to insert topic initial regret")
}
}
return nil
}

Expand Down Expand Up @@ -1513,6 +1629,212 @@ func insertRewardCurrentBlockEmission(events []EventRecord) error {
return nil
}

func insertListeningCoefficients(events []EventRecord) error { // TODO: Implement
log.Info().Msg("Inserting listening coefficients")
var insertStatements []string
var values []interface{}

placeholderCounter := 1 // Placeholder index starts at 1 in PostgreSQL
for _, event := range events {
log.Trace().Interface("Event listening coefficients", event).Msg("Processing event listening coefficients")
var attributes []Attribute
err := json.Unmarshal(event.Data, &attributes)
if err != nil {
return fmt.Errorf("failed to unmarshal event data: %w", err)
}

var actorType int
var topicID int64
var blockHeight uint64
var addresses []string
var coefficients []big.Float
for _, attr := range attributes {
switch attr.Key {
case "actor_type":
actorType, err = strconv.Atoi(strings.Trim(attr.Value, "\""))
if err != nil {
return fmt.Errorf("failed to convert actor_type to int: %w", err)
}
case "topic_id":
topicID, err = strconv.ParseInt(strings.Trim(attr.Value, "\""), 10, 64)
if err != nil {
return fmt.Errorf("failed to convert topic_id to int: %w", err)
}
case "block_height":
cleanedValue := strings.Trim(attr.Value, "\"")
blockHeight, err = strconv.ParseUint(cleanedValue, 10, 64)
if err != nil {
return fmt.Errorf("failed to convert block_height to int: %w", err)
}
case "addresses":
addresses = strings.Split(strings.Trim(attr.Value, "[]"), ",")
case "coefficients":
var rawCoefficients []string
err = json.Unmarshal([]byte(attr.Value), &rawCoefficients)
if err != nil {
return fmt.Errorf("failed to unmarshal coefficients: %w", err)
}
for _, rawCoefficient := range rawCoefficients {
coefficient := new(big.Float)
_, ok := coefficient.SetString(rawCoefficient)
if !ok {
return fmt.Errorf("failed to convert coefficient to big.Float: %w", err)
}
coefficients = append(coefficients, *coefficient)
}
}
}
newStmt := fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", placeholderCounter, placeholderCounter+1, placeholderCounter+2, placeholderCounter+3, placeholderCounter+4)
insertStatements = append(insertStatements, newStmt)
values = append(values, actorType, topicID, blockHeight, addresses, coefficients)
placeholderCounter += 5 // Increase counter for next row
}

if len(insertStatements) > 0 {
sqlStatement := fmt.Sprintf(`
INSERT INTO %s (height_tx, actor_type, topic_id, block_height, addresses, coefficients)
VALUES %s`, TB_LISTENING_COEFFICIENTS, strings.Join(insertStatements, ","))
_, err := dbPool.Exec(context.Background(), sqlStatement, values...)
if err != nil {
return fmt.Errorf("failed to insert listening coefficients event")
}
} else {
log.Info().Msg("No listening coefficients event to insert")
}
return nil
}

func insertNetworkRegret(events []EventRecord, tableName string) error { // TODO: Implement
log.Info().Msg("Inserting network regret")
var insertStatements []string
var values []interface{}

placeholderCounter := 1 // Placeholder index starts at 1 in PostgreSQL
for _, event := range events {
log.Trace().Interface("Event network regret", event).Msg("Processing event network regret")
var attributes []Attribute
err := json.Unmarshal(event.Data, &attributes)
if err != nil {
return fmt.Errorf("failed to unmarshal event data: %w", err)
}

var heightTx uint64
var addresses []string
var regrets []big.Float
for _, attr := range attributes {
switch attr.Key {
case "height_tx":
cleanedValue := strings.Trim(attr.Value, "\"")
heightTx, err = strconv.ParseUint(cleanedValue, 10, 64)
if err != nil {
return fmt.Errorf("failed to convert height_tx to int: %w", err)
}
case "addresses":
addresses = strings.Split(strings.Trim(attr.Value, "[]"), ",")
case "regrets":
var rawRegrets []string
err = json.Unmarshal([]byte(attr.Value), &rawRegrets)
if err != nil {
return fmt.Errorf("failed to unmarshal regrets: %w", err)
}
for _, rawRegret := range rawRegrets {
regret := new(big.Float)
_, ok := regret.SetString(rawRegret)
if !ok {
return fmt.Errorf("failed to convert regret to big.Float: %w", err)
}
regrets = append(regrets, *regret)
}
}
}
newStmt := fmt.Sprintf("($%d, $%d, $%d)", placeholderCounter, placeholderCounter+1, placeholderCounter+2)
insertStatements = append(insertStatements, newStmt)
values = append(values, heightTx, addresses, regrets)
placeholderCounter += 3 // Increase counter for next row
}

if len(insertStatements) > 0 {
sqlStatement := fmt.Sprintf(`
INSERT INTO %s (height_tx, addresses, regrets)
VALUES %s`, tableName, strings.Join(insertStatements, ","))
_, err := dbPool.Exec(context.Background(), sqlStatement, values...)
if err != nil {
return fmt.Errorf("failed to insert network regret event")
}
} else {
log.Info().Msg("No network regret event to insert")
}
return nil
}

func insertInfererNetworkRegret(events []EventRecord) error {
return insertNetworkRegret(events, TB_INFERER_NETWORK_REGRET)
}

func insertForecasterNetworkRegret(events []EventRecord) error {
return insertNetworkRegret(events, TB_FORECASTER_NETWORK_REGRET)
}

func insertNaiveInfererNetworkRegret(events []EventRecord) error {
return insertNetworkRegret(events, TB_NAIVE_INFERER_NETWORK_REGRET)
}

func insertTopicInitialRegret(events []EventRecord) error {
log.Info().Msg("Inserting topic initial regret")
var insertStatements []string
var values []interface{}

placeholderCounter := 1 // Placeholder index starts at 1 in PostgreSQL
for _, event := range events {
log.Trace().Interface("Event topic initial regret", event).Msg("Processing event topic initial regret")
var attributes []Attribute
err := json.Unmarshal(event.Data, &attributes)
if err != nil {
return fmt.Errorf("failed to unmarshal event data: %w", err)
}

var heightTx uint64
var regret big.Float
for _, attr := range attributes {
switch attr.Key {
case "height_tx":
cleanedValue := strings.Trim(attr.Value, "\"")
heightTx, err = strconv.ParseUint(cleanedValue, 10, 64)
if err != nil {
return fmt.Errorf("failed to convert height_tx to int: %w", err)
}
case "regret":
var rawRegret string
err = json.Unmarshal([]byte(attr.Value), &rawRegret)
if err != nil {
return fmt.Errorf("failed to unmarshal regrets: %w", err)
}
_, ok := regret.SetString(rawRegret)
if !ok {
return fmt.Errorf("failed to convert regret to big.Float: %w", err)
}
}
}
newStmt := fmt.Sprintf("($%d, $%d, $%d)", placeholderCounter, placeholderCounter+1, placeholderCounter+2)
insertStatements = append(insertStatements, newStmt)
values = append(values, heightTx, regret)
placeholderCounter += 2 // Increase counter for next row
}

if len(insertStatements) > 0 {
sqlStatement := fmt.Sprintf(`
INSERT INTO %s (height_tx, regret)
VALUES %s`, TB_TOPIC_INITIAL_REGRET, strings.Join(insertStatements, ","))
_, err := dbPool.Exec(context.Background(), sqlStatement, values...)
if err != nil {
return fmt.Errorf("failed to insert topic initial regret event")
}
} else {
log.Info().Msg("No topic initial regret event to insert")
}
return nil
}

func isDataEmpty(table string) (bool, error) {
var count int
query := fmt.Sprintf("SELECT COUNT(*) FROM %s", table)
Expand Down
Loading

0 comments on commit 89894ec

Please sign in to comment.