-
Notifications
You must be signed in to change notification settings - Fork 1
/
search.py
390 lines (301 loc) · 13.3 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
from frontier_and_explored import Frontier, Explored
from block_state import BlockState
import heapq
import re
import time
PERIOD_OF_TIME = 60
def bfs_search(initial_state: BlockState, goal_config):
"""BFS search"""
# initialize timer
start_time = time.time()
# initialize frontier and explored
frontier = Frontier().queue
frontier.append(initial_state)
explored = Explored().set
# frontier_configs is used just for searching and doesn't obstructs functionality
frontier_configs = set()
frontier_configs.add(initial_state.config)
# initialize metrics variable
nodes = 0
while frontier and time.time() - start_time < PERIOD_OF_TIME:
# pop the first state entered in frontier
state = frontier.popleft()
frontier_configs.remove(state.config)
explored.add(state.config)
# check if this state is goal state
if state.config == goal_config:
print("SUCCESS")
return state, nodes, state.cost, time.time() - start_time
# expand the state
state.expand()
nodes = nodes + 1
for child in state.children:
# check for duplicates in frontier and explored
if child.config not in explored and child.config not in frontier_configs:
# add child to frontier
frontier.append(child)
frontier_configs.add(child.config)
print('FAILURE')
exit()
def dfs_search(initial_state: BlockState, goal_config):
"""DFS search"""
# initialize timer
start_time = time.time()
# initialize frontier and explored
frontier = Frontier().stack
frontier.append(initial_state)
explored = Explored().set
# frontier_configs is used just for searching and doesn't obstructs functionality
frontier_configs = set()
frontier_configs.add(initial_state.config)
# initialize metrics variables
max_depth = 0
nodes = 0
while frontier and time.time() - start_time < PERIOD_OF_TIME:
# pop the first state the last state entered in frontier
state = frontier.pop()
frontier_configs.remove(state.config)
# check if state is explored
if state.config not in explored:
explored.add(state.config)
# update max depth
if max_depth < state.cost:
max_depth = state.cost
# check if this state is goal state
if state.config == goal_config:
print("SUCCESS")
return state, nodes, max_depth, time.time() - start_time
# expand the state
state.expand()
# reverse children to put it in frontier with the same priority as bfs
state.children = state.children[::-1]
nodes = nodes + 1
for child in state.children:
# check for duplicates in frontier and explored
if child.config not in frontier_configs:
# add child to frontier
frontier.append(child)
frontier_configs.add(child.config)
print('FAILURE')
exit()
def a_star_search(initial_state, goal_config):
"""A * search"""
start_time = time.time() # initialize timer
frontier = Frontier().heap # list of entries arranged in a heap
entry_finder = {} # mapping of states to entries
explored = Explored().set # a set of explored states
# calculate initial's states h cost and add g cost (which is 0 so no need to add it)
initial_state.f = h1(initial_state.config, goal_config)
# add initial state
add_state(initial_state, entry_finder, frontier)
# initialize metrics variable
nodes = 0
max_depth = 0
while frontier and time.time() - start_time < PERIOD_OF_TIME:
# pop the state with the smaller cost from frontier
state = pop_state(frontier, entry_finder)
# check if the state has been explored
if state.config not in explored:
explored.add(state.config)
# update max depth
if max_depth < state.cost:
max_depth = state.cost
# check if the state is goal state
if state.config == goal_config:
print("SUCCESS")
return state, nodes, max_depth, time.time() - start_time
# expand the state
state.expand()
nodes = nodes + 1
for child in state.children:
# calculate the cost f for child
child.f = child.cost + h3(child.config, goal_config)
# check for duplicates in frontier
if child.config not in entry_finder:
add_state(child, entry_finder, frontier)
# if child state is already in frontier update its cost if cost is less
elif child.f < entry_finder[child.config][0]:
# update the priority of an existing state
remove_state(child.config, entry_finder)
add_state(child, entry_finder, frontier)
print('FAILURE')
exit()
def best_first_search(initial_state, goal_config):
"""Best First search"""
start_time = time.time() # initialize timer
frontier = Frontier().heap # list of entries arranged in a heap
entry_finder = {} # mapping of states to entries
# calculate initial's states h cost
initial_state.f = h1(initial_state.config, goal_config)
# add initial state
add_state(initial_state, entry_finder, frontier)
explored = Explored().set
max_depth = 0
nodes = 0
while frontier and time.time() - start_time < PERIOD_OF_TIME:
# pop the state with the smaller cost from frontier
state = pop_state(frontier, entry_finder)
# check if the state has been explored
if state.config not in explored:
explored.add(state.config)
# update max depth
if max_depth < state.cost:
max_depth = state.cost
# check if the state is goal state
if state.config == goal_config:
print("SUCCESS")
return state, nodes, max_depth, time.time() - start_time
# expand the node
state.expand()
nodes = nodes + 1
for child in state.children:
# calculate the cost f for child
child.f = h2(child.config, goal_config)
# check for duplicates in frontier and frontier
if child.config not in entry_finder:
add_state(child, entry_finder, frontier)
# if child state is already in frontier update its cost if cost is less
elif child.f < entry_finder[child.config][0]:
# update the priority of an existing state
remove_state(child.config, entry_finder)
add_state(child, entry_finder, frontier)
print('FAILURE')
exit()
def add_state(state, entry_finder, frontier):
"""Add a new state """
entry = [state.f, state]
entry_finder[state.config] = entry
heapq.heappush(frontier, entry)
def remove_state(config, entry_finder):
"""Mark an existing state as REMOVED."""
entry = entry_finder.pop(config)
entry[-1] = '<removed-task>'
def pop_state(frontier, entry_finder):
"""Remove and return the lowest cost state."""
while frontier:
state = heapq.heappop(frontier)
if state[1] != '<removed-task>':
del entry_finder[state[1].config]
return state[1]
#
def h1(config, goal_config):
"""Heuristic 1 - this heuristic calculates the number of blocks that are currently not in the correct 'position'."""
cost = 0
index = 0
for cube in config:
if cube[1] != goal_config[index][1]:
cost += 1
index += 1
return cost
def h2(config, goal_config):
"""
Heuristic 2 - this heuristic is twice the number of blocks that must be moved once plus four times the number of
blocks that must be moved twice. A block that must be moved once is a block that is currently on a block
different to the block upon which it rests in the goal state or a block that has such a block somewhere below it
in the same pile. A block that must be moved twice is a block that is currently on the block upon which it must
be placed in the goal state, but that block is a block that must be moved or if there exists a block that must be
moved twice somewhere below it (in the same pile).
"""
index = 0
one_move_cubes = set()
two_move_cubes = set()
cubes_on_table = {}
for cube in config:
if cube[1] == -1:
cubes_on_table[index] = cube
index += 1
for index in cubes_on_table:
cube = cubes_on_table[index]
if cube[1] != goal_config[index][1]:
one_move_cubes.add(cube)
index_of_current_cube = cube[0]
while True:
if config[index_of_current_cube][0] == -1:
break
# check if cube is a block that must be moved once
if config[index_of_current_cube][1] != goal_config[index_of_current_cube][1]:
one_move_cubes.add(config[index_of_current_cube])
else:
# check if cube has a block that must be moved once somewhere below it
index_of_cube_bellow = config[index_of_current_cube][1]
while True:
if config[index_of_cube_bellow] in one_move_cubes:
one_move_cubes.add(config[index_of_current_cube])
break
elif config[index_of_cube_bellow][1] == -1:
break
index_of_cube_bellow = config[index_of_cube_bellow][1]
# check if cube must be moved twice
index_of_cube_bellow = config[index_of_current_cube][1]
if config[index_of_cube_bellow] in one_move_cubes or config[index_of_cube_bellow] in two_move_cubes:
two_move_cubes.add(config[index_of_current_cube])
one_move_cubes.discard(config[index_of_current_cube])
# check if cube has a block that must be moved twice somewhere below it
index_of_cube_bellow = config[index_of_cube_bellow][1]
while True:
if config[index_of_cube_bellow] in two_move_cubes:
two_move_cubes.add(config[index_of_current_cube])
break
elif config[index_of_cube_bellow][1] == -1:
break
index_of_cube_bellow = config[index_of_cube_bellow][1]
index_of_current_cube = config[index_of_current_cube][0]
return len(one_move_cubes) * 2 + len(two_move_cubes) * 4
def h3(config, goal_config):
"""Heuristic 3 - this heuristic is similar to Heuristic 1. It calculates the difference between the current state
and the goal state, but looks at the details of each block. If Block A in the goal state is supposed to be on top
of Block B and under Block C and in the current state it is neither on top of B nor under C, then we add 2 to the
heuristic and if it is either on top of B or under C we add 1. """
cost = 0
index = 0
for cube in config:
if cube[0] != goal_config[index][0] and cube[1] != goal_config[1]:
cost += 2
elif cube[0] != goal_config[index][0] or cube[1] != goal_config[1]:
cost += 1
index += 1
return cost
def calculate_path_to_goal(state):
"""calculate the path to goal"""
moves = []
while state.parent is not None:
moves.append(state.action)
state = state.parent
moves = moves[::-1]
return moves
def is_valid(state, moves, goal_config):
"""check if solution is valid"""
config = list(map(list, state.config))
objects = state.objects
for move in moves:
action = re.split("[(,)]", move)
# initialize
movedcube = objects.index(action[1])
prevplace = action[2]
currplace = action[3]
# if previous place is table change the state of current place cube unclear from above and the state of moved
# cube to above on current place cube
if prevplace == 'table':
if config[objects.index(currplace)][0] == -1:
config[movedcube][1] = objects.index(currplace)
config[objects.index(currplace)][0] = movedcube
else:
return False
# if else current place is table change the state of previous place to clear from above
# and the state of moved cube to on table
elif currplace == 'table':
if config[movedcube][0] == -1:
config[objects.index(prevplace)][0] = -1
config[movedcube][1] = -1
else:
return False
# else change the state of current place to bellow of moved cube , the state of previous place to clear
# and the state of moved cube to above current place
else:
if config[movedcube][0] == -1 and config[objects.index(currplace)][0] == -1:
config[objects.index(currplace)][0] = movedcube
config[objects.index(prevplace)][0] = -1
config[movedcube][1] = objects.index(currplace)
else:
return False
return tuple(map(tuple, config)) == goal_config