Skip to content

Commit

Permalink
test: use pre-defined time for records when testing timestamp based o…
Browse files Browse the repository at this point in the history
…ffset
  • Loading branch information
fMeow committed Nov 14, 2024
1 parent e72eab7 commit 6b7ba50
Showing 1 changed file with 6 additions and 16 deletions.
22 changes: 6 additions & 16 deletions tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;
use std::time::Duration;

use assert_matches::assert_matches;
use chrono::{DateTime, Timelike, Utc};
use chrono::{DateTime, TimeZone, Timelike, Utc};
use futures::{Stream, StreamExt};
use tokio::time::timeout;

Expand Down Expand Up @@ -435,35 +435,27 @@ async fn test_stream_consumer_start_timestamp_based_offset() {
.await
.unwrap();

fn now() -> DateTime<Utc> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system time before Unix epoch");
DateTime::from_timestamp(now.as_secs() as i64, now.subsec_nanos()).unwrap()
}

let partition_client = Arc::new(
client
.partition_client(&topic, 0, UnknownTopicHandling::Retry)
.await
.unwrap(),
);
let record_1 = record_with_timestamp_milliseconds(b"x", now());
let ts = Utc.timestamp_millis_opt(1337).unwrap();
let record_1 = record_with_timestamp_milliseconds(b"x", ts);
let record_2 = record_with_timestamp_milliseconds(b"y", ts + Duration::from_millis(100));
partition_client
.produce(vec![record_1.clone()], Compression::NoCompression)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let ts = now();

let record_2 = record_with_timestamp_milliseconds(b"y", now());
partition_client
.produce(vec![record_2.clone()], Compression::NoCompression)
.await
.unwrap();

let offset = partition_client
.get_offset(OffsetAt::Timestamp(ts))
.get_offset(OffsetAt::Timestamp(ts + Duration::from_millis(100)))
.await
.unwrap();
assert_eq!(offset, 1);
Expand Down Expand Up @@ -507,8 +499,6 @@ fn record_with_timestamp_milliseconds(key: &[u8], timestamp: DateTime<Utc>) -> R
key: Some(key.to_vec()),
value: Some(b"hello kafka".to_vec()),
headers: std::collections::BTreeMap::from([("foo".to_owned(), b"bar".to_vec())]),
timestamp: timestamp
.with_nanosecond(timestamp.nanosecond() / 1_000_000 * 1_000_000)
.unwrap(),
timestamp,
}
}

0 comments on commit 6b7ba50

Please sign in to comment.