Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

use lower of configure timeout and time till midnight #15

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "vector"
version = "0.37.1-databricks-v1"
version = "0.37.1-databricks-v2"
Dismissed Show dismissed Hide dismissed
authors = ["Vector Contributors <[email protected]>"]
edition = "2021"
description = "A lightweight and ultra-fast tool for building observability pipelines"
Expand Down
1 change: 1 addition & 0 deletions README.databricks.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ This lists custom changes merged in Databricks fork of Vector.
4. Allow retries on all sink exceptions https://github.com/databricks/vector/pull/7
5. Also allowing retries on AccessDenied exceptions in AWS https://github.com/databricks/vector/pull/12
6. Updating version to also carry a Databricks version https://github.com/databricks/vector/pull/13
7. Use sink timeout as minimum of time till midnight or configured timeout https://github.com/databricks/vector/pull/15
5 changes: 5 additions & 0 deletions lib/vector-core/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ pub trait KeyedTimer<K> {
/// If the given key already exists in the timer, the underlying subtimer is reset.
fn insert(&mut self, item_key: K);

/// Insert a new subtimer, keyed by `K` and with a maximum delay.
/// expiration delay would be the minimum of the configured delay and supplied max_delay.
/// If the given key already exists in the timer, the underlying subtimer is reset.
fn insert_with_max_delay(&mut self, item_key: K, max_delay: std::time::Duration);

/// Removes a subtimer from the list.
fn remove(&mut self, item_key: &K);

Expand Down
1 change: 1 addition & 0 deletions lib/vector-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
publish = false

[dependencies]
chrono.workspace = true
async-stream = { version = "0.3.5", default-features = false }
futures = { version = "0.3.30", default-features = false, features = ["std"] }
futures-util = { version = "0.3.29", default-features = false, features = ["std"] }
Expand Down
27 changes: 23 additions & 4 deletions lib/vector-stream/src/partitioned_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::Duration,
};

use chrono::Local;
use futures::stream::{Fuse, Stream, StreamExt};
use pin_project::pin_project;
use tokio_util::time::{delay_queue::Key, DelayQueue};
Expand Down Expand Up @@ -67,14 +68,19 @@ where
}

fn insert(&mut self, item_key: K) {
self.insert_with_max_delay(item_key, self.timeout)
}

fn insert_with_max_delay(&mut self, item_key: K, max_delay: Duration) {
info!(message = "max_delay and timeout", max_delay = ?max_delay, timeout = ?self.timeout);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change from info to debug or print only when max is less than configured.

if let Some(expiration_key) = self.expiration_map.get(&item_key) {
// We already have an expiration entry for this item key, so
// just reset the expiration.
self.expirations.reset(expiration_key, self.timeout);
self.expirations.reset(expiration_key, std::cmp::min(max_delay, self.timeout));
} else {
// This is a yet-unseen item key, so create a new expiration
// entry.
let expiration_key = self.expirations.insert(item_key.clone(), self.timeout);
let expiration_key = self.expirations.insert(item_key.clone(), std::cmp::min(max_delay, self.timeout));
assert!(self
.expiration_map
.insert(item_key, expiration_key)
Expand Down Expand Up @@ -203,6 +209,15 @@ where
stream: Fuse<St>,
}

fn duration_till_midnight() -> Duration {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor function to take secs from midnight as param.

let now = Local::now();
let time_to_midnight_opt = now.date_naive().succ_opt()
.map(|d| d.and_hms_opt(0, 0, 10))
.flatten()
.map(|d| d.signed_duration_since(now.naive_local()));

time_to_midnight_opt.map_or_else(|| Duration::from_secs(24 * 60 * 60), |d| d.to_std().unwrap_or_else(|_| Duration::from_secs(24 * 60 * 60)))
}
impl<St, Prt, C, F, B> PartitionedBatcher<St, Prt, ExpirationQueue<Prt::Key>, C, F, B>
where
St: Stream<Item = Prt::Item>,
Expand Down Expand Up @@ -308,7 +323,7 @@ where
} else {
let batch = (this.state)();
this.batches.insert(item_key.clone(), batch);
this.timer.insert(item_key.clone());
this.timer.insert_with_max_delay(item_key.clone(), duration_till_midnight());
this.batches
.get_mut(&item_key)
.expect("batch has just been inserted so should exist")
Expand All @@ -325,7 +340,7 @@ where
// The batch for this partition key was set to
// expire, but now it's overflowed and must be
// pushed out, so now we reset the batch timeout.
this.timer.insert(item_key.clone());
this.timer.insert_with_max_delay(item_key.clone(), duration_till_midnight());
}

// Insert the item into the batch.
Expand Down Expand Up @@ -396,6 +411,10 @@ mod test {
self.valid_keys.insert(item_key);
}

fn insert_with_max_delay(&mut self, item_key: u8, max_delay: Duration) {
self.insert(item_key);
}

fn remove(&mut self, item_key: &u8) {
self.valid_keys.remove(item_key);
}
Expand Down
Loading