-
Notifications
You must be signed in to change notification settings - Fork 0
/
dp.py
123 lines (106 loc) · 4.48 KB
/
dp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import numpy as np
class DynamicProgramming:
"""
A dynamic programming calculator for shortest path question.
"""
def __init__(self):
# the reward mapping, 1st dimension means along x or y
# i.e. reward_map[0][0][1] means reward from (0, 1) to (0, 2), which is 9
# while reward_map[1][1][2] means reward from (1, 2) to (1, 3), which is 6
self.reward_map = np.array([
[[11, 9, 7],
[6, 5, 10],
[5, 10, 7],
[10, 6, 5]],
[[10, 8, 7],
[7, 9, 6],
[12, 8, 8],
[6, 9, 7]]
])
def shortest_path(self, x, y):
"""
:param x: the location of the start point along the x
:param y: the location of the start point along the y
:return:
"""
# the state value of the end point is 0.
if x == 0 and y == 0:
return 0
# when the start point is along the x axis, just retrospect its source along the x axis.
elif y == 0:
return self.shortest_path(x - 1, y) + self.reward_map[0][y][x - 1]
# when the start point is along the y axis, just retrospect its source along the y axis.
elif x == 0:
return self.shortest_path(x, y - 1) + self.reward_map[1][x][y - 1]
else:
return min(self.shortest_path(x - 1, y) + self.reward_map[0][y][x - 1],
self.shortest_path(x, y - 1) + self.reward_map[1][x][y - 1])
class Rollout_with_One_Step_lookahead_Minimization:
"""
实际上,rollout与上面dp算法的本质区别在于对state value的处理,dp是不停递归计算出来的精确解,而rollout则是在其rollout policy
i.e. nearest neighbor 计算出来的近似解。
"""
def __init__(self):
self.reward_map = np.array([
[[11, 9, 7],
[6, 5, 10],
[5, 10, 7],
[10, 6, 5]],
[[10, 8, 7],
[7, 9, 6],
[12, 8, 8],
[6, 9, 7]]
])
def nearest_neighbor(self, x, y):
"""
:param x: the location of the start point along the x
:param y: the location of the start point along the y
:return:
"""
if x == 0 and y == 0:
return 0
elif y == 0:
return self.nearest_neighbor(x - 1, y) + self.reward_map[0][y][x - 1]
elif x == 0:
return self.nearest_neighbor(x, y - 1) + self.reward_map[1][x][y - 1]
else:
minimum_index = np.argmin([self.reward_map[0][y][x - 1], self.reward_map[1][x][y - 1]], axis=0)
if minimum_index:
return self.nearest_neighbor(x, y - 1) + self.reward_map[1][x][y - 1]
else:
return self.nearest_neighbor(x - 1, y) + self.reward_map[0][y][x - 1]
def shortest_path(self, x, y):
"""
:param x:
:param y:
:return:
"""
print('x, y', x, y)
if x == 0 and y == 0:
return 0
# when the start point is along the x axis, just retrospect its source along the x axis.
elif y == 0:
return self.shortest_path(x - 1, y) + self.reward_map[0][y][x - 1]
# when the start point is along the y axis, just retrospect its source along the y axis.
elif x == 0:
return self.shortest_path(x, y - 1) + self.reward_map[1][x][y - 1]
else:
minimum_index = np.argmin([self.nearest_neighbor(x - 1, y) + self.reward_map[0][y][x - 1],
self.nearest_neighbor(x, y - 1) + self.reward_map[1][x][y - 1]], axis=0)
if minimum_index:
return self.shortest_path(x, y - 1) + + self.reward_map[1][x][y - 1]
else:
return self.shortest_path(x - 1, y) + self.reward_map[0][y][x - 1]
if __name__ == "__main__":
dp = DynamicProgramming()
rollout = Rollout_with_One_Step_lookahead_Minimization()
res = dp.shortest_path(3, 3)
# optimality is a matrix, where every node is a optimal solution.
optimality = []
for i in range(4):
for j in range(4):
optimality.append(dp.shortest_path(i, j))
print(optimality)
# 关于寻找路径,这边没有专门写一个demo,因为有点dirty,大致思想就是把最终的optimality打印出来,相减与reward相比
# 如果相等表示这条路段可行,最终把所有可行的路段连接起来形成路径
res_prime = rollout.shortest_path(3, 3)