diff --git a/common/binlog/binlog-snapshot.cpp b/common/binlog/binlog-snapshot.cpp new file mode 100644 index 0000000000..accaf501bc --- /dev/null +++ b/common/binlog/binlog-snapshot.cpp @@ -0,0 +1,73 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "common/binlog/binlog-snapshot.h" + +#include + +#include "common/tl/fetch.h" + +namespace kphp { +namespace tl { + +namespace { + +constexpr int32_t RESULT_TRUE_MAGIC{0x3f9c8ef8}; + +} // namespace + +BarsicSnapshotHeader::BarsicSnapshotHeader() + : fields_mask() + , dependencies(DEPENDENCIES_BUFFER_SIZE) + , payload_offset() {} + +void BarsicSnapshotHeader::SnapshotDependency::tl_fetch() noexcept { + std::basic_string buffer{}; + buffer.reserve(STRING_BUFFER_SIZE); + + fields_mask = tl_fetch_int(); + // skip cluster_id + vk::tl::fetch_string(buffer); + // skip shard_id + vk::tl::fetch_string(buffer); + payload_offset = tl_fetch_long(); +} + +void BarsicSnapshotHeader::tl_fetch() noexcept { + std::basic_string buffer{}; + buffer.reserve(STRING_BUFFER_SIZE); + + fields_mask = tl_fetch_int(); + // skip cluster_id + vk::tl::fetch_string(buffer); + // skip shard_id + vk::tl::fetch_string(buffer); + // skip snapshot_meta + vk::tl::fetch_string(buffer); + // skip dependencies + vk::tl::fetch_vector(dependencies); + + payload_offset = tl_fetch_long(); + + // skip engine_version + vk::tl::fetch_string(buffer); + // skip creation_time_nano + std::ignore = tl_fetch_long(); + // skip control_meta + if (static_cast(fields_mask & 0x1)) { + vk::tl::fetch_string(buffer); + } +} + +void TlEngineSnapshotHeader::tl_fetch() noexcept { + fields_mask = tl_fetch_int(); + binlog_time_sec = tl_fetch_long(); + + if (tl_fetch_int() == RESULT_TRUE_MAGIC) { + file_binlog_crc = tl_fetch_int(); + } +} + +} // namespace tl +} // namespace kphp diff --git a/common/binlog/binlog-snapshot.h b/common/binlog/binlog-snapshot.h new file mode 100644 index 0000000000..6cf9c99510 --- /dev/null +++ b/common/binlog/binlog-snapshot.h @@ -0,0 +1,59 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include + +namespace kphp { +namespace tl { + +constexpr auto UNEXPECTED_TL_MAGIC_ERROR_FORMAT = "unexpected TL magic 0x%x, expected 0x%x\n"; + +constexpr auto COMMON_HEADER_META_SIZE = sizeof(int32_t) + sizeof(int64_t); +constexpr auto COMMON_HEADER_HASH_SIZE = 2 * sizeof(int64_t); + +constexpr int32_t PMEMCACHED_OLD_INDEX_MAGIC = 0x53407fa0; +constexpr int32_t PMEMCACHED_INDEX_RAM_MAGIC_G3 = 0x65049e9e; +constexpr int32_t BARSIC_SNAPSHOT_HEADER_MAGIC = 0x1d0d1b74; +constexpr int32_t TL_ENGINE_SNAPSHOT_HEADER_MAGIC = 0x4bf8b614; +constexpr int32_t PERSISTENT_CONFIG_V2_SNAPSHOT_BLOCK = 0x501096b7; +constexpr int32_t RPC_QUERIES_SNAPSHOT_QUERY_COMMON = 0x9586c501; +constexpr int32_t SNAPSHOT_MAGIC = 0xf0ec39fb; +constexpr int32_t COMMON_INFO_END = 0x5a9ce5ec; + +struct BarsicSnapshotHeader { + struct SnapshotDependency { + int32_t fields_mask; + int64_t payload_offset; + + void tl_fetch() noexcept; + }; + + int32_t fields_mask; + std::vector dependencies; + int64_t payload_offset; + + void tl_fetch() noexcept; + + BarsicSnapshotHeader(); + +private: + static constexpr auto STRING_BUFFER_SIZE = 512; + static constexpr auto DEPENDENCIES_BUFFER_SIZE = 128; +}; + +struct TlEngineSnapshotHeader { + int32_t fields_mask{}; + int64_t binlog_time_sec{}; + std::optional file_binlog_crc; + + void tl_fetch() noexcept; +}; + +} // namespace tl +} // namespace kphp diff --git a/common/binlog/binlog.cmake b/common/binlog/binlog.cmake index 7fa75452fd..1a6ecbadb9 100644 --- a/common/binlog/binlog.cmake +++ b/common/binlog/binlog.cmake @@ -3,6 +3,7 @@ prepend(BINLOG_SOURCES ${COMMON_DIR}/binlog/ binlog-buffer.cpp binlog-buffer-aio.cpp binlog-buffer-rotation-points.cpp - binlog-buffer-replay.cpp) + binlog-buffer-replay.cpp + binlog-snapshot.cpp) vk_add_library(binlog_src OBJECT ${BINLOG_SOURCES}) diff --git a/common/binlog/snapshot-shifts.h b/common/binlog/snapshot-shifts.h index 5a28872a4c..2bd0c3d0f6 100644 --- a/common/binlog/snapshot-shifts.h +++ b/common/binlog/snapshot-shifts.h @@ -4,28 +4,34 @@ #pragma once -inline static int get_snapshot_position_shift(const struct kfs_file_info *info) { +#include + +#include "common/binlog/binlog-snapshot.h" +#include "common/tl/methods/string.h" + +inline static long long get_snapshot_log_pos(const struct kfs_file_info *info) { + long long log_pos{-1}; if (info->preloaded_bytes < 4) { - return 0xffff; + return log_pos; } - int magic = *(int *)(info->start); - if (magic == 0x53407fa0) { // PMEMCACHED_RAM_INDEX_MAGIC - return 16; - } - fprintf(stderr, "Unknown snapshot magic for file %s: %08x\n", info->filename, magic); - return 0xffff; -} -inline static long long get_snapshot_log_pos(const struct kfs_file_info *info) { - int shift = get_snapshot_position_shift(info); - long long log_pos = -1; - if (info->preloaded_bytes >= shift + 8) { - log_pos = *(long long *)(info->start + shift); - if (!(info->min_log_pos <= log_pos && log_pos <= info->max_log_pos)) { - fprintf(stderr, "filename %s info->min_log_pos %lld info->max_log_pos %lld log_pos %lld shift %d\n", info->filename, info->min_log_pos, info->max_log_pos, log_pos, shift); - assert(info->min_log_pos <= log_pos && log_pos <= info->max_log_pos); + const auto magic{*reinterpret_cast(info->start)}; + if (magic == kphp::tl::PMEMCACHED_OLD_INDEX_MAGIC) { + log_pos = *reinterpret_cast(info->start + 2 * sizeof(int32_t) + sizeof(int64_t)); // add offset of log_pos1 + } else if (magic == kphp::tl::BARSIC_SNAPSHOT_HEADER_MAGIC && info->preloaded_bytes >= kphp::tl::COMMON_HEADER_META_SIZE - sizeof(int32_t)) { + const auto tl_body_len{*reinterpret_cast(info->start + sizeof(int32_t))}; + if (info->preloaded_bytes >= kphp::tl::COMMON_HEADER_META_SIZE + tl_body_len) { + kphp::tl::BarsicSnapshotHeader bsh{}; + vk::tl::fetch_from_buffer(info->start + kphp::tl::COMMON_HEADER_META_SIZE, tl_body_len, bsh); + log_pos = bsh.payload_offset; } + } else { + fprintf(stderr, "Unknown snapshot magic for file %s: %08x\n", info->filename, magic); + } + + if (log_pos < info->min_log_pos || log_pos > info->max_log_pos) { + fprintf(stderr, "filename %s info->min_log_pos %lld info->max_log_pos %lld log_pos %lld\n", info->filename, info->min_log_pos, info->max_log_pos, log_pos); } + return log_pos; } - diff --git a/server/confdata-binlog-replay.cpp b/server/confdata-binlog-replay.cpp index c8994c6f2e..7f4c6b58ce 100644 --- a/server/confdata-binlog-replay.cpp +++ b/server/confdata-binlog-replay.cpp @@ -4,26 +4,28 @@ #include "server/confdata-binlog-replay.h" +#include #include #include #include #include +#include #include #include #include +#include +#include #include "common/binlog/binlog-replayer.h" +#include "common/binlog/binlog-snapshot.h" #include "common/dl-utils-lite.h" -#include "common/precise-time.h" +#include "common/kfs/kfs.h" #include "common/server/engine-settings.h" #include "common/server/init-binlog.h" #include "common/server/init-snapshot.h" +#include "common/tl/methods/string.h" #include "common/wrappers/string_view.h" -#include "common/kfs/kfs.h" - -#include "runtime/allocator.h" #include "runtime/confdata-global-manager.h" -#include "runtime/kphp_core.h" #include "server/confdata-binlog-events.h" #include "server/confdata-stats.h" #include "server/server-log.h" @@ -116,34 +118,16 @@ class ConfdataBinlogReplayer : vk::binlog::replayer { return dot_pos; } - int load_index() noexcept { - if (!Snapshot) { - jump_log_ts = 0; - jump_log_pos = 0; - jump_log_crc32 = 0; - return 0; - } - index_header header; - kfs_read_file_assert (Snapshot, &header, sizeof(index_header)); - if (header.magic != PMEMCACHED_INDEX_MAGIC) { - fprintf(stderr, "index file is not for confdata\n"); - return -1; - } - jump_log_ts = header.log_timestamp; - jump_log_pos = header.log_pos1; - jump_log_crc32 = header.log_pos1_crc32; - - const int nrecords = header.nrecords; - vkprintf(2, "%d records readed\n", nrecords); - auto index_offset = std::make_unique(nrecords + 1); + int process_confdata_snapshot_entries(index_header &header) noexcept { + const auto index_offset = std::make_unique(header.nrecords + 1); assert (index_offset); - kfs_read_file_assert (Snapshot, index_offset.get(), sizeof(index_offset[0]) * (nrecords + 1)); - vkprintf(1, "index_offset[%d]=%" PRId64 "\n", nrecords, index_offset[nrecords]); + kfs_read_file_assert(Snapshot, index_offset.get(), sizeof(index_offset[0]) * (header.nrecords + 1)); + vkprintf(1, "index_offset[%d]=%" PRId64 "\n", header.nrecords, index_offset[header.nrecords]); - auto index_binary_data = std::make_unique(index_offset[nrecords]); + const auto index_binary_data = std::make_unique(index_offset[header.nrecords]); assert (index_binary_data); - kfs_read_file_assert (Snapshot, index_binary_data.get(), index_offset[nrecords]); + kfs_read_file_assert(Snapshot, index_binary_data.get(), index_offset[header.nrecords]); using entry_type = lev_confdata_store_wrapper; @@ -151,7 +135,7 @@ class ConfdataBinlogReplayer : vk::binlog::replayer { vk::string_view last_two_dots_key; array_size one_dot_elements_counter; array_size two_dots_elements_counter; - for (int i = 0; i < nrecords; i++) { + for (auto i = 0; i < header.nrecords; i++) { const auto &element = reinterpret_cast(index_binary_data[index_offset[i]]); const vk::string_view key{element.data, static_cast(std::max(element.key_len, short{0}))}; if (key.empty() || key_blacklist_.is_blacklisted(key)) { @@ -168,7 +152,7 @@ class ConfdataBinlogReplayer : vk::binlog::replayer { // disable the blacklist because we checked the keys during the previous step blacklist_enabled_ = false; - for (int i = 0; i < nrecords; i++) { + for (auto i = 0; i < header.nrecords; i++) { if (index_offset[i] >= 0) { store_element(reinterpret_cast(index_binary_data[index_offset[i]])); } @@ -184,6 +168,161 @@ class ConfdataBinlogReplayer : vk::binlog::replayer { return 0; } + int process_old_confdata_snapshot() noexcept { + index_header header; + kfs_read_file_assert(Snapshot, &header, sizeof(index_header)); + + if (header.magic != kphp::tl::PMEMCACHED_OLD_INDEX_MAGIC) { + fprintf(stderr, kphp::tl::UNEXPECTED_TL_MAGIC_ERROR_FORMAT, header.magic, kphp::tl::PMEMCACHED_OLD_INDEX_MAGIC); + return -1; + } + + jump_log_ts = header.log_timestamp; + jump_log_pos = header.log_pos1; + jump_log_crc32 = header.log_pos1_crc32; + + return process_confdata_snapshot_entries(header); + } + + int process_barsic_common_header() noexcept { + std::array header_meta{}; + kfs_read_file_assert(Snapshot, header_meta.data(), header_meta.size()); + + int32_t magic{}; + vk::tl::fetch_from_buffer(reinterpret_cast(header_meta.data()), header_meta.size(), magic); + if (magic != kphp::tl::BARSIC_SNAPSHOT_HEADER_MAGIC) { + fprintf(stderr, kphp::tl::UNEXPECTED_TL_MAGIC_ERROR_FORMAT, magic, kphp::tl::BARSIC_SNAPSHOT_HEADER_MAGIC); + return -1; + } + + int64_t tl_body_len{}; + vk::tl::fetch_from_buffer(reinterpret_cast(header_meta.data() + sizeof(int32_t)), header_meta.size() - sizeof(int32_t), + tl_body_len); + + std::vector buffer{static_cast(tl_body_len + kphp::tl::COMMON_HEADER_HASH_SIZE)}; + kfs_read_file_assert(Snapshot, buffer.data(), buffer.size()); + + kphp::tl::BarsicSnapshotHeader bsh{}; + vk::tl::fetch_from_buffer(reinterpret_cast(buffer.data()), buffer.size(), bsh); + // TODO: compute xxhash + + jump_log_pos = bsh.payload_offset; + return 0; + } + + int process_confdata_engine_header() noexcept { + std::array header_meta{}; + kfs_read_file_assert(Snapshot, header_meta.data(), header_meta.size()); + + int32_t magic{}; + vk::tl::fetch_from_buffer(reinterpret_cast(header_meta.data()), header_meta.size(), magic); + if (magic != kphp::tl::TL_ENGINE_SNAPSHOT_HEADER_MAGIC) { + fprintf(stderr, kphp::tl::UNEXPECTED_TL_MAGIC_ERROR_FORMAT, magic, kphp::tl::TL_ENGINE_SNAPSHOT_HEADER_MAGIC); + return -1; + } + + int64_t tl_body_len{}; + vk::tl::fetch_from_buffer(reinterpret_cast(header_meta.data() + sizeof(int32_t)), header_meta.size() - sizeof(int32_t), + tl_body_len); + + std::vector buffer{static_cast(tl_body_len + kphp::tl::COMMON_HEADER_HASH_SIZE)}; + kfs_read_file_assert(Snapshot, buffer.data(), buffer.size()); + + kphp::tl::TlEngineSnapshotHeader esh{}; + vk::tl::fetch_from_buffer(reinterpret_cast(buffer.data()), buffer.size(), esh); + // TODO: compute xxhash + + jump_log_ts = esh.binlog_time_sec; + jump_log_crc32 = esh.file_binlog_crc.value_or(0); + return 0; + } + + int skip_persistent_config() noexcept { + int32_t magic{}; + kfs_read_file_assert(Snapshot, &magic, sizeof(int32_t)); + if (magic != kphp::tl::PERSISTENT_CONFIG_V2_SNAPSHOT_BLOCK) { + fprintf(stderr, kphp::tl::UNEXPECTED_TL_MAGIC_ERROR_FORMAT, magic, kphp::tl::PERSISTENT_CONFIG_V2_SNAPSHOT_BLOCK); + return -1; + } + + int32_t num_sizes{}; + kfs_read_file_assert(Snapshot, &num_sizes, sizeof(int32_t)); + + std::vector sizes(num_sizes); + kfs_read_file_assert(Snapshot, reinterpret_cast(sizes.data()), sizes.size() * sizeof(int64_t)); + + // use sizeof(int32_t) as initial value for accumulate since persistent config ends with crc32 + ::lseek(Snapshot->fd, std::accumulate(sizes.cbegin(), sizes.cend(), static_cast(sizeof(int32_t))), SEEK_CUR); + return 0; + } + + int skip_persistent_queries() noexcept { + int32_t size{}; + kfs_read_file_assert(Snapshot, &size, sizeof(int32_t)); + if (size != 0) { + fprintf(stderr, "unexpected non-zero size of persistent queries: %d\n", size); + return -1; + } + // skip hash + ::lseek(Snapshot->fd, kphp::tl::COMMON_HEADER_HASH_SIZE, SEEK_CUR); + return 0; + } + + int process_snapshot() noexcept { + if (!Snapshot) { + jump_log_ts = 0; + jump_log_pos = 0; + jump_log_crc32 = 0; + return 0; + } + + int32_t header_magic{}; + kfs_read_file_assert(Snapshot, &header_magic, sizeof(int32_t)); + // move cursor back so old index reader can safely read a header it expects + ::lseek(Snapshot->fd, -sizeof(int32_t), SEEK_CUR); + + if (header_magic == kphp::tl::PMEMCACHED_OLD_INDEX_MAGIC) { + return process_old_confdata_snapshot(); + } else if (header_magic == kphp::tl::BARSIC_SNAPSHOT_HEADER_MAGIC) { + if (process_barsic_common_header() != 0) { return -1; } + if (process_confdata_engine_header() != 0) { return -1; } + if (skip_persistent_config() != 0) { return -1; } + + kfs_read_file_assert(Snapshot, &header_magic, sizeof(int32_t)); + if (header_magic == kphp::tl::RPC_QUERIES_SNAPSHOT_QUERY_COMMON) { + // PMC code may write persistent query, but confdata should not have any + fprintf(stderr, "active persistent query (magic 0x%x) are not supported in confdata snapshots", kphp::tl::RPC_QUERIES_SNAPSHOT_QUERY_COMMON); + return -1; + } + + // engine code always writes this section, but it doesn't make any sense for confdata. + // Don't do strict check in case this section disappears + if (header_magic == kphp::tl::SNAPSHOT_MAGIC) { + if (skip_persistent_queries() != 0) { return -1; } + } else { + // move cursor back to let the next read take the whole index_header. + ::lseek(Snapshot->fd, -sizeof(int32_t), SEEK_CUR); + } + + kfs_read_file_assert(Snapshot, &header_magic, sizeof(int32_t)); + if (header_magic != kphp::tl::COMMON_INFO_END) { + fprintf(stderr, kphp::tl::UNEXPECTED_TL_MAGIC_ERROR_FORMAT, header_magic, kphp::tl::COMMON_INFO_END); + return -1; + } + + index_header idx_header{}; + kfs_read_file_assert(Snapshot, &idx_header, sizeof(index_header)); + if (idx_header.magic != kphp::tl::PMEMCACHED_INDEX_RAM_MAGIC_G3) { + fprintf(stderr, kphp::tl::UNEXPECTED_TL_MAGIC_ERROR_FORMAT, idx_header.magic, kphp::tl::PMEMCACHED_INDEX_RAM_MAGIC_G3); + return -1; + } + return process_confdata_snapshot_entries(idx_header); + } + + fprintf(stderr, "unexpected header magic: 0x%x\n", header_magic); + return -1; + } + OperationStatus delete_element(const char *key, short key_len) noexcept { auto memory_status = current_memory_status(); return generic_operation(key, key_len, -1, memory_status, [this] (MemoryStatus memory_status) { @@ -899,7 +1038,7 @@ void init_confdata_binlog_reader() noexcept { static engine_settings_t settings = {}; settings.name = NAME_VERSION; settings.load_index = []() { - return ConfdataBinlogReplayer::get().load_index(); + return ConfdataBinlogReplayer::get().process_snapshot(); }; settings.replay_logevent = [](const lev_generic *E, int size) { return ConfdataBinlogReplayer::get().replay(E, size); @@ -934,6 +1073,7 @@ void init_confdata_binlog_reader() noexcept { auto &confdata_binlog_replayer = ConfdataBinlogReplayer::get(); confdata_binlog_replayer.init(confdata_manager.get_resource()); engine_default_load_index(confdata_settings.binlog_mask); + update_confdata_state_from_binlog(true, 10 * confdata_settings.confdata_update_timeout_sec); if (confdata_binlog_replayer.current_memory_status() != ConfdataBinlogReplayer::MemoryStatus::NORMAL) { confdata_binlog_replayer.raise_confdata_oom_error("Can't read confdata binlog on start"); diff --git a/server/pmemcached-binlog-interface.h b/server/pmemcached-binlog-interface.h index 3fb778f4e7..fd9319757c 100644 --- a/server/pmemcached-binlog-interface.h +++ b/server/pmemcached-binlog-interface.h @@ -94,8 +94,6 @@ struct lev_pmemcached_touch { #pragma pack(pop) -#define PMEMCACHED_INDEX_MAGIC 0x53407fa0 - // snapshot structures typedef struct { /* strange numbers */ @@ -107,7 +105,6 @@ typedef struct { int log_timestamp; unsigned int log_pos0_crc32; unsigned int log_pos1_crc32; - int nrecords; } index_header;