diff --git a/crates/hyperqueue/src/client/commands/outputlog.rs b/crates/hyperqueue/src/client/commands/outputlog.rs index 55541813b..f90a1b894 100644 --- a/crates/hyperqueue/src/client/commands/outputlog.rs +++ b/crates/hyperqueue/src/client/commands/outputlog.rs @@ -1,6 +1,6 @@ use crate::client::globalsettings::GlobalSettings; use crate::common::arraydef::IntArray; -use crate::stream::reader::streamdir::StreamDir; +use crate::stream::reader::outputlog::OutputLog; use crate::JobId; use clap::Parser; use std::path::PathBuf; @@ -86,7 +86,7 @@ pub enum Channel { } pub fn command_reader(gsettings: &GlobalSettings, opts: OutputLogOpts) -> anyhow::Result<()> { - let mut stream_dir = StreamDir::open(&opts.path, opts.server_uid.as_deref())?; + let mut stream_dir = OutputLog::open(&opts.path, opts.server_uid.as_deref())?; match opts.command { StreamCommand::Summary(_) => { gsettings diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs index 031f28fb2..d46b360a8 100644 --- a/crates/hyperqueue/src/client/output/cli.rs +++ b/crates/hyperqueue/src/client/output/cli.rs @@ -12,7 +12,7 @@ use crate::common::format::{human_duration, human_mem_amount, human_size}; use crate::common::manager::info::GetManagerInfo; use crate::server::autoalloc::{Allocation, AllocationState}; use crate::server::job::{JobTaskCounters, JobTaskInfo, JobTaskState}; -use crate::stream::reader::streamdir::Summary; +use crate::stream::reader::outputlog::Summary; use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData, QueueState, ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerExitInfo, diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs index 72e99b479..8f944c4bf 100644 --- a/crates/hyperqueue/src/client/output/json.rs +++ b/crates/hyperqueue/src/client/output/json.rs @@ -22,7 +22,7 @@ use 
crate::common::arraydef::IntArray; use crate::common::manager::info::{GetManagerInfo, ManagerType}; use crate::server::autoalloc::{Allocation, AllocationState, QueueId}; use crate::server::job::{JobTaskInfo, JobTaskState, StartedTaskData}; -use crate::stream::reader::streamdir::Summary; +use crate::stream::reader::outputlog::Summary; use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData, ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerInfo, diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs index f938e065e..4d7007da0 100644 --- a/crates/hyperqueue/src/client/output/outputs.rs +++ b/crates/hyperqueue/src/client/output/outputs.rs @@ -4,7 +4,7 @@ use crate::transfer::messages::{ use crate::client::job::WorkerMap; use crate::server::autoalloc::Allocation; -use crate::stream::reader::streamdir::Summary; +use crate::stream::reader::outputlog::Summary; use std::path::Path; use crate::client::output::common::TaskToPathsMap; diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs index 7202d039a..3b197fae5 100644 --- a/crates/hyperqueue/src/client/output/quiet.rs +++ b/crates/hyperqueue/src/client/output/quiet.rs @@ -16,7 +16,7 @@ use crate::client::status::{job_status, Status}; use crate::common::arraydef::IntArray; use crate::server::autoalloc::Allocation; use crate::server::job::JobTaskInfo; -use crate::stream::reader::streamdir::Summary; +use crate::stream::reader::outputlog::Summary; use crate::transfer::messages::{ AutoAllocListResponse, JobDetail, JobInfo, ServerInfo, WaitForJobsResponse, WorkerExitInfo, WorkerInfo, diff --git a/crates/hyperqueue/src/stream/reader/mod.rs b/crates/hyperqueue/src/stream/reader/mod.rs index 47580c508..9e5108f36 100644 --- a/crates/hyperqueue/src/stream/reader/mod.rs +++ b/crates/hyperqueue/src/stream/reader/mod.rs @@ -1 +1 @@ -pub mod 
streamdir; +pub mod outputlog; diff --git a/crates/hyperqueue/src/stream/reader/streamdir.rs b/crates/hyperqueue/src/stream/reader/outputlog.rs similarity index 98% rename from crates/hyperqueue/src/stream/reader/streamdir.rs rename to crates/hyperqueue/src/stream/reader/outputlog.rs index fca42234a..7e4efd128 100644 --- a/crates/hyperqueue/src/stream/reader/streamdir.rs +++ b/crates/hyperqueue/src/stream/reader/outputlog.rs @@ -74,7 +74,7 @@ type StreamIndex = BTreeMap>; /// Reader of a directory with .hts (stream files) /// It creates an index over all jobs and tasks in stream -pub struct StreamDir { +pub struct OutputLog { paths: Vec, index: StreamIndex, cache: LruCache>, @@ -94,9 +94,9 @@ pub struct Summary { pub superseded_stderr_size: u64, } -impl StreamDir { +impl OutputLog { pub fn open(path: &Path, server_uid: Option<&str>) -> crate::Result { - log::debug!("Reading stream dir {}", path.display()); + log::debug!("Reading output log {}", path.display()); let mut paths = Vec::new(); let mut server_uids: Set = Set::new(); let mut found = false; @@ -112,7 +112,7 @@ impl StreamDir { found = true; log::debug!("Discovered {}", path.display()); let mut file = BufReader::new(File::open(&path)?); - let header = match StreamDir::check_header(&mut file) { + let header = match OutputLog::check_header(&mut file) { Ok(header) => header, Err(e) => { log::debug!( @@ -148,7 +148,7 @@ impl StreamDir { return Err(HqError::GenericError(msg)); } let index = Self::create_index(&paths)?; - Ok(StreamDir { + Ok(OutputLog { paths, index, cache: LruCache::new(NonZeroUsize::new(16).unwrap()), @@ -173,7 +173,7 @@ impl StreamDir { let mut index: StreamIndex = BTreeMap::new(); for (file_idx, path) in paths.iter().enumerate() { let mut file = BufReader::new(File::open(path)?); - let _header = StreamDir::check_header(&mut file)?; + let _header = OutputLog::check_header(&mut file)?; while let Some(chunk_header) = Self::read_chunk(&mut file)? 
{ let job = index.entry(chunk_header.job).or_default(); let task = job.entry(chunk_header.task).or_default(); diff --git a/docs/jobs/streaming.md b/docs/jobs/streaming.md index 7c8be3224..27274b901 100644 --- a/docs/jobs/streaming.md +++ b/docs/jobs/streaming.md @@ -25,14 +25,14 @@ You can redirect the output of `stdout` and `stderr` to a log file and thus enab to a filename where the log will be stored with the `--stream` option: ``` -$ hq submit --stream= --array=1-10_000 ... +$ hq submit --stream= --array=1-10_000 ... ``` -Stream path has to be a directory and it the user responsibility to ensure existence of the directory +Output log path has to be a directory and it is the user's responsibility to ensure existence of the directory and visibility of each worker. This command would cause the `stdout` and `stderr` of all `10_000` tasks to be streamed into the server, which will -write them to files in ``. The streamed data is written in a compact way independently on the number of +write them to files in ``. The streamed data is written in a compact way independently of the number of tasks. The format also contains additional metadata, which allows the resulting file to be filtered/sorted by tasks or channel. 
@@ -77,7 +77,7 @@ HyperQueue lets you inspect the data stored inside the stream file using various the following structure: ```bash -$ hq output-log +$ hq output-log ``` ### Stream summary @@ -85,7 +85,7 @@ $ hq output-log You can display a summary of a log file using the `summary` subcommand: ```bash -$ hq output-log summary +$ hq output-log summary ``` ### Stream jobs @@ -93,7 +93,7 @@ $ hq output-log summary To print all job IDs that streaming in the stream path, you can run the following command: ```bash -$ hq output-log jobs +$ hq output-log jobs ``` ### Printing stream content @@ -102,7 +102,7 @@ If you want to simply print the (textual) content of the log file, without any a `cat` subcommand: ```bash -$ hq output-log cat +$ hq output-log cat ``` It will print the raw content of either `stdout` or `stderr`, ordered by task id. All outputs will be concatenated one @@ -145,15 +145,16 @@ hence HyperQueue streaming is able to avoid mixing outputs from different executions of the same task, when a task is restarted. HyperQueue automatically marks all output from previous instance of a task except the last instance as *superseded*. -You can see statistics about superseded data via `hq output-log summary` command. +You can see statistics about superseded data via the `hq output-log summary` command. In the current version, superseded data is ignored by all other commands. ## More server instances HyperQueue supports writing streams from the different server instances into the same directory. If you run `hq output-log` commands over such directory then it will detect the situation and prints all server uids -that writes into the directory. You have to specify the server instance via `hq output-log --server-uid= ...` -when working with such a stream directory. +that write into the directory. You have to specify the server instance +via `hq output-log --server-uid= ...` +when working with such an output log directory. !!! 
note @@ -162,5 +163,5 @@ when working with such a stream directory. ## Working with non-shared file system -You do not need to have a shared file system when working with streaming path. It is just your responsibility to +You do not need to have a shared file system when working with streaming. It is just your responsibility to collect all generated files into one directory before using `hq output-log` commands. \ No newline at end of file diff --git a/tests/test_restore.py b/tests/test_restore.py index 0ca40046b..15fafb077 100644 --- a/tests/test_restore.py +++ b/tests/test_restore.py @@ -292,6 +292,6 @@ def test_restore_streaming(hq_env: HqEnv, tmp_path): hq_env.start_worker() hq_env.command(["job", "close", "1"]) wait_for_job_state(hq_env, 1, "FINISHED") - assert int(hq_env.command(["read", stream_path, "cat", "1", "stdout"])) > 0 - table = hq_env.command(["read", stream_path, "summary"], as_table=True) + assert int(hq_env.command(["output-log", stream_path, "cat", "1", "stdout"])) > 0 + table = hq_env.command(["output-log", stream_path, "summary"], as_table=True) table.check_row_value("Superseded streams", "1")