Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: nff-go in Azure with failsafe #690

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
)

var openFlowsNumber = uint32(0)
var createdPorts []port
var createdPorts map[uint16](*port)
var portPair map[types.IPv4Address](*port)
var schedState *scheduler
var vEach [10][vBurstSize]uint8
Expand Down Expand Up @@ -725,14 +725,19 @@ func SystemInit(args *Config) error {
return err
}
// Init Ports
createdPorts = make([]port, low.GetPortsNumber(), low.GetPortsNumber())
for i := range createdPorts {
createdPorts[i].port = uint16(i)
createdPorts = make(map[uint16](*port))
i := low.GetNextPort(0)
for i < low.RteMaxEthPorts {
createdPorts[i] = &port{}
createdPorts[i].port = i
common.LogDebug(common.Initialization, "Found Port ID:", i)

if maxInIndex > low.CheckPortRSS(createdPorts[i].port) {
createdPorts[i].InIndex = low.CheckPortRSS(createdPorts[i].port)
} else {
createdPorts[i].InIndex = maxInIndex
}
i = low.GetNextPort(i + 1)
}
portPair = make(map[types.IPv4Address](*port))
ioDevices = make(map[string]interface{})
Expand Down Expand Up @@ -886,17 +891,18 @@ func SetReceiverFile(filename string, repcount int32) (OUT *Flow) {
// Receive queue will be added to port automatically.
// Returns new opened flow with received packets
func SetReceiver(portId uint16) (OUT *Flow, err error) {
if portId >= uint16(len(createdPorts)) {
return nil, common.WrapWithNFError(nil, "Requested receive port exceeds number of ports which can be used by DPDK (bind to DPDK).", common.ReqTooManyPorts)
port, ok := createdPorts[portId]
if !ok {
return nil, common.WrapWithNFError(nil, "Requested receive port not found.", common.ReqTooManyPorts)
}
if createdPorts[portId].willReceive {
if port.willReceive {
return nil, common.WrapWithNFError(nil, "Requested receive port was already set to receive. Two receives from one port are prohibited.", common.MultipleReceivePort)
}
createdPorts[portId].wasRequested = true
createdPorts[portId].willReceive = true
rings := low.CreateRings(burstSize*sizeMultiplier, createdPorts[portId].InIndex)
addReceiver(portId, rings, createdPorts[portId].InIndex)
return newFlow(rings, createdPorts[portId].InIndex), nil
port.wasRequested = true
port.willReceive = true
rings := low.CreateRings(burstSize*sizeMultiplier, port.InIndex)
addReceiver(portId, rings, port.InIndex)
return newFlow(rings, port.InIndex), nil
}

// SetReceiverOS adds function receive from Linux interface to flow graph.
Expand Down Expand Up @@ -1045,11 +1051,14 @@ func SetSender(IN *Flow, portId uint16) error {
if err := checkFlow(IN); err != nil {
return err
}
if portId >= uint16(len(createdPorts)) {
return common.WrapWithNFError(nil, "Requested send port exceeds number of ports which can be used by DPDK (bind to DPDK).", common.ReqTooManyPorts)

port, ok := createdPorts[portId]
if !ok {
return common.WrapWithNFError(nil, "Requested send port not found.", common.ReqTooManyPorts)
}
createdPorts[portId].wasRequested = true
if createdPorts[portId].sendRings == nil {

port.wasRequested = true
if port.sendRings == nil {
// To allow consequent sends to one port, we need to create a send ring
// for the first, and then all the consequent sends should be merged
// with already created send ring.
Expand All @@ -1062,13 +1071,13 @@ func SetSender(IN *Flow, portId uint16) error {
max = createdPorts[i].InIndex
}
}
createdPorts[portId].sendRings = low.CreateRings(burstSize*sizeMultiplier, max)
addSender(portId, createdPorts[portId].sendRings, IN.inIndexNumber)
port.sendRings = low.CreateRings(burstSize*sizeMultiplier, max)
addSender(portId, port.sendRings, IN.inIndexNumber)
}
// For a typical 40 GB card, like Intel 710 series, one core should be able
// to handle all the TX without problems. So we merged all income flows to created
// ring which will be send.
mergeOneFlow(IN, createdPorts[portId].sendRings)
mergeOneFlow(IN, port.sendRings)
return nil
}

Expand Down Expand Up @@ -1312,7 +1321,7 @@ func GetNameByPort(port uint16) (string, error) {
func SetIPForPort(port uint16, ip types.IPv4Address) error {
for i := range createdPorts {
if createdPorts[i].port == port && createdPorts[i].wasRequested {
portPair[ip] = &createdPorts[i]
portPair[ip] = createdPorts[i]
return nil
}
}
Expand Down Expand Up @@ -1554,7 +1563,7 @@ func recvXDP(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
func processKNI(parameters interface{}, inIndex []int32, flag *int32, coreID int) {
srk := parameters.(*KNIParameters)
if srk.linuxCore == true {
coreID = schedState.cores[createdPorts[srk.port.PortId].KNICoreIndex].id
coreID = schedState.cores[createdPorts[uint16(srk.port.PortId)].KNICoreIndex].id
}
low.SrKNI(uint16(srk.port.PortId), flag, coreID, srk.recv, srk.out, srk.send, srk.in, &srk.stats)
}
Expand Down
8 changes: 8 additions & 0 deletions internal/low/low.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,14 @@ func FreeKNI(port uint16) error {
return nil
}

const (
RteMaxEthPorts = C.RTE_MAX_ETHPORTS
)

func GetNextPort(port uint16) uint16 {
return uint16(C.rte_eth_find_next_owned_by(C.uint16_t(port), C.RTE_ETH_DEV_NO_OWNER))
}

// GetPortsNumber gets total number of available Ethernet devices.
func GetPortsNumber() int {
return int(C.rte_eth_dev_count())
Expand Down
9 changes: 0 additions & 9 deletions internal/low/low.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,6 @@ int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools,
rx_rings = 0;
}

if (port >= rte_eth_dev_count())
return -1;

struct rte_eth_conf port_conf_default = {
.rxmode = { .max_rx_pkt_len = RTE_ETHER_MAX_LEN,
.mq_mode = ETH_MQ_RX_RSS },
Expand Down Expand Up @@ -940,9 +937,6 @@ bool check_hwtxchecksum_capability(uint16_t port_id) {
DEV_TX_OFFLOAD_TCP_CKSUM;
struct rte_eth_dev_info dev_info;

if (port_id >= rte_eth_dev_count())
return false;

memset(&dev_info, 0, sizeof(dev_info));
rte_eth_dev_info_get(port_id, &dev_info);
return (dev_info.tx_offload_capa & flags) == flags;
Expand All @@ -952,9 +946,6 @@ bool check_hwrxpackets_timestamp_capability(uint16_t port_id) {
uint64_t flags = DEV_RX_OFFLOAD_TIMESTAMP;
struct rte_eth_dev_info dev_info;

if (port_id >= rte_eth_dev_count())
return false;

memset(&dev_info, 0, sizeof(dev_info));
rte_eth_dev_info_get(port_id, &dev_info);
return (dev_info.rx_offload_capa & flags) == flags;
Expand Down
2 changes: 1 addition & 1 deletion internal/low/low_mlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
package low

/*
#cgo LDFLAGS: -lrte_distributor -lrte_reorder -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_jobstats -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_vhost -lrte_ip_frag -lrte_cfgfile -Wl,--whole-archive -Wl,--start-group -lrte_kvargs -lrte_mbuf -lrte_hash -lrte_ethdev -lrte_mempool -lrte_ring -lrte_mempool_ring -lrte_eal -lrte_cmdline -lrte_net -lrte_bus_pci -lrte_pci -lrte_bus_vdev -lrte_timer -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio -lrte_pmd_cxgbe -lrte_pmd_enic -lrte_pmd_i40e -lrte_pmd_fm10k -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ena -lrte_pmd_ring -lrte_pmd_af_packet -lrte_pmd_null -libverbs -lmnl -lmlx4 -lmlx5 -lrte_pmd_mlx4 -lrte_pmd_mlx5 -Wl,--end-group -Wl,--no-whole-archive -lrt -lm -ldl -lnuma
#cgo LDFLAGS: -lrte_distributor -lrte_reorder -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_jobstats -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_vhost -lrte_ip_frag -lrte_cfgfile -Wl,--whole-archive -Wl,--start-group -lrte_kvargs -lrte_mbuf -lrte_hash -lrte_ethdev -lrte_gso -lrte_mempool -lrte_ring -lrte_mempool_ring -lrte_eal -lrte_cmdline -lrte_net -lrte_bus_pci -lrte_pci -lrte_bus_vdev -lrte_timer -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio -lrte_pmd_cxgbe -lrte_pmd_enic -lrte_pmd_i40e -lrte_pmd_fm10k -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ena -lrte_pmd_ring -lrte_pmd_af_packet -lrte_pmd_null -lrte_pmd_failsafe -lrte_pmd_tap -lrte_pmd_vdev_netvsc -lrte_bus_vmbus -lrte_pmd_netvsc -libverbs -lmnl -lmlx4 -lmlx5 -lrte_pmd_mlx4 -lrte_pmd_mlx5 -Wl,--end-group -Wl,--no-whole-archive -lrt -lm -ldl -lnuma
*/
import "C"