Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Commit

Permalink
use lower of configure timeout and time till midnight
Browse files Browse the repository at this point in the history
  • Loading branch information
anil-db committed May 29, 2024
1 parent f05cd5d commit 201c8f2
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "vector"
version = "0.37.1-databricks-v1"
version = "0.37.1-databricks-v2"

Check failure

Code scanning / check-spelling

Unrecognized Spelling Error

databricks is not a recognized word. (unrecognized-spelling)
authors = ["Vector Contributors <[email protected]>"]
edition = "2021"
description = "A lightweight and ultra-fast tool for building observability pipelines"
Expand Down
1 change: 1 addition & 0 deletions README.databricks.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ This lists custom changes merged in Databricks fork of Vector.
4. Allow retries on all sink exceptions https://github.com/databricks/vector/pull/7
5. Also allowing retries on AccessDenied exceptions in AWS https://github.com/databricks/vector/pull/12
6. Updating version to also carry a Databricks version https://github.com/databricks/vector/pull/13
7. Use sink timeout as minimum of time till midnight or configured timeout https://github.com/databricks/vector/pull/15
5 changes: 5 additions & 0 deletions lib/vector-core/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ pub trait KeyedTimer<K> {
/// If the given key already exists in the timer, the underlying subtimer is reset.
fn insert(&mut self, item_key: K);

/// Insert a new subtimer, keyed by `K` and with a maximum delay.
/// expiration delay would be the minimum of the configured delay and supplied max_delay.
/// If the given key already exists in the timer, the underlying subtimer is reset.
fn insert_with_max_delay(&mut self, item_key: K, max_delay: std::time::Duration);

/// Removes a subtimer from the list.
fn remove(&mut self, item_key: &K);

Expand Down
1 change: 1 addition & 0 deletions lib/vector-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
publish = false

[dependencies]
chrono.workspace = true
async-stream = { version = "0.3.5", default-features = false }
futures = { version = "0.3.30", default-features = false, features = ["std"] }
futures-util = { version = "0.3.29", default-features = false, features = ["std"] }
Expand Down
26 changes: 22 additions & 4 deletions lib/vector-stream/src/partitioned_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::Duration,
};

use chrono::Local;
use futures::stream::{Fuse, Stream, StreamExt};
use pin_project::pin_project;
use tokio_util::time::{delay_queue::Key, DelayQueue};
Expand Down Expand Up @@ -67,14 +68,18 @@ where
}

fn insert(&mut self, item_key: K) {
self.insert_with_max_delay(item_key, self.timeout)
}

fn insert_with_max_delay(&mut self, item_key: K, max_delay: Duration) {
if let Some(expiration_key) = self.expiration_map.get(&item_key) {
// We already have an expiration entry for this item key, so
// just reset the expiration.
self.expirations.reset(expiration_key, self.timeout);
self.expirations.reset(expiration_key, std::cmp::min(max_delay, self.timeout));
} else {
// This is a yet-unseen item key, so create a new expiration
// entry.
let expiration_key = self.expirations.insert(item_key.clone(), self.timeout);
let expiration_key = self.expirations.insert(item_key.clone(), std::cmp::min(max_delay, self.timeout));
assert!(self
.expiration_map
.insert(item_key, expiration_key)
Expand Down Expand Up @@ -203,6 +208,15 @@ where
stream: Fuse<St>,
}

fn duration_till_midnight() -> Duration {
let now = Local::now();
let time_to_midnight_opt = now.date_naive().succ_opt()
.map(|d| d.and_hms_opt(0, 0, 10))
.flatten()
.map(|d| d.signed_duration_since(now.naive_local()));

time_to_midnight_opt.map_or_else(|| Duration::from_secs(24 * 60 * 60), |d| d.to_std().unwrap_or_else(|_| Duration::from_secs(24 * 60 * 60)))
}
impl<St, Prt, C, F, B> PartitionedBatcher<St, Prt, ExpirationQueue<Prt::Key>, C, F, B>
where
St: Stream<Item = Prt::Item>,
Expand Down Expand Up @@ -308,7 +322,7 @@ where
} else {
let batch = (this.state)();
this.batches.insert(item_key.clone(), batch);
this.timer.insert(item_key.clone());
this.timer.insert_with_max_delay(item_key.clone(), duration_till_midnight());
this.batches
.get_mut(&item_key)
.expect("batch has just been inserted so should exist")
Expand All @@ -325,7 +339,7 @@ where
// The batch for this partition key was set to
// expire, but now it's overflowed and must be
// pushed out, so now we reset the batch timeout.
this.timer.insert(item_key.clone());
this.timer.insert_with_max_delay(item_key.clone(), duration_till_midnight());
}

// Insert the item into the batch.
Expand Down Expand Up @@ -396,6 +410,10 @@ mod test {
self.valid_keys.insert(item_key);
}

fn insert_with_max_delay(&mut self, item_key: u8, max_delay: Duration) {
self.insert(item_key);
}

fn remove(&mut self, item_key: &u8) {
self.valid_keys.remove(item_key);
}
Expand Down

0 comments on commit 201c8f2

Please sign in to comment.