Refactoring after code review
spirali committed Sep 16, 2024
1 parent c8e58e6 commit dc1607e
Showing 12 changed files with 40 additions and 52 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -369,7 +369,7 @@ an old client/worker to a new server (Connecting a new client/worker to an old s

### Stream log

* Basic export of stream log into JSON (`hq log <log_file> export`)
* Basic export of stream log into JSON (`hq read <log_file> export`)

### Server

@@ -589,7 +589,7 @@ would pass `OMP_NUM_THREADS=4` to the executed `<program>`.
is `hq submit`, which is now a shortcut for `hq job submit`. Here is a table of changed commands:

| **Previous command** | **New command** |
|----------------------|--------------------|
| `hq jobs` | `hq job list` |
| `hq job` | `hq job info` |
| `hq resubmit` | `hq job resubmit` |
8 changes: 4 additions & 4 deletions crates/hyperqueue/src/client/commands/reader.rs
@@ -11,7 +11,7 @@ pub struct ReadOpts {
path: PathBuf,

/// Filter files for given server instance
#[arg(long, value_enum)]
#[arg(long)]
pub server_uid: Option<String>,

/// Operation with log file
@@ -66,16 +66,16 @@ pub enum StreamCommand {
/// Prints summary of log file
Summary(SummaryOpts),

/// Prints content of log ordered by time
/// Prints job IDs in the stream
Jobs,

/// Prints content of log ordered by time
/// Prints content of stream ordered by time
Show(ShowOpts),

/// Prints a raw content of one channel
Cat(CatOpts),

/// Export log into JSON
/// Export stream into JSON
Export(ExportOpts),
}

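Taken together, the renamed doc comments map the `StreamCommand` variants onto `hq read` subcommands roughly as follows (a sketch; the exact arguments live in the respective `*Opts` structs):

```bash
# <stream-dir> is a stream directory produced by `hq submit --stream=...`
$ hq read <stream-dir> summary   # summary of the stream
$ hq read <stream-dir> jobs      # job IDs present in the stream
$ hq read <stream-dir> show      # stream content ordered by time
$ hq read <stream-dir> cat ...   # raw content of one channel
$ hq read <stream-dir> export    # export of the stream into JSON
```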
8 changes: 2 additions & 6 deletions crates/hyperqueue/src/client/commands/worker.rs
@@ -244,12 +244,9 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
let resources = ResourceDescriptor::new(resources);
resources.validate()?;

let (work_dir, log_dir) = {
let work_dir = {
let tmpdir = TempDir::with_prefix("hq-worker")?.into_path();
(
work_dir.unwrap_or_else(|| tmpdir.join("work")),
tmpdir.join("logs"),
)
work_dir.unwrap_or_else(|| tmpdir.join("work"))
};

let manager_info = gather_manager_info(manager)?;
@@ -292,7 +289,6 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
hostname,
group,
work_dir,
log_dir,
on_server_lost: on_server_lost.into(),
heartbeat_interval: heartbeat,
idle_timeout,
4 changes: 0 additions & 4 deletions crates/hyperqueue/src/client/output/cli.rs
@@ -306,10 +306,6 @@ impl Output for CliOutput {
"Working directory".cell().bold(true),
configuration.work_dir.display().cell(),
],
vec![
"Logging directory".cell().bold(true),
configuration.log_dir.display().cell(),
],
vec![
"Heartbeat".cell().bold(true),
format_duration(configuration.heartbeat_interval)
2 changes: 0 additions & 2 deletions crates/hyperqueue/src/client/output/json.rs
@@ -496,7 +496,6 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value {
listen_address,
hostname,
work_dir,
log_dir,
heartbeat_interval,
overview_configuration: _,
idle_timeout,
@@ -515,7 +514,6 @@
"heartbeat_interval": format_duration(heartbeat_interval),
"idle_timeout": idle_timeout.map(format_duration),
"time_limit": time_limit.map(format_duration),
"log_dir": log_dir,
"work_dir": work_dir,
"hostname": hostname,
"group": group,
18 changes: 10 additions & 8 deletions crates/hyperqueue/src/stream/reader/streamdir.rs
@@ -3,7 +3,7 @@ use crate::common::arraydef::IntArray;
use crate::common::error::HqError;
use crate::server::event::bincode_config;
use crate::transfer::stream::{ChannelId, StreamChunkHeader};
use crate::worker::streamer::{StreamFileHeader, STREAM_FILE_HEADER};
use crate::worker::streamer::{StreamFileHeader, STREAM_FILE_HEADER, STREAM_FILE_SUFFIX};
use crate::{JobId, JobTaskId, Set};
use bincode::Options;
use chrono::{DateTime, Utc};
@@ -70,9 +70,13 @@ impl TaskInfo {
}
}

type StreamIndex = BTreeMap<JobId, BTreeMap<JobTaskId, TaskInfo>>;

/// Reader of a directory with .hqs (stream) files
/// It creates an index over all jobs and tasks in the stream
pub struct StreamDir {
paths: Vec<PathBuf>,
index: BTreeMap<JobId, BTreeMap<JobTaskId, TaskInfo>>,
index: StreamIndex,
cache: LruCache<usize, BufReader<File>>,
}

@@ -102,7 +106,7 @@ impl StreamDir {
if path
.extension()
.and_then(|e| e.to_str())
.map(|s| s == "hqs")
.map(|s| s == STREAM_FILE_SUFFIX)
.unwrap_or(false)
{
found = true;
@@ -165,10 +169,8 @@
}
}

fn create_index(
paths: &[PathBuf],
) -> crate::Result<BTreeMap<JobId, BTreeMap<JobTaskId, TaskInfo>>> {
let mut index: BTreeMap<JobId, BTreeMap<JobTaskId, TaskInfo>> = BTreeMap::new();
fn create_index(paths: &[PathBuf]) -> crate::Result<StreamIndex> {
let mut index: StreamIndex = BTreeMap::new();
for (file_idx, path) in paths.iter().enumerate() {
let mut file = BufReader::new(File::open(path)?);
let _header = StreamDir::check_header(&mut file)?;
@@ -265,7 +267,7 @@ impl StreamDir {
}

fn _gather_infos<'a>(
index: &'a BTreeMap<JobId, BTreeMap<JobTaskId, TaskInfo>>,
index: &'a StreamIndex,
job_id: JobId,
tasks: &Option<IntArray>,
) -> anyhow::Result<Vec<(JobTaskId, &'a InstanceInfo)>> {
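The new `STREAM_FILE_SUFFIX` constant replaces the hard-coded `"hqs"` literal in the reader. A self-contained sketch of the extension check, simplified from the code above:

```rust
use std::path::Path;

// Mirrors STREAM_FILE_SUFFIX from worker/streamer.rs.
const STREAM_FILE_SUFFIX: &str = "hqs";

/// Returns true when `path` looks like a stream file. This checks the
/// extension only; the real reader additionally validates the
/// STREAM_FILE_HEADER magic bytes at the start of the file.
fn is_stream_file(path: &Path) -> bool {
    path.extension()
        .and_then(|e| e.to_str())
        .map(|s| s == STREAM_FILE_SUFFIX)
        .unwrap_or(false)
}

fn main() {
    assert!(is_stream_file(Path::new("worker.hqs")));
    assert!(!is_stream_file(Path::new("worker.log")));
}
```

Centralizing the suffix in one constant keeps the writer (`{uid}.{STREAM_FILE_SUFFIX}`) and the reader's filter from drifting apart.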
10 changes: 1 addition & 9 deletions crates/hyperqueue/src/transfer/stream.rs
@@ -39,13 +39,5 @@ pub struct StreamChunkHeader {
pub task: JobTaskId,
pub instance: InstanceId,
pub channel: ChannelId,
pub size: u64,
}

pub enum StreamerMessage {
Write {
header: StreamChunkHeader,
data: Vec<u8>,
},
Flush(tokio::sync::oneshot::Sender<()>),
pub size: u64, // size == 0 indicates end of the stream
}
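The `size == 0` convention lets a reader treat a zero-sized chunk as a terminator. A minimal sketch with a simplified header struct and a hypothetical reader loop (the real `StreamChunkHeader` also carries job/task/instance/channel ids and a timestamp):

```rust
// Simplified stand-in for StreamChunkHeader.
struct ChunkHeader {
    size: u64,
}

/// Consumes chunk headers until the zero-sized terminator chunk,
/// returning the total payload size seen.
fn consume(chunks: impl Iterator<Item = ChunkHeader>) -> u64 {
    let mut total = 0;
    for header in chunks {
        if header.size == 0 {
            break; // size == 0 indicates end of the stream
        }
        total += header.size; // a real reader would now read `size` payload bytes
    }
    total
}

fn main() {
    let chunks = vec![
        ChunkHeader { size: 5 },
        ChunkHeader { size: 7 },
        ChunkHeader { size: 0 }, // terminator
    ];
    assert_eq!(consume(chunks.into_iter()), 12);
}
```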
1 change: 0 additions & 1 deletion crates/hyperqueue/src/worker/bootstrap.rs
@@ -99,7 +99,6 @@ pub async fn initialize_worker(
})?;

std::fs::create_dir_all(&configuration.work_dir)?;
std::fs::create_dir_all(&configuration.log_dir)?;

let server_address = format!("{}:{}", record.worker.host, record.worker.port);
log::info!("Connecting to: {}", server_address);
15 changes: 12 additions & 3 deletions crates/hyperqueue/src/worker/streamer.rs
@@ -1,6 +1,6 @@
use crate::common::error::HqError;
use crate::server::event::bincode_config;
use crate::transfer::stream::{ChannelId, StreamChunkHeader, StreamerMessage};
use crate::transfer::stream::{ChannelId, StreamChunkHeader};
use crate::WrappedRcRefCell;
use crate::{JobId, JobTaskId, Map};
use bincode::Options;
@@ -20,14 +20,23 @@ use tokio::task::spawn_local;

const STREAMER_BUFFER_SIZE: usize = 128;
pub const STREAM_FILE_HEADER: &[u8] = b"hqsf0000";
pub const STREAM_FILE_SUFFIX: &str = "hqs";

#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct StreamFileHeader<'a> {
pub server_uid: Cow<'a, String>,
pub worker_id: WorkerId,
}

pub struct StreamDescriptor {
pub(crate) enum StreamerMessage {
Write {
header: StreamChunkHeader,
data: Vec<u8>,
},
Flush(oneshot::Sender<()>),
}

pub(crate) struct StreamDescriptor {
sender: Sender<StreamerMessage>,
}

@@ -166,7 +175,7 @@ async fn stream_writer(
.map(char::from)
.collect::<String>();
let mut path = path.to_path_buf();
path.push(format!("{uid}.hqs"));
path.push(format!("{uid}.{STREAM_FILE_SUFFIX}"));
log::debug!("Opening stream file {}", path.display());
let mut file = BufWriter::new(File::create(path).await?);
file.write_all(STREAM_FILE_HEADER).await?;
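`StreamerMessage` is now private to the worker. The Write/Flush protocol it implements is a small actor over an mpsc channel; a minimal runnable sketch of the pattern (payloads simplified to raw bytes; assumes the `tokio` crate with the `full` feature):

```rust
use tokio::sync::{mpsc, oneshot};

// Simplified stand-in for StreamerMessage: raw bytes instead of a
// StreamChunkHeader plus data, with the same oneshot-based Flush ack.
enum Message {
    Write(Vec<u8>),
    Flush(oneshot::Sender<()>),
}

async fn writer_task(mut rx: mpsc::Receiver<Message>) {
    let mut written = 0usize;
    while let Some(msg) = rx.recv().await {
        match msg {
            Message::Write(data) => written += data.len(),
            // Replying here guarantees that every write queued before the
            // flush has already been handled.
            Message::Flush(reply) => {
                let _ = reply.send(());
            }
        }
    }
    println!("writer done after {written} bytes");
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(128); // cf. STREAMER_BUFFER_SIZE
    let writer = tokio::spawn(writer_task(rx));

    tx.send(Message::Write(b"hello".to_vec())).await.unwrap();
    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(Message::Flush(ack_tx)).await.unwrap();
    ack_rx.await.unwrap(); // flush acknowledged

    drop(tx); // close the channel so the writer terminates
    writer.await.unwrap();
}
```

Because mpsc preserves per-sender ordering, the `Flush` acknowledgement arrives only after all previously queued chunks have been processed.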
1 change: 0 additions & 1 deletion crates/tako/src/internal/worker/configuration.rs
@@ -28,7 +28,6 @@ pub struct WorkerConfiguration {
pub hostname: String,
pub group: String,
pub work_dir: PathBuf,
pub log_dir: PathBuf,
pub heartbeat_interval: Duration,
pub overview_configuration: Option<OverviewConfiguration>,
pub idle_timeout: Option<Duration>,
13 changes: 5 additions & 8 deletions docs/jobs/jobfile.md
@@ -2,15 +2,14 @@

A Job Definition File (JDF) is a way to submit a complex pipeline to HyperQueue.
It is a [TOML](https://toml.io/) file that describes the tasks of a job.
JDF provides all the functionality of the HyperQueue command-line interface and also adds access to additional features:

* *Heterogeneous tasks* -- A job may be composed of different tasks
* *Dependencies* -- Tasks may have dependencies
* *Resource request alternatives* -- A task may have alternative resource requests, e.g. 4 cpus OR 1 cpu and 1 gpu

Note that these features are also available through the Python interface.


## Minimal example

First, we create a file with the following content:
@@ -44,7 +43,7 @@ The defaults are the same as in the CLI interface.
```toml

name = "test-job"
stream_log = "output.log" # Stdout/Stderr streaming (see --log)
stream = "path/to/stream/dir" # Stdout/Stderr streaming (see --stream)
max_fails = 11

[[task]]
@@ -57,7 +56,7 @@ crash_limit = 12
command = ["/bin/bash", "-c", "echo $ABC"]

# Environment variables
env = {"ABC" = "123", "XYZ" = "aaaa"}
env = { "ABC" = "123", "XYZ" = "aaaa" }

# Content that will be written on stdin
stdin = "Hello world!"
@@ -130,7 +129,6 @@ The task option `deps` defines which tasks the given task depends on.
Tasks are addressed by their IDs.

The following example creates three tasks where the third task depends on the first two tasks.


```toml
[[task]]
@@ -174,10 +172,9 @@
For a task with resource variants, HyperQueue sets the variable `HQ_RESOURCE_VARIANT`
to the index of the chosen variant (counted from 0) when the task is started.


## Non-integer resource amounts

You may specify a resource amount as a float, e.g. `resources = { "foo" = 1.5 }`.
This is valid, but internally the number is converted to a float, which may for some numbers lead to
rounding up when the value is converted to the 4-digit precision of resource amounts.
If you want to avoid this, put the number into quotes, e.g. `resources = { "foo" = "1.5" }`.
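Putting the options from this page together, a small job file might look like the following sketch. Values are illustrative, and the `[[task.request]]` layout for resource variants is assumed from the feature description above:

```toml
name = "example-job"
stream = "/tmp/hq-stream"   # hypothetical stream directory (see --stream)

[[task]]
id = 0
command = ["/bin/bash", "-c", "echo preprocessing"]

[[task]]
id = 1
command = ["/bin/bash", "-c", "echo variant $HQ_RESOURCE_VARIANT"]
deps = [0]   # runs only after task 0 finishes

# Resource request alternatives: 4 cpus OR 1 cpu and 1 gpu
[[task.request]]
resources = { "cpus" = 4 }

[[task.request]]
resources = { "cpus" = 1, "gpus" = 1 }
```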
8 changes: 4 additions & 4 deletions docs/jobs/streaming.md
@@ -25,7 +25,7 @@ You can redirect the output of `stdout` and `stderr` to a log file and thus enab
to a filename where the log will be stored with the `--stream` option:

```
$ hq submit --log=<stream-path> --array=1-10_000 ...
$ hq submit --stream=<stream-path> --array=1-10_000 ...
```

The stream path has to be a directory, and it is the user's responsibility to ensure that the directory exists
@@ -77,23 +77,23 @@ HyperQueue lets you inspect the data stored inside the stream file using various
the following structure:

```bash
$ hq log <stream-path> <subcommand> <subcommand-args>
$ hq read <stream-path> <subcommand> <subcommand-args>
```

### Stream summary

You can display a summary of a log file using the `summary` subcommand:

```bash
$ hq log <stream-path> summary
$ hq read <stream-path> summary
```

### Stream jobs

To print all job IDs present in the stream path, you can run the following command:

```bash
$ hq log <stream-path> jobs
$ hq read <stream-path> jobs
```

### Printing stream content
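A hypothetical end-to-end session tying the submit and read sides together (paths and the command are illustrative):

```bash
# Create the stream directory up front; ensuring it exists is the
# user's responsibility (see above).
$ mkdir -p /tmp/hq-stream

# Submit a task array whose stdout/stderr are streamed into the directory.
$ hq submit --stream=/tmp/hq-stream --array=1-100 -- bash -c 'echo hello'

# Inspect the stream afterwards.
$ hq read /tmp/hq-stream summary
$ hq read /tmp/hq-stream jobs
```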
