Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving ip_loadbalancer module #211

Open
wants to merge 1 commit into
base: eel
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 188 additions & 17 deletions pox/misc/ip_loadbalancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,51 @@
# limitations under the License.

"""
A very sloppy IP load balancer.
An IP load balancer that allows user to select the load balancer algorithm and
the weights for each server.

Run it with --ip=<Service IP> --servers=IP1,IP2,...

By default, it will do load balancing on the first switch that connects. If
you want, you can add --dpid=<dpid> to specify a particular switch.

Also by default, the load balancer algorithm will be random, if you want to
change it, add --algorithm=<algorithm> (you can select random, round-robin or
least-bandwidth). You can select the weights for each server using
--weight=WEIGHT_IP1,WEIGHT_IP2,... in the same order as the servers.

If you run with py module, you will be able to change the algorithm any time,
using:
POX> change_algorithm(<algorithm>)
You can also change the weights for each server:
POX> change_weights({"IP1": WEIGHT1, ... })

Please submit improvements. :)
"""

from pox.core import core
import pox
import thread
log = core.getLogger("iplb")

from pox.lib.packet.ethernet import ethernet, ETHER_BROADCAST
from pox.lib.packet.ipv4 import ipv4
from pox.lib.packet.arp import arp
from pox.lib.addresses import IPAddr, EthAddr
from pox.lib.revent import EventRemove
from pox.lib.util import str_to_bool, dpid_to_str, str_to_dpid

# include as part of the betta branch
from pox.openflow.of_json import *

import pox.openflow.libopenflow_01 as of

import time
import random

FLOW_IDLE_TIMEOUT = 10
FLOW_IDLE_TIMEOUT = 5
FLOW_MEMORY_TIMEOUT = 60 * 5


UPDATE_DATA_TRANSFERRED = 14

class MemoryEntry (object):
"""
Expand Down Expand Up @@ -86,22 +102,24 @@ def key2 (self):

return self.server,ipp.srcip,tcpp.dstport,tcpp.srcport


class iplb (object):
"""
A simple IP load balancer
An IP load balancer

Give it a service_ip and a list of server IP addresses. New TCP flows
to service_ip will be randomly redirected to one of the servers.
to service_ip will be redirected to one of the servers using the selected
algorithm and using the weights map.

We probe the servers to see if they're alive by sending them ARPs.
"""
def __init__ (self, connection, service_ip, servers = []):
def __init__ (self, connection, algorithm, service_ip, weights, servers = []):
self.service_ip = IPAddr(service_ip)
self.servers = [IPAddr(a) for a in servers]
self.con = connection
self.mac = self.con.eth_addr
self.live_servers = {} # IP -> MAC,port
self.algorithm = algorithm
self.weights = weights

try:
self.log = log.getChild(dpid_to_str(self.con.dpid))
Expand All @@ -114,6 +132,19 @@ def __init__ (self, connection, service_ip, servers = []):
# How quickly do we probe?
self.probe_cycle_time = 5

# Last update in the map of data transferred.
self.last_update = time.time()

# Data transferred map (IP -> data transferred in the last
# UPDATE_DATA_TRANSFERRED seconds).
self.data_transferred = {}
for server in self.servers:
self.data_transferred[server] = 0

# Variables used in round-robin algorithm.
self.round_robin_index = 0
self.round_robin_pck_sent = 0

# How long do we wait for an ARP reply before we consider a server dead?
self.arp_timeout = 3

Expand All @@ -127,6 +158,33 @@ def __init__ (self, connection, service_ip, servers = []):
# As part of a gross hack, we now do this from elsewhere
#self.con.addListeners(self)

# Allow user to change algorithm and weights at any time.
core.Interactive.variables['change_algorithm'] = self._change_algorithm
core.Interactive.variables['change_weights'] = self._change_weights

def _change_algorithm(self, algorithm):
"""
Change the algorithm for load balancing.
"""
if algorithm not in ALGORITHM_LIST:
log.error("Algorithm %s is not allowed, allowed algorithms: %s",
algorithm, ALGORITHM_LIST.keys())
else:
self.algorithm = algorithm
log.info("Setting algorithm to %s.", self.algorithm)

def _change_weights(self, weights):
"""
Change the weights for each server in the balancing.
"""
if type(weights) is not dict:
log.error("Weigths should be a dictionary { IP: WEIGHT }.")
elif sorted(weights.keys()) != sorted(self.weights.keys()):
log.error("Weights needs to contains all servers")
else:
self.weights = { IPAddr(ip): weight for ip, weight in weights.items() }
log.info("Setting weights to %s.", self.weights)

def _do_expire (self):
"""
Expire probes and "memorized" flows
Expand All @@ -142,6 +200,11 @@ def _do_expire (self):
if ip in self.live_servers:
self.log.warn("Server %s down", ip)
del self.live_servers[ip]
# Delete each entry in the table.
del self.data_transferred[ip]
del self.weights[ip]
# Set the count of packet for round robin as 0.
self.round_robin_pck_sent = 0

# Expire old flows
c = len(self.memory)
Expand Down Expand Up @@ -194,7 +257,8 @@ def _pick_server (self, key, inport):
"""
Pick a server for a (hopefully) new connection
"""
return random.choice(self.live_servers.keys())
self.log.debug("Balancing done by the %s algorithm.", self.algorithm)
return ALGORITHM_LIST[self.algorithm](self)

def _handle_PacketIn (self, event):
inport = event.port
Expand Down Expand Up @@ -223,16 +287,24 @@ def drop ():
else:
# Ooh, new server.
self.live_servers[arpp.protosrc] = arpp.hwsrc,inport
self.data_transferred[arpp.protosrc] = 0
if arpp.protosrc not in self.weights.keys():
self.weights[arpp.protosrc] = 1
self.log.info("Server %s up", arpp.protosrc)
return

# Not TCP and not ARP. Don't know what to do with this. Drop it.
return drop()

# It's TCP.

ipp = packet.find('ipv4')

# Update the data count table, if needed.
if time.time() - self.last_update > UPDATE_DATA_TRANSFERRED:
for server in self.data_transferred.keys():
self.data_transferred[server] = 0
self.last_update = time.time()

if ipp.srcip in self.servers:
# It's FROM one of our balanced servers.
# Rewrite it BACK to the client
Expand All @@ -247,7 +319,6 @@ def drop ():

# Refresh time timeout and reinstall.
entry.refresh()

#self.log.debug("Install reverse flow for %s", key)

# Install reverse table entry
Expand All @@ -265,11 +336,11 @@ def drop ():
data=event.ofp,
actions=actions,
match=match)

self.con.send(msg)

elif ipp.dstip == self.service_ip:
# Ah, it's for our service IP and needs to be load balanced

# Do we already know this flow?
key = ipp.srcip,ipp.dstip,tcpp.srcport,tcpp.dstport
entry = self.memory.get(key)
Expand Down Expand Up @@ -304,27 +375,102 @@ def drop ():
data=event.ofp,
actions=actions,
match=match)

self.con.send(msg)

def round_robin_alg (balancer):
"""
Select the next server for load balancing using the round-robin algorithm.
"""
length = len(balancer.live_servers.keys())
if balancer.round_robin_index >= length:
balancer.round_robin_index = 0
server_selected = list(balancer.live_servers.keys())[balancer.round_robin_index]
balancer.round_robin_pck_sent = balancer.round_robin_pck_sent + 1

if balancer.round_robin_pck_sent == balancer.weights[server_selected]:
balancer.round_robin_index += 1
balancer.round_robin_pck_sent = 0

return server_selected

def least_bandwidth_alg (balancer):
"""
Select the next server for load balancing using the least bandwidth algorithm.
"""
length = len(balancer.live_servers.keys())
servers = list(balancer.live_servers.keys())
data_transferred = balancer.data_transferred
weights = balancer.weights

for i in range(length):
if weights[servers[i]] > 0:
best_server = servers[i]

# Weighted least-bandwidth based on weighted least-connection scheduling
# algorithm (see http://kb.linuxvirtualserver.org/wiki/Weighted_Least-Connection_Scheduling)
for ii in range(i + 1, length):
if data_transferred[best_server] * weights[servers[ii]] > \
data_transferred[servers[ii]] * weights[best_server]:
best_server = servers[ii]
return best_server

def random_alg (balancer):
"""
Select a random server for load balancer.
"""
return random.choice(balancer.live_servers.keys())

# List of algorithms allowed in the load balancer.
ALGORITHM_LIST = {
'round-robin': round_robin_alg,
'least-bandwidth': least_bandwidth_alg,
'random': random_alg
}

# Remember which DPID we're operating on (first one to connect)
_dpid = None


def launch (ip, servers, dpid = None):
def launch (ip, servers, weights_val = [], dpid = None, algorithm = 'random'):
global _dpid
global _algorithm

if dpid is not None:
_dpid = str_to_dpid(dpid)

if algorithm not in ALGORITHM_LIST:
log.error("Algorithm %s is not allowed, allowed algorithms: %s",
algorithm, ALGORITHM_LIST.keys())
exit(1)

# Getting the servers IP.
servers = servers.replace(","," ").split()
servers = [IPAddr(x) for x in servers]
ip = IPAddr(ip)

# Parsing the weights for each server.
weights = {}
if len(weights_val) is 0:
weights_val = ""
for x in servers:
weights_val += "1,"

weights_val = weights_val.replace(",", " ").split()

if len(weights_val) is not len(servers):
log.error("Weights array is not the same length than servers array")
exit(1)

for i in range(len(servers)):
weights[servers[i]] = int(weights_val[i])

# Getting the controller IP.
ip = IPAddr(ip)

# We only want to enable ARP Responder *only* on the load balancer switch,
# so we do some disgusting hackery and then boot it up.
from proto.arp_responder import ARPResponder
old_pi = ARPResponder._handle_PacketIn

def new_pi (self, event):
if event.dpid == _dpid:
# Yes, the packet-in is on the right switch
Expand All @@ -334,10 +480,10 @@ def new_pi (self, event):
# Hackery done. Now start it.
from proto.arp_responder import launch as arp_launch
arp_launch(eat_packets=False,**{str(ip):True})

import logging
logging.getLogger("proto.arp_responder").setLevel(logging.WARN)


def _handle_ConnectionUp (event):
global _dpid
if _dpid is None:
Expand All @@ -348,13 +494,38 @@ def _handle_ConnectionUp (event):
else:
if not core.hasComponent('iplb'):
# Need to initialize first...
core.registerNew(iplb, event.connection, IPAddr(ip), servers)

core.registerNew(iplb, event.connection, algorithm,
IPAddr(ip), weights, servers)

log.info("IP Load Balancer Ready.")
log.info("Load Balancing on %s", event.connection)

# Gross hack
core.iplb.con = event.connection
event.connection.addListeners(core.iplb)

def _handle_FlowStatsReceived (event):
for f in event.stats:
ip_dst = f.match.nw_dst
ip_src = f.match.nw_src

if ip_dst != None and IPAddr(ip_dst) in core.iplb.servers:
core.iplb.data_transferred[IPAddr(ip_dst)] += f.byte_count

if ip_src != None and IPAddr(ip_src) in core.iplb.servers:
core.iplb.data_transferred[IPAddr(ip_src)] += f.byte_count

core.openflow.addListenerByName("FlowStatsReceived", _handle_FlowStatsReceived)
core.openflow.addListenerByName("ConnectionUp", _handle_ConnectionUp)

from pox.lib.recoco import Timer

# Send the flow stats to all the switches connected to the controller.
def _timer_func ():
for connection in core.openflow._connections.values():
connection.send(of.ofp_stats_request(body=of.ofp_flow_stats_request()))

# Request flow stats every FLOW_IDLE_TIMEOUT second.
Timer(FLOW_IDLE_TIMEOUT, _timer_func, recurring=True)