diff --git a/pox/misc/ip_loadbalancer.py b/pox/misc/ip_loadbalancer.py index a754b2a5b..55a04b80b 100644 --- a/pox/misc/ip_loadbalancer.py +++ b/pox/misc/ip_loadbalancer.py @@ -13,35 +13,51 @@ # limitations under the License. """ -A very sloppy IP load balancer. +An IP load balancer that allows the user to select the load balancer algorithm and +the weights for each server. Run it with --ip= --servers=IP1,IP2,... By default, it will do load balancing on the first switch that connects. If you want, you can add --dpid= to specify a particular switch. +Also by default, the load balancer algorithm will be random; if you want to +change it, add --algorithm=NAME (you can select random, round-robin or +least-bandwidth). You can select the weights for each server using +--weights_val=WEIGHT_IP1,WEIGHT_IP2,... in the same order as the servers. + +If you run with the py module, you will be able to change the algorithm at any +time, using: +POX> change_algorithm("round-robin") +You can also change the weights for each server: +POX> change_weights({"IP1": WEIGHT1, ... }) + Please submit improvements. :) """ from pox.core import core import pox +import thread log = core.getLogger("iplb") from pox.lib.packet.ethernet import ethernet, ETHER_BROADCAST from pox.lib.packet.ipv4 import ipv4 from pox.lib.packet.arp import arp from pox.lib.addresses import IPAddr, EthAddr +from pox.lib.revent import EventRemove from pox.lib.util import str_to_bool, dpid_to_str, str_to_dpid +# include as part of the betta branch +from pox.openflow.of_json import * + import pox.openflow.libopenflow_01 as of import time import random -FLOW_IDLE_TIMEOUT = 10 +FLOW_IDLE_TIMEOUT = 5 FLOW_MEMORY_TIMEOUT = 60 * 5 - - +UPDATE_DATA_TRANSFERRED = 14 class MemoryEntry (object): """ @@ -86,22 +102,24 @@ def key2 (self): return self.server,ipp.srcip,tcpp.dstport,tcpp.srcport - class iplb (object): """ - A simple IP load balancer + An IP load balancer Give it a service_ip and a list of server IP addresses. 
New TCP flows - to service_ip will be randomly redirected to one of the servers. + to service_ip will be redirected to one of the servers using the selected + algorithm and using the weights map. We probe the servers to see if they're alive by sending them ARPs. """ - def __init__ (self, connection, service_ip, servers = []): + def __init__ (self, connection, algorithm, service_ip, weights, servers = []): self.service_ip = IPAddr(service_ip) self.servers = [IPAddr(a) for a in servers] self.con = connection self.mac = self.con.eth_addr self.live_servers = {} # IP -> MAC,port + self.algorithm = algorithm + self.weights = weights try: self.log = log.getChild(dpid_to_str(self.con.dpid)) @@ -114,6 +132,19 @@ def __init__ (self, connection, service_ip, servers = []): # How quickly do we probe? self.probe_cycle_time = 5 + # Last update in the map of data transferred. + self.last_update = time.time() + + # Data transferred map (IP -> data transferred in the last + # UPDATE_DATA_TRANSFERRED seconds). + self.data_transferred = {} + for server in self.servers: + self.data_transferred[server] = 0 + + # Variables used in round-robin algorithm. + self.round_robin_index = 0 + self.round_robin_pck_sent = 0 + # How long do we wait for an ARP reply before we consider a server dead? self.arp_timeout = 3 @@ -127,6 +158,33 @@ def __init__ (self, connection, service_ip, servers = []): # As part of a gross hack, we now do this from elsewhere #self.con.addListeners(self) + # Allow user to change algorithm and weights at any time. + core.Interactive.variables['change_algorithm'] = self._change_algorithm + core.Interactive.variables['change_weights'] = self._change_weights + + def _change_algorithm(self, algorithm): + """ + Change the algorithm for load balancing. 
+ """ + if algorithm not in ALGORITHM_LIST: + log.error("Algorithm %s is not allowed, allowed algorithms: %s", + algorithm, ALGORITHM_LIST.keys()) + else: + self.algorithm = algorithm + log.info("Setting algorithm to %s.", self.algorithm) + + def _change_weights(self, weights): + """ + Change the weights for each server in the balancing. + """ + if type(weights) is not dict: + log.error("Weigths should be a dictionary { IP: WEIGHT }.") + elif sorted(weights.keys()) != sorted(self.weights.keys()): + log.error("Weights needs to contains all servers") + else: + self.weights = { IPAddr(ip): weight for ip, weight in weights.items() } + log.info("Setting weights to %s.", self.weights) + def _do_expire (self): """ Expire probes and "memorized" flows @@ -142,6 +200,11 @@ def _do_expire (self): if ip in self.live_servers: self.log.warn("Server %s down", ip) del self.live_servers[ip] + # Delete each entry in the table. + del self.data_transferred[ip] + del self.weights[ip] + # Set the count of packet for round robin as 0. + self.round_robin_pck_sent = 0 # Expire old flows c = len(self.memory) @@ -194,7 +257,8 @@ def _pick_server (self, key, inport): """ Pick a server for a (hopefully) new connection """ - return random.choice(self.live_servers.keys()) + self.log.debug("Balancing done by the %s algorithm.", self.algorithm) + return ALGORITHM_LIST[self.algorithm](self) def _handle_PacketIn (self, event): inport = event.port @@ -223,6 +287,9 @@ def drop (): else: # Ooh, new server. self.live_servers[arpp.protosrc] = arpp.hwsrc,inport + self.data_transferred[arpp.protosrc] = 0 + if arpp.protosrc not in self.weights.keys(): + self.weights[arpp.protosrc] = 1 self.log.info("Server %s up", arpp.protosrc) return @@ -230,9 +297,14 @@ def drop (): return drop() # It's TCP. - ipp = packet.find('ipv4') + # Update the data count table, if needed. 
+ if time.time() - self.last_update > UPDATE_DATA_TRANSFERRED: + for server in self.data_transferred.keys(): + self.data_transferred[server] = 0 + self.last_update = time.time() + if ipp.srcip in self.servers: # It's FROM one of our balanced servers. # Rewrite it BACK to the client @@ -247,7 +319,6 @@ def drop (): # Refresh time timeout and reinstall. entry.refresh() - #self.log.debug("Install reverse flow for %s", key) # Install reverse table entry @@ -265,11 +336,11 @@ def drop (): data=event.ofp, actions=actions, match=match) + self.con.send(msg) elif ipp.dstip == self.service_ip: # Ah, it's for our service IP and needs to be load balanced - # Do we already know this flow? key = ipp.srcip,ipp.dstip,tcpp.srcport,tcpp.dstport entry = self.memory.get(key) @@ -304,27 +375,102 @@ def drop (): data=event.ofp, actions=actions, match=match) + self.con.send(msg) +def round_robin_alg (balancer): + """ + Select the next server for load balancing using the round-robin algorithm. + """ + length = len(balancer.live_servers.keys()) + if balancer.round_robin_index >= length: + balancer.round_robin_index = 0 + server_selected = list(balancer.live_servers.keys())[balancer.round_robin_index] + balancer.round_robin_pck_sent = balancer.round_robin_pck_sent + 1 + + if balancer.round_robin_pck_sent == balancer.weights[server_selected]: + balancer.round_robin_index += 1 + balancer.round_robin_pck_sent = 0 + + return server_selected + +def least_bandwidth_alg (balancer): + """ + Select the next server for load balancing using the least bandwidth algorithm. 
+ """ + length = len(balancer.live_servers.keys()) + servers = list(balancer.live_servers.keys()) + data_transferred = balancer.data_transferred + weights = balancer.weights + + for i in range(length): + if weights[servers[i]] > 0: + best_server = servers[i] + + # Weighted least-bandwidth based on weighted least-connection scheduling + # algorithm (see http://kb.linuxvirtualserver.org/wiki/Weighted_Least-Connection_Scheduling) + for ii in range(i + 1, length): + if data_transferred[best_server] * weights[servers[ii]] > \ + data_transferred[servers[ii]] * weights[best_server]: + best_server = servers[ii] + return best_server + +def random_alg (balancer): + """ + Select a random server for load balancer. + """ + return random.choice(balancer.live_servers.keys()) + +# List of algorithms allowed in the load balancer. +ALGORITHM_LIST = { + 'round-robin': round_robin_alg, + 'least-bandwidth': least_bandwidth_alg, + 'random': random_alg +} # Remember which DPID we're operating on (first one to connect) _dpid = None - -def launch (ip, servers, dpid = None): +def launch (ip, servers, weights_val = [], dpid = None, algorithm = 'random'): global _dpid + global _algorithm + if dpid is not None: _dpid = str_to_dpid(dpid) + if algorithm not in ALGORITHM_LIST: + log.error("Algorithm %s is not allowed, allowed algorithms: %s", + algorithm, ALGORITHM_LIST.keys()) + exit(1) + + # Getting the server IPs. servers = servers.replace(","," ").split() servers = [IPAddr(x) for x in servers] - ip = IPAddr(ip) + # Parsing the weights for each server. + weights = {} + if len(weights_val) is 0: + weights_val = "" + for x in servers: + weights_val += "1," + + weights_val = weights_val.replace(",", " ").split() + + if len(weights_val) is not len(servers): + log.error("Weights array is not the same length than servers array") + exit(1) + + for i in range(len(servers)): + weights[servers[i]] = int(weights_val[i]) + + # Getting the service IP (the address being load balanced). 
+ ip = IPAddr(ip) # We only want to enable ARP Responder *only* on the load balancer switch, # so we do some disgusting hackery and then boot it up. from proto.arp_responder import ARPResponder old_pi = ARPResponder._handle_PacketIn + def new_pi (self, event): if event.dpid == _dpid: # Yes, the packet-in is on the right switch @@ -334,10 +480,10 @@ def new_pi (self, event): # Hackery done. Now start it. from proto.arp_responder import launch as arp_launch arp_launch(eat_packets=False,**{str(ip):True}) + import logging logging.getLogger("proto.arp_responder").setLevel(logging.WARN) - def _handle_ConnectionUp (event): global _dpid if _dpid is None: @@ -348,7 +494,10 @@ def _handle_ConnectionUp (event): else: if not core.hasComponent('iplb'): # Need to initialize first... - core.registerNew(iplb, event.connection, IPAddr(ip), servers) + + core.registerNew(iplb, event.connection, algorithm, + IPAddr(ip), weights, servers) + log.info("IP Load Balancer Ready.") log.info("Load Balancing on %s", event.connection) @@ -356,5 +505,27 @@ def _handle_ConnectionUp (event): core.iplb.con = event.connection event.connection.addListeners(core.iplb) + def _handle_FlowStatsReceived (event): + for f in event.stats: + ip_dst = f.match.nw_dst + ip_src = f.match.nw_src + if ip_dst != None and IPAddr(ip_dst) in core.iplb.servers: + core.iplb.data_transferred[IPAddr(ip_dst)] += f.byte_count + + if ip_src != None and IPAddr(ip_src) in core.iplb.servers: + core.iplb.data_transferred[IPAddr(ip_src)] += f.byte_count + + core.openflow.addListenerByName("FlowStatsReceived", _handle_FlowStatsReceived) core.openflow.addListenerByName("ConnectionUp", _handle_ConnectionUp) + + from pox.lib.recoco import Timer + + # Send a flow stats request to every switch connected to the controller. + def _timer_func (): + for connection in core.openflow._connections.values(): + connection.send(of.ofp_stats_request(body=of.ofp_flow_stats_request())) + + # Request flow stats every FLOW_IDLE_TIMEOUT seconds. 
+ Timer(FLOW_IDLE_TIMEOUT, _timer_func, recurring=True) +