From 0305f1de2efcb8457993eae948e65ad3558e5ad9 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Mon, 16 Oct 2023 15:53:42 +0500 Subject: [PATCH 1/4] Support batch mint of random queued nfts --- consumer/src/backend.rs | 13 ++- consumer/src/events.rs | 170 ++++++++++++++++++++++++++++++++++++++-- consumer/src/solana.rs | 19 +++-- core/proto.lock | 10 +-- core/proto.toml | 8 +- 5 files changed, 195 insertions(+), 25 deletions(-) diff --git a/consumer/src/backend.rs b/consumer/src/backend.rs index d24ff60..8a218f7 100644 --- a/consumer/src/backend.rs +++ b/consumer/src/backend.rs @@ -4,7 +4,7 @@ use holaplex_hub_nfts_solana_core::proto::{ }; use holaplex_hub_nfts_solana_entity::{collection_mints, collections, update_revisions}; use hub_core::prelude::*; -use solana_program::pubkey::Pubkey; +use solana_program::{hash::Hash, pubkey::Pubkey}; #[derive(Clone)] pub struct MasterEditionAddresses { pub metadata: Pubkey, @@ -77,6 +77,8 @@ pub struct TransferAssetAddresses { /// Represents a response from a transaction on the blockchain. This struct /// provides the serialized message and the signatures of the signed message. + +#[derive(Clone)] pub struct TransactionResponse { /// The serialized version of the message from the transaction. pub serialized_message: Vec, @@ -138,8 +140,12 @@ pub trait CollectionBackend { #[async_trait] pub trait MintBackend { - async fn mint(&self, collection: &collections::Model, txn: T) - -> Result>; + async fn mint( + &self, + collection: &collections::Model, + blockhash: Option, + txn: T, + ) -> Result>; } #[async_trait] @@ -147,6 +153,7 @@ pub trait TransferBackend { async fn transfer( &self, collection_mint: &M, + txn: TransferMetaplexAssetTransaction, ) -> Result>; } diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 714d73a..d9cb0fe 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -9,7 +9,8 @@ use holaplex_hub_nfts_solana_core::{ MetaplexMasterEditionTransaction, MintMetaplexEditionTransaction, MintMetaplexMetadataTransaction, SolanaCompletedMintTransaction, SolanaCompletedTransferTransaction, SolanaCompletedUpdateTransaction, - SolanaFailedTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, + SolanaFailedTransaction, SolanaMintBatchPayload, SolanaMintOpenDropBatchedPayload, + SolanaMintTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, SolanaTransactionFailureReason, SwitchCollectionPayload, TransferMetaplexAssetTransaction, UpdateSolanaMintPayload, }, @@ -20,6 +21,8 @@ use holaplex_hub_nfts_solana_entity::{ collection_mints, collections, compression_leafs, update_revisions, }; use hub_core::{ + anyhow, + backon::{ExponentialBuilder, Retryable}, chrono::Utc, metrics::KeyValue, prelude::*, @@ -29,6 +32,7 @@ use hub_core::{ uuid, uuid::Uuid, }; +use solana_client::client_error::ClientError; use solana_program::pubkey::{ParsePubkeyError, Pubkey}; use solana_sdk::signature::Signature; @@ -39,6 +43,7 @@ use crate::{ }, metrics::Metrics, solana::{CompressedRef, EditionRef, Solana, SolanaAssetIdError, UncompressedRef}, + with_retry, }; #[derive(Debug, thiserror::Error, Triage)] @@ -126,6 +131,7 @@ pub enum EventKind { UpdateOpenDrop, RetryCreateOpenDrop, RetryMintOpenDrop, + MintOpenDropBatched, } impl EventKind { @@ -150,6 +156,7 @@ impl EventKind { Self::UpdateOpenDrop => "open drop update", Self::RetryCreateOpenDrop => "open drop creation retry", Self::RetryMintOpenDrop => "open drop mint retry", + Self::MintOpenDropBatched => "open drop mint batch", } } @@ -190,6 +197,7 @@ impl EventKind { SolanaNftEvent::RetryCreateOpenDropSigningRequested(tx) }, EventKind::RetryMintOpenDrop => SolanaNftEvent::RetryMintOpenDropSigningRequested(tx), + EventKind::MintOpenDropBatched => unreachable!(), } } @@ -390,6 +398,7 @@ impl EventKind { address: collection_mint.mint, }) }, + Self::MintOpenDropBatched => unreachable!(), }) } @@ -414,6 +423,7 @@ impl EventKind { Self::UpdateOpenDrop => SolanaNftEvent::UpdateOpenDropFailed(tx), Self::RetryCreateOpenDrop => SolanaNftEvent::RetryCreateOpenDropFailed(tx), Self::RetryMintOpenDrop => SolanaNftEvent::RetryMintOpenDropFailed(tx), + Self::MintOpenDropBatched => unreachable!(), } } } @@ -625,6 +635,16 @@ impl Processor { ) .await }, + + Some(NftEvent::SolanaMintOpenDropBatched(payload)) => { + self.process_mint_batch(&key, payload).await.map_err(|e| { + ProcessorError::new( + ProcessorErrorKind::Solana(e), + EventKind::MintOpenDropBatched, + ErrorSource::NftFailure, + ) + }) + }, _ => Ok(()), } }, @@ -714,6 +734,142 @@ impl Processor { } } + async fn process_mint_batch( + &self, + key: &SolanaNftEventKey, + payload: SolanaMintOpenDropBatchedPayload, + ) -> anyhow::Result<()> { + let conn = self.db.get(); + let producer = self.producer.clone(); + + let collection_id = Uuid::parse_str(&payload.collection_id)?; + let collection = Collection::find_by_id(conn, collection_id) + .await? + .ok_or(ProcessorErrorKind::RecordNotFound) + .unwrap(); + + let signers_pubkeys = vec![ + self.solana().treasury_wallet().to_string(), + collection.owner.clone(), + ]; + let blockhash = with_retry!(self.solana().rpc().get_latest_blockhash()) + .await + .context("blockhash not found") + .map_err(ProcessorErrorKind::Solana) + .unwrap(); + + let send_event = |mint_transactions: Vec, + signers_pubkeys: Vec| async { + producer + .send( + Some(&SolanaNftEvents { + event: Some(SolanaNftEvent::MintOpenDropBatchSigningRequested( + SolanaMintBatchPayload { + signers_pubkeys, + mint_transactions, + }, + )), + }), + Some(key), + ) + .await + }; + + if payload.compressed { + let backend = &CompressedRef(self.solana()); + let mut leafs: Vec = Vec::new(); + let mut mint_transactions = Vec::new(); + + for mint_tx in payload.mint_open_drop_transactions.clone() { + let id = Uuid::from_str(&mint_tx.mint_id)?; + + let tx = backend + .mint( + &collection, + Some(blockhash), + MintMetaplexMetadataTransaction { + recipient_address: mint_tx.recipient_address, + metadata: mint_tx.metadata, + collection_id: payload.collection_id.clone(), + compressed: payload.compressed, + }, + ) + .await?; + + mint_transactions.push(SolanaMintTransaction { + serialized_message: tx.serialized_message, + mint_id: mint_tx.mint_id, + signer_signature: None, + }); + + let compression_leaf = compression_leafs::Model { + id, + collection_id: collection.id, + merkle_tree: tx.addresses.merkle_tree.to_string(), + tree_authority: tx.addresses.tree_authority.to_string(), + tree_delegate: tx.addresses.tree_delegate.to_string(), + leaf_owner: tx.addresses.leaf_owner.to_string(), + created_at: Utc::now().naive_utc(), + ..Default::default() + }; + + leafs.push(compression_leaf.into()); + } + + compression_leafs::Entity::insert_many(leafs) + .exec(conn) + .await?; + + send_event(mint_transactions, signers_pubkeys).await?; + + return Ok(()); + } + + let backend = &UncompressedRef(self.solana()); + let mut mints: Vec = Vec::new(); + let mut mint_transactions = Vec::new(); + + for mint_tx in payload.mint_open_drop_transactions.clone() { + let id = Uuid::from_str(&mint_tx.mint_id)?; + let tx = backend + .mint( + &collection, + Some(blockhash), + MintMetaplexMetadataTransaction { + recipient_address: mint_tx.recipient_address, + metadata: mint_tx.metadata, + collection_id: payload.collection_id.clone(), + compressed: payload.compressed, + }, + ) + .await?; + + mint_transactions.push(SolanaMintTransaction { + serialized_message: tx.serialized_message, + mint_id: mint_tx.mint_id, + signer_signature: tx.signatures_or_signers_public_keys.get(1).cloned(), + }); + + let collection_mint = collection_mints::Model { + id, + collection_id: collection.id, + owner: tx.addresses.recipient.to_string(), + mint: tx.addresses.mint.to_string(), + created_at: Utc::now().naive_utc(), + associated_token_account: tx.addresses.associated_token_account.to_string(), + }; + + mints.push(collection_mint.into()); + } + + collection_mints::Entity::insert_many(mints) + .exec(conn) + .await?; + + send_event(mint_transactions, signers_pubkeys).await?; + Ok(()) + } + async fn process_nft( &self, kind: EventKind, @@ -880,7 +1036,7 @@ impl Processor { let backend = &CompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -911,7 +1067,7 @@ impl Processor { let backend = &UncompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -951,7 +1107,7 @@ impl Processor { .ok_or(ProcessorErrorKind::RecordNotFound)?; let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -1165,7 +1321,7 @@ impl Processor { let collection = collection.ok_or(ProcessorErrorKind::RecordNotFound)?; let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -1205,7 +1361,7 @@ impl Processor { let backend = &CompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; @@ -1228,7 +1384,7 @@ impl Processor { let backend = &UncompressedRef(self.solana()); let tx = backend - .mint(&collection, payload) + .mint(&collection, None, payload) .await .map_err(ProcessorErrorKind::Solana)?; diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 3956ee8..6df34aa 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -65,7 +65,7 @@ use crate::{ UpdateMasterEditionAddresses, }, }; - +#[macro_export] macro_rules! with_retry { ($expr:expr) => {{ (|| async { $expr.await }) @@ -81,7 +81,7 @@ macro_rules! with_retry { }) }}; } - +pub use with_retry; const TOKEN_PROGRAM_PUBKEY: &str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"; #[derive(Debug, clap::Args)] @@ -203,6 +203,10 @@ impl Solana { self.rpc_client.clone() } + pub fn treasury_wallet(&self) -> Pubkey { + self.treasury_wallet_address + } + /// Res /// /// # Errors @@ -756,6 +760,7 @@ impl<'a> MintBackend for E async fn mint( &self, collection: &collections::Model, + blockhash: Option, txn: MintMetaplexEditionTransaction, ) -> hub_core::prelude::Result> { let rpc = &self.0.rpc_client; @@ -837,7 +842,7 @@ impl<'a> MintBackend for E edition, )); - let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; + let blockhash = blockhash.unwrap_or(with_retry!(rpc.get_latest_blockhash()).await?); let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -1036,6 +1041,7 @@ impl<'a> MintBackend, txn: MintMetaplexMetadataTransaction, ) -> hub_core::prelude::Result> { let MintMetaplexMetadataTransaction { @@ -1125,8 +1131,8 @@ impl<'a> MintBackend MintBackend async fn mint( &self, collection: &collections::Model, + blockhash: Option, txn: MintMetaplexMetadataTransaction, ) -> hub_core::prelude::Result> { let MintMetaplexMetadataTransaction { @@ -1190,7 +1197,7 @@ impl<'a> MintBackend let associated_token_account = get_associated_token_address(&recipient, &mint.pubkey()); let len = spl_token::state::Mint::LEN; let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).await?; - let blockhash = with_retry!(rpc.get_latest_blockhash()).await?; + let blockhash = blockhash.unwrap_or(with_retry!(rpc.get_latest_blockhash()).await?); let create_account_ins = solana_program::system_instruction::create_account( &payer, diff --git a/core/proto.lock b/core/proto.lock index 1c4fe4d..b76259d 100644 --- a/core/proto.lock +++ b/core/proto.lock @@ -1,14 +1,14 @@ [[schemas]] subject = "nfts" -version = 29 -sha512 = "b3b2136bd6c7a136d317da84395661de5fc056e8270510575a3281d78884d99a0d89f444754ed02cb18ad26dcc7cd65300c1df73b9d74d2edc6bcc8d552465d0" +version = 1 +sha512 = "449574f8551ab8c17824af9e08b1658ad1b26ac80340230ddf02e7a1e0979d8a47025913a6598799cf83dd1a9cda87697ee87a13f404ebb52c95ea0084205767" [[schemas]] subject = "solana_nfts" -version = 11 -sha512 = "967fefde938a0f6ce05194e4fca15673e681caac54d8aeec114c5d38418632b9696dbaf5362345a15114e5abb49de55d0af8b9edcc0f2c91f9ef1ccc4ff55d68" +version = 2 +sha512 = "c990f468d9bb9d44655c70b7de6ed57e512c75c4821768ff3d3d1ee067d5b54537969e9885e354f637c1386a3d11c0e97fabaf9e2d567c436a745e8b6d6af8cd" [[schemas]] subject = "treasury" -version = 23 +version = 1 sha512 = "0e4d77999767d5971122e720c1cee7a57c3e47ce69f58a582f1762d8e65e031ea3bd9024cfc21bd7da5db6e38a71657151c58cdfa21d9ff643fb2fc657105cf5" diff --git a/core/proto.toml b/core/proto.toml index 62864f2..6703b03 100644 --- a/core/proto.toml +++ b/core/proto.toml @@ -1,7 +1,7 @@ [registry] -endpoint = "https://schemas.holaplex.tools" +endpoint = "http://localhost:8081" [schemas] -nfts = 29 -treasury = 23 -solana_nfts = 11 \ No newline at end of file +nfts = 1 +treasury = 1 +solana_nfts = 2 \ No newline at end of file From f64c19ec5d8eec43d1603d26791a9be9880a8569 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Tue, 17 Oct 2023 09:52:26 +0500 Subject: [PATCH 2/4] update schemas --- consumer/src/events.rs | 6 +++--- core/proto.lock | 8 ++++---- core/proto.toml | 8 ++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/consumer/src/events.rs b/consumer/src/events.rs index d9cb0fe..cd5d3b5 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -9,7 +9,7 @@ use holaplex_hub_nfts_solana_core::{ MetaplexMasterEditionTransaction, MintMetaplexEditionTransaction, MintMetaplexMetadataTransaction, SolanaCompletedMintTransaction, SolanaCompletedTransferTransaction, SolanaCompletedUpdateTransaction, - SolanaFailedTransaction, SolanaMintBatchPayload, SolanaMintOpenDropBatchedPayload, + SolanaFailedTransaction, SolanaMintOpenDropBatchedPayload, SolanaMintPendingTransactions, SolanaMintTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, SolanaTransactionFailureReason, SwitchCollectionPayload, TransferMetaplexAssetTransaction, UpdateSolanaMintPayload, @@ -763,8 +763,8 @@ impl Processor { producer .send( Some(&SolanaNftEvents { - event: Some(SolanaNftEvent::MintOpenDropBatchSigningRequested( - SolanaMintBatchPayload { + event: Some(SolanaNftEvent::MintOpenDropBatchedSigningRequested( + SolanaMintPendingTransactions { signers_pubkeys, mint_transactions, }, diff --git a/core/proto.lock b/core/proto.lock index b76259d..cf509b3 100644 --- a/core/proto.lock +++ b/core/proto.lock @@ -1,14 +1,14 @@ [[schemas]] subject = "nfts" -version = 1 +version = 31 sha512 = "449574f8551ab8c17824af9e08b1658ad1b26ac80340230ddf02e7a1e0979d8a47025913a6598799cf83dd1a9cda87697ee87a13f404ebb52c95ea0084205767" [[schemas]] subject = "solana_nfts" -version = 2 -sha512 = "c990f468d9bb9d44655c70b7de6ed57e512c75c4821768ff3d3d1ee067d5b54537969e9885e354f637c1386a3d11c0e97fabaf9e2d567c436a745e8b6d6af8cd" +version = 12 +sha512 = "4f85496c50a82cb40faa097cf6d0cb23275b3b90cb561d01388f3e5a71282a8b8e1eea617b7d712b0e415d65af483209fac2db1591456fa814a1f41a1c457433" [[schemas]] subject = "treasury" -version = 1 +version = 23 sha512 = "0e4d77999767d5971122e720c1cee7a57c3e47ce69f58a582f1762d8e65e031ea3bd9024cfc21bd7da5db6e38a71657151c58cdfa21d9ff643fb2fc657105cf5" diff --git a/core/proto.toml b/core/proto.toml index 6703b03..c784d59 100644 --- a/core/proto.toml +++ b/core/proto.toml @@ -1,7 +1,7 @@ [registry] -endpoint = "http://localhost:8081" +endpoint = "https://schemas.holaplex.tools" [schemas] -nfts = 1 -treasury = 1 -solana_nfts = 2 \ No newline at end of file +nfts = 31 +treasury = 23 +solana_nfts = 12 \ No newline at end of file From 3d1a9aa372ba91a9ae6910c1a9e7046064e1754a Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Tue, 17 Oct 2023 18:13:58 +0500 Subject: [PATCH 3/4] replace unwrap_or --- consumer/src/events.rs | 3 +-- consumer/src/solana.rs | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/consumer/src/events.rs b/consumer/src/events.rs index cd5d3b5..5a9d275 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -755,8 +755,7 @@ impl Processor { let blockhash = with_retry!(self.solana().rpc().get_latest_blockhash()) .await .context("blockhash not found") - .map_err(ProcessorErrorKind::Solana) - .unwrap(); + .map_err(ProcessorErrorKind::Solana)?; let send_event = |mint_transactions: Vec, signers_pubkeys: Vec| async { diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 6df34aa..768085c 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -1197,7 +1197,11 @@ impl<'a> MintBackend let associated_token_account = get_associated_token_address(&recipient, &mint.pubkey()); let len = spl_token::state::Mint::LEN; let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).await?; - let blockhash = blockhash.unwrap_or(with_retry!(rpc.get_latest_blockhash()).await?); + let blockhash = if let Some(blockhash) = blockhash { + blockhash + } else { + with_retry!(rpc.get_latest_blockhash()).await? + }; let create_account_ins = solana_program::system_instruction::create_account( &payer, From c0e070ba6dfe6ba7330a2407b319ad6106342bb1 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Wed, 18 Oct 2023 16:36:15 +0500 Subject: [PATCH 4/4] error handling --- consumer/src/events.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 5a9d275..d068e28 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -21,7 +21,6 @@ use holaplex_hub_nfts_solana_entity::{ collection_mints, collections, compression_leafs, update_revisions, }; use hub_core::{ - anyhow, backon::{ExponentialBuilder, Retryable}, chrono::Utc, metrics::KeyValue, @@ -635,11 +634,10 @@ impl Processor { ) .await }, - Some(NftEvent::SolanaMintOpenDropBatched(payload)) => { self.process_mint_batch(&key, payload).await.map_err(|e| { ProcessorError::new( - ProcessorErrorKind::Solana(e), + e, EventKind::MintOpenDropBatched, ErrorSource::NftFailure, ) @@ -738,15 +736,14 @@ impl Processor { &self, key: &SolanaNftEventKey, payload: SolanaMintOpenDropBatchedPayload, - ) -> anyhow::Result<()> { + ) -> ProcessResult<()> { let conn = self.db.get(); let producer = self.producer.clone(); let collection_id = Uuid::parse_str(&payload.collection_id)?; let collection = Collection::find_by_id(conn, collection_id) .await? - .ok_or(ProcessorErrorKind::RecordNotFound) - .unwrap(); + .ok_or(ProcessorErrorKind::RecordNotFound)?; let signers_pubkeys = vec![ self.solana().treasury_wallet().to_string(), @@ -793,7 +790,8 @@ impl Processor { compressed: payload.compressed, }, ) - .await?; + .await + .map_err(ProcessorErrorKind::Solana)?; mint_transactions.push(SolanaMintTransaction { serialized_message: tx.serialized_message, @@ -841,7 +839,8 @@ impl Processor { compressed: payload.compressed, }, ) - .await?; + .await + .map_err(ProcessorErrorKind::Solana)?; mint_transactions.push(SolanaMintTransaction { serialized_message: tx.serialized_message,