From 8b3667b11d079991b496dcef75a18adcd69e0f05 Mon Sep 17 00:00:00 2001 From: Pascal Tomecek Date: Wed, 7 Feb 2024 08:27:01 -0500 Subject: [PATCH] Add csp.random module with initial versions of a Poisson timer and Brownian Motion generator, with time-varying parameters and test cases Remove brownian_increments and replace with return_increments flag on brownian_motion node. Incorporate PR feedback Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com> --- csp/random.py | 151 +++++++++++++++++++++++++ csp/tests/test_random.py | 239 +++++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 3 files changed, 391 insertions(+) create mode 100644 csp/random.py create mode 100644 csp/tests/test_random.py diff --git a/csp/random.py b/csp/random.py new file mode 100644 index 00000000..f5d749df --- /dev/null +++ b/csp/random.py @@ -0,0 +1,151 @@ +import numpy as np +from datetime import timedelta +from typing import TypeVar + +import csp +from csp import ts +from csp.stats import numpy_to_list +from csp.typing import Numpy1DArray, NumpyNDArray + +__all__ = ("poisson_timer", "brownian_motion", "brownian_motion_1d") + + +T = TypeVar("T") + + +@csp.node +def poisson_timer(rate: ts[float], seed: object, value: "~T" = True) -> ts["T"]: + """Generate events according to a Poisson process with time-varying rate. + For a fixed-interval timer see csp.timer. + + Args: + rate: The rate of the Poisson process (per second), must be non-negative + seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng + value: The value to tick when there are events (similar to csp.timer) + """ + with csp.alarms(): + event = csp.alarm(bool) + + with csp.state(): + s_rng = np.random.default_rng(seed) + s_scheduled_event = None + + if csp.ticked(rate): + if rate < 0: + raise ValueError(f"{csp.now()}: rate must be non-negative") + if s_scheduled_event: + csp.cancel_alarm(event, s_scheduled_event) + if rate > 0: + seconds = s_rng.exponential(1.0 / rate) + s_scheduled_event = csp.schedule_alarm(event, timedelta(seconds=seconds), True) + + if csp.ticked(event): + seconds = s_rng.exponential(1.0 / rate) + s_scheduled_event = csp.schedule_alarm(event, timedelta(seconds=seconds), True) + return value + + +def _make_brownian_increment(t, s_rng, drift, s_cov_decomp): + # Make the brownian increment as efficiently as possible + values = s_rng.normal(scale=np.sqrt(t), size=drift.size) + np.dot(s_cov_decomp, values, out=values) + np.add(drift * t, values, out=values) + return values + + +def _matrix_decomposition(matrix, now): + # Split matrix decomposition into its own function to help with profiling and make it easier to replace + # We use SVD instead of Cholesky because it's more stable and handles the zero variance case. + # Could also use eigenvalue decomposition, but we choose svd because it's the default for np.random.Generator.multivariate_normal + U, S, Vt = np.linalg.svd(matrix) + if not np.allclose(matrix, matrix.T): + raise ValueError(f"{now}: covariance not symmetric") + if not np.allclose(U, Vt): + raise ValueError(f"{now}: covariance not positive semidefinite") + return np.dot(U, np.sqrt(np.diag(S)), out=U) + + +@csp.node +def brownian_motion( + trigger: ts[object], + drift: ts[Numpy1DArray[float]], + covariance: ts[NumpyNDArray[float]], + seed: object, + return_increments: bool = False, +) -> ts[Numpy1DArray[float]]: + """Generate multi-dimensional Brownian motion (or increments) at the trigger times, with time-varying drift and covariance. + The Brownian motion starts once drift and covariance have at least 1 tick each, and will start from zero. + To use a different start value, use csp.const(initial_value) + brownian_motion(...) + + Args: + trigger: When to return the value of the process + drift: Drift parameter (per second), i.e. array of length n + covariance: Covariance matrix (per second), i.e. array of size nxn + seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng + return_increments: Whether to return increments of the brownian motion at trigger times instead of the process itself + """ + with csp.state(): + s_rng = np.random.default_rng(seed) + s_cov_decomp = None # Placeholder for covariance matrix decomposition + s_last_change = None # Placeholder for last csp.now() + s_last_drift = None # Placeholder for last value of drift + s_last_value = None # Placeholder for cumulative value between trigger ticks + + if csp.ticked(drift, covariance) and csp.valid(drift, covariance): + if s_last_change is None: + if not drift.ndim == 1: + raise ValueError(f"{csp.now()}: drift must be 1-dimensional") + if not covariance.ndim == 2: + raise ValueError(f"{csp.now()}: covariance must be 2-dimensional") + if not drift.size == covariance.shape[0]: + raise ValueError(f"{csp.now()}: drift and covariance must have same length") + s_last_value = np.zeros_like(drift) + else: + t = (csp.now() - s_last_change).total_seconds() + values = _make_brownian_increment(t, s_rng, s_last_drift, s_cov_decomp) + np.add(values, s_last_value, out=s_last_value) + s_last_change = csp.now() + if s_last_drift is not None and drift.shape != s_last_drift.shape: + raise ValueError(f"{csp.now()}: shape of drift is not allowed to change") + s_last_drift = drift + + if csp.ticked(covariance): + # Only do the covariance decomposition when it changes + if s_cov_decomp is not None and covariance.shape != s_cov_decomp.shape: + raise ValueError(f"{csp.now()}: shape of covariance is not allowed to change") + s_cov_decomp = _matrix_decomposition(covariance, csp.now()) + + if csp.ticked(trigger) and csp.valid(drift, covariance): + if s_last_change is not None: + t = (csp.now() - s_last_change).total_seconds() + values = _make_brownian_increment(t, s_rng, s_last_drift, s_cov_decomp) + s_last_change = csp.now() + if return_increments: + # Add in "last value" in case drift/covariance changed in between trigger updates + np.add(s_last_value, values, out=values) + s_last_value.fill(0.0) + return values + else: + np.add(s_last_value, values, out=s_last_value) + return s_last_value + + +@csp.graph +def brownian_motion_1d( + trigger: ts[object], drift: ts[float], variance: ts[float], seed: object, return_increments: bool = False +) -> ts[float]: + """Generate one-dimensional Brownian motion at the trigger times, with time-varying drift and variance. + The Brownian motion starts once drift and covariance have at least 1 tick each, and will start from zero. + To use a different start value, use csp.const(initial_value) + brownian_motion_1d(...) + + Args: + trigger: When to return the value of the process + drift: Drift parameter (per second) + variance: Variance parameter (per second) + seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng + return_increments: Whether to return increments of the brownian motion at trigger times instead of the process itself + """ + drift = csp.apply(drift, lambda x: np.array([x]), Numpy1DArray[float]) + covariance = csp.apply(variance, lambda x: np.array([[x]]), NumpyNDArray[float]) + bm = brownian_motion(trigger, drift, covariance, seed=seed, return_increments=return_increments) + return numpy_to_list(bm, 1)[0] diff --git a/csp/tests/test_random.py b/csp/tests/test_random.py new file mode 100644 index 00000000..c43e02bd --- /dev/null +++ b/csp/tests/test_random.py @@ -0,0 +1,239 @@ +import numpy as np +import unittest +from datetime import datetime, timedelta + +import csp +from csp.random import brownian_motion, brownian_motion_1d, poisson_timer +from csp.typing import Numpy1DArray, NumpyNDArray + + +class TestPoissonTimer(unittest.TestCase): + def test_simple(self): + rate = csp.const(2.0) # two events per second + out = csp.run( + poisson_timer, rate, seed=1234, value="foo", starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1000) + )[0] + # Should be about 2k events, subject to random noise + self.assertGreater(len(out), 1900) + self.assertLess(len(out), 2100) + self.assertEqual(out[0][1], "foo") + + def test_delayed_start(self): + rate = csp.const(0.1, delay=timedelta(days=1)) # two events per second + out = csp.run( + poisson_timer, rate, seed=1234, value="foo", starttime=datetime(2020, 1, 1), endtime=timedelta(days=2) + )[0] + self.assertGreater(out[0][0], datetime(2020, 1, 2)) + + def test_changing_rate(self): + rate1 = 1.0 + rate3 = 0.1 + rate = csp.curve( + float, [(datetime(2020, 1, 1, 0), rate1), (datetime(2020, 1, 1, 1), 0.0), (datetime(2020, 1, 1, 2), rate3)] + ) + out = csp.run(poisson_timer, rate, seed=1234, starttime=datetime(2020, 1, 1), endtime=datetime(2020, 1, 1, 3))[ + 0 + ] + first_hour = [v for t, v in out if t <= datetime(2020, 1, 1, 1)] + second_hour = [v for t, v in out if datetime(2020, 1, 1, 1) < t <= datetime(2020, 1, 1, 2)] + third_hour = [v for t, v in out if t > datetime(2020, 1, 1, 2)] + self.assertGreater(len(first_hour), 60 * 60 * rate1 * 0.9) + self.assertLess(len(first_hour), 60 * 60 * rate1 * 1.1) + self.assertEqual(len(second_hour), 0) + self.assertGreater(len(third_hour), 60 * 60 * rate3 * 0.9) + self.assertLess(len(third_hour), 60 * 60 * rate3 * 1.1) + + +class TestBrownianMotion(unittest.TestCase): + def test_simple_increments(self): + mean = np.array([10.0, 0.0]) + cov = np.array([[1.0, 0.5], [0.5, 2.0]]) + # Use two second increment to make sure time scaling is ok + n_seconds = 2 + trigger = csp.timer(timedelta(seconds=n_seconds)) + out = csp.run( + brownian_motion, + trigger, + csp.const(mean), + csp.const(cov), + seed=1234, + return_increments=True, + starttime=datetime(2020, 1, 1), + endtime=timedelta(seconds=2000), + )[0] + self.assertEqual(len(out), 1000) + data = np.vstack([o[1] for o in out]) + err_mean = data.mean(axis=0) - mean * n_seconds + err_cov = np.cov(data.T) - cov * n_seconds + self.assertLess(np.abs(err_mean).max(), 0.2) + self.assertLess(np.abs(err_cov).max(), 0.2) + + def test_bad_covariance(self): + mean = np.array([10.0, 0.0]) + covs = [] + covs.append(np.array([[1.0], [2.0]])) # Not square + covs.append(np.array([[1.0]])) # Not same length as drift + covs.append(np.array([[1.0, 0.5], [1.0, 2.0]])) # Not symmetric + covs.append(np.array([[1.0, 10.0], [10.0, 2.0]])) # Not positive semi-definite + trigger = csp.timer(timedelta(seconds=1)) + for cov in covs: + self.assertRaises( + ValueError, + csp.run, + brownian_motion, + trigger, + csp.const(mean), + csp.const(cov), + seed=1234, + return_increments=True, + starttime=datetime(2020, 1, 1), + endtime=timedelta(seconds=2), + ) + + def test_brownian_motion(self): + # Test that increments add to brownian motion + mean = np.array([10.0, 0.0]) + cov = np.array([[1.0, 0.5], [0.5, 2.0]]) + trigger = csp.timer(timedelta(seconds=1)) + out = csp.run( + brownian_motion, + trigger, + csp.const(mean), + csp.const(cov), + seed=1234, + return_increments=True, + starttime=datetime(2020, 1, 1), + endtime=timedelta(seconds=100), + )[0] + data = np.vstack([o[1] for o in out]) + bm_out = csp.run( + brownian_motion, + trigger, + csp.const(mean), + csp.const(cov), + seed=1234, + starttime=datetime(2020, 1, 1), + endtime=timedelta(seconds=100), + )[0] + err = bm_out[-1][1] - data.sum(axis=0) + self.assertAlmostEquals(np.abs(err).max(), 0.0) + + def test_brownian_motion_1d(self): + mean = 10.0 + cov = 1.0 + trigger = csp.timer(timedelta(seconds=1)) + out = csp.run( + brownian_motion, + trigger, + csp.const(np.array([mean])), + csp.const(np.array([[cov]])), + seed=1234, + starttime=datetime(2020, 1, 1), + endtime=timedelta(seconds=100), + )[0] + out1 = csp.run( + brownian_motion_1d, + trigger, + csp.const(mean), + csp.const(cov), + seed=1234, + starttime=datetime(2020, 1, 1), + endtime=timedelta(seconds=100), + )[0] + self.assertEqual(out[-1][1][0], out1[-1][1]) + + def test_change_at_trigger(self): + mean1 = np.array([1.0]) + mean2 = np.array([-1.0]) + cov = np.array([[0.0]]) + trigger = csp.timer(timedelta(seconds=1)) + drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1), mean1), (datetime(2020, 1, 1, 0, 0, 1), mean2)]) + cov = csp.const(cov) + out = csp.run( + brownian_motion, + trigger, + drift, + cov, + seed=1234, + return_increments=True, + starttime=datetime(2020, 1, 1), + endtime=datetime(2020, 1, 1, 0, 0, 2), + )[0] + target = [(datetime(2020, 1, 1, 0, 0, 1), np.array([1.0])), (datetime(2020, 1, 1, 0, 0, 2), np.array([-1.0]))] + np.testing.assert_equal(out, target) + + def test_change_between_triggers(self): + mean1 = np.array([1.0]) + mean2 = np.array([2.0]) + cov = np.array([[0.0]]) + trigger = csp.timer(timedelta(seconds=2)) + drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1), mean1), (datetime(2020, 1, 1, 0, 0, 1), mean2)]) + cov = csp.const(cov) + out = csp.run( + brownian_motion, + trigger, + drift, + cov, + seed=1234, + return_increments=True, + starttime=datetime(2020, 1, 1), + endtime=datetime(2020, 1, 1, 0, 0, 2), + )[0] + target = [(datetime(2020, 1, 1, 0, 0, 2), np.array([3.0]))] + np.testing.assert_equal(out, target) + + def test_changing_parameters(self): + mean1 = np.array([10.0, 0.0]) + mean2 = np.array([1.0, 0.0]) + cov1 = np.array([[1.0, 0.5], [0.5, 2.0]]) + cov2 = np.array([[0.0, 0.0], [0.0, 1.0]]) # Note zero covariance for first dim! + n_seconds = 1 + trigger = csp.timer(timedelta(seconds=n_seconds)) + drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1, 0), mean1), (datetime(2020, 1, 1, 1), mean2)]) + cov = csp.curve(NumpyNDArray[float], [(datetime(2020, 1, 1, 0), cov1), (datetime(2020, 1, 1, 1), cov2)]) + out = csp.run( + brownian_motion, + trigger, + drift, + cov, + seed=1234, + return_increments=True, + starttime=datetime(2020, 1, 1), + endtime=datetime(2020, 1, 1, 2), + )[0] + data1 = np.vstack([v for t, v in out if t <= datetime(2020, 1, 1, 1)]) + data2 = np.vstack([v for t, v in out if datetime(2020, 1, 1, 1) < t <= datetime(2020, 1, 1, 2)]) + + err_mean1 = data1.mean(axis=0) - mean1 * n_seconds + err_cov1 = np.cov(data1.T) - cov1 * n_seconds + self.assertLess(np.abs(err_mean1).max(), 0.05) + self.assertLess(np.abs(err_cov1).max(), 0.05) + + err_mean2 = data2.mean(axis=0) - mean2 * n_seconds + err_cov2 = np.cov(data2.T) - cov2 * n_seconds + self.assertLess(np.abs(err_mean2).max(), 0.05) + self.assertLess(np.abs(err_cov2).max(), 0.05) + + def disable_test_performance(self): + dim = 1_000 + N = 100_000 + mean = np.zeros(dim) + cov = np.diag(np.ones(dim)) + trigger = csp.timer(timedelta(seconds=1)) + + def graph(): + out = brownian_motion(trigger, csp.const(mean), csp.const(cov), seed=1234) + csp.add_graph_output("BM", out, tick_count=1) + + start = datetime.utcnow() + + from threadpoolctl import threadpool_limits # May need to pip install separately + + with threadpool_limits(limits=1): # To limit numpy parallelism + csp.run( + graph, + starttime=datetime(2020, 1, 1), + endtime=timedelta(seconds=N), + ) + end = datetime.utcnow() + print(f"Elapsed: {end-start}") diff --git a/pyproject.toml b/pyproject.toml index f046f006..0c66c43b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,7 @@ develop = [ "pytest-cov", "pytest-sugar", "scikit-build", + "threadpoolctl", "tornado", ]