Skip to content

Commit

Permalink
[SDK] Valgrind errors on std::atomic variables (#2244)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcalff authored Jul 22, 2023
1 parent 92a8a54 commit 0c5f90d
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
# ifdef ENABLE_ASYNC_EXPORT
struct SynchronizationData
{
std::atomic<std::size_t> session_counter_;
std::atomic<std::size_t> finished_session_counter_;
std::atomic<std::size_t> session_counter_{0};
std::atomic<std::size_t> finished_session_counter_{0};
std::condition_variable force_flush_cv;
std::mutex force_flush_cv_m;
std::recursive_mutex force_flush_m;
Expand Down
8 changes: 3 additions & 5 deletions exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace
{

/**
* This class handles the response message from the Elasticsearch request
* This class handles the response message from the HTTP request
*/
class ResponseHandler : public http_client::EventHandler
{
Expand All @@ -75,9 +75,7 @@ class ResponseHandler : public http_client::EventHandler
ResponseHandler(std::function<bool(opentelemetry::sdk::common::ExportResult)> &&callback,
bool console_debug = false)
: result_callback_{std::move(callback)}, console_debug_{console_debug}
{
stopping_.store(false);
}
{}

std::string BuildResponseLogMessage(http_client::Response &response,
const std::string &body) noexcept
Expand Down Expand Up @@ -356,7 +354,7 @@ class ResponseHandler : public http_client::EventHandler
const opentelemetry::ext::http::client::Session *session_ = nullptr;

// Whether notify has been called
std::atomic<bool> stopping_;
std::atomic<bool> stopping_{false};

// A string to store the response body
std::string body_ = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class Session : public opentelemetry::ext::http::client::Session,
std::string scheme = "http",
const std::string &host = "",
uint16_t port = 80)
: http_client_(http_client), is_session_active_(false)
: http_client_(http_client)
{
host_ = scheme + "://" + host + ":" + std::to_string(port) + "/";
}
Expand Down Expand Up @@ -216,7 +216,7 @@ class Session : public opentelemetry::ext::http::client::Session,
std::unique_ptr<HttpOperation> curl_operation_;
uint64_t session_id_;
HttpClient &http_client_;
std::atomic<bool> is_session_active_;
std::atomic<bool> is_session_active_{false};
};

class HttpClientSync : public opentelemetry::ext::http::client::HttpClientSync
Expand Down Expand Up @@ -352,7 +352,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

std::mutex multi_handle_m_;
CURLM *multi_handle_;
std::atomic<uint64_t> next_session_id_;
std::atomic<uint64_t> next_session_id_{0};
uint64_t max_sessions_per_connection_;

std::mutex sessions_m_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ class HttpOperation

const char *GetCurlErrorMessage(CURLcode code);

std::atomic<bool> is_aborted_; // Set to 'true' when async callback is aborted
std::atomic<bool> is_finished_; // Set to 'true' when async callback is finished.
std::atomic<bool> is_cleaned_; // Set to 'true' when async callback is cleaned.
const bool is_raw_response_; // Do not split response headers from response body
const bool reuse_connection_; // Reuse connection
std::atomic<bool> is_aborted_{false}; // Set to 'true' when async callback is aborted
std::atomic<bool> is_finished_{false}; // Set to 'true' when async callback is finished.
std::atomic<bool> is_cleaned_{false}; // Set to 'true' when async callback is cleaned.
const bool is_raw_response_; // Do not split response headers from response body
const bool reuse_connection_; // Reuse connection
const std::chrono::milliseconds http_conn_timeout_; // Timeout for connect. Default: 5000ms

char curl_error_message_[CURL_ERROR_SIZE];
Expand Down Expand Up @@ -311,7 +311,7 @@ class HttpOperation

std::thread::id callback_thread;
std::function<void(HttpOperation &)> callback;
std::atomic<bool> is_promise_running;
std::atomic<bool> is_promise_running{false};
std::promise<CURLcode> result_promise;
std::future<CURLcode> result_future;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class TracezDataAggregator

/** A boolean that is set to true in the constructor and false in the
* destructor to start and end execution of aggregate spans **/
std::atomic<bool> execute_;
std::atomic<bool> execute_{false};

/** Thread that executes aggregate spans at regurlar intervals during this
object's lifetime**/
Expand Down
4 changes: 2 additions & 2 deletions ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRe
protected:
HTTP_SERVER_NS::HttpServer server_;
std::string server_address_;
std::atomic<bool> is_setup_;
std::atomic<bool> is_running_;
std::atomic<bool> is_setup_{false};
std::atomic<bool> is_running_{false};
std::vector<HTTP_SERVER_NS::HttpRequest> received_requests_;
std::mutex mtx_requests;
std::condition_variable cv_got_events;
Expand Down
10 changes: 5 additions & 5 deletions sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ class BatchLogRecordProcessor : public LogRecordProcessor
std::mutex cv_m, force_flush_cv_m, shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SimpleLogRecordProcessor : public LogRecordProcessor
// The lock used to ensure the exporter is not called concurrently
opentelemetry::common::SpinLockMutex lock_;
// The atomic boolean to ensure the ShutDown() function is only called once
std::atomic<bool> is_shutdown_;
std::atomic<bool> is_shutdown_{false};
};
} // namespace logs
} // namespace sdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;

/* Synchronization primitives */
std::atomic<bool> is_force_flush_pending_;
std::atomic<bool> is_force_wakeup_background_worker_;
std::atomic<bool> is_force_flush_notified_;
std::atomic<bool> is_force_flush_pending_{false};
std::atomic<bool> is_force_wakeup_background_worker_{false};
std::atomic<bool> is_force_flush_notified_{false};
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_m_;
};
Expand Down
10 changes: 5 additions & 5 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ class BatchSpanProcessor : public SpanProcessor
std::mutex cv_m, force_flush_cv_m, shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
};

/**
Expand Down
15 changes: 2 additions & 13 deletions sdk/src/logs/batch_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(
buffer_(max_queue_size_),
synchronization_data_(std::make_shared<SynchronizationData>()),
worker_thread_(&BatchLogRecordProcessor::DoBackgroundWork, this)
{
synchronization_data_->is_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->is_shutdown.store(false);
}
{}

BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptr<LogRecordExporter> &&exporter,
const BatchLogRecordProcessorOptions &options)
Expand All @@ -46,13 +41,7 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptr<LogRecordExport
buffer_(options.max_queue_size),
synchronization_data_(std::make_shared<SynchronizationData>()),
worker_thread_(&BatchLogRecordProcessor::DoBackgroundWork, this)
{
synchronization_data_->is_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->force_flush_timeout_us.store(0);
synchronization_data_->is_shutdown.store(false);
}
{}

std::unique_ptr<Recordable> BatchLogRecordProcessor::MakeRecordable() noexcept
{
Expand Down
8 changes: 1 addition & 7 deletions sdk/src/trace/batch_span_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
buffer_(max_queue_size_),
synchronization_data_(std::make_shared<SynchronizationData>()),
worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this)
{
synchronization_data_->is_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->force_flush_timeout_us.store(0);
synchronization_data_->is_shutdown.store(false);
}
{}

std::unique_ptr<Recordable> BatchSpanProcessor::MakeRecordable() noexcept
{
Expand Down

0 comments on commit 0c5f90d

Please sign in to comment.