diff --git a/Course3/Notes/reinforcement_learning.md b/Course3/Notes/reinforcement_learning.md
new file mode 100644
index 0000000..a60a1ce
--- /dev/null
+++ b/Course3/Notes/reinforcement_learning.md
@@ -0,0 +1,160 @@
+# Reinforcement learning
+
+## Formalism
+
+- **Agent** is the learner and decision maker.
+- **States** $S_t$ are the observations of the situation the agent can be in at time $t$ (the possible states of existence).
+- **Actions** $A_t$ are the set of choices available to the agent at time $t$ (the possible moves).
+- **Rewards** $R_t$ are the feedback given to the agent at time $t$. A reward can be attached to reaching a certain state, or to an outcome that arises from a state (e.g. winning in chess, where the states are the board positions and pieces).
+- **Discount factor** $\gamma$ is a constant in $[0, 1]$ that determines the present value of future rewards.
+- **Return** $G_t$ is the total discounted reward from time step $t$: $G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + ... = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$. The return is discounted because the agent prefers rewards now rather than later.
+- **Policy** $\pi$ is a distribution over actions given states: a way of picking actions that the agent will follow given the current state. A policy is judged by the return it produces.
+
+The job of a reinforcement learning algorithm is to find the best policy, i.e. the one that maximizes the expected return. In practice that means finding a policy that defines the best action for each state.
+
+### Markov Decision Process (MDP)
+
+Reinforcement learning is usually formalized as a Markov Decision Process (MDP). The Markov property means that the next state and reward depend only on the current state and action, not on the history of states. The agent is in a state, takes an action, and receives a reward; the process then repeats from the new state.
+
+Mathematically, a Markov Decision Process is a tuple $(S, A, P, R, \gamma)$ where:
+
+- $S$ is a finite set of states.
+- $A$ is a finite set of actions.
+- $P$ is a state transition probability matrix. $P_{ss'}^a = P[S_{t+1} = s' | S_t = s, A_t = a]$ is the probability of transitioning to state $s'$ at time $t+1$ given that the agent was in state $s$ at time $t$ and took action $a$.
+- $R$ is a reward function. $R_s^a = E[R_{t+1} | S_t = s, A_t = a]$ is the expected immediate reward received after taking action $a$ in state $s$ at time $t$.
+- $\gamma$ is a discount factor, $\gamma \in [0, 1]$.
+
+On top of the MDP we also define:
+
+- The policy $\pi(a|s) = P[A_t = a | S_t = s]$, the probability of taking action $a$ at time $t$ given that the agent is in state $s$.
+- The return $G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + ... = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$, the total discounted reward from time step $t$.
+- The state-value function $v(s) = E[G_t | S_t = s]$, the expected return starting from state $s$.
+- The action-value function $q(s, a) = E[G_t | S_t = s, A_t = a]$, the expected return starting from state $s$ and taking action $a$.
+
+### Reward strategies
+
+Rewards can be on an even, linear scale, e.g. $-1, 0, 1$, or non-linear (heavily skewed), e.g. $1, 0, -1000$.
+
+An example of linear rewards is the game of chess, where the reward is 1 for a win, 0 for a draw, and -1 for a loss.
+
+An example of a non-linear reward is helicopter piloting, where the reward might be -1000 for crashing and 1 for flying well.
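+
+As a quick illustration of how the discount factor shapes the return defined above, here is a minimal Python sketch (the reward sequences and the value of $\gamma$ are made-up example numbers, not values from the course):
+
+```python
+# Minimal sketch: the discounted return G_t for a finite reward sequence.
+def discounted_return(rewards: list[float], gamma: float) -> float:
+    """G_t = R_{t+1} + gamma * R_{t+2} + ... = sum_k gamma^k * R_{t+k+1}."""
+    return sum(gamma ** k * r for k, r in enumerate(rewards))
+
+# With gamma < 1, the same reward counts for less the later it arrives.
+print(discounted_return([0, 0, 0, 100], gamma=0.9))  # 72.9
+print(discounted_return([100, 0, 0, 0], gamma=0.9))  # 100.0
+```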
+
+In the case of the Mars rover, the reward could be 40 for getting around an obstacle on the left side, 0 for all other states, and 100 for reaching the goal via the right side. So rewards don't have to be negative, or equal/evenly spaced; it is context dependent.
+
+
+### State action value function
+
+$Q(s, a)$ is the return obtained by starting in state $s$, taking action $a$ (once), and then behaving optimally afterwards.
+
+If you can compute $Q(s, a)$ for all $s$ and $a$, then you can recover the optimal policy by choosing, in each state, the action that maximizes $Q(s, a)$.
+
+#### Bellman equation
+
+Used to compute $Q(s, a)$. Here $R(s)$ is the reward for state $s$, $\gamma$ the discount factor, $\pi$ the policy, $s'$ the next state, and $a'$ a possible next action.
+
+Formula:
+
+$Q(s, a) = R(s) + \gamma \max_{a'} Q(s', a')$
+
+The Bellman equation is recursive: the value of a state-action pair is the immediate reward plus the discounted value of acting optimally from the next state onward. Unrolling it, the return is the first reward (undiscounted), plus $\gamma$ times the next reward, plus $\gamma^2$ times the one after that, and so on.
+
+#### Stochastic
+
+In a stochastic environment, taking an action in a state only leads to a given next state with some probability (it is not deterministic). In the stochastic reinforcement learning case, the goal is to maximize the expected return, i.e. the average over the possible reward sequences.
+
+Bellman equation for the stochastic case:
+
+$Q(s, a) = R(s) + \gamma E[\max_{a'} Q(s', a')]$
+
+Where $E[\max_{a'} Q(s', a')]$ is the expected value of the maximum action-value over the possible next states: what, on average, you expect to receive as reward from that state on.
+
+(This usually lowers the value of each state, because some of the random transitions are missteps.)
+
+## Continuous state spaces
+
+Instead of discrete states, the state can be anywhere in a continuous space: for example, anywhere along a line instead of in a discrete box/set of states. In a self-driving car, the state can include x, y, the angle theta, the velocities, the angular velocity, etc., each a continuous value. The state $s$ then becomes a vector.
+
+The action $a$ can also be continuous. For example, an action can be changing the angle of the steering wheel by some amount theta. An action effectively maps one state vector to another, since moving the steering wheel changes x, y, theta, and their velocities.
+
+### Lunar Lander Example
+
+The states are not just x, y, and the velocities, but also l and r: whether the left and/or right leg is touching the ground.
+
+The reward function covers crashing, soft landing, having a leg grounded, and fuel use: firing the main engine costs more than firing the side engines.
+
+Codifying the reward function is the hard part (along with picking appropriate states). For example, l and r are included because success hinges on more than the exact x and y position, so it is better to codify the outcome we want directly (landed). In chess, the analogous outcomes would be mate, stalemate, mate in X, etc.
+
+### Deep Reinforcement Learning
+
+In a state $s$, use a neural network to compute $Q(s, a)$ for all $a$, then choose the action that maximizes $Q(s, a)$. This is the Deep Q-Network (DQN) approach developed by DeepMind in 2015.
+
+The feature vector $X$ is the one-hot encoded action plus all the state values. In the lunar lander, that is 4 actions (fire left, fire right, fire main, do nothing) plus the states (x, y, theta, x velocity, y velocity, theta velocity, left leg grounded, right leg grounded).
+
+The input layer should be sized to match the feature vector (here 8 state values + 4 one-hot action values = 12 inputs).
+
+A single output unit gives $Q(s, a)$ for that state-action pair, because we are predicting the Q value of each action rather than directly predicting which action to take.
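+
+A minimal sketch of such a network (Keras is used here purely for illustration, and the hidden layer sizes and the `best_action` helper are assumptions for the example, not the lab's actual code):
+
+```python
+import numpy as np
+from tensorflow import keras
+from tensorflow.keras import layers
+
+NUM_STATES = 8   # x, y, theta, the three velocities, and the two leg flags
+NUM_ACTIONS = 4  # do nothing, fire left, fire main, fire right
+
+# Input = state vector concatenated with a one-hot action; output = one Q(s, a) value.
+q_network = keras.Sequential([
+    keras.Input(shape=(NUM_STATES + NUM_ACTIONS,)),
+    layers.Dense(64, activation="relu"),   # hidden sizes are made up for the sketch
+    layers.Dense(64, activation="relu"),
+    layers.Dense(1, activation="linear"),
+])
+
+def best_action(state: np.ndarray) -> int:
+    """Score every action for a state of shape (NUM_STATES,) and return the best index."""
+    one_hot_actions = np.eye(NUM_ACTIONS)
+    x = np.hstack([np.tile(state, (NUM_ACTIONS, 1)), one_hot_actions])
+    q_values = q_network.predict(x, verbose=0).ravel()
+    return int(np.argmax(q_values))
+```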
+
+So, as in the sketch above, we evaluate the network 4 times, once for each action, and then pick the action with the highest Q value.
+
+We can make this more computationally efficient by using a single output layer with 4 units, one Q value per action, so that a single forward pass scores every action at once. A further refinement of this idea is the dueling network architecture, in which the output is split into two parts: one estimating the state-value function (the expected return starting from state s) and one estimating the advantage function (how much better taking action a in state s is than the average action). The Q value is then the state value plus the advantage minus the average of the advantages.
+
+But the plain DQN approach is appealing because it is simpler and more stable, making it easier to train, much as LLMs stick to the simple objective of predicting the next word rather than predicting all of the words at once.
+
+Bellman equation applied to neural networks:
+
+$Q(s, a) = R(s) + \gamma \max_{a'} Q(s', a')$
+
+$f_{W,b}(X) = y$
+
+Experience tuples in the Python code: $(s, a, R(s), s')$.
+
+Given $(s, a)$ as the input $x$, the target $y$ is computed from $R(s)$ and $s'$. If you don't know the Q function, you start from a guess and then update it based on the Bellman equation.
+
+So, assuming the first target $y_1$ corresponds to the tuple with $s_1$ and $a_1$:
+
+$y_1 = R(s_1) + \gamma \max_{a'} Q(s_1', a')$
+
+$y_2 = R(s_2) + \gamma \max_{a'} Q(s_2', a')$
+
+To train the neural network, we take a training set of $x$ values whose targets $y$ are just numbers computed as above, and then train the network to predict $y$ using a mean squared error (MSE) loss and Adam/gradient descent.
+
+Learning algorithm:
+
+- Initialize the neural network with random parameters as the initial guess of Q(s, a).
+- Repeat:
+  - Take actions in the environment and record the experience tuples (s, a, R(s), s').
+  - To bound memory and compute, store only the most recent N tuples (say 10,000). This is called the replay buffer (or replay memory, experience replay).
+  - From this buffer, create a training set of x and y values. Train a network Q_new to predict the y values from the x values by minimizing MSE with an optimizer, then set Q = Q_new and repeat.
+  - Repeat until convergence.
+
+This iteratively improves the Q function, making the neural network a good estimate of Q(s, a). Then we can simply use Q(s, a) to pick the best action.
+
+One could also imagine creating agents that start out random and gradually improve, keeping only the ones that improve the most and adding a few random mutations to the mix. That is the idea behind genetic algorithms.
+
+### Algorithmic Instability
+
+Training a neural network to approximate Q(s, a) in this way is prone to oscillations and instability.
+
+However, there are a few approaches that make it more stable. The lab starts with two: **Target Network** and **Experience Replay**.
+
+Target Network is a technique in which we use two networks instead of one. The first network is the one we are training; the second is used to compute the target values. The targets are computed with the second network, and the loss is computed on the first. The second network is updated to match the first every N steps, and the update is damped (blended gradually toward the first network), which is called a **soft update**.
+
+Experience Replay is a technique in which we store the experience tuples in a buffer and sample from that buffer to train the network. This helps avoid overfitting to the most recent samples.
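+
+A minimal sketch tying the learning algorithm, replay buffer, target network, and soft update together. Keras is used purely for illustration; the layer sizes, the hyperparameter values, the `store`/`train_step` helpers, and the terminal-state flag `done` (not part of the tuple above) are assumptions for the example, not the lab's actual code:
+
+```python
+import random
+from collections import deque
+
+import numpy as np
+from tensorflow import keras
+
+GAMMA = 0.995        # discount factor (illustrative value)
+BUFFER_SIZE = 10_000 # keep only the most recent N tuples
+BATCH_SIZE = 64
+TAU = 0.001          # soft-update rate (illustrative value)
+
+# Q network with the more efficient head: 8 state inputs, one Q value per action.
+q_network = keras.Sequential([
+    keras.Input(shape=(8,)),
+    keras.layers.Dense(64, activation="relu"),   # hidden sizes are assumptions
+    keras.layers.Dense(64, activation="relu"),
+    keras.layers.Dense(4, activation="linear"),
+])
+q_network.compile(optimizer="adam", loss="mse")
+
+# Target network starts as a copy of the network being trained.
+target_network = keras.models.clone_model(q_network)
+target_network.set_weights(q_network.get_weights())
+
+# Experience replay buffer.
+replay_buffer: deque = deque(maxlen=BUFFER_SIZE)
+
+def store(s, a, r, s_next, done) -> None:
+    """Record one experience tuple (s, a, R(s), s', done)."""
+    replay_buffer.append((s, a, r, s_next, done))
+
+def train_step() -> None:
+    """One update: Bellman targets from the target network, MSE fit, soft update."""
+    if len(replay_buffer) < BATCH_SIZE:
+        return
+    batch = random.sample(list(replay_buffer), BATCH_SIZE)
+    s, a, r, s_next, done = (np.array(col) for col in zip(*batch))
+
+    # y = R(s) + gamma * max_a' Q_target(s', a'); no bootstrapping past terminal states.
+    q_next = target_network.predict(s_next, verbose=0).max(axis=1)
+    y = r + GAMMA * q_next * (1.0 - done)
+
+    # Only the output for the action actually taken is moved toward y.
+    targets = q_network.predict(s, verbose=0)
+    targets[np.arange(BATCH_SIZE), a] = y
+    q_network.fit(s, targets, epochs=1, verbose=0)
+
+    # Soft update: blend the target network slowly toward the trained network.
+    new_weights = [TAU * w + (1.0 - TAU) * tw
+                   for w, tw in zip(q_network.get_weights(), target_network.get_weights())]
+    target_network.set_weights(new_weights)
+```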
diff --git a/Course3/lunar_lander_lab/lander.ipynb b/Course3/lunar_lander_lab/lander.ipynb new file mode 100644 index 0000000..ec4d1bc --- /dev/null +++ b/Course3/lunar_lander_lab/lander.ipynb @@ -0,0 +1,280 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"The lunar lander lab uses the Actor Environment formalism, where\n", + "the actor takes an action and the environment evaluates the action\n", + "to get a new observation state and reward. These files are my attempt\n", + "at constructing a readable and extensible codebase around the lander\n", + "and formalism. I have not actually implemented the lab, but the point\n", + "of this exercsie was to think of and explore ways to productionalize\n", + "code for data science.\n", + "\n", + "The code is a bit over-abstracted on purpose. To provide flexibility and\n", + "to test the abstractions which I may want to use. \n", + "\"\"\"\n", + "from enum import Enum\n", + "from dataclasses import dataclass\n", + "from typing import Collection, Callable, Protocol, TypeAlias\n", + "\n", + "class Action(Enum):\n", + " do_nothing = 0\n", + " fire_main_engine = 1\n", + " fire_left_engine = 2\n", + " fire_right_engine = 3\n", + "\n", + "\n", + "\n", + "@dataclass\n", + "class State:\n", + " \"\"\"Observation state of the lunar lander\"\"\"\n", + " x: float = 0\n", + " y: float = 0\n", + " x_velocity: float = 0\n", + " y_velocity: float = 0\n", + " angle: float = 0\n", + " angular_velocity: float = 0\n", + " left_leg_contact: bool = False\n", + " right_leg_contact: bool = False\n", + "\n", + " def step(self) -> None:\n", + " \"\"\"Step the state forward in time\"\"\"\n", + " self.y += self.y_velocity\n", + " self.x += self.x_velocity\n", + " self.angle += self.angular_velocity\n", + "\n", + "# This is the ideal state we want to reach\n", + "# However we could still be successful if we land but at an angle\n", + "# or have some residual velocity...\n", + "desired_state = State(\n", + " x=0,\n", + " y=0,\n", + " x_velocity=0,\n", + " y_velocity=0,\n", + " angle=0,\n", + " angular_velocity=0,\n", + " left_leg_contact=True,\n", + " right_leg_contact=True\n", + ")\n", + "\n", + "SurfaceFunction: TypeAlias = Callable[[float], float]\n", + "\n", + "def flat_surface(x: float) -> float:\n", + " \"\"\"A flat surface function\"\"\"\n", + " return 0.2\n", + "\n", + "class BoundaryStates(Enum):\n", + " \"\"\"States that are considered boundary conditions or the default\"\"\"\n", + " flying = 0 # default state\n", + " landed = 0\n", + " crashed = 1\n", + " left_screen = 2\n", + "\n", + "class BoundsCheck(Protocol):\n", + " \"\"\"Protocol for determining failure or success states,\n", + " which can be thought of as boundary conditions on the state space.\"\"\"\n", + " def __call__(self, state: State) -> BoundaryStates:\n", + " ...\n", + "\n", + "@dataclass\n", + "class MoonBounds(BoundsCheck):\n", + " \"\"\"Bounds of the moon\"\"\"\n", + " surface_func: SurfaceFunction = flat_surface\n", + " desired_state: State = desired_state\n", + "\n", + " def __call__(self, state: State) -> BoundaryStates:\n", + " \"\"\"Get the boundary condition for the current state\"\"\"\n", + " if self.crashed(state):\n", + " return BoundaryStates.crashed\n", + " if self.left_screen(state):\n", + " return BoundaryStates.left_screen\n", + " if self.landed(state):\n", + " return BoundaryStates.landed\n", + " return BoundaryStates.flying\n", + " \n", + " def landed(self, state: State) -> bool:\n", + " \"\"\"Whether 
we (safely) landed. Unsafe landing is when we land\n", + " at too much of an angle and/or with too much velocity. Note:\n", + " Currently this is treated just like not landing at all.\"\"\"\n", + " desired_state = self.desired_state\n", + " current_state = state\n", + " return current_state.x == desired_state.x and \\\n", + " current_state.y == desired_state.y and \\\n", + " current_state.left_leg_contact == desired_state.left_leg_contact and \\\n", + " current_state.right_leg_contact == desired_state.right_leg_contact and \\\n", + " current_state.x_velocity <= desired_state.x_velocity and \\\n", + " current_state.y_velocity <= desired_state.y_velocity and \\\n", + " current_state.angle <= abs(desired_state.angle) and \\\n", + " current_state.angular_velocity <= desired_state.angular_velocity\n", + "\n", + " \n", + " def crashed(self, state: State) -> bool:\n", + " \"\"\"Whether we crashed. We crash if we hit the moon surface.\n", + " The surface is defined by a function that takes the x coordinate\n", + " and returns the y coordinate of the surface.\"\"\"\n", + " return state.y <= self.surface_func(state.x)\n", + " \n", + " def left_screen(self, state: State) -> bool:\n", + " \"\"\"Whether we are still in bounds. We are out of bounds if we\n", + " are outside of the x bounds of the screen.\"\"\"\n", + " return 0 <= state.x <= 1\n", + " \n", + "\n", + "\n", + "class RewardAssignment(Protocol):\n", + " \"\"\"Protocol for assigning rewards to states. Allowing for different\n", + " reward functions both for different states and for boundary conditions\n", + " (landed, crashed, left screen, etc.)\"\"\"\n", + " def __call__(self, state: State, boundary_state: BoundaryStates) -> float:\n", + " ...\n", + " \n", + "@dataclass\n", + "class Reward:\n", + " \"\"\"Since the reward function is coupled to the boundary conditions\n", + " and state, define a class that takes in state and bounds, and provides\n", + " a callable as the overall reward function, implementing specifics as\n", + " needed.\"\"\"\n", + " observation_state_reward: RewardAssignment\n", + " collision_penalty: float = -100\n", + " screen_penalty: float = -100\n", + " done_reward: float = 100\n", + "\n", + " def __call__(self, state: State, boundary_state: BoundaryStates = BoundaryStates.flying) -> float:\n", + " \"\"\"Get the reward for the current state\"\"\"\n", + "\n", + " # Assuming failure boundary condition rewards invalidate other\n", + " # state dependent rewards.\n", + " if boundary_state is BoundaryStates.crashed:\n", + " return self.collision_penalty\n", + " if boundary_state is BoundaryStates.left_screen:\n", + " return self.screen_penalty\n", + " reward: float = 0\n", + " if boundary_state is BoundaryStates.landed:\n", + " reward += self.done_reward\n", + "\n", + " # @TODO: implement flying observation state dependent rewards\n", + " reward += self.observation_state_reward(state, boundary_state)\n", + " return reward \n", + " \n", + "@dataclass\n", + "class EngineActions:\n", + " \"\"\"State of the actions being taken for the engines, (here\n", + " we make no assumption of one action at a time.)\"\"\"\n", + " main: bool = False\n", + " left: bool = False\n", + " right: bool = False\n", + " \n", + " def get_actions(self) -> set[Action]:\n", + " \"\"\"Get the actions that are currently being taken, we use\n", + " set since order must not matter.\"\"\"\n", + " actions: set[Action] = set()\n", + " if self.main:\n", + " actions.add(Action.fire_main_engine)\n", + " if self.left:\n", + " actions.add(Action.fire_left_engine)\n", 
+ " if self.right:\n", + " actions.add(Action.fire_right_engine)\n", + " if not actions: # if we are not doing anything...\n", + " actions.add(Action.do_nothing)\n", + " return actions\n", + " \n", + "\n", + "class Policy(Protocol):\n", + " \"\"\"Protocol for defining policies\"\"\"\n", + " def __call__(self, state: State) -> Action:\n", + " ...\n", + "\n", + "class StateAction(Protocol):\n", + " \"\"\"Protocol for defining state modification based on action\"\"\"\n", + " def __call__(self, state: State, action: Action) -> State:\n", + " ...\n", + "\n", + "\n", + "\n", + "def modify_state_with_action(state: State, action: Action) -> State:\n", + " \"\"\"Modify the state with the given action (in place modification)\"\"\"\n", + " match action:\n", + " case Action.do_nothing:\n", + " pass\n", + " case Action.fire_main_engine:\n", + " state.y_velocity += 0.1\n", + " case Action.fire_left_engine:\n", + " state.x_velocity -= 0.05\n", + " state.angular_velocity -= 0.05\n", + " case Action.fire_right_engine:\n", + " state.x_velocity += 0.05\n", + " state.angular_velocity += 0.05\n", + " gravity = -0.00 # Assuming negligible gravity\n", + " state.y_velocity += gravity\n", + " state.step()\n", + " return state\n", + "\n", + "\n", + "\n", + "@dataclass\n", + "class Agent:\n", + " \"\"\"We've leaked the abstraction a bit, as we communicate over state\n", + " instead of actions.\"\"\"\n", + " previous_action: Action = Action.do_nothing\n", + " current_state: State = State()\n", + " policy: str = \"SimplePolicy\" # TODO: implement policy\n", + " state_action: StateAction = modify_state_with_action\n", + "\n", + " def take_action(self, action: Action) -> State:\n", + " \"\"\"Take an action in the environment\"\"\"\n", + " # Technically we modify in place but we are being explicit\n", + " # especially good if we change to copy on write\n", + " self.current_state = self.state_action(self.current_state, action)\n", + " self.previous_action = action\n", + " return self.current_state\n", + "\n", + "\n", + "@dataclass\n", + "class Environment:\n", + " agent: Agent\n", + " bounds: BoundsCheck\n", + " reward: RewardAssignment #= Reward()\n", + " surface: SurfaceFunction = flat_surface\n", + " current_boundary_state: BoundaryStates = BoundaryStates.flying\n", + " done_boundary_state: BoundaryStates = BoundaryStates.landed\n", + " \n", + " def step(self, action: Action) -> tuple[State, float, bool]:\n", + " \"\"\"Take a step in the environment\"\"\"\n", + " # This is a leaky abstraction; we are communicating over state.\n", + " # We should instead communicate over actions and determine the\n", + " # state based on the action reported by the agent and its previous\n", + " # state!\n", + " unresolved_state = self.agent.take_action(action)\n", + "\n", + " # Evaluate leg contact\n", + " if (unresolved_state.y - self.surface(unresolved_state.x)) <= 0.01:\n", + " unresolved_state.left_leg_contact = True\n", + " unresolved_state.right_leg_contact = True\n", + "\n", + " # Update state and boundary state.\n", + " current_state = unresolved_state\n", + " self.current_boundary_state = self.bounds(current_state)\n", + "\n", + " reward = self.reward(current_state, self.current_boundary_state)\n", + " done = self.bounds(current_state) is self.done_boundary_state\n", + " return current_state, reward, done\n", + " \n", + "\n", + "\n", + "\n", + "agent = \"lander\"\n" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}