Skip to content

Commit

Permalink
🚧 WIP add get_enough_of_item hierarchical task
Browse files Browse the repository at this point in the history
  • Loading branch information
MathisFederico committed Jan 26, 2024
1 parent f105cb5 commit 1c2280d
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 37 deletions.
14 changes: 12 additions & 2 deletions src/hcraft/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,11 @@ def solving_behavior(self, task: "Task") -> "Behavior":
"""
return self.all_behaviors[task_to_behavior_name(task)]

def planning_problem(self, **kwargs) -> HcraftPlanningProblem:
def planning_problem(
self,
hierarchical: bool = False,
**kwargs,
) -> HcraftPlanningProblem:
"""Build this hcraft environment planning problem.
Returns:
Expand Down Expand Up @@ -530,7 +534,13 @@ def planning_problem(self, **kwargs) -> HcraftPlanningProblem:
assert env.purpose.is_terminated # Purpose is achieved
```
"""
return HcraftPlanningProblem(self.state, self.name, self.purpose, **kwargs)
return HcraftPlanningProblem(
self.state,
self.name,
self.purpose,
hierarchical=hierarchical,
**kwargs,
)

def _step_output(self, reward: float, terminated: bool, truncated: bool):
infos = {
Expand Down
205 changes: 174 additions & 31 deletions src/hcraft/planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
from hcraft.transformation import Transformation, InventoryOwner
from hcraft.task import Task, GetItemTask, PlaceItemTask, GoToZoneTask
from hcraft.purpose import Purpose
from hcraft.elements import Zone, Item
from hcraft.elements import Stack, Zone, Item

# unified_planning is an optional dependency.
UPF_AVAILABLE = True
Expand All @@ -91,6 +91,9 @@
from unified_planning.plans import SequentialPlan
from unified_planning.engines.results import PlanGenerationResult
from unified_planning.model.problem import Problem
from unified_planning.model.htn import HierarchicalProblem
from unified_planning.model.htn.task import Task as UpfHTask
from unified_planning.model.htn.method import Method

UserType = ups.UserType
Object = ups.Object
Expand All @@ -111,6 +114,7 @@

if TYPE_CHECKING:
from hcraft.state import HcraftState
from hcraft.world import World

Statistics = Dict[str, Union[int, float]]

Expand All @@ -123,6 +127,7 @@ def __init__(
state: "HcraftState",
name: str,
purpose: Optional["Purpose"],
hierarchical: bool,
timeout: float = 60,
planner_name: Optional[str] = None,
) -> None:
Expand All @@ -140,7 +145,14 @@ def __init__(
"Missing planning dependencies. Install with:\n"
"pip install hcraft[planning]"
)
self.upf_problem: "Problem" = self._init_problem(state, name, purpose)
if hierarchical:
self.upf_problem = self._init_hierarchical_problem(
state=state, name=name, purpose=purpose
)
else:
self.upf_problem = self._init_flat_problem(
state=state, name=name, purpose=purpose
)
self.plan: Optional["SequentialPlan"] = None
self.plans: List["SequentialPlan"] = []
self.stats: List["Statistics"] = []
Expand Down Expand Up @@ -212,14 +224,19 @@ def solve(self) -> "PlanGenerationResult":
self.upf_problem, timeout=self.timeout
)
if results.plan is None:
raise ValueError("Not plan could be found for this problem.")
raise ValueError(
"Not plan could be found for this problem. Status: %s", results.status
)
self.plan = deepcopy(results.plan)
self.plans.append(deepcopy(results.plan))
self.stats.append(_read_statistics(results))
return results

def _init_problem(
self, state: "HcraftState", name: str, purpose: Optional["Purpose"]
def _init_flat_problem(
self,
state: "HcraftState",
name: str,
purpose: Optional["Purpose"],
) -> "Problem":
"""Build a unified planning problem from the given world and purpose.
Expand All @@ -233,28 +250,125 @@ def _init_problem(
Problem: Unified planning problem.
"""
upf_problem = Problem(name)
self.zone_type = UserType("zone")
self.player_item_type = UserType("player_item")
self.zone_item_type = UserType("zone_item")
self._add_base_flat_problem(upf_problem, state.world)
if purpose is not None and purpose.terminal_groups:
upf_problem.add_goal(self._purpose_to_goal(purpose))
self.update_problem_to_state(upf_problem, state)
return upf_problem

self.zones_obj: Dict[Zone, "Object"] = {}
for zone in state.world.zones:
self.zones_obj[zone] = Object(zone.name, self.zone_type)
def _init_hierarchical_problem(
self,
state: "HcraftState",
name: str,
purpose: Optional["Purpose"],
):
upf_problem = HierarchicalProblem(name)
self._add_base_flat_problem(upf_problem, state.world)
self.update_problem_to_state(upf_problem, state)
self._add_tasks_to_hierarchical_problem(upf_problem, state.world)
# if purpose is not None and purpose.terminal_groups:
# upf_problem.add_goal(self._purpose_to_goal(purpose))
self._add_init_task_network_from_purpose(upf_problem, purpose)
return upf_problem

self.items_obj: Dict[Item, "Object"] = {}
for item in state.world.items:
self.items_obj[item] = Object(item.name, self.player_item_type)
def _add_init_task_network_from_purpose(
self, hproblem: "HierarchicalProblem", purpose: "Purpose"
):
for task in purpose.best_terminal_group.tasks:
if isinstance(task, GetItemTask):
stack = task.item_stack
hproblem.task_network.add_subtask(
self.get_enough_of_item_task[stack.item],
stack.quantity,
ident=task.name,
)
else:
raise NotImplementedError

self.zone_items_obj: Dict[Item, "Object"] = {}
for item in state.world.zones_items:
self.zone_items_obj[item] = Object(
f"{item.name}_in_zone", self.zone_item_type
def _add_tasks_to_hierarchical_problem(
self, hproblem: "HierarchicalProblem", world: "World"
):
# Get enough of item tasks
self.get_enough_of_item_task: Dict[Item, "UpfHTask"] = {}
for item in world.items:
task = hproblem.add_task(f"get-enough-of-{item.name}", quantity=IntType())
self.get_enough_of_item_task[item] = task

# Noop method
noop_method = Method(f"has-enough-of-{item.name}", quantity=IntType())
noop_method.add_precondition(
GE(self.amount(self.items_obj[item]), noop_method.quantity)
)
noop_method.set_task(task)
hproblem.add_method(noop_method)

# Each transformation giving item
for t_id, transfo in enumerate(world.transformations):
production = transfo.get_changes("player", "add", [])
production_items = [stack.item for stack in production]
if item in production_items:
stack = production[production_items.index(item)]
transfo_action = self.transformation_action[t_id]
execute_method = self._add_execute_transformation_method(
transfo_action, task, stack, transfo.name
)
hproblem.add_method(execute_method)

def _add_execute_transformation_method(
self,
transfo_action: "InstantaneousAction",
task: "UpfHTask",
stack: "Stack",
transformation_name: str,
) -> List["Method"]:
need_loc = "loc" in [param.name for param in transfo_action.parameters]

# Execute
method_kwargs = {"quantity": IntType()}
if need_loc:
method_kwargs.update(loc=self.zone_type)
method = Method(f"execute-{transformation_name}", **method_kwargs)

for precondition in transfo_action.preconditions:
method.add_precondition(precondition)

get_nearly_enough = method.add_subtask(
self.get_enough_of_item_task[stack.item],
method.quantity - stack.quantity,
)

upf_problem.add_objects(self.zones_obj.values())
upf_problem.add_objects(self.items_obj.values())
upf_problem.add_objects(self.zone_items_obj.values())
transfo_args = [method.loc] if need_loc else []
execute = method.add_subtask(transfo_action, *transfo_args)
method.set_ordered([get_nearly_enough, execute])
method.set_task(task, method.quantity)

return method

def _add_base_flat_problem(
self, upf_problem: Union["Problem", "HierarchicalProblem"], world: "World"
):
self._add_objects_to_problem(
upf_problem, world.zones, world.items, world.zones_items
)
self._add_fluents_to_problem(upf_problem)
self._add_actions_to_problem(upf_problem, world.transformations)

def _add_actions_to_problem(
self,
upf_problem: Union["Problem", "HierarchicalProblem"],
transformations: List["Transformation"],
):
self.transformation_action: Dict[int, "InstantaneousAction"] = {}
for t_id, transfo in enumerate(transformations):
self.transformation_action[t_id] = self._action_from_transformation(
transfo, t_id
)

upf_problem.add_actions(self.transformation_action.values())

def _add_fluents_to_problem(
self, upf_problem: Union["Problem", "HierarchicalProblem"]
):
self.pos = Fluent("pos", BoolType(), zone=self.zone_type)
self.visited = Fluent("visited", BoolType(), zone=self.zone_type)
self.amount = Fluent("amount", IntType(), item=self.player_item_type)
Expand All @@ -267,25 +381,48 @@ def _init_problem(
upf_problem.add_fluent(self.amount, default_initial_value=0)
upf_problem.add_fluent(self.amount_at, default_initial_value=0)

actions = []
for t_id, transfo in enumerate(state.world.transformations):
actions.append(self._action_from_transformation(transfo, t_id))
def _add_objects_to_problem(
self,
upf_problem: Union["Problem", "HierarchicalProblem"],
zones: List[Zone],
items: List[Item],
zones_items: List[Item],
):
self.zone_type = UserType("zone")
self.player_item_type = UserType("player_item")
self.zone_item_type = UserType("zone_item")

self.zones_obj: Dict[Zone, "Object"] = {}
for zone in zones:
self.zones_obj[zone] = Object(zone.name, self.zone_type)

upf_problem.add_actions(actions)
self.items_obj: Dict[Item, "Object"] = {}
for item in items:
self.items_obj[item] = Object(item.name, self.player_item_type)

if purpose is not None and purpose.terminal_groups:
upf_problem.add_goal(self._purpose_to_goal(purpose))
self.zone_items_obj: Dict[Item, "Object"] = {}
for item in zones_items:
self.zone_items_obj[item] = Object(
f"{item.name}_in_zone", self.zone_item_type
)

self.update_problem_to_state(upf_problem, state)
return upf_problem
upf_problem.add_objects(self.zones_obj.values())
upf_problem.add_objects(self.items_obj.values())
upf_problem.add_objects(self.zone_items_obj.values())

def _action_from_transformation(
self, transformation: "Transformation", transformation_id: int
) -> "InstantaneousAction":
action_name = f"{transformation_id}_{transformation.name}"
action_name = self._name_from_transformation(transformation, transformation_id)
action = InstantaneousAction(action_name)
loc = None
if len(self.zones_obj) > 0:

if (
len(transformation.produced_zones_items) > 0
or len(transformation.consumed_zones_items) > 0
or transformation.zone is not None
or transformation.destination is not None
):
action = InstantaneousAction(action_name, loc=self.zone_type)
loc = action.parameter("loc")
action.add_precondition(self.pos(loc))
Expand All @@ -306,6 +443,12 @@ def _action_from_transformation(
self._add_current_zone_operations(action, transformation, loc)
return action

@staticmethod
def _name_from_transformation(
transformation: "Transformation", transformation_id: int
):
return f"{transformation_id}_{transformation.name}"

def _add_player_operation(
self,
action: "InstantaneousAction",
Expand Down
Loading

0 comments on commit 1c2280d

Please sign in to comment.