Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support batch mint of random queued nfts #88

Merged
merged 4 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions consumer/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use holaplex_hub_nfts_solana_core::proto::{
};
use holaplex_hub_nfts_solana_entity::{collection_mints, collections, update_revisions};
use hub_core::prelude::*;
use solana_program::pubkey::Pubkey;
use solana_program::{hash::Hash, pubkey::Pubkey};
#[derive(Clone)]
pub struct MasterEditionAddresses {
pub metadata: Pubkey,
Expand Down Expand Up @@ -77,6 +77,8 @@ pub struct TransferAssetAddresses {

/// Represents a response from a transaction on the blockchain. This struct
/// provides the serialized message and the signatures of the signed message.

#[derive(Clone)]
pub struct TransactionResponse<A> {
/// The serialized version of the message from the transaction.
pub serialized_message: Vec<u8>,
Expand Down Expand Up @@ -138,15 +140,20 @@ pub trait CollectionBackend {

#[async_trait]
pub trait MintBackend<T, R> {
async fn mint(&self, collection: &collections::Model, txn: T)
-> Result<TransactionResponse<R>>;
async fn mint(
&self,
collection: &collections::Model,
blockhash: Option<Hash>,
txn: T,
) -> Result<TransactionResponse<R>>;
}

#[async_trait]
pub trait TransferBackend<M, R> {
async fn transfer(
&self,
collection_mint: &M,

txn: TransferMetaplexAssetTransaction,
) -> Result<TransactionResponse<R>>;
}
170 changes: 163 additions & 7 deletions consumer/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use holaplex_hub_nfts_solana_core::{
MetaplexMasterEditionTransaction, MintMetaplexEditionTransaction,
MintMetaplexMetadataTransaction, SolanaCompletedMintTransaction,
SolanaCompletedTransferTransaction, SolanaCompletedUpdateTransaction,
SolanaFailedTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction,
SolanaFailedTransaction, SolanaMintBatchPayload, SolanaMintOpenDropBatchedPayload,
SolanaMintTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction,
SolanaTransactionFailureReason, SwitchCollectionPayload, TransferMetaplexAssetTransaction,
UpdateSolanaMintPayload,
},
Expand All @@ -20,6 +21,8 @@ use holaplex_hub_nfts_solana_entity::{
collection_mints, collections, compression_leafs, update_revisions,
};
use hub_core::{
anyhow,
backon::{ExponentialBuilder, Retryable},
chrono::Utc,
metrics::KeyValue,
prelude::*,
Expand All @@ -29,6 +32,7 @@ use hub_core::{
uuid,
uuid::Uuid,
};
use solana_client::client_error::ClientError;
use solana_program::pubkey::{ParsePubkeyError, Pubkey};
use solana_sdk::signature::Signature;

Expand All @@ -39,6 +43,7 @@ use crate::{
},
metrics::Metrics,
solana::{CompressedRef, EditionRef, Solana, SolanaAssetIdError, UncompressedRef},
with_retry,
};

#[derive(Debug, thiserror::Error, Triage)]
Expand Down Expand Up @@ -126,6 +131,7 @@ pub enum EventKind {
UpdateOpenDrop,
RetryCreateOpenDrop,
RetryMintOpenDrop,
MintOpenDropBatched,
}

impl EventKind {
Expand All @@ -150,6 +156,7 @@ impl EventKind {
Self::UpdateOpenDrop => "open drop update",
Self::RetryCreateOpenDrop => "open drop creation retry",
Self::RetryMintOpenDrop => "open drop mint retry",
Self::MintOpenDropBatched => "open drop mint batch",
}
}

Expand Down Expand Up @@ -190,6 +197,7 @@ impl EventKind {
SolanaNftEvent::RetryCreateOpenDropSigningRequested(tx)
},
EventKind::RetryMintOpenDrop => SolanaNftEvent::RetryMintOpenDropSigningRequested(tx),
EventKind::MintOpenDropBatched => unreachable!(),
}
}

Expand Down Expand Up @@ -390,6 +398,7 @@ impl EventKind {
address: collection_mint.mint,
})
},
Self::MintOpenDropBatched => unreachable!(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not instead of writing a different batch method you incorporate the logic here? Can have helper function for consolidating the logic.

})
}

Expand All @@ -414,6 +423,7 @@ impl EventKind {
Self::UpdateOpenDrop => SolanaNftEvent::UpdateOpenDropFailed(tx),
Self::RetryCreateOpenDrop => SolanaNftEvent::RetryCreateOpenDropFailed(tx),
Self::RetryMintOpenDrop => SolanaNftEvent::RetryMintOpenDropFailed(tx),
Self::MintOpenDropBatched => unreachable!(),
}
}
}
Expand Down Expand Up @@ -625,6 +635,16 @@ impl Processor {
)
.await
},

Some(NftEvent::SolanaMintOpenDropBatched(payload)) => {
self.process_mint_batch(&key, payload).await.map_err(|e| {
ProcessorError::new(
ProcessorErrorKind::Solana(e),
EventKind::MintOpenDropBatched,
ErrorSource::NftFailure,
)
})
},
_ => Ok(()),
}
},
Expand Down Expand Up @@ -714,6 +734,142 @@ impl Processor {
}
}

async fn process_mint_batch(
&self,
key: &SolanaNftEventKey,
payload: SolanaMintOpenDropBatchedPayload,
) -> anyhow::Result<()> {
let conn = self.db.get();
let producer = self.producer.clone();

let collection_id = Uuid::parse_str(&payload.collection_id)?;
let collection = Collection::find_by_id(conn, collection_id)
.await?
.ok_or(ProcessorErrorKind::RecordNotFound)
.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.ok_or(ProcessorErrorKind::RecordNotFound)
.unwrap();
.ok_or(ProcessorErrorKind::RecordNotFound)?;


let signers_pubkeys = vec![
self.solana().treasury_wallet().to_string(),
collection.owner.clone(),
];
let blockhash = with_retry!(self.solana().rpc().get_latest_blockhash())
.await
.context("blockhash not found")
.map_err(ProcessorErrorKind::Solana)
.unwrap();
imabdulbasit marked this conversation as resolved.
Show resolved Hide resolved

let send_event = |mint_transactions: Vec<SolanaMintTransaction>,
signers_pubkeys: Vec<String>| async {
producer
.send(
Some(&SolanaNftEvents {
event: Some(SolanaNftEvent::MintOpenDropBatchSigningRequested(
SolanaMintBatchPayload {
signers_pubkeys,
mint_transactions,
},
)),
}),
Some(key),
)
.await
};

if payload.compressed {
let backend = &CompressedRef(self.solana());
let mut leafs: Vec<compression_leafs::ActiveModel> = Vec::new();
let mut mint_transactions = Vec::new();

for mint_tx in payload.mint_open_drop_transactions.clone() {
let id = Uuid::from_str(&mint_tx.mint_id)?;

let tx = backend
.mint(
&collection,
Some(blockhash),
MintMetaplexMetadataTransaction {
recipient_address: mint_tx.recipient_address,
metadata: mint_tx.metadata,
collection_id: payload.collection_id.clone(),
compressed: payload.compressed,
},
)
.await?;

mint_transactions.push(SolanaMintTransaction {
serialized_message: tx.serialized_message,
mint_id: mint_tx.mint_id,
signer_signature: None,
});

let compression_leaf = compression_leafs::Model {
id,
collection_id: collection.id,
merkle_tree: tx.addresses.merkle_tree.to_string(),
tree_authority: tx.addresses.tree_authority.to_string(),
tree_delegate: tx.addresses.tree_delegate.to_string(),
leaf_owner: tx.addresses.leaf_owner.to_string(),
created_at: Utc::now().naive_utc(),
..Default::default()
};

leafs.push(compression_leaf.into());
}

compression_leafs::Entity::insert_many(leafs)
.exec(conn)
.await?;

send_event(mint_transactions, signers_pubkeys).await?;

return Ok(());
}

let backend = &UncompressedRef(self.solana());
let mut mints: Vec<collection_mints::ActiveModel> = Vec::new();
let mut mint_transactions = Vec::new();

for mint_tx in payload.mint_open_drop_transactions.clone() {
let id = Uuid::from_str(&mint_tx.mint_id)?;
let tx = backend
.mint(
&collection,
Some(blockhash),
MintMetaplexMetadataTransaction {
recipient_address: mint_tx.recipient_address,
metadata: mint_tx.metadata,
collection_id: payload.collection_id.clone(),
compressed: payload.compressed,
},
)
.await?;

mint_transactions.push(SolanaMintTransaction {
serialized_message: tx.serialized_message,
mint_id: mint_tx.mint_id,
signer_signature: tx.signatures_or_signers_public_keys.get(1).cloned(),
});

let collection_mint = collection_mints::Model {
id,
collection_id: collection.id,
owner: tx.addresses.recipient.to_string(),
mint: tx.addresses.mint.to_string(),
created_at: Utc::now().naive_utc(),
associated_token_account: tx.addresses.associated_token_account.to_string(),
};

mints.push(collection_mint.into());
}

collection_mints::Entity::insert_many(mints)
.exec(conn)
.await?;

send_event(mint_transactions, signers_pubkeys).await?;
Ok(())
}

async fn process_nft(
&self,
kind: EventKind,
Expand Down Expand Up @@ -880,7 +1036,7 @@ impl Processor {
let backend = &CompressedRef(self.solana());

let tx = backend
.mint(&collection, payload)
.mint(&collection, None, payload)
.await
.map_err(ProcessorErrorKind::Solana)?;

Expand Down Expand Up @@ -911,7 +1067,7 @@ impl Processor {
let backend = &UncompressedRef(self.solana());

let tx = backend
.mint(&collection, payload)
.mint(&collection, None, payload)
.await
.map_err(ProcessorErrorKind::Solana)?;

Expand Down Expand Up @@ -951,7 +1107,7 @@ impl Processor {
.ok_or(ProcessorErrorKind::RecordNotFound)?;

let tx = backend
.mint(&collection, payload)
.mint(&collection, None, payload)
.await
.map_err(ProcessorErrorKind::Solana)?;

Expand Down Expand Up @@ -1165,7 +1321,7 @@ impl Processor {
let collection = collection.ok_or(ProcessorErrorKind::RecordNotFound)?;

let tx = backend
.mint(&collection, payload)
.mint(&collection, None, payload)
.await
.map_err(ProcessorErrorKind::Solana)?;

Expand Down Expand Up @@ -1205,7 +1361,7 @@ impl Processor {
let backend = &CompressedRef(self.solana());

let tx = backend
.mint(&collection, payload)
.mint(&collection, None, payload)
.await
.map_err(ProcessorErrorKind::Solana)?;

Expand All @@ -1228,7 +1384,7 @@ impl Processor {
let backend = &UncompressedRef(self.solana());

let tx = backend
.mint(&collection, payload)
.mint(&collection, None, payload)
.await
.map_err(ProcessorErrorKind::Solana)?;

Expand Down
Loading
Loading