Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development of Parallel API #180

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bastion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ futures-timer = "3.0.0"
fxhash = "0.2"
lazy_static = "1.4"
log = "0.4"
ipc-channel = "0.14.0"
serde = { version = "1.0", features = ["derive"] }


# TODO: https://github.com/cogciprocate/qutex/pull/5
# TODO: https://github.com/cogciprocate/qutex/pull/6
bastion-qutex = { version = "0.2", features = ["async_await"] }
Expand Down
2 changes: 2 additions & 0 deletions bastion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ pub mod message;
pub mod path;
pub mod supervisor;

pub mod parallel;

///
/// Prelude of Bastion
pub mod prelude {
Expand Down
27 changes: 27 additions & 0 deletions bastion/src/parallel/callbacks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::fmt::{self, Debug, Formatter};
use std::sync::{Arc, Mutex};
use std::boxed::Box;
use std::io;

pub type CallbackFunc = dyn FnMut() -> io::Result<()> + Send + Sync + 'static;
pub type SafeCallbackFunc = Arc<Mutex<Box<CallbackFunc>>>;

#[derive(Default)]
pub struct ProcessCallbacks {
pub before_start: Option<SafeCallbackFunc>,
pub before_restart: Option<SafeCallbackFunc>,
pub after_restart: Option<SafeCallbackFunc>,
pub after_stop: Option<SafeCallbackFunc>,
}

impl Debug for ProcessCallbacks {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("ProcessCallbacks")
.field("before_start", &self.before_start.is_some())
.field("before_restart", &self.before_start.is_some())
.field("after_restart", &self.before_start.is_some())
.field("after_stop", &self.before_start.is_some())
.finish()
}
}

Empty file added bastion/src/parallel/macros.rs
Empty file.
3 changes: 3 additions & 0 deletions bastion/src/parallel/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod callbacks;
pub mod ops;
pub mod process;
Empty file added bastion/src/parallel/ops.rs
Empty file.
152 changes: 152 additions & 0 deletions bastion/src/parallel/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::ffi::OsString;
use std::process::Stdio;
use std::io;
use super::callbacks::*;

#[derive(Debug)]
pub struct ProcessData {
pub(crate) callbacks: ProcessCallbacks,
pub(crate) envs: HashMap<OsString, OsString>,
}

impl Default for ProcessData {
fn default() -> Self {
Self {
callbacks: ProcessCallbacks::default(),
envs: std::env::vars_os().collect(),
}
}
}


#[derive(Debug, Default)]
pub struct Builder {
pub(crate) stdin: Option<Stdio>,
pub(crate) stdout: Option<Stdio>,
pub(crate) stderr: Option<Stdio>,
pub(crate) data: ProcessData,
}

impl Builder {
pub fn new() -> Self {
Self {
stdin: None,
stdout: None,
stderr: None,
data: ProcessData::default()
}
}

/// Process data which has given to spawned Process
pub(crate) fn data(&mut self, proc_data: ProcessData) -> &mut Self {
self.data = proc_data;
self
}

/// Set an environment variable in the spawned process. Equivalent to `Command::env`
pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self
where
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.data.envs
.insert(key.as_ref().to_owned(), val.as_ref().to_owned());
self
}

/// Set environment variables in the spawned process. Equivalent to `Command::envs`
pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.data.envs.extend(
vars.into_iter()
.map(|(k, v)| (k.as_ref().to_owned(), v.as_ref().to_owned())),
);
self
}

///
/// Removes an environment variable in the spawned process. Equivalent to `Command::env_remove`
pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Self {
self.data.envs.remove(key.as_ref());
self
}

///
/// Clears all environment variables in the spawned process. Equivalent to `Command::env_clear`
pub fn env_clear(&mut self) -> &mut Self {
self.data.envs.clear();
self
}

///
/// Captures the `stdin` of the spawned process, allowing you to manually send data via `JoinHandle::stdin`
pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Self {
self.stdin = Some(cfg.into());
self
}

///
/// Captures the `stdout` of the spawned process, allowing you to manually receive data via `JoinHandle::stdout`
pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Self {
self.stdout = Some(cfg.into());
self
}

///
/// Captures the `stderr` of the spawned process, allowing you to manually receive data via `JoinHandle::stderr`
pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Self {
self.stderr = Some(cfg.into());
self
}

///
/// Process before start
#[cfg(unix)]
pub fn before_start<F>(&mut self, f: F) -> &mut Self
where
F: FnMut() -> io::Result<()> + Send + Sync + 'static
{
self.data.callbacks.before_start = Some(Arc::new(Mutex::new(Box::new(f))));
self
}

///
/// Process before restart
#[cfg(unix)]
pub fn before_restart<F>(&mut self, f: F) -> &mut Self
where
F: FnMut() -> io::Result<()> + Send + Sync + 'static
{
self.data.callbacks.before_restart = Some(Arc::new(Mutex::new(Box::new(f))));
self
}

///
/// Process after restart
#[cfg(unix)]
pub fn after_restart<F>(&mut self, f: F) -> &mut Self
where
F: FnMut() -> io::Result<()> + Send + Sync + 'static
{
self.data.callbacks.after_restart = Some(Arc::new(Mutex::new(Box::new(f))));
self
}

///
/// Process after stop
#[cfg(unix)]
pub fn after_stop<F>(&mut self, f: F) -> &mut Self
where
F: FnMut() -> io::Result<()> + Send + Sync + 'static
{
self.data.callbacks.after_stop = Some(Arc::new(Mutex::new(Box::new(f))));
self
}
}