Skip to content

Commit

Permalink
Stream dir/path renamed to output log path
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Sep 20, 2024
1 parent 50a25da commit 78b1f59
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 26 deletions.
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/commands/outputlog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::client::globalsettings::GlobalSettings;
use crate::common::arraydef::IntArray;
use crate::stream::reader::streamdir::StreamDir;
use crate::stream::reader::outputlog::OutputLog;
use crate::JobId;
use clap::Parser;
use std::path::PathBuf;
Expand Down Expand Up @@ -86,7 +86,7 @@ pub enum Channel {
}

pub fn command_reader(gsettings: &GlobalSettings, opts: OutputLogOpts) -> anyhow::Result<()> {
let mut stream_dir = StreamDir::open(&opts.path, opts.server_uid.as_deref())?;
let mut stream_dir = OutputLog::open(&opts.path, opts.server_uid.as_deref())?;
match opts.command {
StreamCommand::Summary(_) => {
gsettings
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::common::format::{human_duration, human_mem_amount, human_size};
use crate::common::manager::info::GetManagerInfo;
use crate::server::autoalloc::{Allocation, AllocationState};
use crate::server::job::{JobTaskCounters, JobTaskInfo, JobTaskState};
use crate::stream::reader::streamdir::Summary;
use crate::stream::reader::outputlog::Summary;
use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData, QueueState,
ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerExitInfo,
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::common::arraydef::IntArray;
use crate::common::manager::info::{GetManagerInfo, ManagerType};
use crate::server::autoalloc::{Allocation, AllocationState, QueueId};
use crate::server::job::{JobTaskInfo, JobTaskState, StartedTaskData};
use crate::stream::reader::streamdir::Summary;
use crate::stream::reader::outputlog::Summary;
use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData, ServerInfo,
TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerInfo,
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::transfer::messages::{

use crate::client::job::WorkerMap;
use crate::server::autoalloc::Allocation;
use crate::stream::reader::streamdir::Summary;
use crate::stream::reader::outputlog::Summary;
use std::path::Path;

use crate::client::output::common::TaskToPathsMap;
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::client::status::{job_status, Status};
use crate::common::arraydef::IntArray;
use crate::server::autoalloc::Allocation;
use crate::server::job::JobTaskInfo;
use crate::stream::reader::streamdir::Summary;
use crate::stream::reader::outputlog::Summary;
use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, ServerInfo, WaitForJobsResponse, WorkerExitInfo,
WorkerInfo,
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/stream/reader/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub mod streamdir;
pub mod outputlog;
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type StreamIndex = BTreeMap<JobId, BTreeMap<JobTaskId, TaskInfo>>;

/// Reader of a directory with .hts (stream files)
/// It creates an index over all jobs and tasks in stream
pub struct StreamDir {
pub struct OutputLog {
paths: Vec<PathBuf>,
index: StreamIndex,
cache: LruCache<usize, BufReader<File>>,
Expand All @@ -94,9 +94,9 @@ pub struct Summary {
pub superseded_stderr_size: u64,
}

impl StreamDir {
impl OutputLog {
pub fn open(path: &Path, server_uid: Option<&str>) -> crate::Result<Self> {
log::debug!("Reading stream dir {}", path.display());
log::debug!("Reading output log {}", path.display());
let mut paths = Vec::new();
let mut server_uids: Set<String> = Set::new();
let mut found = false;
Expand All @@ -112,7 +112,7 @@ impl StreamDir {
found = true;
log::debug!("Discovered {}", path.display());
let mut file = BufReader::new(File::open(&path)?);
let header = match StreamDir::check_header(&mut file) {
let header = match OutputLog::check_header(&mut file) {
Ok(header) => header,
Err(e) => {
log::debug!(
Expand Down Expand Up @@ -148,7 +148,7 @@ impl StreamDir {
return Err(HqError::GenericError(msg));
}
let index = Self::create_index(&paths)?;
Ok(StreamDir {
Ok(OutputLog {
paths,
index,
cache: LruCache::new(NonZeroUsize::new(16).unwrap()),
Expand All @@ -173,7 +173,7 @@ impl StreamDir {
let mut index: StreamIndex = BTreeMap::new();
for (file_idx, path) in paths.iter().enumerate() {
let mut file = BufReader::new(File::open(path)?);
let _header = StreamDir::check_header(&mut file)?;
let _header = OutputLog::check_header(&mut file)?;
while let Some(chunk_header) = Self::read_chunk(&mut file)? {
let job = index.entry(chunk_header.job).or_default();
let task = job.entry(chunk_header.task).or_default();
Expand Down
23 changes: 12 additions & 11 deletions docs/jobs/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ You can redirect the output of `stdout` and `stderr` to a log file and thus enab
to a filename where the log will be stored with the `--stream` option:

```
$ hq submit --stream=<stream-path> --array=1-10_000 ...
$ hq submit --stream=<output-log-path> --array=1-10_000 ...
```

Stream path has to be a directory and it the user responsibility to ensure existence of the directory
Output log path has to be a directory and it the user responsibility to ensure existence of the directory
and visibility of each worker.

This command would cause the `stdout` and `stderr` of all `10_000` tasks to be streamed into the server, which will
write them to files in `<stream-path>`. The streamed data is written in a compact way independently on the number of
write them to files in `<output-log-path>`. The streamed data is written in a compact way independently on the number of
tasks. The format also contains additional metadata,
which allows the resulting file to be filtered/sorted by tasks or channel.

Expand Down Expand Up @@ -77,23 +77,23 @@ HyperQueue lets you inspect the data stored inside the stream file using various
the following structure:

```bash
$ hq output-log <stream-path> <subcommand> <subcommand-args>
$ hq output-log <output-log-path> <subcommand> <subcommand-args>
```

### Stream summary

You can display a summary of a log file using the `summary` subcommand:

```bash
$ hq output-log <stream-path> summary
$ hq output-log <output-log-path> summary
```

### Stream jobs

To print all job IDs that streaming in the stream path, you can run the following command:

```bash
$ hq output-log <stream-path> jobs
$ hq output-log <output-log-path> jobs
```

### Printing stream content
Expand All @@ -102,7 +102,7 @@ If you want to simply print the (textual) content of the log file, without any a
`cat` subcommand:

```bash
$ hq output-log <stream-path> cat <job-id> <stdout/stderr>
$ hq output-log <output-log-path> cat <job-id> <stdout/stderr>
```

It will print the raw content of either `stdout` or `stderr`, ordered by task id. All outputs will be concatenated one
Expand Down Expand Up @@ -145,15 +145,16 @@ hence HyperQueue streaming is able to avoid mixing
outputs from different executions of the same task, when a task is restarted.

HyperQueue automatically marks all output from previous instance of a task except the last instance as *superseded*.
You can see statistics about superseded data via `hq output-log <stream-path> summary` command.
You can see statistics about superseded data via `hq output-log <output-log-path> summary` command.
In the current version, superseded data is ignored by all other commands.

## More server instances

HyperQueue supports writing streams from the different server instances into the same directory.
If you run `hq output-log` commands over such directory then it will detect the situation and prints all server uids
that writes into the directory. You have to specify the server instance via `hq output-log --server-uid=<SERVER_UID> ...`
when working with such a stream directory.
that writes into the directory. You have to specify the server instance
via `hq output-log --server-uid=<SERVER_UID> ...`
when working with such a output log directory.

!!! note

Expand All @@ -162,5 +163,5 @@ when working with such a stream directory.

## Working with non-shared file system

You do not need to have a shared file system when working with streaming path. It is just your responsibility to
You do not need to have a shared file system when working with streaming. It is just your responsibility to
collect all generated files into one directory before using `hq output-log` commands.
4 changes: 2 additions & 2 deletions tests/test_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,6 @@ def test_restore_streaming(hq_env: HqEnv, tmp_path):
hq_env.start_worker()
hq_env.command(["job", "close", "1"])
wait_for_job_state(hq_env, 1, "FINISHED")
assert int(hq_env.command(["read", stream_path, "cat", "1", "stdout"])) > 0
table = hq_env.command(["read", stream_path, "summary"], as_table=True)
assert int(hq_env.command(["output-log", stream_path, "cat", "1", "stdout"])) > 0
table = hq_env.command(["output-log", stream_path, "summary"], as_table=True)
table.check_row_value("Superseded streams", "1")

0 comments on commit 78b1f59

Please sign in to comment.