
Commit

merge batch insert
dlacoste-esrf committed Aug 12, 2020
2 parents f42c0ff + ab9eae5 commit 2401a44
Showing 19 changed files with 1,271 additions and 281 deletions.
2 changes: 1 addition & 1 deletion .clang-tidy
@@ -1,5 +1,5 @@
---
Checks: '-*,clang-diagnostic-*,clang-analyzer-*,-*,modernize*,performance*,readability*,bugprone*,clang-analyzer*,cppcoreguidelines*,misc*,-readability-braces-around-statements,-cppcoreguidelines-pro-bounds-array-to-pointer-decay'
Checks: '-*,clang-diagnostic-*,clang-analyzer-*,-*,modernize*,performance*,readability*,bugprone*,clang-analyzer*,cppcoreguidelines*,misc*,-readability-braces-around-statements,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-modernize-use-trailing-return-type'
WarningsAsErrors: ''
HeaderFilterRegex: '(src|test)/.*'
AnalyzeTemporaryDtors: false
3 changes: 0 additions & 3 deletions CMakeLists.txt
@@ -38,9 +38,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# Build options
set(FETCH_LIBHDBPP_TAG "exp-refactor" CACHE STRING "Libhdbpp branch/tag to clone 'master'")
option(FETCH_LIBHDBPP "Download and build using a local copy of libhdb++" OFF)
option(FETCH_LIBHDBPP_TAG "When FETCH_LIBHDBPP is enabled, this is the tag fetch ('master')")
option(BUILD_UNIT_TESTS "Build unit tests" OFF)
option(BUILD_BENCHMARK_TESTS "Build benchmarking tests (Forces RELEASE build)" OFF)
option(ENABLE_CLANG "Enable clang code and layout analysis" OFF)
53 changes: 42 additions & 11 deletions benchmark/CMakeLists.txt
@@ -3,29 +3,60 @@ project(benchmark-tests)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(CMAKE_COLOR_MAKEFILE ON)

set(BENCHMARK_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/QueryBuilderTests.cpp)
add_executable(query-builder-tests ${CMAKE_CURRENT_SOURCE_DIR}/QueryBuilderTests.cpp)
target_compile_options(query-builder-tests PRIVATE -Wall -Wextra -g)

target_link_libraries(query-builder-tests
PRIVATE
libhdbpp_timescale_static_library
TangoInterfaceLibrary
benchmark
benchmark_main
gtest
test-utils)

target_include_directories(query-builder-tests
PRIVATE ${CMAKE_SOURCE_DIR}/src ${PROJECT_SOURCE_DIR})

add_executable(benchmark-tests ${BENCHMARK_SOURCES})
target_compile_options(benchmark-tests PRIVATE -Wall -Wextra -g)
target_compile_definitions(query-builder-tests
PRIVATE -DDEBUG_ENABLED)

target_link_libraries(benchmark-tests
PRIVATE libhdbpp_headers libhdbpp_timescale_shared_library TangoInterfaceLibrary benchmark gtest)
set_target_properties(query-builder-tests
PROPERTIES
LINK_FLAGS "-Wl,--no-undefined"
CXX_STANDARD 14)

target_include_directories(benchmark-tests
if(DO_CLANG_TIDY)
set_target_properties(query-builder-tests
PROPERTIES
CXX_CLANG_TIDY ${DO_CLANG_TIDY})
endif(DO_CLANG_TIDY)

add_executable(db-insert-tests ${CMAKE_CURRENT_SOURCE_DIR}/DbInsertionTests.cpp)
target_compile_options(db-insert-tests PRIVATE -Wall -Wextra -g)

target_link_libraries(db-insert-tests
PRIVATE
libhdbpp_timescale_static_library
TangoInterfaceLibrary
benchmark
benchmark_main
gtest
test-utils)

target_include_directories(db-insert-tests
PRIVATE ${CMAKE_SOURCE_DIR}/src ${PROJECT_SOURCE_DIR})

target_compile_definitions(benchmark-tests
target_compile_definitions(db-insert-tests
PRIVATE -DDEBUG_ENABLED)

set_target_properties(benchmark-tests
set_target_properties(db-insert-tests
PROPERTIES
LINK_FLAGS "-Wl,--no-undefined"
CXX_STANDARD 14)

if(DO_CLANG_TIDY)
set_target_properties(unit-tests
set_target_properties(db-insert-tests
PROPERTIES
CXX_CLANG_TIDY ${DO_CLANG_TIDY})
endif(DO_CLANG_TIDY)

566 changes: 566 additions & 0 deletions benchmark/DbInsertionTests.cpp

Large diffs are not rendered by default.

40 changes: 19 additions & 21 deletions benchmark/QueryBuilderTests.cpp
@@ -16,17 +16,18 @@
You should have received a copy of the Lesser GNU General Public License
along with libhdb++timescale. If not, see <http://www.gnu.org/licenses/>. */

#include "QueryBuilder.hpp"

#include <benchmark/benchmark.h>

//=============================================================================
//=============================================================================
void bmAllocateQueryBuilder(benchmark::State& state)
void bmAllocateQueryBuilder(benchmark::State &state)
{
// Test - Testing the time it takes to allocate a QueryBuilder, mainly for future test
// reference
hdbpp_internal::LogConfigurator::initLogging();
hdbpp_internal::LogConfigurator::initLogging("test");

for (auto _ : state)
hdbpp_internal::pqxx_conn::QueryBuilder query_builder;
@@ -36,10 +36,10 @@ BENCHMARK(bmAllocateQueryBuilder);

//=============================================================================
//=============================================================================
void bmTraitsComparator(benchmark::State& state)
void bmTraitsComparator(benchmark::State &state)
{
// TEST - Test the AttributeTraits comparator used in the cache inside QueryBuilder,
// the test is against a full map with every possible tango traits combination
// the test is against a full map with every possible tango traits combination
std::map<hdbpp_internal::AttributeTraits, std::string> trait_cache;

vector<Tango::CmdArgType> types {Tango::DEV_DOUBLE,
@@ -67,8 +67,7 @@ void bmTraitsComparator(benchmark::State& state)
for (auto &write : write_types)
{
// add to the cache for future hits
trait_cache.emplace(
hdbpp_internal::AttributeTraits{write, format, type},
trait_cache.emplace(hdbpp_internal::AttributeTraits {write, format, type},
to_string(write) + to_string(format) + to_string(type));
}
}
@@ -84,25 +84,25 @@ BENCHMARK(bmTraitsComparator);

//=============================================================================
//=============================================================================
static void writeTypeArgs(benchmark::internal::Benchmark* b)
static void writeTypeArgs(benchmark::internal::Benchmark *b)
{
vector<Tango::AttrWriteType> write_types {Tango::READ, Tango::WRITE, Tango::READ_WRITE, Tango::READ_WITH_WRITE};

for (auto & write_type : write_types)
for (auto &write_type : write_types)
b->Args({static_cast<int>(write_type)});
}

//=============================================================================
//=============================================================================
template<typename T>
void bmStoreDataEventQueryNoCache(benchmark::State& state)
void bmStoreDataEventQueryNoCache(benchmark::State &state)
{
// TEST - Testing how long it takes to build an Insert Data Event query with
// an empty cache (this forces the full string to be built)
hdbpp_internal::LogConfigurator::initLogging();
hdbpp_internal::LogConfigurator::initLogging("test");

hdbpp_internal::AttributeTraits traits
{static_cast<Tango::AttrWriteType>(state.range(0)), Tango::SCALAR, Tango::DEV_DOUBLE};
hdbpp_internal::AttributeTraits traits {
static_cast<Tango::AttrWriteType>(state.range(0)), Tango::SCALAR, Tango::DEV_DOUBLE};

for (auto _ : state)
{
@@ -115,14 +115,14 @@ void bmStoreDataEventQueryNoCache(benchmark::State& state)
//=============================================================================
//=============================================================================
template<typename T>
void bmStoreDataEventQueryCache(benchmark::State& state)
void bmStoreDataEventQueryCache(benchmark::State &state)
{
// TEST - Testing the full lookup for an Insert Data QueryEvent query when the cache
// map is fully populated
hdbpp_internal::LogConfigurator::initLogging();
// map is fully populated
hdbpp_internal::LogConfigurator::initLogging("test");

hdbpp_internal::AttributeTraits traits
{static_cast<Tango::AttrWriteType>(state.range(0)), Tango::SCALAR, Tango::DEV_DOUBLE};
hdbpp_internal::AttributeTraits traits {
static_cast<Tango::AttrWriteType>(state.range(0)), Tango::SCALAR, Tango::DEV_DOUBLE};

vector<Tango::CmdArgType> types {Tango::DEV_DOUBLE,
Tango::DEV_FLOAT,
@@ -147,13 +147,11 @@ void bmStoreDataEventQueryCache(benchmark::State& state)
for (auto &type : types)
for (auto &format : format_types)
for (auto &write : write_types)
query_builder.storeDataEventStatement<T>(hdbpp_internal::AttributeTraits{write, format, type});
query_builder.storeDataEventStatement<T>(hdbpp_internal::AttributeTraits {write, format, type});

for (auto _ : state)
query_builder.storeDataEventStatement<T>(traits);
}

BENCHMARK_TEMPLATE(bmStoreDataEventQueryNoCache, bool)->Apply(writeTypeArgs);
BENCHMARK_TEMPLATE(bmStoreDataEventQueryCache, bool)->Apply(writeTypeArgs);

BENCHMARK_MAIN();
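
Note: the two templated benchmarks above contrast a cache miss (building the insert statement from scratch) with a cache hit (looking up the prebuilt string in a std::map keyed on AttributeTraits). The following minimal, self-contained sketch illustrates that pattern; the key type, query text, and benchmark names are illustrative stand-ins, not part of the library.

#include <benchmark/benchmark.h>

#include <map>
#include <string>
#include <tuple>

// Stand-in for AttributeTraits: (write type, format, data type) as plain ints.
using TraitsKey = std::tuple<int, int, int>;

// Stand-in for the string assembly QueryBuilder performs on a cache miss.
static std::string buildQuery(const TraitsKey &key)
{
    return "INSERT INTO att_" + std::to_string(std::get<0>(key)) + "_" +
        std::to_string(std::get<1>(key)) + "_" + std::to_string(std::get<2>(key)) +
        " VALUES ($1, $2)";
}

// Cache miss: the full query string is rebuilt on every iteration.
static void bmUncachedBuild(benchmark::State &state)
{
    const TraitsKey key {0, 1, 5};

    for (auto _ : state)
        benchmark::DoNotOptimize(buildQuery(key));
}
BENCHMARK(bmUncachedBuild);

// Cache hit: the query string is built once and only looked up afterwards.
static void bmCachedLookup(benchmark::State &state)
{
    std::map<TraitsKey, std::string> cache;
    const TraitsKey key {0, 1, 5};
    cache.emplace(key, buildQuery(key));

    for (auto _ : state)
        benchmark::DoNotOptimize(cache.find(key));
}
BENCHMARK(bmCachedLookup);

BENCHMARK_MAIN();

In the real benchmarks the map is prepopulated with every write-type/format/data-type combination, so the measured path is dominated by the comparator-driven lookup rather than string construction.
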
139 changes: 113 additions & 26 deletions src/DbConnection.cpp
@@ -397,7 +397,7 @@ namespace pqxx_conn
checkConnection(LOCATION_INFO);
checkAttributeExists(full_attr_name, LOCATION_INFO);

// first ensure the error message has an id inm the database, otherwise
// first ensure the error message has an id in the database, otherwise
// we can not store data against it
if (!_error_desc_id_cache->valueExists(error_msg))
storeErrorMsg(full_attr_name, error_msg);
@@ -416,36 +416,52 @@ namespace pqxx_conn
Tango::Except::throw_exception("Consistency Error", msg, LOCATION_INFO);
}

try
if (_enable_buffering)
{
// create and perform a pqxx transaction
pqxx::perform([&, this]() {
pqxx::work tx {(*_conn), StoreDataEventError};
auto query = QueryBuilder::storeDataEventErrorString(pqxx::to_string(_conf_id_cache->value(full_attr_name)),
pqxx::to_string(event_time),
pqxx::to_string(quality),
pqxx::to_string(_error_desc_id_cache->value(error_msg)),
traits);

query += ";";
_sql_buffer.push_back(query);
}
else
{
try
{
// create and perform a pqxx transaction
pqxx::perform([&, this]() {
pqxx::work tx {(*_conn), StoreDataEventError};

if (!tx.prepared(_query_builder.storeDataEventErrorName(traits)).exists())
{
tx.conn().prepare(_query_builder.storeDataEventErrorName(traits),
_query_builder.storeDataEventErrorStatement(traits));
if (!tx.prepared(_query_builder.storeDataEventErrorName(traits)).exists())
{
tx.conn().prepare(_query_builder.storeDataEventErrorName(traits),
_query_builder.storeDataEventErrorStatement(traits));

spdlog::trace("Created prepared statement for: {}", _query_builder.storeDataEventErrorName(traits));
}
spdlog::trace(
"Created prepared statement for: {}", _query_builder.storeDataEventErrorName(traits));
}

// no result expected
tx.exec_prepared0(_query_builder.storeDataEventErrorName(traits),
_conf_id_cache->value(full_attr_name),
event_time,
quality,
_error_desc_id_cache->value(error_msg));
// no result expected
tx.exec_prepared0(_query_builder.storeDataEventErrorName(traits),
_conf_id_cache->value(full_attr_name),
event_time,
quality,
_error_desc_id_cache->value(error_msg));

tx.commit();
});
}
catch (const pqxx::pqxx_exception &ex)
{
handlePqxxError("The attribute [" + full_attr_name + "] error message [" + error_msg + "] was not saved.",
ex.base().what(),
_query_builder.storeDataEventErrorName(traits),
LOCATION_INFO);
tx.commit();
});
}
catch (const pqxx::pqxx_exception &ex)
{
handlePqxxError(
"The attribute [" + full_attr_name + "] error message [" + error_msg + "] was not saved.",
ex.base().what(),
_query_builder.storeDataEventErrorName(traits),
LOCATION_INFO);
}
}
}

@@ -606,6 +622,77 @@ namespace pqxx_conn
return traits;
}

//=============================================================================
//=============================================================================
void DbConnection::flush()
{
spdlog::debug("Flushing buffer of size: {}", _sql_buffer.size());

if (_sql_buffer.empty())
{
spdlog::warn("Nothing to flush from the buffer, returning");
return;
}

try
{
pqxx::perform([&, this]() {
pqxx::work tx {(*_conn), StoreDataEvents};

string full_query;

for (auto const &query : _sql_buffer)
full_query += query;

tx.exec0(full_query);

// commit the result
tx.commit();
});
}
catch (const pqxx::pqxx_exception &ex)
{

spdlog::error("Error: An unexpected error occurred when trying to run a multiple event transaction.");
spdlog::error("Caught error at: {} Error: \"{}\"", LOCATION_INFO, ex.base().what());
spdlog::info("Trying to run multiple event transaction in single bunches.");

string full_msg = "";
bool single_error = false;

for (auto const &query : _sql_buffer)
{
try
{
pqxx::perform([&, this]() {
pqxx::work tx {(*_conn), StoreDataEvents};

tx.exec0(query);
tx.commit();
});
}
catch (const pqxx::pqxx_exception &ex)
{
single_error = true;
spdlog::error("Error: An unexpected error occurred when trying to run the single query: \"{}\"", query);
spdlog::error("Caught error at: {} Error: \"{}\"", LOCATION_INFO, ex.base().what());
full_msg += "Could not run query:" + query + "\n";
}
}

// we may try the events individually in future
_sql_buffer.clear();

if(single_error)
{
spdlog::error("Throwing storage error with message: \"{}\"", full_msg);
Tango::Except::throw_exception("Storage Error", full_msg, LOCATION_INFO);
}
}

_sql_buffer.clear();
}

//=============================================================================
//=============================================================================
void DbConnection::storeEvent(const std::string &full_attr_name, const std::string &event)
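
Note: the new buffering path in storeDataEventError() and the flush() implementation above follow a buffer-then-flush pattern: statements are appended to an in-memory buffer as complete SQL strings and later executed in a single transaction, with a per-statement retry when the combined transaction fails. The sketch below is a minimal, self-contained illustration of that pattern, assuming a libpqxx 6.x connection; the free function flush_buffer() and its parameters are hypothetical, and the Tango storage exception is replaced with std::runtime_error.

#include <pqxx/pqxx>

#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical free-standing version of the buffer flush: run everything in
// one transaction, and fall back to statement-by-statement execution on error.
void flush_buffer(pqxx::connection &conn, std::vector<std::string> &sql_buffer)
{
    if (sql_buffer.empty())
        return;

    try
    {
        // First attempt: concatenate the buffered statements (each already
        // ends in ';') and run them in a single transaction.
        pqxx::perform([&]() {
            pqxx::work tx {conn};
            std::string full_query;

            for (auto const &query : sql_buffer)
                full_query += query;

            tx.exec0(full_query);
            tx.commit();
        });
    }
    catch (const pqxx::pqxx_exception &)
    {
        // Fallback: retry each statement in its own transaction so a single
        // bad statement does not discard the whole batch.
        std::string failures;

        for (auto const &query : sql_buffer)
        {
            try
            {
                pqxx::perform([&]() {
                    pqxx::work tx {conn};
                    tx.exec0(query);
                    tx.commit();
                });
            }
            catch (const pqxx::pqxx_exception &)
            {
                failures += "Could not run query: " + query + "\n";
            }
        }

        sql_buffer.clear();

        if (!failures.empty())
            throw std::runtime_error(failures);
    }

    sql_buffer.clear();
}

In DbConnection the same logic lives in a member function operating on _sql_buffer, and failures are reported through Tango::Except::throw_exception rather than a standard exception.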