Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Product Data] Introduce data persistence on gateways #5022

Merged
merged 14 commits into from
Oct 28, 2024
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ members = [
"common/exit-policy",
"common/gateway-requests",
"common/gateway-storage",
"common/gateway-stats-storage",
"common/http-api-client",
"common/http-api-common",
"common/inclusion-probability",
Expand Down
34 changes: 34 additions & 0 deletions common/gateway-stats-storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "nym-gateway-stats-storage"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
"time",
] }
time = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

nym-sphinx = { path = "../nymsphinx" }
nym-credentials-interface = { path = "../credentials-interface" }


[build-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
28 changes: 28 additions & 0 deletions common/gateway-stats-storage/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use sqlx::{Connection, SqliteConnection};
use std::env;

#[tokio::main]
async fn main() {
let out_dir = env::var("OUT_DIR").unwrap();
let database_path = format!("{}/gateway-stats-example.sqlite", out_dir);

let mut conn = SqliteConnection::connect(&format!("sqlite://{}?mode=rwc", database_path))
.await
.expect("Failed to create SQLx database connection");

sqlx::migrate!("./migrations")
.run(&mut conn)
.await
.expect("Failed to perform SQLx migrations");

#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);

#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2021 - Nym Technologies SA <[email protected]>
simonwicky marked this conversation as resolved.
Show resolved Hide resolved
* SPDX-License-Identifier: Apache-2.0
simonwicky marked this conversation as resolved.
Show resolved Hide resolved
*/

CREATE TABLE sessions_active
(
client_address TEXT NOT NULL PRIMARY KEY UNIQUE,
start_time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
typ TEXT NOT NULL
);

CREATE TABLE sessions_finished
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
day DATE NOT NULL,
duration_ms INTEGER NOT NULL,
typ TEXT NOT NULL
);

CREATE TABLE sessions_unique_users
(
day DATE NOT NULL,
client_address TEXT NOT NULL,
PRIMARY KEY (day, client_address)
);
13 changes: 13 additions & 0 deletions common/gateway-stats-storage/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use thiserror::Error;

#[derive(Error, Debug)]
pub enum StatsStorageError {
#[error("Database experienced an internal error: {0}")]
InternalDatabaseError(#[from] sqlx::Error),

#[error("Failed to perform database migration: {0}")]
MigrationError(#[from] sqlx::migrate::MigrateError),
}
195 changes: 195 additions & 0 deletions common/gateway-stats-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use error::StatsStorageError;
use models::{ActiveSession, FinishedSession, SessionType, StoredFinishedSession};
use nym_sphinx::DestinationAddressBytes;
use sessions::SessionManager;
use sqlx::ConnectOptions;
use std::path::Path;
use time::Date;
use tracing::{debug, error};

pub mod error;
pub mod models;
mod sessions;

// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub struct PersistentStatsStorage {
session_manager: SessionManager,
}

impl PersistentStatsStorage {
/// Initialises `PersistentStatsStorage` using the provided path.
///
/// # Arguments
///
/// * `database_path`: path to the database.
pub async fn init<P: AsRef<Path> + Send>(database_path: P) -> Result<Self, StatsStorageError> {
debug!(
"Attempting to connect to database {:?}",
database_path.as_ref().as_os_str()
);

// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();

// TODO: do we want auto_vacuum ?

let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};

if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
error!("Failed to perform migration on the SQLx database: {err}");
return Err(err.into());
}

// the cloning here are cheap as connection pool is stored behind an Arc
Ok(PersistentStatsStorage {
session_manager: sessions::SessionManager::new(connection_pool),
})
}

//Sessions fn
pub async fn insert_finished_session(
&self,
date: Date,
session: FinishedSession,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_finished_session(
date,
session.duration.whole_milliseconds() as i64,
session.typ.to_string().into(),
)
.await?)
}

pub async fn get_finished_sessions(
&self,
date: Date,
) -> Result<Vec<StoredFinishedSession>, StatsStorageError> {
Ok(self.session_manager.get_finished_sessions(date).await?)
}

pub async fn delete_finished_sessions(
&self,
before_date: Date,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_finished_sessions(before_date)
.await?)
}

pub async fn insert_unique_user(
&self,
date: Date,
client_address_bs58: String,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_unique_user(date, client_address_bs58)
.await?)
}

pub async fn get_unique_users_count(&self, date: Date) -> Result<i32, StatsStorageError> {
Ok(self.session_manager.get_unique_users_count(date).await?)
}

pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_unique_users(before_date)
.await?)
}

pub async fn insert_active_session(
&self,
client_address: DestinationAddressBytes,
session: ActiveSession,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_active_session(
client_address.as_base58_string(),
session.start,
session.typ.to_string().into(),
)
.await?)
}

pub async fn update_active_session_type(
&self,
client_address: DestinationAddressBytes,
session_type: SessionType,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.update_active_session_type(
client_address.as_base58_string(),
session_type.to_string().into(),
)
.await?)
}

pub async fn get_active_session(
&self,
client_address: DestinationAddressBytes,
) -> Result<Option<ActiveSession>, StatsStorageError> {
Ok(self
.session_manager
.get_active_session(client_address.as_base58_string())
.await?
.map(Into::into))
}

pub async fn get_all_active_sessions(&self) -> Result<Vec<ActiveSession>, StatsStorageError> {
Ok(self
.session_manager
.get_all_active_sessions()
.await?
.into_iter()
.map(Into::into)
.collect())
}

pub async fn get_started_sessions_count(
&self,
start_date: Date,
) -> Result<i32, StatsStorageError> {
Ok(self
.session_manager
.get_started_sessions_count(start_date)
.await?)
}

pub async fn get_active_users(&self) -> Result<Vec<String>, StatsStorageError> {
Ok(self.session_manager.get_active_users().await?)
}

pub async fn delete_active_session(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_active_session(client_address.as_base58_string())
.await?)
}

pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> {
Ok(self.session_manager.cleanup_active_sessions().await?)
}
}
Loading
Loading