diff --git a/src/client.rs b/src/client.rs index 78d05f99..5fe5cc20 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,8 +19,7 @@ use crate::Reference; use crate::errors::{OciDistributionError, Result}; use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache}; use futures_util::future; -use futures_util::stream::{self, StreamExt, TryStreamExt}; -use futures_util::Stream; +use futures_util::stream::{self, Stream, StreamExt, TryStreamExt}; use http::HeaderValue; use http_auth::{parser::ChallengeParser, ChallengeRef}; use olpc_cjson::CanonicalFormatter; @@ -492,6 +491,35 @@ impl Client { .await } + /// Pushes a blob to the registry as a stream, chunking it + /// upstream. + pub async fn push_blob_stream( + &self, + image: &Reference, + mut blob_stream: impl Stream> + + std::marker::Unpin, + blob_digest: &str, + ) -> Result { + let mut location = self.begin_push_chunked_session(image).await?; + let mut start: usize = 0; + let mut blob_data = Vec::new(); + let mut done: bool = false; + loop { + if let Some(bytes) = blob_stream.next().await { + blob_data.extend(&bytes?); + } else { + done = true; + } + // gonna break when push chunk finishes + (location, start) = self.push_chunk(&location, image, &blob_data, start).await?; + if done && start >= blob_data.len() { + break; + } + } + self.end_push_chunked_session(&location, image, blob_digest) + .await + } + /// Perform an OAuth v2 auth request if necessary. /// /// This performs authorization and then stores the token internally to be used @@ -2457,6 +2485,39 @@ mod test { assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port)); } + #[tokio::test] + #[cfg(feature = "test-registry")] + async fn can_push_stream() { + let docker = clients::Cli::default(); + let test_container = docker.run(registry_image()); + let port = test_container.get_host_port_ipv4(5000); + + let c = Client::new(ClientConfig { + protocol: ClientProtocol::Http, + ..Default::default() + }); + let url = format!("localhost:{}/hello-wasm:v1", port); + let image: Reference = url.parse().unwrap(); + + c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push) + .await + .expect("result from auth request"); + + let image_data: Vec> = vec![b"iamawebassemblymodule".to_vec()]; + let digest = sha256_digest(&image_data[0]); + let layer_location = c + .push_blob_stream( + &image, + stream::iter(image_data) + .map(|chunk| Ok::<_, std::io::Error>(bytes::Bytes::from(chunk))), + &digest, + ) + .await + .expect("failed to blob stream push"); + + assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port)); + } + #[tokio::test] #[cfg(feature = "test-registry")] async fn can_push_multiple_chunks() {