Skip to content

Commit

Permalink
Engines common headers support (#1000)
Browse files Browse the repository at this point in the history
This commit adds support for the new format of engine common headers.
  • Loading branch information
apolyakov authored Jun 19, 2024
1 parent 3e2cdf6 commit 7fb4547
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 54 deletions.
73 changes: 73 additions & 0 deletions common/binlog/binlog-snapshot.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "common/binlog/binlog-snapshot.h"

#include <tuple>

#include "common/tl/fetch.h"

namespace kphp {
namespace tl {

namespace {

constexpr int32_t RESULT_TRUE_MAGIC{0x3f9c8ef8};

} // namespace

BarsicSnapshotHeader::BarsicSnapshotHeader()
: fields_mask()
, dependencies(DEPENDENCIES_BUFFER_SIZE)
, payload_offset() {}

void BarsicSnapshotHeader::SnapshotDependency::tl_fetch() noexcept {
std::basic_string<char> buffer{};
buffer.reserve(STRING_BUFFER_SIZE);

fields_mask = tl_fetch_int();
// skip cluster_id
vk::tl::fetch_string(buffer);
// skip shard_id
vk::tl::fetch_string(buffer);
payload_offset = tl_fetch_long();
}

void BarsicSnapshotHeader::tl_fetch() noexcept {
std::basic_string<char> buffer{};
buffer.reserve(STRING_BUFFER_SIZE);

fields_mask = tl_fetch_int();
// skip cluster_id
vk::tl::fetch_string(buffer);
// skip shard_id
vk::tl::fetch_string(buffer);
// skip snapshot_meta
vk::tl::fetch_string(buffer);
// skip dependencies
vk::tl::fetch_vector(dependencies);

payload_offset = tl_fetch_long();

// skip engine_version
vk::tl::fetch_string(buffer);
// skip creation_time_nano
std::ignore = tl_fetch_long();
// skip control_meta
if (static_cast<bool>(fields_mask & 0x1)) {
vk::tl::fetch_string(buffer);
}
}

void TlEngineSnapshotHeader::tl_fetch() noexcept {
fields_mask = tl_fetch_int();
binlog_time_sec = tl_fetch_long();

if (tl_fetch_int() == RESULT_TRUE_MAGIC) {
file_binlog_crc = tl_fetch_int();
}
}

} // namespace tl
} // namespace kphp
59 changes: 59 additions & 0 deletions common/binlog/binlog-snapshot.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

namespace kphp {
namespace tl {

constexpr auto UNEXPECTED_TL_MAGIC_ERROR_FORMAT = "unexpected TL magic 0x%x, expected 0x%x\n";

constexpr auto COMMON_HEADER_META_SIZE = sizeof(int32_t) + sizeof(int64_t);
constexpr auto COMMON_HEADER_HASH_SIZE = 2 * sizeof(int64_t);

constexpr int32_t PMEMCACHED_OLD_INDEX_MAGIC = 0x53407fa0;
constexpr int32_t PMEMCACHED_INDEX_RAM_MAGIC_G3 = 0x65049e9e;
constexpr int32_t BARSIC_SNAPSHOT_HEADER_MAGIC = 0x1d0d1b74;
constexpr int32_t TL_ENGINE_SNAPSHOT_HEADER_MAGIC = 0x4bf8b614;
constexpr int32_t PERSISTENT_CONFIG_V2_SNAPSHOT_BLOCK = 0x501096b7;
constexpr int32_t RPC_QUERIES_SNAPSHOT_QUERY_COMMON = 0x9586c501;
constexpr int32_t SNAPSHOT_MAGIC = 0xf0ec39fb;
constexpr int32_t COMMON_INFO_END = 0x5a9ce5ec;

struct BarsicSnapshotHeader {
struct SnapshotDependency {
int32_t fields_mask;
int64_t payload_offset;

void tl_fetch() noexcept;
};

int32_t fields_mask;
std::vector<SnapshotDependency> dependencies;
int64_t payload_offset;

void tl_fetch() noexcept;

BarsicSnapshotHeader();

private:
static constexpr auto STRING_BUFFER_SIZE = 512;
static constexpr auto DEPENDENCIES_BUFFER_SIZE = 128;
};

struct TlEngineSnapshotHeader {
int32_t fields_mask{};
int64_t binlog_time_sec{};
std::optional<int32_t> file_binlog_crc;

void tl_fetch() noexcept;
};

} // namespace tl
} // namespace kphp
3 changes: 2 additions & 1 deletion common/binlog/binlog.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ prepend(BINLOG_SOURCES ${COMMON_DIR}/binlog/
binlog-buffer.cpp
binlog-buffer-aio.cpp
binlog-buffer-rotation-points.cpp
binlog-buffer-replay.cpp)
binlog-buffer-replay.cpp
binlog-snapshot.cpp)

vk_add_library(binlog_src OBJECT ${BINLOG_SOURCES})
42 changes: 24 additions & 18 deletions common/binlog/snapshot-shifts.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,34 @@

#pragma once

inline static int get_snapshot_position_shift(const struct kfs_file_info *info) {
#include <cstdint>

#include "common/binlog/binlog-snapshot.h"
#include "common/tl/methods/string.h"

inline static long long get_snapshot_log_pos(const struct kfs_file_info *info) {
long long log_pos{-1};
if (info->preloaded_bytes < 4) {
return 0xffff;
return log_pos;
}
int magic = *(int *)(info->start);
if (magic == 0x53407fa0) { // PMEMCACHED_RAM_INDEX_MAGIC
return 16;
}
fprintf(stderr, "Unknown snapshot magic for file %s: %08x\n", info->filename, magic);
return 0xffff;
}

inline static long long get_snapshot_log_pos(const struct kfs_file_info *info) {
int shift = get_snapshot_position_shift(info);
long long log_pos = -1;
if (info->preloaded_bytes >= shift + 8) {
log_pos = *(long long *)(info->start + shift);
if (!(info->min_log_pos <= log_pos && log_pos <= info->max_log_pos)) {
fprintf(stderr, "filename %s info->min_log_pos %lld info->max_log_pos %lld log_pos %lld shift %d\n", info->filename, info->min_log_pos, info->max_log_pos, log_pos, shift);
assert(info->min_log_pos <= log_pos && log_pos <= info->max_log_pos);
const auto magic{*reinterpret_cast<const int32_t *>(info->start)};
if (magic == kphp::tl::PMEMCACHED_OLD_INDEX_MAGIC) {
log_pos = *reinterpret_cast<const int64_t *>(info->start + 2 * sizeof(int32_t) + sizeof(int64_t)); // add offset of log_pos1
} else if (magic == kphp::tl::BARSIC_SNAPSHOT_HEADER_MAGIC && info->preloaded_bytes >= kphp::tl::COMMON_HEADER_META_SIZE - sizeof(int32_t)) {
const auto tl_body_len{*reinterpret_cast<const int64_t *>(info->start + sizeof(int32_t))};
if (info->preloaded_bytes >= kphp::tl::COMMON_HEADER_META_SIZE + tl_body_len) {
kphp::tl::BarsicSnapshotHeader bsh{};
vk::tl::fetch_from_buffer(info->start + kphp::tl::COMMON_HEADER_META_SIZE, tl_body_len, bsh);
log_pos = bsh.payload_offset;
}
} else {
fprintf(stderr, "Unknown snapshot magic for file %s: %08x\n", info->filename, magic);
}

if (log_pos < info->min_log_pos || log_pos > info->max_log_pos) {
fprintf(stderr, "filename %s info->min_log_pos %lld info->max_log_pos %lld log_pos %lld\n", info->filename, info->min_log_pos, info->max_log_pos, log_pos);
}

return log_pos;
}

Loading

0 comments on commit 7fb4547

Please sign in to comment.