From a7a7339bec4ba9ad76aec5511384b6ae42f66e89 Mon Sep 17 00:00:00 2001 From: Alex Yuan Date: Tue, 11 Jun 2024 16:22:28 -0500 Subject: [PATCH] copying over files from my local mp4 branch, only one of real relevance is fixed_points, other just for my testing purposes --- demo/fixed_points/ball_scenario_branch.py | 97 ++++++ demo/fixed_points/ball_scenario_branch_nt.py | 106 ++++++ demo/fixed_points/ball_scenario_copy.py | 90 +++++ demo/fixed_points/fixed_points.py | 244 +++++++++++++ demo/fixed_points/mp4_p2.py | 348 +++++++++++++++++++ demo/fixed_points/traffic_controller.py | 34 ++ demo/fixed_points/traffic_signal_scenario.py | 119 +++++++ demo/fixed_points/vehicle_controller.py | 56 +++ 8 files changed, 1094 insertions(+) create mode 100644 demo/fixed_points/ball_scenario_branch.py create mode 100644 demo/fixed_points/ball_scenario_branch_nt.py create mode 100644 demo/fixed_points/ball_scenario_copy.py create mode 100644 demo/fixed_points/fixed_points.py create mode 100644 demo/fixed_points/mp4_p2.py create mode 100644 demo/fixed_points/traffic_controller.py create mode 100644 demo/fixed_points/traffic_signal_scenario.py create mode 100644 demo/fixed_points/vehicle_controller.py diff --git a/demo/fixed_points/ball_scenario_branch.py b/demo/fixed_points/ball_scenario_branch.py new file mode 100644 index 00000000..a9bc2acc --- /dev/null +++ b/demo/fixed_points/ball_scenario_branch.py @@ -0,0 +1,97 @@ +from verse.plotter.plotter2D import * +from verse.agents.example_agent.ball_agent import BallAgent +from verse.map.example_map.simple_map2 import SimpleMap3 +from verse import Scenario, ScenarioConfig +from enum import Enum, auto +import copy +import os + +# from verse.map import Lane + +class BallMode(Enum): + '''NOTE: Any model should have at least one mode + The one mode of this automation is called "Normal" and auto assigns it an integer value.''' + NORMAL = auto() + REVERSE = auto() + DOUBLE = auto() + +class State: + '''Defines the state variables of the model + Both discrete and continuous variables + ''' + x: float + y = 0.0 + vx = 0.0 + vy = 0.0 + mode: BallMode + + def __init__(self, x, y, vx, vy, ball_mode: BallMode): + pass + +def decisionLogic(ego: State): + '''Computes the possible mode transitions''' + # Stores the prestate first + output = copy.deepcopy(ego) + if ego.mode == BallMode.NORMAL: + output.mode = BallMode.DOUBLE + if ego.mode == BallMode.NORMAL: + output.mode = BallMode.REVERSE + if ego.x < 0: + output.vx = -ego.vx + output.x = 0 + if ego.y < 0: + output.vy = -ego.vy + output.y = 0 + if ego.x > 20: + # TODO: Q. If I change this to ego.x >= 20 then the model does not work. + # I suspect this is because the same transition can be take many, many times. + # We need to figure out a clean solution + output.vx = -ego.vx + output.x = 20 + if ego.y > 20: + output.vy = -ego.vy + output.y = 20 + return output + +class BallScenarioBranch: + scenario: Scenario + + def __init__(self) -> None: + self.scenario = Scenario(ScenarioConfig(parallel=False)) + script_dir = os.path.realpath(os.path.dirname(__file__)) + BALL_CONTROLLER = os.path.join(script_dir, "ball_scenario_branch.py") + myball1 = BallAgent("red-ball", file_name=BALL_CONTROLLER) + self.scenario.add_agent(myball1) + self.scenario.set_init( + [[[0, 0, 2, 2], [0, 0, 2, 2]], ], # modified from original to check for fixed points, see below + # [[[5, 10, 2, 2], [5, 10, 2, 2]],], + [(BallMode.NORMAL,), ], + ) + +if __name__ == "__main__": + #Defining and using a scenario involves the following 5 easy steps: + #1. creating a basic scenario object with Scenario() + #2. defining the agents that will populate the object, here we have two ball agents + #3. adding the agents to the scenario using .add_agent() + #4. initializing the agents for this scenario. + # Note that agents are only initialized *in* a scenario, not individually outside a scenario + #5. genetating the simulation traces or computing the reachable states + bouncingBall = Scenario(ScenarioConfig(parallel=False)) # scenario too small, parallel too slow + BALL_CONTROLLER = "./demo/ball/ball_bounces.py" + myball1 = BallAgent("red-ball", file_name=BALL_CONTROLLER) + myball2 = BallAgent("green-ball", file_name=BALL_CONTROLLER) + bouncingBall.add_agent(myball1) + bouncingBall.add_agent(myball2) + bouncingBall.set_init( + [[[5, 10, 2, 2], [5, 10, 2, 2]], [[15, 1, 1, -2], [15, 1, 1, -2]]], + [(BallMode.NORMAL,), (BallMode.NORMAL,)], + ) + # TODO: We should be able to initialize each of the balls separately + # this may be the cause for the VisibleDeprecationWarning + # TODO: Longer term: We should initialize by writing expressions like "-2 \leq myball1.x \leq 5" + # "-2 \leq myball1.x + myball2.x \leq 5" + traces = bouncingBall.simulate_simple(40, 0.01, 6) + # TODO: There should be a print({traces}) function + fig = go.Figure() + fig = simulation_tree(traces, None, fig, 1, 2, [1, 2], "fill", "trace") + fig.show() \ No newline at end of file diff --git a/demo/fixed_points/ball_scenario_branch_nt.py b/demo/fixed_points/ball_scenario_branch_nt.py new file mode 100644 index 00000000..190422cb --- /dev/null +++ b/demo/fixed_points/ball_scenario_branch_nt.py @@ -0,0 +1,106 @@ +from verse.plotter.plotter2D import * +from verse.agents.example_agent.ball_agent import BallAgent +from verse.map.example_map.simple_map2 import SimpleMap3 +from verse import Scenario, ScenarioConfig +from enum import Enum, auto +import copy +import os + +# from verse.map import Lane + +class BallMode(Enum): + '''NOTE: Any model should have at least one mode + The one mode of this automation is called "Normal" and auto assigns it an integer value.''' + INTER = auto() + REVERSE = auto() + NORMAL = auto() + +class State: + '''Defines the state variables of the model + Both discrete and continuous variables + ''' + x: float + y = 0.0 + vx = 0.0 + vy = 0.0 + mode: BallMode + + def __init__(self, x, y, vx, vy, ball_mode: BallMode): + pass + +def decisionLogic(ego: State): + '''Computes the possible mode transitions''' + # Stores the prestate first + output = copy.deepcopy(ego) + if ego.mode == BallMode.INTER: + output.mode = BallMode.NORMAL + if ego.mode == BallMode.INTER: + output.mode = BallMode.REVERSE + if ego.x < 0: + output.vx = -ego.vx + output.x = 0 + if ego.y < 0: + output.vy = -ego.vy + output.y = 0 + if ego.x > 20: + output.vx = -ego.vx + output.x = 20 + # if ego.mode==BallMode.REVERSE: + # output.y = 0 + # output.mode = BallMode.INTER + if ego.y > 20: + output.vy = -ego.vy + output.y = 20 + # if ego.mode==BallMode.REVERSE: + # output.x = 0 + # output.mode = BallMode.INTER + return output + +class BallScenarioBranchNT: + scenario: Scenario + + def __init__(self) -> None: + self.scenario = Scenario(ScenarioConfig(parallel=False)) + script_dir = os.path.realpath(os.path.dirname(__file__)) + BALL_CONTROLLER = os.path.join(script_dir, "ball_scenario_branch_nt.py") + myball1 = BallAgent("red-ball", file_name=BALL_CONTROLLER) + myball2 = BallAgent("green-ball", file_name=BALL_CONTROLLER) + self.scenario.add_agent(myball1) + self.scenario.add_agent(myball2) + self.scenario.set_init( + [[[0, 0, 2, 2], [0, 0, 2, 2]], + [[0, 0, 2, 2], [0, 0, 2, 2]] + ], # modified from original to check for fixed points, see below + # [[[5, 10, 2, 2], [5, 10, 2, 2]],], + [(BallMode.INTER,) + , (BallMode.INTER,) + ], + ) + +if __name__ == "__main__": + #Defining and using a scenario involves the following 5 easy steps: + #1. creating a basic scenario object with Scenario() + #2. defining the agents that will populate the object, here we have two ball agents + #3. adding the agents to the scenario using .add_agent() + #4. initializing the agents for this scenario. + # Note that agents are only initialized *in* a scenario, not individually outside a scenario + #5. genetating the simulation traces or computing the reachable states + bouncingBall = Scenario(ScenarioConfig(parallel=False)) # scenario too small, parallel too slow + BALL_CONTROLLER = "./demo/ball/ball_bounces.py" + myball1 = BallAgent("red-ball", file_name=BALL_CONTROLLER) + myball2 = BallAgent("green-ball", file_name=BALL_CONTROLLER) + bouncingBall.add_agent(myball1) + bouncingBall.add_agent(myball2) + bouncingBall.set_init( + [[[5, 10, 2, 2], [5, 10, 2, 2]], [[15, 1, 1, -2], [15, 1, 1, -2]]], + [(BallMode.NORMAL,), (BallMode.NORMAL,)], + ) + # TODO: We should be able to initialize each of the balls separately + # this may be the cause for the VisibleDeprecationWarning + # TODO: Longer term: We should initialize by writing expressions like "-2 \leq myball1.x \leq 5" + # "-2 \leq myball1.x + myball2.x \leq 5" + traces = bouncingBall.simulate_simple(40, 0.01, 6) + # TODO: There should be a print({traces}) function + fig = go.Figure() + fig = simulation_tree(traces, None, fig, 1, 2, [1, 2], "fill", "trace") + fig.show() \ No newline at end of file diff --git a/demo/fixed_points/ball_scenario_copy.py b/demo/fixed_points/ball_scenario_copy.py new file mode 100644 index 00000000..fd617330 --- /dev/null +++ b/demo/fixed_points/ball_scenario_copy.py @@ -0,0 +1,90 @@ +from verse.plotter.plotter2D import * +from verse.agents.example_agent.ball_agent import BallAgent +from verse.map.example_map.simple_map2 import SimpleMap3 +from verse import Scenario, ScenarioConfig +from enum import Enum, auto +import copy +import os + +# from verse.map import Lane + +class BallMode(Enum): + '''NOTE: Any model should have at least one mode + The one mode of this automation is called "Normal" and auto assigns it an integer value.''' + NORMAL = auto() + +class State: + '''Defines the state variables of the model + Both discrete and continuous variables + ''' + x: float + y = 0.0 + vx = 0.0 + vy = 0.0 + mode: BallMode + + def __init__(self, x, y, vx, vy, ball_mode: BallMode): + pass + +def decisionLogic(ego: State): + '''Computes the possible mode transitions''' + # Stores the prestate first + output = copy.deepcopy(ego) + if ego.x < 0: + output.vx = -ego.vx + output.x = 0 + if ego.y < 0: + output.vy = -ego.vy + output.y = 0 + if ego.x > 20: + # TODO: Q. If I change this to ego.x >= 20 then the model does not work. + # I suspect this is because the same transition can be take many, many times. + # We need to figure out a clean solution + output.vx = -ego.vx + output.x = 20 + if ego.y > 20: + output.vy = -ego.vy + output.y = 20 + return output + +class BallScenario: + scenario: Scenario + + def __init__(self) -> None: + self.scenario = Scenario(ScenarioConfig(parallel=False)) + script_dir = os.path.realpath(os.path.dirname(__file__)) + BALL_CONTROLLER = os.path.join(script_dir, "ball_scenario_copy.py") + myball1 = BallAgent("red-ball", file_name=BALL_CONTROLLER) + self.scenario.add_agent(myball1) + self.scenario.set_init( + [[[0, 0, 2, 2], [0, 0, 2, 2]], ], # modified from original to check for fixed points, see below + [(BallMode.NORMAL,), ], + ) + +if __name__ == "__main__": + #Defining and using a scenario involves the following 5 easy steps: + #1. creating a basic scenario object with Scenario() + #2. defining the agents that will populate the object, here we have two ball agents + #3. adding the agents to the scenario using .add_agent() + #4. initializing the agents for this scenario. + # Note that agents are only initialized *in* a scenario, not individually outside a scenario + #5. genetating the simulation traces or computing the reachable states + bouncingBall = Scenario(ScenarioConfig(parallel=False)) # scenario too small, parallel too slow + BALL_CONTROLLER = "./demo/ball/ball_bounces.py" + myball1 = BallAgent("red-ball", file_name=BALL_CONTROLLER) + myball2 = BallAgent("green-ball", file_name=BALL_CONTROLLER) + bouncingBall.add_agent(myball1) + bouncingBall.add_agent(myball2) + bouncingBall.set_init( + [[[5, 10, 2, 2], [5, 10, 2, 2]], [[15, 1, 1, -2], [15, 1, 1, -2]]], + [(BallMode.NORMAL,), (BallMode.NORMAL,)], + ) + # TODO: We should be able to initialize each of the balls separately + # this may be the cause for the VisibleDeprecationWarning + # TODO: Longer term: We should initialize by writing expressions like "-2 \leq myball1.x \leq 5" + # "-2 \leq myball1.x + myball2.x \leq 5" + traces = bouncingBall.simulate_simple(40, 0.01, 6) + # TODO: There should be a print({traces}) function + fig = go.Figure() + fig = simulation_tree(traces, None, fig, 1, 2, [1, 2], "fill", "trace") + fig.show() \ No newline at end of file diff --git a/demo/fixed_points/fixed_points.py b/demo/fixed_points/fixed_points.py new file mode 100644 index 00000000..d7adf2f7 --- /dev/null +++ b/demo/fixed_points/fixed_points.py @@ -0,0 +1,244 @@ +from typing import Tuple, List +import numpy as np +from scipy.integrate import ode +from verse import BaseAgent, Scenario, ScenarioConfig +from verse.analysis.utils import wrap_to_pi +from verse.analysis.analysis_tree import TraceType, AnalysisTree +from verse.parser import ControllerIR +from vehicle_controller import VehicleMode +from verse.analysis import AnalysisTreeNode, AnalysisTree, AnalysisTreeNodeType +from enum import Enum, auto +from verse.plotter.plotter2D import * +from verse.plotter.plotter3D_new import * +import plotly.graph_objects as go +import copy +from typing import Dict, Sequence +from z3 import * + +from vehicle_controller import VehicleMode, TLMode # this line is not id of controller, but fixed_points should be + +### returns whether hyperrectangle 1 is contained within hyperrectangle 2 +### expects to be given two states (lower and upper bounds for each state) from a trace/set of traces +def contained_single(h1: List[np.array], h2: List[np.array]) -> bool: + lower: List[bool] = h2[0][1:] <= h1[0][1:] # consider after first index, time -- move to top + upper: List[bool] = h1[1][1:] <= h2[1][1:] + return all(lower) and all(upper) + +### extracts nodes reached between a certain lower time bound and upper time bound +### by default, will only extract the states from the very last +def reach_at(trace: AnalysisTree, t_lower: float = None, t_upper: float = None) -> Dict[str, Dict[str, List[List[np.array]]]]: + nodes = trace.nodes + agents = nodes[0].agent.keys() # list of agents + reached: Dict[str, Dict[str, List[List[np.array]]]] = {} + for agent in agents: + reached[agent] = {} + T = 0 # we know time horizon occurs in the leaf node, by definition, shouldn't matter which leaf node we check + if t_lower is None or t_upper is None: + last: list[AnalysisTreeNode] = trace.get_leaf_nodes(trace.root) + T = last[0].trace[list(agents)[0]][-1][0] # using list casting since the agent doesn't matter + for node in nodes: + modes = node.mode + for agent in agents: + ### make error message for t_lower and t_upper + if t_lower is not None and t_upper is not None: ### may want to seperate this out into a seperate function + for i in range(0, len(node.trace[agent]), 2): # just check the upper bound, time difference between lower and upper known to be time step of scenario + lower = node.trace[agent][i] + upper = node.trace[agent][i+1] + if upper[0]>=t_lower and upper[0]<=t_upper: ### think about why cut off here explicitly -- includes T-delta T but not T if upper bound is T-delta T + state = [lower, upper] + if modes[agent] not in reached[agent]: #initializing mode of agent in dict if not already there + reached[agent][modes[agent]] = [] + reached[agent][modes[agent]].append(state) #finally, add state to reached + else: ### for now, just assume if t_lower and t_upper not supplied, we just want last node + if node.trace[agent][-1][0]==T: + if modes[agent] not in reached[agent]: #initializing mode of agent in dict if not already there + reached[agent][modes[agent]] = [] + reached[agent][modes[agent]].append([node.trace[agent][-2], node.trace[agent][-1]]) + return reached + +### assuming getting inputs from reach_at calls +def contain_all(reach1: Dict[str, Dict[str, List[List[np.array]]]], reach2: Dict[str, Dict[str, List[List[np.array]]]], state_len: int = None) -> bool: + if state_len is None: ### taking from input now, could also derive from either reach1 or reach2 + agents = list(reach1.keys()) # if this works 100%, then get rid of if and option to pass state_len as input + state_len = len(reach1[agents[0]][list(reach1[agents[0]].keys())[0]][0][0])-1 # strip one dimension to account for ignoring time + P = RealVector('p', state_len) + in_r1 = Or() #just set this boolean statement to just Or for now, can sub with Or(False) if having issues + in_r2 = Or() #as above, sub with Or(False) if having issues + s = Solver() + ### eventually want to check sat of (in_r1 and not in_r2) + for agent in reach1.keys(): + for mode in reach1[agent].keys(): + for hr in reach1[agent][mode]: # hr is a hyperrectangle defined by lower and upper bounds stored in a list + includes = And() + for i in range(state_len): + includes = And(includes, P[i] >= hr[0][i+1], P[i] <= hr[1][i+1]) ### include the previous conditions and new bounds for the new dimension + in_r1 = Or(in_r1, includes) ### in_r1 should check to if there exists a point that can be any hyperrectangle from set r1 + ### could probably seperate out above and below loops into subroutine + for agent in reach2.keys(): + for mode in reach2[agent].keys(): + for hr in reach2[agent][mode]: + includes = And() + for i in range(state_len): + includes = And(includes, P[i] >= hr[0][i+1], P[i] <= hr[1][i+1]) ### include the previous conditions and new bounds for the new dimension + in_r2 = Or(in_r2, includes) ### in_r2 should check to if there exists a point that can be any hyperrectangle from set r2 + + s.add(in_r1, Not(in_r2)) + return s.check() == unsat + +### assuming user has T and t_step from previous scenario definition +def fixed_points_sat(tree: AnalysisTree, T: float = 40, t_step: float = 0.01) -> bool: + reach_end = reach_at(tree) + reach_else = reach_at(tree, 0, T-t_step+t_step*.01) # need to add some offset because of some potential decimical diffs + # print(reach_end, reach_else) + return contain_all(reach_end, reach_else) + +### should we getting a scenario or is a trace enough? --shouldn't matter, but should switch to trace to be a bit faster +def fixed_points(scenario: Scenario, agent: str, tol: float = 0.01, t: float = 160) -> bool: + #what algorithm should do: + #1: compute reachable states for t + #2: compute reachables states for t+dt + #3: repeat until either Reach_t+d(n-1)t = Reach_t_t+dnt or no fixed points exist + # **should be using verify instead of simulate + # **ask about or find scenarios where loops possible/not possible + + ## v2 + # just check if fixed points for a given time horizon + # can make helper function that calls this function with different time horizons if necessary + # should also make agent mode a parameter if only checking for a single agent (doesn't really make sense, should be checking for all, only makes sense for scenarios with only one non-stationary agent) + # see if rounded to nearest hundredth helps or using a tolerance value -- otherwise need too many simulations + cur : AnalysisTree = scenario.verify(t, 0.1) + all : list[AnalysisTreeNode] = cur.nodes + last : AnalysisTreeNode = cur.get_leaf_nodes(cur.root)[0] # check to branching structures + ### present with better example -- non-trivial branching scenario, understand at variable level + check = list(map(lambda row: np.array(row[1:]), last.trace[agent][-2:])) # lambda function converts to np array and cuts off first col + reached = list(map(lambda row: np.array(row[1:]), last.trace[agent][:-2][1:])) + # # print(reached, check) + for node in all: + if node.mode == last.mode and node!=last: + # for state in node.trace[agent]: + reached += list(map(lambda row: np.array(row[1:]), node.trace[agent][:][1:])) + # print(len(reached)) + # print(reached) + # print(check) + + return contained(check, reached, tol) + # for state in check: + # flag = False + # # mini = np.inf + # # r: np.array + # for reach in reached: + # # if np.linalg.norm(state-reach) bool: + for part in goal: + flag = False + for reached_state in reached: + if np.linalg.norm(part-reached_state)<=tol: + flag = True + break + if not flag: + return False + return True + +def fixed_points_aa_branching(scenario: Scenario, tol: float = 0.01, t: float = 160) -> bool: + cur : AnalysisTree = scenario.verify(t, 0.1) + all : list[AnalysisTreeNode] = cur.nodes + last : list[AnalysisTreeNode] = cur.get_leaf_nodes(cur.root) # check to branching structures + ### present with better example -- non-trivial branching scenario, understand at variable level + agents = last[0].agent.keys() # list of agents + check: dict[str, list[list[np.array]]] = {} # store each pair as its own list + for agent in agents: + for leaf in last: + if agent not in check: + check[agent] = [list(map(lambda row: np.array(row[1:]), leaf.trace[agent][-2:]))] # lambda function converts to np array and cuts off first col + else: + check[agent].append(list(map(lambda row: np.array(row[1:]), leaf.trace[agent][-2:]))) + reached: dict[str, list[np.array]] = {} + for agent in agents: + for leaf in last: + if agent not in reached: # necessary because not using default dict + reached[agent] = list(map(lambda row: np.array(row[1:]), last[0].trace[agent][:-2][1:])) + else: + reached[agent] += list(map(lambda row: np.array(row[1:]), last[0].trace[agent][:-2][1:])) + for node in all: + # if node.mode == last.mode and node not in last: + for leaf in last: + if node.mode == leaf.mode and node not in last: + for agent in agents: + reached[agent] += list(map(lambda row: np.array(row[1:]), node.trace[agent][:][1:])) + break + # for agent in agents: + # for goal in check[agent]: + # print(goal) + for agent in agents: + for goal in check[agent]: + if not contained(goal, reached[agent], tol): + return False + return True + +### instead of checking each goal state for each agent, makes sure that for each branch, all agents reach fixed points +### use other aa_branching algorithm if only checking one agent +### verified works on easy branching ball scenario, same with above alg +def fixed_points_aa_branching_composed(scenario: Scenario, tol: float = 0.01, t: float = 160) -> bool: + cur : AnalysisTree = scenario.verify(t, 0.1) + all : list[AnalysisTreeNode] = cur.nodes + last : list[AnalysisTreeNode] = cur.get_leaf_nodes(cur.root) # now grabs + agents = last[0].agent.keys() # list of agents + check: dict[str, list[list[np.array]]] = {} # store each pair (min/max of goal hyperrectangle) as its own list + for agent in agents: + for leaf in last: + if agent not in check: + check[agent] = [list(map(lambda row: np.array(row[1:]), leaf.trace[agent][-2:]))] # lambda function converts to np array and cuts off first col + else: + check[agent].append(list(map(lambda row: np.array(row[1:]), leaf.trace[agent][-2:]))) + ### should reached instead also consider the branch that the states came from? i.e. reached[branch][agent] = [reached states] ? + reached: dict[str, list[np.array]] = {} + # initializing reached with the leaf nodes first since unlike other nodes, can't take all states + for agent in agents: + for leaf in last: + if agent not in reached: # necessary because not using default dict + reached[agent] = list(map(lambda row: np.array(row[1:]), last[0].trace[agent][:-2][1:])) + else: + reached[agent] += list(map(lambda row: np.array(row[1:]), last[0].trace[agent][:-2][1:])) + for node in all: + for leaf in last: + if node.mode == leaf.mode and node not in last: + for agent in agents: + reached[agent] += list(map(lambda row: np.array(row[1:]), node.trace[agent][:][1:])) + break + + first_agent = list(agents)[0] + def within_tol(agent: str, ind: int, br: int): + if np.linalg.norm(reached[agent][ind]-check[agent][i][0])<=tol and np.linalg.norm(reached[agent][ind+1]-check[agent][i][1])<=tol: + return True + if np.linalg.norm(reached[agent][ind]-check[agent][i][1])<=tol and np.linalg.norm(reached[agent][ind+1]-check[agent][i][0])<=tol: + return True + return False + for i in range(len(check[first_agent])): + flag = False #gets set to false at start of each branch/leaf, will only get set to true if we check that final state of all agents contained at some point within reached states + mins = (np.inf, np.inf, 0, 0, check[first_agent][i][0], check[first_agent][i][1]) + for j in range(0, len(reached[first_agent]), 2): # these two loops should iterate through each branch and every state -- should I be checking all of reach or just the branch's portion of reach? + # if np.linalg.norm(reached[first_agent][j]-check[first_agent][0])<=tol and np.linalg.norm(reached[first_agent][j+1]-check[first_agent][1])<=tol: + if within_tol(first_agent, j, i): + flag = True #tentatively set flag to true if we found fixed point for first agent + for agent in agents: #iterate through the other agents and guarentee that + if agent==first_agent: + continue + # if either min or max of the rectangle isn't within the current state, set flag + # if np.linalg.norm(reached[agent][j]-check[agent][0])>tol or np.linalg.norm(reached[agent][j+1]-check[agent][1])>tol: + if within_tol(agent, j, i): + flag = False + break + if not flag: + return False + return True \ No newline at end of file diff --git a/demo/fixed_points/mp4_p2.py b/demo/fixed_points/mp4_p2.py new file mode 100644 index 00000000..0e1cc804 --- /dev/null +++ b/demo/fixed_points/mp4_p2.py @@ -0,0 +1,348 @@ +from typing import Tuple, List + +import numpy as np +from scipy.integrate import ode + +from verse import BaseAgent, Scenario +from verse.analysis.utils import wrap_to_pi +from verse.analysis.analysis_tree import TraceType, AnalysisTree +from verse.parser import ControllerIR +from vehicle_controller import VehicleMode +from verse.analysis import AnalysisTreeNode, AnalysisTree, AnalysisTreeNodeType +from traffic_controller import TLMode +import copy + +refine_profile = { + 'R1': [0], + 'R2': [0], + 'R3': [0,0,0,3] +} + +def tree_safe(tree: AnalysisTree): + for node in tree.nodes: + if node.assert_hits is not None: + return False + return True + +class TrafficSignalAgent(BaseAgent): + def __init__( + self, + id, + file_name + ): + super().__init__(id, code = None, file_name = file_name) + + def TC_simulate( + self, mode: List[str], init, time_bound, time_step, lane_map = None + ) -> TraceType: + time_bound = float(time_bound) + num_points = int(np.ceil(time_bound / time_step)) + trace = np.zeros((num_points + 1, 1 + len(init))) + trace[1:, 0] = [round((i+1) * time_step, 10) for i in range(num_points)] + trace[:,-1] = trace[:,0]+init[4] + trace[:, 1:-1] = init[:-1] + return trace + +class VehicleAgent(BaseAgent): + def __init__( + self, + id, + code = None, + file_name = None, + accel_brake = 1, + accel_notbrake = 1, + accel_hardbrake = 5, + speed = 10 + ): + super().__init__( + id, code, file_name + ) + self.accel_brake = accel_brake + self.accel_notbrake = accel_notbrake + self.accel_hardbrake = accel_hardbrake + self.speed = speed + self.vmax = 20 + + @staticmethod + def dynamic(t, state, u): + x, y, theta, v = state + delta, a = u + x_dot = v * np.cos(theta + delta) + y_dot = v * np.sin(theta + delta) + theta_dot = v / 1.75 * np.sin(delta) + v_dot = a + return [x_dot, y_dot, theta_dot, v_dot] + + def action_handler(self, mode: List[str], state) -> Tuple[float, float]: + x, y, theta, v = state + vehicle_mode, = mode + vehicle_pos = np.array([x, y]) + a = 0 + lane_width = 3 + d = -y + if vehicle_mode == "Normal" or vehicle_mode == "Stop": + pass + elif vehicle_mode == "SwitchLeft": + d += lane_width + elif vehicle_mode == "SwitchRight": + d -= lane_width + elif vehicle_mode == "Brake": + a = max(-self.accel_brake, -v) + # a = -50 + elif vehicle_mode == "HardBrake": + a = max(-self.accel_hardbrake, -v) + # a = -50 + elif vehicle_mode == "Accel": + a = min(self.accel_notbrake, self.speed-v) + else: + raise ValueError(f"Invalid mode: {vehicle_mode}") + + heading = 0 + psi = wrap_to_pi(heading - theta) + steering = psi + np.arctan2(0.45 * d, v) + steering = np.clip(steering, -0.61, 0.61) + return steering, a + + def TC_simulate( ### put breakpoint here to see vehicle state and trace + self, mode: List[str], init, time_bound, time_step, lane_map = None + ) -> TraceType: + time_bound = float(time_bound) + num_points = int(np.ceil(time_bound / time_step)) + trace = np.zeros((num_points + 1, 1 + len(init))) + trace[1:, 0] = [round(i * time_step, 10) for i in range(num_points)] + trace[0, 1:] = init + for i in range(num_points): + steering, a = self.action_handler(mode, init) + r = ode(self.dynamic) + r.set_initial_value(init).set_f_params([steering, a]) + res: np.ndarray = r.integrate(r.t + time_step) + init = res.flatten() + if init[3] < 0: + init[3] = 0 + trace[i + 1, 0] = time_step * (i + 1) + trace[i + 1, 1:] = init + return trace + +def dist(pnt1, pnt2): + return np.linalg.norm( + np.array(pnt1) - np.array(pnt2) + ) + +def get_extreme(rect1, rect2): + lb11 = rect1[0] + lb12 = rect1[1] + ub11 = rect1[2] + ub12 = rect1[3] + + lb21 = rect2[0] + lb22 = rect2[1] + ub21 = rect2[2] + ub22 = rect2[3] + + # Using rect 2 as reference + left = lb21 > ub11 + right = ub21 < lb11 + bottom = lb22 > ub12 + top = ub22 < lb12 + + if top and left: + dist_min = dist((ub11, lb12),(lb21, ub22)) + dist_max = dist((lb11, ub12),(ub21, lb22)) + elif bottom and left: + dist_min = dist((ub11, ub12),(lb21, lb22)) + dist_max = dist((lb11, lb12),(ub21, ub22)) + elif top and right: + dist_min = dist((lb11, lb12), (ub21, ub22)) + dist_max = dist((ub11, ub12), (lb21, lb22)) + elif bottom and right: + dist_min = dist((lb11, ub12),(ub21, lb22)) + dist_max = dist((ub11, lb12),(lb21, ub22)) + elif left: + dist_min = lb21 - ub11 + dist_max = np.sqrt((lb21 - ub11)**2 + max((ub22-lb12)**2, (ub12-lb22)**2)) + elif right: + dist_min = lb11 - ub21 + dist_max = np.sqrt((lb21 - ub11)**2 + max((ub22-lb12)**2, (ub12-lb22)**2)) + elif top: + dist_min = lb12 - ub22 + dist_max = np.sqrt((ub12 - lb22)**2 + max((ub21-lb11)**2, (ub11-lb21)**2)) + elif bottom: + dist_min = lb22 - ub12 + dist_max = np.sqrt((ub22 - lb12)**2 + max((ub21-lb11)**2, (ub11-lb21)**2)) + else: + dist_min = 0 + dist_max = max( + dist((lb11, lb12), (ub21, ub22)), + dist((lb11, ub12), (ub21, lb22)), + dist((ub11, lb12), (lb21, ub12)), + dist((ub11, ub12), (lb21, lb22)) + ) + return dist_min, dist_max + +class TrafficSensor: + def __init__(self): + self.sensor_distance = 60 + + # The baseline sensor is omniscient. Each agent can get the state of all other agents + def sense(self, agent: BaseAgent, state_dict, lane_map): + len_dict = {} + cont = {} + disc = {} + len_dict = {"others": len(state_dict) - 1} + tmp = np.array(list(state_dict.values())[0][0]) + if tmp.ndim < 2: + if agent.id == 'car': + len_dict['others'] = 1 + cont['ego.x'] = state_dict['car'][0][1] + cont['ego.y'] = state_dict['car'][0][2] + cont['ego.theta'] = state_dict['car'][0][3] + cont['ego.v'] = state_dict['car'][0][4] + disc['ego.agent_mode'] = state_dict['car'][1][0] + disc['other.signal_mode'] = state_dict['tl'][1][0] + # dist = np.sqrt( + # (state_dict['car'][0][1]-state_dict['tl'][0][1])**2+\ + # (state_dict['car'][0][2]-state_dict['tl'][0][2])**2 + # ) + # dist = state_dict['tl'][0][1] - state_dict['car'][0][1] + # if dist < self.sensor_distance: + # cont['other.dist'] = dist + # else: + # cont['other.dist'] = 1000 + cont['other.x'] = state_dict['tl'][0][1] + elif agent.id == 'tl': + cont['ego.x'] = state_dict['tl'][0][1] + cont['ego.y'] = state_dict['tl'][0][2] + cont['ego.theta'] = state_dict['tl'][0][3] + cont['ego.v'] = state_dict['tl'][0][4] + cont['ego.timer'] = state_dict['tl'][0][5] + disc['ego.signal_mode'] = state_dict['tl'][1][0] + else: + if agent.id == 'car': + len_dict['others'] = 1 + # dist_min, dist_max = get_extreme( + # (state_dict['car'][0][0][1],state_dict['car'][0][0][2],state_dict['car'][0][1][1],state_dict['car'][0][1][2]), + # (state_dict['tl'][0][0][1],state_dict['tl'][0][0][2],state_dict['tl'][0][1][1],state_dict['tl'][0][1][2]), + # ) + # dist_min = min(abs(state_dict['car'][0][0][1]-state_dict['tl'][0][0][1]), abs(state_dict['car'][0][1][1]-state_dict['tl'][0][0][1])) + # dist_max = max(abs(state_dict['car'][0][0][1]-state_dict['tl'][0][0][1]), abs(state_dict['car'][0][1][1]-state_dict['tl'][0][0][1])) + cont['ego.x'] = [ + state_dict['car'][0][0][1], state_dict['car'][0][1][1] + ] + cont['ego.y'] = [ + state_dict['car'][0][0][2], state_dict['car'][0][1][2] + ] + cont['ego.theta'] = [ + state_dict['car'][0][0][3], state_dict['car'][0][1][3] + ] + cont['ego.v'] = [ + state_dict['car'][0][0][4], state_dict['car'][0][1][4] + ] + cont['other.x'] = [ + state_dict['tl'][0][0][1], state_dict['tl'][0][1][1] + ] + # cont['other.dist'] = [ + # dist_min, dist_max + # ] + disc['ego.agent_mode'] = state_dict['car'][1][0] + disc['other.signal_mode'] = state_dict['tl'][1][0] + # if dist_min 20: + output.signal_mode = TLMode.YELLOW + output.timer = 0 + if ego.signal_mode == TLMode.YELLOW and ego.timer > 5: + output.signal_mode = TLMode.RED + output.timer = 0 + if ego.signal_mode == TLMode.RED and ego.timer > 20: + output.signal_mode = TLMode.GREEN + output.timer = 0 + + # assert True + return output \ No newline at end of file diff --git a/demo/fixed_points/traffic_signal_scenario.py b/demo/fixed_points/traffic_signal_scenario.py new file mode 100644 index 00000000..b521a1c0 --- /dev/null +++ b/demo/fixed_points/traffic_signal_scenario.py @@ -0,0 +1,119 @@ +from mp4_p2 import VehicleAgent, TrafficSignalAgent, TrafficSensor, eval_velocity, sample_init +from verse import Scenario, ScenarioConfig +from vehicle_controller import VehicleMode, TLMode + +from verse.plotter.plotter2D import * +from verse.plotter.plotter3D_new import * +import plotly.graph_objects as go +import copy + +### + +from ball_scenario_copy import BallScenario +from ball_scenario_branch import BallScenarioBranch +from ball_scenario_branch_nt import BallScenarioBranchNT +from z3 import * +from fixed_points import fixed_points, fixed_points_aa_branching, fixed_points_aa_branching_composed, contained_single, reach_at, fixed_points_sat + +if __name__ == "__main__": + + import os + script_dir = os.path.realpath(os.path.dirname(__file__)) + input_code_name = os.path.join(script_dir, "vehicle_controller.py") + vehicle = VehicleAgent('car', file_name=input_code_name) + input_code_name = os.path.join(script_dir, "traffic_controller.py") + tl = TrafficSignalAgent('tl', file_name=input_code_name) + + scenario = Scenario(ScenarioConfig(init_seg_length=1, parallel=False)) + + scenario.add_agent(vehicle) ### need to add breakpoint around here to check decision_logic of agents + scenario.add_agent(tl) + scenario.set_sensor(TrafficSensor()) + + # # ----------- Different initial ranges ------------- + # # Uncomment this block to use R1 + # init_car = [[0,-5,0,5],[50,5,0,5]] + # init_trfficlight = [[300,0,0,0,0],[300,0,0,0,0]] + # # ----------------------------------------- + + # # Uncomment this block to use R2 + # init_car = [[0,-5,0,5],[100,5,0,5]] + # init_trfficlight = [[300,0,0,0,0],[300,0,0,0,0]] + # # ----------------------------------------- + + # # Uncomment this block to use R3 + init_car = [[0,-5,0,0],[100,5,0,10]] + init_trfficlight = [[300,0,0,0,0],[300,0,0,0,0]] + # # ----------------------------------------- + + scenario.set_init_single( + 'car', init_car,(VehicleMode.Normal,) + ) + scenario.set_init_single( + 'tl', init_trfficlight, (TLMode.GREEN,) + ) + + + # ----------- Simulate single: Uncomment this block to perform single simulation ------------- + # trace = scenario.simulate(80, 0.1) + # trace = scenario.verify(80, 0.1) + # print(fixed_points_sat(trace, 80)) + # print(len(trace.get_leaf_nodes(trace.root)), len(trace._get_all_nodes(trace.root))) + # avg_vel, unsafe_frac, unsafe_init = eval_velocity([trace]) + # fig = go.Figure() + # fig = simulation_tree_3d(trace, fig,\ + # 0,'time', 1,'x',2,'y') + # fig.write_html('simulate.html', auto_open=True) + + ### testing out z3 set solver + # P = RealVector('p', 4) + # s = Solver() + # or_chain = Or() + # for i in range(len(P)): + # or_chain = Or(or_chain, P[i]**2 5, Not(P[0]> 5)) ### sat inherently does AND due to needing to fulfill all conditions + # print(s.sexpr()) + # print(s.check(), s.check()==unsat) + ### testing helper functions + # h1 = [np.array([0, 0, 0, 0]), np.array([0, 1, 1, 1])] + # h2 = [np.array([0.5, -1, -0.5, -1.5]), np.array([0, 1, 2 , 3])] + # print("Is h1 contained within h2:", contained_single(h1, h2)) + # print("Is h2 contained within h1:", contained_single(h2, h1)) + # print("Is h1 contained within h1:", contained_single(h1, h1)) + ### + + ### + ball_scenario = BallScenario().scenario + ball_scenario_branch = BallScenarioBranch().scenario + + # ball_scenario_branch_nt = BallScenarioBranchNT().scenario ### this scenario's verify doesn't really make any sense given its simulate -- figure out why + # ## trying to verify with two agents in NT takes forever for some reason + # trace = ball_scenario_branch_nt.verify(80, 0.1) + # trace = ball_scenario_branch_nt.simulate(80, 0.1) + + trace = ball_scenario_branch.verify(40, 0.1) + # print(reach_at(trace)) ### print out more elegantly, for example, line breaks and cut off array floats, test out more thoroughly + # print(reach_at(trace, 39, 39.91)) ### needs to be slightly more than T-delta T due to small differences in real trace, could also fix in reach_at by rounding off the times dimension + print(fixed_points_sat(trace, 40, 0.01)) + + # fig = go.Figure() + # fig = simulation_tree(trace, None, fig, 1, 2, [1, 2], "fill", "trace") + # fig.show() + # print(f'Do there exist fixed points? {fixed_points(ball_scenario, "red-ball", t=80)}') + # print(f'Do there exist fixed points? {fixed_points_aa_branching(ball_scenario, t=80)}') + # print(f'Do there exist fixed points? {fixed_points_aa_branching(ball_scenario_branch, t=80)}') + # print(f'Do there exist fixed points? {fixed_points_aa_branching_composed(ball_scenario_branch, t=80)}') + # print(f'Do there exist fixed points? {fixed_points_aa_branching(ball_scenario_branch_nt, t=40)}') + ### + # ----------------------------------------- + + # init_dict_list= sample_init(scenario, num_sample=50) + # traces = scenario.simulate_multi(80, 0.1, + # init_dict_list=init_dict_list) + # fig = go.Figure() + # avg_vel, unsafe_frac, unsafe_init = eval_velocity(traces) + # print(f"###### Average velocity {avg_vel}, Unsafe fraction {unsafe_frac}, Unsafe init {unsafe_init}") + # for trace in traces: + # fig = simulation_tree_3d(trace, fig,\ + # 0,'time', 1,'x',2,'y') + # fig.show() diff --git a/demo/fixed_points/vehicle_controller.py b/demo/fixed_points/vehicle_controller.py new file mode 100644 index 00000000..52bc8e19 --- /dev/null +++ b/demo/fixed_points/vehicle_controller.py @@ -0,0 +1,56 @@ +from enum import Enum, auto +import copy +from typing import List + +class TLMode(Enum): + GREEN=auto() + YELLOW=auto() + RED=auto() + +class VehicleMode(Enum): + Normal = auto() + Brake = auto() + Accel = auto() + HardBrake = auto() + +class State: + x: float + y: float + theta: float + v: float + agent_mode: VehicleMode + + def __init__(self, x, y, theta, v, agent_mode: VehicleMode): + pass + +def decisionLogic(ego: State, other: State): + output = copy.deepcopy(ego) + + # TODO: Edit this part of decision logic + # if ego.agent_mode == VehicleMode.Normal and other.signal_mode == TLMode.RED: + # output.agent_mode = VehicleMode.Brake + + if (ego.agent_mode == VehicleMode.Normal or ego.agent_mode == VehicleMode.HardBrake) and other.signal_mode == TLMode.GREEN: + output.agent_mode = VehicleMode.Accel #accelerate no matter what if green + if ego.agent_mode == VehicleMode.Accel and other.signal_mode == TLMode.YELLOW and ego.x>other.x-80 and ego.xother.x-20 and ego.xother.x-15 and ego.x