Skip to content

Commit

Permalink
add pushing blob from a bytes stream
Browse files Browse the repository at this point in the history
Signed-off-by: Aviram Hassan <[email protected]>
  • Loading branch information
aviramha committed Jan 28, 2024
1 parent c60075f commit 648f32b
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use crate::Reference;
use crate::errors::{OciDistributionError, Result};
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
use futures_util::future;
use futures_util::stream::{self, StreamExt, TryStreamExt};
use futures_util::Stream;
use futures_util::stream::{self, Stream, StreamExt, TryStreamExt};
use http::HeaderValue;
use http_auth::{parser::ChallengeParser, ChallengeRef};
use olpc_cjson::CanonicalFormatter;
Expand Down Expand Up @@ -492,6 +491,35 @@ impl Client {
.await
}

/// Pushes a blob to the registry as a stream, chunking it
/// upstream.
pub async fn push_blob_stream(
&self,
image: &Reference,
mut blob_stream: impl Stream<Item = std::result::Result<bytes::Bytes, std::io::Error>>
+ std::marker::Unpin,
blob_digest: &str,
) -> Result<String> {
let mut location = self.begin_push_chunked_session(image).await?;
let mut start: usize = 0;
let mut blob_data = Vec::new();
let mut done: bool = false;
loop {
if let Some(bytes) = blob_stream.next().await {
blob_data.extend(&bytes?);
} else {
done = true;
}
// gonna break when push chunk finishes
(location, start) = self.push_chunk(&location, image, &blob_data, start).await?;
if done && start >= blob_data.len() {
break;
}
}
self.end_push_chunked_session(&location, image, blob_digest)
.await
}

/// Perform an OAuth v2 auth request if necessary.
///
/// This performs authorization and then stores the token internally to be used
Expand Down Expand Up @@ -2457,6 +2485,39 @@ mod test {
assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
}

#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_stream() {
let docker = clients::Cli::default();
let test_container = docker.run(registry_image());
let port = test_container.get_host_port_ipv4(5000);

let c = Client::new(ClientConfig {
protocol: ClientProtocol::Http,
..Default::default()
});
let url = format!("localhost:{}/hello-wasm:v1", port);
let image: Reference = url.parse().unwrap();

c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
.await
.expect("result from auth request");

let image_data: Vec<Vec<u8>> = vec![b"iamawebassemblymodule".to_vec()];
let digest = sha256_digest(&image_data[0]);
let layer_location = c
.push_blob_stream(
&image,
stream::iter(image_data)
.map(|chunk| Ok::<_, std::io::Error>(bytes::Bytes::from(chunk))),
&digest,
)
.await
.expect("failed to blob stream push");

assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
}

#[tokio::test]
#[cfg(feature = "test-registry")]
async fn can_push_multiple_chunks() {
Expand Down

0 comments on commit 648f32b

Please sign in to comment.