diff --git a/Course3/Notes/reinforcement_learning.md b/Course3/Notes/reinforcement_learning.md
index c7d0933..a60a1ce 100644
--- a/Course3/Notes/reinforcement_learning.md
+++ b/Course3/Notes/reinforcement_learning.md
@@ -127,6 +127,6 @@
 you guess it, and then you update it based on the Bellman equation.
 So assuming first y is y1 corresponding to result from s1 and a1:
 $y_1 = R(s_1) + \gamma \max_{a'} Q(s_1', a')$
 $y_2 = R(s_2) + \gamma \max_{a'} Q(s_2', a')$
 
 To train neural network, we take training sample of x data, where y are just
@@ -147,5 +148,13 @@ This iteratively improves the Q function, making the NN a good estimate of Q(s,
 
 One could imagine creating agents that start at random, and start improving, but
 we pick only the ones that improve the most and add a few random evolutions to
 the mix. This is called a genetic algorithm.
-### Algorithm refinements
+### Algorithmic Instability
+Training a neural network to approximate Q(s, a) in this way is prone to
+oscillations and instability.
+
+However, there are a few approaches that make training more stable. The lab
+starts with two: **Target Network** and **Experience Replay**.
+
+A **Target Network** means using two networks instead of one: the first is the
+network being trained, while the second is used only to compute the target
+values, and the loss compares the first network's predictions against those
+targets. Every N steps the target network is updated to match the training
+network, and the update is damped, which is also called a **soft update**.
+
+**Experience Replay** stores training samples in a buffer and draws random
+batches from that buffer for training, which helps avoid overfitting to the
+most recent samples.
diff --git a/Course3/lunar_lander_lab/lander.ipynb b/Course3/lunar_lander_lab/lander.ipynb
new file mode 100644
index 0000000..ec4d1bc
--- /dev/null
+++ b/Course3/lunar_lander_lab/lander.ipynb
@@ -0,0 +1,280 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "\"\"\"The lunar lander lab uses the Actor Environment formalism, where\n",
+    "the actor takes an action and the environment evaluates the action\n",
+    "to get a new observation state and reward. These files are my attempt\n",
+    "at constructing a readable and extensible codebase around the lander\n",
+    "and formalism. I have not actually implemented the lab, but the point\n",
+    "of this exercise was to think of and explore ways to productionalize\n",
+    "code for data science.\n",
+    "\n",
+    "The code is a bit over-abstracted on purpose, to provide flexibility\n",
+    "and to test the abstractions which I may want to use.\n",
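+
+As a sketch of both ideas, with plain Python lists standing in for network
+weights (the names, `TAU`, and the buffer capacity below are illustrative,
+not the lab's code):
+
+```python
+import random
+from collections import deque
+
+TAU = 0.001  # soft-update rate (illustrative value)
+
+def soft_update(target_weights, online_weights):
+    """Damped (soft) update: move the target network a small step
+    toward the online network instead of copying it outright."""
+    return [(1 - TAU) * t + TAU * w
+            for t, w in zip(target_weights, online_weights)]
+
+class ReplayBuffer:
+    """Fixed-size buffer of (state, action, reward, next_state, done) tuples."""
+    def __init__(self, capacity: int = 100_000) -> None:
+        self.memory = deque(maxlen=capacity)
+
+    def store(self, transition: tuple) -> None:
+        self.memory.append(transition)
+
+    def sample(self, batch_size: int) -> list[tuple]:
+        # Uniform random sampling breaks up the correlation between
+        # consecutive transitions.
+        return random.sample(list(self.memory), batch_size)
+```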
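+
+Finally, a minimal sketch of how the Bellman targets
+$y_i = R(s_i) + \gamma \max_{a'} Q(s_i', a')$ from the start of this section
+could be computed for a batch of transitions; the `q_values` stand-in,
+`GAMMA`, and the shapes are illustrative, not the lab's actual code:
+
+```python
+import numpy as np
+
+GAMMA = 0.99  # discount factor (illustrative value)
+
+def q_values(states):
+    """Stand-in for the Q-network: maps a batch of states to
+    per-action value estimates (here just a fixed linear map)."""
+    rng = np.random.default_rng(0)
+    weights = rng.normal(size=(states.shape[1], 4))  # 4 actions
+    return states @ weights
+
+def bellman_targets(rewards, next_states):
+    """y_i = R(s_i) + gamma * max_a' Q(s_i', a')."""
+    return rewards + GAMMA * q_values(next_states).max(axis=1)
+
+# Two transitions, mirroring y_1 and y_2 above:
+rewards = np.array([1.0, -0.5])
+next_states = np.ones((2, 8))  # 8-dimensional observations, as in the lander
+print(bellman_targets(rewards, next_states))  # [y_1, y_2]
+```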
+    "\"\"\"\n",
+    "from enum import Enum\n",
+    "from dataclasses import dataclass, field\n",
+    "from typing import Callable, Protocol, TypeAlias\n",
+    "\n",
+    "class Action(Enum):\n",
+    "    do_nothing = 0\n",
+    "    fire_main_engine = 1\n",
+    "    fire_left_engine = 2\n",
+    "    fire_right_engine = 3\n",
+    "\n",
+    "\n",
+    "@dataclass\n",
+    "class State:\n",
+    "    \"\"\"Observation state of the lunar lander\"\"\"\n",
+    "    x: float = 0\n",
+    "    y: float = 0\n",
+    "    x_velocity: float = 0\n",
+    "    y_velocity: float = 0\n",
+    "    angle: float = 0\n",
+    "    angular_velocity: float = 0\n",
+    "    left_leg_contact: bool = False\n",
+    "    right_leg_contact: bool = False\n",
+    "\n",
+    "    def step(self) -> None:\n",
+    "        \"\"\"Step the state forward in time\"\"\"\n",
+    "        self.y += self.y_velocity\n",
+    "        self.x += self.x_velocity\n",
+    "        self.angle += self.angular_velocity\n",
+    "\n",
+    "# This is the ideal state we want to reach.\n",
+    "# However, we could still be successful if we land at an angle\n",
+    "# or with some residual velocity...\n",
+    "desired_state = State(\n",
+    "    x=0,\n",
+    "    y=0,\n",
+    "    x_velocity=0,\n",
+    "    y_velocity=0,\n",
+    "    angle=0,\n",
+    "    angular_velocity=0,\n",
+    "    left_leg_contact=True,\n",
+    "    right_leg_contact=True\n",
+    ")\n",
+    "\n",
+    "SurfaceFunction: TypeAlias = Callable[[float], float]\n",
+    "\n",
+    "def flat_surface(x: float) -> float:\n",
+    "    \"\"\"A flat surface function\"\"\"\n",
+    "    return 0.2\n",
+    "\n",
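+    "# As an illustration of the SurfaceFunction abstraction, any other\n",
+    "# callable works too. This sketch is not part of the lab; the shape\n",
+    "# and constants are made up.\n",
+    "from math import sin\n",
+    "\n",
+    "def hilly_surface(x: float) -> float:\n",
+    "    \"\"\"A hypothetical uneven surface whose height varies with x.\"\"\"\n",
+    "    return 0.2 + 0.05 * sin(10 * x)\n",
+    "\n",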
+    "class BoundaryStates(Enum):\n",
+    "    \"\"\"States that are considered boundary conditions or the default\"\"\"\n",
+    "    flying = 0  # default state\n",
+    "    landed = 1\n",
+    "    crashed = 2\n",
+    "    left_screen = 3\n",
+    "\n",
+    "class BoundsCheck(Protocol):\n",
+    "    \"\"\"Protocol for determining failure or success states,\n",
+    "    which can be thought of as boundary conditions on the state space.\"\"\"\n",
+    "    def __call__(self, state: State) -> BoundaryStates:\n",
+    "        ...\n",
+    "\n",
+    "@dataclass\n",
+    "class MoonBounds(BoundsCheck):\n",
+    "    \"\"\"Bounds of the moon\"\"\"\n",
+    "    surface_func: SurfaceFunction = flat_surface\n",
+    "    # A bare State default would be a mutable dataclass default,\n",
+    "    # which raises ValueError, hence the factory.\n",
+    "    desired_state: State = field(default_factory=lambda: desired_state)\n",
+    "\n",
+    "    def __call__(self, state: State) -> BoundaryStates:\n",
+    "        \"\"\"Get the boundary condition for the current state\"\"\"\n",
+    "        if self.crashed(state):\n",
+    "            return BoundaryStates.crashed\n",
+    "        if self.left_screen(state):\n",
+    "            return BoundaryStates.left_screen\n",
+    "        if self.landed(state):\n",
+    "            return BoundaryStates.landed\n",
+    "        return BoundaryStates.flying\n",
+    "\n",
+    "    def landed(self, state: State) -> bool:\n",
+    "        \"\"\"Whether we (safely) landed. Unsafe landing is when we land\n",
+    "        at too much of an angle and/or with too much velocity. Note:\n",
+    "        currently this is treated just like not landing at all.\"\"\"\n",
+    "        desired = self.desired_state\n",
+    "        return state.x == desired.x and \\\n",
+    "            state.y == desired.y and \\\n",
+    "            state.left_leg_contact == desired.left_leg_contact and \\\n",
+    "            state.right_leg_contact == desired.right_leg_contact and \\\n",
+    "            abs(state.x_velocity) <= desired.x_velocity and \\\n",
+    "            abs(state.y_velocity) <= desired.y_velocity and \\\n",
+    "            abs(state.angle) <= desired.angle and \\\n",
+    "            abs(state.angular_velocity) <= desired.angular_velocity\n",
+    "\n",
+    "    def crashed(self, state: State) -> bool:\n",
+    "        \"\"\"Whether we crashed. We crash if we hit the moon surface.\n",
+    "        The surface is defined by a function that takes the x coordinate\n",
+    "        and returns the y coordinate of the surface.\"\"\"\n",
+    "        return state.y <= self.surface_func(state.x)\n",
+    "\n",
+    "    def left_screen(self, state: State) -> bool:\n",
+    "        \"\"\"Whether we left the screen. We are out of bounds if we\n",
+    "        are outside of the x bounds of the screen.\"\"\"\n",
+    "        return not (0 <= state.x <= 1)\n",
+    "\n",
+    "\n",
+    "class RewardAssignment(Protocol):\n",
+    "    \"\"\"Protocol for assigning rewards to states, allowing for different\n",
+    "    reward functions both for different states and for boundary conditions\n",
+    "    (landed, crashed, left screen, etc.)\"\"\"\n",
+    "    def __call__(self, state: State, boundary_state: BoundaryStates) -> float:\n",
+    "        ...\n",
+    "\n",
+    "@dataclass\n",
+    "class Reward:\n",
+    "    \"\"\"Since the reward function is coupled to the boundary conditions\n",
+    "    and state, define a class that takes in state and bounds, and provides\n",
+    "    a callable as the overall reward function, implementing specifics as\n",
+    "    needed.\"\"\"\n",
+    "    observation_state_reward: RewardAssignment\n",
+    "    collision_penalty: float = -100\n",
+    "    screen_penalty: float = -100\n",
+    "    done_reward: float = 100\n",
+    "\n",
+    "    def __call__(self, state: State, boundary_state: BoundaryStates = BoundaryStates.flying) -> float:\n",
+    "        \"\"\"Get the reward for the current state\"\"\"\n",
+    "\n",
+    "        # Assuming failure boundary condition rewards invalidate other\n",
+    "        # state dependent rewards.\n",
+    "        if boundary_state is BoundaryStates.crashed:\n",
+    "            return self.collision_penalty\n",
+    "        if boundary_state is BoundaryStates.left_screen:\n",
+    "            return self.screen_penalty\n",
+    "        reward: float = 0\n",
+    "        if boundary_state is BoundaryStates.landed:\n",
+    "            reward += self.done_reward\n",
+    "\n",
+    "        # @TODO: implement flying observation state dependent rewards\n",
+    "        reward += self.observation_state_reward(state, boundary_state)\n",
+    "        return reward\n",
+    "\n",
+    "@dataclass\n",
+    "class EngineActions:\n",
+    "    \"\"\"State of the actions being taken for the engines (here we\n",
+    "    make no assumption of one action at a time).\"\"\"\n",
+    "    main: bool = False\n",
+    "    left: bool = False\n",
+    "    right: bool = False\n",
+    "\n",
+    "    def get_actions(self) -> set[Action]:\n",
+    "        \"\"\"Get the actions that are currently being taken; we use\n",
+    "        a set since order must not matter.\"\"\"\n",
+    "        actions: set[Action] = set()\n",
+    "        if self.main:\n",
+    "            actions.add(Action.fire_main_engine)\n",
+    "        if self.left:\n",
+    "            actions.add(Action.fire_left_engine)\n",
+    "        if self.right:\n",
+    "            actions.add(Action.fire_right_engine)\n",
+    "        if not actions:  # if we are not doing anything...\n",
+    "            actions.add(Action.do_nothing)\n",
+    "        return actions\n",
+    "\n",
+    "\n",
+    "class Policy(Protocol):\n",
+    "    \"\"\"Protocol for defining policies\"\"\"\n",
+    "    def __call__(self, state: State) -> Action:\n",
+    "        ...\n",
+    "\n",
+    "class StateAction(Protocol):\n",
+    "    \"\"\"Protocol for defining state modification based on action\"\"\"\n",
+    "    def __call__(self, state: State, action: Action) -> State:\n",
+    "        ...\n",
+    "\n",
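+    "# A hypothetical implementation of the Policy protocol (a sketch of\n",
+    "# what the Agent's \"SimplePolicy\" TODO below could look like); the\n",
+    "# thresholds are made up.\n",
+    "class SimplePolicy:\n",
+    "    \"\"\"Level out first, then brake the descent; otherwise drift.\"\"\"\n",
+    "    def __call__(self, state: State) -> Action:\n",
+    "        if state.angle > 0.1:\n",
+    "            return Action.fire_left_engine\n",
+    "        if state.angle < -0.1:\n",
+    "            return Action.fire_right_engine\n",
+    "        if state.y_velocity < -0.05:\n",
+    "            return Action.fire_main_engine\n",
+    "        return Action.do_nothing\n",
+    "\n",
+    "\n",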
+    "def modify_state_with_action(state: State, action: Action) -> State:\n",
+    "    \"\"\"Modify the state with the given action (in-place modification)\"\"\"\n",
+    "    match action:\n",
+    "        case Action.do_nothing:\n",
+    "            pass\n",
+    "        case Action.fire_main_engine:\n",
+    "            state.y_velocity += 0.1\n",
+    "        case Action.fire_left_engine:\n",
+    "            state.x_velocity -= 0.05\n",
+    "            state.angular_velocity -= 0.05\n",
+    "        case Action.fire_right_engine:\n",
+    "            state.x_velocity += 0.05\n",
+    "            state.angular_velocity += 0.05\n",
+    "    gravity = -0.00  # Assuming negligible gravity\n",
+    "    state.y_velocity += gravity\n",
+    "    state.step()\n",
+    "    return state\n",
+    "\n",
+    "\n",
+    "@dataclass\n",
+    "class Agent:\n",
+    "    \"\"\"We've leaked the abstraction a bit, as we communicate over state\n",
+    "    instead of actions.\"\"\"\n",
+    "    previous_action: Action = Action.do_nothing\n",
+    "    # State() as a bare default would be a mutable dataclass default,\n",
+    "    # hence the factory.\n",
+    "    current_state: State = field(default_factory=State)\n",
+    "    policy: str = \"SimplePolicy\"  # TODO: implement policy\n",
+    "    state_action: StateAction = modify_state_with_action\n",
+    "\n",
+    "    def take_action(self, action: Action) -> State:\n",
+    "        \"\"\"Take an action in the environment\"\"\"\n",
+    "        # Technically we modify in place, but we are being explicit,\n",
+    "        # which is especially good if we change to copy-on-write.\n",
+    "        self.current_state = self.state_action(self.current_state, action)\n",
+    "        self.previous_action = action\n",
+    "        return self.current_state\n",
+    "\n",
+    "\n",
+    "@dataclass\n",
+    "class Environment:\n",
+    "    agent: Agent\n",
+    "    bounds: BoundsCheck\n",
+    "    reward: RewardAssignment\n",
+    "    surface: SurfaceFunction = flat_surface\n",
+    "    current_boundary_state: BoundaryStates = BoundaryStates.flying\n",
+    "    done_boundary_state: BoundaryStates = BoundaryStates.landed\n",
+    "\n",
+    "    def step(self, action: Action) -> tuple[State, float, bool]:\n",
+    "        \"\"\"Take a step in the environment\"\"\"\n",
+    "        # This is a leaky abstraction; we are communicating over state.\n",
+    "        # We should instead communicate over actions and determine the\n",
+    "        # state based on the action reported by the agent and its previous\n",
+    "        # state!\n",
+    "        unresolved_state = self.agent.take_action(action)\n",
+    "\n",
+    "        # Evaluate leg contact\n",
+    "        if (unresolved_state.y - self.surface(unresolved_state.x)) <= 0.01:\n",
+    "            unresolved_state.left_leg_contact = True\n",
+    "            unresolved_state.right_leg_contact = True\n",
+    "\n",
+    "        # Update state and boundary state.\n",
+    "        current_state = unresolved_state\n",
+    "        self.current_boundary_state = self.bounds(current_state)\n",
+    "\n",
+    "        reward = self.reward(current_state, self.current_boundary_state)\n",
+    "        done = self.current_boundary_state is self.done_boundary_state\n",
+    "        return current_state, reward, done\n",
+    "\n",
+    "\n",
+    "agent = \"lander\"\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "language_info": {
+   "name": "python"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}