From 1c2280d942f61987dd480d6c6918145eaded6375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Math=C3=AFs=20F=C3=A9d=C3=A9rico?= Date: Fri, 26 Jan 2024 15:06:22 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20WIP=20add=20get=5Fenough=5Fof=5F?= =?UTF-8?q?item=20hierarchical=20task?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hcraft/env.py | 14 ++- src/hcraft/planning.py | 205 ++++++++++++++++++++++++++----- tests/planning/test_dummy_env.py | 70 ++++++++++- 3 files changed, 252 insertions(+), 37 deletions(-) diff --git a/src/hcraft/env.py b/src/hcraft/env.py index fea541e1..c8fe5c06 100644 --- a/src/hcraft/env.py +++ b/src/hcraft/env.py @@ -502,7 +502,11 @@ def solving_behavior(self, task: "Task") -> "Behavior": """ return self.all_behaviors[task_to_behavior_name(task)] - def planning_problem(self, **kwargs) -> HcraftPlanningProblem: + def planning_problem( + self, + hierarchical: bool = False, + **kwargs, + ) -> HcraftPlanningProblem: """Build this hcraft environment planning problem. Returns: @@ -530,7 +534,13 @@ def planning_problem(self, **kwargs) -> HcraftPlanningProblem: assert env.purpose.is_terminated # Purpose is achieved ``` """ - return HcraftPlanningProblem(self.state, self.name, self.purpose, **kwargs) + return HcraftPlanningProblem( + self.state, + self.name, + self.purpose, + hierarchical=hierarchical, + **kwargs, + ) def _step_output(self, reward: float, terminated: bool, truncated: bool): infos = { diff --git a/src/hcraft/planning.py b/src/hcraft/planning.py index b88c0589..72bec4df 100644 --- a/src/hcraft/planning.py +++ b/src/hcraft/planning.py @@ -82,7 +82,7 @@ from hcraft.transformation import Transformation, InventoryOwner from hcraft.task import Task, GetItemTask, PlaceItemTask, GoToZoneTask from hcraft.purpose import Purpose -from hcraft.elements import Zone, Item +from hcraft.elements import Stack, Zone, Item # unified_planning is an optional dependency. UPF_AVAILABLE = True @@ -91,6 +91,9 @@ from unified_planning.plans import SequentialPlan from unified_planning.engines.results import PlanGenerationResult from unified_planning.model.problem import Problem + from unified_planning.model.htn import HierarchicalProblem + from unified_planning.model.htn.task import Task as UpfHTask + from unified_planning.model.htn.method import Method UserType = ups.UserType Object = ups.Object @@ -111,6 +114,7 @@ if TYPE_CHECKING: from hcraft.state import HcraftState + from hcraft.world import World Statistics = Dict[str, Union[int, float]] @@ -123,6 +127,7 @@ def __init__( state: "HcraftState", name: str, purpose: Optional["Purpose"], + hierarchical: bool, timeout: float = 60, planner_name: Optional[str] = None, ) -> None: @@ -140,7 +145,14 @@ def __init__( "Missing planning dependencies. Install with:\n" "pip install hcraft[planning]" ) - self.upf_problem: "Problem" = self._init_problem(state, name, purpose) + if hierarchical: + self.upf_problem = self._init_hierarchical_problem( + state=state, name=name, purpose=purpose + ) + else: + self.upf_problem = self._init_flat_problem( + state=state, name=name, purpose=purpose + ) self.plan: Optional["SequentialPlan"] = None self.plans: List["SequentialPlan"] = [] self.stats: List["Statistics"] = [] @@ -212,14 +224,19 @@ def solve(self) -> "PlanGenerationResult": self.upf_problem, timeout=self.timeout ) if results.plan is None: - raise ValueError("Not plan could be found for this problem.") + raise ValueError( + "Not plan could be found for this problem. Status: %s", results.status + ) self.plan = deepcopy(results.plan) self.plans.append(deepcopy(results.plan)) self.stats.append(_read_statistics(results)) return results - def _init_problem( - self, state: "HcraftState", name: str, purpose: Optional["Purpose"] + def _init_flat_problem( + self, + state: "HcraftState", + name: str, + purpose: Optional["Purpose"], ) -> "Problem": """Build a unified planning problem from the given world and purpose. @@ -233,28 +250,125 @@ def _init_problem( Problem: Unified planning problem. """ upf_problem = Problem(name) - self.zone_type = UserType("zone") - self.player_item_type = UserType("player_item") - self.zone_item_type = UserType("zone_item") + self._add_base_flat_problem(upf_problem, state.world) + if purpose is not None and purpose.terminal_groups: + upf_problem.add_goal(self._purpose_to_goal(purpose)) + self.update_problem_to_state(upf_problem, state) + return upf_problem - self.zones_obj: Dict[Zone, "Object"] = {} - for zone in state.world.zones: - self.zones_obj[zone] = Object(zone.name, self.zone_type) + def _init_hierarchical_problem( + self, + state: "HcraftState", + name: str, + purpose: Optional["Purpose"], + ): + upf_problem = HierarchicalProblem(name) + self._add_base_flat_problem(upf_problem, state.world) + self.update_problem_to_state(upf_problem, state) + self._add_tasks_to_hierarchical_problem(upf_problem, state.world) + # if purpose is not None and purpose.terminal_groups: + # upf_problem.add_goal(self._purpose_to_goal(purpose)) + self._add_init_task_network_from_purpose(upf_problem, purpose) + return upf_problem - self.items_obj: Dict[Item, "Object"] = {} - for item in state.world.items: - self.items_obj[item] = Object(item.name, self.player_item_type) + def _add_init_task_network_from_purpose( + self, hproblem: "HierarchicalProblem", purpose: "Purpose" + ): + for task in purpose.best_terminal_group.tasks: + if isinstance(task, GetItemTask): + stack = task.item_stack + hproblem.task_network.add_subtask( + self.get_enough_of_item_task[stack.item], + stack.quantity, + ident=task.name, + ) + else: + raise NotImplementedError - self.zone_items_obj: Dict[Item, "Object"] = {} - for item in state.world.zones_items: - self.zone_items_obj[item] = Object( - f"{item.name}_in_zone", self.zone_item_type + def _add_tasks_to_hierarchical_problem( + self, hproblem: "HierarchicalProblem", world: "World" + ): + # Get enough of item tasks + self.get_enough_of_item_task: Dict[Item, "UpfHTask"] = {} + for item in world.items: + task = hproblem.add_task(f"get-enough-of-{item.name}", quantity=IntType()) + self.get_enough_of_item_task[item] = task + + # Noop method + noop_method = Method(f"has-enough-of-{item.name}", quantity=IntType()) + noop_method.add_precondition( + GE(self.amount(self.items_obj[item]), noop_method.quantity) ) + noop_method.set_task(task) + hproblem.add_method(noop_method) + + # Each transformation giving item + for t_id, transfo in enumerate(world.transformations): + production = transfo.get_changes("player", "add", []) + production_items = [stack.item for stack in production] + if item in production_items: + stack = production[production_items.index(item)] + transfo_action = self.transformation_action[t_id] + execute_method = self._add_execute_transformation_method( + transfo_action, task, stack, transfo.name + ) + hproblem.add_method(execute_method) + + def _add_execute_transformation_method( + self, + transfo_action: "InstantaneousAction", + task: "UpfHTask", + stack: "Stack", + transformation_name: str, + ) -> List["Method"]: + need_loc = "loc" in [param.name for param in transfo_action.parameters] + + # Execute + method_kwargs = {"quantity": IntType()} + if need_loc: + method_kwargs.update(loc=self.zone_type) + method = Method(f"execute-{transformation_name}", **method_kwargs) + + for precondition in transfo_action.preconditions: + method.add_precondition(precondition) + + get_nearly_enough = method.add_subtask( + self.get_enough_of_item_task[stack.item], + method.quantity - stack.quantity, + ) - upf_problem.add_objects(self.zones_obj.values()) - upf_problem.add_objects(self.items_obj.values()) - upf_problem.add_objects(self.zone_items_obj.values()) + transfo_args = [method.loc] if need_loc else [] + execute = method.add_subtask(transfo_action, *transfo_args) + method.set_ordered([get_nearly_enough, execute]) + method.set_task(task, method.quantity) + + return method + + def _add_base_flat_problem( + self, upf_problem: Union["Problem", "HierarchicalProblem"], world: "World" + ): + self._add_objects_to_problem( + upf_problem, world.zones, world.items, world.zones_items + ) + self._add_fluents_to_problem(upf_problem) + self._add_actions_to_problem(upf_problem, world.transformations) + + def _add_actions_to_problem( + self, + upf_problem: Union["Problem", "HierarchicalProblem"], + transformations: List["Transformation"], + ): + self.transformation_action: Dict[int, "InstantaneousAction"] = {} + for t_id, transfo in enumerate(transformations): + self.transformation_action[t_id] = self._action_from_transformation( + transfo, t_id + ) + + upf_problem.add_actions(self.transformation_action.values()) + def _add_fluents_to_problem( + self, upf_problem: Union["Problem", "HierarchicalProblem"] + ): self.pos = Fluent("pos", BoolType(), zone=self.zone_type) self.visited = Fluent("visited", BoolType(), zone=self.zone_type) self.amount = Fluent("amount", IntType(), item=self.player_item_type) @@ -267,25 +381,48 @@ def _init_problem( upf_problem.add_fluent(self.amount, default_initial_value=0) upf_problem.add_fluent(self.amount_at, default_initial_value=0) - actions = [] - for t_id, transfo in enumerate(state.world.transformations): - actions.append(self._action_from_transformation(transfo, t_id)) + def _add_objects_to_problem( + self, + upf_problem: Union["Problem", "HierarchicalProblem"], + zones: List[Zone], + items: List[Item], + zones_items: List[Item], + ): + self.zone_type = UserType("zone") + self.player_item_type = UserType("player_item") + self.zone_item_type = UserType("zone_item") + + self.zones_obj: Dict[Zone, "Object"] = {} + for zone in zones: + self.zones_obj[zone] = Object(zone.name, self.zone_type) - upf_problem.add_actions(actions) + self.items_obj: Dict[Item, "Object"] = {} + for item in items: + self.items_obj[item] = Object(item.name, self.player_item_type) - if purpose is not None and purpose.terminal_groups: - upf_problem.add_goal(self._purpose_to_goal(purpose)) + self.zone_items_obj: Dict[Item, "Object"] = {} + for item in zones_items: + self.zone_items_obj[item] = Object( + f"{item.name}_in_zone", self.zone_item_type + ) - self.update_problem_to_state(upf_problem, state) - return upf_problem + upf_problem.add_objects(self.zones_obj.values()) + upf_problem.add_objects(self.items_obj.values()) + upf_problem.add_objects(self.zone_items_obj.values()) def _action_from_transformation( self, transformation: "Transformation", transformation_id: int ) -> "InstantaneousAction": - action_name = f"{transformation_id}_{transformation.name}" + action_name = self._name_from_transformation(transformation, transformation_id) action = InstantaneousAction(action_name) loc = None - if len(self.zones_obj) > 0: + + if ( + len(transformation.produced_zones_items) > 0 + or len(transformation.consumed_zones_items) > 0 + or transformation.zone is not None + or transformation.destination is not None + ): action = InstantaneousAction(action_name, loc=self.zone_type) loc = action.parameter("loc") action.add_precondition(self.pos(loc)) @@ -306,6 +443,12 @@ def _action_from_transformation( self._add_current_zone_operations(action, transformation, loc) return action + @staticmethod + def _name_from_transformation( + transformation: "Transformation", transformation_id: int + ): + return f"{transformation_id}_{transformation.name}" + def _add_player_operation( self, action: "InstantaneousAction", diff --git a/tests/planning/test_dummy_env.py b/tests/planning/test_dummy_env.py index 06e661ad..172ed44c 100644 --- a/tests/planning/test_dummy_env.py +++ b/tests/planning/test_dummy_env.py @@ -1,12 +1,16 @@ -from hcraft.task import PlaceItemTask +from typing import TYPE_CHECKING +from hcraft.task import GetItemTask, PlaceItemTask from hcraft.purpose import Purpose from hcraft.elements import Item, Zone, Stack from tests.envs import classic_env import pytest +if TYPE_CHECKING: + import unified_planning.model.htn as htn -def test_hcraft_classic(): + +def test_classic_flat(): pytest.importorskip("unified_planning") env, _, named_transformations, start_zone, items, zones, zones_items = classic_env() @@ -19,10 +23,68 @@ def test_hcraft_classic(): problem.solve() expected_plan = [ - "1_search_wood(start)", "0_move_to_other_zone(start)", - "3_craft_plank(other_zone)", + "1_search_wood", + "3_craft_plank", "4_craft_table(other_zone)", ] assert str(expected_plan).replace("'", "") in repr(problem.plan) + + +class TestClassicHierarchical: + @pytest.fixture(autouse=True) + def setup(self): + pytest.importorskip("unified_planning") + ( + self.env, + _, + named_transformations, + start_zone, + items, + zones, + zones_items, + ) = classic_env() + + def test_get_item_no_precondition_multiple(self): + task = GetItemTask(Stack(Item("wood"), 3)) + self.env.purpose = Purpose(task) + + problem = self.env.planning_problem(hierarchical=True) + + htn: "htn" = pytest.importorskip("unified_planning.model.htn") + assert isinstance(problem.upf_problem, htn.HierarchicalProblem) + + print(problem.upf_problem) + + problem.solve() + + expected_plan = [ + "1_search_wood", + "1_search_wood", + "1_search_wood", + ] + + assert str(expected_plan).replace("'", "") in repr(problem.plan.actions) + + # def test_place_item_in_zone(self): + # task = PlaceItemTask(Stack(Item("table"), 1), Zone("other_zone")) + # self.env.purpose = Purpose(task) + + # problem = self.env.planning_problem(hierarchical=True) + + # htn: "htn" = pytest.importorskip("unified_planning.model.htn") + # assert isinstance(problem.upf_problem, htn.HierarchicalProblem) + + # print(problem.upf_problem) + + # problem.solve() + + # expected_plan = [ + # "1_search_wood(start)", + # "0_move_to_other_zone(start)", + # "3_craft_plank(other_zone)", + # "4_craft_table(other_zone)", + # ] + + # assert str(expected_plan).replace("'", "") in repr(problem.plan.actions)