Skip to content

Commit

Permalink
Move ShiftBuffer function from common::sdp to utils
Browse files Browse the repository at this point in the history
This function is too useful and universal to be in that namespace
  • Loading branch information
ol-imorozko committed Dec 23, 2024
1 parent 065ac3f commit 51f25bb
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 57 deletions.
2 changes: 1 addition & 1 deletion autotest/autotest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1891,7 +1891,7 @@ common::PacketBufferRing::item_t* read_shm_packet(common::PacketBufferRing* buff
return nullptr;
}

return common::sdp::ShiftBuffer<common::PacketBufferRing::item_t*>(ring->memory, position * buffer->unit_size);
return utils::ShiftBuffer<common::PacketBufferRing::item_t*>(ring->memory, position * buffer->unit_size);
}

bool tAutotest::step_dumpPackets(const YAML::Node& yamlStep,
Expand Down
8 changes: 4 additions & 4 deletions cli/telegraf.h
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,8 @@ void main_counters()
for (const auto& [coreId, worker_info] : sdp_data.workers)
{
std::vector<influxdb_format::value_t> values;
auto* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
auto* buffer = utils::ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
for (const auto& [name, index] : sdp_data.metadata_worker.counter_positions)
{
values.emplace_back(name.data(), buffer[index]);
Expand All @@ -722,8 +722,8 @@ void main_counters()
for (const auto& [coreId, worker_info] : sdp_data.workers_gc)
{
std::vector<influxdb_format::value_t> values;
auto* buffer = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
auto* buffer = utils::ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
for (const auto& [name, index] : sdp_data.metadata_worker_gc.counter_positions)
{
values.emplace_back(name.data(), buffer[index]);
Expand Down
22 changes: 11 additions & 11 deletions common/sdpclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class SdpClient
shift);
return eResult::errorInitSharedMemory;
}
iter.second.buffer = ShiftBuffer<void*>(buffer, shift);
iter.second.buffer = ShiftBuffer(buffer, shift);
}
for (auto& iter : sdp_data.workers_gc)
{
Expand All @@ -147,7 +147,7 @@ class SdpClient
shift);
return eResult::errorInitSharedMemory;
}
iter.second.buffer = ShiftBuffer<void*>(buffer, shift);
iter.second.buffer = ShiftBuffer(buffer, shift);
}

return eResult::success;
Expand Down Expand Up @@ -177,8 +177,8 @@ class SdpClient
{
if (!core_id.has_value() || worker_core_id == core_id)
{
auto* counters = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
auto* counters = ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
result[worker_core_id] = counters[index];
}
}
Expand All @@ -193,8 +193,8 @@ class SdpClient
{
if (!core_id.has_value() || worker_core_id == core_id)
{
auto* counters = common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
auto* counters = ShiftBuffer<uint64_t*>(worker_info.buffer,
sdp_data.metadata_worker.start_counters);
result[worker_core_id] = counters[index];
}
}
Expand Down Expand Up @@ -232,8 +232,8 @@ class SdpClient
std::vector<uint64_t*> buffers;
for (const auto& iter : sdp_data.workers)
{
buffers.push_back(common::sdp::ShiftBuffer<uint64_t*>(iter.second.buffer,
sdp_data.metadata_worker.start_counters));
buffers.push_back(ShiftBuffer<uint64_t*>(iter.second.buffer,
sdp_data.metadata_worker.start_counters));
}

for (size_t i = 0; i < counter_ids.size(); i++)
Expand Down Expand Up @@ -429,7 +429,7 @@ class SdpClient

static uint64_t ReadValue(void* buffer, uint64_t index)
{
auto* data = common::sdp::ShiftBuffer<uint8_t*>(buffer, index * sizeof(uint64_t));
auto* data = ShiftBuffer<uint8_t*>(buffer, index * sizeof(uint64_t));
uint64_t result = 0;
for (int i = 0; i < 8; i++)
{
Expand All @@ -443,9 +443,9 @@ class SdpClient
values.clear();
for (uint64_t index = 0; index < count; index++)
{
void* current = common::sdp::ShiftBuffer<void*>(buffer, shift + 128 * index);
void* current = ShiftBuffer(buffer, shift + 128 * index);
uint64_t value = ReadValue(current, 0);
char* str = common::sdp::ShiftBuffer<char*>(current, 8);
char* str = ShiftBuffer<char*>(current, 8);
if (str[119] != 0)
{
// 119 - index of last symbol
Expand Down
15 changes: 6 additions & 9 deletions common/sdpcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "define.h"
#include "idp.h"
#include "utils.h"

// #define YANET_USE_POSIX_SHARED_MEMORY

Expand Down Expand Up @@ -87,19 +88,15 @@ Block for worker_gc
namespace common::sdp
{

using utils::ShiftBuffer;

#ifdef YANET_USE_POSIX_SHARED_MEMORY
inline std::string FileNameWorkerOnNumaNode(tSocketId socket_id)
{
return YANET_SHARED_MEMORY_PREFIX_WORKERS + std::to_string(socket_id) + ".shm";
}
#endif

template<typename TResult, typename TBuffer = void*>
inline TResult ShiftBuffer(TBuffer buffer, uint64_t size)
{
return reinterpret_cast<TResult>((reinterpret_cast<char*>(buffer) + size));
}

template<typename Key, typename Value>
bool MapsEqual(const std::map<Key, Value>& left, const std::map<Key, Value>& right)
{
Expand Down Expand Up @@ -224,9 +221,9 @@ struct DataPlaneInSharedMemory
{
auto count_errors = static_cast<uint32_t>(common::idp::errorType::size);
auto count_requests = static_cast<uint32_t>(common::idp::requestType::size);
auto* requests = common::sdp::ShiftBuffer<uint64_t*>(dataplane_data, start_bus_section);
auto* errors = common::sdp::ShiftBuffer<uint64_t*>(dataplane_data, start_bus_section + count_requests * sizeof(uint64_t));
auto* durations = common::sdp::ShiftBuffer<uint64_t*>(dataplane_data, start_bus_section + (count_requests + count_errors) * sizeof(uint64_t));
auto* requests = ShiftBuffer<uint64_t*>(dataplane_data, start_bus_section);
auto* errors = ShiftBuffer<uint64_t*>(dataplane_data, start_bus_section + count_requests * sizeof(uint64_t));
auto* durations = ShiftBuffer<uint64_t*>(dataplane_data, start_bus_section + (count_requests + count_errors) * sizeof(uint64_t));
return {requests, errors, durations};
}
};
Expand Down
11 changes: 8 additions & 3 deletions common/utils.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
#pragma once

#include <iomanip>
#include <vector>
#include <type_traits>
#include <iomanip>
#include <string>
#include <type_traits>
#include <vector>

namespace utils
{

template<typename TResult = void*>
TResult ShiftBuffer(void* buffer, size_t size)
{
static_assert(std::is_pointer_v<TResult>, "TResult must be a pointer type.");
return reinterpret_cast<TResult>(static_cast<std::byte*>(buffer) + size);
}

// Utility to calculate percentage
// TODO C++20: use std::type_identity_t to establish non-deduced context
// Will allow to do `to_percent(4.2, 1)`
Expand Down
2 changes: 1 addition & 1 deletion controlplane/controlplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ std::vector<uint64_t> cControlPlane::getAclCounters()
uint64_t start_acl_counters = sdp_data.metadata_worker.start_acl_counters;
for (const auto& iter : sdp_data.workers)
{
auto* aclCounters = common::sdp::ShiftBuffer<uint64_t*>(iter.second.buffer, start_acl_counters);
auto* aclCounters = utils::ShiftBuffer<uint64_t*>(iter.second.buffer, start_acl_counters);
for (size_t i = 0; i < YANET_CONFIG_ACL_COUNTERS_SIZE; i++)
{
response[i] += aclCounters[i];
Expand Down
2 changes: 1 addition & 1 deletion controlplane/telegraf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ common::icp::telegraf_other::response telegraf_t::telegraf_other()
{
std::array<uint64_t, CONFIG_YADECAP_MBUFS_BURST_SIZE + 1> bursts;
auto* worker_bursts =
common::sdp::ShiftBuffer<uint64_t*>(worker_info.buffer, sdp_data->metadata_worker.start_bursts);
utils::ShiftBuffer<uint64_t*>(worker_info.buffer, sdp_data->metadata_worker.start_bursts);
memcpy(&bursts[0], worker_bursts, sizeof(uint64_t) * (CONFIG_YADECAP_MBUFS_BURST_SIZE + 1));
currWorkers[coreId] = bursts;
}
Expand Down
2 changes: 1 addition & 1 deletion dataplane/controlplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ common::idp::hexdump_ring::response cControlPlane::hexdump_ring(const common::id
return {};
}

auto addr = common::sdp::ShiftBuffer<char*>(shmaddr, offset);
auto addr = utils::ShiftBuffer<char*>(shmaddr, offset);
}

response.hexdumped_ring = //hexdump addr, size (what size) here;
Expand Down
9 changes: 6 additions & 3 deletions dataplane/dataplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
#include "common/tsc_deltas.h"
#include "dataplane.h"
#include "dataplane/sdpserver.h"
#include "sharedmemory.h"
#include "globalbase.h"
#include "sharedmemory.h"
#include "sock_dev.h"
#include "work_runner.h"
#include "worker.h"
Expand Down Expand Up @@ -1681,6 +1681,9 @@ eResult cDataPlane::allocateSharedMemory()
/// split memory per worker
eResult cDataPlane::splitSharedMemoryPerWorkers()
{
using sharedmemory::SharedMemoryDumpRing;
using utils::ShiftBuffer;

for (cWorker* worker : workers_vector)
{
tSocketId socket_id = worker->socketId;
Expand All @@ -1700,7 +1703,7 @@ eResult cDataPlane::splitSharedMemoryPerWorkers()
{
const auto& [dump_size, dump_count, format] = ring_cfg;

auto memaddr = common::sdp::ShiftBuffer<void*>(shm, offset);
auto memaddr = utils::ShiftBuffer(shm, offset);

sharedmemory::SharedMemoryDumpRing ring(format, memaddr, dump_size, dump_count);
worker->dumpRings[ring_id] = ring;
Expand All @@ -1715,7 +1718,7 @@ eResult cDataPlane::splitSharedMemoryPerWorkers()
ring_id++;
}

auto memaddr = common::sdp::ShiftBuffer<void*>(shm, offset);
auto memaddr = utils::ShiftBuffer(shm, offset);
worker->tsc_deltas = new (memaddr) dataplane::perf::tsc_deltas{};

offset += sizeof(dataplane::perf::tsc_deltas);
Expand Down
32 changes: 16 additions & 16 deletions dataplane/unittest/sdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ class TestBus

void CompareWithClient(const common::sdp::DataPlaneInSharedMemory& sdp_data_client)
{
void* buffer = common::sdp::ShiftBuffer<void*>(sdp_data_client.dataplane_data, sdp_data_client.start_bus_section);
void* buffer = utils::ShiftBuffer(sdp_data_client.dataplane_data, sdp_data_client.start_bus_section);
auto count_errors = static_cast<uint32_t>(common::idp::errorType::size);
auto count_requests = static_cast<uint32_t>(common::idp::requestType::size);
auto* requests = common::sdp::ShiftBuffer<uint64_t*>(buffer, 0);
auto* errors = common::sdp::ShiftBuffer<uint64_t*>(buffer, count_requests * sizeof(uint64_t));
auto* durations = common::sdp::ShiftBuffer<uint64_t*>(buffer, (count_requests + count_errors) * sizeof(uint64_t));
auto* requests = utils::ShiftBuffer<uint64_t*>(buffer, 0);
auto* errors = utils::ShiftBuffer<uint64_t*>(buffer, count_requests * sizeof(uint64_t));
auto* durations = utils::ShiftBuffer<uint64_t*>(buffer, (count_requests + count_errors) * sizeof(uint64_t));

for (uint32_t index = 0; index < static_cast<uint32_t>(common::idp::requestType::size); index++)
{
Expand Down Expand Up @@ -159,11 +159,11 @@ class TestWorker

void SetBufferForCounters(void* buffer, const common::sdp::MetadataWorker& metadata)
{
counters = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
aclCounters = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_acl_counters);
bursts = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_bursts);
stats = common::sdp::ShiftBuffer<common::worker::stats::common*>(buffer, metadata.start_stats);
statsPorts = common::sdp::ShiftBuffer<common::worker::stats::port*>(buffer, metadata.start_stats_ports);
counters = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
aclCounters = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_acl_counters);
bursts = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_bursts);
stats = utils::ShiftBuffer<common::worker::stats::common*>(buffer, metadata.start_stats);
statsPorts = utils::ShiftBuffer<common::worker::stats::port*>(buffer, metadata.start_stats_ports);
}

void SetTestValues(tCoreId coreId)
Expand Down Expand Up @@ -207,29 +207,29 @@ class TestWorker
ASSERT_EQ(common::sdp::SdpClient::GetCounterByName(sdp_data_client, "dropPackets", coreId)[coreId], stats->dropPackets);

// statsPorts
auto* bufStatsPorts = common::sdp::ShiftBuffer<common::worker::stats::port*>(buffer, sdp_data_client.metadata_worker.start_stats_ports);
auto* bufStatsPorts = utils::ShiftBuffer<common::worker::stats::port*>(buffer, sdp_data_client.metadata_worker.start_stats_ports);
for (uint32_t index = 0; index < CONFIG_YADECAP_PORTS_SIZE + 1; index++)
{
ASSERT_EQ(statsPorts[index].controlPlane_drops, bufStatsPorts[index].controlPlane_drops);
ASSERT_EQ(statsPorts[index].physicalPort_egress_drops, bufStatsPorts[index].physicalPort_egress_drops);
}

// bursts
auto* bufBursts = common::sdp::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker.start_bursts);
auto* bufBursts = utils::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker.start_bursts);
for (uint32_t index = 0; index < CONFIG_YADECAP_MBUFS_BURST_SIZE + 1; index++)
{
ASSERT_EQ(bursts[index], bufBursts[index]);
}

// counters
auto* bufCounters = common::sdp::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker.start_counters);
auto* bufCounters = utils::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker.start_counters);
for (uint32_t index = 0; index < YANET_CONFIG_COUNTERS_SIZE; index++)
{
ASSERT_EQ(counters[index], bufCounters[index]);
}

// aclCounters
auto* bufAclCounters = common::sdp::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker.start_acl_counters);
auto* bufAclCounters = utils::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker.start_acl_counters);
for (uint32_t index = 0; index < YANET_CONFIG_ACL_COUNTERS_SIZE; index++)
{
ASSERT_EQ(aclCounters[index], bufAclCounters[index]);
Expand Down Expand Up @@ -276,8 +276,8 @@ class TestWorkerGc

void SetBufferForCounters(void* buffer, const common::sdp::MetadataWorkerGc& metadata)
{
counters = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
stats = common::sdp::ShiftBuffer<common::worker_gc::stats_t*>(buffer, metadata.start_stats);
counters = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
stats = utils::ShiftBuffer<common::worker_gc::stats_t*>(buffer, metadata.start_stats);
}

void SetTestValues(tCoreId coreId)
Expand All @@ -302,7 +302,7 @@ class TestWorkerGc
ASSERT_EQ(common::sdp::SdpClient::GetCounterByName(sdp_data_client, "drop_samples", coreId)[coreId], stats->drop_samples);

// counters
auto* bufCounters = common::sdp::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker_gc.start_counters);
auto* bufCounters = utils::ShiftBuffer<uint64_t*>(buffer, sdp_data_client.metadata_worker_gc.start_counters);
for (uint32_t index = 0; index < YANET_CONFIG_COUNTERS_SIZE; index++)
{
ASSERT_EQ(counters[index], bufCounters[index]);
Expand Down
11 changes: 6 additions & 5 deletions dataplane/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "common/fallback.h"
#include "common/nat46clat.h"

#include "common/utils.h"
#include "dataplane/sdpserver.h"

#include "action_dispatcher.h"
Expand Down Expand Up @@ -344,11 +345,11 @@ void cWorker::FillMetadataWorkerCounters(common::sdp::MetadataWorker& metadata)

void cWorker::SetBufferForCounters(void* buffer, const common::sdp::MetadataWorker& metadata)
{
counters = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
aclCounters = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_acl_counters);
bursts = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_bursts);
stats = common::sdp::ShiftBuffer<common::worker::stats::common*>(buffer, metadata.start_stats);
statsPorts = common::sdp::ShiftBuffer<common::worker::stats::port*>(buffer, metadata.start_stats_ports);
counters = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
aclCounters = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_acl_counters);
bursts = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_bursts);
stats = utils::ShiftBuffer<common::worker::stats::common*>(buffer, metadata.start_stats);
statsPorts = utils::ShiftBuffer<common::worker::stats::port*>(buffer, metadata.start_stats_ports);
}

eResult cWorker::sanityCheck()
Expand Down
4 changes: 2 additions & 2 deletions dataplane/worker_gc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ void worker_gc_t::FillMetadataWorkerCounters(common::sdp::MetadataWorkerGc& meta

void worker_gc_t::SetBufferForCounters(void* buffer, const common::sdp::MetadataWorkerGc& metadata)
{
counters = common::sdp::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
stats = common::sdp::ShiftBuffer<common::worker_gc::stats_t*>(buffer, metadata.start_stats);
counters = utils::ShiftBuffer<uint64_t*>(buffer, metadata.start_counters);
stats = utils::ShiftBuffer<common::worker_gc::stats_t*>(buffer, metadata.start_stats);
}

YANET_INLINE_NEVER void worker_gc_t::thread()
Expand Down

0 comments on commit 51f25bb

Please sign in to comment.