Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 28, 2023
1 parent d819ee6 commit f9cb94f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
21 changes: 15 additions & 6 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ use tempfile::NamedTempFile;
use tokio::sync::mpsc::Sender;
use tokio::task;

/// How much memory to reserve for performing in-memory sorts
/// How much memory to reserve for performing in-memory sorts prior to
/// spilling.
///
/// If the sort is configured to potentially spill to disk if it
/// exceeds its memory allocation, this much memory is reserved for
/// space to merge the buffered batches prior to spilling.
const EXTERNAL_SORTER_MERGE_RESERVATION: usize = 10 * 1024 * 1024;

struct ExternalSorterMetrics {
Expand Down Expand Up @@ -99,7 +104,7 @@ struct ExternalSorter {
fetch: Option<usize>,
/// Reservation for in_mem_batches
reservation: MemoryReservation,
/// Reservation for in memory sorting of batches
/// Reservation for in merging of batches
merge_reservation: MemoryReservation,
runtime: Arc<RuntimeEnv>,
batch_size: usize,
Expand All @@ -124,7 +129,9 @@ impl ExternalSorter {
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
.register(&runtime.memory_pool);

merge_reservation.resize(EXTERNAL_SORTER_MERGE_RESERVATION);
if runtime.disk_manager.tmp_files_enabled() {
merge_reservation.resize(EXTERNAL_SORTER_MERGE_RESERVATION);
}

Self {
schema,
Expand Down Expand Up @@ -262,9 +269,11 @@ impl ExternalSorter {
.map(|x| x.get_array_memory_size())
.sum();

// Reserve headroom for next sort
self.merge_reservation
.resize(EXTERNAL_SORTER_MERGE_RESERVATION);
// Reserve headroom for next merge sort
if self.runtime.disk_manager.tmp_files_enabled() {
self.merge_reservation
.resize(EXTERNAL_SORTER_MERGE_RESERVATION);
}

self.reservation.try_resize(size)?;
self.in_mem_batches_sorted = true;
Expand Down
9 changes: 9 additions & 0 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ impl DiskManager {
}
}

/// Return true if this disk manager supports creating temporary
/// files. If this returns false, any call to `create_tmp_file`
/// will error.
pub fn tmp_files_enabled(&self) -> bool {
self.local_dirs.lock().is_some()
}

/// Return a temporary file from a randomized choice in the configured locations
///
/// If the file can not be created for some reason, returns an
Expand Down Expand Up @@ -198,6 +205,7 @@ mod tests {
);

let dm = DiskManager::try_new(config)?;
assert!(dm.tmp_files_enabled());
let actual = dm.create_tmp_file("Testing")?;

// the file should be in one of the specified local directories
Expand All @@ -210,6 +218,7 @@ mod tests {
fn test_disabled_disk_manager() {
let config = DiskManagerConfig::Disabled;
let manager = DiskManager::try_new(config).unwrap();
assert!(!manager.tmp_files_enabled());
assert_eq!(
manager.create_tmp_file("Testing").unwrap_err().to_string(),
"Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
Expand Down

0 comments on commit f9cb94f

Please sign in to comment.