Skip to content

Commit

Permalink
test: stress application with simultaneous schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
kasbuunk committed Dec 15, 2024
1 parent 7083f87 commit 16ea3c5
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 23 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ license-file = "LICENSE"
readme = "README.md"
keywords = ["transmit", "scheduler"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-nats = "0.33.0"
async-stream = "0.3.5"
Expand Down
151 changes: 135 additions & 16 deletions src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod tests {
use crate::contract::*;
use crate::grpc;
use crate::grpc::proto::transmit_client::TransmitClient;
use crate::grpc::proto::ScheduleTransmissionRequest;
use crate::metrics;
use crate::nats;
use crate::postgres;
Expand All @@ -30,6 +31,7 @@ mod tests {
async fn schedule_delayed_transmission() {
let subject = "INTEGRATION.delayed_transmission";
let grpc_port = 50053;
let cancel = CancellationToken::new();
let timestamp_now = Utc::now();
let listen_timeout_duration = std::time::Duration::from_millis(25);

Expand Down Expand Up @@ -67,11 +69,12 @@ mod tests {
.returning(move || process_listen_iteration.1);
}

let (scheduler, mut grpc_client, nats_connection) = initialise(now, grpc_port).await;
let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(now), grpc_port, cancel).await;

// Construct the grpc request, containing a schedule and message.
let schedule_transmission_request =
new_delayed_transmission_request(subject.to_string(), transmission_timestamp);
new_delayed_transmission_request(subject, transmission_timestamp);
let grpc_request = tonic::Request::new(schedule_transmission_request);

// Do the request.
Expand Down Expand Up @@ -99,6 +102,7 @@ mod tests {
async fn schedule_interval_transmission() {
let subject = "INTEGRATION.interval_transmission";
let grpc_port = 50052;
let cancel = CancellationToken::new();
let timestamp_now = Utc::now();
let listen_timeout_duration = std::time::Duration::from_millis(25);

Expand Down Expand Up @@ -157,11 +161,12 @@ mod tests {
.returning(move || process_listen_iteration.1);
}

let (scheduler, mut grpc_client, nats_connection) = initialise(now, grpc_port).await;
let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(now), grpc_port, cancel).await;

// Construct the grpc request, containing a schedule and message.
let schedule_transmission_request = new_interval_transmission_request(
subject.to_string(),
subject,
first_transmission_timestamp,
interval_duration,
repetitions,
Expand All @@ -188,10 +193,12 @@ mod tests {
.await;
}
}

#[tokio::test]
async fn schedule_cron_transmission() {
let subject = "INTEGRATION.cron_transmission";
let grpc_port = 50054;
let cancel = CancellationToken::new();
let timestamp_now: DateTime<Utc> =
DateTime::<Utc>::from_timestamp(1431648000, 0).expect("invalid timestamp");
assert_eq!(timestamp_now.to_string(), "2015-05-15 00:00:00 UTC");
Expand Down Expand Up @@ -262,11 +269,12 @@ mod tests {
.returning(move || process_listen_iteration.1);
}

let (scheduler, mut grpc_client, nats_connection) = initialise(now, grpc_port).await;
let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(now), grpc_port, cancel).await;

// Construct the grpc request, containing a schedule and message.
let schedule_transmission_request = new_cron_transmission_request(
subject.to_string(),
subject,
first_transmission_after,
&cron_expression,
repetitions,
Expand Down Expand Up @@ -294,8 +302,79 @@ mod tests {
}
}

fn many_transmission_requests(
now: DateTime<Utc>,
subject: &str,
) -> Vec<ScheduleTransmissionRequest> {
let mut transmission_requests = Vec::new();
for milliseconds in 0..100 {
// Allow some progression in delayed transmission times.
let transmission_time = now + time::Duration::from_secs(milliseconds);

transmission_requests
.push(new_delayed_transmission_request(subject, transmission_time));
transmission_requests.push(new_interval_transmission_request(
subject,
transmission_time,
time::Duration::from_micros(50),
100,
));
transmission_requests.push(new_cron_transmission_request(
subject,
transmission_time,
"3 5 14 * * * *",
100,
));
}

transmission_requests
}

#[tokio::test]
async fn test_schedule_many_transmissions() {
let subject = "INTEGRATION.stress_test";
let grpc_port = 50055;
let cancel = CancellationToken::new();
let timestamp_now: DateTime<Utc> = Utc::now();

let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(Utc::now), grpc_port, cancel.clone()).await;

// Start the scheduler process.
let cancel_scheduler = cancel.clone();
tokio::spawn(async move {
scheduler.run(cancel_scheduler).await;
});

let transmission_requests = many_transmission_requests(timestamp_now, subject);

// Do the requests.
for request in transmission_requests {
let grpc_response = grpc_client
.schedule_transmission(tonic::Request::new(request))
.await
.expect("grpc server should handle request");
let _ = uuid::Uuid::parse_str(&grpc_response.into_inner().transmission_id)
.expect("response should contain uuid");
}

let collect_handle = collect_nats_events(&nats_connection, subject, cancel.clone()).await;

tokio::time::sleep(std::time::Duration::from_millis(500)).await;
cancel.cancel();
let messages = collect_handle.await.expect("join handle");

// A multitude of messages must be transmitted. Specifics are
// unattainable, because of the dependency on time.
assert!(
messages.len() > 10,
"messages published: {}",
messages.len()
);
}

fn new_cron_transmission_request(
subject: String,
subject: &str,
first_transmission_after: DateTime<Utc>,
cron_schedule: &str,
iterations: u32,
Expand Down Expand Up @@ -325,7 +404,7 @@ mod tests {
}

fn new_interval_transmission_request(
subject: String,
subject: &str,
timestamp: DateTime<Utc>,
interval: time::Duration,
iterations: u32,
Expand Down Expand Up @@ -354,7 +433,7 @@ mod tests {
}

fn new_delayed_transmission_request(
subject: String,
subject: &str,
timestamp: DateTime<Utc>,
) -> grpc::proto::ScheduleTransmissionRequest {
let delayed = grpc::proto::Delayed {
Expand All @@ -374,6 +453,40 @@ mod tests {
schedule_transmission_request
}

async fn collect_nats_events(
nats_connection: &async_nats::Client,
subject: &str,
cancel: CancellationToken,
) -> tokio::task::JoinHandle<Vec<async_nats::Message>> {
let mut subscriber = nats_connection
.subscribe(async_nats::Subject::from(subject))
.await
.expect("subscribing should succeed");

let mut collected_messages = vec![];

tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => {
println!("Cancellation requested.");
break;
}
message = subscriber.next() =>{
if let Some(message) = message {

collected_messages.push(message);
} else {
break;
}
}
}
}

collected_messages
})
}

async fn listen_for_transmission(
nats_connection: &async_nats::Client,
subject: String,
Expand All @@ -390,6 +503,7 @@ mod tests {
*received_flag.lock().expect("failed to lock") = true;
}
});

handle
}

Expand Down Expand Up @@ -447,27 +561,31 @@ mod tests {
}

async fn new_scheduler(
now: MockNow,
now: Arc<dyn Now>,
nats_connection: &async_nats::Client,
) -> Arc<TransmissionScheduler> {
let scheduler = scheduler::TransmissionScheduler::new(
time::Duration::from_micros(10),
Arc::new(postgres_repository().await),
Arc::new(nats_publisher(&nats_connection)),
Arc::new(now),
now,
Arc::new(metric_client()),
);

Arc::new(scheduler)
}

async fn start_server(transmission_scheduler: Arc<TransmissionScheduler>, grpc_port: u16) {
async fn start_server(
transmission_scheduler: Arc<TransmissionScheduler>,
grpc_port: u16,
cancel: CancellationToken,
) {
let grpc_config = grpc::Config { port: grpc_port };
let grpc_server = grpc::GrpcServer::new(grpc_config, transmission_scheduler.clone());

tokio::task::spawn(async move {
grpc_server
.serve(CancellationToken::new())
.serve(cancel)
.await
.expect("failed to start grpc server");
});
Expand All @@ -488,16 +606,17 @@ mod tests {
}

async fn initialise(
now: MockNow,
now: Arc<dyn Now>,
grpc_port: u16,
cancel: CancellationToken,
) -> (
Arc<TransmissionScheduler>,
TransmitClient<tonic::transport::Channel>,
async_nats::Client,
) {
let nats_connection = nats_connection().await;
let scheduler = new_scheduler(now, &nats_connection).await;
start_server(scheduler.clone(), grpc_port).await;
start_server(scheduler.clone(), grpc_port, cancel).await;
let grpc_client = new_grpc_client(grpc_port).await;

(scheduler, grpc_client, nats_connection)
Expand All @@ -522,7 +641,7 @@ mod tests {
scheduler.process_batch().await.expect("process should run");

let timeout_reached = tokio::time::timeout(listen_duration, async {
// Wait until the flag is set to true (message received)
// Wait until the flag is set to true (message received).
while !*received_flag_before_transmission.lock().unwrap() {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
Expand Down
16 changes: 11 additions & 5 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::contract::{Metrics, Now, Repository, Scheduler, Transmitter};
use crate::model::{Message, MetricEvent, Schedule, ScheduleError, Transmission};

static BATCH_SIZE: u32 = 100;
static MAX_DELAYED_AGE: chrono::Duration = chrono::Duration::seconds(1);
static MAX_DELAYED_AGE: time::Duration = time::Duration::from_secs(1);
static MAX_NATS_SUBJECT_LENGTH: u32 = 256;

#[derive(Clone)]
Expand Down Expand Up @@ -59,14 +59,14 @@ fn validate_schedule(
) -> Result<(), ScheduleError> {
match schedule {
Schedule::Delayed(delayed) => {
if delayed.transmit_at - now < -MAX_DELAYED_AGE {
if delayed.transmit_at < now - MAX_DELAYED_AGE {
return Err(ScheduleError::AgedSchedule);
}

Ok(())
}
Schedule::Interval(interval) => {
if interval.first_transmission - now < -MAX_DELAYED_AGE {
if interval.first_transmission < now - MAX_DELAYED_AGE {
return Err(ScheduleError::AgedSchedule);
}
if interval.interval < clock_cycle_interval {
Expand All @@ -76,7 +76,7 @@ fn validate_schedule(
Ok(())
}
Schedule::Cron(cron_schedule) => {
if cron_schedule.first_transmission_after - now < -MAX_DELAYED_AGE {
if cron_schedule.first_transmission_after < now - MAX_DELAYED_AGE {
return Err(ScheduleError::AgedSchedule);
}

Expand Down Expand Up @@ -1301,7 +1301,8 @@ mod tests {
#[test]
fn test_validate_schedule() {
let now = DateTime::from_timestamp(1431648000, 0).expect("should be valid timestamp");
let long_ago = now - chrono::Duration::hours(1);
let long_ago = now - time::Duration::from_secs(2);
let in_future = now + chrono::Duration::hours(1);

struct TestCase {
name: String,
Expand All @@ -1315,6 +1316,11 @@ mod tests {
schedule: new_delayed(now),
expected_result: Ok(()),
},
TestCase {
name: String::from("valid delayed in future"),
schedule: new_delayed(in_future),
expected_result: Ok(()),
},
TestCase {
name: String::from("aged delayed"),
schedule: new_delayed(long_ago),
Expand Down

0 comments on commit 16ea3c5

Please sign in to comment.