diff --git a/consumer/src/backend.rs b/consumer/src/backend.rs
index 621b60f..44ecaf6 100644
--- a/consumer/src/backend.rs
+++ b/consumer/src/backend.rs
@@ -2,7 +2,7 @@ use holaplex_hub_nfts_solana_entity::{collection_mints, collections};
use holaplex_hub_nfts_solana_core::proto::{
MetaplexMasterEditionTransaction, MintMetaplexEditionTransaction,
- TransferMetaplexAssetTransaction,
+ TransferMetaplexAssetTransaction, SolanaPendingTransaction,
};
use hub_core::prelude::*;
use solana_program::pubkey::Pubkey;
@@ -54,6 +54,21 @@ pub struct TransactionResponse {
pub addresses: A,
}
+impl From> for SolanaPendingTransaction {
+ fn from(
+ TransactionResponse {
+ serialized_message,
+ signatures_or_signers_public_keys,
+ ..
+ }: TransactionResponse,
+ ) -> Self {
+ Self {
+ serialized_message,
+ signatures_or_signers_public_keys,
+ }
+ }
+}
+
// TODO: include this in collections::Model
pub enum CollectionType {
Legacy,
diff --git a/consumer/src/events.rs b/consumer/src/events.rs
index 34ccdbd..399f9b5 100644
--- a/consumer/src/events.rs
+++ b/consumer/src/events.rs
@@ -1,56 +1,207 @@
use holaplex_hub_nfts_solana_core::{
- db::Connection,
+ db,
proto::{
- nft_events::Event::{
- SolanaCreateDrop, SolanaMintDrop, SolanaRetryDrop, SolanaRetryMintDrop,
- SolanaTransferAsset, SolanaUpdateDrop,
- },
- solana_nft_events::Event::{
- CreateDropFailed, CreateDropSigningRequested, CreateDropSubmitted, MintDropFailed,
- MintDropSigningRequested, MintDropSubmitted, RetryCreateDropFailed,
- RetryCreateDropSigningRequested, RetryCreateDropSubmitted, RetryMintDropFailed,
- RetryMintDropSigningRequested, RetryMintDropSubmitted, TransferAssetFailed,
- TransferAssetSigningRequested, TransferAssetSubmitted, UpdateDropFailed,
- UpdateDropSigningRequested, UpdateDropSubmitted,
- },
- treasury_events::{Event as TreasuryEvent, TransactionStatus},
+ nft_events::Event as NftEvent,
+ solana_nft_events::Event as SolanaNftEvent,
+ treasury_events::{Event as TreasuryEvent, SolanaTransactionResult, TransactionStatus},
MetaplexMasterEditionTransaction, MintMetaplexEditionTransaction,
SolanaCompletedMintTransaction, SolanaCompletedTransferTransaction,
SolanaCompletedUpdateTransaction, SolanaFailedTransaction, SolanaNftEventKey,
SolanaNftEvents, SolanaPendingTransaction, SolanaTransactionFailureReason,
TransferMetaplexAssetTransaction,
},
- sea_orm::Set,
+ sea_orm::{DbErr, Set},
Collection, CollectionMint, Services,
};
use holaplex_hub_nfts_solana_entity::{collection_mints, collections};
-use hub_core::{chrono::Utc, prelude::*, producer::Producer, thiserror::Error, uuid::Uuid};
+use hub_core::{
+ chrono::Utc,
+ prelude::*,
+ producer::{Producer, SendError},
+ thiserror, uuid,
+ uuid::Uuid,
+};
use crate::{
- backend::{self, MasterEditionAddresses, TransactionResponse},
+ backend::{self, MasterEditionAddresses},
solana::{Solana, UncompressedRef},
};
-#[derive(Error, Debug)]
-pub enum ProcessorError {
- #[error("record not found")]
+#[derive(Debug, thiserror::Error)]
+pub enum ProcessorErrorKind {
+ #[error("Associated record not found in database")]
RecordNotFound,
- #[error("message not found")]
- MessageNotFound,
- #[error("transaction status not found")]
+ #[error("Transaction status not found in treasury event payload")]
TransactionStatusNotFound,
+
+ #[error("Error processing Solana operation")]
+ Solana(#[source] Error),
+ #[error("Error sending message")]
+ SendError(#[from] SendError),
+ #[error("Invalid UUID")]
+ InvalidUuid(#[from] uuid::Error),
+ #[error("Database error")]
+ DbError(#[from] DbErr),
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error("Error handling {} of {}", src.name(), evt.name())]
+pub struct ProcessorError {
+ #[source]
+ kind: ProcessorErrorKind,
+ evt: EventKind,
+ src: ErrorSource,
+}
+
+impl ProcessorError {
+ fn new(kind: ProcessorErrorKind, evt: EventKind, src: ErrorSource) -> Self {
+ Self { kind, evt, src }
+ }
+}
+
+type ProcessResult = std::result::Result;
+type Result = std::result::Result;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+enum ErrorSource {
+ NftFailure,
+ NftSignRequest,
+ TreasuryStatus,
+ TreasurySuccess,
+ TreasuryFailure,
+}
+
+impl ErrorSource {
+ fn name(self) -> &'static str {
+ match self {
+ Self::NftFailure => "NFT failure response",
+ Self::NftSignRequest => "NFT transaction signature request",
+ Self::TreasuryStatus => "treasury status check",
+ Self::TreasurySuccess => "treasury success response",
+ Self::TreasuryFailure => "treasury success failure",
+ }
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum EventKind {
+ CreateDrop,
+ MintDrop,
+ UpdateDrop,
+ TransferAsset,
+ RetryCreateDrop,
+ RetryMintDrop,
+}
+
+impl EventKind {
+ fn name(self) -> &'static str {
+ match self {
+ Self::CreateDrop => "drop creation",
+ Self::MintDrop => "drop mint",
+ Self::UpdateDrop => "drop update",
+ Self::TransferAsset => "drop asset transfer",
+ Self::RetryCreateDrop => "drop creation retry",
+ Self::RetryMintDrop => "drop mint retry",
+ }
+ }
+
+ fn into_sign_request(self, tx: SolanaPendingTransaction) -> SolanaNftEvent {
+ match self {
+ EventKind::CreateDrop => SolanaNftEvent::CreateDropSigningRequested(tx),
+ EventKind::MintDrop => SolanaNftEvent::MintDropSigningRequested(tx),
+ EventKind::UpdateDrop => SolanaNftEvent::UpdateDropSigningRequested(tx),
+ EventKind::TransferAsset => SolanaNftEvent::TransferAssetSigningRequested(tx),
+ EventKind::RetryCreateDrop => SolanaNftEvent::RetryCreateDropSigningRequested(tx),
+ EventKind::RetryMintDrop => SolanaNftEvent::RetryMintDropSigningRequested(tx),
+ }
+ }
+
+ async fn into_success(
+ self,
+ db: &db::Connection,
+ key: &SolanaNftEventKey,
+ signature: String,
+ ) -> ProcessResult {
+ let id = || Uuid::parse_str(&key.id);
+
+ Ok(match self {
+ Self::CreateDrop => {
+ let id = id()?;
+ let collection = Collection::find_by_id(db, id)
+ .await?
+ .ok_or(ProcessorErrorKind::RecordNotFound)?;
+
+ SolanaNftEvent::CreateDropSubmitted(SolanaCompletedMintTransaction {
+ signature,
+ address: collection.mint,
+ })
+ },
+ Self::MintDrop => {
+ let id = id()?;
+ let collection_mint = CollectionMint::find_by_id(db, id)
+ .await?
+ .ok_or(ProcessorErrorKind::RecordNotFound)?;
+
+ SolanaNftEvent::MintDropSubmitted(SolanaCompletedMintTransaction {
+ signature,
+ address: collection_mint.mint,
+ })
+ },
+ Self::UpdateDrop => {
+ SolanaNftEvent::UpdateDropSubmitted(SolanaCompletedUpdateTransaction { signature })
+ },
+ Self::TransferAsset => {
+ SolanaNftEvent::TransferAssetSubmitted(SolanaCompletedTransferTransaction {
+ signature,
+ })
+ },
+ Self::RetryCreateDrop => {
+ let id = id()?;
+ let collection = Collection::find_by_id(db, id)
+ .await?
+ .ok_or(ProcessorErrorKind::RecordNotFound)?;
+
+ SolanaNftEvent::RetryCreateDropSubmitted(SolanaCompletedMintTransaction {
+ signature,
+ address: collection.mint,
+ })
+ },
+ Self::RetryMintDrop => {
+ let id = id()?;
+ let collection_mint = CollectionMint::find_by_id(db, id)
+ .await?
+ .ok_or(ProcessorErrorKind::RecordNotFound)?;
+
+ SolanaNftEvent::RetryMintDropSubmitted(SolanaCompletedMintTransaction {
+ signature,
+ address: collection_mint.mint,
+ })
+ },
+ })
+ }
+
+ fn into_failure(self, tx: SolanaFailedTransaction) -> SolanaNftEvent {
+ match self {
+ Self::CreateDrop => SolanaNftEvent::CreateDropFailed(tx),
+ Self::MintDrop => SolanaNftEvent::MintDropFailed(tx),
+ Self::UpdateDrop => SolanaNftEvent::UpdateDropFailed(tx),
+ Self::TransferAsset => SolanaNftEvent::TransferAssetFailed(tx),
+ Self::RetryCreateDrop => SolanaNftEvent::RetryCreateDropFailed(tx),
+ Self::RetryMintDrop => SolanaNftEvent::RetryMintDropFailed(tx),
+ }
+ }
}
-#[derive(Clone)]
pub struct Processor {
solana: Solana,
- db: Connection,
+ db: db::Connection,
producer: Producer,
}
impl Processor {
+ #[inline]
#[must_use]
- pub fn new(solana: Solana, db: Connection, producer: Producer) -> Self {
+ pub fn new(solana: Solana, db: db::Connection, producer: Producer) -> Self {
Self {
solana,
db,
@@ -58,274 +209,87 @@ impl Processor {
}
}
- /// Process the given message for various services.
- ///
- /// # Errors
- /// This function can return an error if it fails to process any event
pub async fn process(&self, msg: Services) -> Result<()> {
- // match topics
match msg {
- Services::Nfts(key, e) => {
+ Services::Nfts(key, msg) => {
let key = SolanaNftEventKey::from(key);
- // TODO: swap UncompressedRef for CompressedRef or LegacyCollectionRef depending on
- // message context
-
- match e.event {
- Some(SolanaCreateDrop(payload)) => {
- let create_drop_result = self
- .create_drop(&UncompressedRef(&self.solana), key.clone(), payload)
- .await;
-
- if create_drop_result.is_err() {
- self.create_drop_failed(key, SolanaTransactionFailureReason::Assemble)
- .await?;
- }
-
- Ok(())
+ match msg.event {
+ Some(NftEvent::SolanaCreateDrop(payload)) => {
+ self.process_nft(
+ EventKind::CreateDrop,
+ &key,
+ self.create_drop(&UncompressedRef(&self.solana), &key, payload),
+ )
+ .await
},
- Some(SolanaMintDrop(payload)) => {
- let mint_drop_result = self
- .mint_drop(&UncompressedRef(&self.solana), key.clone(), payload)
- .await;
-
- if mint_drop_result.is_err() {
- self.mint_drop_failed(key, SolanaTransactionFailureReason::Assemble)
- .await?;
- }
-
- Ok(())
+ Some(NftEvent::SolanaMintDrop(payload)) => {
+ self.process_nft(
+ EventKind::MintDrop,
+ &key,
+ self.mint_drop(&UncompressedRef(&self.solana), &key, payload),
+ )
+ .await
},
- Some(SolanaUpdateDrop(payload)) => {
- let update_drop_result = self
- .update_drop(&UncompressedRef(&self.solana), key.clone(), payload)
- .await;
-
- if update_drop_result.is_err() {
- self.update_drop_failed(key, SolanaTransactionFailureReason::Assemble)
- .await?;
- }
-
- Ok(())
+ Some(NftEvent::SolanaUpdateDrop(payload)) => {
+ self.process_nft(
+ EventKind::UpdateDrop,
+ &key,
+ self.update_drop(&UncompressedRef(&self.solana), &key, payload),
+ )
+ .await
},
- Some(SolanaTransferAsset(payload)) => {
- let transfer_asset_result = self
- .transfer_asset(&UncompressedRef(&self.solana), key.clone(), payload)
- .await;
-
- if transfer_asset_result.is_err() {
- self.transfer_asset_failed(
- key,
- SolanaTransactionFailureReason::Assemble,
- )
- .await?;
- }
-
- Ok(())
+ Some(NftEvent::SolanaTransferAsset(payload)) => {
+ self.process_nft(
+ EventKind::TransferAsset,
+ &key,
+ self.transfer_asset(&UncompressedRef(&self.solana), &key, payload),
+ )
+ .await
},
- Some(SolanaRetryDrop(payload)) => {
- let retry_drop_result = self
- .retry_drop(&UncompressedRef(&self.solana), key.clone(), payload)
- .await;
-
- if retry_drop_result.is_err() {
- self.retry_create_drop_failed(
- key,
- SolanaTransactionFailureReason::Assemble,
- )
- .await?;
- }
-
- Ok(())
+ Some(NftEvent::SolanaRetryDrop(payload)) => {
+ self.process_nft(
+ EventKind::RetryCreateDrop,
+ &key,
+ self.retry_create_drop(&UncompressedRef(&self.solana), &key, payload),
+ )
+ .await
},
- Some(SolanaRetryMintDrop(payload)) => {
- let retry_mint_drop_result = self
- .retry_mint_drop(&UncompressedRef(&self.solana), key.clone(), payload)
- .await;
-
- if retry_mint_drop_result.is_err() {
- self.retry_mint_drop_failed(
- key,
- SolanaTransactionFailureReason::Assemble,
- )
- .await?;
- }
-
- Ok(())
+ Some(NftEvent::SolanaRetryMintDrop(payload)) => {
+ self.process_nft(
+ EventKind::RetryMintDrop,
+ &key,
+ self.retry_mint_drop(&UncompressedRef(&self.solana), &key, payload),
+ )
+ .await
},
- Some(_) | None => Ok(()),
+ _ => Ok(()),
}
},
- Services::Treasury(key, e) => {
+ Services::Treasury(key, msg) => {
let key = SolanaNftEventKey::from(key);
- match e.event {
- Some(TreasuryEvent::SolanaCreateDropSigned(payload)) => {
- let status = TransactionStatus::from_i32(payload.status)
- .ok_or(ProcessorError::TransactionStatusNotFound)?;
-
- if status == TransactionStatus::Failed {
- self.create_drop_failed(key, SolanaTransactionFailureReason::Sign)
- .await?;
-
- return Ok(());
- }
-
- let signature_result = self.solana.submit_transaction(&payload);
-
- match signature_result {
- Ok(signature) => {
- self.create_drop_submitted(key, signature).await?;
- },
- Err(_) => {
- self.create_drop_failed(
- key,
- SolanaTransactionFailureReason::Submit,
- )
- .await?;
- },
- }
-
- Ok(())
+ match msg.event {
+ Some(TreasuryEvent::SolanaCreateDropSigned(res)) => {
+ self.process_treasury(EventKind::CreateDrop, key, res).await
},
- Some(TreasuryEvent::SolanaUpdateDropSigned(payload)) => {
- let status = TransactionStatus::from_i32(payload.status)
- .ok_or(ProcessorError::TransactionStatusNotFound)?;
-
- if status == TransactionStatus::Failed {
- self.update_drop_failed(key, SolanaTransactionFailureReason::Sign)
- .await?;
-
- return Ok(());
- }
-
- let signature_result = self.solana.submit_transaction(&payload);
-
- match signature_result {
- Ok(signature) => {
- self.update_drop_submitted(key, signature).await?;
- },
- Err(_) => {
- self.update_drop_failed(
- key,
- SolanaTransactionFailureReason::Submit,
- )
- .await?;
- },
- }
-
- Ok(())
+ Some(TreasuryEvent::SolanaMintDropSigned(res)) => {
+ self.process_treasury(EventKind::MintDrop, key, res).await
},
- Some(TreasuryEvent::SolanaMintDropSigned(payload)) => {
- let status = TransactionStatus::from_i32(payload.status)
- .ok_or(ProcessorError::TransactionStatusNotFound)?;
-
- if status == TransactionStatus::Failed {
- self.mint_drop_failed(key, SolanaTransactionFailureReason::Sign)
- .await?;
-
- return Ok(());
- }
-
- let signature_result = self.solana.submit_transaction(&payload);
-
- match signature_result {
- Ok(signature) => {
- self.mint_drop_submitted(key, signature).await?;
- },
- Err(_) => {
- self.mint_drop_failed(key, SolanaTransactionFailureReason::Submit)
- .await?;
- },
- }
-
- Ok(())
+ Some(TreasuryEvent::SolanaUpdateDropSigned(res)) => {
+ self.process_treasury(EventKind::UpdateDrop, key, res).await
},
- Some(TreasuryEvent::SolanaTransferAssetSigned(payload)) => {
- let status = TransactionStatus::from_i32(payload.status)
- .ok_or(ProcessorError::TransactionStatusNotFound)?;
-
- if status == TransactionStatus::Failed {
- self.transfer_asset_failed(key, SolanaTransactionFailureReason::Sign)
- .await?;
-
- return Ok(());
- }
-
- let signature_result = self.solana.submit_transaction(&payload);
-
- match signature_result {
- Ok(signature) => {
- self.transfer_asset_submitted(key, signature).await?;
- },
- Err(_) => {
- self.transfer_asset_failed(
- key,
- SolanaTransactionFailureReason::Submit,
- )
- .await?;
- },
- }
-
- Ok(())
+ Some(TreasuryEvent::SolanaTransferAssetSigned(res)) => {
+ self.process_treasury(EventKind::TransferAsset, key, res)
+ .await
},
- Some(TreasuryEvent::SolanaRetryCreateDropSigned(payload)) => {
- let status = TransactionStatus::from_i32(payload.status)
- .ok_or(ProcessorError::TransactionStatusNotFound)?;
-
- if status == TransactionStatus::Failed {
- self.retry_create_drop_failed(
- key,
- SolanaTransactionFailureReason::Sign,
- )
- .await?;
-
- return Ok(());
- }
-
- let signature_result = self.solana.submit_transaction(&payload);
-
- match signature_result {
- Ok(signature) => {
- self.retry_create_drop_submitted(key, signature).await?;
- },
- Err(_) => {
- self.retry_create_drop_failed(
- key,
- SolanaTransactionFailureReason::Submit,
- )
- .await?;
- },
- }
-
- Ok(())
+ Some(TreasuryEvent::SolanaRetryCreateDropSigned(res)) => {
+ self.process_treasury(EventKind::RetryCreateDrop, key, res)
+ .await
},
- Some(TreasuryEvent::SolanaRetryMintDropSigned(payload)) => {
- let status = TransactionStatus::from_i32(payload.status)
- .ok_or(ProcessorError::TransactionStatusNotFound)?;
-
- if status == TransactionStatus::Failed {
- self.retry_mint_drop_failed(key, SolanaTransactionFailureReason::Sign)
- .await?;
-
- return Ok(());
- }
- let signature_result = self.solana.submit_transaction(&payload);
-
- match signature_result {
- Ok(signature) => {
- self.retry_mint_drop_submitted(key, signature).await?;
- },
- Err(_) => {
- self.retry_mint_drop_failed(
- key,
- SolanaTransactionFailureReason::Submit,
- )
- .await?;
- },
- }
-
- Ok(())
+ Some(TreasuryEvent::SolanaRetryMintDropSigned(res)) => {
+ self.process_treasury(EventKind::RetryMintDrop, key, res)
+ .await
},
_ => Ok(()),
}
@@ -333,75 +297,118 @@ impl Processor {
}
}
- async fn create_drop(
+ async fn process_nft(
&self,
- backend: &B,
- key: SolanaNftEventKey,
- payload: MetaplexMasterEditionTransaction,
+ kind: EventKind,
+ key: &SolanaNftEventKey,
+ fut: impl Future