Skip to content

Commit

Permalink
TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
rstout committed Sep 20, 2024
1 parent cf2125c commit f061a3f
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 9 deletions.
36 changes: 36 additions & 0 deletions execute/exectypes/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ type CommitObservations map[cciptypes.ChainSelector][]CommitData
// and sequence number.
type MessageObservations map[cciptypes.ChainSelector]map[cciptypes.SeqNum]cciptypes.Message

// PriceObservations TODO: doc
type PriceObservations struct {
// TODO: doc
JoulePriceUSD cciptypes.BigInt `json:"joulePriceUSD"`

// TODO: doc
GasPrice cciptypes.BigInt `json:"gasPrice"`

// TODO: doc
DestNativeTokenPriceUSD cciptypes.BigInt `json:"destNativeTokenPriceUSD"`

// TODO: doc
// Maps message IDs to the estimated gas cost of executing these messages
MessageExecutionGasCosts map[string]cciptypes.BigInt `json:"messageExecutionCosts"`
}

// NonceObservations contain the latest nonce for senders in the previously observed messages.
// Nonces are organized by source chain selector and the string encoded sender address. The address
// must be encoding according to the destination chain requirements with typeconv.AddressBytesToString.
Expand Down Expand Up @@ -140,6 +156,9 @@ type Observation struct {
// execute report.
Messages MessageObservations `json:"messages"`

// Prices TODO: doc
Prices PriceObservations `json:"prices"`

// TokenData are determined during the second phase of execute.
// It contains the token data for the messages identified in the same stage as Messages
TokenData TokenDataObservations `json:"tokenDataObservations"`
Expand All @@ -156,13 +175,15 @@ type Observation struct {
func NewObservation(
commitReports CommitObservations,
messages MessageObservations,
prices PriceObservations,
tokenData TokenDataObservations,
nonces NonceObservations,
contracts dt.Observation,
) Observation {
return Observation{
CommitReports: commitReports,
Messages: messages,
Prices: prices,
TokenData: tokenData,
Nonces: nonces,
Contracts: contracts,
Expand All @@ -183,3 +204,18 @@ func DecodeObservation(b []byte) (Observation, error) {
err := json.Unmarshal(b, &obs)
return obs, err
}

// PriceObserver TODO: doc
type PriceObserver interface {
ObservePrices([]cciptypes.Message) (PriceObservations, error)
}

type StaticPriceObserver struct {
PriceObservations PriceObservations
}

func (spo *StaticPriceObserver) ObservePrices([]cciptypes.Message) (PriceObservations, error) {
return spo.PriceObservations, nil
}

var _ PriceObserver = &StaticPriceObserver{}
27 changes: 22 additions & 5 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
mapset "github.com/deckarep/golang-set/v2"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
libocrtypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/execute/exectypes"
"github.com/smartcontractkit/chainlink-ccip/execute/internal/gas"
"github.com/smartcontractkit/chainlink-ccip/execute/report"
Expand Down Expand Up @@ -52,6 +53,7 @@ type Plugin struct {
tokenDataObserver tokendata.TokenDataObserver
estimateProvider gas.EstimateProvider
lggr logger.Logger
priceObserver exectypes.PriceObserver

// state
contractsInitialized bool
Expand Down Expand Up @@ -80,6 +82,15 @@ func NewPlugin(
lggr.Errorw("error starting background reader syncer", "err", err)
}

staticPriceObserver := exectypes.StaticPriceObserver{
PriceObservations: exectypes.PriceObservations{
JoulePriceUSD: cciptypes.NewBigIntFromInt64(1),
GasPrice: cciptypes.NewBigIntFromInt64(1),
DestNativeTokenPriceUSD: cciptypes.NewBigIntFromInt64(1),
MessageExecutionGasCosts: make(map[string]cciptypes.BigInt),
},
}

return &Plugin{
donID: donID,
reportingCfg: reportingCfg,
Expand All @@ -93,6 +104,7 @@ func NewPlugin(
tokenDataObserver: tokenDataObserver,
estimateProvider: estimateProvider,
lggr: lggr,
priceObserver: &staticPriceObserver,
discovery: discovery.NewContractDiscoveryProcessor(
lggr,
&ccipReader,
Expand Down Expand Up @@ -222,7 +234,7 @@ func (p *Plugin) Observation(
}

// TODO: truncate grouped to a maximum observation size?
return exectypes.NewObservation(groupedCommits, nil, nil, nil, discoveryObs).Encode()
return exectypes.NewObservation(groupedCommits, nil, exectypes.PriceObservations{}, nil, nil, discoveryObs).Encode()
}

// No observation for non-dest readers.
Expand Down Expand Up @@ -282,7 +294,12 @@ func (p *Plugin) Observation(
return types.Observation{}, fmt.Errorf("unable to process token data %w", err1)
}

return exectypes.NewObservation(groupedCommits, messages, tkData, nil, discoveryObs).Encode()
pricesObservation, err := observePrices(p.priceObserver, messages)
if err != nil {
return types.Observation{}, fmt.Errorf("unable to observe prices: %w", err)
}

return exectypes.NewObservation(groupedCommits, messages, pricesObservation, tkData, nil, discoveryObs).Encode()

case exectypes.Filter:
// Phase 3: observe nonce for each unique source/sender pair.
Expand Down Expand Up @@ -312,7 +329,7 @@ func (p *Plugin) Observation(
nonceObservations[srcChain] = nonces
}

return exectypes.NewObservation(nil, nil, nil, nonceObservations, discoveryObs).Encode()
return exectypes.NewObservation(nil, nil, exectypes.PriceObservations{}, nil, nonceObservations, discoveryObs).Encode()
default:
return types.Observation{}, fmt.Errorf("unknown state")
}
Expand Down
36 changes: 36 additions & 0 deletions execute/plugin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,22 @@ func computeRanges(reports []exectypes.CommitData) ([]cciptypes.SeqNumRange, err
return ranges, nil
}

// ConvertMessageObservationsToSlice converts exectypes.MessageObservations to a slice of cciptypes.Message.
func ConvertMessageObservationsToSlice(observations exectypes.MessageObservations) []cciptypes.Message {
var messages []cciptypes.Message
for _, msgMap := range observations {
for _, msg := range msgMap {
messages = append(messages, msg)
}
}
return messages
}

// TODO: doc
func observePrices(observer exectypes.PriceObserver, messageObservations exectypes.MessageObservations) (exectypes.PriceObservations, error) {
return observer.ObservePrices(ConvertMessageObservationsToSlice(messageObservations))
}

func groupByChainSelector(
reports []plugintypes2.CommitPluginReportWithMeta) exectypes.CommitObservations {
commitReportCache := make(map[cciptypes.ChainSelector][]exectypes.CommitData)
Expand Down Expand Up @@ -264,6 +280,16 @@ func mergeMessageObservations(
return results, nil
}

func mergePriceObservations(
aos []plugincommon.AttributedObservation[exectypes.Observation], fChain map[cciptypes.ChainSelector]int,

Check failure on line 284 in execute/plugin_functions.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

`mergePriceObservations` - `fChain` is unused (unparam)
) (exectypes.PriceObservations, error) {

Check failure on line 285 in execute/plugin_functions.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

mergePriceObservations - result 1 (error) is always nil (unparam)
if len(aos) == 0 {
return exectypes.PriceObservations{}, nil
}

return aos[0].Observation.Prices, nil
}

// mergeCommitObservations merges all observations which reach the fChain threshold into a single result.
// Any observations, or subsets of observations, which do not reach the threshold are ignored.
func mergeCommitObservations(
Expand Down Expand Up @@ -418,6 +444,15 @@ func getConsensusObservation(
"oracle", oracleID,
"mergedMessageObservations", mergedMessageObservations)

mergedPriceObservations, err := mergePriceObservations(aos, fChain)
if err != nil {
return exectypes.Observation{}, fmt.Errorf("unable to merge price observations: %w", err)
}
lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: merged price observations", oracleID),
"oracle", oracleID,
"mergedPriceObservations", mergedPriceObservations)

mergedTokenObservations := mergeTokenObservations(aos, fChain)
lggr.Debugw(
fmt.Sprintf("[oracle %d] exec outcome: merged token data observations", oracleID),
Expand All @@ -434,6 +469,7 @@ func getConsensusObservation(
observation := exectypes.NewObservation(
mergedCommitObservations,
mergedMessageObservations,
mergedPriceObservations,
mergedTokenObservations,
mergedNonceObservations,
dt.Observation{},
Expand Down
14 changes: 10 additions & 4 deletions execute/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestPlugin_ValidateObservation_IneligibleObserver(t *testing.T) {
},
},
},
}, nil, nil, dt.Observation{})
}, exectypes.PriceObservations{}, nil, nil, dt.Observation{})
encoded, err := observation.Encode()
require.NoError(t, err)
err = p.ValidateObservation(ocr3types.OutcomeContext{}, types.Query{}, types.AttributedObservation{
Expand Down Expand Up @@ -264,7 +264,9 @@ func TestPlugin_ValidateObservation_ValidateObservedSeqNum_Error(t *testing.T) {
{MerkleRoot: root},
},
}
observation := exectypes.NewObservation(commitReports, nil, nil, nil, dt.Observation{})
observation := exectypes.NewObservation(
commitReports, nil, exectypes.PriceObservations{}, nil, nil, dt.Observation{},
)
encoded, err := observation.Encode()
require.NoError(t, err)
err = p.ValidateObservation(ocr3types.OutcomeContext{}, types.Query{}, types.AttributedObservation{
Expand Down Expand Up @@ -357,7 +359,9 @@ func TestPlugin_Outcome_CommitReportsMergeError(t *testing.T) {
commitReports := map[cciptypes.ChainSelector][]exectypes.CommitData{
1: {},
}
observation, err := exectypes.NewObservation(commitReports, nil, nil, nil, dt.Observation{}).Encode()
observation, err := exectypes.NewObservation(
commitReports, nil, exectypes.PriceObservations{}, nil, nil, dt.Observation{},
).Encode()
require.NoError(t, err)
_, err = p.Outcome(ocr3types.OutcomeContext{}, nil, []types.AttributedObservation{
{
Expand Down Expand Up @@ -390,7 +394,9 @@ func TestPlugin_Outcome_MessagesMergeError(t *testing.T) {
},
},
}
observation, err := exectypes.NewObservation(nil, messages, nil, nil, dt.Observation{}).Encode()
observation, err := exectypes.NewObservation(
nil, messages, exectypes.PriceObservations{}, nil, nil, dt.Observation{},
).Encode()
require.NoError(t, err)
_, err = p.Outcome(ocr3types.OutcomeContext{}, nil, []types.AttributedObservation{
{
Expand Down

0 comments on commit f061a3f

Please sign in to comment.