Skip to content

Commit

Permalink
Add daemon protocol store
Browse files Browse the repository at this point in the history
  • Loading branch information
griff committed Oct 24, 2023
1 parent 22bd27a commit 187e4cb
Show file tree
Hide file tree
Showing 74 changed files with 6,505 additions and 863 deletions.
6 changes: 3 additions & 3 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
pkg-config
fuse
protobuf libarchive
jq
rustc.llvmPackages.llvm
] ++ lib.optionals stdenv.isDarwin [
darwin.apple_sdk.frameworks.CoreServices
darwin.apple_sdk.frameworks.Security
Expand Down
32 changes: 16 additions & 16 deletions nix-docker-build/src/cached_store.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::fmt;

use async_trait::async_trait;
use nixrs::path_info::ValidPathInfo;
use nixrs::store::copy_paths;
use nixrs::store::{copy_paths, BuildMode};
use nixrs::store::legacy_worker::{LegacyStore, LegacyStoreBuilder, LegacyStoreClient};
use nixrs::store::{
BasicDerivation, BuildResult, BuildSettings, CheckSignaturesFlag, DerivedPath, Error,
BasicDerivation, BuildResult, CheckSignaturesFlag, DerivedPath, Error,
RepairFlag, Store, SubstituteFlag,
};
use nixrs::store_path::{StoreDir, StoreDirProvider, StorePath, StorePathSet};
use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
use tokio::process::{ChildStdin, ChildStdout};

pub struct CachedStore {
cache: LegacyStoreClient<ChildStdout, ChildStdin, ChildStderr>,
builder: Option<LegacyStoreClient<ChildStdout, ChildStdin, ChildStderr>>,
cache: LegacyStoreClient<ChildStdout, ChildStdin>,
builder: Option<LegacyStoreClient<ChildStdout, ChildStdin>>,
write_allowed: bool,
docker_bin: String,
}
Expand Down Expand Up @@ -58,7 +60,7 @@ impl Store for CachedStore {
self.cache.query_path_info(path).await
}
}
async fn nar_from_path<W: tokio::io::AsyncWrite + Send + Unpin>(
async fn nar_from_path<W: tokio::io::AsyncWrite + fmt::Debug + Send + Unpin>(
&mut self,
path: &StorePath,
sink: W,
Expand All @@ -74,7 +76,7 @@ impl Store for CachedStore {
}
}

async fn add_to_store<R: tokio::io::AsyncRead + Send + Unpin>(
async fn add_to_store<R: tokio::io::AsyncRead + fmt::Debug + Send + Unpin>(
&mut self,
info: &ValidPathInfo,
source: R,
Expand All @@ -86,21 +88,19 @@ impl Store for CachedStore {
.await
}

async fn build_paths<W: tokio::io::AsyncWrite + Send + Unpin>(
async fn build_paths(
&mut self,
_drv_paths: &[DerivedPath],
_settings: &BuildSettings,
_build_log: W,
_build_mode: BuildMode,
) -> Result<(), Error> {
Err(Error::Misc("Unsupported operation 'build_paths'".into()))
}

async fn build_derivation<W: tokio::io::AsyncWrite + Send + Unpin>(
async fn build_derivation(
&mut self,
drv_path: &StorePath,
drv: &BasicDerivation,
settings: &BuildSettings,
build_log: W,
build_mode: BuildMode,
) -> Result<BuildResult, Error> {
//let store_dir = self.store_dir();
let inputs = self.cache.query_closure(&drv.input_srcs, false).await?;
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Store for CachedStore {

copy_paths(&mut self.cache, &mut builder, &inputs).await?;
let result = builder
.build_derivation(drv_path, drv, settings, build_log)
.build_derivation(drv_path, drv, build_mode)
.await?;

if result.success() {
Expand Down Expand Up @@ -192,15 +192,15 @@ impl LegacyStore for CachedStore {
}
}

async fn export_paths<W: tokio::io::AsyncWrite + Send + Unpin>(
async fn export_paths<W: tokio::io::AsyncWrite + fmt::Debug + Send + Unpin>(
&mut self,
paths: &StorePathSet,
sink: W,
) -> Result<(), Error> {
self.cache.export_paths(paths, sink).await
}

async fn import_paths<R: tokio::io::AsyncRead + Send + Unpin>(
async fn import_paths<R: tokio::io::AsyncRead + fmt::Debug + Send + Unpin>(
&mut self,
source: R,
) -> Result<(), Error> {
Expand Down
2 changes: 1 addition & 1 deletion nix-docker-build/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn main() {
.block_on(async move {
let store =
CachedStore::connect(store_uri, docker_bin, nix_store_bin, write_allowed).await?;
nixrs::store::legacy_worker::run_server(source, out, store, build_log, write_allowed)
nixrs::store::legacy_worker::run_server_with_log(source, out, store, build_log, write_allowed)
.await
});

Expand Down
2 changes: 1 addition & 1 deletion nixrs-nix-store/src/serve.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub use nixrs::store::legacy_worker::run_server;
pub use nixrs::store::legacy_worker::run_server_with_log;
1 change: 0 additions & 1 deletion nixrs-ssh-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ edition = "2021"
anyhow = "1.0.75"
async-trait = "0.1.50"
futures = "0.3.28"
log = "0.4.14"
nixrs = { version = "0.1.0", path = "../nixrs" }
thrussh = "0.34.0"
thrussh-keys = "0.22.1"
Expand Down
9 changes: 8 additions & 1 deletion nixrs-ssh-store/src/io/data_write.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::fmt;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use log::{debug, error, trace};
use tracing::{debug, error, trace};
use thrussh::server::Handle;
use thrussh::{ChannelId, CryptoVec};
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -59,6 +60,12 @@ impl DataWrite {
}
}

impl fmt::Debug for DataWrite {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DataWrite").field("id", &self.id).field("is_write_fut_valid", &self.is_write_fut_valid).field("write_fut", &self.write_fut).field("is_eof_fut_valid", &self.is_eof_fut_valid).field("eof_fut", &self.eof_fut).finish()
}
}

impl AsyncWrite for DataWrite {
fn poll_write(
mut self: Pin<&mut Self>,
Expand Down
9 changes: 8 additions & 1 deletion nixrs-ssh-store/src/io/extended_data_write.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::fmt;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use log::debug;
use tracing::debug;
use thrussh::server::Handle;
use thrussh::{ChannelId, CryptoVec};
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -46,6 +47,12 @@ impl Clone for ExtendedDataWrite {
}
}

impl fmt::Debug for ExtendedDataWrite {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ExtendedDataWrite").field("id", &self.id).field("ext", &self.ext).field("is_write_fut_valid", &self.is_write_fut_valid).field("write_fut", &self.write_fut).finish()
}
}

impl AsyncWrite for ExtendedDataWrite {
fn poll_write(
mut self: Pin<&mut Self>,
Expand Down
2 changes: 1 addition & 1 deletion nixrs-ssh-store/src/io/read_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::task::Poll;

/// Helper used when converting Future interfaces to poll-based interfaces.
/// Stores excess data that can be reused on future polls.
#[derive(Default)]
#[derive(Debug, Default)]
pub(crate) struct ReadBuffer(Option<(Vec<u8>, usize)>);

impl ReadBuffer {
Expand Down
3 changes: 2 additions & 1 deletion nixrs-ssh-store/src/io/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use std::io;
use std::pin::Pin;
use std::task::{ready, Poll};

use log::debug;
use tracing::debug;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;

/// AsyncRead/AsyncWrite wrapper for SSH Channels
#[derive(Debug)]
pub struct ChannelRead {
incoming: mpsc::UnboundedReceiver<Vec<u8>>,
readbuf: ReadBuffer,
Expand Down
13 changes: 10 additions & 3 deletions nixrs-ssh-store/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::fmt;
use std::error::Error as StdError;
use std::future::Future;

use self::io::ExtendedDataWrite;
use nixrs::store::daemon::DaemonStore;
use nixrs::store::legacy_worker::LegacyStore;

mod error;
Expand All @@ -10,9 +12,14 @@ pub mod io;
pub mod server;

pub trait StoreProvider {
type Store: LegacyStore + Send;
type Error: StdError + Send + Sync;
type Future: Future<Output = Result<Self::Store, Self::Error>>;

fn get_store(&self, stderr: ExtendedDataWrite) -> Self::Future;
type LegacyStore: LegacyStore + fmt::Debug + Send;
type LegacyFuture: Future<Output = Result<Option<Self::LegacyStore>, Self::Error>> + Send;

type DaemonStore: DaemonStore + fmt::Debug + Send;
type DaemonFuture: Future<Output = Result<Option<Self::DaemonStore>, Self::Error>> + Send;

fn get_legacy_store(&self, stderr: ExtendedDataWrite) -> Self::LegacyFuture;
fn get_daemon_store(&self) -> Self::DaemonFuture;
}
Loading

0 comments on commit 187e4cb

Please sign in to comment.