From 042699f4cd0d47e9b7149facebd53d966a4af34b Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Thu, 26 Dec 2024 18:40:51 +0530 Subject: [PATCH] Implement Azure Blob Store --- .gitignore | 1 + Cargo.lock | 345 ++++++- nativelink-config/examples/README.md | 2 +- ...ure_blob_backend_with_local_fast_cas.json5 | 172 ++++ nativelink-config/src/stores.rs | 116 +++ nativelink-store/BUILD.bazel | 4 + nativelink-store/Cargo.toml | 3 + nativelink-store/src/azure_blob_store.rs | 843 ++++++++++++++++++ nativelink-store/src/default_store_factory.rs | 4 + nativelink-store/src/lib.rs | 1 + .../tests/azure_blob_store_test.rs | 1 + 11 files changed, 1464 insertions(+), 28 deletions(-) create mode 100644 nativelink-config/examples/azure_blob_backend_with_local_fast_cas.json5 create mode 100644 nativelink-store/src/azure_blob_store.rs create mode 100644 nativelink-store/tests/azure_blob_store_test.rs diff --git a/.gitignore b/.gitignore index a3a6384fe..a506158a4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ bazel-* target/ .vscode/ +.idea/ .zed .cache .terraform* diff --git a/Cargo.lock b/Cargo.lock index 6e25bb2b5..2ea8f4f8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "RustyXML" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" + [[package]] name = "addr2line" version = "0.24.2" @@ -116,13 +122,24 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + [[package]] name = "async-lock" version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener", + "event-listener 5.3.1", "event-listener-strategy", "pin-project-lite", ] @@ -198,7 +215,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "time", "tokio", @@ -234,7 +251,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "http-body 0.4.6", "once_cell", @@ -264,7 +281,7 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "hex", "hmac", "http 0.2.12", @@ -448,7 +465,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand", + "fastrand 2.3.0", "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", @@ -581,6 +598,91 @@ dependencies = [ "tower-service", ] +[[package]] +name = "azure_core" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b552ad43a45a746461ec3d3a51dfb6466b4759209414b439c165eb6a6b7729e" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "dyn-clone", + "futures", + "getrandom 0.2.15", + "hmac", + "http-types", + "once_cell", + "paste", + "pin-project", + "quick-xml", + "rand 0.8.5", + "rustc_version", + "serde", + "serde_json", + "sha2", + "time", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "azure_storage" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f838159f4d29cb400a14d9d757578ba495ae64feb07a7516bf9e4415127126" +dependencies = [ + "RustyXML", + "async-lock", + "async-trait", + "azure_core", + "bytes", + "serde", + "serde_derive", + "time", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "azure_storage_blobs" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97e83c3636ae86d9a6a7962b2112e3b19eb3903915c50ce06ff54ff0a2e6a7e4" +dependencies = [ + "RustyXML", + "azure_core", + "azure_storage", + "azure_svc_blobstorage", + "bytes", + "futures", + "serde", + "serde_derive", + "serde_json", + "time", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "azure_svc_blobstorage" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e6c6f20c5611b885ba94c7bae5e02849a267381aecb8aee577e8c35ff4064c6" +dependencies = [ + "azure_core", + "bytes", + "futures", + "log", + "once_cell", + "serde", + "serde_json", + "time", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -596,6 +698,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -1006,6 +1114,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1036,6 +1145,12 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "either" version = "1.13.0" @@ -1058,6 +1173,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.3.1" @@ -1075,7 +1196,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" dependencies = [ - "event-listener", + "event-listener 5.3.1", "pin-project-lite", ] @@ -1091,6 +1212,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1176,7 +1306,7 @@ dependencies = [ "futures", "log", "parking_lot", - "rand", + "rand 0.8.5", "redis-protocol", "rustls 0.23.20", "rustls-native-certs 0.8.1", @@ -1250,6 +1380,21 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1301,6 +1446,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -1308,8 +1464,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1479,6 +1637,26 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-types" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" +dependencies = [ + "anyhow", + "async-channel", + "base64 0.13.1", + "futures-lite", + "infer", + "pin-project-lite", + "rand 0.7.3", + "serde", + "serde_json", + "serde_qs", + "serde_urlencoded", + "url", +] + [[package]] name = "httparse" version = "1.9.5" @@ -1751,6 +1929,21 @@ dependencies = [ "serde", ] +[[package]] +name = "infer" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" + +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1951,7 +2144,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -2109,7 +2302,7 @@ dependencies = [ "parking_lot", "pretty_assertions", "prost", - "rand", + "rand 0.8.5", "scopeguard", "serde", "serde_json", @@ -2167,6 +2360,9 @@ dependencies = [ "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", + "azure_core", + "azure_storage", + "azure_storage_blobs", "bincode", "blake3", "byteorder", @@ -2195,7 +2391,7 @@ dependencies = [ "patricia_tree", "pretty_assertions", "prost", - "rand", + "rand 0.8.5", "serde", "serde_json", "serial_test", @@ -2238,7 +2434,7 @@ dependencies = [ "pretty_assertions", "prost", "prost-types", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2", @@ -2274,7 +2470,7 @@ dependencies = [ "pretty_assertions", "prost", "prost-types", - "rand", + "rand 0.8.5", "relative-path", "scopeguard", "serde", @@ -2457,6 +2653,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "patricia_tree" version = "0.8.0" @@ -2479,7 +2681,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" dependencies = [ "memchr", - "thiserror 2.0.6", + "thiserror 2.0.9", "ucd-trie", ] @@ -2700,6 +2902,16 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.37" @@ -2709,6 +2921,19 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -2716,8 +2941,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -2727,7 +2962,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -2736,7 +2980,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -2826,7 +3079,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", @@ -3120,6 +3373,29 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_qs" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" +dependencies = [ + "percent-encoding", + "serde", + "thiserror 1.0.69", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serial_test" version = "3.2.0" @@ -3308,7 +3584,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.3.0", "once_cell", "rustix", "windows-sys 0.59.0", @@ -3325,11 +3601,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.6" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" +checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc" dependencies = [ - "thiserror-impl 2.0.6", + "thiserror-impl 2.0.9", ] [[package]] @@ -3345,9 +3621,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.6" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" +checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" dependencies = [ "proc-macro2", "quote", @@ -3371,6 +3647,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" dependencies = [ "deranged", + "itoa", + "js-sys", "num-conv", "powerfmt", "serde", @@ -3550,7 +3828,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -3704,6 +3982,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] @@ -3743,7 +4022,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "atomic", - "getrandom", + "getrandom 0.2.15", "serde", ] @@ -3765,6 +4044,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "waker-fn" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" + [[package]] name = "want" version = "0.3.1" @@ -3774,6 +4059,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/nativelink-config/examples/README.md b/nativelink-config/examples/README.md index 50828e65c..d1779c825 100644 --- a/nativelink-config/examples/README.md +++ b/nativelink-config/examples/README.md @@ -41,7 +41,7 @@ The value of `stores` includes top-level keys, which are user supplied names sto ### Store Type Once the store has been named and its object exists, -the next key is the type of store. The options are `filesystem`, `memory`, `compression`, `dedup`, `fast_slow`, `verify`, and `experimental_s3_store`. +the next key is the type of store. The options are `filesystem`, `memory`, `compression`, `dedup`, `fast_slow`, `verify`, and `experimental_s3_store` and `experimental_azure_blob_store`. ```json5 { diff --git a/nativelink-config/examples/azure_blob_backend_with_local_fast_cas.json5 b/nativelink-config/examples/azure_blob_backend_with_local_fast_cas.json5 new file mode 100644 index 000000000..4aede770b --- /dev/null +++ b/nativelink-config/examples/azure_blob_backend_with_local_fast_cas.json5 @@ -0,0 +1,172 @@ +{ + "stores": { + "CAS_MAIN_STORE": { + "verify": { + "backend": { + "dedup": { + "index_store": { + "fast_slow": { + "fast": { + "filesystem": { + "content_path": "/tmp/nativelink/data/content_path-index", + "temp_path": "/tmp/nativelink/data/tmp_path-index", + "eviction_policy": { + "max_bytes": 500000000 + } + } + }, + "slow": { + "experimental_azure_blob_store": { + "account_name": "cloudshell524895804", + "account_key": "${AZURE_STORAGE_KEY}", + "container": "simple-test-container", + "blob_prefix": "test-prefix-index/", + "endpoint_suffix": "core.windows.net", + "retry": { + "max_retries": 6, + "delay": 0.3, + "jitter": 0.5 + }, + "max_concurrent_uploads": 10 + } + } + } + }, + "content_store": { + "compression": { + "compression_algorithm": { + "lz4": {} + }, + "backend": { + "fast_slow": { + "fast": { + "filesystem": { + "content_path": "/tmp/nativelink/data/content_path-content", + "temp_path": "/tmp/nativelink/data/tmp_path-content", + "eviction_policy": { + "max_bytes": 2000000000 + } + } + }, + "slow": { + "experimental_azure_blob_store": { + "account_name": "cloudshell1393657559", + "account_key": "${AZURE_STORAGE_KEY}", + "container": "simple-test-container", + "blob_prefix": "test-prefix-dedup-cas/", + "endpoint_suffix": "core.windows.net", + "retry": { + "max_retries": 6, + "delay": 0.3, + "jitter": 0.5 + }, + "max_concurrent_uploads": 10 + } + } + } + } + } + } + } + }, + "verify_size": true, + "verify_hash": true + } + }, + "AC_MAIN_STORE": { + "fast_slow": { + "fast": { + "memory": { + "eviction_policy": { + "max_bytes": 100000000 + } + }, + "filesystem": { + "content_path": "/tmp/nativelink/data/content_path-ac", + "temp_path": "/tmp/nativelink/data/tmp_path-ac", + "eviction_policy": { + "max_bytes": 500000000 + } + } + }, + "slow": { + "experimental_azure_blob_store": { + "account_name": "cloudshell1393657559", + "account_key": "${AZURE_STORAGE_KEY}", + "container": "simple-test-container", + "blob_prefix": "test-prefix-ac/", + "endpoint_suffix": "core.windows.net", + "retry": { + "max_retries": 6, + "delay": 0.3, + "jitter": 0.5 + }, + "max_concurrent_uploads": 10 + } + } + } + } + }, + "schedulers": { + "MAIN_SCHEDULER": { + "simple": { + "supported_platform_properties": { + "cpu_count": "minimum", + "memory_kb": "minimum", + "network_kbps": "minimum", + "disk_read_iops": "minimum", + "disk_read_bps": "minimum", + "disk_write_iops": "minimum", + "disk_write_bps": "minimum", + "shm_size": "minimum", + "gpu_count": "minimum", + "gpu_model": "exact", + "cpu_vendor": "exact", + "cpu_arch": "exact", + "cpu_model": "exact", + "kernel_version": "exact", + "docker_image": "priority", + "lre-rs": "priority" + } + } + } + }, + "servers": [{ + "listener": { + "http": { + "socket_address": "0.0.0.0:50051" + } + }, + "services": { + "cas": { + "main": { + "cas_store": "CAS_MAIN_STORE" + } + }, + "ac": { + "main": { + "ac_store": "AC_MAIN_STORE" + } + }, + "execution": { + "main": { + "cas_store": "CAS_MAIN_STORE", + "scheduler": "MAIN_SCHEDULER" + } + }, + "capabilities": { + "main": { + "remote_execution": { + "scheduler": "MAIN_SCHEDULER" + } + } + }, + "bytestream": { + "cas_stores": { + "main": "CAS_MAIN_STORE" + } + }, + "health": {} + } + }] +} diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 7e6e31821..d51f4dafc 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -54,6 +54,32 @@ pub enum StoreSpec { /// memory(MemorySpec), + /// Azure Blob store will use Microsoft's Azure Blob service as a + /// backend to store the files. This configuration can be used to + /// share files across multiple instances. + /// + /// This configuration will never delete files, so you are + /// responsible for purging old files in other ways. + /// + /// **Example JSON Config:** + /// ```json + /// "experimental_azure_blob_store": { + /// "account_name": "cloudshell1393657559", + /// "account_key": "${AZURE_STORAGE_KEY}", + /// "container": "simple-test-container", + /// "blob_prefix": "folder/", + /// "endpoint_suffix": "core.windows.net", + /// "retry": { + /// "max_retries": 6, + /// "delay": 0.3, + /// "jitter": 0.5 + /// }, + /// "max_concurrent_uploads": 10 + /// } + /// ``` + /// + experimental_azure_blob_store(AzureBlobSpec), + /// S3 store will use Amazon's S3 service as a backend to store /// the files. This configuration can be used to share files /// across multiple instances. @@ -787,6 +813,96 @@ pub struct S3Spec { pub disable_http2: bool, } +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +#[serde(deny_unknown_fields)] +pub struct AzureBlobSpec { + /// The Azure Storage account name + #[serde(default, deserialize_with = "convert_string_with_shellexpand")] + pub account_name: String, + + /// The Azure Storage account key + #[serde(default, deserialize_with = "convert_string_with_shellexpand")] + pub account_key: String, + + /// The container name to use as the backend + #[serde(default, deserialize_with = "convert_string_with_shellexpand")] + pub container: String, + + /// Optional prefix for blob paths within the container. If None, no prefix will be used. + #[serde(default)] + pub blob_prefix: Option, + + /// Retry configuration to use when a network request fails. + #[serde(default)] + pub retry: Retry, + + /// If the number of seconds since the blob's last modified time + /// is greater than this value, the blob will not be considered + /// "existing". This allows for external lifecycle management policies + /// to delete blobs that haven't been accessed in a long time. + /// If a client receives a `NotFound`, it should re-upload the blob. + /// + /// There should be sufficient buffer time between the lifecycle policy's + /// deletion threshold and this value. A few days buffer is recommended. + /// + /// Default: 0. Zero means never consider a blob expired. + #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] + pub consider_expired_after_s: u32, + + /// The maximum buffer size to retain for retryable upload errors. + /// Setting this to zero will disable upload buffering, meaning + /// upload failures will result in a complete abort and likely + /// client errors. + /// + /// Default: 5MB. + pub max_retry_buffer_per_request: Option, + + /// Maximum number of concurrent block uploads per blob. + /// This affects the parallelism of block blob uploads. + /// + /// Default: 10. + pub max_concurrent_uploads: Option, + + /// Allow unencrypted HTTP connections. Only use for local testing + /// with Azurite or other local emulators. + /// + /// Default: false + #[serde(default)] + pub insecure_allow_http: bool, + + /// The Azure Storage endpoint suffix. + /// Examples: "core.windows.net", "core.chinacloudapi.cn" + /// + /// Default: "core.windows.net" + #[serde(default)] + pub endpoint_suffix: String, + + /// Service version to use when making requests to Azure Storage. + /// Should match one of Azure Storage REST API versions. + /// + /// Default: Latest supported version + #[serde(default)] + pub api_version: Option, + + /// Optional connection string for Azure Storage account. + /// If provided, this will be used instead of `account_name` and `account_key`. + /// Format: "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=..." + /// + /// Default: None + #[serde(default)] + pub connection_string: Option, + + /// Disable http/2 connections and only use http/1.1. Default client + /// configuration will have http/1.1 and http/2 enabled for connection + /// schemes. Http/2 should be disabled if environments have poor support + /// or performance related to http/2. Safe to keep default unless + /// underlying network environment or S3 API servers specify otherwise. + /// + /// Default: false + #[serde(default)] + pub disable_http2: bool, +} + #[allow(non_camel_case_types)] #[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum StoreType { diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 4f7691e7d..c366cdf9c 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -10,6 +10,7 @@ rust_library( name = "nativelink-store", srcs = [ "src/ac_utils.rs", + "src/azure_blob_store.rs", "src/cas_utils.rs", "src/completeness_checking_store.rs", "src/compression_store.rs", @@ -46,6 +47,9 @@ rust_library( "@crates//:aws-config", "@crates//:aws-sdk-s3", "@crates//:aws-smithy-runtime", + "@crates//:azure_core", + "@crates//:azure_storage", + "@crates//:azure_storage_blobs", "@crates//:bincode", "@crates//:blake3", "@crates//:byteorder", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index d646c53e6..44b868a5a 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -18,6 +18,9 @@ aws-sdk-s3 = { version = "1.65.0", features = [ "rt-tokio", ], default-features = false } aws-smithy-runtime = { version = "1.7.4" } +azure_core = { version = "0.21.0", default-features = false } +azure_storage = { version = "0.21.0", default-features = false, features = ["hmac_rust"] } +azure_storage_blobs = { version = "0.21.0", default-features = false, features = ["hmac_rust"] } bincode = "1.3.3" blake3 = { version = "1.5.5", default-features = false } byteorder = { version = "1.5.0", default-features = false } diff --git a/nativelink-store/src/azure_blob_store.rs b/nativelink-store/src/azure_blob_store.rs new file mode 100644 index 000000000..f4eb6721c --- /dev/null +++ b/nativelink-store/src/azure_blob_store.rs @@ -0,0 +1,843 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::cmp; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use async_trait::async_trait; +use azure_core::auth::Secret; +use azure_core::prelude::Range; +use azure_core::{Body, HttpClient, StatusCode, TransportOptions}; +use azure_storage::StorageCredentials; +use azure_storage_blobs::prelude::*; +use bytes::Bytes; +use futures::future::FusedFuture; +use futures::stream::{unfold, FuturesUnordered}; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use http_body::{Frame, SizeHint}; +use hyper::client::connect::{Connected, Connection}; +use hyper::client::HttpConnector; +use hyper::service::Service; +use hyper::{Body as HyperBody, Client, Request as HyperRequest, Uri}; +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder, MaybeHttpsStream}; +use nativelink_config::stores::AzureBlobSpec; +use nativelink_error::{make_err, Code, Error, ResultExt}; +use nativelink_metric::MetricsComponent; +use nativelink_util::buf_channel::{ + make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, +}; +use nativelink_util::fs; +use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator}; +use nativelink_util::instant_wrapper::InstantWrapper; +use nativelink_util::retry::{Retrier, RetryResult}; +use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; +use rand::rngs::OsRng; +use rand::Rng; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, SemaphorePermit}; +use tokio::time::sleep; +use tokio_stream::Stream; +use tracing::{event, Level}; + +use crate::cas_utils::is_zero_digest; + +// Check the below doc for the limits specific to Azure. +// https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage + +// Maximum number of blocks in a block blob or append blob +const MAX_BLOCKS: usize = 50_000; + +// Maximum size of a block in a block blob (4,000 MiB) +const MAX_BLOCK_SIZE: u64 = 4_000 * 1024 * 1024; // 4,000 MiB = 4 GiB + +// Default block size for uploads (4 MiB) +const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024 * 1024; // 4 MiB + +// Default maximum retry buffer per request +const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5 MiB + +// Default maximum number of concurrent uploads +const DEFAULT_MAX_CONCURRENT_UPLOADS: usize = 10; + +// Default idle timeout +const IDLE_TIMEOUT: Duration = Duration::from_secs(15); + +// Maximum number of idle connections per host +const MAX_IDLE_PER_HOST: usize = 32; + +pub struct AzureConnectionWithPermit { + connection: T, + _permit: SemaphorePermit<'static>, +} + +impl Connection for AzureConnectionWithPermit { + fn connected(&self) -> Connected { + self.connection.connected() + } +} + +impl AsyncRead for AzureConnectionWithPermit { + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut Pin::get_mut(self).connection).poll_read(cx, buf) + } +} + +impl AsyncWrite for AzureConnectionWithPermit { + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut Pin::get_mut(self).connection).poll_write(cx, buf) + } + + #[inline] + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut Pin::get_mut(self).connection).poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut Pin::get_mut(self).connection).poll_shutdown(cx) + } +} + +// Created an AzureClient to handle the custom HttpsConnector. +// Unlike Amazon S3, Azure Blob Storage doesn't support +// specifying a custom http client in the Transport Options. +// So, we have specialized it to use a custom HttpsConnector. +#[derive(Clone)] +pub struct AzureClient { + client: Client>, + config: Arc, + retrier: Retrier, + connector: HttpsConnector, +} + +impl AzureClient { + pub fn new( + config: AzureBlobSpec, + jitter_fn: Arc Duration + Send + Sync>, + ) -> Result { + let connector = Self::build_connector(&config); + let client = Self::build_client(connector.clone()); + + Ok(Self { + client, + retrier: Retrier::new( + Arc::new(|duration| Box::pin(sleep(duration))), + jitter_fn, + config.retry.clone(), + ), + config: Arc::new(config), + connector, + }) + } + + pub fn get_connector(&self) -> &HttpsConnector { + &self.connector + } + + fn build_connector(config: &AzureBlobSpec) -> HttpsConnector { + let builder = HttpsConnectorBuilder::new().with_webpki_roots(); + + let builder_with_schemes = if config.insecure_allow_http { + builder.https_or_http() + } else { + builder.https_only() + }; + + if config.disable_http2 { + builder_with_schemes.enable_http1().build() + } else { + builder_with_schemes.enable_http1().enable_http2().build() + } + } + + fn build_client( + connector: HttpsConnector, + ) -> Client> { + Client::builder() + .pool_idle_timeout(IDLE_TIMEOUT) + .pool_max_idle_per_host(MAX_IDLE_PER_HOST) + .build(connector) + } + + async fn call_with_retry( + &self, + req: &Uri, + ) -> Result>, Error> { + let retry_stream_fn = unfold(self.connector.clone(), move |mut client| async move { + let _permit = fs::get_permit().await.unwrap(); + match client.call(req.clone()).await { + Ok(connection) => Some(( + RetryResult::Ok(AzureConnectionWithPermit { + connection, + _permit, + }), + client, + )), + Err(e) => Some(( + RetryResult::Retry(make_err!( + Code::Unavailable, + "Failed to call Azure Blob Storage: {e:?}" + )), + client, + )), + } + }); + self.retrier.retry(retry_stream_fn).await + } +} + +impl Debug for AzureClient { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AzureClient") + .field("config", &self.config) + .finish() + } +} + +#[async_trait::async_trait] +impl HttpClient for AzureClient { + async fn execute_request( + &self, + request: &azure_core::Request, + ) -> azure_core::Result { + let mut builder = HyperRequest::builder() + .method(request.method().as_ref()) + .uri(request.url().as_str()); + + // Copy headers from the original request + for (name, value) in request.headers().iter() { + builder = builder.header(name.as_str(), value.as_str()); + } + + // Copy the body + let body = match request.body() { + Body::Bytes(bytes) if bytes.is_empty() => HyperBody::empty(), + Body::Bytes(bytes) => HyperBody::from(bytes.to_vec()), + Body::SeekableStream(_) => { + return Err(azure_core::Error::new( + azure_core::error::ErrorKind::Other, + "Unsupported body type: SeekableStream", + )) + } + }; + + let hyper_request = builder + .body(body) + .map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::Other, e))?; + + let response = tokio::time::timeout(IDLE_TIMEOUT, self.client.request(hyper_request)) + .await + .map_err(|_| { + azure_core::Error::new(azure_core::error::ErrorKind::Other, "Request timeout") + })? + .map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::Other, e))?; + + let (parts, body) = response.into_parts(); + + let mapped_stream = body.map(|result| { + result.map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::Other, e)) + }); + + // Copy headers back to the original request + let headers: HashMap<_, _> = parts + .headers + .iter() + .filter_map(|(k, v)| { + Some(( + azure_core::headers::HeaderName::from(k.as_str().to_owned()), + azure_core::headers::HeaderValue::from(v.to_str().ok()?.to_owned()), + )) + }) + .collect(); + + Ok(azure_core::Response::new( + azure_core::StatusCode::try_from(parts.status.as_u16()).expect("Invalid status code"), + azure_core::headers::Headers::from(headers), + Box::pin(mapped_stream), + )) + } +} + +impl Service for AzureClient { + type Response = AzureConnectionWithPermit>; + type Error = Error; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.client + .poll_ready(cx) + .map_err(|e| make_err!(Code::Unavailable, "Failed poll in Azure Blob Storage: {e}")) + } + + fn call(&mut self, req: Uri) -> Self::Future { + let client_clone = self.clone(); + Box::pin(async move { client_clone.call_with_retry(&req).await }) + } +} + +pub struct AzureBodyWrapper { + reader: DropCloserReadHalf, + size: u64, +} + +impl http_body::Body for AzureBodyWrapper { + type Data = Bytes; + type Error = std::io::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let reader = Pin::new(&mut Pin::get_mut(self).reader); + reader + .poll_next(cx) + .map(|maybe_bytes_res| maybe_bytes_res.map(|res| res.map(Frame::data))) + } + + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(self.size) + } +} + +#[derive(MetricsComponent)] +pub struct AzureBlobStore { + client: Arc, + now_fn: NowFn, + #[metric(help = "The container name for the Azure store")] + container: String, + #[metric(help = "The blob prefix for the Azure store")] + blob_prefix: String, + retrier: Retrier, + #[metric(help = "The number of seconds to consider an object expired")] + consider_expired_after_s: i64, + #[metric(help = "The number of bytes to buffer for retrying requests")] + max_retry_buffer_per_request: usize, + #[metric(help = "The number of concurrent uploads allowed")] + max_concurrent_uploads: usize, +} + +impl AzureBlobStore +where + I: InstantWrapper, + NowFn: Fn() -> I + Send + Sync + Unpin + 'static, +{ + pub async fn new(spec: &AzureBlobSpec, now_fn: NowFn) -> Result, Error> { + // TODO: Should we use a common jitter function to minimize duplication? + let jitter_amt = spec.retry.jitter; + let jitter_fn = Arc::new(move |delay: Duration| { + if jitter_amt == 0. { + return delay; + } + let min = 1. - (jitter_amt / 2.); + let max = 1. + (jitter_amt / 2.); + delay.mul_f32(OsRng.gen_range(min..max)) + }); + + let http_client = Arc::new( + AzureClient::new(spec.clone(), jitter_fn.clone()) + .map_err(|e| make_err!(Code::Unavailable, "Failed to create Azure client: {e}"))?, + ); + let transport_options = TransportOptions::new(http_client); + + let storage_credentials = StorageCredentials::access_key( + spec.account_name.clone(), + Secret::new(spec.account_key.clone()), + ); + + let container_client = BlobServiceClient::builder(&spec.account_name, storage_credentials) + .transport(transport_options) + .container_client(&spec.container); + + Self::new_with_client_and_jitter(spec, container_client, jitter_fn, now_fn) + } + + pub fn new_with_client_and_jitter( + spec: &AzureBlobSpec, + client: ContainerClient, + jitter_fn: Arc Duration + Send + Sync>, + now_fn: NowFn, + ) -> Result, Error> { + Ok(Arc::new(Self { + client: Arc::new(client), + now_fn, + container: spec.container.to_string(), + blob_prefix: spec.blob_prefix.as_ref().unwrap_or(&String::new()).clone(), + retrier: Retrier::new( + Arc::new(|duration| Box::pin(sleep(duration))), + jitter_fn, + spec.retry.clone(), + ), + consider_expired_after_s: i64::from(spec.consider_expired_after_s), + max_retry_buffer_per_request: spec + .max_retry_buffer_per_request + .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST), + max_concurrent_uploads: spec + .max_concurrent_uploads + .map_or(DEFAULT_MAX_CONCURRENT_UPLOADS, |v| v), + })) + } + + fn make_blob_path(&self, key: &StoreKey<'_>) -> String { + format!("{}{}", self.blob_prefix, key.as_str()) + } + + async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result, Error> { + let blob_path = self.make_blob_path(digest); + + self.retrier + .retry(unfold((), move |state| { + let blob_path = blob_path.clone(); + async move { + let result = self + .client + .blob_client(&blob_path) // Get blob client for specific blob + .get_properties() // Call get_properties directly on blob client + .await; + + match result { + Ok(props) => { + if self.consider_expired_after_s != 0 { + let last_modified = props.date; + let now_s = (self.now_fn)().unix_timestamp() as i64; + if last_modified.unix_timestamp() + self.consider_expired_after_s + <= now_s + { + return Some((RetryResult::Ok(None), state)); + } + } + let blob_size = props.blob.properties.content_length; + Some((RetryResult::Ok(Some(blob_size)), state)) + } + Err(err) => { + if err.to_string().contains("BlobNotFound") { + Some((RetryResult::Ok(None), state)) + } else if err.to_string().contains("ContainerNotFound") { + Some(( + RetryResult::Err(make_err!( + Code::InvalidArgument, + "Container not found: {}", + err + )), + state, + )) + } else { + Some(( + RetryResult::Retry(make_err!( + Code::Unavailable, + "Failed to get blob properties: {:?}", + err + )), + state, + )) + } + } + } + } + })) + .await + } +} + +#[async_trait] +impl StoreDriver for AzureBlobStore +where + I: InstantWrapper, + NowFn: Fn() -> I + Send + Sync + Unpin + 'static, +{ + // TODO(Aman): This is also duplicated. Should we have a trait or a macro for this? + async fn has_with_results( + self: Pin<&Self>, + keys: &[StoreKey<'_>], + results: &mut [Option], + ) -> Result<(), Error> { + keys.iter() + .zip(results.iter_mut()) + .map(|(key, result)| async move { + if is_zero_digest(key.borrow()) { + *result = Some(0); + return Ok::<_, Error>(()); + } + *result = self.has(key).await?; + Ok::<_, Error>(()) + }) + .collect::>() + .try_collect() + .await + } + + async fn update( + self: Pin<&Self>, + digest: StoreKey<'_>, + mut reader: DropCloserReadHalf, + upload_size: UploadSizeInfo, + ) -> Result<(), Error> { + let blob_path = self.make_blob_path(&digest); + + // Handling zero-sized content check + if let UploadSizeInfo::ExactSize(0) = upload_size { + return Ok(()); + } + + let max_size = match upload_size { + UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz, + }; + + // For small files, using single block upload + if max_size < MAX_BLOCK_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) { + let UploadSizeInfo::ExactSize(sz) = upload_size else { + unreachable!("upload_size must be UploadSizeInfo::ExactSize here"); + }; + + reader.set_max_recent_data_size( + u64::try_from(self.max_retry_buffer_per_request) + .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?, + ); + + return self.retrier + .retry(unfold(reader, move |mut reader| { + let client = Arc::clone(&self.client); + let blob_client = client.blob_client(&blob_path); + async move { + let (mut tx, mut rx) = make_buf_channel_pair(); + + let result = { + let reader_ref = &mut reader; + let (upload_res, bind_res) = tokio::join!( + async { + let mut buffer = Vec::with_capacity(sz as usize); + while let Ok(Some(chunk)) = rx.try_next().await { + buffer.extend_from_slice(&chunk); + } + + blob_client + .put_block_blob(Body::from(buffer)) + .content_type("application/octet-stream") + .into_future() + .await + .map(|_| ()) + .map_err(|e| make_err!(Code::Aborted, "{:?}", e)) + }, + async { + tx.bind_buffered(reader_ref).await + } + ); + + upload_res + .and(bind_res) + .err_tip(|| "Failed to upload blob in single chunk") + }; + + match result { + Ok(()) => Some((RetryResult::Ok(()), reader)), + Err(mut err) => { + err.code = Code::Aborted; + let bytes_received = reader.get_bytes_received(); + + if let Err(try_reset_err) = reader.try_reset_stream() { + event!( + Level::ERROR, + ?bytes_received, + err = ?try_reset_err, + "Unable to reset stream after failed upload in AzureStore::update" + ); + Some((RetryResult::Err(err + .merge(try_reset_err) + .append(format!("Failed to retry upload with {bytes_received} bytes received in AzureStore::update"))), + reader)) + } else { + let err = err.append(format!("Retry on upload happened with {bytes_received} bytes received in AzureStore::update")); + event!( + Level::INFO, + ?err, + ?bytes_received, + "Retryable Azure error" + ); + Some((RetryResult::Retry(err), reader)) + } + } + } + } + })) + .await; + } + + // For larger files, we'll use blocj upload strategy + let block_size = + cmp::min(max_size / (MAX_BLOCKS as u64 - 1), MAX_BLOCK_SIZE).max(DEFAULT_BLOCK_SIZE); + + let (tx, mut rx) = mpsc::channel(self.max_concurrent_uploads); + let mut block_ids = Vec::with_capacity(MAX_BLOCKS); + let retrier = self.retrier.clone(); + + let read_stream_fut = { + let tx = tx.clone(); + let blob_path = blob_path.clone(); + async move { + for block_id in 0..MAX_BLOCKS { + let write_buf = reader + .consume(Some( + usize::try_from(block_size) + .err_tip(|| "Could not convert block_size to usize")?, + )) + .await + .err_tip(|| "Failed to read chunk in azure_store")?; + + if write_buf.is_empty() { + break; + } + + let block_id = format!("{block_id:032}"); + let blob_path = blob_path.clone(); + + tx.send(async move { + self.retrier + .retry(unfold( + (write_buf, block_id.clone()), + move |(write_buf, block_id)| { + let client = Arc::clone(&self.client); + let blob_client = client.blob_client(&blob_path); + async move { + let retry_result = blob_client + .put_block( + block_id.clone(), + Body::from(write_buf.clone()), + ) + .into_future() + .await + .map_or_else( + |e| { + RetryResult::Retry(make_err!( + Code::Aborted, + "Failed to upload block {} in Azure store: {:?}", + block_id, + e + )) + }, + |_| RetryResult::Ok(block_id.clone()), + ); + Some((retry_result, (write_buf, block_id))) + } + }, + )) + .await + }) + .await + .map_err(|_| make_err!(Code::Internal, "Failed to send block to channel"))?; + } + Ok::<_, Error>(()) + } + .fuse() + }; + + let mut upload_futures = FuturesUnordered::new(); + + tokio::pin!(read_stream_fut); + + loop { + if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() { + break; + } + tokio::select! { + result = &mut read_stream_fut => result?, + Some(block_id) = upload_futures.next() => block_ids.push(block_id?), + Some(fut) = rx.recv() => upload_futures.push(fut), + } + } + + // Sorting block IDs to ensure consistent ordering + block_ids.sort_unstable(); + + // Commit the block list + let block_list = BlockList { + blocks: block_ids + .into_iter() + .map(|id| BlobBlockType::Latest(BlockId::from(id))) + .collect(), + }; + + retrier + .retry(unfold(block_list, move |block_list| { + let client = Arc::clone(&self.client); + let blob_client = client.blob_client(&blob_path); + async move { + Some(( + blob_client + .put_block_list(block_list.clone()) + .content_type("application/octet-stream") + .into_future() + .await + .map_or_else( + |e| { + RetryResult::Retry(make_err!( + Code::Aborted, + "Failed to commit block list in Azure store: {e:?}" + )) + }, + |_| RetryResult::Ok(()), + ), + block_list, + )) + } + })) + .await + } + + async fn get_part( + self: Pin<&Self>, + key: StoreKey<'_>, + writer: &mut DropCloserWriteHalf, + offset: u64, + length: Option, + ) -> Result<(), Error> { + if is_zero_digest(key.borrow()) { + writer + .send_eof() + .err_tip(|| "Failed to send zero EOF in azure store get_part")?; + return Ok(()); + } + let blob_path = self.make_blob_path(&key); + + let client = Arc::clone(&self.client); + let blob_client = client.blob_client(&blob_path); + let range = match length { + Some(len) => Range::new(offset, offset + len - 1), + None => Range::from(offset..), + }; + + self.retrier + .retry(unfold(writer, move |writer| { + let range_clone = range.clone(); + let blob_client = blob_client.clone(); + async move { + let result = async { + let mut stream = blob_client.get().range(range_clone.clone()).into_stream(); + + while let Some(chunk_result) = stream.next().await { + match chunk_result { + Ok(response) => { + let data = response.data.collect().await.map_err(|e| { + make_err!( + Code::Aborted, + "Failed to collect response data: {:?}", + e + ) + })?; + if data.is_empty() { + continue; + } + writer.send(data).await.map_err(|e| { + make_err!( + Code::Aborted, + "Failed to send data to writer: {:?}", + e + ) + })?; + } + Err(e) => { + return match e { + e if e + .as_http_error() + .map(|e| e.status() == StatusCode::NotFound) + .unwrap_or_default() => + { + Err(make_err!( + Code::NotFound, + "Blob not found in Azure: {:?}", + e + )) + } + _ => Err(make_err!( + Code::Aborted, + "Error reading from Azure stream: {:?}", + e + )), + }; + } + } + } + + writer.send_eof().map_err(|e| { + make_err!(Code::Aborted, "Failed to send EOF to writer: {:?}", e) + })?; + Ok(()) + } + .await; + + match result { + Ok(()) => Some((RetryResult::Ok(()), writer)), + Err(e) => { + if e.code == Code::NotFound { + Some((RetryResult::Err(e), writer)) + } else { + Some((RetryResult::Retry(e), writer)) + } + } + } + } + })) + .await + } + + fn inner_store(&self, _digest: Option) -> &'_ dyn StoreDriver { + self + } + + fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { + self + } + + fn as_any_arc(self: Arc) -> Arc { + self + } +} + +#[async_trait] +impl HealthStatusIndicator for AzureBlobStore +where + I: InstantWrapper, + NowFn: Fn() -> I + Send + Sync + Unpin + 'static, +{ + fn get_name(&self) -> &'static str { + "AzureBlobStore" + } + + async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { + StoreDriver::check_health(Pin::new(self), namespace).await + } +} diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 506ef6752..0947d67c2 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -23,6 +23,7 @@ use nativelink_error::Error; use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::store_trait::{Store, StoreDriver}; +use crate::azure_blob_store::AzureBlobStore; use crate::completeness_checking_store::CompletenessCheckingStore; use crate::compression_store::CompressionStore; use crate::dedup_store::DedupStore; @@ -50,6 +51,9 @@ pub fn store_factory<'a>( Box::pin(async move { let store: Arc = match backend { StoreSpec::memory(spec) => MemoryStore::new(spec), + StoreSpec::experimental_azure_blob_store(spec) => { + AzureBlobStore::new(spec, SystemTime::now).await? + } StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?, StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?, StoreSpec::verify(spec) => VerifyStore::new( diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs index 04040fa5b..579f4e09d 100644 --- a/nativelink-store/src/lib.rs +++ b/nativelink-store/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod ac_utils; +pub mod azure_blob_store; pub mod cas_utils; pub mod completeness_checking_store; pub mod compression_store; diff --git a/nativelink-store/tests/azure_blob_store_test.rs b/nativelink-store/tests/azure_blob_store_test.rs new file mode 100644 index 000000000..4af52202f --- /dev/null +++ b/nativelink-store/tests/azure_blob_store_test.rs @@ -0,0 +1 @@ +// TODO(Aman): Implement a testing framework for azure_blob_store