diff --git a/.github/workflows/build-xmtpd.yml b/.github/workflows/build-xmtpd.yml index f77591df..3277c648 100644 --- a/.github/workflows/build-xmtpd.yml +++ b/.github/workflows/build-xmtpd.yml @@ -39,4 +39,4 @@ jobs: file: ./dev/docker/Dockerfile push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file + labels: ${{ steps.meta.outputs.labels }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ab29f22f..960d4e3c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,10 +10,15 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + with: + submodules: recursive - uses: actions/setup-go@v3 with: go-version-file: go.mod - run: dev/docker/up + - name: Install Foundry + uses: foundry-rs/foundry-toolchain@v1 + - run: dev/contracts/deploy-local - name: Run Tests run: | export GOPATH="${HOME}/go/" diff --git a/dev/contracts/deploy-local b/dev/contracts/deploy-local index 61cc9316..c3862a4e 100755 --- a/dev/contracts/deploy-local +++ b/dev/contracts/deploy-local @@ -3,6 +3,9 @@ source dev/contracts/.env +# Make sure the build directory exists +mkdir -p ./build + cd ./contracts # Deploy a contract and save the output (which includes the contract address) to a JSON file to be used in tests diff --git a/pkg/config/options.go b/pkg/config/options.go index dd082a42..c59d3b56 100644 --- a/pkg/config/options.go +++ b/pkg/config/options.go @@ -12,7 +12,7 @@ type ContractsOptions struct { RpcUrl string `long:"rpc-url" description:"Blockchain RPC URL"` NodesContractAddress string `long:"nodes-address" description:"Node contract address"` MessagesContractAddress string `long:"messages-address" description:"Message contract address"` - RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval" default:"60s"` + RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval for the nodes registry" default:"60s"` } type DbOptions struct { diff --git a/pkg/db/queries/db.go b/pkg/db/queries/db.go index 1cbab906..fa785733 100644 --- a/pkg/db/queries/db.go +++ b/pkg/db/queries/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.26.0 package queries diff --git a/pkg/db/queries/models.go b/pkg/db/queries/models.go index 36178292..a022ba32 100644 --- a/pkg/db/queries/models.go +++ b/pkg/db/queries/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.26.0 package queries diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index f68153fd..ac4308ef 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.27.0 +// sqlc v1.26.0 // source: queries.sql package queries diff --git a/pkg/indexer/blockchain/client.go b/pkg/indexer/blockchain/client.go new file mode 100644 index 00000000..7699152c --- /dev/null +++ b/pkg/indexer/blockchain/client.go @@ -0,0 +1,11 @@ +package blockchain + +import ( + "context" + + "github.com/ethereum/go-ethereum/ethclient" +) + +func NewClient(ctx context.Context, rpcUrl string) (*ethclient.Client, error) { + return ethclient.DialContext(ctx, rpcUrl) +} diff --git a/pkg/indexer/blockchain/rpcLogStreamer.go b/pkg/indexer/blockchain/rpcLogStreamer.go index a01dcdcd..ec577883 100644 --- a/pkg/indexer/blockchain/rpcLogStreamer.go +++ b/pkg/indexer/blockchain/rpcLogStreamer.go @@ -26,11 +26,11 @@ type RpcLogStreamBuilder struct { // All the listeners contractConfigs []contractConfig logger *zap.Logger - rpcUrl string + ethclient *ethclient.Client } -func NewRpcLogStreamBuilder(rpcUrl string, logger *zap.Logger) *RpcLogStreamBuilder { - return &RpcLogStreamBuilder{rpcUrl: rpcUrl, logger: logger} +func NewRpcLogStreamBuilder(client *ethclient.Client, logger *zap.Logger) *RpcLogStreamBuilder { + return &RpcLogStreamBuilder{ethclient: client, logger: logger} } func (c *RpcLogStreamBuilder) ListenForContractEvent( @@ -47,11 +47,7 @@ func (c *RpcLogStreamBuilder) ListenForContractEvent( } func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) { - client, err := ethclient.Dial(c.rpcUrl) - if err != nil { - return nil, err - } - return NewRpcLogStreamer(client, c.logger, c.contractConfigs), nil + return NewRpcLogStreamer(c.ethclient, c.logger, c.contractConfigs), nil } // Struct defining all the information required to filter events from logs diff --git a/pkg/indexer/blockchain/rpcLogStreamer_test.go b/pkg/indexer/blockchain/rpcLogStreamer_test.go index 7d2d24ad..e7e6629c 100644 --- a/pkg/indexer/blockchain/rpcLogStreamer_test.go +++ b/pkg/indexer/blockchain/rpcLogStreamer_test.go @@ -1,6 +1,7 @@ package blockchain import ( + "context" big "math/big" "testing" @@ -15,10 +16,6 @@ import ( "go.uber.org/zap" ) -// Using a free RPC url so that the dial function works. -// May be unwise or flaky and we may need to reconsider -const RPC_URL = "https://nodes.mewapi.io/rpc/eth" - func buildStreamer( t *testing.T, client ChainClient, @@ -39,7 +36,9 @@ func buildStreamer( } func TestBuilder(t *testing.T) { - builder := NewRpcLogStreamBuilder(RPC_URL, testutils.NewLog(t)) + testclient, err := NewClient(context.Background(), testutils.GetContractsOptions(t).RpcUrl) + require.NoError(t, err) + builder := NewRpcLogStreamBuilder(testclient, testutils.NewLog(t)) listenerChannel := builder.ListenForContractEvent( 1, diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 718a8fac..a53908f0 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/xmtp/xmtpd/pkg/abis" "github.com/xmtp/xmtpd/pkg/config" "github.com/xmtp/xmtpd/pkg/db/queries" @@ -22,7 +23,11 @@ func StartIndexer( queries *queries.Queries, cfg config.ContractsOptions, ) error { - builder := blockchain.NewRpcLogStreamBuilder(cfg.RpcUrl, logger) + client, err := blockchain.NewClient(ctx, cfg.RpcUrl) + if err != nil { + return err + } + builder := blockchain.NewRpcLogStreamBuilder(client, logger) messagesTopic, err := buildMessagesTopic() if err != nil { @@ -35,11 +40,16 @@ func StartIndexer( []common.Hash{messagesTopic}, ) + messagesContract, err := messagesContract(cfg, client) + if err != nil { + return err + } + indexLogs( ctx, messagesChannel, logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)), - storer.NewGroupMessageStorer(queries, logger), + storer.NewGroupMessageStorer(queries, logger, messagesContract), ) streamer, err := builder.Build() @@ -92,3 +102,13 @@ func buildMessagesTopic() (common.Hash, error) { } return utils.GetEventTopic(abi, "MessageSent") } + +func messagesContract( + cfg config.ContractsOptions, + client *ethclient.Client, +) (*abis.GroupMessages, error) { + return abis.NewGroupMessages( + common.HexToAddress(cfg.MessagesContractAddress), + client, + ) +} diff --git a/pkg/indexer/storer/groupMessage.go b/pkg/indexer/storer/groupMessage.go index 3858ecde..4f6e1c47 100644 --- a/pkg/indexer/storer/groupMessage.go +++ b/pkg/indexer/storer/groupMessage.go @@ -2,23 +2,55 @@ package storer import ( "context" - "errors" + "fmt" "github.com/ethereum/go-ethereum/core/types" + "github.com/xmtp/xmtpd/pkg/abis" "github.com/xmtp/xmtpd/pkg/db/queries" "go.uber.org/zap" ) type GroupMessageStorer struct { - queries *queries.Queries - logger *zap.Logger + contract *abis.GroupMessages + queries *queries.Queries + logger *zap.Logger } -func NewGroupMessageStorer(queries *queries.Queries, logger *zap.Logger) *GroupMessageStorer { - return &GroupMessageStorer{queries: queries, logger: logger} +func NewGroupMessageStorer( + queries *queries.Queries, + logger *zap.Logger, + contract *abis.GroupMessages, +) *GroupMessageStorer { + return &GroupMessageStorer{queries: queries, logger: logger, contract: contract} } // Validate and store a group message log event func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { - return NewLogStorageError(errors.New("not implemented"), true) + msgSent, err := s.contract.ParseMessageSent(event) + if err != nil { + return NewLogStorageError(err, false) + } + + // TODO:nm figure out topic structure + topic := buildTopic(msgSent.GroupId) + + s.logger.Debug("Inserting message from contract", zap.String("topic", topic)) + + if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ + // We may not want to hardcode this to 0 and have an originator ID for each smart contract? + OriginatorID: 0, + OriginatorSequenceID: int64(msgSent.SequenceId), + Topic: []byte(topic), + OriginatorEnvelope: msgSent.Message, // TODO:nm parse originator envelope and do some validation + }); err != nil { + s.logger.Error("Error inserting envelope from smart contract", zap.Error(err)) + return NewLogStorageError(err, true) + } + + return nil +} + +func buildTopic(groupId [32]byte) string { + // We should think about simplifying the topics, since backwards compatibility shouldn't really matter here + return fmt.Sprintf("/xmtp/1/g-%x/proto", groupId) } diff --git a/pkg/indexer/storer/groupMessage_test.go b/pkg/indexer/storer/groupMessage_test.go new file mode 100644 index 00000000..416c4a9d --- /dev/null +++ b/pkg/indexer/storer/groupMessage_test.go @@ -0,0 +1,123 @@ +package storer + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "github.com/xmtp/xmtpd/pkg/abis" + "github.com/xmtp/xmtpd/pkg/db" + "github.com/xmtp/xmtpd/pkg/db/queries" + "github.com/xmtp/xmtpd/pkg/indexer/blockchain" + testutils "github.com/xmtp/xmtpd/pkg/testing" + "github.com/xmtp/xmtpd/pkg/utils" +) + +func buildGroupMessageStorer(t *testing.T) (*GroupMessageStorer, func()) { + ctx, cancel := context.WithCancel(context.Background()) + db, _, cleanup := testutils.NewDB(t, ctx) + queryImpl := queries.New(db) + config := testutils.GetContractsOptions(t) + contractAddress := config.MessagesContractAddress + + client, err := blockchain.NewClient(ctx, config.RpcUrl) + require.NoError(t, err) + contract, err := abis.NewGroupMessages( + common.HexToAddress(contractAddress), + client, + ) + + require.NoError(t, err) + storer := NewGroupMessageStorer(queryImpl, testutils.NewLog(t), contract) + + return storer, func() { + cancel() + cleanup() + } +} + +func TestStoreGroupMessages(t *testing.T) { + ctx := context.Background() + storer, cleanup := buildGroupMessageStorer(t) + defer cleanup() + + var groupID [32]byte + copy(groupID[:], testutils.RandomBytes(32)) + message := testutils.RandomBytes(30) + sequenceID := uint64(1) + + logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID) + + err := storer.StoreLog( + ctx, + logMessage, + ) + require.NoError(t, err) + + envelopes, queryErr := storer.queries.SelectGatewayEnvelopes( + ctx, + queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)}, + ) + require.NoError(t, queryErr) + + require.Equal(t, len(envelopes), 1) + + firstEnvelope := envelopes[0] + require.Equal(t, firstEnvelope.OriginatorEnvelope, message) +} + +func TestStoreGroupMessageDuplicate(t *testing.T) { + ctx := context.Background() + storer, cleanup := buildGroupMessageStorer(t) + defer cleanup() + + var groupID [32]byte + copy(groupID[:], testutils.RandomBytes(32)) + message := testutils.RandomBytes(30) + sequenceID := uint64(1) + + logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID) + + err := storer.StoreLog( + ctx, + logMessage, + ) + require.NoError(t, err) + // Store the log a second time + err = storer.StoreLog( + ctx, + logMessage, + ) + require.NoError(t, err) + + envelopes, queryErr := storer.queries.SelectGatewayEnvelopes( + ctx, + queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)}, + ) + require.NoError(t, queryErr) + + require.Equal(t, len(envelopes), 1) +} + +func TestStoreGroupMessageMalformed(t *testing.T) { + ctx := context.Background() + storer, cleanup := buildGroupMessageStorer(t) + defer cleanup() + + abi, err := abis.GroupMessagesMetaData.GetAbi() + require.NoError(t, err) + + topic, err := utils.GetEventTopic(abi, "MessageSent") + require.NoError(t, err) + + logMessage := types.Log{ + Topics: []common.Hash{topic}, + Data: []byte("foo"), + } + + storageErr := storer.StoreLog(ctx, logMessage) + require.Error(t, storageErr) + require.False(t, storageErr.ShouldRetry()) +} diff --git a/pkg/testing/config.go b/pkg/testing/config.go new file mode 100644 index 00000000..021fe3f1 --- /dev/null +++ b/pkg/testing/config.go @@ -0,0 +1,73 @@ +package testing + +import ( + "encoding/json" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/xmtp/xmtpd/pkg/config" +) + +const BLOCKCHAIN_RPC_URL = "http://localhost:7545" + +type contractInfo struct { + DeployedTo string `json:"deployedTo"` +} + +/* +* +In tests it's weirdly difficult to get the working directory of the project root. + +Keep moving up the folder hierarchy until you find a go.mod +* +*/ +func rootPath(t *testing.T) string { + dir, err := os.Getwd() + require.NoError(t, err) + + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir + } + parent := filepath.Dir(dir) + if parent == dir { // reached the root directory + t.Fatal("Could not find the root directory") + } + dir = parent + } +} + +/* +* +Parse the JSON file at this location to +* +*/ +func getDeployedTo(t *testing.T, fileName string) string { + data, err := os.ReadFile(fileName) + if err != nil { + t.Fatalf("Failed to read GroupMessages.json: %v", err) + } + + var info contractInfo + + if err := json.Unmarshal(data, &info); err != nil { + t.Fatalf("Failed to parse GroupMessages.json: %v", err) + } + + return info.DeployedTo +} + +func GetContractsOptions(t *testing.T) config.ContractsOptions { + rootDir := rootPath(t) + + return config.ContractsOptions{ + RpcUrl: BLOCKCHAIN_RPC_URL, + MessagesContractAddress: getDeployedTo(t, path.Join(rootDir, "./build/GroupMessages.json")), + NodesContractAddress: getDeployedTo(t, path.Join(rootDir, "./build/Nodes.json")), + RefreshInterval: 100 * time.Millisecond, + } +} diff --git a/pkg/testing/contracts.go b/pkg/testing/contracts.go new file mode 100644 index 00000000..783ea366 --- /dev/null +++ b/pkg/testing/contracts.go @@ -0,0 +1,46 @@ +package testing + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "github.com/xmtp/xmtpd/pkg/abis" + "github.com/xmtp/xmtpd/pkg/utils" +) + +// Build an abi encoded MessageSent event struct +func BuildMessageSentEvent( + groupID [32]byte, + message []byte, + sequenceID uint64, +) ([]byte, error) { + abi, err := abis.GroupMessagesMetaData.GetAbi() + if err != nil { + return nil, err + } + return abi.Events["MessageSent"].Inputs.Pack(groupID, message, sequenceID) +} + +// Build a log message for a MessageSent event +func BuildMessageSentLog( + t *testing.T, + groupID [32]byte, + message []byte, + sequenceID uint64, +) types.Log { + eventData, err := BuildMessageSentEvent(groupID, message, sequenceID) + require.NoError(t, err) + + abi, err := abis.GroupMessagesMetaData.GetAbi() + require.NoError(t, err) + + topic, err := utils.GetEventTopic(abi, "MessageSent") + require.NoError(t, err) + + return types.Log{ + Topics: []common.Hash{topic}, + Data: eventData, + } +}