diff --git a/.sqlx/query-a24f3bde2965abe825d896dd7fd65783fea041032e08b6c7ecd65a4b6599a81c.json b/.sqlx/query-a24f3bde2965abe825d896dd7fd65783fea041032e08b6c7ecd65a4b6599a81c.json new file mode 100644 index 00000000..59aed4f9 --- /dev/null +++ b/.sqlx/query-a24f3bde2965abe825d896dd7fd65783fea041032e08b6c7ecd65a4b6599a81c.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO scalar_tap_rav_requests_failed (\n allocation_id,\n sender_address,\n expected_rav,\n rav_response,\n reason\n )\n VALUES ($1, $2, $3, $4, $5)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Json", + "Json", + "Text" + ] + }, + "nullable": [] + }, + "hash": "a24f3bde2965abe825d896dd7fd65783fea041032e08b6c7ecd65a4b6599a81c" +} diff --git a/.sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json b/.sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json new file mode 100644 index 00000000..83116833 --- /dev/null +++ b/.sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO scalar_tap_receipts_invalid (\n allocation_id,\n sender_address,\n timestamp_ns,\n value,\n received_receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Numeric", + "Numeric", + "Json" + ] + }, + "nullable": [] + }, + "hash": "cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7" +} diff --git a/migrations/20230912220523_tap_receipts.down.sql b/migrations/20230912220523_tap_receipts.down.sql index 8fce5043..1a8d515e 100644 --- a/migrations/20230912220523_tap_receipts.down.sql +++ b/migrations/20230912220523_tap_receipts.down.sql @@ -3,3 +3,4 @@ DROP TRIGGER IF EXISTS receipt_update ON scalar_tap_receipts CASCADE; DROP FUNCTION IF EXISTS scalar_tap_receipt_notify() CASCADE; DROP TABLE IF EXISTS scalar_tap_receipts CASCADE; +DROP TABLE IF EXISTS scalar_tap_receipts_invalid CASCADE; diff --git a/migrations/20230912220523_tap_receipts.up.sql b/migrations/20230912220523_tap_receipts.up.sql index 14d0f6ed..a95a4a09 100644 --- a/migrations/20230912220523_tap_receipts.up.sql +++ b/migrations/20230912220523_tap_receipts.up.sql @@ -23,3 +23,14 @@ CREATE TRIGGER receipt_update AFTER INSERT OR UPDATE CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id); CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns); + +-- This table is used to store invalid receipts (receipts that fail at least one of the checks in the tap-agent). +-- Used for logging and debugging purposes. +CREATE TABLE IF NOT EXISTS scalar_tap_receipts_invalid ( + id BIGSERIAL PRIMARY KEY, + allocation_id CHAR(40) NOT NULL, + sender_address CHAR(40) NOT NULL, + timestamp_ns NUMERIC(20) NOT NULL, + value NUMERIC(39) NOT NULL, + received_receipt JSON NOT NULL +); diff --git a/migrations/20230915230734_tap_ravs.down.sql b/migrations/20230915230734_tap_ravs.down.sql index 31dca119..0ad115b3 100644 --- a/migrations/20230915230734_tap_ravs.down.sql +++ b/migrations/20230915230734_tap_ravs.down.sql @@ -1 +1,2 @@ DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; +DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE; diff --git a/migrations/20230915230734_tap_ravs.up.sql b/migrations/20230915230734_tap_ravs.up.sql index 61ee9e86..5b66e30b 100644 --- a/migrations/20230915230734_tap_ravs.up.sql +++ b/migrations/20230915230734_tap_ravs.up.sql @@ -5,3 +5,14 @@ CREATE TABLE IF NOT EXISTS scalar_tap_ravs ( final BOOLEAN DEFAULT FALSE NOT NULL, PRIMARY KEY (allocation_id, sender_address) ); + +-- This table is used to store failed RAV requests. +-- Used for logging and debugging purposes. +CREATE TABLE IF NOT EXISTS scalar_tap_rav_requests_failed ( + id BIGSERIAL PRIMARY KEY, + allocation_id CHAR(40) NOT NULL, + sender_address CHAR(40) NOT NULL, + expected_rav JSON NOT NULL, + rav_response JSON NOT NULL, + reason TEXT NOT NULL +); diff --git a/tap-agent/src/tap/sender_allocation_relationship.rs b/tap-agent/src/tap/sender_allocation_relationship.rs index 22a929cb..a253167c 100644 --- a/tap-agent/src/tap/sender_allocation_relationship.rs +++ b/tap-agent/src/tap/sender_allocation_relationship.rs @@ -1,11 +1,11 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; -use anyhow::ensure; +use anyhow::{anyhow, ensure}; use ethereum_types::U256; use eventuals::Eventual; use indexer_common::prelude::SubgraphClient; @@ -14,8 +14,9 @@ use sqlx::{types::BigDecimal, PgPool}; use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; use tap_core::{ eip_712_signed_message::EIP712SignedMessage, - receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_manager::RAVRequest, - tap_receipt::ReceiptCheck, + receipt_aggregate_voucher::ReceiptAggregateVoucher, + tap_manager::RAVRequest, + tap_receipt::{ReceiptCheck, ReceivedReceipt}, }; use tokio::{ sync::{Mutex, MutexGuard}, @@ -286,7 +287,7 @@ impl SenderAllocationRelationship { let RAVRequest { valid_receipts, previous_rav, - invalid_receipts: _, + invalid_receipts, expected_rav, } = inner .tap_manager @@ -297,6 +298,20 @@ impl SenderAllocationRelationship { ) .await?; + // Invalid receipts + if !invalid_receipts.is_empty() { + warn!( + "Found {} invalid receipts for allocation {} and sender {}.", + invalid_receipts.len(), + inner.allocation_id, + inner.sender + ); + + // Save invalid receipts to the database for logs. + // TODO: consider doing that in a spawned task? + Self::store_invalid_receipts(inner, &invalid_receipts).await?; + } + // TODO: Request compression and response decompression. Also a fancy user agent? let client = HttpClientBuilder::default() .request_timeout(Duration::from_secs( @@ -319,12 +334,39 @@ impl SenderAllocationRelationship { warn!("Warnings from sender's TAP aggregator: {:?}", warnings); } - inner + match inner .tap_manager - .verify_and_store_rav(expected_rav, response.data) - .await?; - - // TODO: Handle invalid receipts + .verify_and_store_rav(expected_rav.clone(), response.data.clone()) + .await + { + Ok(_) => {} + + // Adapter errors are local software errors. Shouldn't be a problem with the sender. + Err(tap_core::Error::AdapterError { source_error: e }) => { + anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e) + } + + // The 3 errors below signal an invalid RAV, which should be about problems with the + // sender. The sender could be malicious. + Err( + e @ tap_core::Error::InvalidReceivedRAV { + expected_rav: _, + received_rav: _, + } + | e @ tap_core::Error::SignatureError(_) + | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, + ) => { + Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string()) + .await?; + anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e); + } + + // All relevant errors should be handled above. If we get here, we forgot to handle + // an error case. + Err(e) => { + anyhow::bail!("Error while verifying and storing RAV: {:?}", e); + } + } // This is not the fastest way to do this, but it's the easiest. // Note: we rely on the unaggregated_fees lock to make sure we don't miss any receipt @@ -395,6 +437,74 @@ impl SenderAllocationRelationship { pub async fn state(&self) -> State { *self.inner.state.lock().await } + + async fn store_invalid_receipts( + inner: &Inner, + receipts: &[ReceivedReceipt], + ) -> anyhow::Result<()> { + for received_receipt in receipts.iter() { + sqlx::query!( + r#" + INSERT INTO scalar_tap_receipts_invalid ( + allocation_id, + sender_address, + timestamp_ns, + value, + received_receipt + ) + VALUES ($1, $2, $3, $4, $5) + "#, + inner + .allocation_id + .to_string() + .trim_start_matches("0x") + .to_owned(), + inner.sender.to_string().trim_start_matches("0x").to_owned(), + BigDecimal::from(received_receipt.signed_receipt().message.timestamp_ns), + BigDecimal::from_str(&received_receipt.signed_receipt().message.value.to_string())?, + serde_json::to_value(received_receipt)? + ) + .execute(&inner.pgpool) + .await + .map_err(|e| anyhow!("Failed to store failed receipt: {:?}", e))?; + } + + Ok(()) + } + + async fn store_failed_rav( + inner: &Inner, + expected_rav: &ReceiptAggregateVoucher, + rav: &EIP712SignedMessage, + reason: &str, + ) -> anyhow::Result<()> { + sqlx::query!( + r#" + INSERT INTO scalar_tap_rav_requests_failed ( + allocation_id, + sender_address, + expected_rav, + rav_response, + reason + ) + VALUES ($1, $2, $3, $4, $5) + "#, + inner + .allocation_id + .to_string() + .trim_start_matches("0x") + .to_owned(), + inner.sender.to_string().trim_start_matches("0x").to_owned(), + serde_json::to_value(expected_rav)?, + serde_json::to_value(rav)?, + reason + ) + .execute(&inner.pgpool) + .await + .map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?; + + Ok(()) + } } impl Drop for SenderAllocationRelationship {