diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index dee6c5a2..d4023749 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -15,12 +15,19 @@ jobs:
strategy:
fail-fast: false
matrix:
- image: ['ubuntu:18.04', 'ubuntu:20.04', 'debian:stretch', 'debian:buster', 'debian:bullseye', 'centos:7', 'centos:8']
+ image:
+ - 'ubuntu:18.04'
+ - 'ubuntu:20.04'
+ - 'debian:stretch'
+ - 'debian:buster'
+ - 'debian:bullseye'
+ - 'centos:7'
+ - 'quay.io/centos/centos:stream8'
+ - 'oraclelinux:8'
name: Build on ${{ matrix.image }}
container: ${{ matrix.image }}
steps:
- - uses: actions/checkout@v1
# Dependencies ---------------------------------------------------------------------------
- name: Install dependencies for libfds and IPFIXcol2 (Ubuntu/Debian)
@@ -32,23 +39,33 @@ jobs:
apt-get -y install librdkafka-dev
env:
DEBIAN_FRONTEND: noninteractive
- - name: Enable additional repositories (CentOS 8)
- if: startsWith(matrix.image, 'centos:8')
+ - name: Enable additional repositories (CentOS Steam)
+ if: contains(matrix.image, 'centos:stream')
run: |
dnf -y install 'dnf-command(config-manager)'
dnf config-manager --set-enabled appstream powertools
- - name: Install dependencies for libfds and IPFIXcol2 (CentOS)
- if: startsWith(matrix.image, 'centos')
+ - name: Enable additional repositories (Oracle Linux)
+ if: contains(matrix.image, 'oraclelinux')
+ run: |
+ dnf -y install 'dnf-command(config-manager)'
+ dnf config-manager --set-enabled ol8_appstream ol8_codeready_builder
+ - name: Enable EPEL (CentOS)
+ if: contains(matrix.image, 'centos')
run: |
yum -y install epel-release
+ - name: Enable EPEL (OracleLinux)
+ if: contains(matrix.image, 'oraclelinux')
+ run: |
+ dnf -y install oracle-epel-release-el8
+ - name: Install dependencies for libfds and IPFIXcol2 (CentOS, Oracle Linux)
+ if: contains(matrix.image, 'centos') || contains(matrix.image, 'oraclelinux')
+ run: |
yum -y install git gcc gcc-c++ cmake make libxml2-devel lz4-devel libzstd-devel
yum -y install zlib-devel pkgconfig librdkafka-devel
yum -y install python3-docutils || yum -y install python-docutils
- - name: Install dependencies for libfds and IPFIXcol2 (Fedora)
- if: startsWith(matrix.image, 'fedora')
- run: |
- dnf -y install git gcc gcc-c++ cmake make libxml2-devel lz4-devel libzstd-devel
- dnf -y install python3-docutils zlib-devel pkgconfig librdkafka-devel
+
+ # Checkout repository --------------------------------------------------------------------
+ - uses: actions/checkout@v2
# Build libfds library ------------------------------------------------------------------
# Note: Master against master branch. Otherwise against debug branch.
diff --git a/.github/workflows/packages.yml b/.github/workflows/packages.yml
index 30170954..8fc28285 100644
--- a/.github/workflows/packages.yml
+++ b/.github/workflows/packages.yml
@@ -22,13 +22,19 @@ jobs:
container: ${{ matrix.image }}
steps:
- - uses: actions/checkout@v1
- - name: Define global variables
- run: echo "::set-output name=zip_file::ipfixcol2-${IMAGE//:/}-$GITHUB_SHA.zip"
- shell: bash
- env:
- IMAGE: ${{ matrix.image }}
- id: vars
+ - uses: actions/checkout@v2
+ - name: Define variables
+ uses: actions/github-script@v5
+ with:
+ script: |
+ const sha = context.sha.substring(0, 8);
+ const image = `${{ matrix.image }}`
+ const distro = image.split('/').pop().replace(/:/g,'_');
+ const zip = `ipfixcol2-${distro}-${sha}`;
+ core.exportVariable('ZIP_FILE', zip);
+ - name: Prepare environment
+ run: |
+ mkdir -p build/libfds_repo
# Dependencies ---------------------------------------------------------------------------
- name: Install dependencies for libfds and IPFIXcol2 (Ubuntu/Debian)
@@ -67,17 +73,16 @@ jobs:
ipfixcol2 -V
ipfixcol2 -h
ipfixcol2 -L
- - name: Pack IPFIXcol2 DEB packages
- working-directory: 'build/pkg/deb/debbuild/'
- run: zip "$GITHUB_WORKSPACE/$ZIP_FILE" *.deb *.ddeb *.tar.gz *.dsc
- env:
- ZIP_FILE: ${{ steps.vars.outputs.zip_file }}
- name: Archive DEB packages
if: github.event_name == 'push'
- uses: actions/upload-artifact@v1
+ uses: actions/upload-artifact@v2
with:
- name: ${{ steps.vars.outputs.zip_file }}
- path: ${{ steps.vars.outputs.zip_file }}
+ name: ${{ env.ZIP_FILE }}
+ path: |
+ build/pkg/deb/debbuild/*.deb
+ build/pkg/deb/debbuild/*.ddeb
+ build/pkg/deb/debbuild/*.tar.gz
+ build/pkg/deb/debbuild/*.dsc
rpm:
# Try to build RPM packages
@@ -85,39 +90,56 @@ jobs:
strategy:
fail-fast: false
matrix:
- image: ['centos:7', 'centos:8']
+ image:
+ - 'centos:7'
+ - 'quay.io/centos/centos:stream8'
+ - 'oraclelinux:8'
name: Build RPMs on ${{ matrix.image }}
container: ${{ matrix.image }}
steps:
- - uses: actions/checkout@v1
- - name: Prepare environment and variables
+ - name: Define variables
+ uses: actions/github-script@v5
+ with:
+ script: |
+ const sha = context.sha.substring(0, 8);
+ const image = `${{ matrix.image }}`
+ const distro = image.split('/').pop().replace(/:/g,'_');
+ const zip = `ipfixcol2-${distro}-${sha}`;
+ core.exportVariable('ZIP_FILE', zip);
+ - name: Prepare environment
run: |
- echo "::set-output name=zip_file::ipfixcol2-${IMAGE//:/}-$GITHUB_SHA.zip"
mkdir -p build/libfds_repo
- env:
- IMAGE: ${{ matrix.image }}
- id: vars
# Dependencies ---------------------------------------------------------------------------
- - name: Enable additional repositories (CentOS 8)
- if: startsWith(matrix.image, 'centos:8')
+ - name: Enable additional repositories (CentOS Stream)
+ if: contains(matrix.image, 'centos:stream')
run: |
dnf -y install 'dnf-command(config-manager)'
dnf config-manager --set-enabled appstream powertools
- - name: Install dependencies for libfds and IPFIXcol2 (CentOS)
- if: startsWith(matrix.image, 'centos')
+ - name: Enable additional repositories (Oracle Linux)
+ if: contains(matrix.image, 'oraclelinux')
+ run: |
+ dnf -y install 'dnf-command(config-manager)'
+ dnf config-manager --set-enabled ol8_appstream ol8_codeready_builder
+ - name: Enable EPEL (CentOS)
+ if: contains(matrix.image, 'centos')
run: |
yum -y install epel-release
+ - name: Enable EPEL (OracleLinux)
+ if: contains(matrix.image, 'oraclelinux')
+ run: |
+ dnf -y install oracle-epel-release-el8
+ - name: Install dependencies for libfds and IPFIXcol2 (CentOS, Oracle Linux)
+ if: contains(matrix.image, 'centos') || contains(matrix.image, 'oraclelinux')
+ run: |
yum -y install git gcc gcc-c++ cmake make libxml2-devel lz4-devel libzstd-devel
yum -y install zlib-devel pkgconfig rpm-build librdkafka-devel
yum -y install python3-docutils || yum -y install python-docutils
- - name: Install depedencies for libfds and IPFIXcol2 (Fedora)
- if: startsWith(matrix.image, 'fedora')
- run: |
- dnf -y install git gcc gcc-c++ cmake make libxml2-devel lz4-devel libzstd-devel
- dnf -y install python3-docutils zlib-devel pkgconfig rpm-build librdkafka-devel
+
+ # Checkout repository --------------------------------------------------------------------
+ - uses: actions/checkout@v2
# Build LIBFDS RPM package ---------------------------------------------------------------
- name: Checkout libfds library - master branch
@@ -146,14 +168,11 @@ jobs:
ipfixcol2 -V
ipfixcol2 -h
ipfixcol2 -L -v
- - name: Pack IPFIXcol2 RPM packages
- working-directory: 'build/pkg/rpm/rpmbuild'
- run: zip -r "$GITHUB_WORKSPACE/$ZIP_FILE" RPMS SRPMS
- env:
- ZIP_FILE: ${{ steps.vars.outputs.zip_file }}
- name: Archive RPM packages
if: github.event_name == 'push'
- uses: actions/upload-artifact@v1
+ uses: actions/upload-artifact@v2
with:
- name: ${{ steps.vars.outputs.zip_file }}
- path: ${{ steps.vars.outputs.zip_file }}
+ name: ${{ env.ZIP_FILE }}
+ path: |
+ build/pkg/rpm/rpmbuild/RPMS/
+ build/pkg/rpm/rpmbuild/SRPMS/
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e9d5e5ea..8dd95de3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -14,7 +14,7 @@ endif()
# Versions and other informations
set(IPFIXCOL_VERSION_MAJOR 2)
-set(IPFIXCOL_VERSION_MINOR 2)
+set(IPFIXCOL_VERSION_MINOR 3)
set(IPFIXCOL_VERSION_PATCH 0)
set(IPFIXCOL_VERSION
${IPFIXCOL_VERSION_MAJOR}.${IPFIXCOL_VERSION_MINOR}.${IPFIXCOL_VERSION_PATCH})
diff --git a/README.rst b/README.rst
index ef3b3b17..d3df01b1 100644
--- a/README.rst
+++ b/README.rst
@@ -86,15 +86,23 @@ Second, install build dependencies of the collector
yum install gcc gcc-c++ cmake make python3-docutils zlib-devel librdkafka-devel
# Optionally: doxygen pkgconfig
-* Note: latest systems (e.g. Fedora/CentOS 8) use ``dnf`` instead of ``yum``.
+* Note: latest systems (e.g. Fedora/CentOS Stream 8) use ``dnf`` instead of ``yum``.
* Note: package ``python3-docutils`` may by also named as ``python-docutils`` or ``python2-docutils``
* Note: package ``pkgconfig`` may by also named as ``pkg-config``
-* Note: CentOS 8 requires additional system repositories (``appstream`` and ``powertools``) to be enabled:
+* Note: CentOS Stream 8 usually requires additional system repositories to be enabled:
.. code-block::
+ dnf -y install epel-release
dnf config-manager --set-enabled appstream powertools
+* Note: Oracle Linux 8 usually requires additional system repositories to be enabled:
+
+.. code-block::
+
+ dnf -y install oracle-epel-release-el8
+ dnf config-manager --set-enabled ol8_appstream ol8_codeready_builder
+
**Debian/Ubuntu:**
.. code-block::
diff --git a/extra_plugins/output/unirec/README.rst b/extra_plugins/output/unirec/README.rst
index 39d383ae..36cab628 100644
--- a/extra_plugins/output/unirec/README.rst
+++ b/extra_plugins/output/unirec/README.rst
@@ -123,6 +123,10 @@ Parameters
Specification of interface type and its parameters. For more details, see section
"Output interface types".
+:``mappingFile``:
+ Path to configuration file with mapping IPFIX fields to UniRec fields. If the parameter is
+ not defined, the default configuration file is used. See section "UniRec configuration file".
+
Output interface types
----------------------
Exactly one of the following output type must be defined in the instance configuration of this
diff --git a/extra_plugins/output/unirec/config/unirec-elements.txt b/extra_plugins/output/unirec/config/unirec-elements.txt
index 74b18fdd..f6f97dcd 100644
--- a/extra_plugins/output/unirec/config/unirec-elements.txt
+++ b/extra_plugins/output/unirec/config/unirec-elements.txt
@@ -24,34 +24,42 @@
SRC_IP ipaddr e0id8,e0id27 # IPv4 or IPv6 source address
DST_IP ipaddr e0id12,e0id28 # IPv4 or IPv6 destination address
SRC_PORT uint16 e0id7 # Transport protocol source port
-DST_PORT uint16 e0id11 # Transport protocol destination port
+DST_PORT uint16 e0id11,e0id32 # Transport protocol destination port or ICMP type/code
PROTOCOL uint8 e0id4 # Transport protocol
TCP_FLAGS uint8 e0id6 # TCP flags
BYTES uint64 e0id1 # Number of bytes in flow
PACKETS uint32 e0id2 # Number of packets in flow
TTL uint8 e0id192 # IP time to live
+TTL_REV uint8 e29305id192 # IP time to live rev
TOS uint8 e0id5 # IP type of service
TIME_FIRST time e0id150,e0id152,e0id154,e0id156 # Time of the first packet of a flow
TIME_LAST time e0id151,e0id153,e0id155,e0id157 # Time of the last packet of a flow
DIR_BIT_FIELD uint8 _internal_dbf_ # Bit field used for determining incoming/outgoing flow (1 => Incoming, 0 => Outgoing)
LINK_BIT_FIELD uint64 _internal_lbf_ # Bit field of links on which was flow seen
+
SRC_MAC macaddr e0id56
DST_MAC macaddr e0id80
+FLOW_END_REASON uint8 iana:flowEndReason # Reason of exporting the flow record, see https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-flow-end-reason
+
# --- Additional biflow fields ---
BYTES_REV uint64 e29305id1
PACKETS_REV uint32 e29305id2
TCP_FLAGS_REV uint8 e29305id6
+# --- Additional TCP fields ---
+TCP_SYN_SIZE uint8 flowmon:tcpSynSize
+TCP_SYN_TTL uint8 flowmon:tcpSynTtl
+
# --- DNS specific fields ---
DNS_ANSWERS uint16 cesnet:DNSAnswers # DNS answers
DNS_RCODE uint8 cesnet:DNSRCode # DNS rcode
-DNS_NAME string cesnet:DNSName # DNS name
-DNS_QTYPE uint16 cesnet:DNSQType # DNS qtype
-DNS_CLASS uint16 cesnet:DNSClass # DNS class
+DNS_Q_NAME string cesnet:DNSName # DNS name
+DNS_Q_TYPE uint16 cesnet:DNSQType # DNS qtype
+DNS_Q_CLASS uint16 cesnet:DNSClass # DNS class
DNS_RR_TTL uint32 cesnet:DNSRRTTL # DNS rr ttl
-DNS_RLENGTH uint16 cesnet:DNSRDataLength # DNS rlenght
-DNS_RDATA bytes cesnet:DNSRData # DNS rdata
+DNS_RR_RLENGTH uint16 cesnet:DNSRDataLength # DNS rlenght
+DNS_RR_RDATA bytes cesnet:DNSRData # DNS rdata
DNS_PSIZE uint16 cesnet:DNSPSize # DNS payload size
DNS_DO uint8 cesnet:DNSRDO # DNS DNSSEC OK bit
DNS_ID uint16 cesnet:DNSTransactionID # DNS transaction id
@@ -118,14 +126,7 @@ FME_SIP_CALLED_PARTY string flowmon:sipCalledParty
FME_SIP_VIA string flowmon:sipVia # SIP VIA
# --- HTTP elements ---
-HTTP_REQUEST_METHOD_ID uint32 e16982id500 # HTTP request method id
-HTTP_REQUEST_HOST string e16982id501 # HTTP(S) request host
-HTTP_REQUEST_URL string e16982id502 # HTTP request url
-HTTP_REQUEST_AGENT_ID uint32 e16982id503 # HTTP request agent id
-HTTP_REQUEST_AGENT string e16982id504 # HTTP request agent
-HTTP_REQUEST_REFERER string e16982id505 # HTTP referer
-HTTP_RESPONSE_STATUS_CODE uint32 e16982id506 # HTTP response status code
-HTTP_RESPONSE_CONTENT_TYPE string e16982id507 # HTTP response content type
+HTTP_REQUEST_METHOD_ID string cesnet:httpMethod # HTTP request method
FME_HTTP_UA_OS uint16 flowmon:httpUaOs
FME_HTTP_UA_OS_MAJ uint16 flowmon:httpUaOsMaj
@@ -135,11 +136,14 @@ FME_HTTP_UA_APP uint16 flowmon:httpUaApp
FME_HTTP_UA_APP_MAJ uint16 flowmon:httpUaAppMaj
FME_HTTP_UA_APP_MIN uint16 flowmon:httpUaAppMin
FME_HTTP_UA_APP_BLD uint16 flowmon:httpUaAppBld
+HTTP_REQUEST_REFERER string flowmon:httpReferer
+HTTP_RESPONSE_CONTENT_TYPE string flowmon:httpContentType
FME_HTTP_METHOD_MASK uint16 flowmon:httpMethodMask
-FME_HTTP_REQUEST_HOST string flowmon:httpHost # HTTP(S) request host
-FME_HTTP_REQUEST_URL string flowmon:httpUrl # HTTP request url
-FME_HTTP_RESPONSE_STATUS_CODE uint32 flowmon:httpStatusCode # HTTP response status code
-FME_HTTP_REQUEST_USER_AGENT string flowmon:httpUserAgent
+HTTP_REQUEST_HOST string flowmon:httpHost # HTTP(S) request host
+HTTP_REQUEST_URL string flowmon:httpUrl # HTTP request url
+HTTP_RESPONSE_STATUS_CODE uint32 flowmon:httpStatusCode # HTTP response status code
+HTTP_REQUEST_AGENT string flowmon:httpUserAgent
+
# --- Other fields ---
IPV6_TUN_TYPE uint8 e16982id405 # IPv6 tunnel type
@@ -173,10 +177,13 @@ FME_TLS_VALIDITY_NOTAFTER int64 flowmon:tlsValidityNotAfter
FME_TLS_SIGNATURE_ALG uint16 flowmon:tlsSignatureAlg # tlsSignatureAlg
FME_TLS_PUBLIC_KEYALG uint16 flowmon:tlsPublicKeyAlg # tlsPublicKeyAlg
FME_TLS_PUBLIC_KEYLENGTH int32 flowmon:tlsPublicKeyLength # tlsPublicKeyLength
-FME_TLS_JA_3FINGERPRINT bytes flowmon:tlsJa3Fingerprint # tlsJa3Fingerprint
TLS_SNI string cesnet:TLSSNI # Server Name Indication https://en.wikipedia.org/wiki/Server_Name_Indication
-TLS_JA_3FINGERPRINT bytes cesnet:tlsJa3Fingerprint # tlsJa3Fingerprint
+TLS_JA3_FINGERPRINT bytes flowmon:tlsJa3Fingerprint # tlsJa3Fingerprint
+
+QUIC_SNI string cesnet:quicSNI # Server Name Indication from QUIC
+QUIC_USER_AGENT string cesnet:quicUserAgent # User-Agent value extracted from decrypted QUIC header
+QUIC_VERSION uint32 cesnet:quicVersion # Version of QUIC protocol extracted from decrypted QUIC header
# --- Per-Packet Information elements ---
PPI_PKT_LENGTHS uint16* e0id291/cesnet:packetLength # basicList of packet lengths
@@ -185,15 +192,15 @@ PPI_PKT_FLAGS uint8* e0id291/cesnet:packetFlag
PPI_PKT_DIRECTIONS int8* e0id291/cesnet:packetDirection # basicList of packet directions
# --- SSDP Information elements ---
-SSDP_LOCATION_PORT uint16 cesnet:SSDPLocationPort
-SSDP_SERVER string cesnet:SSDPServer
-SSDP_USER_AGENT string cesnet:SSDPUserAgent
-SSDP_NT string cesnet:SSDPNT
-SSDP_ST string cesnet:SSDPST
+SSDP_LOCATION_PORT uint16 cesnet:SSDPLocationPort,flowmon:SSDPLocationPort
+SSDP_SERVER string cesnet:SSDPServer,flowmon:SSDPServer
+SSDP_USER_AGENT string cesnet:SSDPUserAgent,flowmon:SSDPUserAgent
+SSDP_NT string cesnet:SSDPNT,flowmon:SSDPNT
+SSDP_ST string cesnet:SSDPST,flowmon:SSDPST
# --- DNSDD Information elements ---
-DNSSD_QUERIES string cesnet:DNSSDQueries
-DNSSD_RESPONSES string cesnet:DNSSDResponses
+DNSSD_QUERIES string cesnet:DNSSDQueries,flowmon:DNSSDQuery
+DNSSD_RESPONSES string cesnet:DNSSDResponses,flowmon:DNSSDResponse
# --- OVPN Information elements ---
OVPN_CONF_LEVEL uint8 cesnet:OVPNConfLevel
@@ -229,3 +236,51 @@ NB_SUFFIX uint8 cesnet:NBSuffix
# --- IDPContent Information elements ---
IDP_CONTENT bytes cesnet:IDPContent
IDP_CONTENT_REV bytes cesnet:IDPContentRev
+
+# --- Hists ---
+S_PHISTS_SIZES uint32* e0id291/cesnet:phistSrcSizes
+S_PHISTS_IPT uint32* e0id291/cesnet:phistSrcInterPacketTime
+D_PHISTS_SIZES uint32* e0id291/cesnet:phistDstSizes
+D_PHISTS_IPT uint32* e0id291/cesnet:phistDstInterPacketTime
+
+# --- Bursts ---
+
+SBI_BRST_BYTES uint32* e0id291/cesnet:burstSrcBytes
+SBI_BRST_PACKETS uint32* e0id291/cesnet:burstSrcPackets
+SBI_BRST_TIME_START time* e0id291/cesnet:burstSrcTimeStart
+SBI_BRST_TIME_STOP time* e0id291/cesnet:burstSrcTimeStop
+DBI_BRST_PACKETS uint32* e0id291/cesnet:burstDstPackets
+DBI_BRST_BYTES uint32* e0id291/cesnet:burstDstBytes
+DBI_BRST_TIME_START time* e0id291/cesnet:burstDstTimeStart
+DBI_BRST_TIME_STOP time* e0id291/cesnet:burstDstTimeStop
+
+# --- BasicPlus ---
+L4_TCP_MSS uint32 cesnet:tcpMss
+L4_TCP_MSS_REV uint32 cesnet:tcpMssRev
+L4_TCP_SYN_SIZE uint16 cesnet:tcpSynSize
+
+L4_TCP_WIN uint16 e0id186
+L4_TCP_WIN_REV uint16 e29305id186
+L4_TCP_OPTIONS uint64 e0id209
+L4_TCP_OPTIONS_REV uint64 e29305id209
+L3_FLAGS uint8 e0id197
+L3_FLAGS_REV uint8 e29305id197
+
+# --- wireguard ---
+WG_CONF_LEVEL uint8 cesnet:WGConfLevel
+WG_SRC_PEER uint32 cesnet:WGSrcPeer
+WG_DST_PEER uint32 cesnet:WGDstPeer
+
+# --- OSQuery ---
+OSQUERY_PROGRAM_NAME string cesnet:OSQueryProgramName
+OSQUERY_USERNAME string cesnet:OSQueryUsername
+OSQUERY_OS_NAME string cesnet:OSQueryOSName
+OSQUERY_OS_MAJOR uint16 cesnet:OSQueryOSMajor
+OSQUERY_OS_MINOR uint16 cesnet:OSQueryOSMinor
+OSQUERY_OS_BUILD string cesnet:OSQueryOSBuild
+OSQUERY_OS_PLATFORM string cesnet:OSQueryOSPlatform
+OSQUERY_OS_PLATFORM_LIKE string cesnet:OSQueryOSPlatformLike
+OSQUERY_OS_ARCH string cesnet:OSQueryOSArch
+OSQUERY_KERNEL_VERSION string cesnet:OSQueryKernelVersion
+OSQUERY_SYSTEM_HOSTNAME string cesnet:OSQuerySystemHostname
+
diff --git a/extra_plugins/output/unirec/ipfixcol2-unirec-output.spec.in b/extra_plugins/output/unirec/ipfixcol2-unirec-output.spec.in
index 71c348de..0f8bd1d0 100644
--- a/extra_plugins/output/unirec/ipfixcol2-unirec-output.spec.in
+++ b/extra_plugins/output/unirec/ipfixcol2-unirec-output.spec.in
@@ -12,7 +12,7 @@ Packager:
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}
BuildRequires: gcc >= 4.8, cmake >= 2.8.8, make
-BuildRequires: ipfixcol2-devel, libfds-devel, python2-docutils
+BuildRequires: ipfixcol2-devel, libfds-devel, /usr/bin/rst2man
BuildRequires: libtrap-devel, unirec >= 2.3.0
Requires: libtrap >= 0.12.0, ipfixcol2 >= 2.0.0, libfds >= 0.1.0
diff --git a/extra_plugins/output/unirec/src/configuration.c b/extra_plugins/output/unirec/src/configuration.c
index 5d8a3c02..41e6930c 100644
--- a/extra_plugins/output/unirec/src/configuration.c
+++ b/extra_plugins/output/unirec/src/configuration.c
@@ -63,6 +63,8 @@ enum cfg_timeout_mode {
CFG_TIMEOUT_HALF_WAIT = -3 /**< Block only if some client is connected */
};
+/** Filename of IPFIX-to-UniRec */
+#define DEF_CONF_FILENAME "unirec-elements.txt"
/** Default maximum number of connections over TCP/TCP-TLS/Unix */
#define DEF_MAX_CONNECTIONS 64
/** Default output interface timeout */
@@ -84,6 +86,7 @@ struct ifc_common {
/*
*
+ * /etc/ipfixcol2/unirec-elements.txt
* DST_IP,SRC_IP,BYTES,DST_PORT,?TCP_FLAGS,SRC_PORT,PROTOCOL
* true
*
@@ -123,6 +126,7 @@ enum params_xml_nodes {
// Main parameters
NODE_UNIREC_FMT = 1,
NODE_BIFLOW_SPLIT,
+ NODE_MAPPING_FILE,
NODE_TRAP_COMMON,
NODE_TRAP_SPEC,
// TRAP common parameters
@@ -208,6 +212,7 @@ static const struct fds_xml_args args_params[] = {
FDS_OPTS_ROOT("params"),
FDS_OPTS_ELEM(NODE_UNIREC_FMT, "uniRecFormat", FDS_OPTS_T_STRING, 0),
FDS_OPTS_ELEM(NODE_BIFLOW_SPLIT, "splitBiflow", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
+ FDS_OPTS_ELEM(NODE_MAPPING_FILE, "mappingFile", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
FDS_OPTS_NESTED(NODE_TRAP_COMMON, "trapIfcCommon", args_trap_common, FDS_OPTS_P_OPT),
FDS_OPTS_NESTED(NODE_TRAP_SPEC, "trapIfcSpec", args_trap_spec, 0),
FDS_OPTS_END
@@ -480,9 +485,10 @@ cfg_parse_tcp(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct conf_params *cfg, enum
// Prepare the TRAP interface specification
char *res = NULL;
const char *trap_ifc = (type == SPEC_TCP) ? "t" : "T";
- if (cfg_str_append(&res, "%s:%" PRIu16 ":%" PRIu64, trap_ifc, port, max_conn) != IPX_OK
+ if (cfg_str_append(&res, "%s:%" PRIu16, trap_ifc, port) != IPX_OK
|| (type == SPEC_TCP_TLS
- && cfg_str_append(&res, ":%s:%s:%s", file_key, file_cert, file_ca) != IPX_OK)) {
+ && cfg_str_append(&res, ":%s:%s:%s", file_key, file_cert, file_ca) != IPX_OK)
+ || (cfg_str_append(&res, ":max_clients=%" PRIu64, max_conn) != IPX_OK)) {
IPX_CTX_ERROR(ctx, "Unable to allocate memory (%s:%d)", __FILE__, __LINE__);
free(file_ca); free(file_cert); free(file_key);
free(res);
@@ -544,7 +550,7 @@ cfg_parse_unix(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct conf_params *cfg)
// Prepare the TRAP interface specification
char *res = NULL;
- if (cfg_str_append(&res, "u:%s:%" PRIu64, name, max_conn) != IPX_OK) {
+ if (cfg_str_append(&res, "u:%s:max_clients=%" PRIu64, name, max_conn) != IPX_OK) {
IPX_CTX_ERROR(ctx, "Unable to allocate memory (%s:%d)", __FILE__, __LINE__);
free(name);
return IPX_ERR_NOMEM;
@@ -780,6 +786,13 @@ cfg_parse_params(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct conf_params *cfg)
// Set default values
cfg->biflow_split = true;
+ cfg->mapping_file = NULL;
+
+ rc = cfg_str_append(&cfg->mapping_file, "%s/%s", ipx_api_cfg_dir(), DEF_CONF_FILENAME);
+ if (rc != FDS_OK) {
+ IPX_CTX_ERROR(ctx, "Unable to allocate memory (%s:%d)", __FILE__, __LINE__);
+ return rc;
+ }
// Set default TRAP common parameters
struct ifc_common common;
@@ -806,6 +819,16 @@ cfg_parse_params(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct conf_params *cfg)
assert(content->type == FDS_OPTS_T_BOOL);
cfg->biflow_split = content->val_bool;
break;
+ case NODE_MAPPING_FILE:
+ // Mapping file
+ assert(content->type == FDS_OPTS_T_STRING);
+ free(cfg->mapping_file);
+ cfg->mapping_file = strdup(content->ptr_string);
+ if (cfg->mapping_file == NULL) {
+ IPX_CTX_ERROR(ctx, "Unable to allocate memory (%s:%d)", __FILE__, __LINE__);
+ return IPX_ERR_NOMEM;
+ }
+ break;
case NODE_TRAP_SPEC:
// TRAP output interface specifier
assert(content->type == FDS_OPTS_T_CONTEXT);
@@ -919,6 +942,7 @@ configuration_free(struct conf_params *cfg)
return;
}
+ free(cfg->mapping_file);
free(cfg->trap_ifc_spec);
free(cfg->unirec_fmt);
free(cfg->unirec_spec);
diff --git a/extra_plugins/output/unirec/src/configuration.h b/extra_plugins/output/unirec/src/configuration.h
index c9a5eb91..8890bbd3 100644
--- a/extra_plugins/output/unirec/src/configuration.h
+++ b/extra_plugins/output/unirec/src/configuration.h
@@ -46,7 +46,9 @@
* \brief Structure for a configuration parsed from XML
*/
struct conf_params {
- /** Prepared TRAP interface specification string */
+ /** Path to IPFIX-to-UniRec mapping file */
+ char *mapping_file;
+ /** Prepared TRAP interface specification string */
char *trap_ifc_spec;
/**
* TRAP interface UniRec template
diff --git a/extra_plugins/output/unirec/src/map.c b/extra_plugins/output/unirec/src/map.c
index 415facda..8c3b8b6f 100644
--- a/extra_plugins/output/unirec/src/map.c
+++ b/extra_plugins/output/unirec/src/map.c
@@ -480,9 +480,9 @@ map_sort_fn(const void *p1, const void *p2)
ipfix2 = ipfix2->next;
}
- if (ipfix1->next) {
+ if (ipfix1 && ipfix1->next) {
return -1;
- } else if (ipfix2->next) {
+ } else if (ipfix2 && ipfix2->next) {
return 1;
}
@@ -585,7 +585,7 @@ map_load(map_t *map, const char *file)
// Collision detected!
snprintf(map->err_buffer, ERR_SIZE, "The IPFIX IE '%s' (PEN %" PRIu32 ", ID %" PRIu16 ") "
- "is mapped to multiple different UniRec fields ('%s' and '%s')",
+ "is mapped to multiple UniRec fields ('%s' and '%s')",
rec_now->ipfix.def->name, rec_now->ipfix.en, rec_now->ipfix.id,
rec_now->unirec.name, rec_prev->unirec.name);
map_clear(map);
diff --git a/extra_plugins/output/unirec/src/unirecplugin.c b/extra_plugins/output/unirec/src/unirecplugin.c
index dca9f8b3..e336f786 100644
--- a/extra_plugins/output/unirec/src/unirecplugin.c
+++ b/extra_plugins/output/unirec/src/unirecplugin.c
@@ -48,8 +48,6 @@
#include "configuration.h"
#include "map.h"
-/** Filename of IPFIX-to-UniRec */
-#define CONF_FILENAME "unirec-elements.txt"
/** Name of TRAP context that belongs to the plugin */
#define PLUGIN_TRAP_NAME "IPFIXcol2-UniRec"
/** Description of the TRAP context that belongs to the plugin */
@@ -100,41 +98,26 @@ struct conf_unirec {
/**
* \brief Get the IPFIX-to-UniRec conversion database
* \param ctx Plugin context
+ * \param file Path to a file with IPFIX-to-UniRec mapping
* \return Conversion table or NULL (an error has occurred)
*/
static map_t *
-ipfix2unirec_db(ipx_ctx_t *ctx)
+ipfix2unirec_db(ipx_ctx_t *ctx, const char *file)
{
- const char *path = ipx_api_cfg_dir();
- const size_t full_size = strlen(path) + strlen(CONF_FILENAME) + 2; // 2 = '/' + '\0'
- char *full_path = malloc(full_size * sizeof(char));
- if (!full_path) {
- IPX_CTX_ERROR(ctx, "Unable to allocate memory (%s:%d)", __FILE__, __LINE__);
- return NULL;
- }
-
- int ret_val = snprintf(full_path, full_size, "%s/%s", path, CONF_FILENAME);
- if (ret_val < 0 || ((size_t) ret_val) >= full_size) {
- IPX_CTX_ERROR(ctx, "Failed to generate a configuration path (internal error)", '\0');
- free(full_path);
- return NULL;
- }
-
map_t *map = map_init(ipx_ctx_iemgr_get(ctx));
if (!map) {
IPX_CTX_ERROR(ctx, "Failed to initialize conversion map! (%s:%d)", __FILE__, __LINE__);
- free(full_path);
return NULL;
}
- if (map_load(map, full_path) != IPX_OK) {
+ IPX_CTX_INFO(ctx, "Loading IPFIX-to-UniRec mapping file '%s'", file);
+
+ if (map_load(map, file) != IPX_OK) {
IPX_CTX_ERROR(ctx, "Failed to initialize conversion database: %s", map_last_error(map));
map_destroy(map);
- free(full_path);
return NULL;
}
- free(full_path);
return map;
}
@@ -317,7 +300,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *params)
conf->params = parsed_params;
// Load IPFIX-to-UniRec conversion database
- map_t *conv_db = ipfix2unirec_db(ctx);
+ map_t *conv_db = ipfix2unirec_db(ctx, parsed_params->mapping_file);
if (!conv_db) {
configuration_free(parsed_params);
free(conf);
diff --git a/include/ipfixcol2/message_ipfix.h b/include/ipfixcol2/message_ipfix.h
index c45c1941..a0bb1f92 100644
--- a/include/ipfixcol2/message_ipfix.h
+++ b/include/ipfixcol2/message_ipfix.h
@@ -201,6 +201,30 @@ ipx_msg_ipfix2base(ipx_msg_ipfix_t *msg)
return (ipx_msg_t *) msg;
}
+/**
+ * \brief Add a new IPFIX Set description.
+ *
+ * The record is uninitialized and user MUST fill it! The function is
+ * intended for annotation of newly created IPFIX Message.
+ * \param[in] msg IPFIX Message wrapper
+ * \return Pointer to the record or NULL (memory allocation error)
+ */
+IPX_API struct ipx_ipfix_set *
+ipx_msg_ipfix_add_set_ref(struct ipx_msg_ipfix *msg);
+
+/**
+ * \brief Add a new IPFIX Data Record description.
+ *
+ * The record is uninitialized and user MUST fill it! The function is
+ * intended for annotation of newly created IPFIX Message.
+ * \warning The wrapper \p msg_ref can be reallocated and different pointer
+ * can be returned!
+ * \param[in,out] msg_ref IPFIX Message wrapper
+ * \return Pointer to the record or NULL (memory allocation error)
+ */
+IPX_API struct ipx_ipfix_record *
+ipx_msg_ipfix_add_drec_ref(struct ipx_msg_ipfix **msg_ref);
+
/**@}*/
#ifdef __cplusplus
}
diff --git a/pkg/deb/CMakeLists.txt b/pkg/deb/CMakeLists.txt
index 129af061..7e79bcf5 100644
--- a/pkg/deb/CMakeLists.txt
+++ b/pkg/deb/CMakeLists.txt
@@ -48,7 +48,6 @@ file(MAKE_DIRECTORY
# Copy and create configuration files
file(COPY
- "templates/changelog"
"templates/rules"
"templates/compat"
"templates/ipfixcol2.install"
@@ -61,6 +60,12 @@ file(COPY
DESTINATION "${DEB_CFG_DIR}/source"
)
+configure_file(
+ "templates/changelog.in"
+ "${DEB_CFG_DIR}/changelog"
+ @ONLY
+)
+
configure_file(
"templates/control.in"
"${DEB_CFG_DIR}/control"
diff --git a/pkg/deb/templates/changelog b/pkg/deb/templates/changelog.in
similarity index 53%
rename from pkg/deb/templates/changelog
rename to pkg/deb/templates/changelog.in
index 603cd04b..2382e678 100644
--- a/pkg/deb/templates/changelog
+++ b/pkg/deb/templates/changelog.in
@@ -1,4 +1,4 @@
-ipfixcol2 (2.2.0-1) unstable; urgency=low
+ipfixcol2 (@CPACK_PACKAGE_VERSION@-@CPACK_PACKAGE_RELEASE@) unstable; urgency=low
* Initial release.
diff --git a/src/core/message_ipfix.h b/src/core/message_ipfix.h
index 3719a6cb..60fed909 100644
--- a/src/core/message_ipfix.h
+++ b/src/core/message_ipfix.h
@@ -116,26 +116,4 @@ struct ipx_msg_ipfix {
size_t
ipx_msg_ipfix_size(uint32_t rec_cnt, size_t rec_size);
-/**
- * \brief Add a new IPFIX Set
- *
- * The record is uninitialized and user MUST fill it!
- * \param[in] msg IPFIX Message wrapper
- * \return Pointer to the record or NULL (memory allocation error)
- */
-struct ipx_ipfix_set *
-ipx_msg_ipfix_add_set_ref(struct ipx_msg_ipfix *msg);
-
-/**
- * \brief Add a new IPFIX Data Record
- *
- * The record is uninitialized and user MUST fill it!
- * \warning The wrapper \p msg_ref can be reallocated and different pointer can be returned!
- * \param[in,out] msg_ref IPFIX Message wrapper
- * \return Pointer to the record or NULL (memory allocation error)
- */
-struct ipx_ipfix_record *
-ipx_msg_ipfix_add_drec_ref(struct ipx_msg_ipfix **msg_ref);
-
-
#endif // IPFIXCOL_MESSAGE_IPFIX_INTERNAL_H
diff --git a/src/plugins/input/tcp/tcp.c b/src/plugins/input/tcp/tcp.c
index e501804f..84b40083 100644
--- a/src/plugins/input/tcp/tcp.c
+++ b/src/plugins/input/tcp/tcp.c
@@ -47,6 +47,7 @@
#include
#include
#include
+#include
#include
#include
@@ -62,10 +63,6 @@
#define GETTER_TIMEOUT (10)
/** Max sockets events processed in the getter - i.e. epoll_wait array size */
#define GETTER_MAX_EVENTS (16)
-/** Timeout to read whole IPFIX Message after at least part has been received (in microseconds) */
-#define GETTER_RECV_TIMEOUT (500000)
-/** Default size of a buffer prepared for new IPFIX/NetFlow message (bytes) */
-#define DEF_MSG_SIZE (4096)
/** Plugin description */
IPX_API struct ipx_plugin_info ipx_plugin_info = {
@@ -91,6 +88,13 @@ struct tcp_pair {
struct ipx_session *session;
/** No message has been received from the Session yet */
bool new_connection;
+
+ /** Partly received message that waits for completition */
+ uint8_t *msg;
+ /** Allocated size of the partly received msg message */
+ uint16_t msg_size;
+ /** Already receive part of the msg message */
+ uint16_t msg_offset;
};
/** Instance data */
@@ -143,7 +147,7 @@ static int
active_session_add(struct tcp_data *data, int sd, struct ipx_session *session)
{
// Create a new pair
- struct tcp_pair *pair = malloc(sizeof(*pair));
+ struct tcp_pair *pair = calloc(1, sizeof(*pair));
if (!pair) {
IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
return IPX_ERR_NOMEM;
@@ -242,7 +246,11 @@ active_session_remove_aux(struct tcp_data *data, size_t idx)
}
}
- // Close internal structures an remove it from the list (do NOT free SESSION)
+ // Free internal structures and remove the pair from the list (do NOT free SESSION)
+ if (pair->msg) {
+ free(pair->msg);
+ }
+
close(pair->fd);
free(pair);
@@ -309,14 +317,21 @@ listener_add_connection(struct tcp_data *data, int sd)
assert(sd >= 0);
const char *err_str;
- // Set receive timeout (after data on the socket is ready)
- struct timeval rcv_timeout;
- rcv_timeout.tv_sec = GETTER_RECV_TIMEOUT / 1000000;
- rcv_timeout.tv_usec = GETTER_RECV_TIMEOUT % 1000000;
- if (setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &rcv_timeout, sizeof(rcv_timeout)) == -1) {
+ // Set non-blocking mode on the socket
+ int flags = fcntl(sd, F_GETFL, 0);
+ if (flags == -1) {
ipx_strerror(errno, err_str);
- IPX_CTX_WARNING(data->ctx, "Listener: Failed to specify receiving timeout of a socket: %s",
+ IPX_CTX_WARNING(data->ctx, "Listener: Failed to set non-blocking mode: fcntl() failed: %s",
err_str);
+ return IPX_ERR_DENIED;
+ }
+
+ flags |= O_NONBLOCK;
+ if (fcntl(sd, F_SETFL, flags) == -1) {
+ ipx_strerror(errno, err_str);
+ IPX_CTX_WARNING(data->ctx, "Listener: Failed to set non-blocking mode: fcntl() failed: %s",
+ err_str);
+ return IPX_ERR_DENIED;
}
// Get the description of the remove address
@@ -810,102 +825,267 @@ active_destroy(ipx_ctx_t *ctx, struct tcp_data *instance)
}
/**
- * \brief Get an IPFIX message from a socket and pass it
+ * \brief Try to read an IPFIX Message header
+ *
+ * The whole header might not be available right now. In that case, the function
+ * will only read and store available part and it will successfully return. When
+ * this happens, the allocated message is smaller that IPFIX Message header. The
+ * next call of this function will tried to read the rest.
+ *
+ * If the whole header is available, the function will allocate message buffer
+ * sufficient for the rest of the message.
*
* \param[in] ctx Instance data (necessary for passing messages)
* \param[in] pair Connection pair (socket descriptor and session) to receive from
* \return #IPX_OK on success
- * \return #IPX_ERR_EOF if the socket is closed
+ * \return #IPX_ERR_EOF if the socket has been closed
* \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
*/
static int
-socket_process(ipx_ctx_t *ctx, struct tcp_pair *pair)
+socket_process_receive_header(ipx_ctx_t *ctx, struct tcp_pair *pair)
{
- const char *err_str;
- struct fds_ipfix_msg_hdr hdr;
- static_assert(sizeof(hdr) == FDS_IPFIX_MSG_HDR_LEN, "Invalid size of IPFIX Message header");
+ struct fds_ipfix_msg_hdr hdr = {0};
+ uint8_t *hdr_raw = (uint8_t *) &hdr;
- // Get the message header (do not move pointer)
- ssize_t len = recv(pair->fd, &hdr, FDS_IPFIX_MSG_HDR_LEN, MSG_WAITALL | MSG_PEEK);
+ uint16_t remains = FDS_IPFIX_MSG_HDR_LEN;
+ uint16_t offset = 0;
+ ssize_t len;
+
+ uint8_t *msg_buffer;
+ uint16_t msg_version;
+ uint16_t msg_size;
+
+ assert(!pair->msg || pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN);
+
+ if (pair->msg) {
+ // Fill the header with previously received data
+ offset = pair->msg_offset;
+ remains = FDS_IPFIX_MSG_HDR_LEN - offset;
+
+ memcpy(hdr_raw, pair->msg, offset);
+ }
+
+ len = recv(pair->fd, &hdr_raw[offset], remains, 0);
if (len == 0) {
- // Connection terminated
- IPX_CTX_INFO(ctx, "Connection from '%s' closed.", pair->session->ident);
- return IPX_ERR_EOF;
+ // Connection has been closed
+ if (offset > 0) {
+ IPX_CTX_WARNING(ctx, "Connection with '%s' has been unexpectly closed",
+ pair->session->ident);
+ return IPX_ERR_FORMAT;
+ } else {
+ IPX_CTX_INFO(ctx, "Connection with '%s' closed.", pair->session->ident);
+ return IPX_ERR_EOF;
+ }
}
- if (len == -1 || len < FDS_IPFIX_MSG_HDR_LEN) {
- // Failed to read header -> close
- int error_code = (len == -1) ? errno : EINTR;
- ipx_strerror(error_code, err_str);
- IPX_CTX_WARNING(ctx, "Connection from '%s' closed due to failure to receive "
- "an IPFIX Message header: %s", pair->session->ident, err_str);
+ if (len < 0) {
+ // Something went wrong
+ const char *err_str;
+
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ return IPX_OK;
+ }
+
+ ipx_strerror(errno, err_str);
+ IPX_CTX_WARNING(ctx, "Connection with '%s' failed: %s",
+ pair->session->ident, err_str);
return IPX_ERR_FORMAT;
}
- assert(len == FDS_IPFIX_MSG_HDR_LEN);
- // Check the header (version, size)
- uint16_t msg_version = ntohs(hdr.version);
- uint16_t msg_size = ntohs(hdr.length);
- uint32_t msg_odid = ntohl(hdr.odid);
+ offset += len;
+
+ if (offset < FDS_IPFIX_MSG_HDR_LEN) {
+ // We don't have whole IPFIX Message header
+ uint8_t *ptr = realloc(pair->msg, offset);
+ if (!ptr) {
+ IPX_CTX_ERROR(ctx,
+ "Connection with '%s' closed due to memory allocation failure! (%s:%d).",
+ pair->session->ident, __FILE__, __LINE__);
+ return IPX_ERR_NOMEM;
+ }
+
+ memcpy(ptr, hdr_raw, offset);
+ pair->msg = ptr;
+ pair->msg_offset = offset;
+ pair->msg_size = offset;
+
+ return IPX_OK;
+ }
+
+ // Check the IPFIX Message header
+ msg_version = ntohs(hdr.version);
+ msg_size = ntohs(hdr.length);
if (msg_version != FDS_IPFIX_VERSION || msg_size < FDS_IPFIX_MSG_HDR_LEN) {
- // Unsupported header version
- IPX_CTX_WARNING(ctx, "Connection from '%s' closed due to the unsupported version of "
- "IPFIX/NetFlow.", pair->session->ident);
+ // Invalid header version
+ IPX_CTX_WARNING(ctx,
+ "Connection with '%s' closed due to invalid IPFIX Message header.",
+ pair->session->ident);
return IPX_ERR_FORMAT;
}
- // Read the whole message
- uint8_t *buffer = malloc(msg_size * sizeof(uint8_t));
- if (!buffer) {
- IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation failure! (%s:%d).",
+ // Preallocated buffer for the rest of the IPFIX Message body
+ msg_buffer = realloc(pair->msg, msg_size);
+ if (!msg_buffer) {
+ IPX_CTX_ERROR(ctx,
+ "Connection with '%s' closed due to memory allocation failure! (%s:%d).",
pair->session->ident, __FILE__, __LINE__);
return IPX_ERR_NOMEM;
}
- len = recv(pair->fd, buffer, msg_size, MSG_WAITALL);
- if (len != msg_size) {
- int error_code = (len == -1) ? errno : ETIMEDOUT;
- ipx_strerror(error_code, err_str);
- IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to failure while reading from "
- "its socket: %s.", pair->session->ident, err_str);
- free(buffer);
+ memcpy(msg_buffer, hdr_raw, FDS_IPFIX_MSG_HDR_LEN);
+ pair->msg = msg_buffer;
+ pair->msg_offset = FDS_IPFIX_MSG_HDR_LEN;
+ pair->msg_size = msg_size;
+
+ return IPX_OK;
+}
+
+/**
+ * \brief Try to read an IPFIX Message body
+ *
+ * The whole body might not be available right now. In that case, the function
+ * will only read and store available part and it will successfully return. When
+ * this happens, the message offset is smaller that the message size. The next call
+ * of this function will tried to read the rest.
+ *
+ * \param[in] ctx Instance data (necessary for passing messages)
+ * \param[in] pair Connection pair (socket descriptor and session) to receive from
+ * \return #IPX_OK on success
+ * \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
+ * \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
+ */
+static int
+socket_process_receive_body(ipx_ctx_t *ctx, struct tcp_pair *pair)
+{
+ const uint16_t remains = pair->msg_size - pair->msg_offset;
+ assert(pair->msg && pair->msg_offset >= FDS_IPFIX_MSG_HDR_LEN);
+
+ if (remains == 0) {
+ // This is an IPFIX Message without body...
+ return IPX_OK;
+ }
+
+ ssize_t len = recv(pair->fd, &pair->msg[pair->msg_offset], remains, 0);
+ if (len == 0) {
+ // Connection closed
+ IPX_CTX_WARNING(ctx, "Connection from '%s' has been unexpectly closed",
+ pair->session->ident);
return IPX_ERR_FORMAT;
}
+ if (len < 0) {
+ // Something went wrong
+ const char *err_str;
+
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ return IPX_OK;
+ }
+
+ ipx_strerror(errno, err_str);
+ IPX_CTX_WARNING(ctx, "Connection with '%s' failed: %s",
+ pair->session->ident, err_str);
+ return IPX_ERR_FORMAT;
+ }
+
+ pair->msg_offset += (uint16_t) len;
+ return IPX_OK;
+}
+
+/**
+ * \brief Try to pass fully received IPFIX Message to the collector.
+ *
+ * \param[in] ctx Instance data (necessary for passing messages)
+ * \param[in] pair Connection pair (socket descriptor and session)
+ * \return #IPX_OK on success
+ * \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
+ */
+static int
+socket_process_pass_msg(ipx_ctx_t *ctx, struct tcp_pair *pair)
+{
+ assert(pair->msg && pair->msg_offset == pair->msg_size && "Partly received message");
+
if (pair->new_connection) {
// Send information about the new Transport Session
- pair->new_connection = false;
ipx_msg_session_t *msg = ipx_msg_session_create(pair->session, IPX_MSG_SESSION_OPEN);
if (!msg) {
- IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation "
- "failure! (%s:%d).", pair->session->ident, __FILE__, __LINE__);
- free(buffer);
+ IPX_CTX_ERROR(ctx,
+ "Connection with '%s' closed due to memory allocation failure! (%s:%d).",
+ pair->session->ident, __FILE__, __LINE__);
return IPX_ERR_NOMEM;
}
ipx_ctx_msg_pass(ctx, ipx_msg_session2base(msg));
+ pair->new_connection = false;
}
// Create a message wrapper and pass the message
struct ipx_msg_ctx msg_ctx;
msg_ctx.session = pair->session;
- msg_ctx.odid = msg_odid;
+ msg_ctx.odid = ntohl(((struct fds_ipfix_msg_hdr *) pair->msg)->odid);
msg_ctx.stream = 0; // Streams are not supported over TCP
- ipx_msg_ipfix_t *msg = ipx_msg_ipfix_create(ctx, &msg_ctx, buffer, msg_size);
+ ipx_msg_ipfix_t *msg = ipx_msg_ipfix_create(ctx, &msg_ctx, pair->msg, pair->msg_size);
if (!msg) {
- IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation "
- "failure! (%s:%d).", pair->session->ident, __FILE__, __LINE__);
- free(buffer);
+ IPX_CTX_ERROR(ctx,
+ "Connection with '%s' closed due to memory allocation failure! (%s:%d).",
+ pair->session->ident, __FILE__, __LINE__);
return IPX_ERR_NOMEM;
}
ipx_ctx_msg_pass(ctx, ipx_msg_ipfix2base(msg));
+
+ pair->msg = NULL;
+ pair->msg_offset = 0;
+ pair->msg_size = 0;
+
return IPX_OK;
}
+/**
+ * \brief Get an IPFIX message from a socket and pass it
+ *
+ * \param[in] ctx Instance data (necessary for passing messages)
+ * \param[in] pair Connection pair (socket descriptor and session) to receive from
+ * \return #IPX_OK on success
+ * \return #IPX_ERR_EOF if the socket is closed
+ * \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
+ * \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
+ */
+static int
+socket_process(ipx_ctx_t *ctx, struct tcp_pair *pair)
+{
+ int ret;
+
+ if (!pair->msg || pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN) {
+ // Try to receive IPFIX Message header first
+ ret = socket_process_receive_header(ctx, pair);
+ if (ret != IPX_OK) {
+ return ret;
+ }
+
+ if (pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN) {
+ // Incomplete IPFIX Message header, read the rest later...
+ return IPX_OK;
+ }
+ }
+
+ // Receive rest of the message body
+ ret = socket_process_receive_body(ctx, pair);
+ if (ret != IPX_OK) {
+ return ret;
+ }
+
+ if (pair->msg_offset < pair->msg_size) {
+ // Incomplete IPFIX Message body, read the rest later...
+ return IPX_OK;
+ }
+
+ // Pass the message
+ return socket_process_pass_msg(ctx, pair);
+}
+
// -------------------------------------------------------------------------------------------------
int
diff --git a/src/plugins/output/dummy/dummy.c b/src/plugins/output/dummy/dummy.c
index ed883dd5..f0079260 100644
--- a/src/plugins/output/dummy/dummy.c
+++ b/src/plugins/output/dummy/dummy.c
@@ -46,6 +46,11 @@
#include "config.h"
+#define IANA_PEN 0
+#define IANA_PEN_REV 29305
+#define IE_ID_BYTES 1
+#define IE_ID_PKTS 2
+
/** Plugin description */
IPX_API struct ipx_plugin_info ipx_plugin_info = {
// Plugin type
@@ -57,7 +62,7 @@ IPX_API struct ipx_plugin_info ipx_plugin_info = {
// Configuration flags (reserved for future use)
.flags = 0,
// Plugin version string (like "1.2.3")
- .version = "2.1.0",
+ .version = "2.2.0",
// Minimal IPFIXcol version string (like "1.2.3")
.ipx_min = "2.0.0"
};
@@ -91,7 +96,8 @@ stats_update(struct instance_data *inst, ipx_msg_ipfix_t *msg)
// For each IPFIX Data Record
for (uint32_t i = 0; i < rec_cnt; ++i) {
struct ipx_ipfix_record *rec_ptr = ipx_msg_ipfix_get_drec(msg, i);
- enum fds_template_type ttype = rec_ptr->rec.tmplt->type;
+ const struct fds_template *tmplt = rec_ptr->rec.tmplt;
+ const enum fds_template_type ttype = tmplt->type;
struct fds_drec_field field;
uint64_t value;
@@ -105,13 +111,30 @@ stats_update(struct instance_data *inst, ipx_msg_ipfix_t *msg)
}
// Get octetDeltaCount
- if (fds_drec_find(&rec_ptr->rec, 0, 1, &field) != FDS_EOC
+ if (fds_drec_find(&rec_ptr->rec, IANA_PEN, IE_ID_BYTES, &field) != FDS_EOC
&& fds_get_uint_be(field.data, field.size, &value) == FDS_OK) {
inst->cnt_bytes += value;
}
// Get packetDeltaCount
- if (fds_drec_find(&rec_ptr->rec, 0, 2, &field) != FDS_EOC
+ if (fds_drec_find(&rec_ptr->rec, IANA_PEN, IE_ID_PKTS, &field) != FDS_EOC
+ && fds_get_uint_be(field.data, field.size, &value) == FDS_OK) {
+ inst->cnt_pkts += value;
+ }
+
+ if ((tmplt->flags & FDS_TEMPLATE_BIFLOW) == 0) {
+ // Not a biflow record
+ continue;
+ }
+
+ // Get octetDeltaCount (reverse)
+ if (fds_drec_find(&rec_ptr->rec, IANA_PEN_REV, IE_ID_BYTES, &field) != FDS_EOC
+ && fds_get_uint_be(field.data, field.size, &value) == FDS_OK) {
+ inst->cnt_bytes += value;
+ }
+
+ // Get packetDeltaCount (reverse)
+ if (fds_drec_find(&rec_ptr->rec, IANA_PEN_REV, IE_ID_PKTS, &field) != FDS_EOC
&& fds_get_uint_be(field.data, field.size, &value) == FDS_OK) {
inst->cnt_pkts += value;
}
diff --git a/src/plugins/output/fds/src/Storage.cpp b/src/plugins/output/fds/src/Storage.cpp
index 30db3d25..5867c344 100644
--- a/src/plugins/output/fds/src/Storage.cpp
+++ b/src/plugins/output/fds/src/Storage.cpp
@@ -17,6 +17,8 @@
#include
#include "Storage.hpp"
+const std::string TMP_SUFFIX = ".tmp";
+
Storage::Storage(ipx_ctx_t *ctx, const Config &cfg) : m_ctx(ctx), m_path(cfg.m_path)
{
// Check if the directory exists
@@ -46,6 +48,11 @@ Storage::Storage(ipx_ctx_t *ctx, const Config &cfg) : m_ctx(ctx), m_path(cfg.m_p
m_flags |= FDS_FILE_APPEND;
}
+Storage::~Storage()
+{
+ window_close();
+}
+
void
Storage::window_new(time_t ts)
{
@@ -53,7 +60,8 @@ Storage::window_new(time_t ts)
window_close();
// Open new file
- const std::string new_file = filename_gen(ts);
+ const std::string new_file = filename_gen(ts) + TMP_SUFFIX;
+ m_file_name = new_file;
std::unique_ptr new_file_cpy(strdup(new_file.c_str()), &free);
char *dir2create;
@@ -80,8 +88,14 @@ Storage::window_new(time_t ts)
void
Storage::window_close()
{
+ bool file_opened = (m_file.get() != nullptr);
m_file.reset();
m_session2params.clear();
+ if (file_opened) {
+ std::string new_file_name(m_file_name.begin(), m_file_name.end() - TMP_SUFFIX.size());
+ std::rename(m_file_name.c_str(), new_file_name.c_str());
+ m_file_name.clear();
+ }
}
void
diff --git a/src/plugins/output/fds/src/Storage.hpp b/src/plugins/output/fds/src/Storage.hpp
index 29b0528e..56bbcc14 100644
--- a/src/plugins/output/fds/src/Storage.hpp
+++ b/src/plugins/output/fds/src/Storage.hpp
@@ -36,7 +36,7 @@ class Storage {
* @throw FDS_exception if @p path directory doesn't exist in the system
*/
Storage(ipx_ctx_t *ctx, const Config &cfg);
- virtual ~Storage() = default;
+ virtual ~Storage();
// Disable copy constructors
Storage(const Storage &other) = delete;
@@ -105,6 +105,8 @@ class Storage {
/// Output FDS file
std::unique_ptr m_file = {nullptr, &fds_file_close};
+ /// Output FDS file name
+ std::string m_file_name;
/// Mapping of Transport Sessions to FDS specific parameters
std::map m_session2params;
diff --git a/src/plugins/output/forwarder/CMakeLists.txt b/src/plugins/output/forwarder/CMakeLists.txt
index 2bec16c3..66ee28b6 100644
--- a/src/plugins/output/forwarder/CMakeLists.txt
+++ b/src/plugins/output/forwarder/CMakeLists.txt
@@ -1,19 +1,30 @@
# Create a linkable module
add_library(forwarder-output MODULE
src/main.cpp
- src/config.h
+ src/Config.h
+ src/Config.cpp
src/Forwarder.h
- src/ConnectionManager.h
- src/ConnectionManager.cpp
- src/ConnectionParams.h
+ src/Forwarder.cpp
src/Connection.h
src/Connection.cpp
- src/ConnectionBuffer.h
- src/SyncPipe.h
- src/IPFIXMessage.h
- src/MessageBuilder.h
+ src/Host.cpp
+ src/Host.h
+ src/common.h
+ src/common.cpp
+ src/Message.h
+ src/Message.cpp
+ src/Sender.h
+ src/Sender.cpp
+ src/connector/Connector.h
+ src/connector/Connector.cpp
+ src/connector/FutureSocket.h
+ src/connector/FutureSocket.cpp
+ src/connector/Pipe.h
+ src/connector/Pipe.cpp
)
+target_include_directories(forwarder-output PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/src)
+
install(
TARGETS forwarder-output
LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"
diff --git a/src/plugins/output/forwarder/README.rst b/src/plugins/output/forwarder/README.rst
index 93fdde4c..427b0970 100644
--- a/src/plugins/output/forwarder/README.rst
+++ b/src/plugins/output/forwarder/README.rst
@@ -1,7 +1,7 @@
-Forwarder (output plugin) [Preview]
-===================================
+Forwarder (output plugin)
+=========================
-This plugin allows forwarding incoming IPFIX messages to other collector in various modes.
+This plugin allows forwarding incoming flow records in IPFIX form to other collector in various modes.
It can be used to broadcast messages to multiple collectors (e.g. a main and a backup collector),
or to distribute messages across multiple collectors (e.g. for load balancing).
@@ -20,13 +20,18 @@ Example configuration
Subcollector 1
- 127.0.0.1
- 4751
+ 192.168.1.2
+ 4739
Subcollector 2
+ 192.168.1.3
+ 4739
+
+
+ Subcollector 3
localhost
- 4752
+ 4739
@@ -36,42 +41,48 @@ Parameters
----------
:``mode``:
- The forwarding mode; round robin (messages are sent to one host at time and hosts are cycled through) or all (messages are broadcasted to all hosts)
+ Flow distribution mode. **RoundRobin** (each record will be delivered to one of hosts) or **All** (each record will be delivered to all hosts).
[values: RoundRobin/All]
:``protocol``:
- The transport protocol to use
+ The transport protocol to use.
[values: TCP/UDP]
-:``connectionBufferSize``:
- Size of the buffer of each connection (Warning: number of connections = number of input exporters * number of hosts)
- [value: number of bytes, default: 4194304]
+:``templatesResendSecs``:
+ Send templates again every N seconds (UDP only).
+ [value: number of seconds, default: 600, 0 = never]
-:``templateRefreshIntervalSecs``:
- Send templates again every N seconds (UDP only)
- [value: number of seconds, default: 600]
+:``templatesResendPkts``:
+ Send templates again every N packets (UDP only).
+ [value: number of packets, default: 5000, 0 = never]
-:``templateRefreshIntervalBytes``:
- Send templates again every N bytes (UDP only)
- [value: number of bytes, default: 5000000]
+:``reconnectSecs``:
+ Attempt to reconnect every N seconds in case the connection drops (TCP only).
+ [value: number of seconds, default: 10, 0 = don't wait]
-:``reconnectIntervalSecs``:
- Attempt to reconnect every N seconds in case the connection drops (TCP only)
- [value: number of seconds, default: 10]
+:``premadeConnections``:
+ Keep N connections open with each host so there is no delay in connecting once a connection is needed.
+ [value: number of connections, default: 5]
:``hosts``:
- The receiving hosts
+ The receiving hosts.
:``host``:
:``name``:
- Optional identification of the host
+ Optional identification of the host.
[value: string, default:
:]
:``address``:
- The address of the host
+ The address of the host.
[value: IPv4/IPv6 address or a hostname]
:``port``:
- The port to connect to
+ The port to connect to.
[value: port number]
+Known limitations
+-----------------
+
+Export time of IPFIX messages is set to the current time when forwarding. This may cause issues with data fields using deltaTime relative to the export time!
+
+
diff --git a/src/plugins/output/forwarder/TODO.txt b/src/plugins/output/forwarder/TODO.txt
deleted file mode 100644
index 37631f98..00000000
--- a/src/plugins/output/forwarder/TODO.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-* Template withdrawals
-* More effective way of handling template changes - currently all the templates are being sent again every time any change in templates is detected
-* Message MTU
-* Possible bug: when testing, a small number of data records seems to be lost (something like 20 out of 1,000,000)
-* Connection buffer size
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/Config.cpp b/src/plugins/output/forwarder/src/Config.cpp
new file mode 100644
index 00000000..1c2b93b6
--- /dev/null
+++ b/src/plugins/output/forwarder/src/Config.cpp
@@ -0,0 +1,307 @@
+/**
+ * \file src/plugins/output/forwarder/src/Config.cpp
+ * \author Michal Sedlak
+ * \brief Plugin configuration implementation
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#include "Config.h"
+
+#include
+#include
+#include
+#include
+#include
+
+///
+/// Config schema definition
+///
+
+enum {
+ MODE,
+ PROTOCOL,
+ RECONNECT_SECS,
+ TEMPLATES_RESEND_SECS,
+ TEMPLATES_RESEND_PKTS,
+ CONNECTION_BUFFER_SIZE,
+ HOSTS,
+ HOST,
+ NAME,
+ ADDRESS,
+ PORT,
+ PREMADE_CONNECTIONS
+};
+
+static fds_xml_args host_schema[] = {
+ FDS_OPTS_ELEM(NAME , "name" , FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
+ FDS_OPTS_ELEM(ADDRESS, "address", FDS_OPTS_T_STRING, 0 ),
+ FDS_OPTS_ELEM(PORT , "port" , FDS_OPTS_T_UINT , 0 ),
+ FDS_OPTS_END
+};
+
+static fds_xml_args hosts_schema[] = {
+ FDS_OPTS_NESTED(HOST, "host", host_schema, FDS_OPTS_P_MULTI),
+ FDS_OPTS_END
+};
+
+static fds_xml_args params_schema[] = {
+ FDS_OPTS_ROOT ("params"),
+ FDS_OPTS_ELEM (MODE , "mode" , FDS_OPTS_T_STRING, 0 ),
+ FDS_OPTS_ELEM (PROTOCOL , "protocol" , FDS_OPTS_T_STRING, 0 ),
+ FDS_OPTS_ELEM (TEMPLATES_RESEND_SECS, "templatesResendSecs", FDS_OPTS_T_UINT , FDS_OPTS_P_OPT),
+ FDS_OPTS_ELEM (TEMPLATES_RESEND_PKTS, "templatesResendPkts", FDS_OPTS_T_UINT , FDS_OPTS_P_OPT),
+ FDS_OPTS_ELEM (RECONNECT_SECS , "reconnectSecs" , FDS_OPTS_T_UINT , FDS_OPTS_P_OPT),
+ FDS_OPTS_ELEM (PREMADE_CONNECTIONS , "premadeConnections" , FDS_OPTS_T_UINT , FDS_OPTS_P_OPT),
+ FDS_OPTS_NESTED(HOSTS , "hosts" , hosts_schema , 0 ),
+ FDS_OPTS_END
+};
+
+///
+/// Config definition
+///
+
+/**
+ * \brief Create a new configuration
+ * \param[in] params XML configuration of JSON plugin
+ * \throw runtime_error in case of invalid configuration
+ */
+Config::Config(const char *xml_config)
+{
+ set_defaults();
+
+ auto parser = std::unique_ptr(fds_xml_create(), &fds_xml_destroy);
+ if (!parser) {
+ throw std::runtime_error("Failed to create an XML parser!");
+ }
+
+ if (fds_xml_set_args(parser.get(), params_schema) != FDS_OK) {
+ throw std::runtime_error("Failed to parse the description of an XML document!");
+ }
+
+ fds_xml_ctx_t *params_ctx = fds_xml_parse_mem(parser.get(), xml_config, true);
+ if (!params_ctx) {
+ std::string err = fds_xml_last_err(parser.get());
+ throw std::runtime_error("Failed to parse the configuration: " + err);
+ }
+
+ try {
+ parse_params(params_ctx);
+ ensure_valid();
+ } catch (const std::invalid_argument &ex) {
+ throw std::runtime_error("Config params error: " + std::string(ex.what()));
+ }
+}
+
+void
+Config::parse_params(fds_xml_ctx_t *params_ctx)
+{
+ const fds_xml_cont *content;
+
+ while (fds_xml_next(params_ctx, &content) != FDS_EOC) {
+
+ switch (content->id) {
+
+ case MODE:
+ if (strcasecmp(content->ptr_string, "roundrobin") == 0) {
+ this->forward_mode = ForwardMode::ROUNDROBIN;
+
+ } else if (strcasecmp(content->ptr_string, "all") == 0) {
+ this->forward_mode = ForwardMode::SENDTOALL;
+
+ } else {
+ throw std::invalid_argument("mode must be one of: 'RoundRobin', 'All'");
+
+ }
+ break;
+
+ case PROTOCOL:
+ if (strcasecmp(content->ptr_string, "tcp") == 0) {
+ this->protocol = Protocol::TCP;
+
+ } else if (strcasecmp(content->ptr_string, "udp") == 0) {
+ this->protocol = Protocol::UDP;
+
+ } else {
+ throw std::invalid_argument("protocol must be one of: 'TCP', 'UDP'");
+
+ }
+ break;
+
+ case HOSTS:
+ parse_hosts(content->ptr_ctx);
+ break;
+
+ case TEMPLATES_RESEND_SECS:
+ this->tmplts_resend_secs = content->val_uint;
+ break;
+
+ case TEMPLATES_RESEND_PKTS:
+ this->tmplts_resend_pkts = content->val_uint;
+ break;
+
+ case RECONNECT_SECS:
+ this->reconnect_secs = content->val_uint;
+ break;
+
+ case PREMADE_CONNECTIONS:
+ this->nb_premade_connections = content->val_uint;
+ break;
+
+ default: assert(0);
+ }
+ }
+}
+
+void
+Config::parse_hosts(fds_xml_ctx_t *hosts_ctx)
+{
+ const fds_xml_cont *content;
+
+ while (fds_xml_next(hosts_ctx, &content) != FDS_EOC) {
+ assert(content->id == HOST);
+ parse_host(content->ptr_ctx);
+ }
+}
+
+void
+Config::parse_host(fds_xml_ctx_t *host_ctx)
+{
+ HostConfig host;
+
+ const fds_xml_cont *content;
+
+ while (fds_xml_next(host_ctx, &content) != FDS_EOC) {
+
+ switch (content->id) {
+
+ case NAME:
+ host.name = std::string(content->ptr_string);
+ break;
+
+ case ADDRESS:
+ host.address = std::string(content->ptr_string);
+ break;
+
+ case PORT:
+ if (content->val_uint > UINT16_MAX) {
+ throw std::invalid_argument("invalid host port " + std::to_string(content->val_uint));
+ }
+
+ host.port = static_cast(content->val_uint);
+ break;
+ }
+ }
+
+ if (host.name.empty()) {
+ host.name = host.address + ":" + std::to_string(host.port);
+ }
+
+ if (host_exists(host)) {
+ throw std::invalid_argument("duplicate host " + host.address + ":" + std::to_string(host.port));
+ }
+
+ hosts.push_back(host);
+}
+
+void
+Config::set_defaults()
+{
+ this->tmplts_resend_secs = 10 * 60;
+ this->tmplts_resend_pkts = 5000;
+ this->reconnect_secs = 10;
+ this->nb_premade_connections = 5;
+}
+
+void
+Config::ensure_valid()
+{
+ for (auto &host : hosts) {
+
+ if (!can_resolve_host(host)) {
+ throw std::invalid_argument("cannot resolve host address " + host.address);
+ }
+
+ }
+}
+
+bool
+Config::host_exists(HostConfig host)
+{
+ for (auto &host_ : hosts) {
+
+ if (host.address == host_.address && host.port == host_.port) {
+ return true;
+ }
+
+ }
+
+ return false;
+}
+
+bool
+Config::can_resolve_host(HostConfig host)
+{
+ addrinfo *info;
+
+ addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+
+ switch (protocol) {
+ case Protocol::TCP:
+ hints.ai_protocol = IPPROTO_TCP;
+ hints.ai_socktype = SOCK_STREAM;
+ break;
+
+ case Protocol::UDP:
+ hints.ai_protocol = IPPROTO_UDP;
+ hints.ai_socktype = SOCK_DGRAM;
+ break;
+
+ default: assert(0);
+ }
+
+ int ret = getaddrinfo(host.address.c_str(), std::to_string(host.port).c_str(), &hints, &info);
+
+ if (ret == 0) {
+ freeaddrinfo(info);
+ return true;
+
+ } else {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/Config.h b/src/plugins/output/forwarder/src/Config.h
new file mode 100644
index 00000000..9eda1651
--- /dev/null
+++ b/src/plugins/output/forwarder/src/Config.h
@@ -0,0 +1,122 @@
+/**
+ * \file src/plugins/output/forwarder/src/Config.h
+ * \author Michal Sedlak
+ * \brief Plugin configuration header
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+#pragma once
+
+#include
+
+#include
+#include
+
+#include "common.h"
+
+/// The forwarding mode
+enum class ForwardMode {
+ UNASSIGNED,
+ SENDTOALL, /// Every message is forwarded to all of the hosts
+ ROUNDROBIN /// Only one host receives each message, next host is selected every message
+};
+
+struct HostConfig {
+ /// The displayed name of the host, purely informational
+ std::string name;
+ /// The address of the host, IP address or a hostname
+ std::string address;
+ /// The port of the host
+ uint16_t port;
+};
+
+/// The config to be passed to the forwarder
+class Config
+{
+public:
+ /// The transport protocol to be used for connection to the hosts
+ Protocol protocol;
+ /// The mode of forwarding messages
+ ForwardMode forward_mode;
+ /// Connection parameters of the hosts the data will be forwarded to
+ std::vector hosts;
+ /// The number of packets sent between sending refresh of templates (whichever happens first, packets or secs)
+ unsigned int tmplts_resend_pkts;
+ /// The number of seconds elapsed between sending refresh of templates (whichever happens first, packets or secs)
+ unsigned int tmplts_resend_secs;
+ /// The number of seconds to wait before trying to reconnect when using a TCP connection
+ unsigned int reconnect_secs;
+ /// Number of premade connections to keep
+ unsigned int nb_premade_connections;
+
+ Config() {};
+
+ /**
+ * \brief Create a new configuration
+ * \param[in] params XML configuration of the plugin
+ * \throw runtime_error in case of invalid configuration
+ */
+ Config(const char *xml_config);
+
+private:
+ /// Parse the element
+ void
+ parse_params(fds_xml_ctx_t *params_ctx);
+
+ /// Parse the element
+ void
+ parse_hosts(fds_xml_ctx_t *hosts_ctx);
+
+ /// Parse the element
+ void
+ parse_host(fds_xml_ctx_t *host_ctx);
+
+ /// Set default config values
+ void
+ set_defaults();
+
+ /// Ensure the configuration is valid or throw an exception
+ void
+ ensure_valid();
+
+ /// Check if a host already exists in the vector of hosts
+ bool
+ host_exists(HostConfig host);
+
+ /// Check if the host address can be resolved
+ bool
+ can_resolve_host(HostConfig host);
+};
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/Connection.cpp b/src/plugins/output/forwarder/src/Connection.cpp
index b7d2c5b6..0bdb55f3 100644
--- a/src/plugins/output/forwarder/src/Connection.cpp
+++ b/src/plugins/output/forwarder/src/Connection.cpp
@@ -1,7 +1,7 @@
/**
* \file src/plugins/output/forwarder/src/Connection.cpp
* \author Michal Sedlak
- * \brief Buffered socket connection
+ * \brief Connection class implementation
* \date 2021
*/
@@ -41,90 +41,222 @@
#include "Connection.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
#include
-Connection::Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size)
-: manager(manager)
-, params(params)
-, buffer(buffer_size)
+#include "Message.h"
+
+Connection::Connection(const std::string &ident, ConnectionParams con_params, ipx_ctx_t *log_ctx,
+ unsigned int tmplts_resend_pkts, unsigned int tmplts_resend_secs,
+ Connector &connector) :
+ m_ident(ident),
+ m_con_params(con_params),
+ m_log_ctx(log_ctx),
+ m_tmplts_resend_pkts(tmplts_resend_pkts),
+ m_tmplts_resend_secs(tmplts_resend_secs),
+ m_connector(connector)
{
}
-bool
+void
Connection::connect()
{
- if (sockfd >= 0) {
- ::close(sockfd);
+ assert(m_sockfd.get() < 0);
+ m_future_socket = m_connector.get(m_con_params);
+}
+
+void
+Connection::forward_message(ipx_msg_ipfix_t *msg)
+{
+ assert(check_connected());
+
+ Sender &sender = get_or_create_sender(msg);
+ try {
+ sender.process_message(msg);
+
+ } catch (const ConnectionError &err) {
+ // In case connection was lost, we have to resend templates when it reconnects
+ sender.clear_templates();
+ throw err;
}
- sockfd = params.make_socket();
- return sockfd >= 0;
}
-std::unique_lock
-Connection::begin_write()
+void
+Connection::lose_message(ipx_msg_ipfix_t *msg)
{
- return std::unique_lock(buffer_mutex);
+ Sender &sender = get_or_create_sender(msg);
+ sender.lose_message(msg);
}
-bool
-Connection::write(void *data, long length)
+void
+Connection::advance_transfers()
{
- return buffer.write((uint8_t *)data, length);
+ assert(check_connected());
+
+ IPX_CTX_DEBUG(m_log_ctx, "Waiting transfers on connection %s: %zu", m_ident.c_str(), m_transfers.size());
+
+ for (auto it = m_transfers.begin(); it != m_transfers.end(); ) {
+
+ Transfer &transfer = *it;
+
+ assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
+
+ ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset],
+ transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL);
+
+ check_socket_error(ret);
+
+ size_t sent = std::max(0, ret);
+ IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str());
+
+ // Is the transfer done?
+ if (transfer.offset + sent == transfer.data.size()) {
+ it = m_transfers.erase(it);
+ // Remove the transfer and continue with the next one
+
+ } else {
+ transfer.offset += sent;
+
+ // Finish, cannot advance next transfer before the one before it is fully sent
+ break;
+ }
+ }
}
-void
-Connection::rollback_write()
+bool
+Connection::check_connected()
{
- buffer.rollback();
+ if (m_sockfd.get() >= 0) {
+ return true;
+ }
+
+ if (m_future_socket && m_future_socket->ready()) {
+ m_sockfd = m_future_socket->retrieve();
+ m_future_socket = nullptr;
+ return true;
+ }
+
+ return false;
}
-long
-Connection::writeable()
+
+static Transfer
+make_transfer(const std::vector &parts, uint16_t offset, uint16_t total_length)
{
- return buffer.writeable();
+ uint16_t length = total_length - offset;
+
+ // Find first unfinished part
+ size_t i = 0;
+
+ while (offset >= parts[i].iov_len) {
+ offset -= parts[i].iov_len;
+ i++;
+ }
+
+ // Copy the unfinished portion
+ std::vector buffer(length); //NOTE: We might want to do this more effectively...
+ uint16_t buffer_pos = 0;
+
+ for (; i < parts.size(); i++) {
+ memcpy(buffer.data() + buffer_pos, &((uint8_t *) parts[i].iov_base)[offset], parts[i].iov_len - offset);
+ buffer_pos += parts[i].iov_len - offset;
+ offset = 0;
+ }
+
+ // Create the transfer
+ Transfer transfer;
+ transfer.data = std::move(buffer);
+ transfer.offset = 0;
+
+ return transfer;
}
-void
-Connection::commit_write()
+void
+Connection::store_unfinished_transfer(Message &msg, uint16_t offset)
{
- buffer.commit();
- manager.pipe.notify();
- has_data_to_send = buffer.readable();
+ Transfer transfer = make_transfer(msg.parts(), offset, msg.length());
+
+ IPX_CTX_DEBUG(m_log_ctx, "Storing unfinished transfer of %" PRIu16 " bytes in connection to %s",
+ msg.length() - offset, m_ident.c_str());
+
+ m_transfers.push_back(std::move(transfer));
}
-bool
-Connection::send_some()
+
+void
+Connection::send_message(Message &msg)
{
- if (params.protocol == TransProto::Udp) {
- while (1) {
- fds_ipfix_msg_hdr ipfix_header;
- if (!buffer.peek(ipfix_header)) {
- return true;
- }
- auto message_length = ntohs(ipfix_header.length);
- int ret = buffer.send_data(sockfd, message_length);
- if (ret == 0 || !buffer.readable()) {
- return true;
- } else if (ret < 0) {
- return false;
- }
- }
- return true;
- } else {
- return buffer.send_data(sockfd) >= 0;
+ // All waiting transfers have to be sent first
+ if (!m_transfers.empty()) {
+ store_unfinished_transfer(msg, 0);
+ return;
+ }
+
+ std::vector &parts = msg.parts();
+
+ msghdr hdr;
+ memset(&hdr, 0, sizeof(hdr));
+ hdr.msg_iov = parts.data();
+ hdr.msg_iovlen = parts.size();
+
+ ssize_t ret = sendmsg(m_sockfd.get(), &hdr, MSG_DONTWAIT | MSG_NOSIGNAL);
+
+ check_socket_error(ret);
+
+ size_t sent = std::max(0, ret);
+
+ IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%" PRIu16 " B to %s", sent, msg.length(), m_ident.c_str());
+
+ if (sent < msg.length()) {
+ store_unfinished_transfer(msg, sent);
}
}
-void
-Connection::close()
+Sender &
+Connection::get_or_create_sender(ipx_msg_ipfix_t *msg)
{
- close_flag = true;
- manager.pipe.notify();
+ uint32_t odid = ipx_msg_ipfix_get_ctx(msg)->odid;
+
+ if (m_senders.find(odid) == m_senders.end()) {
+ m_senders.emplace(odid,
+ std::unique_ptr(new Sender(
+ [&](Message &msg) {
+ send_message(msg);
+ },
+ m_con_params.protocol == Protocol::TCP,
+ m_tmplts_resend_pkts,
+ m_tmplts_resend_secs)));
+ }
+
+ Sender &sender = *m_senders[odid].get();
+
+ return sender;
}
-Connection::~Connection()
+void
+Connection::check_socket_error(ssize_t sock_ret)
{
- if (sockfd >= 0) {
- ::close(sockfd);
+ if (sock_ret < 0 && errno != EWOULDBLOCK && errno != EAGAIN) {
+ char *errbuf;
+ ipx_strerror(errno, errbuf);
+
+ IPX_CTX_ERROR(m_log_ctx, "A connection to %s lost! (%s)", m_ident.c_str(), errbuf);
+ m_sockfd.reset();
+
+ // All state from the previous connection is lost once new one is estabilished
+ m_transfers.clear();
+
+ throw ConnectionError(errbuf);
}
-}
\ No newline at end of file
+}
diff --git a/src/plugins/output/forwarder/src/Connection.h b/src/plugins/output/forwarder/src/Connection.h
index 09af5a29..02c76f1d 100644
--- a/src/plugins/output/forwarder/src/Connection.h
+++ b/src/plugins/output/forwarder/src/Connection.h
@@ -1,7 +1,7 @@
/**
* \file src/plugins/output/forwarder/src/Connection.h
* \author Michal Sedlak
- * \brief Buffered socket connection
+ * \brief Connection class header
* \date 2021
*/
@@ -41,77 +41,144 @@
#pragma once
-#include "ConnectionManager.h"
-#include "ConnectionParams.h"
-#include "ConnectionBuffer.h"
-
-#include
-#include
-#include
-#include
-
+#include
+#include
+#include
#include
-#include
-#include
-class ConnectionManager;
+#include
+
+#include "common.h"
+#include "connector/Connector.h"
+#include "Sender.h"
-class Connection
-{
-friend class ConnectionManager;
+class Connection;
+/// An error to be thrown on connection errors
+class ConnectionError {
public:
- /// Flag indicating that the connection was lost and the forwarder needs to resend templates etc.
- /// The flag won't be reset when the connection is reestablished!
- std::atomic connection_lost_flag { false };
+ ConnectionError(std::string message)
+ : m_message(message) {}
+
+ ConnectionError(std::string message, std::shared_ptr &connection)
+ : m_message(message), m_connection(&connection) {}
+
+ ConnectionError with_connection(std::shared_ptr &connection) const
+ { return ConnectionError(m_message, connection); }
+
+ const char *what() const { return m_message.c_str(); }
- Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size);
-
- bool
+ std::shared_ptr *connection() const { return m_connection; }
+
+private:
+ std::string m_message;
+ std::shared_ptr *m_connection;
+};
+
+/// A transfer to be sent through the connection
+struct Transfer {
+ /// The data to send
+ std::vector data;
+ /// The offset to send from, i.e. the amount of data that was already sent
+ uint16_t offset;
+};
+
+/// A class representing one of the connections to the subcollector
+/// Each host opens one connection per session
+class Connection {
+public:
+ /**
+ * \brief The constructor
+ * \param ident The host identification
+ * \param con_params The connection parameters
+ * \param log_ctx The logging context
+ * \param tmplts_resend_pkts Interval in packets after which templates are resend (UDP only)
+ * \param tmplts_resend_secs Interval in seconds after which templates are resend (UDP only)
+ */
+ Connection(const std::string &ident, ConnectionParams con_params, ipx_ctx_t *log_ctx,
+ unsigned int tmplts_resend_pkts, unsigned int tmplts_resend_secs,
+ Connector &connector);
+
+ /// Do not permit copying or moving as the connection holds a raw socket that is closed in the destructor
+ /// (we could instead implement proper moving and copying behavior, but we don't really need it at the moment)
+ Connection(const Connection &) = delete;
+ Connection(Connection &&) = delete;
+
+ /**
+ * \brief Connect the connection socket
+ * \throw ConnectionError if the socket couldn't be connected
+ */
+ void
connect();
- std::unique_lock
- begin_write();
+ /**
+ * \brief Forward an IPFIX message
+ * \param msg The IPFIX message
+ */
+ void
+ forward_message(ipx_msg_ipfix_t *msg);
+
+ /**
+ * \brief Lose an IPFIX message, i.e. update the internal state as if it has been forwarded
+ * even though it is not being sent
+ * \param msg The IPFIX message
+ */
+ void
+ lose_message(ipx_msg_ipfix_t *msg);
+
+ /**
+ * \brief Advance the unfinished transfers
+ */
+ void
+ advance_transfers();
+
+ /**
+ * \brief Check if the connection socket is currently connected
+ * \return true or false
+ */
+ bool check_connected();
+
+ /**
+ * \brief Get number of transfers still waiting to be transmitted
+ * \return The number of waiting transfers
+ */
+ size_t waiting_transfers_cnt() const { return m_transfers.size(); }
+
+ /**
+ * \brief The identification of the connection
+ */
+ const std::string &ident() const { return m_ident; }
- bool
- write(void *data, long length);
+private:
+ const std::string &m_ident;
- bool
- send_some();
+ ConnectionParams m_con_params;
- void
- commit_write();
+ ipx_ctx_t *m_log_ctx;
- void
- rollback_write();
+ unsigned int m_tmplts_resend_pkts;
- long
- writeable();
+ unsigned int m_tmplts_resend_secs;
- void
- close();
+ UniqueFd m_sockfd;
- ~Connection();
+ std::shared_ptr m_future_socket;
-private:
- /// The manager managing this connection
- ConnectionManager &manager;
+ std::unordered_map> m_senders;
+
+ std::vector m_transfers;
- /// The parameters to estabilish the connection
- ConnectionParams params;
+ Connector &m_connector;
- /// The connection socket
- int sockfd = -1;
+ void
+ store_unfinished_transfer(Message &msg, uint16_t offset);
- /// Buffer for the data to send and a mutex guarding it
- /// (buffer will be accessed from sender thread and writer thread)
- std::mutex buffer_mutex;
- ConnectionBuffer buffer;
+ void
+ send_message(Message &msg);
- /// Flag indicating whether the buffer has any data to send so we don't have to lock the mutex every time
- /// (doesn't need to be atomic because we only set it while holding the mutex)
- bool has_data_to_send = false;
+ Sender &
+ get_or_create_sender(ipx_msg_ipfix_t *msg);
- /// Flag indicating that the connection has been closed and can be disposed of after the data is sent
- std::atomic close_flag { false };
+ void
+ check_socket_error(ssize_t sock_ret);
};
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/ConnectionBuffer.h b/src/plugins/output/forwarder/src/ConnectionBuffer.h
deleted file mode 100644
index 9db928a0..00000000
--- a/src/plugins/output/forwarder/src/ConnectionBuffer.h
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * \file src/plugins/output/forwarder/src/ConnectionBuffer.h
- * \author Michal Sedlak
- * \brief Ring buffer used by connections
- * \date 2021
- */
-
-/* Copyright (C) 2021 CESNET, z.s.p.o.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in
- * the documentation and/or other materials provided with the
- * distribution.
- * 3. Neither the name of the Company nor the names of its contributors
- * may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * ALTERNATIVELY, provided that this notice is retained in full, this
- * product may be distributed under the terms of the GNU General Public
- * License (GPL) version 2 or later, in which case the provisions
- * of the GPL apply INSTEAD OF those given above.
- *
- * This software is provided ``as is'', and any express or implied
- * warranties, including, but not limited to, the implied warranties of
- * merchantability and fitness for a particular purpose are disclaimed.
- * In no event shall the company or contributors be liable for any
- * direct, indirect, incidental, special, exemplary, or consequential
- * damages (including, but not limited to, procurement of substitute
- * goods or services; loss of use, data, or profits; or business
- * interruption) however caused and on any theory of liability, whether
- * in contract, strict liability, or tort (including negligence or
- * otherwise) arising in any way out of the use of this software, even
- * if advised of the possibility of such damage.
- *
- */
-
-#pragma once
-
-#include
-
-#include
-#include
-#include
-#include
-
-class ConnectionBuffer
-{
-public:
- ConnectionBuffer(long capacity)
- : capacity(capacity)
- , buffer(capacity)
- {}
-
- void
- rollback()
- {
- write_offset = read_end_offset;
- }
-
- void
- commit()
- {
- read_end_offset = write_offset;
- }
-
- long
- writeable()
- {
- return writeable_from(write_offset);
- }
-
- bool
- write(uint8_t *data, long length)
- {
- long pos = raw_write_at(write_offset, data, length);
- if (pos == -1) {
- return false;
- }
- write_offset = pos;
- return true;
- }
-
- template
- bool
- write(T data)
- {
- return write((uint8_t *)&data, sizeof(T));
- }
-
- long
- readable()
- {
- return read_offset > read_end_offset
- ? capacity - read_offset + read_end_offset
- : read_end_offset - read_offset;
- }
-
- bool
- peek(uint8_t *data, long length)
- {
- if (readable() < length) {
- return false;
- }
- raw_read_at(read_offset, data, length);
- return true;
- }
-
- template
- bool
- peek(T &item)
- {
- return peek((uint8_t *)&item, sizeof(item));
- }
-
- int
- send_data(int sockfd, long length = -1)
- {
- if (length == -1) {
- length = readable();
- }
- iovec iov[2] = {};
- iov[0].iov_len = std::min(cont_readable_from(read_offset), length);
- iov[0].iov_base = &buffer[read_offset];
- iov[1].iov_len = length - iov[0].iov_len;
- iov[1].iov_base = &buffer[0];
- msghdr msg_hdr = {};
- msg_hdr.msg_iov = iov;
- msg_hdr.msg_iovlen = 2;
- int ret = sendmsg(sockfd, &msg_hdr, MSG_DONTWAIT | MSG_NOSIGNAL);
- if (ret < 0) {
- return (errno == EWOULDBLOCK || errno == EAGAIN) ? 0 : ret;
- }
- read_offset = advance(read_offset, ret);
- return ret;
- }
-
-private:
- long capacity;
- long read_offset = 0;
- long read_end_offset = 0;
- long write_offset = 0;
- std::vector buffer;
-
- long
- advance(long pos, long n)
- {
- return (pos + n) % capacity;
- }
-
- long
- readable_from(long pos)
- {
- return pos > read_end_offset
- ? capacity - pos + read_end_offset
- : read_end_offset - pos;
- }
-
- long
- cont_readable_from(long pos)
- {
- return pos > read_end_offset
- ? capacity - pos
- : read_end_offset - pos;
- }
-
- long
- raw_read_at(long pos, uint8_t *data, long length)
- {
- if (readable_from(pos) < length) {
- return -1;
- }
- long read1 = std::min(cont_readable_from(pos), length);
- long read2 = length - read1;
- memcpy(&data[0], &buffer[pos], read1);
- memcpy(&data[read1], &buffer[advance(pos, read1)], read2);
- return advance(pos, length);
- }
-
- long
- cont_writeable_from(long pos)
- {
- return read_offset > pos
- ? read_offset - pos - 1
- : (read_offset == 0 ? capacity - pos - 1 : capacity - pos);
- }
-
- long
- writeable_from(long pos)
- {
- return read_offset > pos
- ? read_offset - pos - 1
- : capacity - pos + read_offset - 1;
- }
-
- long
- raw_write_at(long pos, uint8_t *data, long length)
- {
- /// WARNING: Does not advance the write offset
- if (writeable_from(pos) < length) {
- return -1;
- }
- long write1 = std::min(length, cont_writeable_from(pos));
- long write2 = length - write1;
- memcpy(&buffer[pos], &data[0], write1);
- memcpy(&buffer[advance(pos, write1)], &data[write1], write2);
- return advance(pos, length);
- }
-
- template
- bool
- raw_write_at(long pos, T data)
- {
- return raw_write_at(pos, (uint8_t *)&data, sizeof(T));
- }
-
-};
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/ConnectionManager.cpp b/src/plugins/output/forwarder/src/ConnectionManager.cpp
deleted file mode 100644
index f55dafaf..00000000
--- a/src/plugins/output/forwarder/src/ConnectionManager.cpp
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * \file src/plugins/output/forwarder/src/ConnectionManager.cpp
- * \author Michal Sedlak
- * \brief Connection manager
- * \date 2021
- */
-
-/* Copyright (C) 2021 CESNET, z.s.p.o.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in
- * the documentation and/or other materials provided with the
- * distribution.
- * 3. Neither the name of the Company nor the names of its contributors
- * may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * ALTERNATIVELY, provided that this notice is retained in full, this
- * product may be distributed under the terms of the GNU General Public
- * License (GPL) version 2 or later, in which case the provisions
- * of the GPL apply INSTEAD OF those given above.
- *
- * This software is provided ``as is'', and any express or implied
- * warranties, including, but not limited to, the implied warranties of
- * merchantability and fitness for a particular purpose are disclaimed.
- * In no event shall the company or contributors be liable for any
- * direct, indirect, incidental, special, exemplary, or consequential
- * damages (including, but not limited to, procurement of substitute
- * goods or services; loss of use, data, or profits; or business
- * interruption) however caused and on any theory of liability, whether
- * in contract, strict liability, or tort (including negligence or
- * otherwise) arising in any way out of the use of this software, even
- * if advised of the possibility of such damage.
- *
- */
-
-#include "ConnectionManager.h"
-
-Connection &
-ConnectionManager::add_client(ConnectionParams params)
-{
- auto connection_ptr = std::unique_ptr(new Connection(*this, params, connection_buffer_size));
- auto &connection = *connection_ptr;
- std::lock_guard guard(mutex);
- if (connection.connect()) {
- active_connections.push_back(std::move(connection_ptr));
- } else {
- reconnect_connections.push_back(std::move(connection_ptr));
- }
- return connection;
-}
-
-void
-ConnectionManager::send_loop()
-{
- int max_fd;
-
- fd_set pipe_fds;
- FD_ZERO(&pipe_fds);
- FD_SET(pipe.get_readfd(), &pipe_fds);
-
- fd_set socket_fds;
- FD_ZERO(&socket_fds);
-
- auto watch_sock = [&](int fd) {
- FD_SET(fd, &pipe_fds);
- max_fd = std::max(max_fd, fd);
- };
-
- while (!exit_flag) {
- max_fd = pipe.get_readfd();
- FD_ZERO(&socket_fds);
-
- {
- std::lock_guard guard(mutex);
- pipe.clear();
- auto it = active_connections.begin();
- while (it != active_connections.end()) {
- auto &connection = **it;
- if (connection.has_data_to_send) {
- std::lock_guard guard(connection.buffer_mutex);
- if (connection.send_some()) {
- if (connection.buffer.readable()) {
- watch_sock(connection.sockfd);
- }
- connection.has_data_to_send = connection.buffer.readable();
- } else {
- connection.connection_lost_flag = true;
- reconnect_connections.push_back(std::move(*it));
- it = active_connections.erase(it);
- reconnect_cv.notify_one();
- continue;
- }
- } else {
- if (connection.close_flag) {
- it = active_connections.erase(it);
- continue;
- }
- }
- it++;
- }
- }
-
- select(max_fd + 1, &pipe_fds, &socket_fds, NULL, NULL);
- }
-}
-
-void
-ConnectionManager::reconnect_loop()
-{
- while (!exit_flag) {
- auto lock = std::unique_lock(mutex);
- auto it = reconnect_connections.begin();
- while (it != reconnect_connections.end()) {
- auto &connection = **it;
- if (connection.connect()) {
- active_connections.push_back(std::move(*it));
- it = reconnect_connections.erase(it);
- pipe.notify();
- } else {
- if (connection.close_flag) {
- it = reconnect_connections.erase(it);
- continue;
- }
- it++;
- }
- }
-
- if (reconnect_connections.empty()) {
- reconnect_cv.wait(lock);
- } else {
- reconnect_cv.wait_for(lock, std::chrono::seconds(reconnect_interval_secs));
- }
- }
-}
-
-void
-ConnectionManager::start()
-{
- send_thread = std::thread([this]() { send_loop(); });
- reconnect_thread = std::thread([this]() { reconnect_loop(); });
-}
-
-void
-ConnectionManager::stop()
-{
- exit_flag = true;
- pipe.notify();
- reconnect_cv.notify_one();
- send_thread.join();
- reconnect_thread.join();
-}
-
-void
-ConnectionManager::set_reconnect_interval(int number_of_seconds)
-{
- reconnect_interval_secs = number_of_seconds;
-}
-
-void
-ConnectionManager::set_connection_buffer_size(long number_of_bytes)
-{
- connection_buffer_size = number_of_bytes;
-}
-
diff --git a/src/plugins/output/forwarder/src/ConnectionParams.h b/src/plugins/output/forwarder/src/ConnectionParams.h
deleted file mode 100644
index 5f9fbfa7..00000000
--- a/src/plugins/output/forwarder/src/ConnectionParams.h
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * \file src/plugins/output/forwarder/src/ConnectionParams.h
- * \author Michal Sedlak
- * \brief Parameters for estabilishing connection
- * \date 2021
- */
-
-/* Copyright (C) 2021 CESNET, z.s.p.o.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in
- * the documentation and/or other materials provided with the
- * distribution.
- * 3. Neither the name of the Company nor the names of its contributors
- * may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * ALTERNATIVELY, provided that this notice is retained in full, this
- * product may be distributed under the terms of the GNU General Public
- * License (GPL) version 2 or later, in which case the provisions
- * of the GPL apply INSTEAD OF those given above.
- *
- * This software is provided ``as is'', and any express or implied
- * warranties, including, but not limited to, the implied warranties of
- * merchantability and fitness for a particular purpose are disclaimed.
- * In no event shall the company or contributors be liable for any
- * direct, indirect, incidental, special, exemplary, or consequential
- * damages (including, but not limited to, procurement of substitute
- * goods or services; loss of use, data, or profits; or business
- * interruption) however caused and on any theory of liability, whether
- * in contract, strict liability, or tort (including negligence or
- * otherwise) arising in any way out of the use of this software, even
- * if advised of the possibility of such damage.
- *
- */
-
-#pragma once
-
-#include
-#include
-
-#include
-#include
-#include
-#include
-#include
-
-using unique_addrinfo = std::unique_ptr;
-
-enum class TransProto { Tcp, Udp };
-
-struct ConnectionParams
-{
- ConnectionParams(std::string address, std::string port, TransProto protocol)
- : address(address)
- , port(port)
- , protocol(protocol)
- {
- }
-
- unique_addrinfo
- resolve_address()
- {
- addrinfo *info;
- addrinfo hints = {};
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = (protocol == TransProto::Tcp ? SOCK_STREAM : SOCK_DGRAM);
- hints.ai_protocol = (protocol == TransProto::Tcp ? IPPROTO_TCP : IPPROTO_UDP);
- if (getaddrinfo(address.c_str(), port.c_str(), &hints, &info) == 0) {
- return unique_addrinfo(info, &freeaddrinfo);
- } else {
- return unique_addrinfo(NULL, &freeaddrinfo);
- }
- }
-
- int
- make_socket()
- {
- auto address_info = resolve_address();
- if (!address_info) {
- return -1;
- }
-
- int sockfd;
- addrinfo *p;
-
- for (p = address_info.get(); p != NULL; p = p->ai_next) {
- sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
- if (sockfd < 0) {
- continue;
- }
-
- if (protocol == TransProto::Udp) {
- sockaddr_in sa = {};
- sa.sin_family = AF_INET;
- sa.sin_port = 0;
- sa.sin_addr.s_addr = INADDR_ANY;
- sa.sin_port = 0;
- if (bind(sockfd, (sockaddr *)&sa, sizeof(sa)) != 0) {
- close(sockfd);
- continue;
- }
- }
-
- if (connect(sockfd, p->ai_addr, p->ai_addrlen) != 0) {
- close(sockfd);
- continue;
- }
-
- break;
- }
-
- if (!p) {
- return -1;
- }
-
- return sockfd;
- }
-
- std::string
- str()
- {
- return std::string(protocol == TransProto::Tcp ? "TCP" : "UDP") + ":"
- + address + ":"
- + port;
- }
-
- std::string address;
- std::string port;
- TransProto protocol;
-};
\ No newline at end of file
diff --git a/src/plugins/output/forwarder/src/Forwarder.cpp b/src/plugins/output/forwarder/src/Forwarder.cpp
new file mode 100644
index 00000000..da0e89f1
--- /dev/null
+++ b/src/plugins/output/forwarder/src/Forwarder.cpp
@@ -0,0 +1,133 @@
+/**
+ * \file src/plugins/output/forwarder/src/Forwarder.cpp
+ * \author Michal Sedlak
+ * \brief Forwarder class implementation
+ * \date 2021
+ */
+
+/* Copyright (C) 2021 CESNET, z.s.p.o.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of the Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * ALTERNATIVELY, provided that this notice is retained in full, this
+ * product may be distributed under the terms of the GNU General Public
+ * License (GPL) version 2 or later, in which case the provisions
+ * of the GPL apply INSTEAD OF those given above.
+ *
+ * This software is provided ``as is'', and any express or implied
+ * warranties, including, but not limited to, the implied warranties of
+ * merchantability and fitness for a particular purpose are disclaimed.
+ * In no event shall the company or contributors be liable for any
+ * direct, indirect, incidental, special, exemplary, or consequential
+ * damages (including, but not limited to, procurement of substitute
+ * goods or services; loss of use, data, or profits; or business
+ * interruption) however caused and on any theory of liability, whether
+ * in contract, strict liability, or tort (including negligence or
+ * otherwise) arising in any way out of the use of this software, even
+ * if advised of the possibility of such damage.
+ *
+ */
+
+#include "Forwarder.h"
+
+Forwarder::Forwarder(Config config, ipx_ctx_t *log_ctx) :
+ m_config(config),
+ m_log_ctx(log_ctx)
+{
+ // Set up connector
+ std::vector con_params;
+ for (const auto &host_config : m_config.hosts) {
+ con_params.push_back(ConnectionParams{host_config.address, host_config.port, m_config.protocol});
+ }
+
+ m_connector.reset(new Connector(con_params, m_config.nb_premade_connections,
+ m_config.reconnect_secs, m_log_ctx));
+
+ // Set up hosts
+ for (const auto &host_config : m_config.hosts) {
+ m_hosts.emplace_back(
+ new Host(host_config.name,
+ ConnectionParams{host_config.address, host_config.port, m_config.protocol},
+ m_log_ctx,
+ m_config.tmplts_resend_pkts,
+ m_config.tmplts_resend_secs,
+ m_config.forward_mode == ForwardMode::SENDTOALL,
+ *m_connector.get()));
+
+ }
+}
+
+void Forwarder::handle_session_message(ipx_msg_session_t *msg)
+{
+ const ipx_session *session = ipx_msg_session_get_session(msg);
+
+ switch (ipx_msg_session_get_event(msg)) {
+ case IPX_MSG_SESSION_OPEN:
+ IPX_CTX_DEBUG(m_log_ctx, "New session %s", session->ident);
+ for (auto &host : m_hosts) {
+ host->setup_connection(session);
+ }
+ break;
+
+ case IPX_MSG_SESSION_CLOSE:
+ IPX_CTX_DEBUG(m_log_ctx, "Closing session %s", session->ident);
+ for (auto &host : m_hosts) {
+ host->finish_connection(session);
+ }
+ break;
+ }
+}
+
+void Forwarder::handle_ipfix_message(ipx_msg_ipfix_t *msg)
+{
+ // Forward message
+ switch (m_config.forward_mode) {
+ case ForwardMode::SENDTOALL:
+ forward_to_all(msg);
+ break;
+
+ case ForwardMode::ROUNDROBIN:
+ forward_round_robin(msg);
+ break;
+
+ default: assert(0);
+ }
+}
+
+void
+Forwarder::forward_to_all(ipx_msg_ipfix_t *msg)
+{
+ for (auto &host : m_hosts) {
+ host->forward_message(msg);
+ }
+}
+
+void
+Forwarder::forward_round_robin(ipx_msg_ipfix_t *msg)
+{
+ bool ok = false;
+
+ for (size_t i = 0; i < m_hosts.size(); i++) {
+ auto &host = m_hosts[m_rr_index];
+ ok = host->forward_message(msg);
+ m_rr_index = (m_rr_index + 1) % m_hosts.size();
+ if (ok) {
+ break;
+ }
+ }
+
+ if (!ok) {
+ IPX_CTX_WARNING(m_log_ctx, "Couldn't forward to any of the hosts, dropping message!", 0);
+ }
+}
diff --git a/src/plugins/output/forwarder/src/Forwarder.h b/src/plugins/output/forwarder/src/Forwarder.h
index d75e6182..2fc11ddd 100644
--- a/src/plugins/output/forwarder/src/Forwarder.h
+++ b/src/plugins/output/forwarder/src/Forwarder.h
@@ -1,7 +1,7 @@
/**
* \file src/plugins/output/forwarder/src/Forwarder.h
* \author Michal Sedlak
- * \brief Forwarder logic
+ * \brief Forwarder class header
* \date 2021
*/
@@ -41,402 +41,64 @@
#pragma once
-#include "ConnectionManager.h"
-#include "IPFIXMessage.h"
-#include "MessageBuilder.h"
+#include "Config.h"
-#include
-#include
-
-#include