Skip to content

Commit

Permalink
[EXPORTER] Ignore exception when create thread in OTLP file exporter. (
Browse files Browse the repository at this point in the history
  • Loading branch information
owent authored Jul 29, 2024
1 parent deed1e3 commit 84368cd
Showing 1 changed file with 72 additions and 50 deletions.
122 changes: 72 additions & 50 deletions exporters/otlp/src/otlp_file_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
#include <thread>
#include <utility>
#include <vector>
#if OPENTELEMETRY_HAVE_EXCEPTIONS
# include <exception>
#endif

#if !defined(__CYGWIN__) && defined(_WIN32)
# ifndef WIN32_LEAN_AND_MEAN
Expand Down Expand Up @@ -1424,71 +1427,90 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
return;
}

std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
#if OPENTELEMETRY_HAVE_EXCEPTIONS
try
{
return;
}

std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;
#endif

while (true)
std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
{
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}
return;
}

if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
break;
}
std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;

while (true)
{
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}

{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
break;
}

if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}

concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}
{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
}

concurrency_file->background_thread_waiter_cv.notify_all();
}
if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}

concurrency_file->background_thread_waiter_cv.notify_all();
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
#if OPENTELEMETRY_HAVE_EXCEPTIONS
}
catch (std::exception &e)
{
OTEL_INTERNAL_LOG_WARN("[OTLP FILE Client] Try to spawn background but got a exception: "
<< e.what() << ".Data writing may experience some delays.");
}
catch (...)
{
OTEL_INTERNAL_LOG_WARN(
"[OTLP FILE Client] Try to spawn background but got a unknown exception.Data writing may "
"experience some delays.");
}
#endif
}

private:
Expand Down

1 comment on commit 84368cd

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'OpenTelemetry-cpp api Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 84368cd Previous: deed1e3 Ratio
BM_SpinLockThrashing/4/process_time/real_time 2.0180095325816763 ms/iter 0.7429373014223325 ms/iter 2.72
BM_ProcYieldSpinLockThrashing/2/process_time/real_time 0.6351052568509028 ms/iter 0.2022302410208767 ms/iter 3.14
BM_NaiveSpinLockThrashing/2/process_time/real_time 0.9532024188594923 ms/iter 0.20512459809298342 ms/iter 4.65
BM_NaiveSpinLockThrashing/4/process_time/real_time 2.5538009347267523 ms/iter 0.7230499644338348 ms/iter 3.53
BM_ThreadYieldSpinLockThrashing/1/process_time/real_time 14.997058444552952 ms/iter 6.342059687564247 ms/iter 2.36

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.