From 772a3ebf119335d6a1a115c4bb9b149d38ca2735 Mon Sep 17 00:00:00 2001 From: Ivan Morozko Date: Thu, 12 Dec 2024 20:17:46 +0400 Subject: [PATCH] TMP: add pcaps, for now dealing with capacity of shm --- common/bufferring.h | 29 +++- dataplane/action_dispatcher.h | 7 +- dataplane/config.h | 3 +- dataplane/controlplane.cpp | 2 +- dataplane/dataplane.cpp | 28 ++- dataplane/meson.build | 2 + dataplane/pcap_shm_device.cpp | 311 ++++++++++++++++++++++++++++++++++ dataplane/pcap_shm_device.h | 233 +++++++++++++++++++++++++ dataplane/sharedmemory.cpp | 97 +++++++---- dataplane/sharedmemory.h | 83 +++++++-- dataplane/worker.h | 3 +- 11 files changed, 724 insertions(+), 74 deletions(-) create mode 100644 dataplane/pcap_shm_device.cpp create mode 100644 dataplane/pcap_shm_device.h diff --git a/common/bufferring.h b/common/bufferring.h index 6c4e4aea..105a158e 100644 --- a/common/bufferring.h +++ b/common/bufferring.h @@ -24,20 +24,39 @@ struct PacketBufferRing { PacketBufferRing() = default; - PacketBufferRing(void* memory, size_t ring_size, size_t item_count) : - unit_size(sizeof(item_header_t) + ring_size), units_number(item_count) + // static function, helps to get capacity in DumpRingRaw + static size_t GetCapacity(size_t ring_size, size_t item_count, size_t unit_size = 0) { - if (unit_size % RTE_CACHE_LINE_SIZE != 0) + if (unit_size == 0) { - unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + unit_size = sizeof(item_header_t) + ring_size; + + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } } - capacity = sizeof(ring_header_t) + unit_size * units_number; + size_t capacity = sizeof(ring_header_t) + unit_size * item_count; + if (capacity % RTE_CACHE_LINE_SIZE != 0) { capacity += RTE_CACHE_LINE_SIZE - capacity % RTE_CACHE_LINE_SIZE; /// round up } + return capacity; + } + + PacketBufferRing(void* memory, size_t ring_size, size_t item_count) : + unit_size(sizeof(item_header_t) + ring_size), units_number(item_count) + { + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } + + capacity = GetCapacity(ring_size, item_count, unit_size); + ring = (ring_t*)memory; } diff --git a/dataplane/action_dispatcher.h b/dataplane/action_dispatcher.h index b3c3b771..87ef3053 100644 --- a/dataplane/action_dispatcher.h +++ b/dataplane/action_dispatcher.h @@ -88,8 +88,11 @@ struct ActionDispatcher return; } - auto& ring = args.worker->dumpRings[ring_id]; - ring.write(args.mbuf, flow.type); + cWorker* worker = args.worker; + + // polymorphic, will execute either DumpRingRaw or DumpRingPcap method, + // likely to be devirtualized + worker->dump_rings[ring_id]->Write(args.mbuf, flow.type, worker->CurrentTime()); } static void execute(const common::StateTimeoutAction& action, const Flow& flow, const ActionDispatcherArgs& args) diff --git a/dataplane/config.h b/dataplane/config.h index bd43f6ad..871cb2c3 100644 --- a/dataplane/config.h +++ b/dataplane/config.h @@ -23,11 +23,12 @@ struct tDataPlaneConfig kPcap }; + //TODO: add here path, prefix, pcap files count? like std::variant if format == pcap? struct DumpConfig { + DumpFormat format; unsigned int size; unsigned int count; - DumpFormat format; }; static DumpFormat StringToDumpFormat(const std::string& format_str) diff --git a/dataplane/controlplane.cpp b/dataplane/controlplane.cpp index e17146e4..4675293d 100644 --- a/dataplane/controlplane.cpp +++ b/dataplane/controlplane.cpp @@ -1055,7 +1055,7 @@ common::idp::hexdump_ring::response cControlPlane::hexdump_ring(const common::id for (cWorker* worker : dataPlane->workers_vector) { - auto ring = worker->dumpRings[ring_id]; + auto ring = worker->dump_rings[ring_id]; auto addr = reinterpret_cast(ring.buffer.ring); diff --git a/dataplane/dataplane.cpp b/dataplane/dataplane.cpp index bc83abb7..2cff8abf 100644 --- a/dataplane/dataplane.cpp +++ b/dataplane/dataplane.cpp @@ -1601,12 +1601,7 @@ static std::unordered_map calculate_shared_memory_size(const // Calculate sizes based on shared memory configuration for (const auto& ring_cfg : config.shared_memory) { - const auto& [dump_size, dump_count, format] = ring_cfg.second; - YANET_GCC_BUG_UNUSED(format); - - // Temporarily materialization will occur to create an object and get it's capacity. - // It's okay, because this object is lightweight - size_t size = common::PacketBufferRing(nullptr, dump_size, dump_count).capacity; + size_t size = sharedmemory::GetCapacity(ring_cfg.second); for (const auto& [socket_id, worker_count] : workers_per_socket) { @@ -1623,6 +1618,7 @@ static std::unordered_map calculate_shared_memory_size(const return shm_size_per_socket; } +//FIXME: why is this class not using class SharedMemory from common/shared_memory.h? eResult cDataPlane::allocateSharedMemory() { // shared memory size for each numa @@ -1681,8 +1677,7 @@ eResult cDataPlane::allocateSharedMemory() /// split memory per worker eResult cDataPlane::splitSharedMemoryPerWorkers() { - using sharedmemory::SharedMemoryDumpRing; - using utils::ShiftBuffer; + using namespace sharedmemory; for (cWorker* worker : workers_vector) { @@ -1701,16 +1696,15 @@ eResult cDataPlane::splitSharedMemoryPerWorkers() int ring_id = 0; for (const auto& [tag, ring_cfg] : config.shared_memory) { - const auto& [dump_size, dump_count, format] = ring_cfg; + const auto& [format, dump_size, dump_count] = ring_cfg; auto memaddr = utils::ShiftBuffer(shm, offset); + offset += GetCapacity(ring_cfg); - sharedmemory::SharedMemoryDumpRing ring(format, memaddr, dump_size, dump_count); - worker->dumpRings[ring_id] = ring; - - offset += ring.Capacity(); + worker->dump_rings[ring_id] = CreateSharedMemoryDumpRing(ring_cfg, memaddr); std::string name = "shm_" + std::to_string(core_id) + "_" + std::to_string(ring_id); + // TODO: add format here dumps_meta.emplace_back(name, tag, dump_size, dump_count, core_id, socket_id, key, offset); tag_to_id[tag] = ring_id; @@ -1719,10 +1713,10 @@ eResult cDataPlane::splitSharedMemoryPerWorkers() } auto memaddr = utils::ShiftBuffer(shm, offset); - worker->tsc_deltas = new (memaddr) dataplane::perf::tsc_deltas{}; - offset += sizeof(dataplane::perf::tsc_deltas); + worker->tsc_deltas = new (memaddr) dataplane::perf::tsc_deltas{}; + tscs_meta.emplace_back(core_id, socket_id, key, offset); } @@ -2204,8 +2198,6 @@ eResult cDataPlane::parseRateLimits(const nlohmann::json& json) eResult cDataPlane::parseSharedMemory(const nlohmann::json& json) { - using DumpFormat = tDataPlaneConfig::DumpFormat; - for (const auto& shmJson : json) { std::string tag = shmJson["tag"]; @@ -2219,7 +2211,7 @@ eResult cDataPlane::parseSharedMemory(const nlohmann::json& json) return eResult::invalidConfigurationFile; } - config.shared_memory[tag] = {size, count, tDataPlaneConfig::StringToDumpFormat(format_str)}; + config.shared_memory[tag] = {tDataPlaneConfig::StringToDumpFormat(format_str), size, count}; } return eResult::success; diff --git a/dataplane/meson.build b/dataplane/meson.build index 73962ecd..7f9f648c 100644 --- a/dataplane/meson.build +++ b/dataplane/meson.build @@ -3,6 +3,7 @@ dependencies += libdpdk.get_variable('dpdk_dep') dependencies += libjson.get_variable('nlohmann_json_dep') dependencies += dependency('libsystemd') dependencies += dependency('threads') +dependencies += pcapplusplus_deps sources = files('bus.cpp', 'controlplane.cpp', @@ -20,6 +21,7 @@ sources = files('bus.cpp', 'neighbor.cpp', 'report.cpp', 'sharedmemory.cpp', + 'pcap_shm_device.cpp', 'slow_worker.cpp', 'sock_dev.cpp', 'worker.cpp', diff --git a/dataplane/pcap_shm_device.cpp b/dataplane/pcap_shm_device.cpp new file mode 100644 index 00000000..912180a6 --- /dev/null +++ b/dataplane/pcap_shm_device.cpp @@ -0,0 +1,311 @@ +#include +#include +#include +#include +#include + +#include "pcap_shm_device.h" + +//TODO: replace cerr with YANET_LOG +namespace pcpp +{ + +bool PcapShmWriterDevice::RotateToNextSegment() +{ + current_segment_index_ = (current_segment_index_ + 1) % pcap_files_; + FILE* file = segments_[current_segment_index_].file; + // Move file pointer to just after the global header in the new segment + return (fseek(file, kPcapFileHeaderSize, SEEK_SET) == 0); +} + +bool PcapShmWriterDevice::FillSegments() +{ + segments_.resize(pcap_files_); + + size_t base_size = shm_size_ / pcap_files_; + size_t remainder = shm_size_ % pcap_files_; + + size_t offset = 0; + for (size_t i = 0; i < pcap_files_; ++i) + { + size_t segment_size = base_size + (i == pcap_files_ - 1 ? remainder : 0); + segments_[i].start_ptr = static_cast(shm_ptr_) + offset; + segments_[i].size = segment_size; + offset += segment_size; + + FILE* file = fmemopen(segments_[i].start_ptr, segments_[i].size, "w+"); + if (!file) + { + std::cerr << "fmemopen failed for segment " << i << std::endl; + return false; + } + + pcap_dumper_t* dumper = pcap_dump_fopen(m_PcapDescriptor.get(), file); + if (!dumper) + { + std::cerr << "pcap_dump_fopen failed for segment " << i << std::endl; + fclose(file); + return false; + } + + segments_[i].file = file; + segments_[i].dumper = dumper; + } + + return true; +} + +PcapShmWriterDevice::PcapShmWriterDevice(void* shm_ptr, size_t shm_size, size_t pcap_files, LinkLayerType link_layer_type, bool nanoseconds_precision) : + IShmWriterDevice(shm_ptr, shm_size), + link_layer_type_(link_layer_type), + pcap_files_(pcap_files), + current_segment_index_(0) +{ +#if defined(PCAP_TSTAMP_PRECISION_NANO) + precision_ = nanoseconds_precision ? FileTimestampPrecision::Nanoseconds + : FileTimestampPrecision::Microseconds; +#else + if (nanosecondsPrecision) + { + std::cerr << "PcapPlusPlus was compiled without nano precision support which requires " + "libpcap > 1.5.1. Please " + "recompile PcapPlusPlus with nano precision support to use this feature. " + "Using " + "default microsecond precision.\n"; + } + m_Precision_ = FileTimestampPrecision::Microseconds; +#endif + + // TODO: we should add this assert + /* if (m_SegmentSize <= kPcapFileHeaderSize + PCPP_MAX_PACKET_SIZE - 1) { */ + /* TMP_LOG("Segment too small to hold at least one full packet"); */ + /* throw("something"); */ + /* } */ +} + +PcapShmWriterDevice::~PcapShmWriterDevice() +{ + PcapShmWriterDevice::close(); +} + +void PcapShmWriterDevice::DumpPcapFilesToDisk(std::string_view filename_prefix) +{ + Flush(); + + size_t file_index = 1; + std::string filename; + // Allocate space for prefix + index + ".pcap" + filename.reserve(filename_prefix.size() + 10); + + for (size_t i = 0; i < pcap_files_; ++i) + { + size_t segment_index = (current_segment_index_ + 1 + i) % pcap_files_; + FILE* file = segments_[segment_index].file; + + // Not opened or already closed + if (file == nullptr) + { + continue; + } + + long used = ftell(file); + if (used < 0) + { + std::cerr << "ftell failed on segment " << i << std::endl; + continue; + } + + // If only global header is present, no packets were written. + if (static_cast(used) <= kPcapFileHeaderSize) + { + continue; + } + + filename = filename_prefix; + filename += std::to_string(file_index++) + ".pcap"; + std::ofstream output_file(filename, std::ios::binary); + if (!output_file) + { + std::cerr << "Failed to open " << filename << " for writing" << std::endl; + continue; + } + + output_file.write(reinterpret_cast(segments_[segment_index].start_ptr), used); + if (output_file.bad()) + { + std::cerr << "Error writing to file " << filename << std::endl; + continue; + } + } +} + +bool PcapShmWriterDevice::open() +{ + if (m_DeviceOpened) + { + return true; + } + + switch (link_layer_type_) + { + case LINKTYPE_RAW: + case LINKTYPE_DLT_RAW2: + std::cerr << "The only Raw IP link type supported in libpcap/WinPcap/Npcap is " + "LINKTYPE_DLT_RAW1, please use that instead\n"; + return false; + default: + break; + } + +#if defined(PCAP_TSTAMP_PRECISION_NANO) + m_PcapDescriptor = internal::PcapHandle(pcap_open_dead_with_tstamp_precision( + link_layer_type_, PCPP_MAX_PACKET_SIZE - 1, static_cast(precision_))); +#else + m_PcapDescriptor = + internal::PcapHandle(pcap_open_dead(m_LinkLayerType_, PCPP_MAX_PACKET_SIZE - 1)); +#endif + if (m_PcapDescriptor == nullptr) + { + std::cerr << "Error opening pcap descriptor: pcap_open_dead returned nullptr" + << std::endl; + return false; + } + + if (!FillSegments()) + { + return false; + } + + current_segment_index_ = 0; + m_DeviceOpened = true; + return true; +} + +bool PcapShmWriterDevice::WritePacket(RawPacket const& packet) +{ + if (!m_DeviceOpened) + { + std::cerr << "Device not opened" << std::endl; + ++num_of_packets_not_written_; + return false; + } + + if (packet.getLinkLayerType() != link_layer_type_) + { + std::cerr << "Cannot write a packet with a different link layer type" << std::endl; + ++num_of_packets_not_written_; + return false; + } + + pcap_pkthdr pkt_hdr; + pkt_hdr.caplen = packet.getRawDataLen(); + pkt_hdr.len = packet.getFrameLength(); + + timespec packet_timestamp = packet.getPacketTimeStamp(); +#if defined(PCAP_TSTAMP_PRECISION_NANO) + if (precision_ != FileTimestampPrecision::Nanoseconds) + { + TIMESPEC_TO_TIMEVAL(&pkt_hdr.ts, &packet_timestamp); + } + else + { + pkt_hdr.ts.tv_sec = packet_timestamp.tv_sec; + pkt_hdr.ts.tv_usec = packet_timestamp.tv_nsec; + } +#else + TIMESPEC_TO_TIMEVAL(&pkt_hdr.ts, &packet_timestamp); +#endif + + // kPcapPacketHeaderSizeOnDisk is different from sizeof(pcap_pkthdr) + size_t needed = kPcapPacketHeaderSizeOnDisk + pkt_hdr.caplen; + + FILE* file = segments_[current_segment_index_].file; + long used = ftell(file); + if (used < 0) + { + std::cerr << "ftell failed on current segment" << std::endl; + ++num_of_packets_not_written_; + return false; + } + + size_t available = segments_[current_segment_index_].size - used; + if (needed > available) + { + if (!RotateToNextSegment()) + { + std::cerr << "fseek failed when rotating to next segment" << std::endl; + ++num_of_packets_not_written_; + return false; + } + file = segments_[current_segment_index_].file; + } + + pcap_dump(reinterpret_cast(segments_[current_segment_index_].dumper), &pkt_hdr, packet.getRawData()); + ++num_of_packets_written_; + return true; +} + +bool PcapShmWriterDevice::WritePackets(RawPacketVector const& packets) +{ + for (RawPacket const* packet : packets) + { + if (!WritePacket(*packet)) + return false; + } + return true; +} + +void PcapShmWriterDevice::Flush() +{ + if (!m_DeviceOpened) + return; + + for (auto& seg : segments_) + { + if (seg.dumper != nullptr && pcap_dump_flush(seg.dumper) == -1) + { + std::cerr << "Error while flushing the packets to shared memory" << std::endl; + } + } + + for (auto& seg : segments_) + { + if (seg.file != nullptr && fflush(seg.file) == EOF) + { + std::cerr << "Error while flushing the packets to file" << std::endl; + } + } +} + +void PcapShmWriterDevice::close() +{ + if (!m_DeviceOpened) + return; + + Flush(); + + for (auto& [ptr, size, file, dumper] : segments_) + { + if (dumper != nullptr) + { + // pcap_dump_close closes both the dumper and the FILE* + pcap_dump_close(dumper); + ptr = nullptr; + size = 0; + dumper = nullptr; + file = nullptr; + } + } + + m_PcapDescriptor.reset(); + m_DeviceOpened = false; +} + +void PcapShmWriterDevice::getStatistics(PcapStats& stats) const +{ + stats.packetsRecv = num_of_packets_written_; + stats.packetsDrop = num_of_packets_not_written_; + stats.packetsDropByInterface = 0; +} + +} // namespace pcpp diff --git a/dataplane/pcap_shm_device.h b/dataplane/pcap_shm_device.h new file mode 100644 index 00000000..595a8a7f --- /dev/null +++ b/dataplane/pcap_shm_device.h @@ -0,0 +1,233 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "PcapDevice.h" +#include "PcapFileDevice.h" + +// The PcapPlus Plus namespace. +namespace pcpp +{ + +/** + * @brief An abstract class representing a shared memory device for pcap data. + * + * This device provides a pcap-compatible interface for reading/writing packets, + * but the underlying storage is a shared memory region rather than a file or a live network + * interface. + * + * Derived classes must implement device-specific logic for reading/writing packets. + */ +class IShmDevice : public IPcapDevice +{ +protected: + void* shm_ptr_; + size_t shm_size_; + + explicit IShmDevice(void* shm_ptr, size_t shm_size) : + IPcapDevice(), shm_ptr_(shm_ptr), shm_size_(shm_size) {} + + ~IShmDevice() override + { + close(); + } + +public: + /** + * @return Pointer to the underlying shared memory region. + */ + [[nodiscard]] void* GetShmPtr() const + { + return shm_ptr_; + } + + /** + * @return The size of the shared memory region in bytes. + */ + [[nodiscard]] size_t GetShmSize() const + { + return shm_size_; + } + + /** + * @brief Close the device. + * + * This will release any pcap resources associated with it. + */ + void close() override + { + if (m_PcapDescriptor != nullptr) + { + m_PcapDescriptor = nullptr; + } + m_DeviceOpened = false; + } +}; + +/** + * @brief An abstract class for shared memory writer devices. + * + * A writer device provides methods to write packets into the shared memory region. + * These packets can later be read or dumped to disk by other utilities. + */ +class IShmWriterDevice : public IShmDevice +{ +protected: + uint32_t num_of_packets_written_; + uint32_t num_of_packets_not_written_; + + IShmWriterDevice(void* shm_ptr, size_t shm_size) : + IShmDevice(shm_ptr, shm_size), + num_of_packets_written_(0), + num_of_packets_not_written_(0) {} + +public: + ~IShmWriterDevice() override = default; + + /** + * @brief Write a single RawPacket into the shared memory. + * + * @param[in] packet The packet to write. + * + * @return True if the packet was written successfully, false otherwise. + */ + [[nodiscard]] virtual bool WritePacket(RawPacket const& packet) = 0; + + /** + * @brief Write multiple RawPackets into the shared memory. + * + * @param[in] packets A vector of packet pointers to be written. + * + * @return True if all packets were written successfully, false otherwise. + */ + [[nodiscard]] virtual bool WritePackets(RawPacketVector const& packets) = 0; +}; + +/** + * @brief A class for writing packets to a shared memory region in pcap format, using a ring-buffer + * approach. + * + * The objective is to enable continuous packet capture while utilizing a limited amount of memory. + * The approach adopted here is inspired by Wireshark's "multiple files, ring buffer" feature: + * + * Multiple files, ring buffer: + * "Much like 'Multiple files continuous', reaching one of the multiple files switch conditions + * (one of the 'Next file every …' values) will switch to the next file. This will be a newly + * created file if the value of 'Ring buffer with n files' is not reached; otherwise, it will + * replace the oldest of the formerly used files (thus forming a 'ring'). + * + * This mode will limit the maximum disk usage, even for an unlimited amount of capture input data, + * only keeping the latest captured data." + * (Source: https://www.wireshark.org/docs/wsug_html_chunked/ChCapCaptureFiles.html) + * + * **Algorithm Behind Ring-Buffer Writing:** + * The shared memory region is divided into multiple segments (each representing a 'virtual pcap + * file'). Packets are written sequentially into the current segment. If there isn't enough space + * for a new packet, the writer 'rotates' to the next segment. + * - Suppose you have N segments. + * - You write packets into segment 1 until it's almost full. + * - If you can't fit a new packet, you move to segment 2, and continue writing there. + * - Once you reach segment N and still have more packets, you wrap around to segment 1 again, + * overwriting old data. + * + * After all packets are written, `DumpPcapFilesToDisk()` can be used to extract each segment + * into a standalone pcap file. + */ +class PcapShmWriterDevice : public IShmWriterDevice +{ + LinkLayerType link_layer_type_; + FileTimestampPrecision precision_; + + size_t pcap_files_; ///< Number of pcap segments + size_t current_segment_index_; ///< Current segment index we're writing to + + struct SegmentInfo + { + void* start_ptr; ///< Pointer to the start of this segment in shared memory + size_t size; ///< Size of the segment + FILE* file; ///< FILE stream for this pcap segment + pcap_dumper_t* dumper; ///< pcap dumper for this pcap segment + }; + + std::vector segments_; + + // Prevent copying + PcapShmWriterDevice(PcapShmWriterDevice const&) = delete; + PcapShmWriterDevice& operator=(PcapShmWriterDevice const&) = delete; + + /** + * @brief Rotate to the next segment if the current one doesn't have enough space. + * + * @return True if successful, false if fseek fails. + */ + bool RotateToNextSegment(); + + /** + * @brief Distribute the shared memory into multiple segments and initialize them as in-memory + * pcap 'files'. + * + * This method divides the shared memory region into pcap_files_ segments, + * ensuring all available memory is utilized. Each segment will have an equal base size, + * except for the last segment which includes any remainder bytes. It then opens each segment as + * an in-memory pcap 'file'. + * + * @return True if all segments were successfully initialized, false otherwise. + */ + bool FillSegments(); + +public: + static constexpr size_t kPcapPacketHeaderSizeOnDisk = 16; + static constexpr size_t kPcapFileHeaderSize = 24; + + /** + * @brief Constructor for PcapShmWriterDevice + * + * @param[in] shmPtr Pointer to the shared memory region. + * @param[in] shmSize Size of the shared memory region. + * @param[in] pcapFiles Number of 'pcap segments' to divide the shared memory into. + * @param[in] linkLayerType The link layer type all packets in this region will be based on. The + * default is Ethernet. + * @param[in] nanosecondsPrecision A boolean indicating whether to write timestamps in + * nano-precision. If set to false, timestamps will be written in micro-precision. + */ + PcapShmWriterDevice(void* shm_ptr, size_t shm_size, size_t pcap_files, LinkLayerType link_layer_type = LINKTYPE_ETHERNET, bool nanoseconds_precision = false); + + ~PcapShmWriterDevice() override; + + /** + * @brief Dump each pcap segment from shared memory to a file on disk. + * + * @param filenamePrefix The prefix for the output pcap files, e.g. "capture_" + * will produce "capture_1.pcap", "capture_2.pcap", etc. + */ + void DumpPcapFilesToDisk(std::string_view filename_prefix); + + bool open() override; + + bool WritePacket(RawPacket const& packet) override; + + bool WritePackets(RawPacketVector const& packets) override; + + /** + * @brief Flush all pending writes to the shared memory segments. + */ + void Flush(); + + /** + * @brief Close the device and free associated resources. + */ + void close() override; + + /** + * @brief Get statistics for packets written so far. + * + * @param[out] stats The PcapStats structure to fill. + */ + void getStatistics(PcapStats& stats) const override; +}; + +} // namespace pcpp diff --git a/dataplane/sharedmemory.cpp b/dataplane/sharedmemory.cpp index 64104a30..ff0239f0 100644 --- a/dataplane/sharedmemory.cpp +++ b/dataplane/sharedmemory.cpp @@ -1,58 +1,97 @@ #include "sharedmemory.h" #include "common/type.h" +#include "common/utils.h" #include "metadata.h" -using namespace sharedmemory; +#include "MBufRawPacket.h" -SharedMemoryDumpRing::SharedMemoryDumpRing(DumpFormat format, void* memory, size_t dump_size, size_t dump_count) : - format_(format) +namespace sharedmemory { - switch (format_) - { - case DumpFormat::kPcap: - // init somehow with pcaps - break; - - case DumpFormat::kRaw: - buffer = common::PacketBufferRing(memory, dump_size, dump_count); - capacity_ = buffer.capacity; - buffer.ring->header.before = 0; - buffer.ring->header.after = 0; - - break; - default: - YANET_THROW("Wrong shared memory dump format"); - } +DumpRingRaw::DumpRingRaw(void* memory, size_t max_pkt_size, size_t pkt_count) : + buffer_(memory, max_pkt_size, pkt_count), ring_(buffer_.ring) +{ + ring_->header.before = 0; + ring_->header.after = 0; } -void SharedMemoryDumpRing::write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) +void DumpRingRaw::Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, [[maybe_unused]] uint32_t time) { // Each ring has its own header, the header contains absolute position // to which next packet should be written. Position has two state: // -- "before" increments immediately before of copying data to memory; // -- "after" increments after copying data. - uint64_t wpos = (buffer.ring->header.before) % buffer.units_number; - buffer.ring->header.before++; - auto* item = (item_t*)((uintptr_t)buffer.ring->memory + (wpos * buffer.unit_size)); + uint64_t wpos = (ring_->header.before) % buffer_.units_number; + ring_->header.before++; + auto* item = utils::ShiftBuffer(ring_->memory, wpos * buffer_.unit_size); dataplane::metadata* metadata = YADECAP_METADATA(mbuf); - uint64_t memory_size = buffer.unit_size - sizeof(ring_header_t); + uint64_t memory_size = buffer_.unit_size - sizeof(ring_header_t); uint64_t copy_size = RTE_MIN(memory_size, mbuf->data_len); item->header.size = copy_size; item->header.tag = metadata->hash; item->header.in_logicalport_id = metadata->in_logicalport_id; item->header.out_logicalport_id = metadata->out_logicalport_id; - item->header.flow_type = (uint8_t)flow_type; + item->header.flow_type = static_cast(flow_type); - memcpy(item->memory, - rte_pktmbuf_mtod(mbuf, void*), - copy_size); + memcpy(item->memory, rte_pktmbuf_mtod(mbuf, void*), copy_size); YANET_MEMORY_BARRIER_COMPILE; - buffer.ring->header.after++; + ring_->header.after++; +} + +size_t DumpRingRaw::GetCapacity(size_t max_pkt_size, size_t pkt_count) +{ + return PacketBufferRing::GetCapacity(max_pkt_size, pkt_count); +} + +// TODO: use max_pkt_size as snaplen in pcap? +DumpRingPcap::DumpRingPcap(void* memory, size_t max_pkt_size, size_t pkt_count) : + dev_(memory, GetCapacity(max_pkt_size, pkt_count), 3) +{ + // TODO: Don't know how yet, but we need to pass files amount. Let's do three by now. +} + +/** + * @brief A complete copy of the PcapPlusPlus wrapper of the RawPacket class. + * + * This class allows initialization with an already-created mbuf, making it + * possible to safely pass the object to a Writer instance as the base class + * RawPacket. In the original `MBufRawPacket` class, the `setMBuf` method + * was protected, but it has been incorporated into a new constructor. + */ +struct MBufRawPacketCopy : public pcpp::MBufRawPacket +{ + using MBufRawPacket::MBufRawPacket; + + MBufRawPacketCopy(rte_mbuf* mBuf, const timespec& timestamp) : + MBufRawPacket() + { + setMBuf(mBuf, timestamp); + } +}; + +void DumpRingPcap::Write(rte_mbuf* mbuf, [[maybe_unused]] common::globalBase::eFlowType flow_type, uint32_t time) +{ + timespec ts = {.tv_sec = time, .tv_nsec = 0}; + MBufRawPacketCopy raw_packet(mbuf, ts); + + // TODO: can I do this, or should I use time obtained from basePermanently.globalBaseAtomic->currentTime? + /* timespec_get(&ts, TIME_UTC); */ + + dev_.WritePacket(raw_packet); +} + +size_t DumpRingPcap::GetCapacity(size_t max_pkt_size, size_t pkt_count) +{ + auto& file_hdr_size = pcpp::PcapShmWriterDevice::kPcapFileHeaderSize; + auto& pkt_hdr_size = pcpp::PcapShmWriterDevice::kPcapPacketHeaderSizeOnDisk; + + return file_hdr_size + (pkt_hdr_size + max_pkt_size) * pkt_count; } + +} // namespace sharedmemory diff --git a/dataplane/sharedmemory.h b/dataplane/sharedmemory.h index e1fd8d54..387c0313 100644 --- a/dataplane/sharedmemory.h +++ b/dataplane/sharedmemory.h @@ -1,4 +1,5 @@ #pragma once +//TODO: RENAME TO dump_rings.h #include @@ -6,36 +7,84 @@ #include "common/type.h" #include "config.h" +#include "pcap_shm_device.h" namespace sharedmemory { - -using ring_header_t = common::PacketBufferRing::ring_header_t; -using ring_t = common::PacketBufferRing::ring_t; -using item_header_t = common::PacketBufferRing::item_header_t; -using item_t = common::PacketBufferRing::item_t; using DumpFormat = tDataPlaneConfig::DumpFormat; +using DumpConfig = tDataPlaneConfig::DumpConfig; + +struct DumpRingBase +{ + virtual ~DumpRingBase(); -class SharedMemoryDumpRing + virtual void Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, uint32_t time) = 0; +}; + +class DumpRingRaw : public DumpRingBase { - DumpFormat format_; - size_t capacity_; + using PacketBufferRing = common::PacketBufferRing; + using ring_t = PacketBufferRing::ring_t; + using item_t = PacketBufferRing::item_t; + using ring_header_t = PacketBufferRing::ring_header_t; + + PacketBufferRing buffer_; + ring_t* ring_; public: - SharedMemoryDumpRing() : - format_(DumpFormat::kRaw), capacity_(0) {} + DumpRingRaw(void* memory, size_t max_pkt_size, size_t pkt_count); - SharedMemoryDumpRing(DumpFormat format, void* memory, size_t dump_size, size_t dump_count); + ~DumpRingRaw() override = default; - void write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type); + void Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, uint32_t time) override; + + static size_t GetCapacity(size_t max_pkt_size, size_t pkt_count); +}; + +class DumpRingPcap : public DumpRingBase +{ + pcpp::PcapShmWriterDevice dev_; + +public: + DumpRingPcap(void* memory, size_t max_pkt_size, size_t pkt_count); - // FIXME: make it private. I've made it public to simplify hexdump code - common::PacketBufferRing buffer; + ~DumpRingPcap() override = default; - [[nodiscard]] size_t Capacity() const + void Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, uint32_t time) override; + + static size_t GetCapacity(size_t max_pkt_size, size_t pkt_count); +}; + +inline size_t GetCapacity(const DumpConfig& config) +{ + auto& [format, max_pkt_size, pkt_count] = config; + + switch (format) { - return capacity_; + case DumpFormat::kRaw: + return DumpRingRaw::GetCapacity(max_pkt_size, pkt_count); + case DumpFormat::kPcap: + return DumpRingPcap::GetCapacity(max_pkt_size, pkt_count); + default: + YANET_THROW("Invalid dump format"); + std::abort(); } -}; +} + +inline std::unique_ptr CreateSharedMemoryDumpRing(const DumpConfig& config, void* memory) +{ + auto& [format, max_pkt_size, pkt_count] = config; + + switch (format) + { + case DumpFormat::kRaw: + return std::make_unique(memory, max_pkt_size, pkt_count); + case DumpFormat::kPcap: + return std::make_unique(memory, max_pkt_size, pkt_count); + default: + YANET_THROW("Invalid dump format"); + std::abort(); + } +} } // namespace sharedmemory diff --git a/dataplane/worker.h b/dataplane/worker.h index d67f38d1..fec58c90 100644 --- a/dataplane/worker.h +++ b/dataplane/worker.h @@ -358,7 +358,8 @@ class cWorker // will decrease with each new packet sent to slow worker, replenishes each N mseconds int32_t packetsToSWNPRemainder; - sharedmemory::SharedMemoryDumpRing dumpRings[YANET_CONFIG_SHARED_RINGS_NUMBER]; + using DumpRingBasePtr = std::unique_ptr; + std::array dump_rings; samples::Sampler sampler;