Skip to content

Commit

Permalink
change the use of rte_ring for dumping in/out/drop packets to a newer…
Browse files Browse the repository at this point in the history
… approach
  • Loading branch information
Yan Evzman committed Apr 5, 2024
1 parent 53f60f5 commit 4e8f0b1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 100 deletions.
9 changes: 0 additions & 9 deletions dataplane/controlplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1674,11 +1674,6 @@ void cControlPlane::mainThread()
break;
}
}
for (const auto& iter : dataPlane->workers)
{
cWorker* worker = iter.second;
ring_handle(worker->ring_toFreePackets, worker->ring_lowPriority);
}

for (auto& iter : kernel_interfaces)
{
Expand Down Expand Up @@ -1893,10 +1888,6 @@ unsigned cControlPlane::ring_handle(rte_ring* ring_to_free_mbuf,
{
handlePacket_farm(mbuf);
}
else if (metadata->flow.type == common::globalBase::eFlowType::slowWorker_dump)
{
handlePacket_dump(mbuf);
}
else if (metadata->flow.type == common::globalBase::eFlowType::slowWorker_repeat)
{
handlePacket_repeat(mbuf);
Expand Down
43 changes: 43 additions & 0 deletions dataplane/dataplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include "sock_dev.h"
#include "worker.h"

#define MAX_PACK_SIZE 16384

common::log::LogPriority common::log::logPriority = common::log::TLOG_INFO;

cDataPlane::cDataPlane() :
Expand Down Expand Up @@ -1209,6 +1211,25 @@ eResult cDataPlane::allocateSharedMemory()
}
}

// init size for in/out/drop lowPriority ring
for (const auto& [socket_id, num] : number_of_workers_per_socket)
{
auto unit_size = sizeof(sharedmemory::item_header_t) + MAX_PACK_SIZE;
if (unit_size % RTE_CACHE_LINE_SIZE != 0)
{
unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up
}

auto size = sizeof(sharedmemory::ring_header_t) + unit_size * getConfigValues().ring_lowPriority_size;

auto it = shm_size_per_socket.find(socket_id);
if (it == shm_size_per_socket.end())
{
it = shm_size_per_socket.emplace_hint(it, socket_id, 0);
}
it->second += size * num;
}

for (const auto& [socket_id, num] : number_of_workers_per_socket)
{
auto it = shm_size_per_socket.find(socket_id);
Expand Down Expand Up @@ -1327,6 +1348,28 @@ eResult cDataPlane::splitSharedMemoryPerWorkers()

ring_id++;
}
// init lowPriority ring
{
auto name = "r_lp_" + std::to_string(core_id);
auto offset = offsets[shm];
auto memaddr = (void*)((intptr_t)shm + offset);
sharedmemory::cSharedMemory ring;

auto unit_size = sizeof(sharedmemory::item_header_t) + MAX_PACK_SIZE;
if (unit_size % RTE_CACHE_LINE_SIZE != 0)
{
unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up
}
const auto units_number = getConfigValues().ring_lowPriority_size;
const auto size = sizeof(sharedmemory::ring_header_t) + unit_size * units_number;
ring.init(memaddr, unit_size, units_number);
offsets[shm] += size;

worker->lowPriorityRing = ring;

auto meta = common::idp::get_shm_info::dump_meta(name, "lp", unit_size, units_number, core_id, socket_id, key, offset);
dumps_meta.emplace_back(meta);
}
}

for (auto& [core_id, worker] : workers)
Expand Down
104 changes: 15 additions & 89 deletions dataplane/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ cWorker::cWorker(cDataPlane* dataPlane) :
translation_packet_id(0),
ring_highPriority(nullptr),
ring_normalPriority(nullptr),
ring_lowPriority(nullptr),
ring_toFreePackets(nullptr),
ring_log(nullptr),
packetsToSWNPRemainder(dataPlane->config.SWNormalPriorityRateLimitPerWorker)
Expand All @@ -66,11 +65,6 @@ cWorker::~cWorker()
rte_ring_free(ring_normalPriority);
}

if (ring_lowPriority)
{
rte_ring_free(ring_lowPriority);
}

if (ring_toFreePackets)
{
rte_ring_free(ring_toFreePackets);
Expand Down Expand Up @@ -140,15 +134,6 @@ eResult cWorker::init(const tCoreId& coreId,
return eResult::errorInitRing;
}

ring_lowPriority = rte_ring_create(("r_lp_" + std::to_string(coreId)).c_str(),
dataPlane->getConfigValues().ring_lowPriority_size,
socketId,
RING_F_SP_ENQ | RING_F_SC_DEQ);
if (!ring_lowPriority)
{
return eResult::errorInitRing;
}

ring_toFreePackets = rte_ring_create(("r_tfp_" + std::to_string(coreId)).c_str(),
dataPlane->getConfigValues().ring_toFreePackets_size,
socketId,
Expand Down Expand Up @@ -1062,26 +1047,11 @@ inline void cWorker::physicalPort_ingress_handle(const unsigned int& worker_port

if (basePermanently.globalBaseAtomic->physicalPort_flags[metadata->fromPortId] & YANET_PHYSICALPORT_FLAG_IN_DUMP)
{
if (!rte_ring_full(ring_lowPriority))
{
rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool);
if (mbuf_clone)
{
*YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf);

rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*),
rte_pktmbuf_mtod(mbuf, char*),
mbuf->data_len);

mbuf_clone->data_len = mbuf->data_len;
mbuf_clone->pkt_len = mbuf->pkt_len;

YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_ingress;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = metadata->fromPortId;
slowWorker_entry_lowPriority(mbuf_clone);
}
}
YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_ingress;
YADECAP_METADATA(mbuf)->flow.data.dump.id = metadata->fromPortId;
auto& ring = lowPriorityRing;
ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump);
}
}

Expand Down Expand Up @@ -1327,26 +1297,12 @@ inline void cWorker::logicalPort_egress_handle()

if (basePermanently.globalBaseAtomic->physicalPort_flags[logicalPort.portId] & YANET_PHYSICALPORT_FLAG_OUT_DUMP)
{
if (!rte_ring_full(ring_lowPriority))
{
rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool);
if (mbuf_clone)
{
*YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf);

rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*),
rte_pktmbuf_mtod(mbuf, char*),
mbuf->data_len);

mbuf_clone->data_len = mbuf->data_len;
mbuf_clone->pkt_len = mbuf->pkt_len;

YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_egress;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = logicalPort.portId;
slowWorker_entry_lowPriority(mbuf_clone);
}
}
YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_egress;
YADECAP_METADATA(mbuf)->flow.data.dump.id = logicalPort.portId;
auto& ring = lowPriorityRing;
ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump);
}
if (rte_mbuf_refcnt_read(mbuf) < 1)
{
Expand Down Expand Up @@ -5798,26 +5754,11 @@ inline void cWorker::drop(rte_mbuf* mbuf)

if (basePermanently.globalBaseAtomic->physicalPort_flags[metadata->fromPortId] & YANET_PHYSICALPORT_FLAG_DROP_DUMP)
{
if (!rte_ring_full(ring_lowPriority))
{
rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool);
if (mbuf_clone)
{
*YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf);

rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*),
rte_pktmbuf_mtod(mbuf, char*),
mbuf->data_len);

mbuf_clone->data_len = mbuf->data_len;
mbuf_clone->pkt_len = mbuf->pkt_len;

YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_drop;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = metadata->fromPortId;
slowWorker_entry_lowPriority(mbuf_clone);
}
}
YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_drop;
YADECAP_METADATA(mbuf)->flow.data.dump.id = metadata->fromPortId;;
auto& ring = lowPriorityRing;
ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump);
}

rte_pktmbuf_free(mbuf);
Expand Down Expand Up @@ -5884,21 +5825,6 @@ inline void cWorker::slowWorker_entry_normalPriority(rte_mbuf* mbuf,
}
}

inline void cWorker::slowWorker_entry_lowPriority(rte_mbuf* mbuf)
{
/// @todo: worker::tStack

if (rte_ring_sp_enqueue(ring_lowPriority, (void*)mbuf))
{
stats.ring_lowPriority_drops++;
rte_pktmbuf_free(mbuf);
}
else
{
stats.ring_lowPriority_packets++;
}
}

YANET_NEVER_INLINE void cWorker::slowWorkerBeforeHandlePackets()
{
localBaseId = currentBaseId;
Expand Down
3 changes: 1 addition & 2 deletions dataplane/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ class cWorker

inline void slowWorker_entry_highPriority(rte_mbuf* mbuf, const common::globalBase::eFlowType& flowType); ///< @todo: DELETE and OPT
inline void slowWorker_entry_normalPriority(rte_mbuf* mbuf, const common::globalBase::eFlowType& flowType); ///< @todo: DELETE and OPT
inline void slowWorker_entry_lowPriority(rte_mbuf* mbuf); ///< @todo: DELETE and OPT

inline uint32_t get_tcp_state_timeout(uint8_t flags, const dataplane::globalBase::state_timeout_config_t& state_timeout_config);
inline uint32_t get_state_timeout(rte_mbuf* mbuf, dataplane::metadata* metadata, const dataplane::globalBase::state_timeout_config_t& state_timeout_config);
Expand Down Expand Up @@ -321,7 +320,6 @@ class cWorker

rte_ring* ring_highPriority;
rte_ring* ring_normalPriority;
rte_ring* ring_lowPriority;
dataplane::perf::tsc_deltas* tsc_deltas;
rte_ring* ring_toFreePackets;

Expand All @@ -337,6 +335,7 @@ class cWorker
int32_t packetsToSWNPRemainder;

sharedmemory::cSharedMemory dumpRings[YANET_CONFIG_SHARED_RINGS_NUMBER];
sharedmemory::cSharedMemory lowPriorityRing;

samples::Sampler sampler;

Expand Down

0 comments on commit 4e8f0b1

Please sign in to comment.