Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[o11y] Add KV span tags #3261

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 72 additions & 13 deletions src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ constexpr auto FLPROD_405_HEADER = "CF-KV-FLPROD-405"_kj;
kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
kj::HttpHeaders& headers,
kj::OneOf<LimitEnforcer::KvOpType, kj::LiteralStringConst> opTypeOrUnknown,
kj::StringPtr urlStr) {
kj::StringPtr urlStr,
kj::Maybe<kj::OneOf<ListOptions, kj::OneOf<kj::String, GetOptions>, PutOptions>> options) {
const auto operationName = [&] {
KJ_SWITCH_ONEOF(opTypeOrUnknown) {
KJ_CASE_ONEOF(name, kj::LiteralStringConst) {
Expand All @@ -82,6 +83,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
switch (opType) {
case LimitEnforcer::KvOpType::GET:
return "kv_get"_kjc;
case LimitEnforcer::KvOpType::GET_WITH:
return "kv_getWithMetadata"_kjc;
case LimitEnforcer::KvOpType::PUT:
return "kv_put"_kjc;
case LimitEnforcer::KvOpType::LIST:
Expand All @@ -95,8 +98,55 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
KJ_UNREACHABLE;
}();

kj::Vector<Span::Tag> tags;
tags.add("db.system"_kjc, kj::str("cloudflare-kv"_kjc));
tags.add("cloudflare.kv.operation.name"_kjc, kj::str(operationName.slice(3)));

KJ_IF_SOME(_options, options) {
KJ_SWITCH_ONEOF(_options) {
KJ_CASE_ONEOF(o2, kj::OneOf<kj::String, GetOptions>) {
KJ_SWITCH_ONEOF(o2) {
KJ_CASE_ONEOF(type, kj::String) {
tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type));
}
KJ_CASE_ONEOF(o, GetOptions) {
KJ_IF_SOME(type, o.type) {
tags.add("cloudflare.kv.query.parameter.type"_kjc, kj::mv(type));
}
KJ_IF_SOME(cacheTtl, o.cacheTtl) {
tags.add("cloudflare.kv.query.parameter.cacheTtl"_kjc, (int64_t)cacheTtl);
}
}
}
}
KJ_CASE_ONEOF(o, ListOptions) {
KJ_IF_SOME(l, o.limit) {
tags.add("cloudflare.kv.query.parameter.limit"_kjc, (int64_t)l);
}
KJ_IF_SOME(prefix, o.prefix) {
KJ_IF_SOME(p, prefix) {
tags.add("cloudflare.kv.query.parameter.prefix"_kjc, kj::mv(p));
}
}
KJ_IF_SOME(cursor, o.cursor) {
KJ_IF_SOME(c, cursor) {
tags.add("cloudflare.kv.query.parameter.cursor"_kjc, kj::mv(c));
}
}
}
KJ_CASE_ONEOF(o, PutOptions) {
KJ_IF_SOME(expiration, o.expiration) {
tags.add("cloudflare.kv.query.parameter.expiration"_kjc, (int64_t)expiration);
}
KJ_IF_SOME(expirationTtl, o.expirationTtl) {
tags.add("cloudflare.kv.query.parameter.expirationTtl"_kjc, (int64_t)expirationTtl);
}
}
}
}
auto client = context.getHttpClientWithSpans(
subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}});
subrequestChannel, true, kj::none, operationName, kj::mv(tags));

headers.add(FLPROD_405_HEADER, urlStr);
for (const auto& header: additionalHeaders) {
headers.add(header.name.asPtr(), header.value.asPtr());
Expand All @@ -105,19 +155,25 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return client;
}

jsg::Promise<KvNamespace::GetResult> KvNamespace::get(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
CompatibilityFlags::Reader flags) {
jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return js.evalNow([&] {
fhanau marked this conversation as resolved.
Show resolved Hide resolved
auto resp = getWithMetadata(js, kj::mv(name), kj::mv(options));
auto resp =
getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET);
return resp.then(js,
[](jsg::Lock&, KvNamespace::GetWithMetadataResult result) { return kj::mv(result.value); });
});
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataImpl(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
LimitEnforcer::KvOpType op) {
validateKeyName("GET", name);

auto& context = IoContext::current();
Expand All @@ -132,11 +188,11 @@ jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
KJ_IF_SOME(oneOfOptions, options) {
KJ_SWITCH_ONEOF(oneOfOptions) {
KJ_CASE_ONEOF(t, kj::String) {
type = kj::mv(t);
type = kj::str(t);
}
KJ_CASE_ONEOF(options, GetOptions) {
KJ_IF_SOME(t, options.type) {
type = kj::mv(t);
type = kj::str(t);
}
KJ_IF_SOME(cacheTtl, options.cacheTtl) {
url.query.add(kj::Url::QueryParam{kj::str("cache_ttl"), kj::str(cacheTtl)});
Expand All @@ -148,7 +204,7 @@ jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto headers = kj::HttpHeaders(context.getHeaderTable());
auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::GET, urlStr);
auto client = getHttpClient(context, headers, op, urlStr, kj::mv(options));

auto request = client->request(kj::HttpMethod::GET, urlStr, headers);
return context.awaitIo(js, kj::mv(request.response),
Expand Down Expand Up @@ -260,7 +316,8 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> KvNamespace::list(
auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto headers = kj::HttpHeaders(context.getHeaderTable());
auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::LIST, urlStr, kj::mv(options));

auto request = client->request(kj::HttpMethod::GET, urlStr, headers);
return context.awaitIo(js, kj::mv(request.response),
Expand Down Expand Up @@ -363,7 +420,8 @@ jsg::Promise<void> KvNamespace::put(jsg::Lock& js,

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::PUT, urlStr, kj::mv(options));

auto promise = context.waitForOutputLocks().then(
[&context, client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers),
Expand Down Expand Up @@ -419,7 +477,8 @@ jsg::Promise<void> KvNamespace::delete_(jsg::Lock& js, kj::String name) {

kj::HttpHeaders headers(context.getHeaderTable());

auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr);
auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::DELETE, urlStr, kj::none);

auto promise = context.waitForOutputLocks().then(
[headers = kj::mv(headers), client = kj::mv(client), urlStr = kj::mv(urlStr)]() mutable {
Expand Down
13 changes: 8 additions & 5 deletions src/workerd/api/kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ class KvNamespace: public jsg::Object {
using GetResult = kj::Maybe<
kj::OneOf<jsg::Ref<ReadableStream>, kj::Array<byte>, kj::String, jsg::JsRef<jsg::JsValue>>>;

jsg::Promise<GetResult> get(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
CompatibilityFlags::Reader flags);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor cleanup: compat flags parameter is not needed here, that should simplify the generated code slightly.

jsg::Promise<GetResult> get(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options);

struct GetWithMetadataResult {
GetResult value;
Expand All @@ -68,6 +66,10 @@ class KvNamespace: public jsg::Object {
});
};

jsg::Promise<GetWithMetadataResult> getWithMetadataImpl(jsg::Lock& js,
kj::String name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
LimitEnforcer::KvOpType op);
jsg::Promise<GetWithMetadataResult> getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options);

Expand Down Expand Up @@ -173,7 +175,8 @@ class KvNamespace: public jsg::Object {
kj::Own<kj::HttpClient> getHttpClient(IoContext& context,
kj::HttpHeaders& headers,
kj::OneOf<LimitEnforcer::KvOpType, kj::LiteralStringConst> opTypeOrName,
kj::StringPtr urlStr);
kj::StringPtr urlStr,
kj::Maybe<kj::OneOf<ListOptions, kj::OneOf<kj::String, GetOptions>, PutOptions>> options);

private:
kj::Array<AdditionalHeader> additionalHeaders;
Expand Down
14 changes: 6 additions & 8 deletions src/workerd/api/r2-admin.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ jsg::Ref<R2Bucket> R2Admin::get(jsg::Lock& js, kj::String bucketName) {
jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket here.
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}});
auto client = r2GetClient(context, subrequestChannel,
{"r2_create"_kjc, {"rpc.method"_kjc, "CreateBucket"_kjc}, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -59,8 +58,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType,
CompatibilityFlags::Reader flags) {
auto& context = IoContext::current();
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}});
auto client = r2GetClient(
context, subrequestChannel, {"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -116,9 +115,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
jsg::Promise<void> R2Admin::delete_(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}});
auto client = r2GetClient(context, subrequestChannel,
{"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteBucket"_kjc}, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
49 changes: 42 additions & 7 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@
#include <regex>

namespace workerd::api::public_beta {
kj::Own<kj::HttpClient> r2GetClient(
IoContext& context, uint subrequestChannel, R2UserTracing user) {
kj::Vector<Span::Tag> tags;
tags.add("rpc.service"_kjc, kj::str("r2"_kjc));
tags.add(user.method.key, kj::str(user.method.value));
KJ_IF_SOME(b, user.bucket) {
tags.add("cloudflare.r2.bucket"_kjc, kj::str(b));
}
KJ_IF_SOME(tag, user.extraTag) {
tags.add(tag.key, kj::str(tag.value));
}

return context.getHttpClientWithSpans(subrequestChannel, true, kj::none, user.op, kj::mv(tags));
}

static bool isWholeNumber(double x) {
double intpart;
return modf(x, &intpart) == 0;
Expand Down Expand Up @@ -341,7 +356,8 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::head(jsg::Lock
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -377,7 +393,9 @@ R2Bucket::get(jsg::Lock& js,
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.bucket"_kjc, name.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -439,7 +457,9 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::put(jsg::Lock&
});

auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_put"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_put"_kjc, {"rpc.method"_kjc, "PutObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.key"_kjc, name.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -630,8 +650,9 @@ jsg::Promise<jsg::Ref<R2MultipartUpload>> R2Bucket::createMultipartUpload(jsg::L
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client =
context.getHttpClient(clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_createMultipartUpload"_kjc, {"rpc.method"_kjc, "CreateMultipartUpload"_kjc},
this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -729,7 +750,20 @@ jsg::Promise<void> R2Bucket::delete_(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_delete"_kjc);
auto deleteKey = [&]() {
KJ_SWITCH_ONEOF(keys) {
KJ_CASE_ONEOF(ks, kj::Array<kj::String>) {
return kj::str(ks);
}
KJ_CASE_ONEOF(k, kj::String) {
return kj::str(k);
}
}
KJ_UNREACHABLE;
}();
auto client = r2GetClient(context, clientIndex,
{"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.delete"_kjc, deleteKey.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -774,7 +808,8 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js,
CompatibilityFlags::Reader flags) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_list"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}, this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
18 changes: 18 additions & 0 deletions src/workerd/api/r2-bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@ class Headers;

namespace workerd::api::public_beta {

struct StringTagParams {
kj::LiteralStringConst key;
kj::StringPtr value;
};

struct R2UserTracing {
kj::LiteralStringConst op;
StringTagParams method;
// Passing Maybe<kj::StringPtr> instead of Maybe<StringTagParams> here – this avoids a branch on
// the caller side when bucket is already a Maybe, which is more convenient.
kj::Maybe<kj::StringPtr> bucket;
kj::Maybe<StringTagParams> extraTag;
};

// Helper for creating R2 HTTP Client with the right span tags across operations. This is much
// cleaner than setting span tags directly in each function.
kj::Own<kj::HttpClient> r2GetClient(IoContext& context, uint subrequestChannel, R2UserTracing user);

kj::Array<kj::byte> cloneByteArray(const kj::Array<kj::byte>& arr);
kj::ArrayPtr<kj::StringPtr> fillR2Path(
kj::StringPtr pathStorage[1], const kj::Maybe<kj::String>& bucket);
Expand Down
15 changes: 9 additions & 6 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ jsg::Promise<R2MultipartUpload::UploadedPart> R2MultipartUpload::uploadPart(jsg:
"Part number must be between 1 and 10000 (inclusive). Actual value was: ", partNumber);

auto& context = IoContext::current();
auto client =
context.getHttpClient(this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_uploadPart"_kjc, {"rpc.method"_kjc, "UploadPart"_kjc}, this->bucket->adminBucketName(),
{{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -95,8 +96,9 @@ jsg::Promise<jsg::Ref<R2Bucket::HeadResult>> R2MultipartUpload::complete(jsg::Lo
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_completeMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_completeMultipartUpload"_kjc, {"rpc.method"_kjc, "CompleteMultipartUpload"_kjc},
this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -145,8 +147,9 @@ jsg::Promise<void> R2MultipartUpload::abort(
jsg::Lock& js, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_abortMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_abortMultipartUpload"_kjc, {"rpc.method"_kjc, "AbortMultipartUpload"_kjc},
this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
Loading
Loading