-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathant_colony.py
58 lines (44 loc) · 1.98 KB
/
ant_colony.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import math
import threading
import numpy as np
from ant import Ant
class AntColony:
    """Run generations of ants over a graph and remember the cheapest path.

    The graph is expected to expose ``edges(data=True)`` yielding
    ``(node, node, attrs)`` triples and ``graph[a][b]`` item access, with a
    ``'pheromone'`` entry in each edge's attributes (looks like a networkx
    graph -- confirm against the caller).
    """

    def __init__(self, graph, number_of_ants, number_of_generations):
        """Store the search configuration and reset the best-known result."""
        self.graph = graph
        self.number_of_ants = number_of_ants
        self.number_of_generations = number_of_generations
        # Nothing found yet: any comparable cost is lower than infinity.
        self.best_cost = math.inf
        self.best_path = []
        # Serialises updates to best_path/best_cost made from ant threads.
        self.results_lock = threading.Lock()
        # Snapshot of the best-so-far path, appended once per generation.
        self.steps = []

    def run_ant(self, ant, pheromone_exponent, length_exponent):
        """Let a single ant walk and report its result; used as thread target."""
        tour_cost = ant.go(pheromone_exponent, length_exponent)
        # ant.path is read only after go() has returned.
        self.add_result(ant.path, tour_cost)

    def add_result(self, path, cost):
        """Record ``path`` as the new best if ``cost`` is strictly lower.

        Ties keep the previously stored path.
        """
        with self.results_lock:
            if cost < self.best_cost:
                self.best_path, self.best_cost = path, cost

    def evaporate_pheromones(self, evaporation_coefficient):
        """Scale every edge's pheromone by ``1 - evaporation_coefficient``."""
        retained = 1 - evaporation_coefficient
        for tail, head, attrs in self.graph.edges(data=True):
            self.graph[tail][head]['pheromone'] = retained * attrs['pheromone']

    def simulate(self, pheromone_exponent, length_exponent, evaporation_coefficient, pheromone_leaving_coefficient,
                 starting_node=0):
        """Run all generations and return the best result found.

        Each generation starts one thread per ant, waits for all of them,
        evaporates pheromones, lets every ant leave pheromones, and then
        appends the current best path (without its last element) to
        ``self.steps``.

        Returns:
            A tuple ``(best_path[:-1], best_cost, steps)``. The final element
            of the path is dropped -- presumably the repeated start node of a
            closed tour; confirm against ``Ant``.
        """
        for _ in range(self.number_of_generations):
            colony = []
            workers = []
            for ant_id in range(self.number_of_ants):
                ant = Ant(ant_id, self.graph, starting_node)
                colony.append(ant)
                worker = threading.Thread(
                    target=self.run_ant,
                    args=(ant, pheromone_exponent, length_exponent),
                )
                workers.append(worker)
                worker.start()

            # Every ant must finish walking before pheromones are touched.
            for worker in workers:
                worker.join()

            self.evaporate_pheromones(evaporation_coefficient)
            for ant in colony:
                ant.leave_pheromones(pheromone_leaving_coefficient)

            self.steps.append(np.array(self.best_path[:-1]))

        return self.best_path[:-1], self.best_cost, self.steps