test: stress application with simultaneous schedules #9

Merged 1 commit on Dec 15, 2024
2 changes: 0 additions & 2 deletions Cargo.toml
@@ -10,8 +10,6 @@ license-file = "LICENSE"
readme = "README.md"
keywords = ["transmit", "scheduler"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-nats = "0.33.0"
async-stream = "0.3.5"
151 changes: 135 additions & 16 deletions src/integration_test.rs
@@ -13,6 +13,7 @@ mod tests {
use crate::contract::*;
use crate::grpc;
use crate::grpc::proto::transmit_client::TransmitClient;
use crate::grpc::proto::ScheduleTransmissionRequest;
use crate::metrics;
use crate::nats;
use crate::postgres;
@@ -30,6 +31,7 @@ mod tests {
async fn schedule_delayed_transmission() {
let subject = "INTEGRATION.delayed_transmission";
let grpc_port = 50053;
let cancel = CancellationToken::new();
let timestamp_now = Utc::now();
let listen_timeout_duration = std::time::Duration::from_millis(25);

@@ -67,11 +69,12 @@
.returning(move || process_listen_iteration.1);
}

let (scheduler, mut grpc_client, nats_connection) = initialise(now, grpc_port).await;
let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(now), grpc_port, cancel).await;

// Construct the grpc request, containing a schedule and message.
let schedule_transmission_request =
new_delayed_transmission_request(subject.to_string(), transmission_timestamp);
new_delayed_transmission_request(subject, transmission_timestamp);
let grpc_request = tonic::Request::new(schedule_transmission_request);

// Do the request.
@@ -99,6 +102,7 @@
async fn schedule_interval_transmission() {
let subject = "INTEGRATION.interval_transmission";
let grpc_port = 50052;
let cancel = CancellationToken::new();
let timestamp_now = Utc::now();
let listen_timeout_duration = std::time::Duration::from_millis(25);

@@ -157,11 +161,12 @@
.returning(move || process_listen_iteration.1);
}

let (scheduler, mut grpc_client, nats_connection) = initialise(now, grpc_port).await;
let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(now), grpc_port, cancel).await;

// Construct the grpc request, containing a schedule and message.
let schedule_transmission_request = new_interval_transmission_request(
subject.to_string(),
subject,
first_transmission_timestamp,
interval_duration,
repetitions,
@@ -188,10 +193,12 @@
.await;
}
}

#[tokio::test]
async fn schedule_cron_transmission() {
let subject = "INTEGRATION.cron_transmission";
let grpc_port = 50054;
let cancel = CancellationToken::new();
let timestamp_now: DateTime<Utc> =
DateTime::<Utc>::from_timestamp(1431648000, 0).expect("invalid timestamp");
assert_eq!(timestamp_now.to_string(), "2015-05-15 00:00:00 UTC");
@@ -262,11 +269,12 @@
.returning(move || process_listen_iteration.1);
}

let (scheduler, mut grpc_client, nats_connection) = initialise(now, grpc_port).await;
let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(now), grpc_port, cancel).await;

// Construct the grpc request, containing a schedule and message.
let schedule_transmission_request = new_cron_transmission_request(
subject.to_string(),
subject,
first_transmission_after,
&cron_expression,
repetitions,
@@ -294,8 +302,79 @@
}
}

fn many_transmission_requests(
now: DateTime<Utc>,
subject: &str,
) -> Vec<ScheduleTransmissionRequest> {
let mut transmission_requests = Vec::new();
for step in 0..100 {
// Allow some progression in the delayed transmission times.
let transmission_time = now + time::Duration::from_secs(step);

transmission_requests
.push(new_delayed_transmission_request(subject, transmission_time));
transmission_requests.push(new_interval_transmission_request(
subject,
transmission_time,
time::Duration::from_micros(50),
100,
));
transmission_requests.push(new_cron_transmission_request(
subject,
transmission_time,
"3 5 14 * * * *",
100,
));
}

transmission_requests
}

#[tokio::test]
async fn test_schedule_many_transmissions() {
let subject = "INTEGRATION.stress_test";
let grpc_port = 50055;
let cancel = CancellationToken::new();
let timestamp_now: DateTime<Utc> = Utc::now();

let (scheduler, mut grpc_client, nats_connection) =
initialise(Arc::new(Utc::now), grpc_port, cancel.clone()).await;

// Start the scheduler process.
let cancel_scheduler = cancel.clone();
tokio::spawn(async move {
scheduler.run(cancel_scheduler).await;
});

let transmission_requests = many_transmission_requests(timestamp_now, subject);

// Do the requests.
for request in transmission_requests {
let grpc_response = grpc_client
.schedule_transmission(tonic::Request::new(request))
.await
.expect("grpc server should handle request");
let _ = uuid::Uuid::parse_str(&grpc_response.into_inner().transmission_id)
.expect("response should contain uuid");
}

let collect_handle = collect_nats_events(&nats_connection, subject, cancel.clone()).await;

tokio::time::sleep(std::time::Duration::from_millis(500)).await;
cancel.cancel();
let messages = collect_handle.await.expect("join handle");

// Many messages must be transmitted. The exact count is nondeterministic
// because it depends on timing.
assert!(
messages.len() > 10,
"messages published: {}",
messages.len()
);
}

fn new_cron_transmission_request(
subject: String,
subject: &str,
first_transmission_after: DateTime<Utc>,
cron_schedule: &str,
iterations: u32,
Expand Down Expand Up @@ -325,7 +404,7 @@ mod tests {
}

fn new_interval_transmission_request(
subject: String,
subject: &str,
timestamp: DateTime<Utc>,
interval: time::Duration,
iterations: u32,
@@ -354,7 +433,7 @@
}

fn new_delayed_transmission_request(
subject: String,
subject: &str,
timestamp: DateTime<Utc>,
) -> grpc::proto::ScheduleTransmissionRequest {
let delayed = grpc::proto::Delayed {
@@ -374,6 +453,40 @@
schedule_transmission_request
}

async fn collect_nats_events(
nats_connection: &async_nats::Client,
subject: &str,
cancel: CancellationToken,
) -> tokio::task::JoinHandle<Vec<async_nats::Message>> {
let mut subscriber = nats_connection
.subscribe(async_nats::Subject::from(subject))
.await
.expect("subscribing should succeed");

let mut collected_messages = vec![];

tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => {
println!("Cancellation requested.");
break;
}
message = subscriber.next() => {
if let Some(message) = message {
collected_messages.push(message);
} else {
break;
}
}
}
}

collected_messages
})
}

async fn listen_for_transmission(
nats_connection: &async_nats::Client,
subject: String,
@@ -390,6 +503,7 @@
*received_flag.lock().expect("failed to lock") = true;
}
});

handle
}

@@ -447,27 +561,31 @@
}

async fn new_scheduler(
now: MockNow,
now: Arc<dyn Now>,
nats_connection: &async_nats::Client,
) -> Arc<TransmissionScheduler> {
let scheduler = scheduler::TransmissionScheduler::new(
time::Duration::from_micros(10),
Arc::new(postgres_repository().await),
Arc::new(nats_publisher(&nats_connection)),
Arc::new(now),
now,
Arc::new(metric_client()),
);

Arc::new(scheduler)
}

async fn start_server(transmission_scheduler: Arc<TransmissionScheduler>, grpc_port: u16) {
async fn start_server(
transmission_scheduler: Arc<TransmissionScheduler>,
grpc_port: u16,
cancel: CancellationToken,
) {
let grpc_config = grpc::Config { port: grpc_port };
let grpc_server = grpc::GrpcServer::new(grpc_config, transmission_scheduler.clone());

tokio::task::spawn(async move {
grpc_server
.serve(CancellationToken::new())
.serve(cancel)
.await
.expect("failed to start grpc server");
});
@@ -488,16 +606,17 @@
}

async fn initialise(
now: MockNow,
now: Arc<dyn Now>,
grpc_port: u16,
cancel: CancellationToken,
) -> (
Arc<TransmissionScheduler>,
TransmitClient<tonic::transport::Channel>,
async_nats::Client,
) {
let nats_connection = nats_connection().await;
let scheduler = new_scheduler(now, &nats_connection).await;
start_server(scheduler.clone(), grpc_port).await;
start_server(scheduler.clone(), grpc_port, cancel).await;
let grpc_client = new_grpc_client(grpc_port).await;

(scheduler, grpc_client, nats_connection)
@@ -522,7 +641,7 @@
scheduler.process_batch().await.expect("process should run");

let timeout_reached = tokio::time::timeout(listen_duration, async {
// Wait until the flag is set to true (message received)
// Wait until the flag is set to true (message received).
while !*received_flag_before_transmission.lock().unwrap() {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
16 changes: 11 additions & 5 deletions src/scheduler.rs
@@ -14,7 +14,7 @@ use crate::contract::{Metrics, Now, Repository, Scheduler, Transmitter};
use crate::model::{Message, MetricEvent, Schedule, ScheduleError, Transmission};

static BATCH_SIZE: u32 = 100;
static MAX_DELAYED_AGE: chrono::Duration = chrono::Duration::seconds(1);
static MAX_DELAYED_AGE: time::Duration = time::Duration::from_secs(1);
static MAX_NATS_SUBJECT_LENGTH: u32 = 256;

#[derive(Clone)]
@@ -59,14 +59,14 @@ fn validate_schedule(
) -> Result<(), ScheduleError> {
match schedule {
Schedule::Delayed(delayed) => {
if delayed.transmit_at - now < -MAX_DELAYED_AGE {
if delayed.transmit_at < now - MAX_DELAYED_AGE {
return Err(ScheduleError::AgedSchedule);
}

Ok(())
}
Schedule::Interval(interval) => {
if interval.first_transmission - now < -MAX_DELAYED_AGE {
if interval.first_transmission < now - MAX_DELAYED_AGE {
return Err(ScheduleError::AgedSchedule);
}
if interval.interval < clock_cycle_interval {
@@ -76,7 +76,7 @@
Ok(())
}
Schedule::Cron(cron_schedule) => {
if cron_schedule.first_transmission_after - now < -MAX_DELAYED_AGE {
if cron_schedule.first_transmission_after < now - MAX_DELAYED_AGE {
return Err(ScheduleError::AgedSchedule);
}

@@ -1301,7 +1301,8 @@ mod tests {
#[test]
fn test_validate_schedule() {
let now = DateTime::from_timestamp(1431648000, 0).expect("should be valid timestamp");
let long_ago = now - chrono::Duration::hours(1);
let long_ago = now - time::Duration::from_secs(2);
let in_future = now + chrono::Duration::hours(1);

struct TestCase {
name: String,
@@ -1315,6 +1316,11 @@
schedule: new_delayed(now),
expected_result: Ok(()),
},
TestCase {
name: String::from("valid delayed in future"),
schedule: new_delayed(in_future),
expected_result: Ok(()),
},
TestCase {
name: String::from("aged delayed"),
schedule: new_delayed(long_ago),
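Side note on the validate_schedule change in src/scheduler.rs above: MAX_DELAYED_AGE moves from chrono::Duration to std::time::Duration, and each age check is rewritten from `first_transmission - now < -MAX_DELAYED_AGE` to `first_transmission < now - MAX_DELAYED_AGE`. The sketch below illustrates that rule in isolation; it is not part of the crate. The helper name `is_aged` is hypothetical, and the example assumes a chrono version that supports DateTime<Utc> arithmetic with std::time::Duration, as the diff itself relies on.

```rust
use chrono::{DateTime, Utc};
use std::time::Duration;

// Mirrors the constant in src/scheduler.rs after this change.
static MAX_DELAYED_AGE: Duration = Duration::from_secs(1);

// Hypothetical helper: a schedule is "aged" (and rejected) when its
// requested transmission time lies more than MAX_DELAYED_AGE in the past.
fn is_aged(transmit_at: DateTime<Utc>, now: DateTime<Utc>) -> bool {
    transmit_at < now - MAX_DELAYED_AGE
}

fn main() {
    let now = Utc::now();
    assert!(!is_aged(now, now)); // scheduled for right now: accepted
    assert!(!is_aged(now + Duration::from_secs(3600), now)); // in the future: accepted
    assert!(is_aged(now - Duration::from_secs(2), now)); // two seconds old: rejected
}
```

The rewritten comparison never produces a negative duration, which is what allows the switch from the signed chrono::Duration to the unsigned std::time::Duration.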