diff --git a/CHANGELOG.md b/CHANGELOG.md
index 088d29106..3463e907a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -369,7 +369,7 @@ an old client/worker to a new server (Connecting a new client/worker to an old s
 
 ### Stream log
 
-* Basic export of stream log into JSON (`hq log export`)
+* Basic export of stream log into JSON (`hq read export`)
 
 ### Server
 
@@ -589,7 +589,7 @@ would pass `OMP_NUM_THREADS=4` to the executed ``.
   is `hq submit`, which is now a shortcut for `hq job submit`. Here is a table of changed commands:
 
   | **Previous command** | **New command**    |
-  |----------------------|--------------------|
+  |----------------------|--------------------|
   | `hq jobs`            | `hq job list`      |
   | `hq job`             | `hq job info`      |
   | `hq resubmit`        | `hq job resubmit`  |
diff --git a/crates/hyperqueue/src/client/commands/reader.rs b/crates/hyperqueue/src/client/commands/reader.rs
index 61a928376..ca61770f6 100644
--- a/crates/hyperqueue/src/client/commands/reader.rs
+++ b/crates/hyperqueue/src/client/commands/reader.rs
@@ -11,7 +11,7 @@ pub struct ReadOpts {
     path: PathBuf,
 
     /// Filter files for given server instance
-    #[arg(long, value_enum)]
+    #[arg(long)]
     pub server_uid: Option,
 
     /// Operation with log file
@@ -66,16 +66,16 @@ pub enum StreamCommand {
     /// Prints summary of log file
     Summary(SummaryOpts),
 
-    /// Prints content of log ordered by time
+    /// Prints jobs ids in stream
     Jobs,
 
-    /// Prints content of log ordered by time
+    /// Prints content of stream ordered by time
     Show(ShowOpts),
 
     /// Prints a raw content of one channel
     Cat(CatOpts),
 
-    /// Export log into JSON
+    /// Export stream into JSON
     Export(ExportOpts),
 }
diff --git a/crates/hyperqueue/src/client/commands/worker.rs b/crates/hyperqueue/src/client/commands/worker.rs
index a9cc82578..d2dcbd414 100644
--- a/crates/hyperqueue/src/client/commands/worker.rs
+++ b/crates/hyperqueue/src/client/commands/worker.rs
@@ -244,12 +244,9 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result anyhow::Result serde_json::Value {
         listen_address,
         hostname,
         work_dir,
-        log_dir,
         heartbeat_interval,
         overview_configuration: _,
         idle_timeout,
@@ -515,7 +514,6 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value {
         "heartbeat_interval": format_duration(heartbeat_interval),
         "idle_timeout": idle_timeout.map(format_duration),
         "time_limit": time_limit.map(format_duration),
-        "log_dir": log_dir,
         "work_dir": work_dir,
         "hostname": hostname,
         "group": group,
diff --git a/crates/hyperqueue/src/stream/reader/streamdir.rs b/crates/hyperqueue/src/stream/reader/streamdir.rs
index 181f7a33a..4f45a2aa4 100644
--- a/crates/hyperqueue/src/stream/reader/streamdir.rs
+++ b/crates/hyperqueue/src/stream/reader/streamdir.rs
@@ -3,7 +3,7 @@ use crate::common::arraydef::IntArray;
 use crate::common::error::HqError;
 use crate::server::event::bincode_config;
 use crate::transfer::stream::{ChannelId, StreamChunkHeader};
-use crate::worker::streamer::{StreamFileHeader, STREAM_FILE_HEADER};
+use crate::worker::streamer::{StreamFileHeader, STREAM_FILE_HEADER, STREAM_FILE_SUFFIX};
 use crate::{JobId, JobTaskId, Set};
 use bincode::Options;
 use chrono::{DateTime, Utc};
@@ -70,9 +70,13 @@ impl TaskInfo {
     }
 }
 
+type StreamIndex = BTreeMap>;
+
+/// Reader of a directory with .hts (stream files)
+/// It creates an index over all jobs and tasks in stream
 pub struct StreamDir {
     paths: Vec,
-    index: BTreeMap>,
+    index: StreamIndex,
     cache: LruCache>,
 }
 
@@ -102,7 +106,7 @@ impl StreamDir {
             if path
                 .extension()
                 .and_then(|e| e.to_str())
-                .map(|s| s == "hqs")
+                .map(|s| s == STREAM_FILE_SUFFIX)
                 .unwrap_or(false)
             {
                 found = true;
             }
         }
@@ -165,10 +169,8 @@ impl StreamDir {
         }
     }
 
-    fn create_index(
-        paths: &[PathBuf],
-    ) -> crate::Result>> {
-        let mut index: BTreeMap> = BTreeMap::new();
+    fn create_index(paths: &[PathBuf]) -> crate::Result {
+        let mut index: StreamIndex = BTreeMap::new();
         for (file_idx, path) in paths.iter().enumerate() {
             let mut file = BufReader::new(File::open(path)?);
             let _header = StreamDir::check_header(&mut file)?;
@@ -265,7 +267,7 @@ impl StreamDir {
     }
 
     fn _gather_infos<'a>(
-        index: &'a BTreeMap>,
+        index: &'a StreamIndex,
         job_id: JobId,
         tasks: &Option,
     ) -> anyhow::Result> {
diff --git a/crates/hyperqueue/src/transfer/stream.rs b/crates/hyperqueue/src/transfer/stream.rs
index d6092d9ea..7aaee9af2 100644
--- a/crates/hyperqueue/src/transfer/stream.rs
+++ b/crates/hyperqueue/src/transfer/stream.rs
@@ -39,13 +39,5 @@ pub struct StreamChunkHeader {
     pub task: JobTaskId,
     pub instance: InstanceId,
     pub channel: ChannelId,
-    pub size: u64,
-}
-
-pub enum StreamerMessage {
-    Write {
-        header: StreamChunkHeader,
-        data: Vec,
-    },
-    Flush(tokio::sync::oneshot::Sender<()>),
+    pub size: u64, // size == 0 indicates end of the stream
 }
diff --git a/crates/hyperqueue/src/worker/bootstrap.rs b/crates/hyperqueue/src/worker/bootstrap.rs
index 0b3e67058..a1ba2e96f 100644
--- a/crates/hyperqueue/src/worker/bootstrap.rs
+++ b/crates/hyperqueue/src/worker/bootstrap.rs
@@ -99,7 +99,6 @@ pub async fn initialize_worker(
     })?;
 
     std::fs::create_dir_all(&configuration.work_dir)?;
-    std::fs::create_dir_all(&configuration.log_dir)?;
 
     let server_address = format!("{}:{}", record.worker.host, record.worker.port);
     log::info!("Connecting to: {}", server_address);
diff --git a/crates/hyperqueue/src/worker/streamer.rs b/crates/hyperqueue/src/worker/streamer.rs
index 4f6643111..d27d0470f 100644
--- a/crates/hyperqueue/src/worker/streamer.rs
+++ b/crates/hyperqueue/src/worker/streamer.rs
@@ -1,6 +1,6 @@
 use crate::common::error::HqError;
 use crate::server::event::bincode_config;
-use crate::transfer::stream::{ChannelId, StreamChunkHeader, StreamerMessage};
+use crate::transfer::stream::{ChannelId, StreamChunkHeader};
 use crate::WrappedRcRefCell;
 use crate::{JobId, JobTaskId, Map};
 use bincode::Options;
@@ -20,6 +20,7 @@ use tokio::task::spawn_local;
 const STREAMER_BUFFER_SIZE: usize = 128;
 
 pub const STREAM_FILE_HEADER: &[u8] = b"hqsf0000";
+pub const STREAM_FILE_SUFFIX: &str = "hqs";
 
 #[derive(Serialize, Deserialize, Debug)]
 pub(crate) struct StreamFileHeader<'a> {
@@ -27,7 +28,15 @@ pub(crate) struct StreamFileHeader<'a> {
     pub worker_id: WorkerId,
 }
 
-pub struct StreamDescriptor {
+pub(crate) enum StreamerMessage {
+    Write {
+        header: StreamChunkHeader,
+        data: Vec,
+    },
+    Flush(oneshot::Sender<()>),
+}
+
+pub(crate) struct StreamDescriptor {
     sender: Sender,
 }
 
@@ -166,7 +175,7 @@ async fn stream_writer(
         .map(char::from)
         .collect::();
     let mut path = path.to_path_buf();
-    path.push(format!("{uid}.hqs"));
+    path.push(format!("{uid}.{STREAM_FILE_SUFFIX}"));
     log::debug!("Opening stream file {}", path.display());
     let mut file = BufWriter::new(File::create(path).await?);
     file.write_all(STREAM_FILE_HEADER).await?;
diff --git a/crates/tako/src/internal/worker/configuration.rs b/crates/tako/src/internal/worker/configuration.rs
index 9fd3abcc7..23f1f97a9 100644
--- a/crates/tako/src/internal/worker/configuration.rs
+++ b/crates/tako/src/internal/worker/configuration.rs
@@ -28,7 +28,6 @@ pub struct WorkerConfiguration {
     pub hostname: String,
     pub group: String,
     pub work_dir: PathBuf,
-    pub log_dir: PathBuf,
     pub heartbeat_interval: Duration,
     pub overview_configuration: Option,
     pub idle_timeout: Option,
diff --git a/docs/jobs/jobfile.md b/docs/jobs/jobfile.md
index f255101fe..170a24ce2 100644
--- a/docs/jobs/jobfile.md
+++ b/docs/jobs/jobfile.md
@@ -2,15 +2,14 @@
 Job Definition File (JDF) a way how to submit a complex pipeline into a HyperQueue.
 It is a [TOML](https://toml.io/) file that describes tasks of a job.
 
-JDF provides all functionalities as command line interface of HyperQueue and also adds access to additional features:
+JDF provides all functionalities as command line interface of HyperQueue and also adds access to additional features:
 
 * *Heterogeneous tasks* -- Job may be composed of different tasks
-* *Dependencies* -- Tasks may have dependencies
+* *Dependencies* -- Tasks may have dependencies
 * *Resource request alternatives* -- Task may have alternative resource requests, e.g.: 4 cpus OR 1 cpus and 1 gpu
 
 Note that these features are also available through Python interface.
 
-
 ## Minimal example
 
 First, we create file with the following content:
@@ -44,7 +43,7 @@ The default are the same as CLI interface.
 
 ```toml
 name = "test-job"
-stream_log = "output.log" # Stdout/Stderr streaming (see --log)
+stream = "path/to/stream/dir" # Stdout/Stderr streaming (see --stream)
 max_fails = 11
 
 [[task]]
@@ -57,7 +56,7 @@ crash_limit = 12
 command = ["/bin/bash", "-c", "echo $ABC"]
 
 # Environment variables
-env = {"ABC" = "123", "XYZ" = "aaaa"}
+env = { "ABC" = "123", "XYZ" = "aaaa" }
 
 # Content that will be written on stdin
 stdin = "Hello world!"
@@ -130,7 +129,6 @@ The task's option `deps` defines on which tasks the given task dependents. The
 task is addressed by their IDs.
 The following example creates three tasks where the third task depends on the first
 two tasks.
 
-
 ```toml
 [[task]]
@@ -174,10 +172,9 @@ and 3 tasks in the second one.
 For a task with resource variants, HyperQueue sets variable `HQ_RESOURCE_VARIANT`
 to an index of chosen variant (counted from 0) when a task is started.
 
-
 ## Non-integer resource amounts
 
 You may specify a resource number as float, e.g. `resources = { "foo" = 1.5 }`.
 It is valid but internally the type if converted to float, that may for some numbers lead to
-a rounding up when number is converted to 4-digit precision of resource amounts.
+a rounding up when number is converted to 4-digit precision of resource amounts.
 If you want to avoid this, put the number into parentheses, e.g. `resources = { "foo" = "1.5" }`.
\ No newline at end of file
diff --git a/docs/jobs/streaming.md b/docs/jobs/streaming.md
index 58af1427b..caa8d1596 100644
--- a/docs/jobs/streaming.md
+++ b/docs/jobs/streaming.md
@@ -25,7 +25,7 @@ You can redirect the output of `stdout` and `stderr` to a log file and thus enab
 to a filename where the log will be stored with the `--stream` option:
 
 ```
-$ hq submit --log= --array=1-10_000 ...
+$ hq submit --stream= --array=1-10_000 ...
 ```
 
 Stream path has to be a directory and it the user responsibility to ensure existence of the directory
@@ -77,7 +77,7 @@ HyperQueue lets you inspect the data stored inside the stream file using various
 the following structure:
 
 ```bash
-$ hq log
+$ hq read
 ```
 
 ### Stream summary
@@ -85,7 +85,7 @@
 You can display a summary of a log file using the `summary` subcommand:
 
 ```bash
-$ hq log summary
+$ hq read summary
 ```
 
 ### Stream jobs
@@ -93,7 +93,7 @@
 To print all job IDs that streaming in the stream path, you can run the following command:
 
 ```bash
-$ hq log jobs
+$ hq read jobs
 ```
 
 ### Printing stream content
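
Taken together, the hunks above rename the output-streaming interface: `hq submit --log=...` becomes `hq submit --stream=...` (the value is now a directory rather than a single file), and the former `hq log` inspection command becomes `hq read`. The sketch below is only an illustration of that renamed workflow: the directory name `my-stream` and the script `./compute.sh` are hypothetical, and the `hq read <stream-dir> <subcommand>` argument order is assumed from `ReadOpts` and the docs hunks above.

```bash
# Illustrative walkthrough of the renamed streaming commands (paths are made up).

# The stream path must be an existing directory; per the docs, creating it is the
# user's responsibility.
mkdir -p my-stream

# Stream stdout/stderr of all tasks into the directory (formerly --log=<file>).
hq submit --stream=my-stream --array=1-10_000 ./compute.sh

# Inspect the stream (formerly `hq log ...`).
hq read my-stream summary   # summary of the stream directory
hq read my-stream jobs      # job IDs present in the stream
hq read my-stream export    # basic export of the stream into JSON
```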