Skip to content

Commit

Permalink
refactor(backfiller): select fixed list of trees to process. switch f…
Browse files Browse the repository at this point in the history
…rom a queue channel to a pool. move db checks for inserting a transaction into transaction worker thread instead of tree thread allowing for concurrent saving of transactions. (#117)
  • Loading branch information
kespinola authored and linuskendall committed Dec 28, 2023
1 parent 61cf24d commit 0b33200
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 98 deletions.
180 changes: 125 additions & 55 deletions tree_backfiller/src/backfiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@ use crate::{

use anyhow::Result;
use clap::Parser;
use digital_asset_types::dao::tree_transactions;
use indicatif::HumanDuration;
use log::{error, info};
use sea_orm::SqlxPostgresConnector;
use sea_orm::{
sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait,
QueryFilter, QueryOrder,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::{mpsc, Semaphore};

Expand All @@ -22,17 +30,17 @@ pub struct Args {
pub solana_rpc_url: String,

/// Number of tree crawler workers
#[arg(long, env, default_value = "100")]
#[arg(long, env, default_value = "20")]
pub tree_crawler_count: usize,

/// The size of the signature channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel.
#[arg(long, env, default_value = "1000")]
/// The size of the signature channel. This is the number of signatures that can be queued up.
#[arg(long, env, default_value = "10000")]
pub signature_channel_size: usize,

#[arg(long, env, default_value = "1000")]
pub queue_channel_size: usize,
#[arg(long, env, default_value = "100")]
pub transaction_worker_count: usize,

#[arg(long, env)]
#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

/// Database configuration
Expand All @@ -48,77 +56,143 @@ pub struct Args {
pub metrics: MetricsArgs,
}

/// The main function for running the backfiller.
/// A thread-safe counter.
pub struct Counter(Arc<AtomicUsize>);

impl Counter {
/// Creates a new counter initialized to zero.
pub fn new() -> Self {
Self(Arc::new(AtomicUsize::new(0)))
}

/// Increments the counter by one.
pub fn increment(&self) {
self.0.fetch_add(1, Ordering::SeqCst);
}

/// Decrements the counter by one.
pub fn decrement(&self) {
self.0.fetch_sub(1, Ordering::SeqCst);
}

/// Returns the current value of the counter.
pub fn get(&self) -> usize {
self.0.load(Ordering::SeqCst)
}

/// Returns a future that resolves when the counter reaches zero.
/// The future periodically checks the counter value and sleeps for a short duration.
pub fn zero(&self) -> impl std::future::Future<Output = ()> {
let counter = self.clone();
async move {
while counter.get() > 0 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}

impl Clone for Counter {
/// Returns a clone of the counter.
/// The returned counter shares the same underlying atomic integer.
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}

/// Runs the backfilling process for trees.
///
/// This function does the following:
/// 1. Sets up the Solana RPC client and the database connection pool.
/// 2. Initializes the metrics for trees, signatures, and the queue.
/// 3. Creates channels for the queue and signatures.
/// 4. Spawns a new task to handle transactions.
/// 5. Spawns a new task to handle the queue.
/// 6. Fetches all trees and spawns a new task for each tree to crawl it.
/// 7. Waits for all crawling tasks to complete.
/// 8. Waits for the transaction worker count to reach zero.
/// 9. Waits for the queue handler to finish.
/// 10. Logs the total time taken and the number of trees crawled.
/// This function initializes the necessary components such as the Solana RPC client,
/// database connection, metrics, and worker queues. It then fetches all trees and
/// starts the crawling process for each tree in parallel, respecting the configured
/// concurrency limits. It also listens for signatures and processes transactions
/// concurrently. After crawling all trees, it completes the transaction handling
/// and logs the total time taken for the job.
///
/// # Arguments
///
/// * `config` - The configuration arguments for the backfiller.
/// * `config` - The configuration settings for the backfiller, including RPC URLs,
/// database settings, and worker counts.
///
/// # Returns
///
/// * `Result<()>` - Returns `Ok(())` if the function runs successfully. Returns an error otherwise.
/// This function returns a `Result` which is `Ok` if the backfilling process completes
/// successfully, or an `Error` if any part of the process fails.
pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url));
let sig_solana_rpc = Arc::clone(&solana_rpc);
let transaction_solana_rpc = Arc::clone(&solana_rpc);

let pool = db::connect(config.database).await?;
let transaction_pool = pool.clone();

let metrics = Metrics::try_from_config(config.metrics)?;
let tree_metrics = metrics.clone();
let signature_metrics = metrics.clone();
let queue_metrics = metrics.clone();
let transaction_metrics = metrics.clone();

let (queue_sender, mut queue_receiver) = mpsc::channel::<Vec<u8>>(config.queue_channel_size);
let signature_queue_sender = queue_sender.clone();
let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);
let (sig_sender, mut sig_receiver) =
mpsc::channel::<tree_transactions::ActiveModel>(config.signature_channel_size);

let mut queue = queue::Queue::setup(config.queue).await?;
let transaction_count = Counter::new();
let transaction_worker_transaction_count = transaction_count.clone();

let queue_handle = tokio::spawn(async move {
while let Some(data) = queue_receiver.recv().await {
if let Err(e) = queue.push(&data).await {
queue_metrics.increment("transaction.failed");
error!("pushing to queue: {:?}", e);
} else {
queue_metrics.increment("transaction.succeeded");
}
}
});
let queue = queue::QueuePool::try_from_config(config.queue).await?;

let signature_handle = tokio::spawn(async move {
while let Some(signature) = sig_receiver.recv().await {
let solana_rpc = Arc::clone(&sig_solana_rpc);
let queue_sender = signature_queue_sender.clone();
let metrics = signature_metrics.clone();
tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(config.transaction_worker_count));

while let Some(tree_transaction) = sig_receiver.recv().await {
let solana_rpc = transaction_solana_rpc.clone();
let metrics = transaction_metrics.clone();
let queue = queue.clone();
let pool = transaction_pool.clone();
let count = transaction_worker_transaction_count.clone();

count.increment();

let _permit = semaphore.acquire().await?;

tokio::spawn(async move {
let timing = Instant::now();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

let inserted_tree_transaction = tree_transactions::Entity::insert(tree_transaction)
.on_conflict(
OnConflict::column(tree_transactions::Column::Signature)
.do_nothing()
.to_owned(),
)
.exec_with_returning(&conn)
.await;

if let Ok(tree_transaction) = inserted_tree_transaction {
let signature = Signature::from_str(&tree_transaction.signature)?;

if let Err(e) = tree::transaction(solana_rpc, queue, signature).await {
error!("tree transaction: {:?}", e);
metrics.increment("transaction.failed");
} else {
metrics.increment("transaction.succeeded");
}

if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await {
metrics.increment("transaction.failed");
error!("sending to queue: {:?}", e);
} else {
metrics.time("transaction.queued", timing.elapsed());
}

count.decrement();

Ok::<(), anyhow::Error>(())
});
}

Ok::<(), anyhow::Error>(())
});

let started = Instant::now();

let trees = tree::all(&solana_rpc).await?;
let trees = if let Some(only_trees) = config.only_trees {
tree::find(&solana_rpc, only_trees).await?
} else {
tree::all(&solana_rpc).await?
};
let tree_count = trees.len();

info!(
Expand All @@ -131,8 +205,8 @@ pub async fn run(config: Args) -> Result<()> {
let mut crawl_handles = Vec::with_capacity(tree_count);

for tree in trees {
let client = Arc::clone(&solana_rpc);
let semaphore = Arc::clone(&semaphore);
let client = solana_rpc.clone();
let semaphore = semaphore.clone();
let sig_sender = sig_sender.clone();
let pool = pool.clone();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
Expand All @@ -147,7 +221,7 @@ pub async fn run(config: Args) -> Result<()> {
metrics.increment("tree.failed");
error!("crawling tree: {:?}", e);
} else {
metrics.increment("tree.completed");
metrics.increment("tree.succeeded");
}

metrics.time("tree.crawled", timing.elapsed());
Expand All @@ -159,12 +233,8 @@ pub async fn run(config: Args) -> Result<()> {
}

futures::future::try_join_all(crawl_handles).await?;
drop(sig_sender);

signature_handle.await?;
drop(queue_sender);

queue_handle.await?;
transaction_count.zero().await;

metrics.time("job.completed", started.elapsed());

Expand Down
89 changes: 64 additions & 25 deletions tree_backfiller/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use anyhow::Result;
use clap::Parser;
use figment::value::{Dict, Value};
use plerkle_messenger::{
redis_messenger::RedisMessenger, Messenger, MessengerConfig, MessengerError, MessengerType,
};
use std::sync::{Arc, Mutex};
use plerkle_messenger::{Messenger, MessengerConfig, MessengerType};
use std::num::TryFromIntError;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::{mpsc::error::TrySendError, Mutex};

const TRANSACTION_BACKFILL_STREAM: &'static str = "TXNFILL";

Expand All @@ -14,8 +15,8 @@ pub struct QueueArgs {
pub messenger_redis_url: String,
#[arg(long, env, default_value = "100")]
pub messenger_redis_batch_size: String,
#[arg(long, env, default_value = "10000000")]
pub messenger_stream_max_buffer_size: usize,
#[arg(long, env, default_value = "25")]
pub messenger_queue_connections: u64,
}

impl From<QueueArgs> for MessengerConfig {
Expand All @@ -26,37 +27,75 @@ impl From<QueueArgs> for MessengerConfig {
"redis_connection_str".to_string(),
Value::from(args.messenger_redis_url),
);

connection_config.insert(
"batch_size".to_string(),
Value::from(args.messenger_redis_batch_size),
);
connection_config.insert(
"pipeline_size_bytes".to_string(),
Value::from(1u128.to_string()),
);

Self {
messenger_type: MessengerType::Redis,
connection_config: connection_config,
}
}
}
#[derive(Debug)]
pub struct Queue(RedisMessenger);

impl Queue {
pub async fn setup(config: QueueArgs) -> Result<Self, MessengerError> {
let mut messenger = RedisMessenger::new(config.clone().into()).await?;

messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?;
messenger
.set_buffer_size(
TRANSACTION_BACKFILL_STREAM,
config.messenger_stream_max_buffer_size,
)
.await;

Ok(Self(messenger))

#[derive(thiserror::Error, Debug)]
pub enum QueuePoolError {
#[error("messenger")]
Messenger(#[from] plerkle_messenger::MessengerError),
#[error("tokio try send to channel")]
TrySendMessengerChannel(#[from] TrySendError<Box<dyn Messenger>>),
#[error("revc messenger connection")]
RecvMessengerConnection,
#[error("try from int")]
TryFromInt(#[from] TryFromIntError),
#[error("tokio send to channel")]
SendMessengerChannel(#[from] mpsc::error::SendError<Box<dyn Messenger>>),
}

#[derive(Debug, Clone)]
pub struct QueuePool {
tx: mpsc::Sender<Box<dyn plerkle_messenger::Messenger>>,
rx: Arc<Mutex<mpsc::Receiver<Box<dyn plerkle_messenger::Messenger>>>>,
}

impl QueuePool {
pub async fn try_from_config(config: QueueArgs) -> anyhow::Result<Self, QueuePoolError> {
let size = usize::try_from(config.messenger_queue_connections)?;
let (tx, rx) = mpsc::channel(size);

for _ in 0..config.messenger_queue_connections {
let messenger_config: MessengerConfig = config.clone().into();
let mut messenger = plerkle_messenger::select_messenger(messenger_config).await?;
messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?;
messenger
.set_buffer_size(TRANSACTION_BACKFILL_STREAM, 10000000000000000)
.await;

tx.try_send(messenger)?;
}

Ok(Self {
tx,
rx: Arc::new(Mutex::new(rx)),
})
}

pub async fn push(&mut self, message: &[u8]) -> Result<(), MessengerError> {
self.0.send(TRANSACTION_BACKFILL_STREAM, message).await
pub async fn push(&self, message: &[u8]) -> Result<(), QueuePoolError> {
let mut rx = self.rx.lock().await;
let mut messenger = rx
.recv()
.await
.ok_or(QueuePoolError::RecvMessengerConnection)?;

messenger.send(TRANSACTION_BACKFILL_STREAM, message).await?;

self.tx.send(messenger).await?;

Ok(())
}
}
Loading

0 comments on commit 0b33200

Please sign in to comment.