diff --git a/src/workerd/api/kv.c++ b/src/workerd/api/kv.c++ index e8593275a14..41112d29d8e 100644 --- a/src/workerd/api/kv.c++ +++ b/src/workerd/api/kv.c++ @@ -69,7 +69,8 @@ constexpr auto FLPROD_405_HEADER = "CF-KV-FLPROD-405"_kj; kj::Own KvNamespace::getHttpClient(IoContext& context, kj::HttpHeaders& headers, kj::OneOf opTypeOrUnknown, - kj::StringPtr urlStr) { + kj::StringPtr urlStr, + kj::Maybe, PutOptions>> options) { const auto operationName = [&] { KJ_SWITCH_ONEOF(opTypeOrUnknown) { KJ_CASE_ONEOF(name, kj::LiteralStringConst) { @@ -82,6 +83,8 @@ kj::Own KvNamespace::getHttpClient(IoContext& context, switch (opType) { case LimitEnforcer::KvOpType::GET: return "kv_get"_kjc; + case LimitEnforcer::KvOpType::GET_WITH: + return "kv_getWithMetadata"_kjc; case LimitEnforcer::KvOpType::PUT: return "kv_put"_kjc; case LimitEnforcer::KvOpType::LIST: @@ -95,8 +98,55 @@ kj::Own KvNamespace::getHttpClient(IoContext& context, KJ_UNREACHABLE; }(); + kj::Vector tags; + tags.add("db.system"_kjc, kj::str("cloudflare-kv"_kjc)); + tags.add("cloudflare.kv.operation.name"_kjc, kj::str(operationName.slice(3))); + + KJ_IF_SOME(_options, options) { + KJ_SWITCH_ONEOF(_options) { + KJ_CASE_ONEOF(o2, kj::OneOf) { + KJ_SWITCH_ONEOF(o2) { + KJ_CASE_ONEOF(type, kj::String) { + tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type)); + } + KJ_CASE_ONEOF(o, GetOptions) { + KJ_IF_SOME(type, o.type) { + tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type)); + } + KJ_IF_SOME(cacheTtl, o.cacheTtl) { + tags.add("cloudflare.kv.query.parameter.cacheTtl"_kjc, (int64_t)cacheTtl); + } + } + } + } + KJ_CASE_ONEOF(o, ListOptions) { + KJ_IF_SOME(l, o.limit) { + tags.add("cloudflare.kv.query.parameter.limit"_kjc, (int64_t)l); + } + KJ_IF_SOME(prefix, o.prefix) { + KJ_IF_SOME(p, prefix) { + tags.add("cloudflare.kv.query.parameter.prefix"_kjc, kj::mv(p)); + } + } + KJ_IF_SOME(cursor, o.cursor) { + KJ_IF_SOME(c, cursor) { + tags.add("cloudflare.kv.query.parameter.cursor"_kjc, kj::mv(c)); + } + } + } + KJ_CASE_ONEOF(o, PutOptions) { + KJ_IF_SOME(expiration, o.expiration) { + tags.add("cloudflare.kv.query.parameter.expiration"_kjc, (int64_t)expiration); + } + KJ_IF_SOME(expirationTtl, o.expirationTtl) { + tags.add("cloudflare.kv.query.parameter.expirationTtl"_kjc, (int64_t)expirationTtl); + } + } + } + } auto client = context.getHttpClientWithSpans( - subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}}); + subrequestChannel, true, kj::none, operationName, kj::mv(tags)); + headers.add(FLPROD_405_HEADER, urlStr); for (const auto& header: additionalHeaders) { headers.add(header.name.asPtr(), header.value.asPtr()); @@ -105,12 +155,11 @@ kj::Own KvNamespace::getHttpClient(IoContext& context, return client; } -jsg::Promise KvNamespace::get(jsg::Lock& js, - kj::String name, - jsg::Optional> options, - CompatibilityFlags::Reader flags) { +jsg::Promise KvNamespace::get( + jsg::Lock& js, kj::String name, jsg::Optional> options) { return js.evalNow([&] { - auto resp = getWithMetadata(js, kj::mv(name), kj::mv(options)); + auto resp = + getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET); return resp.then(js, [](jsg::Lock&, KvNamespace::GetWithMetadataResult result) { return kj::mv(result.value); }); }); @@ -118,6 +167,13 @@ jsg::Promise KvNamespace::get(jsg::Lock& js, jsg::Promise KvNamespace::getWithMetadata( jsg::Lock& js, kj::String name, jsg::Optional> options) { + return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH); +} + +jsg::Promise KvNamespace::getWithMetadataImpl(jsg::Lock& js, + kj::String name, + jsg::Optional> options, + LimitEnforcer::KvOpType op) { validateKeyName("GET", name); auto& context = IoContext::current(); @@ -132,11 +188,11 @@ jsg::Promise KvNamespace::getWithMetadata( KJ_IF_SOME(oneOfOptions, options) { KJ_SWITCH_ONEOF(oneOfOptions) { KJ_CASE_ONEOF(t, kj::String) { - type = kj::mv(t); + type = kj::str(t); } KJ_CASE_ONEOF(options, GetOptions) { KJ_IF_SOME(t, options.type) { - type = kj::mv(t); + type = kj::str(t); } KJ_IF_SOME(cacheTtl, options.cacheTtl) { url.query.add(kj::Url::QueryParam{kj::str("cache_ttl"), kj::str(cacheTtl)}); @@ -148,7 +204,7 @@ jsg::Promise KvNamespace::getWithMetadata( auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST); auto headers = kj::HttpHeaders(context.getHeaderTable()); - auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::GET, urlStr); + auto client = getHttpClient(context, headers, op, urlStr, kj::mv(options)); auto request = client->request(kj::HttpMethod::GET, urlStr, headers); return context.awaitIo(js, kj::mv(request.response), @@ -260,7 +316,8 @@ jsg::Promise> KvNamespace::list( auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST); auto headers = kj::HttpHeaders(context.getHeaderTable()); - auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr); + auto client = + getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr, kj::mv(options)); auto request = client->request(kj::HttpMethod::GET, urlStr, headers); return context.awaitIo(js, kj::mv(request.response), @@ -363,7 +420,8 @@ jsg::Promise KvNamespace::put(jsg::Lock& js, auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST); - auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr); + auto client = + getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr, kj::mv(options)); auto promise = context.waitForOutputLocks().then( [&context, client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers), @@ -419,7 +477,8 @@ jsg::Promise KvNamespace::delete_(jsg::Lock& js, kj::String name) { kj::HttpHeaders headers(context.getHeaderTable()); - auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr); + auto client = + getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr, kj::none); auto promise = context.waitForOutputLocks().then( [headers = kj::mv(headers), client = kj::mv(client), urlStr = kj::mv(urlStr)]() mutable { diff --git a/src/workerd/api/kv.h b/src/workerd/api/kv.h index 0060991d462..491199029b7 100644 --- a/src/workerd/api/kv.h +++ b/src/workerd/api/kv.h @@ -50,10 +50,8 @@ class KvNamespace: public jsg::Object { using GetResult = kj::Maybe< kj::OneOf, kj::Array, kj::String, jsg::JsRef>>; - jsg::Promise get(jsg::Lock& js, - kj::String name, - jsg::Optional> options, - CompatibilityFlags::Reader flags); + jsg::Promise get( + jsg::Lock& js, kj::String name, jsg::Optional> options); struct GetWithMetadataResult { GetResult value; @@ -68,6 +66,10 @@ class KvNamespace: public jsg::Object { }); }; + jsg::Promise getWithMetadataImpl(jsg::Lock& js, + kj::String name, + jsg::Optional> options, + LimitEnforcer::KvOpType op); jsg::Promise getWithMetadata( jsg::Lock& js, kj::String name, jsg::Optional> options); @@ -173,7 +175,8 @@ class KvNamespace: public jsg::Object { kj::Own getHttpClient(IoContext& context, kj::HttpHeaders& headers, kj::OneOf opTypeOrName, - kj::StringPtr urlStr); + kj::StringPtr urlStr, + kj::Maybe, PutOptions>> options); private: kj::Array additionalHeaders; diff --git a/src/workerd/api/r2-admin.c++ b/src/workerd/api/r2-admin.c++ index f4eaa78cb28..788fefe9080 100644 --- a/src/workerd/api/r2-admin.c++ +++ b/src/workerd/api/r2-admin.c++ @@ -26,9 +26,8 @@ jsg::Ref R2Admin::get(jsg::Lock& js, kj::String bucketName) { jsg::Promise> R2Admin::create( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - // TODO(o11y): Add cloudflare.r2.bucket here. - auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc, - {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}}); + auto client = r2GetClient(context, subrequestChannel, + {"r2_create"_kjc, {"rpc.method"_kjc, "CreateBucket"_kjc}, name.asPtr()}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -59,8 +58,8 @@ jsg::Promise R2Admin::list(jsg::Lock& js, const jsg::TypeHandler>& errorType, CompatibilityFlags::Reader flags) { auto& context = IoContext::current(); - auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc, - {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}}); + auto client = r2GetClient( + context, subrequestChannel, {"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -116,9 +115,8 @@ jsg::Promise R2Admin::list(jsg::Lock& js, jsg::Promise R2Admin::delete_( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - // TODO(o11y): Add cloudflare.r2.bucket - auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc, - {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}}); + auto client = r2GetClient(context, subrequestChannel, + {"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteBucket"_kjc}, name.asPtr()}); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/api/r2-bucket.c++ b/src/workerd/api/r2-bucket.c++ index 2c6893f4a77..21eac01fc78 100644 --- a/src/workerd/api/r2-bucket.c++ +++ b/src/workerd/api/r2-bucket.c++ @@ -24,6 +24,21 @@ #include namespace workerd::api::public_beta { +kj::Own r2GetClient( + IoContext& context, uint subrequestChannel, R2UserTracing user) { + kj::Vector tags; + tags.add("rpc.service"_kjc, kj::str("r2"_kjc)); + tags.add(user.method.key, kj::str(user.method.value)); + KJ_IF_SOME(b, user.bucket) { + tags.add("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + KJ_IF_SOME(tag, user.extraTag) { + tags.add(tag.key, kj::str(tag.value)); + } + + return context.getHttpClientWithSpans(subrequestChannel, true, kj::none, user.op, kj::mv(tags)); +} + static bool isWholeNumber(double x) { double intpart; return modf(x, &intpart) == 0; @@ -341,7 +356,8 @@ jsg::Promise>> R2Bucket::head(jsg::Lock return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName()}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -377,7 +393,9 @@ R2Bucket::get(jsg::Lock& js, return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName(), + {{"cloudflare.r2.bucket"_kjc, name.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -439,7 +457,9 @@ jsg::Promise>> R2Bucket::put(jsg::Lock& }); auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_put"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_put"_kjc, {"rpc.method"_kjc, "PutObject"_kjc}, this->adminBucketName(), + {{"cloudflare.r2.key"_kjc, name.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -630,8 +650,9 @@ jsg::Promise> R2Bucket::createMultipartUpload(jsg::L const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = - context.getHttpClient(clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_createMultipartUpload"_kjc, {"rpc.method"_kjc, "CreateMultipartUpload"_kjc}, + this->adminBucketName()}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -729,7 +750,20 @@ jsg::Promise R2Bucket::delete_(jsg::Lock& js, const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_delete"_kjc); + auto deleteKey = [&]() { + KJ_SWITCH_ONEOF(keys) { + KJ_CASE_ONEOF(ks, kj::Array) { + return kj::str(ks); + } + KJ_CASE_ONEOF(k, kj::String) { + return kj::str(k); + } + } + KJ_UNREACHABLE; + }(); + auto client = r2GetClient(context, clientIndex, + {"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteObject"_kjc}, this->adminBucketName(), + {{"cloudflare.r2.delete"_kjc, deleteKey.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -774,7 +808,8 @@ jsg::Promise R2Bucket::list(jsg::Lock& js, CompatibilityFlags::Reader flags) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_list"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}, this->adminBucketName()}); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/api/r2-bucket.h b/src/workerd/api/r2-bucket.h index e6772e0f491..fa6a65bf0e6 100644 --- a/src/workerd/api/r2-bucket.h +++ b/src/workerd/api/r2-bucket.h @@ -15,6 +15,24 @@ class Headers; namespace workerd::api::public_beta { +struct StringTagParams { + kj::LiteralStringConst key; + kj::StringPtr value; +}; + +struct R2UserTracing { + kj::LiteralStringConst op; + StringTagParams method; + // Passing Maybe instead of Maybe here – this avoids a branch on + // the caller side when bucket is already a Maybe, which is more convenient. + kj::Maybe bucket; + kj::Maybe extraTag; +}; + +// Helper for creating R2 HTTP Client with the right span tags across operations. This is much +// cleaner than setting span tags directly in each function. +kj::Own r2GetClient(IoContext& context, uint subrequestChannel, R2UserTracing user); + kj::Array cloneByteArray(const kj::Array& arr); kj::ArrayPtr fillR2Path( kj::StringPtr pathStorage[1], const kj::Maybe& bucket); diff --git a/src/workerd/api/r2-multipart.c++ b/src/workerd/api/r2-multipart.c++ index c3f00ac791f..de539af68d2 100644 --- a/src/workerd/api/r2-multipart.c++ +++ b/src/workerd/api/r2-multipart.c++ @@ -30,8 +30,9 @@ jsg::Promise R2MultipartUpload::uploadPart(jsg: "Part number must be between 1 and 10000 (inclusive). Actual value was: ", partNumber); auto& context = IoContext::current(); - auto client = - context.getHttpClient(this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc); + auto client = r2GetClient(context, this->bucket->clientIndex, + {"r2_uploadPart"_kjc, {"rpc.method"_kjc, "UploadPart"_kjc}, this->bucket->adminBucketName(), + {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -95,8 +96,9 @@ jsg::Promise> R2MultipartUpload::complete(jsg::Lo const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient( - this->bucket->clientIndex, true, kj::none, "r2_completeMultipartUpload"_kjc); + auto client = r2GetClient(context, this->bucket->clientIndex, + {"r2_completeMultipartUpload"_kjc, {"rpc.method"_kjc, "CompleteMultipartUpload"_kjc}, + this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -145,8 +147,9 @@ jsg::Promise R2MultipartUpload::abort( jsg::Lock& js, const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient( - this->bucket->clientIndex, true, kj::none, "r2_abortMultipartUpload"_kjc); + auto client = r2GetClient(context, this->bucket->clientIndex, + {"r2_abortMultipartUpload"_kjc, {"rpc.method"_kjc, "AbortMultipartUpload"_kjc}, + this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 09c3439926b..10a70545d72 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -839,11 +839,11 @@ kj::Own IoContext::getSubrequestChannelWithSpans(uint channel, bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags) { + kj::Vector tags) { return getSubrequest( [&](TraceContext& tracing, IoChannelFactory& channelFactory) { - for (const SpanTagParams& tag: tags) { - tracing.userSpan.setTag(kj::mv(tag.key), kj::str(tag.value)); + for (Span::Tag& tag: tags) { + tracing.userSpan.setTag(kj::mv(tag.key), kj::mv(tag.value)); } return getSubrequestChannelImpl( channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory); @@ -897,7 +897,7 @@ kj::Own IoContext::getHttpClientWithSpans(uint channel, bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags) { + kj::Vector tags) { return asHttpClient(getSubrequestChannelWithSpans( channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName), kj::mv(tags))); } diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 7aa3c2687cf..f15d7c6da7e 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -710,19 +710,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler kj::Maybe cfBlobJson, kj::ConstString operationName); - // As above, but with list of span tags to add. - // TODO(o11y): For now this only supports literal values based on initializer_list constraints. - // Add syntactic sugar to kj::vector so that we can pass in a vector more ergonomically and use - // that instead to support other value types. - struct SpanTagParams { - kj::LiteralStringConst key; - kj::LiteralStringConst value; - }; kj::Own getSubrequestChannelWithSpans(uint channel, bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags); + kj::Vector tags); // Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only. kj::Own getSubrequestChannelNoChecks(uint channel, @@ -742,7 +734,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags); + kj::Vector tags); // Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects // to HttpClient. diff --git a/src/workerd/io/limit-enforcer.h b/src/workerd/io/limit-enforcer.h index 3287be9b5ee..4525f889ce2 100644 --- a/src/workerd/io/limit-enforcer.h +++ b/src/workerd/io/limit-enforcer.h @@ -109,7 +109,7 @@ class LimitEnforcer { // external subrequests. virtual void newSubrequest(bool isInHouse) = 0; - enum class KvOpType { GET, PUT, LIST, DELETE }; + enum class KvOpType { GET, GET_WITH, PUT, LIST, DELETE }; // Called before starting a KV operation. Throws a JSG exception if the operation should be // blocked due to exceeding limits, such as the free tier daily operation limit. virtual void newKvRequest(KvOpType op) = 0;