diff --git a/dataplane/controlplane.cpp b/dataplane/controlplane.cpp index 8ca855f5..d9fbfc0e 100644 --- a/dataplane/controlplane.cpp +++ b/dataplane/controlplane.cpp @@ -1674,11 +1674,6 @@ void cControlPlane::mainThread() break; } } - for (const auto& iter : dataPlane->workers) - { - cWorker* worker = iter.second; - ring_handle(worker->ring_toFreePackets, worker->ring_lowPriority); - } for (auto& iter : kernel_interfaces) { @@ -1893,10 +1888,6 @@ unsigned cControlPlane::ring_handle(rte_ring* ring_to_free_mbuf, { handlePacket_farm(mbuf); } - else if (metadata->flow.type == common::globalBase::eFlowType::slowWorker_dump) - { - handlePacket_dump(mbuf); - } else if (metadata->flow.type == common::globalBase::eFlowType::slowWorker_repeat) { handlePacket_repeat(mbuf); diff --git a/dataplane/dataplane.cpp b/dataplane/dataplane.cpp index 698f40ba..bfd022c0 100644 --- a/dataplane/dataplane.cpp +++ b/dataplane/dataplane.cpp @@ -39,6 +39,8 @@ #include "sock_dev.h" #include "worker.h" +#define MAX_PACK_SIZE 16384 + common::log::LogPriority common::log::logPriority = common::log::TLOG_INFO; cDataPlane::cDataPlane() : @@ -1209,6 +1211,25 @@ eResult cDataPlane::allocateSharedMemory() } } + // init size for in/out/drop lowPriority ring + for (const auto& [socket_id, num] : number_of_workers_per_socket) + { + auto unit_size = sizeof(sharedmemory::item_header_t) + MAX_PACK_SIZE; + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } + + auto size = sizeof(sharedmemory::ring_header_t) + unit_size * getConfigValues().ring_lowPriority_size; + + auto it = shm_size_per_socket.find(socket_id); + if (it == shm_size_per_socket.end()) + { + it = shm_size_per_socket.emplace_hint(it, socket_id, 0); + } + it->second += size * num; + } + for (const auto& [socket_id, num] : number_of_workers_per_socket) { auto it = shm_size_per_socket.find(socket_id); @@ -1327,6 +1348,25 @@ eResult cDataPlane::splitSharedMemoryPerWorkers() ring_id++; } + // init lowPriority ring + { + auto name = "r_lp_" + std::to_string(core_id); + auto offset = offsets[shm]; + auto memaddr = (void*)((intptr_t)shm + offset); + sharedmemory::cSharedMemory ring; + + auto unit_size = sizeof(sharedmemory::item_header_t) + MAX_PACK_SIZE; + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } + const auto units_number = getConfigValues().ring_lowPriority_size; + const auto size = sizeof(sharedmemory::ring_header_t) + unit_size * units_number; + ring.init(memaddr, unit_size, units_number); + offsets[shm] += size; + + worker->lowPriorityRing = ring; + } } for (auto& [core_id, worker] : workers) diff --git a/dataplane/worker.cpp b/dataplane/worker.cpp index 5bbf58d1..d946cfc7 100644 --- a/dataplane/worker.cpp +++ b/dataplane/worker.cpp @@ -40,7 +40,6 @@ cWorker::cWorker(cDataPlane* dataPlane) : translation_packet_id(0), ring_highPriority(nullptr), ring_normalPriority(nullptr), - ring_lowPriority(nullptr), ring_toFreePackets(nullptr), ring_log(nullptr), packetsToSWNPRemainder(dataPlane->config.SWNormalPriorityRateLimitPerWorker) @@ -66,11 +65,6 @@ cWorker::~cWorker() rte_ring_free(ring_normalPriority); } - if (ring_lowPriority) - { - rte_ring_free(ring_lowPriority); - } - if (ring_toFreePackets) { rte_ring_free(ring_toFreePackets); @@ -140,15 +134,6 @@ eResult cWorker::init(const tCoreId& coreId, return eResult::errorInitRing; } - ring_lowPriority = rte_ring_create(("r_lp_" + std::to_string(coreId)).c_str(), - dataPlane->getConfigValues().ring_lowPriority_size, - socketId, - RING_F_SP_ENQ | RING_F_SC_DEQ); - if (!ring_lowPriority) - { - return eResult::errorInitRing; - } - ring_toFreePackets = rte_ring_create(("r_tfp_" + std::to_string(coreId)).c_str(), dataPlane->getConfigValues().ring_toFreePackets_size, socketId, @@ -1062,26 +1047,11 @@ inline void cWorker::physicalPort_ingress_handle(const unsigned int& worker_port if (basePermanently.globalBaseAtomic->physicalPort_flags[metadata->fromPortId] & YANET_PHYSICALPORT_FLAG_IN_DUMP) { - if (!rte_ring_full(ring_lowPriority)) - { - rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool); - if (mbuf_clone) - { - *YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf); - - rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*), - rte_pktmbuf_mtod(mbuf, char*), - mbuf->data_len); - - mbuf_clone->data_len = mbuf->data_len; - mbuf_clone->pkt_len = mbuf->pkt_len; - - YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump; - YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_ingress; - YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = metadata->fromPortId; - slowWorker_entry_lowPriority(mbuf_clone); - } - } + YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump; + YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_ingress; + YADECAP_METADATA(mbuf)->flow.data.dump.id = metadata->fromPortId; + auto& ring = lowPriorityRing; + ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump); } } @@ -1327,26 +1297,12 @@ inline void cWorker::logicalPort_egress_handle() if (basePermanently.globalBaseAtomic->physicalPort_flags[logicalPort.portId] & YANET_PHYSICALPORT_FLAG_OUT_DUMP) { - if (!rte_ring_full(ring_lowPriority)) - { - rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool); - if (mbuf_clone) - { - *YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf); - - rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*), - rte_pktmbuf_mtod(mbuf, char*), - mbuf->data_len); - mbuf_clone->data_len = mbuf->data_len; - mbuf_clone->pkt_len = mbuf->pkt_len; - - YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump; - YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_egress; - YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = logicalPort.portId; - slowWorker_entry_lowPriority(mbuf_clone); - } - } + YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump; + YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_egress; + YADECAP_METADATA(mbuf)->flow.data.dump.id = logicalPort.portId; + auto& ring = lowPriorityRing; + ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump); } if (rte_mbuf_refcnt_read(mbuf) < 1) { @@ -5798,26 +5754,11 @@ inline void cWorker::drop(rte_mbuf* mbuf) if (basePermanently.globalBaseAtomic->physicalPort_flags[metadata->fromPortId] & YANET_PHYSICALPORT_FLAG_DROP_DUMP) { - if (!rte_ring_full(ring_lowPriority)) - { - rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool); - if (mbuf_clone) - { - *YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf); - - rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*), - rte_pktmbuf_mtod(mbuf, char*), - mbuf->data_len); - - mbuf_clone->data_len = mbuf->data_len; - mbuf_clone->pkt_len = mbuf->pkt_len; - - YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump; - YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_drop; - YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = metadata->fromPortId; - slowWorker_entry_lowPriority(mbuf_clone); - } - } + YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump; + YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_drop; + YADECAP_METADATA(mbuf)->flow.data.dump.id = metadata->fromPortId;; + auto& ring = lowPriorityRing; + ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump); } rte_pktmbuf_free(mbuf); @@ -5884,21 +5825,6 @@ inline void cWorker::slowWorker_entry_normalPriority(rte_mbuf* mbuf, } } -inline void cWorker::slowWorker_entry_lowPriority(rte_mbuf* mbuf) -{ - /// @todo: worker::tStack - - if (rte_ring_sp_enqueue(ring_lowPriority, (void*)mbuf)) - { - stats.ring_lowPriority_drops++; - rte_pktmbuf_free(mbuf); - } - else - { - stats.ring_lowPriority_packets++; - } -} - YANET_NEVER_INLINE void cWorker::slowWorkerBeforeHandlePackets() { localBaseId = currentBaseId; diff --git a/dataplane/worker.h b/dataplane/worker.h index e0352ee2..bcc7b8a6 100644 --- a/dataplane/worker.h +++ b/dataplane/worker.h @@ -205,7 +205,6 @@ class cWorker inline void slowWorker_entry_highPriority(rte_mbuf* mbuf, const common::globalBase::eFlowType& flowType); ///< @todo: DELETE and OPT inline void slowWorker_entry_normalPriority(rte_mbuf* mbuf, const common::globalBase::eFlowType& flowType); ///< @todo: DELETE and OPT - inline void slowWorker_entry_lowPriority(rte_mbuf* mbuf); ///< @todo: DELETE and OPT inline uint32_t get_tcp_state_timeout(uint8_t flags, const dataplane::globalBase::state_timeout_config_t& state_timeout_config); inline uint32_t get_state_timeout(rte_mbuf* mbuf, dataplane::metadata* metadata, const dataplane::globalBase::state_timeout_config_t& state_timeout_config); @@ -321,7 +320,6 @@ class cWorker rte_ring* ring_highPriority; rte_ring* ring_normalPriority; - rte_ring* ring_lowPriority; dataplane::perf::tsc_deltas* tsc_deltas; rte_ring* ring_toFreePackets; @@ -337,6 +335,7 @@ class cWorker int32_t packetsToSWNPRemainder; sharedmemory::cSharedMemory dumpRings[YANET_CONFIG_SHARED_RINGS_NUMBER]; + sharedmemory::cSharedMemory lowPriorityRing; samples::Sampler sampler;