Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 13 additions & 79 deletions src/bk_download.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,13 @@ void BkDownload::switch_to_local_file() {
}

bool BkDownload::download_done() {
auto tracer = overlaybd_otel::get_tracer("overlaybd");
auto parent_span = opentelemetry::trace::Tracer::GetCurrentSpan();
opentelemetry::trace::StartSpanOptions options;
options.parent = parent_span->GetContext();
auto span = tracer->StartSpan("overlaybd.download.verify_and_commit", {}, {}, options);
auto tracer = overlaybd_otel::GetTracer();
auto span = tracer->StartSpan("download.verify_and_commit");
auto scope = tracer->WithActiveSpan(span);

auto lfs = new_localfs_adaptor();
if (!lfs) {
span->SetAttribute("error", "failed_to_create_fs_adaptor");
span->End();
LOG_ERROR("new_localfs_adaptor() return NULL");
return false;
}
Expand All @@ -87,9 +83,7 @@ bool BkDownload::download_done() {
// verify sha256
photon::semaphore done;
std::string shares;
opentelemetry::trace::StartSpanOptions verify_options;
verify_options.parent = opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext();
auto verify_span = tracer->StartSpan("sha256_verification", {}, {}, verify_options);
auto verify_span = tracer->StartSpan("sha256_verification");
auto verify_scope = tracer->WithActiveSpan(verify_span);
verify_span->SetAttribute("expected_digest", digest);

Expand All @@ -109,81 +103,47 @@ bool BkDownload::download_done() {
span->SetAttribute("error", "checksum_mismatch");
span->SetAttribute("expected_digest", digest);
span->SetAttribute("actual_digest", shares);
span->End();
LOG_ERROR("verify checksum ` failed (expect: `, got: `)", old_name, digest, shares);
force_download = true; // force redownload next time
return false;
}

opentelemetry::trace::StartSpanOptions rename_options;
rename_options.parent = opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext();
auto rename_span = tracer->StartSpan("rename_to_commit", {}, {}, rename_options);
auto rename_scope = tracer->WithActiveSpan(rename_span);
int ret = lfs->rename(old_name.c_str(), new_name.c_str());
rename_span->SetAttribute("success", ret == 0);

if (ret != 0) {
span->SetAttribute("error", "rename_failed");
span->End();
LOG_ERRNO_RETURN(0, false, "rename(`,`) failed", old_name, new_name);
}

span->SetAttribute("success", true);
LOG_INFO("download verify done. rename(`,`) success", old_name, new_name);
span->End();
return true;
}

bool BkDownload::download() {
auto tracer = overlaybd_otel::get_tracer("overlaybd");
// Get current span context from parent if it exists
auto parent_span = opentelemetry::trace::Tracer::GetCurrentSpan();
opentelemetry::trace::StartSpanOptions options;
options.parent = parent_span->GetContext();
auto span = tracer->StartSpan("overlaybd.download.lifecycle", {}, {}, options);
auto tracer = overlaybd_otel::GetTracer();
auto span = tracer->StartSpan("download");
auto scope = tracer->WithActiveSpan(span);

span->SetAttribute("url", url);
span->SetAttribute("original_url", url);
span->SetAttribute("dir", dir);
span->SetAttribute("file_size", file_size);

if (check_downloaded(dir)) {
opentelemetry::trace::StartSpanOptions local_options;
local_options.parent = opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext();
auto local_span = tracer->StartSpan("overlaybd.download.switch_to_local", {}, {}, local_options);
auto local_scope = tracer->WithActiveSpan(local_span);
switch_to_local_file();
span->SetAttribute("from_cache", true);
span->End();
span->SetAttribute("success", true);
return true;
}

span->SetAttribute("from_cache", false);
bool success = false;
if (download_blob()) {
opentelemetry::trace::StartSpanOptions verify_options;
verify_options.parent = opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext();
auto verify_span = tracer->StartSpan("overlaybd.download.verify", {}, {}, verify_options);
auto verify_scope = tracer->WithActiveSpan(verify_span);
if (!download_done()) {
verify_span->SetAttribute("success", false);
span->SetAttribute("success", false);
span->End();
return false;
if (download_done()) {
switch_to_local_file();
success = true;
}
verify_span->SetAttribute("success", true);
verify_span->End();

opentelemetry::trace::StartSpanOptions switch_options;
switch_options.parent = opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext();
auto switch_span = tracer->StartSpan("overlaybd.download.switch_to_local", {}, {}, switch_options);
auto switch_scope = tracer->WithActiveSpan(switch_span);
switch_to_local_file();
success = true;
}

span->SetAttribute("success", success);
span->End();
return success;
}

Expand All @@ -201,11 +161,8 @@ void BkDownload::unlock_file() {
}

bool BkDownload::download_blob() {
auto tracer = overlaybd_otel::get_tracer("overlaybd");
auto parent_span = opentelemetry::trace::Tracer::GetCurrentSpan();
opentelemetry::trace::StartSpanOptions options;
options.parent = parent_span->GetContext();
auto span = tracer->StartSpan("overlaybd.download.blob", {}, {}, options);
auto tracer = overlaybd_otel::GetTracer();
auto span = tracer->StartSpan("download.blob");
auto scope = tracer->WithActiveSpan(span);

std::string dl_file_path = dir + "/" + DOWNLOAD_TMP_NAME;
Expand All @@ -230,7 +187,6 @@ bool BkDownload::download_blob() {
auto dst = open_localfile_adaptor(dl_file_path.c_str(), O_RDWR | O_CREAT, 0644);
if (dst == nullptr) {
span->SetAttribute("error", "failed_to_open_dst");
span->End();
LOG_ERRNO_RETURN(0, false, "failed to open dst file `", dl_file_path.c_str());
}
DEFER(delete dst;);
Expand All @@ -243,7 +199,6 @@ bool BkDownload::download_blob() {
::posix_memalign(&buff, ALIGNMENT, bs);
if (buff == nullptr) {
span->SetAttribute("error", "failed_to_allocate_buffer");
span->End();
LOG_ERRNO_RETURN(0, false, "failed to allocate buffer with ", VALUE(bs));
}
DEFER(free(buff));
Expand All @@ -256,7 +211,6 @@ bool BkDownload::download_blob() {
while (offset < (ssize_t)file_size) {
if (running != 1) {
span->SetAttribute("error", "download_interrupted");
span->End();
LOG_INFO("image file exit when background downloading");
return false;
}
Expand All @@ -279,22 +233,15 @@ bool BkDownload::download_blob() {
if (!(retry--)) {
span->SetAttribute("error", "max_read_retries_exceeded");
span->SetAttribute("failed_offset", offset);
span->End();
LOG_ERROR_RETURN(EIO, false, "failed to read at ", VALUE(offset), VALUE(count));
}
ssize_t rlen;
{
auto read_span = tracer->StartSpan("overlaybd.download.read_block");
auto read_scope = tracer->WithActiveSpan(read_span);
read_span->SetAttribute("offset", offset);
read_span->SetAttribute("size", count);
SCOPE_AUDIT("bk_download", AU_FILEOP(url, offset, rlen));
rlen = src->pread(buff, bs, offset);
if (rlen >= 0) {
read_span->SetAttribute("bytes_read", rlen);
total_bytes_read += rlen;
}
read_span->End();
}
if (rlen < 0) {
retries++;
Expand All @@ -306,19 +253,12 @@ bool BkDownload::download_blob() {
if (!(retry--)) {
span->SetAttribute("error", "max_write_retries_exceeded");
span->SetAttribute("failed_offset", offset);
span->End();
LOG_ERROR_RETURN(EIO, false, "failed to write at ", VALUE(offset), VALUE(count));
}
auto write_span = tracer->StartSpan("overlaybd.download.write_block");
auto write_scope = tracer->WithActiveSpan(write_span);
write_span->SetAttribute("offset", offset);
write_span->SetAttribute("size", count);
auto wlen = dst->pwrite(buff, count, offset);
if (wlen >= 0) {
write_span->SetAttribute("bytes_written", wlen);
total_bytes_written += wlen;
}
write_span->End();
// but once write lenth larger than read length treats as OK
if (wlen < rlen) {
retries++;
Expand All @@ -333,12 +273,11 @@ bool BkDownload::download_blob() {
span->SetAttribute("total_retries", retries);
span->SetAttribute("success", true);
LOG_INFO("download blob done. (`)", dl_file_path);
span->End();
return true;
}

void bk_download_proc(std::list<BKDL::BkDownload *> &dl_list, uint64_t delay_sec, int &running) {
auto tracer = overlaybd_otel::get_tracer("overlaybd");
auto tracer = overlaybd_otel::GetTracer();
auto span = tracer->StartSpan("background_download_process");
auto scope = tracer->WithActiveSpan(span);

Expand Down Expand Up @@ -375,7 +314,6 @@ void bk_download_proc(std::list<BKDL::BkDownload *> &dl_list, uint64_t delay_sec

if (!dl_item->lock_file()) {
dl_span->SetAttribute("status", "lock_failed");
dl_span->End();
dl_list.push_back(dl_item);
continue;
}
Expand All @@ -385,22 +323,19 @@ void bk_download_proc(std::list<BKDL::BkDownload *> &dl_list, uint64_t delay_sec

if (running != 1) {
dl_span->SetAttribute("status", "interrupted");
dl_span->End();
LOG_WARN("image exited, background download exit...");
delete dl_item;
break;
}

if (!succ && dl_item->try_cnt > 0) {
dl_span->SetAttribute("status", "retry");
dl_span->End();
dl_list.push_back(dl_item);
LOG_WARN("download failed, push back to download queue and retry `", dl_item->dir);
continue;
}

dl_span->SetAttribute("status", succ ? "success" : "failed");
dl_span->End();

LOG_DEBUG("finish downloading or no retry any more: `, retry_cnt: `", dl_item->dir,
dl_item->try_cnt);
Expand All @@ -417,7 +352,6 @@ void bk_download_proc(std::list<BKDL::BkDownload *> &dl_list, uint64_t delay_sec
}
}
LOG_INFO("BACKGROUND DOWNLOAD THREAD EXIT.");
span->End();
}

} // namespace BKDL
2 changes: 2 additions & 0 deletions src/overlaybd/otel/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
set(SOURCE_OTEL
tracer_common.cpp
context_storage.cpp
)

include(FetchContent)
Expand All @@ -24,6 +25,7 @@ set_target_properties(otel_lib PROPERTIES LINKER_LANGUAGE CXX)

target_include_directories(otel_lib PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
${PHOTON_INCLUDE_DIR}
${opentelemetry-cpp_SOURCE_DIR}/api/include
${opentelemetry-cpp_SOURCE_DIR}/sdk/include
${opentelemetry-cpp_SOURCE_DIR}/exporters/otlp/include
Expand Down
45 changes: 45 additions & 0 deletions src/overlaybd/otel/context_storage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "context_storage.h"

#include <vector>

#include "photon/thread/thread-local.h"

using opentelemetry::context::Context;
using opentelemetry::context::Token;

static std::vector<Context>& GetStack() {
static photon::thread_local_ptr<std::vector<Context>> stack;
return *stack;
}

namespace overlaybd_otel {

Context LibPhotonContextStorage::GetCurrent() noexcept {
std::vector<Context>& stack = GetStack();
if (stack.empty()) {
return Context();
}
return stack.back();
}

bool LibPhotonContextStorage::Detach(Token &token) noexcept {
std::vector<Context>& stack = GetStack();
if (stack.empty()) {
return false;
}
for (auto it = stack.rbegin(); it != stack.rend(); ++it) {
if (token == *it) {
stack.erase(std::prev(it.base()), stack.end());
return true;
}
}
return false;
}

opentelemetry::nostd::unique_ptr<Token> LibPhotonContextStorage::Attach(const Context& context) noexcept {
std::vector<Context>& stack = GetStack();
stack.push_back(context);
return CreateToken(context);
}

} // overlaybd_otel
21 changes: 21 additions & 0 deletions src/overlaybd/otel/context_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once
#include <iterator>
#include <vector>

#include "opentelemetry/context/runtime_context.h"

namespace overlaybd_otel {

// ContextStorage implementation that uses photonlib's coroutine-aware thread local storage.
class LibPhotonContextStorage : public opentelemetry::context::RuntimeContextStorage {
public:
LibPhotonContextStorage() noexcept = default;

opentelemetry::context::Context GetCurrent() noexcept override;

bool Detach(opentelemetry::context::Token &token) noexcept override;

opentelemetry::nostd::unique_ptr<opentelemetry::context::Token> Attach(const opentelemetry::context::Context& context) noexcept override;
};

} // overlaybd_otel
38 changes: 14 additions & 24 deletions src/overlaybd/otel/tracer_common.cpp
Original file line number Diff line number Diff line change
@@ -1,47 +1,37 @@
#include "opentelemetry/context/runtime_context.h"
#include "tracer_common.h"
#include "context_storage.h"

namespace overlaybd_otel {

void InitTracer() {
// Create OTLP HTTP exporter configuration
opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts;

// Install a libphoton coroutine-aware context storage implementation, which uses libphoton's thread-local implementation.
opentelemetry::context::RuntimeContext::SetRuntimeContextStorage(
opentelemetry::nostd::unique_ptr<opentelemetry::context::RuntimeContextStorage>(
new LibPhotonContextStorage()));

// Create OTLP/HTTP exporter using the factory
auto exporter = opentelemetry::exporter::otlp::OtlpHttpExporterFactory::Create(opts);

opentelemetry::sdk::trace::BatchSpanProcessorOptions bspOpts{};
auto processor =
opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(std::move(exporter), bspOpts);

// Create a simple processor (we'll use simple instead of batch for now)
// auto processor =
// opentelemetry::sdk::trace::SimpleSpanProcessorFactory::Create(std::move(exporter));
std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor>> processors;
processors.push_back(std::move(processor));
processors.push_back(
opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(
opentelemetry::exporter::otlp::OtlpHttpExporterFactory::Create(), {}));

// Default is an always-on sampler
std::unique_ptr<opentelemetry::sdk::trace::TracerContext> context =
opentelemetry::sdk::trace::TracerContextFactory::Create(std::move(processors));
std::shared_ptr<opentelemetry::trace::TracerProvider> provider =
opentelemetry::sdk::trace::TracerProviderFactory::Create(std::move(context));
opentelemetry::sdk::trace::TracerProviderFactory::Create(
opentelemetry::sdk::trace::TracerContextFactory::Create(std::move(processors)));

// Set the global trace provider
opentelemetry::sdk::trace::Provider::SetTracerProvider(provider);

// // set global propagator
// opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(
// opentelemetry::nostd::shared_ptr<opentelemetry::context::propagation::TextMapPropagator>(
// new opentelemetry::trace::propagation::HttpTraceContext()));
}

void CleanupTracer() {
std::shared_ptr<opentelemetry::trace::TracerProvider> none;
opentelemetry::sdk::trace::Provider::SetTracerProvider(none);
}

opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> get_tracer(std::string tracer_name) {
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
return provider->GetTracer(tracer_name);
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> GetTracer(opentelemetry::nostd::string_view tracer_name) {
return opentelemetry::trace::Provider::GetTracerProvider()->GetTracer(tracer_name);
}

} // namespace overlaybd_otel
Loading