Skip to content

Commit

Permalink
Merge pull request #56 from Point72/pit/csp_random
Browse files Browse the repository at this point in the history
Adding poisson_timer and brownian_motion to new csp.random module
  • Loading branch information
ptomecek authored Feb 13, 2024
2 parents 388eb26 + 8b3667b commit bcf3b47
Show file tree
Hide file tree
Showing 3 changed files with 391 additions and 0 deletions.
151 changes: 151 additions & 0 deletions csp/random.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import numpy as np
from datetime import timedelta
from typing import TypeVar

import csp
from csp import ts
from csp.stats import numpy_to_list
from csp.typing import Numpy1DArray, NumpyNDArray

__all__ = ("poisson_timer", "brownian_motion", "brownian_motion_1d")


T = TypeVar("T")


@csp.node
def poisson_timer(rate: ts[float], seed: object, value: "~T" = True) -> ts["T"]:
"""Generate events according to a Poisson process with time-varying rate.
For a fixed-interval timer see csp.timer.
Args:
rate: The rate of the Poisson process (per second), must be non-negative
seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng
value: The value to tick when there are events (similar to csp.timer)
"""
with csp.alarms():
event = csp.alarm(bool)

with csp.state():
s_rng = np.random.default_rng(seed)
s_scheduled_event = None

if csp.ticked(rate):
if rate < 0:
raise ValueError(f"{csp.now()}: rate must be non-negative")
if s_scheduled_event:
csp.cancel_alarm(event, s_scheduled_event)
if rate > 0:
seconds = s_rng.exponential(1.0 / rate)
s_scheduled_event = csp.schedule_alarm(event, timedelta(seconds=seconds), True)

if csp.ticked(event):
seconds = s_rng.exponential(1.0 / rate)
s_scheduled_event = csp.schedule_alarm(event, timedelta(seconds=seconds), True)
return value


def _make_brownian_increment(t, s_rng, drift, s_cov_decomp):
# Make the brownian increment as efficiently as possible
values = s_rng.normal(scale=np.sqrt(t), size=drift.size)
np.dot(s_cov_decomp, values, out=values)
np.add(drift * t, values, out=values)
return values


def _matrix_decomposition(matrix, now):
# Split matrix decomposition into its own function to help with profiling and make it easier to replace
# We use SVD instead of Cholesky because it's more stable and handles the zero variance case.
# Could also use eigenvalue decomposition, but we choose svd because it's the default for np.random.Generator.multivariate_normal
U, S, Vt = np.linalg.svd(matrix)
if not np.allclose(matrix, matrix.T):
raise ValueError(f"{now}: covariance not symmetric")
if not np.allclose(U, Vt):
raise ValueError(f"{now}: covariance not positive semidefinite")
return np.dot(U, np.sqrt(np.diag(S)), out=U)


@csp.node
def brownian_motion(
trigger: ts[object],
drift: ts[Numpy1DArray[float]],
covariance: ts[NumpyNDArray[float]],
seed: object,
return_increments: bool = False,
) -> ts[Numpy1DArray[float]]:
"""Generate multi-dimensional Brownian motion (or increments) at the trigger times, with time-varying drift and covariance.
The Brownian motion starts once drift and covariance have at least 1 tick each, and will start from zero.
To use a different start value, use csp.const(initial_value) + brownian_motion(...)
Args:
trigger: When to return the value of the process
drift: Drift parameter (per second), i.e. array of length n
covariance: Covariance matrix (per second), i.e. array of size nxn
seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng
return_increments: Whether to return increments of the brownian motion at trigger times instead of the process itself
"""
with csp.state():
s_rng = np.random.default_rng(seed)
s_cov_decomp = None # Placeholder for covariance matrix decomposition
s_last_change = None # Placeholder for last csp.now()
s_last_drift = None # Placeholder for last value of drift
s_last_value = None # Placeholder for cumulative value between trigger ticks

if csp.ticked(drift, covariance) and csp.valid(drift, covariance):
if s_last_change is None:
if not drift.ndim == 1:
raise ValueError(f"{csp.now()}: drift must be 1-dimensional")
if not covariance.ndim == 2:
raise ValueError(f"{csp.now()}: covariance must be 2-dimensional")
if not drift.size == covariance.shape[0]:
raise ValueError(f"{csp.now()}: drift and covariance must have same length")
s_last_value = np.zeros_like(drift)
else:
t = (csp.now() - s_last_change).total_seconds()
values = _make_brownian_increment(t, s_rng, s_last_drift, s_cov_decomp)
np.add(values, s_last_value, out=s_last_value)
s_last_change = csp.now()
if s_last_drift is not None and drift.shape != s_last_drift.shape:
raise ValueError(f"{csp.now()}: shape of drift is not allowed to change")
s_last_drift = drift

if csp.ticked(covariance):
# Only do the covariance decomposition when it changes
if s_cov_decomp is not None and covariance.shape != s_cov_decomp.shape:
raise ValueError(f"{csp.now()}: shape of covariance is not allowed to change")
s_cov_decomp = _matrix_decomposition(covariance, csp.now())

if csp.ticked(trigger) and csp.valid(drift, covariance):
if s_last_change is not None:
t = (csp.now() - s_last_change).total_seconds()
values = _make_brownian_increment(t, s_rng, s_last_drift, s_cov_decomp)
s_last_change = csp.now()
if return_increments:
# Add in "last value" in case drift/covariance changed in between trigger updates
np.add(s_last_value, values, out=values)
s_last_value.fill(0.0)
return values
else:
np.add(s_last_value, values, out=s_last_value)
return s_last_value


@csp.graph
def brownian_motion_1d(
trigger: ts[object], drift: ts[float], variance: ts[float], seed: object, return_increments: bool = False
) -> ts[float]:
"""Generate one-dimensional Brownian motion at the trigger times, with time-varying drift and variance.
The Brownian motion starts once drift and covariance have at least 1 tick each, and will start from zero.
To use a different start value, use csp.const(initial_value) + brownian_motion_1d(...)
Args:
trigger: When to return the value of the process
drift: Drift parameter (per second)
variance: Variance parameter (per second)
seed: The seed for the numpy random Generator. Can be anything accepted by np.random.default_rng
return_increments: Whether to return increments of the brownian motion at trigger times instead of the process itself
"""
drift = csp.apply(drift, lambda x: np.array([x]), Numpy1DArray[float])
covariance = csp.apply(variance, lambda x: np.array([[x]]), NumpyNDArray[float])
bm = brownian_motion(trigger, drift, covariance, seed=seed, return_increments=return_increments)
return numpy_to_list(bm, 1)[0]
239 changes: 239 additions & 0 deletions csp/tests/test_random.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
import numpy as np
import unittest
from datetime import datetime, timedelta

import csp
from csp.random import brownian_motion, brownian_motion_1d, poisson_timer
from csp.typing import Numpy1DArray, NumpyNDArray


class TestPoissonTimer(unittest.TestCase):
def test_simple(self):
rate = csp.const(2.0) # two events per second
out = csp.run(
poisson_timer, rate, seed=1234, value="foo", starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1000)
)[0]
# Should be about 2k events, subject to random noise
self.assertGreater(len(out), 1900)
self.assertLess(len(out), 2100)
self.assertEqual(out[0][1], "foo")

def test_delayed_start(self):
rate = csp.const(0.1, delay=timedelta(days=1)) # two events per second
out = csp.run(
poisson_timer, rate, seed=1234, value="foo", starttime=datetime(2020, 1, 1), endtime=timedelta(days=2)
)[0]
self.assertGreater(out[0][0], datetime(2020, 1, 2))

def test_changing_rate(self):
rate1 = 1.0
rate3 = 0.1
rate = csp.curve(
float, [(datetime(2020, 1, 1, 0), rate1), (datetime(2020, 1, 1, 1), 0.0), (datetime(2020, 1, 1, 2), rate3)]
)
out = csp.run(poisson_timer, rate, seed=1234, starttime=datetime(2020, 1, 1), endtime=datetime(2020, 1, 1, 3))[
0
]
first_hour = [v for t, v in out if t <= datetime(2020, 1, 1, 1)]
second_hour = [v for t, v in out if datetime(2020, 1, 1, 1) < t <= datetime(2020, 1, 1, 2)]
third_hour = [v for t, v in out if t > datetime(2020, 1, 1, 2)]
self.assertGreater(len(first_hour), 60 * 60 * rate1 * 0.9)
self.assertLess(len(first_hour), 60 * 60 * rate1 * 1.1)
self.assertEqual(len(second_hour), 0)
self.assertGreater(len(third_hour), 60 * 60 * rate3 * 0.9)
self.assertLess(len(third_hour), 60 * 60 * rate3 * 1.1)


class TestBrownianMotion(unittest.TestCase):
def test_simple_increments(self):
mean = np.array([10.0, 0.0])
cov = np.array([[1.0, 0.5], [0.5, 2.0]])
# Use two second increment to make sure time scaling is ok
n_seconds = 2
trigger = csp.timer(timedelta(seconds=n_seconds))
out = csp.run(
brownian_motion,
trigger,
csp.const(mean),
csp.const(cov),
seed=1234,
return_increments=True,
starttime=datetime(2020, 1, 1),
endtime=timedelta(seconds=2000),
)[0]
self.assertEqual(len(out), 1000)
data = np.vstack([o[1] for o in out])
err_mean = data.mean(axis=0) - mean * n_seconds
err_cov = np.cov(data.T) - cov * n_seconds
self.assertLess(np.abs(err_mean).max(), 0.2)
self.assertLess(np.abs(err_cov).max(), 0.2)

def test_bad_covariance(self):
mean = np.array([10.0, 0.0])
covs = []
covs.append(np.array([[1.0], [2.0]])) # Not square
covs.append(np.array([[1.0]])) # Not same length as drift
covs.append(np.array([[1.0, 0.5], [1.0, 2.0]])) # Not symmetric
covs.append(np.array([[1.0, 10.0], [10.0, 2.0]])) # Not positive semi-definite
trigger = csp.timer(timedelta(seconds=1))
for cov in covs:
self.assertRaises(
ValueError,
csp.run,
brownian_motion,
trigger,
csp.const(mean),
csp.const(cov),
seed=1234,
return_increments=True,
starttime=datetime(2020, 1, 1),
endtime=timedelta(seconds=2),
)

def test_brownian_motion(self):
# Test that increments add to brownian motion
mean = np.array([10.0, 0.0])
cov = np.array([[1.0, 0.5], [0.5, 2.0]])
trigger = csp.timer(timedelta(seconds=1))
out = csp.run(
brownian_motion,
trigger,
csp.const(mean),
csp.const(cov),
seed=1234,
return_increments=True,
starttime=datetime(2020, 1, 1),
endtime=timedelta(seconds=100),
)[0]
data = np.vstack([o[1] for o in out])
bm_out = csp.run(
brownian_motion,
trigger,
csp.const(mean),
csp.const(cov),
seed=1234,
starttime=datetime(2020, 1, 1),
endtime=timedelta(seconds=100),
)[0]
err = bm_out[-1][1] - data.sum(axis=0)
self.assertAlmostEquals(np.abs(err).max(), 0.0)

def test_brownian_motion_1d(self):
mean = 10.0
cov = 1.0
trigger = csp.timer(timedelta(seconds=1))
out = csp.run(
brownian_motion,
trigger,
csp.const(np.array([mean])),
csp.const(np.array([[cov]])),
seed=1234,
starttime=datetime(2020, 1, 1),
endtime=timedelta(seconds=100),
)[0]
out1 = csp.run(
brownian_motion_1d,
trigger,
csp.const(mean),
csp.const(cov),
seed=1234,
starttime=datetime(2020, 1, 1),
endtime=timedelta(seconds=100),
)[0]
self.assertEqual(out[-1][1][0], out1[-1][1])

def test_change_at_trigger(self):
mean1 = np.array([1.0])
mean2 = np.array([-1.0])
cov = np.array([[0.0]])
trigger = csp.timer(timedelta(seconds=1))
drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1), mean1), (datetime(2020, 1, 1, 0, 0, 1), mean2)])
cov = csp.const(cov)
out = csp.run(
brownian_motion,
trigger,
drift,
cov,
seed=1234,
return_increments=True,
starttime=datetime(2020, 1, 1),
endtime=datetime(2020, 1, 1, 0, 0, 2),
)[0]
target = [(datetime(2020, 1, 1, 0, 0, 1), np.array([1.0])), (datetime(2020, 1, 1, 0, 0, 2), np.array([-1.0]))]
np.testing.assert_equal(out, target)

def test_change_between_triggers(self):
mean1 = np.array([1.0])
mean2 = np.array([2.0])
cov = np.array([[0.0]])
trigger = csp.timer(timedelta(seconds=2))
drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1), mean1), (datetime(2020, 1, 1, 0, 0, 1), mean2)])
cov = csp.const(cov)
out = csp.run(
brownian_motion,
trigger,
drift,
cov,
seed=1234,
return_increments=True,
starttime=datetime(2020, 1, 1),
endtime=datetime(2020, 1, 1, 0, 0, 2),
)[0]
target = [(datetime(2020, 1, 1, 0, 0, 2), np.array([3.0]))]
np.testing.assert_equal(out, target)

def test_changing_parameters(self):
mean1 = np.array([10.0, 0.0])
mean2 = np.array([1.0, 0.0])
cov1 = np.array([[1.0, 0.5], [0.5, 2.0]])
cov2 = np.array([[0.0, 0.0], [0.0, 1.0]]) # Note zero covariance for first dim!
n_seconds = 1
trigger = csp.timer(timedelta(seconds=n_seconds))
drift = csp.curve(Numpy1DArray[float], [(datetime(2020, 1, 1, 0), mean1), (datetime(2020, 1, 1, 1), mean2)])
cov = csp.curve(NumpyNDArray[float], [(datetime(2020, 1, 1, 0), cov1), (datetime(2020, 1, 1, 1), cov2)])
out = csp.run(
brownian_motion,
trigger,
drift,
cov,
seed=1234,
return_increments=True,
starttime=datetime(2020, 1, 1),
endtime=datetime(2020, 1, 1, 2),
)[0]
data1 = np.vstack([v for t, v in out if t <= datetime(2020, 1, 1, 1)])
data2 = np.vstack([v for t, v in out if datetime(2020, 1, 1, 1) < t <= datetime(2020, 1, 1, 2)])

err_mean1 = data1.mean(axis=0) - mean1 * n_seconds
err_cov1 = np.cov(data1.T) - cov1 * n_seconds
self.assertLess(np.abs(err_mean1).max(), 0.05)
self.assertLess(np.abs(err_cov1).max(), 0.05)

err_mean2 = data2.mean(axis=0) - mean2 * n_seconds
err_cov2 = np.cov(data2.T) - cov2 * n_seconds
self.assertLess(np.abs(err_mean2).max(), 0.05)
self.assertLess(np.abs(err_cov2).max(), 0.05)

def disable_test_performance(self):
dim = 1_000
N = 100_000
mean = np.zeros(dim)
cov = np.diag(np.ones(dim))
trigger = csp.timer(timedelta(seconds=1))

def graph():
out = brownian_motion(trigger, csp.const(mean), csp.const(cov), seed=1234)
csp.add_graph_output("BM", out, tick_count=1)

start = datetime.utcnow()

from threadpoolctl import threadpool_limits # May need to pip install separately

with threadpool_limits(limits=1): # To limit numpy parallelism
csp.run(
graph,
starttime=datetime(2020, 1, 1),
endtime=timedelta(seconds=N),
)
end = datetime.utcnow()
print(f"Elapsed: {end-start}")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ develop = [
"pytest-cov",
"pytest-sugar",
"scikit-build",
"threadpoolctl",
"tornado",
]

Expand Down

0 comments on commit bcf3b47

Please sign in to comment.