Revert "feat(das): add metric for task queue depth (#83)" (#85)
This reverts commit 11e1aef.
Nikhil Acharya authored Jul 13, 2023
1 parent 20c1718 commit 70cd2ed
Showing 4 changed files with 29 additions and 61 deletions.
27 changes: 13 additions & 14 deletions README.md
@@ -40,10 +40,10 @@ This spec is what providers of this api must implement against.
Along with the above rust binaries, this repo also maintains examples and best practice settings for running the entire infrastructure.
The example infrastructure is as follows.

-- A Solana No-Vote Validator - This validator is configured to only have secure access to the validator ledger and account data under consensus.
-- A Geyser Plugin (Plerkle) - The above validator is further configured to load this geyser plugin that sends Plerkle Serialized Messages over a messaging system.
-- A Redis Cluster (Stream Optimized) - The example messaging system is a light weight redis deployment that supports the streaming configuration.
-- A Kubernetes Cluster - The orchestration system for the API and Ingester processes. Probably overkill for a small installation, but it's a rock solid platform for critical software.
+- A Solana No-Vote Validator - This validator is configured to only have secure access to the validator ledger and account data under consensus.
+- A Geyser Plugin (Plerkle) - The above validator is further configured to load this geyser plugin that sends Plerkle Serialized Messages over a messaging system.
+- A Redis Cluster (Stream Optimized) - The example messaging system is a light weight redis deployment that supports the streaming configuration.
+- A Kubernetes Cluster - The orchestration system for the API and Ingester processes. Probably overkill for a small installation, but it's a rock solid platform for critical software.

This repo houses Helm Charts, Docker files and Terraform files to assist in the deployment of the example infrastructure.

@@ -67,10 +67,10 @@ If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`.

_Prerequisites_

-- A Postgres Server running with the database setup according to ./init.sql
-- A Redis instance that has streams enabled or a version that supports streams
-- A local solana validator with the Plerkle plugin running.
-- Environment Variables set to allow your validator, ingester and api to access those prerequisites.
+- A Postgres Server running with the database setup according to ./init.sql
+- A Redis instance that has streams enabled or a version that supports streams
+- A local solana validator with the Plerkle plugin running.
+- Environment Variables set to allow your validator, ingester and api to access those prerequisites.

See [Plugin Configuration](https://github.com/metaplex-foundation/digital-asset-validator-plugin#building-locally) for how to locally configure the test validator plugin to work.

@@ -118,11 +118,11 @@ For production you should split the components up.

Developing with Docker is much easier, but has some nuances to it. This test docker compose system relies on a programs folder being accessible, this folder needs to have the shared object files for the following programs

-- Token Metadata
-- Bubblegum
-- Gummyroll
-- Token 2022
-- Latest version of the Associated token program
+- Token Metadata
+- Bubblegum
+- Gummyroll
+- Token 2022
+- Latest version of the Associated token program

You need to run the following script (which takes a long time) in order to get all those .so files.

@@ -288,7 +288,6 @@ count ingester.bgtask.network_error
count ingester.bgtask.unrecoverable_error
time ingester.bgtask.bus_time
count ingester.bgtask.identical
-gauge ingester.bgtask.queue_depth

### BACKFILLER

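The metric names listed in this README hunk are emitted from nft_ingester through the cadence statsd macros; the guarded emission pattern is visible in the nft_ingester/src/tasks/mod.rs diff further down. A minimal, self-contained sketch of that pattern (the function name, client prefix, and agent address here are illustrative assumptions, not code from this repo):

```rust
use std::net::UdpSocket;

use cadence::{StatsdClient, UdpMetricSink};
use cadence_macros::{is_global_default_set, set_global_default, statsd_count};

// Illustrative: bump the `ingester.bgtask.identical` counter from the list above.
// The guard makes metric calls a no-op until a global statsd client is set,
// mirroring the `is_global_default_set` check imported in tasks/mod.rs.
fn record_identical_task() {
    if is_global_default_set() {
        statsd_count!("ingester.bgtask.identical", 1);
    }
}

fn main() {
    // Hypothetical setup: point the global client at a local statsd agent.
    let socket = UdpSocket::bind("0.0.0.0:0").expect("bind udp socket");
    let sink = UdpMetricSink::from("127.0.0.1:8125", socket).expect("create sink");
    set_global_default(StatsdClient::from_sink("das", sink));
    record_identical_task();
}
```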
5 changes: 3 additions & 2 deletions digital_asset_types/src/dao/mod.rs
@@ -183,8 +183,9 @@ impl SearchAssetsQuery {
conditions = conditions.add(asset_creators::Column::Creator.eq(c));
}

-// Without specifying the creators themselves, there is no index being hit.
-// So in some rare scenarios, this query could be very slow.
+// N.B. Something to consider is that without specifying the creators themselves,
+// there is no index being hit. That means in some scenarios this query could be very slow.
+// But those should only happen in rare scenarios.
if let Some(cv) = self.creator_verified.to_owned() {
conditions = conditions.add(asset_creators::Column::Verified.eq(cv));
}
18 changes: 9 additions & 9 deletions nft_ingester/src/error/mod.rs
@@ -22,31 +22,31 @@ pub enum IngesterError {
StorageWriteError(String),
#[error("NotImplemented")]
NotImplemented,
#[error("Deserialization Error: {0}")]
#[error("Deserialization Error {0}")]
DeserializationError(String),
#[error("Task Manager Error: {0}")]
#[error("Task Manager Error {0}")]
TaskManagerError(String),
#[error("Missing or invalid configuration: ({msg})")]
ConfigurationError { msg: String },
#[error("Error getting RPC data {0}")]
RpcGetDataError(String),
#[error("RPC returned data in unsupported format: {0}")]
#[error("RPC returned data in unsupported format {0}")]
RpcDataUnsupportedFormat(String),
#[error("Data serializaton error: {0}")]
#[error("Data serializaton error {0}")]
SerializatonError(String),
#[error("Messenger error: {0}")]
#[error("Messenger error {0}")]
MessengerError(String),
#[error("Blockbuster Parsing error: {0}")]
#[error("Blockbuster Parsing error {0}")]
ParsingError(String),
#[error("Database Error: {0}")]
#[error("Data Base Error {0}")]
DatabaseError(String),
#[error("Unknown Task Type: {0}")]
#[error("Unknown Task Type {0}")]
UnknownTaskType(String),
#[error("BG Task Manager Not Started")]
TaskManagerNotStarted,
#[error("Unrecoverable task error: {0}")]
UnrecoverableTaskError(String),
#[error("Cache Storage Write Error: {0}")]
#[error("Cache Storage Write Error {0}")]
CacheStorageWriteError(String),
#[error("HttpError {status_code}")]
HttpError { status_code: String },
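The `#[error(...)]` attributes in this enum are Display format strings; assuming the enum derives `thiserror::Error` (which this attribute syntax suggests), the revert changes only the rendered messages, not the variants themselves. A small self-contained illustration using a two-variant excerpt:

```rust
use thiserror::Error;

// Excerpt only; the real IngesterError has many more variants.
#[derive(Error, Debug)]
pub enum IngesterError {
    #[error("Data Base Error {0}")] // post-revert wording
    DatabaseError(String),
    #[error("Unknown Task Type {0}")]
    UnknownTaskType(String),
}

fn main() {
    let e = IngesterError::DatabaseError("connection refused".to_string());
    // Prints "Data Base Error connection refused"; before this revert the
    // attribute read "Database Error: {0}" and would have printed
    // "Database Error: connection refused".
    println!("{}", e);
}
```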
40 changes: 4 additions & 36 deletions nft_ingester/src/tasks/mod.rs
@@ -1,6 +1,6 @@
use crate::{error::IngesterError, metric};
use async_trait::async_trait;
-use cadence_macros::{is_global_default_set, statsd_count, statsd_gauge, statsd_histogram};
+use cadence_macros::{is_global_default_set, statsd_count, statsd_histogram};
use chrono::{Duration, NaiveDateTime, Utc};
use crypto::{digest::Digest, sha2::Sha256};
use digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks};
@@ -34,7 +34,6 @@ pub trait BgTask: Send + Sync {
}

const RETRY_INTERVAL: u64 = 1000;
-const QUEUE_DEPTH_INTERVAL: u64 = 2500;
const DELETE_INTERVAL: u64 = 30000;
const MAX_TASK_BATCH_SIZE: u64 = 100;

@@ -192,14 +191,6 @@ impl TaskManager {
.map_err(|e| e.into())
}

-pub async fn get_task_queue_depth(conn: &DatabaseConnection) -> Result<u64, IngesterError> {
-tasks::Entity::find()
-.filter(tasks::Column::Status.eq(TaskStatus::Pending))
-.count(conn)
-.await
-.map_err(|e| e.into())
-}
-
pub fn get_sender(&self) -> Result<UnboundedSender<TaskData>, IngesterError> {
self.producer
.clone()
@@ -332,9 +323,11 @@ impl TaskManager {
}

pub fn start_runner(&self) -> JoinHandle<()> {
+let task_map = self.registered_task_types.clone();
let pool = self.pool.clone();
+let instance_name = self.instance_name.clone();
tokio::spawn(async move {
-let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
+let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
let mut interval = time::interval(tokio::time::Duration::from_millis(DELETE_INTERVAL));
loop {
interval.tick().await; // ticks immediately
@@ -349,33 +342,8 @@
};
}
});
-
-let pool = self.pool.clone();
-tokio::spawn(async move {
-let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
-let mut interval =
-time::interval(tokio::time::Duration::from_millis(QUEUE_DEPTH_INTERVAL));
-loop {
-interval.tick().await; // ticks immediately
-let res = TaskManager::get_task_queue_depth(&conn).await;
-match res {
-Ok(depth) => {
-debug!("Task queue depth: {}", depth);
-metric! {
-statsd_gauge!("ingester.bgtask.queue_depth", depth);
-}
-}
-Err(e) => {
-error!("error getting queue depth: {}", e);
-}
-};
-}
-});
-
let pool = self.pool.clone();
let ipfs_gateway = self.ipfs_gateway.clone();
-let task_map = self.registered_task_types.clone();
-let instance_name = self.instance_name.clone();
tokio::spawn(async move {
let mut interval = time::interval(tokio::time::Duration::from_millis(RETRY_INTERVAL));
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
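Stitched together from the deleted lines above (the interval constant, the helper, and the extra spawn in start_runner), the functionality this commit removes amounts to the following poller. This is a reference sketch, not a compilable unit: it assumes the surrounding TaskManager impl, its imports, and the repo's `metric!` wrapper.

```rust
// Poll cadence and the gauge name that also disappears from the README.
const QUEUE_DEPTH_INTERVAL: u64 = 2500; // milliseconds

// Count rows in the `tasks` table still in Pending status.
pub async fn get_task_queue_depth(conn: &DatabaseConnection) -> Result<u64, IngesterError> {
    tasks::Entity::find()
        .filter(tasks::Column::Status.eq(TaskStatus::Pending))
        .count(conn)
        .await
        .map_err(|e| e.into())
}

// Spawned from start_runner: every 2.5 s, read the depth and report it as the
// `ingester.bgtask.queue_depth` gauge.
let pool = self.pool.clone();
tokio::spawn(async move {
    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
    let mut interval =
        time::interval(tokio::time::Duration::from_millis(QUEUE_DEPTH_INTERVAL));
    loop {
        interval.tick().await; // ticks immediately
        match TaskManager::get_task_queue_depth(&conn).await {
            Ok(depth) => {
                debug!("Task queue depth: {}", depth);
                metric! {
                    statsd_gauge!("ingester.bgtask.queue_depth", depth);
                }
            }
            Err(e) => error!("error getting queue depth: {}", e),
        }
    }
});
```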
