diff --git a/bfast/base.py b/bfast/base.py index ee72bfe..63aeaa2 100644 --- a/bfast/base.py +++ b/bfast/base.py @@ -12,6 +12,7 @@ class BFASTMonitorBase(ABC): def __init__( self, start_monitor, + history="all", freq=365, k=3, hfrac=0.25, @@ -21,6 +22,7 @@ def __init__( verbose=0, ): self.start_monitor = start_monitor + self.history = history self.freq = freq self.k = k self.hfrac = hfrac diff --git a/bfast/models.py b/bfast/models.py index 98520bd..6e178f3 100644 --- a/bfast/models.py +++ b/bfast/models.py @@ -19,6 +19,14 @@ class BFASTMonitor(): A datetime object specifying the start of the monitoring phase. + history : str, default "all" + Specification of the start of a stable history period. + Can be one of "ROC" and "all". If set to "ROC", a + reverse-ordered CUSUM test will be employed to + automatically detect the start of a stable history + period. If set to "all", the start of the stable + history period is the beginning of the time series. + freq : int, default 365 The frequency for the seasonal model. @@ -89,6 +97,7 @@ class BFASTMonitor(): def __init__( self, start_monitor, + history="all", freq=365, k=3, hfrac=0.25, @@ -102,7 +111,11 @@ def __init__( detailed_results=False, find_magnitudes=True, ): + history_enum = ("all", "ROC") + if history not in history_enum: + raise Exception("Current implementation can only handle the following values for history: {}".format(history_enum)) self.start_monitor = start_monitor + self.history = history self.freq = freq self.k = k self.hfrac = hfrac @@ -143,6 +156,7 @@ def fit(self, data, dates, n_chunks=None, nan_value=0): if self.backend == 'python': self._model = BFASTMonitorPython( start_monitor=self.start_monitor, + history=self.history, freq=self.freq, k=self.k, hfrac=self.hfrac, @@ -155,6 +169,7 @@ def fit(self, data, dates, n_chunks=None, nan_value=0): elif self.backend == 'python-mp': self._model = BFASTMonitorPython( start_monitor=self.start_monitor, + history=self.history, freq=self.freq, k=self.k, hfrac=self.hfrac, @@ -168,6 +183,7 @@ def fit(self, data, dates, n_chunks=None, nan_value=0): elif self.backend == 'opencl': self._model = BFASTMonitorOpenCL( start_monitor=self.start_monitor, + history=self.history, freq=self.freq, k=self.k, hfrac=self.hfrac, @@ -312,9 +328,25 @@ def valids(self): ------- array-like : An array containing the number of valid values for each pixel in the - aray data + array data """ if self._is_fitted(): return self._model.valids raise Exception("Model not yet fitted!") + + @property + def history_starts(self): + """ Returns index of the start of the stable + history period for each pixel + + Returns + ------- + array-like : An array containing indices + of starts of stable history periods + in the array data + """ + if self._is_fitted(): + return self._model.history_starts + + raise Exception("Model not yet fitted!") diff --git a/bfast/monitor/opencl/base.py b/bfast/monitor/opencl/base.py index 7239edc..b4dbfa8 100644 --- a/bfast/monitor/opencl/base.py +++ b/bfast/monitor/opencl/base.py @@ -14,7 +14,9 @@ import pyopencl.array as pycl_array from bfast.base import BFASTMonitorBase -from bfast.monitor.utils import compute_end_history, compute_lam, map_indices +from bfast.monitor.utils import ( + compute_end_history, compute_lam, map_indices, compute_lam_brownian +) # generated by Futhark from .bfastfinal import bfastfinal @@ -37,6 +39,13 @@ class BFASTMonitorOpenCL(BFASTMonitorBase): start_monitor : datetime A datetime object specifying the start of the monitoring phase. + history : str, default "all" + Specification of the start of a stable history period. + Can be one of "ROC" and "all". If set to "ROC", a + reverse-ordered CUSUM test will be employed to + automatically detect the start of a stable history + period. If set to "all", the start of the stable + history period is the beginning of the time series. freq : int, default 365 The frequency for the seasonal model k : int, default 3 @@ -71,6 +80,7 @@ class BFASTMonitorOpenCL(BFASTMonitorBase): """ def __init__(self, start_monitor, + history="all", freq=365, k=3, hfrac=0.25, @@ -89,6 +99,7 @@ def __init__(self, raise Exception("Current implementation can only handle the following values for k: {}".format(k_valid)) super().__init__(start_monitor, + history, freq, k=k, hfrac=hfrac, @@ -102,14 +113,17 @@ def __init__(self, self.platform_id = platform_id self.device_id = device_id + history_enum = { "ROC": -1, "all": 0 } + self.history_num = history_enum[self.history] + # initialize device self._init_device(platform_id, device_id) self.futobj = bfastfinal(command_queue=self.queue, interactive=False, default_tile_size=8, - default_reg_tile_size=3, - sizes=self._get_futhark_params()) + default_reg_tile_size=3) + # sizes=self._get_futhark_params()) def _init_device(self, platform_id, device_id): """ Initializes the device.""" @@ -224,6 +238,7 @@ def fit(self, data, dates, n_chunks=None, nan_value=0): self.breaks = results['breaks'] self.means = results['means'] self.valids = results['valids'] + self.history_starts = results['history_starts'] if self.find_magnitudes or self.detailed_results: self.magnitudes = results['magnitudes'] @@ -291,10 +306,11 @@ def _fit_single(self, data, dates, nan_value): def _fit_single_preprocess(self, data, dates, nan_value): start = time.time() - mapped_indices = map_indices(dates).astype(numpy.int32) + mapped_indices = map_indices(dates).astype(numpy.int64) N = data.shape[0] self.n = compute_end_history(dates, self.start_monitor) self.lam = compute_lam(N, self.hfrac, self.level, self.period) + self.conf_ROC = compute_lam_brownian(self.level) end = time.time() if self.verbose > 0: @@ -339,32 +355,43 @@ def _fit_single_kernel(self, y_cl, mapped_indices_cl): means, \ magnitudes, \ y_error, \ - y_pred = self.futobj.mainDetailed(trend, - self.k, - self.n, - self.freq, - self.hfrac, - self.lam, - mapped_indices_cl, y_cl) + y_pred, \ + hist = self.futobj.mainDetailed(trend, + self.k, + self.n, + self.freq, + self.hfrac, + self.level, + self.lam, + self.history_num, + self.conf_ROC, + mapped_indices_cl, y_cl) elif self.find_magnitudes: Ns, \ breaks, \ means, \ - magnitudes = self.futobj.mainMagnitude(trend, - self.k, - self.n, - self.freq, - self.hfrac, - self.lam, - mapped_indices_cl, y_cl) - else: - Ns, breaks, means = self.futobj.main(trend, + magnitudes, \ + hist = self.futobj.mainMagnitude(trend, self.k, self.n, self.freq, self.hfrac, + self.level, self.lam, + self.history_num, + self.conf_ROC, mapped_indices_cl, y_cl) + else: + Ns, breaks, means, hist = self.futobj.main(trend, + self.k, + self.n, + self.freq, + self.hfrac, + self.level, + self.lam, + self.history_num, + self.conf_ROC, + mapped_indices_cl, y_cl) end = time.time() @@ -380,6 +407,7 @@ def _fit_single_kernel(self, y_cl, mapped_indices_cl): results['breaks'] = breaks results['means'] = means results['valids'] = Ns + results['history_starts'] = hist if self.find_magnitudes or self.detailed_results: results['magnitudes'] = magnitudes @@ -398,6 +426,7 @@ def _fit_single_postprocessing(self, results, oshape): results['breaks'] = results['breaks'].get().reshape(oshape[1:]) results['means'] = results['means'].get().reshape(oshape[1:]) results['valids'] = results['valids'].get().T.reshape(oshape[1:]) + results['history_starts'] = results['history_starts'].get().T.reshape(oshape[1:]) if self.find_magnitudes or self.detailed_results: results['magnitudes'] = results['magnitudes'].get().reshape(oshape[1:]) @@ -435,6 +464,12 @@ def __append_results(self, res, results): else: results['valids'] = res['valids'] + if 'history_starts' in results.keys(): + results['history_starts'] = numpy.concatenate([results['history_starts'], + res['history_starts']], axis=0) + else: + results['history_starts'] = res['history_starts'] + if self.find_magnitudes or self.detailed_results: if 'magnitudes' in results.keys(): results['magnitudes'] = numpy.concatenate([results['magnitudes'], res['magnitudes']], axis=0) diff --git a/bfast/monitor/opencl/futhark/Makefile b/bfast/monitor/opencl/futhark/Makefile index b035576..073596f 100644 --- a/bfast/monitor/opencl/futhark/Makefile +++ b/bfast/monitor/opencl/futhark/Makefile @@ -1,4 +1,4 @@ -bfastfinal.py: bfastfinal.fut helpers.fut - futhark pkg add github.com/diku-dk/sorts +bfastfinal.py: bfastfinal.fut helpers.fut mroc.fut recresid.fut + futhark pkg upgrade futhark pkg sync futhark pyopencl --library bfastfinal.fut diff --git a/bfast/monitor/opencl/futhark/bfastfinal.fut b/bfast/monitor/opencl/futhark/bfastfinal.fut index 9e14a43..ee9e36c 100644 --- a/bfast/monitor/opencl/futhark/bfastfinal.fut +++ b/bfast/monitor/opencl/futhark/bfastfinal.fut @@ -12,45 +12,56 @@ import "lib/github.com/diku-dk/sorts/insertion_sort" import "helpers" +import "mroc" -- | implementation is in this entry point -- the outer map is distributed directly -let mainFun [m][N] (trend: i32) (k: i32) (n: i32) (freq: f32) - (hfrac: f32) (lam: f32) - (mappingindices : [N]i32) - (images : [m][N]f32) = +let mainFun [m][N] (trend: i64) (k: i64) (n: i64) (freq: f64) + (hfrac: f64) (level: f64) (lam: f64) (hist: i64) (conf: f64) + (mappingindices : [N]i64) + (images : [m][N]f64) = ---------------------------------- -- 1. make interpolation matrix -- ---------------------------------- - let n64 = i64.i32 n let k2p2 = 2 * k + 2 let k2p2' = if trend > 0 then k2p2 else k2p2-1 let X = (if trend > 0 - then mkX_with_trend (i64.i32 k2p2') freq mappingindices - else mkX_no_trend (i64.i32 k2p2') freq mappingindices) + then mkX_with_trend (k2p2') freq mappingindices + else mkX_no_trend (k2p2') freq mappingindices) |> intrinsics.opaque -- PERFORMANCE BUG: instead of `let Xt = copy (transpose X)` -- we need to write the following ugly thing to force manifestation: - let zero = r32 <| i32.i64 <| (N * N + 2 * N + 1) / (N + 1) - N - 1 + let zero = f64.i64 <| (N * N + 2 * N + 1) / (N + 1) - N - 1 let Xt = intrinsics.opaque <| map (map (+zero)) (copy (transpose X)) - let Xh = X[:,:n64] - let Xth = Xt[:n64,:] - let Yh = images[:,:n64] + let Xh = X[:,:n] + let Xth = Xt[:n,:] + let Yh = images[:,:n] + ---------------------------------- + -- 2. stable history -- + ---------------------------------- + let hist_inds = if hist == -1 + then mhistory_roc level conf Xth Yh + else replicate m hist + -- Set stable history period. + let images = map2 (\j -> + map2 (\i yi -> if i < j then f64.nan else yi) (iota N) + ) hist_inds images + let Yh = images[:,:n] ---------------------------------- - -- 2. mat-mat multiplication -- + -- 3. mat-mat multiplication -- ---------------------------------- let Xsqr = intrinsics.opaque <| map (matmul_filt Xh Xth) Yh ---------------------------------- - -- 3. matrix inversion -- + -- 4. matrix inversion -- ---------------------------------- let Xinv = intrinsics.opaque <| map mat_inv Xsqr --------------------------------------------- - -- 4. several matrix-vector multiplication -- + -- 5. several matrix-vector multiplication -- --------------------------------------------- let beta0 = map (matvecmul_row_filt Xh) Yh -- [m][2k+2] |> intrinsics.opaque @@ -65,73 +76,73 @@ let mainFun [m][N] (trend: i32) (k: i32) (n: i32) (freq: f32) -- (transpose X) instead of Xt --------------------------------------------- - -- 5. filter etc. -- + -- 6. filter etc. -- --------------------------------------------- let (Nss, y_errors, val_indss) = intrinsics.opaque <| unzip3 <| map2 (\y y_pred -> - let y_error_all = map2 (\ye yep -> if !(f32.isnan ye) + let y_error_all = map2 (\ye yep -> if !(f64.isnan ye) then ye - yep - else f32.nan + else f64.nan ) y y_pred - in filterPadWithKeys (\y -> !(f32.isnan y)) f32.nan y_error_all + in filterPadWithKeys (\y -> !(f64.isnan y)) f64.nan y_error_all ) images y_preds ------------------------------------------------ - -- 6. ns and sigma (can be fused with above) -- + -- 7. ns and sigma (can be fused with above) -- ------------------------------------------------ let (hs, nss, sigmas) = intrinsics.opaque <| unzip3 <| map2 (\yh y_error -> - let ns = map (\ye -> if !(f32.isnan ye) then 1 else 0) yh + let ns = map (\ye -> if !(f64.isnan ye) then 1 else 0) yh |> reduce (+) 0 - let sigma = map (\i -> if i < ns then y_error[i] else 0.0) (iota3232 n) + let sigma = map (\i -> if i < ns then y_error[i] else 0.0) (iota n) |> map (\a -> a * a) |> reduce (+) 0.0 - let sigma = f32.sqrt (sigma / (r32 (ns - k2p2))) - let h = t32 ((r32 ns) * hfrac) + let sigma = f64.sqrt (sigma / (f64.i64 (ns - k2p2))) + let h = i64.f64 ((f64.i64 ns) * hfrac) in (h, ns, sigma) ) Yh y_errors --------------------------------------------- - -- 7. moving sums first and bounds: -- + -- 8. moving sums first and bounds: -- --------------------------------------------- - let hmax = reduce_comm (i32.max) 0 hs + let hmax = reduce_comm (i64.max) 0 hs let MO_fsts = zip3 y_errors nss hs |> map (\(y_error, ns, h) -> map (\i -> if i < h then y_error[i + ns - h + 1] else 0.0 - ) (iota3232 hmax) + ) (iota hmax) |> reduce (+) 0.0 ) |> intrinsics.opaque let BOUND = map (\q -> let time = mappingindices[n + q] - let tmp = logplus ((r32 time) / (r32 mappingindices[n - 1])) - in lam * (f32.sqrt tmp) - ) (iota32 (N-n64)) + let tmp = logplus ((f64.i64 time) / (f64.i64 mappingindices[n - 1])) + in lam * (f64.sqrt tmp) + ) (iota (N-n)) --------------------------------------------- - -- 8. error magnitude computation: -- + -- 9. error magnitude computation: -- --------------------------------------------- let magnitudes = zip3 Nss nss y_errors |> map (\(Ns, ns, y_error) -> - map (\i -> if i < Ns - ns && !(f32.isnan y_error[ns + i]) + map (\i -> if i < Ns - ns && !(f64.isnan y_error[ns + i]) then y_error[ns + i] - else f32.inf - ) (iota32 (N - n64)) + else f64.inf + ) (iota (N - n)) -- sort - |> insertion_sort (f32.<=) + |> insertion_sort (f64.<=) -- extract median |> (\xs -> let i = (Ns - ns) / 2 let j = i - 1 in if Ns == ns - then 0f32 + then 0f64 else if (Ns - ns) % 2 == 0 then (xs[j] + xs[i]) / 2 else xs[i]) ) |> intrinsics.opaque --------------------------------------------- - -- 9. moving sums computation: -- + -- 10. moving sums computation: -- --------------------------------------------- let (MOs, MOs_NN, breaks, means) = zip (zip4 Nss nss sigmas hs) (zip3 MO_fsts y_errors val_indss) |> @@ -139,63 +150,63 @@ let mainFun [m][N] (trend: i32) (k: i32) (n: i32) (freq: f32) let MO = map (\j -> if j >= Ns - ns then 0.0 else if j == 0 then MO_fst else y_error[ns + j] - y_error[ns - h + j] - ) (iota32 (N - n64)) |> scan (+) 0.0 + ) (iota (N - n)) |> scan (+) 0.0 - let MO' = map (\mo -> mo / (sigma * (f32.sqrt (r32 ns))) ) MO + let MO' = map (\mo -> mo / (sigma * (f64.sqrt (f64.i64 ns))) ) MO let (is_break, fst_break) = - map3 (\mo' b j -> if j < Ns - ns && !(f32.isnan mo') - then ( (f32.abs mo') > b, j ) + map3 (\mo' b j -> if j < Ns - ns && !(f64.isnan mo') + then ( (f64.abs mo') > b, j ) else ( false, j ) - ) MO' BOUND (iota32 (N - n64)) + ) MO' BOUND (iota (N - n)) |> reduce (\(b1, i1) (b2, i2) -> if b1 then (b1, i1) else if b2 then (b2, i2) else (b1, i1) ) (false, -1) - let mean = map2 (\x j -> if j < Ns - ns then x else 0.0 ) MO' (iota32 (N - n64)) + let mean = map2 (\x j -> if j < Ns - ns then x else 0.0 ) MO' (iota (N - n)) |> reduce (+) 0.0 - |> (\x -> if (Ns - ns) == 0 then 0f32 else x / (r32 (Ns - ns))) + |> (\x -> if (Ns - ns) == 0 then 0f64 else x / (f64.i64 (Ns - ns))) let fst_break' = if !is_break then -1 else adjustValInds n ns Ns val_inds fst_break let fst_break' = if ns <=5 || Ns-ns <= 5 then -2 else fst_break' -- The computation of MO'' should be moved just after MO' to make bounds consistent - let val_inds' = map (adjustValInds n ns Ns val_inds) (iota32 (N - n64)) - let MO'' = scatter (replicate (N - n64) f32.nan) (map i64.i32 val_inds') MO' + let val_inds' = map (adjustValInds n ns Ns val_inds) (iota (N - n)) + let MO'' = scatter (replicate (N - n) f64.nan) val_inds' MO' in (MO'', MO', fst_break', mean) ) |> unzip4 - in (MO_fsts, Nss, nss, sigmas, MOs, MOs_NN, BOUND, breaks, means, magnitudes, y_errors, y_preds) + in (MO_fsts, Nss, nss, sigmas, MOs, MOs_NN, BOUND, breaks, means, magnitudes, y_errors, y_preds, hist_inds) -- | Entry points -entry mainDetailed [m][N] (trend: i32) (k: i32) (n: i32) (freq: f32) - (hfrac: f32) (lam: f32) - (mappingindices : [N]i32) - (images : [m][N]f32) = - mainFun trend k n freq hfrac lam mappingindices images - -entry mainMagnitude [m][N] (trend: i32) (k: i32) (n: i32) (freq: f32) - (hfrac: f32) (lam: f32) - (mappingindices : [N]i32) - (images : [m][N]f32) = - let (_, Nss, _, _, _, _, _, breaks, means, magnitudes, _, _) = - mainFun trend k n freq hfrac lam mappingindices images - in (Nss, breaks, means, magnitudes) - -entry main [m][N] (trend: i32) (k: i32) (n: i32) (freq: f32) - (hfrac: f32) (lam: f32) - (mappingindices : [N]i32) - (images : [m][N]f32) = - let (_, Nss, _, _, _, _, _, breaks, means, _, _, _) = - mainFun trend k n freq hfrac lam mappingindices images - in (Nss, breaks, means) +entry mainDetailed [m][N] (trend: i64) (k: i64) (n: i64) (freq: f64) + (hfrac: f64) (level: f64) (lam: f64) (hist: i64) (conf: f64) + (mappingindices : [N]i64) + (images : [m][N]f64) = + mainFun trend k n freq hfrac level lam hist conf mappingindices images + +entry mainMagnitude [m][N] (trend: i64) (k: i64) (n: i64) (freq: f64) + (hfrac: f64) (level: f64) (lam: f64) (hist: i64) (conf: f64) + (mappingindices : [N]i64) + (images : [m][N]f64) = + let (_, Nss, _, _, _, _, _, breaks, means, magnitudes, _, _, hist_inds) = + mainFun trend k n freq hfrac level lam hist conf mappingindices images + in (Nss, breaks, means, magnitudes, hist_inds) + +entry main [m][N] (trend: i64) (k: i64) (n: i64) (freq: f64) + (hfrac: f64) (level: f64) (lam: f64) (hist: i64) (conf: f64) + (mappingindices : [N]i64) + (images : [m][N]f64) = + let (_, Nss, _, _, _, _, _, breaks, means, _, _, _, hist_inds) = + mainFun trend k n freq hfrac level lam hist conf mappingindices images + in (Nss, breaks, means, hist_inds) entry convertToFloat [m][n][p] (nan_value: i16) (images : [m][n][p]i16) = map (\block -> map (\row -> - map (\el -> if el == nan_value then f32.nan else f32.i16 el) row + map (\el -> if el == nan_value then f64.nan else f64.i16 el) row ) block ) images -entry reshapeTransp [m][n][p] (images : [m][n][p]f32) : [][m]f32 = +entry reshapeTransp [m][n][p] (images : [m][n][p]f64) : [][m]f64 = let images' = map (flatten_to (n * p)) images in transpose images' diff --git a/bfast/monitor/opencl/futhark/bfastfinal.fut.tuning b/bfast/monitor/opencl/futhark/bfastfinal.fut.tuning deleted file mode 100644 index 86e86af..0000000 --- a/bfast/monitor/opencl/futhark/bfastfinal.fut.tuning +++ /dev/null @@ -1,48 +0,0 @@ -main.suff_intra_par_11=128 -main.suff_intra_par_13=128 -main.suff_intra_par_24=235 -main.suff_intra_par_29=113 -main.suff_intra_par_34=122 -main.suff_outer_par_16=2000000000 -main.suff_outer_par_17=892448 -main.suff_outer_par_18=2000000000 -main.suff_outer_par_19=892448 -main.suff_outer_par_20=2000000000 -main.suff_outer_par_21=26215660 -main.suff_outer_par_28=2000000000 -main.suff_outer_par_31=111556 -main.suff_outer_par_6=2000000000 -main.suff_outer_par_7=2000000000 -main.suff_outer_par_8=7139584 -mainMagnitude.suff_intra_par_11=128 -mainMagnitude.suff_intra_par_13=2000000000 -mainMagnitude.suff_intra_par_24=235 -mainMagnitude.suff_intra_par_29=113 -mainMagnitude.suff_intra_par_37=122 -mainMagnitude.suff_outer_par_16=2000000000 -mainMagnitude.suff_outer_par_17=892448 -mainMagnitude.suff_outer_par_18=111556 -mainMagnitude.suff_outer_par_19=2000000000 -mainMagnitude.suff_outer_par_20=2000000000 -mainMagnitude.suff_outer_par_21=26215660 -mainMagnitude.suff_outer_par_28=2000000000 -mainMagnitude.suff_outer_par_31=111556 -mainMagnitude.suff_outer_par_6=2000000000 -mainMagnitude.suff_outer_par_7=2000000000 -mainMagnitude.suff_outer_par_8=7139584 -mainDetailed.suff_intra_par_11=2000000000 -mainDetailed.suff_intra_par_13=2000000000 -mainDetailed.suff_intra_par_24=2000000000 -mainDetailed.suff_intra_par_29=2000000000 -mainDetailed.suff_intra_par_37=2000000000 -mainDetailed.suff_outer_par_16=2000000000 -mainDetailed.suff_outer_par_17=2000000000 -mainDetailed.suff_outer_par_18=2000000000 -mainDetailed.suff_outer_par_19=2000000000 -mainDetailed.suff_outer_par_20=2000000000 -mainDetailed.suff_outer_par_21=2000000000 -mainDetailed.suff_outer_par_28=2000000000 -mainDetailed.suff_outer_par_31=2000000000 -mainDetailed.suff_outer_par_6=2000000000 -mainDetailed.suff_outer_par_7=2000000000 -mainDetailed.suff_outer_par_8=2000000000 diff --git a/bfast/monitor/opencl/futhark/futhark.pkg b/bfast/monitor/opencl/futhark/futhark.pkg new file mode 100644 index 0000000..2abdb6a --- /dev/null +++ b/bfast/monitor/opencl/futhark/futhark.pkg @@ -0,0 +1,5 @@ +require { + github.com/diku-dk/sorts 0.4.2 #3b2cfcc0d0256df7cd1f2548f68d85f2453007a0 + github.com/diku-dk/statistics 0.2.1 #ad49c009c7fb5d3059d4edca65d0e0f6b145da05 + github.com/nhey/lm 0.0.3 #b0a4b4e2b21aa7d93adc0ce66bda43bd986dfe32 +} diff --git a/bfast/monitor/opencl/futhark/helpers.fut b/bfast/monitor/opencl/futhark/helpers.fut index e0d1b15..5709584 100644 --- a/bfast/monitor/opencl/futhark/helpers.fut +++ b/bfast/monitor/opencl/futhark/helpers.fut @@ -1,73 +1,67 @@ -let iota32 (x: i64) : []i32 = - iota x |> map i32.i64 +let logplus (x: f64) : f64 = + if x > (f64.exp 1) + then f64.log x else 1 -let iota3232 (x: i32) : []i32 = - iota (i64.i32 x) |> map i32.i64 - -let logplus (x: f32) : f32 = - if x > (f32.exp 1) - then f32.log x else 1 - -let adjustValInds [N] (n : i32) (ns : i32) (Ns : i32) (val_inds : [N]i32) (ind: i32) : i32 = +let adjustValInds [N] (n : i64) (ns : i64) (Ns : i64) (val_inds : [N]i64) (ind: i64) : i64 = if ind < Ns - ns then val_inds[ind + ns] - n else -1 let filterPadWithKeys [n] 't (p : (t -> bool)) (dummy : t) - (arr : [n]t) : (i32, [n]t, [n]i32) = + (arr : [n]t) : (i64, [n]t, [n]i64) = let tfs = map (\a -> if p a then 1i64 else 0i64) arr let isT = scan (+) 0i64 tfs - let i = last isT |> i32.i64 + let i = last isT let inds= map2 (\a iT -> if p a then iT - 1 else -1i64) arr isT let rs = scatter (replicate n dummy) inds arr - let ks = scatter (replicate n 0i32) inds (iota32 n) + let ks = scatter (replicate n 0i64) inds (iota n) in (i, rs, ks) -- | builds the X matrices; first result dimensions of size 2*k+2 -let mkX_with_trend [N] (k2p2: i64) (f: f32) (mappingindices: [N]i32): [k2p2][N]f32 = +let mkX_with_trend [N] (k2p2: i64) (f: f64) (mappingindices: [N]i64): [k2p2][N]f64 = map (\ i -> map (\ind -> - if i == 0 then 1f32 - else if i == 1 then r32 ind - else let (i', j') = (r32 (i / 2), r32 ind) - let angle = 2f32 * f32.pi * i' * j' / f - in if i % 2 == 0 then f32.sin angle - else f32.cos angle + if i == 0 then 1f64 + else if i == 1 then f64.i64 ind + else let (i', j') = (f64.i64 (i / 2), f64.i64 ind) + let angle = 2f64 * f64.pi * i' * j' / f + in if i % 2 == 0 then f64.sin angle + else f64.cos angle ) mappingindices - ) (iota32 k2p2) + ) (iota k2p2) -let mkX_no_trend [N] (k2p2m1: i64) (f: f32) (mappingindices: [N]i32): [k2p2m1][N]f32 = +let mkX_no_trend [N] (k2p2m1: i64) (f: f64) (mappingindices: [N]i64): [k2p2m1][N]f64 = map (\ i -> map (\ind -> - if i == 0 then 1f32 + if i == 0 then 1f64 else let i = i + 1 - let (i', j') = (r32 (i / 2), r32 ind) - let angle = 2f32 * f32.pi * i' * j' / f + let (i', j') = (f64.i64 (i / 2), f64.i64 ind) + let angle = 2f64 * f64.pi * i' * j' / f in - if i % 2 == 0 then f32.sin angle - else f32.cos angle + if i % 2 == 0 then f64.sin angle + else f64.cos angle ) mappingindices - ) (iota32 k2p2m1) + ) (iota k2p2m1) --------------------------------------------------- -- Adapted matrix inversion so that it goes well -- -- with intra-block parallelism -- --------------------------------------------------- -let gauss_jordan [nm] (n:i32) (m:i32) (A: *[nm]f32): [nm]f32 = +let gauss_jordan [nm] (n:i64) (m:i64) (A: *[nm]f64): [nm]f64 = loop A for i < n do - let v1 = A[i64.i32 i] + let v1 = A[i] let A' = map (\ind -> let (k, j) = (ind / m, ind % m) - in if v1 == 0.0 then A[i64.i32 (k * m + j)] else - let x = A[i64.i32 j] / v1 in + in if v1 == 0.0 then A[(k * m + j)] else + let x = A[j] / v1 in if k < n - 1 -- Ap case - then A[i64.i32 ((k + 1) * m + j)] - A[i64.i32((k + 1) * m + i)] * x + then A[((k + 1) * m + j)] - A[((k + 1) * m + i)] * x else x -- irow case - ) (map i32.i64 (iota nm)) + ) (iota nm) in scatter A (iota nm) A' -let mat_inv [n0] (A: [n0][n0]f32): [n0][n0]f32 = - let n = i32.i64 n0 +let mat_inv [n0] (A: [n0][n0]f64): [n0][n0]f64 = + let n = n0 let m = 2 * n let nm = 2 * n0 * n0 -- Pad the matrix with the identity matrix. @@ -76,26 +70,26 @@ let mat_inv [n0] (A: [n0][n0]f32): [n0][n0]f32 = else if j == n + i then 1 else 0 - ) (iota32 nm) + ) (iota nm) let Ap' = gauss_jordan n m Ap - let Ap'' = unflatten n0 (i64.i32 m) Ap' + let Ap'' = unflatten n0 (m) Ap' -- Drop the identity matrix at the front - in Ap''[0:n0, n0:(2 * n0)] :> [n0][n0]f32 + in Ap''[0:n0, n0:(2 * n0)] :> [n0][n0]f64 -------------------------------------------------- -------------------------------------------------- -let dotprod [n] (xs: [n]f32) (ys: [n]f32): f32 = +let dotprod [n] (xs: [n]f64) (ys: [n]f64): f64 = reduce (+) 0.0 <| map2 (*) xs ys -let matvecmul_row [n][m] (xss: [n][m]f32) (ys: [m]f32) = +let matvecmul_row [n][m] (xss: [n][m]f64) (ys: [m]f64) = map (dotprod ys) xss -let dotprod_filt [n] (vct: [n]f32) (xs: [n]f32) (ys: [n]f32) : f32 = - f32.sum (map3 (\v x y -> x * y * if (f32.isnan v) then 0.0 else 1.0) vct xs ys) +let dotprod_filt [n] (vct: [n]f64) (xs: [n]f64) (ys: [n]f64) : f64 = + f64.sum (map3 (\v x y -> x * y * if (f64.isnan v) then 0.0 else 1.0) vct xs ys) -let matvecmul_row_filt [n][m] (xss: [n][m]f32) (ys: [m]f32) = - map (\xs -> map2 (\x y -> if (f32.isnan y) then 0 else x*y) xs ys |> f32.sum) xss +let matvecmul_row_filt [n][m] (xss: [n][m]f64) (ys: [m]f64) = + map (\xs -> map2 (\x y -> if (f64.isnan y) then 0 else x*y) xs ys |> f64.sum) xss -let matmul_filt [n][p][m] (xss: [n][p]f32) (yss: [p][m]f32) (vct: [p]f32) : [n][m]f32 = +let matmul_filt [n][p][m] (xss: [n][p]f64) (yss: [p][m]f64) (vct: [p]f64) : [n][m]f64 = map (\xs -> map (dotprod_filt vct xs) (transpose yss)) xss diff --git a/bfast/monitor/opencl/futhark/mroc.fut b/bfast/monitor/opencl/futhark/mroc.fut new file mode 100644 index 0000000..7b6b3cc --- /dev/null +++ b/bfast/monitor/opencl/futhark/mroc.fut @@ -0,0 +1,142 @@ +import "lib/github.com/diku-dk/statistics/statistics" +import "recresid" + +module stats = mk_statistics f64 + +-- Compute the [sample standard +-- deviation](https://en.wikipedia.org/wiki/Standard_deviation), +-- `s = \sqrt{\frac{1}{N-1} \sum_{i=1}^N (x_i - \bar{x})^2}`, +-- but ignoring nans. +let sample_sd_nan [m] (xs: [m]f64) (num_non_nan: i64) = + let nf64 = f64.i64 num_non_nan + -- add_nan is associative. + -- TODO prove commutativity and use `reduce_comm` (probably no speedup) + let add_nan a b = if f64.isnan a + then b + else if f64.isnan b + then a + else a + b + let x_mean = (reduce add_nan 0 xs)/nf64 + let diffs = map (\x -> if f64.isnan x then 0 else (x - x_mean)**2) xs + in (f64.sum diffs)/(nf64 - 1) |> f64.sqrt + +-- Empircal fluctuation process containing recursive residuals. +-- Outputs recursive CUSUM and number of non nan values _excluding_ +-- the prepended zero for each `y` in `ys`. +let rcusum [m][N][k] (X: [N][k]f64) (ys: [m][N]f64) = + let (wTs, Nbar, ns) = mrecresid X ys + let ws = transpose wTs + -- Standardize and insert 0 in front. + let Nmk = Nbar-k+1 + let (process, ns) = unzip <| + map2 (\w npk -> + let n = npk - k -- num non nan recursive residuals + let s = sample_sd_nan w n + let fr = s * f64.sqrt(f64.i64 n) + let sdized = map (\j -> if j == 0 then 0 else w[j-1]/fr) (iota Nmk) + in (scan(+) 0f64 sdized, n) + ) ws ns + in (process, Nbar, ns) + +let std_normal_cdf = + stats.cdf (stats.mk_normal {mu=0f64, sigma=1f64}) + +-- TODO this may be done cheaper for x < 0.3; see +-- R strucchange `pvalue.efp`. +-- Also not saving intermediate results. +let pval_brownian_motion_max (x: f64): f64 = + -- Q is complementary CDF of N(0,1) + let Q = \y -> 1 - std_normal_cdf y + let exp = \y -> f64.e**y + in 2 * (Q(3*x) + exp(-4*x**2) - exp(-4*x**2) * (Q x)) + +-- Structural change test for Brownian motion. +-- `num_non_nan` is without the initial zero in process. +let sctest [n] (process: [n]f64) (num_non_nan: i64) : f64 = + let nf64 = f64.i64 num_non_nan + let xs = process[1:] + -- x = max(abs(xs * 1/(1 + 2*j))) where j = 1/n, 2/n, ..., n. + let div i = 1 + (f64.i64 (2*i+2)) / nf64 + let x = map2 (\x i -> f64.abs (x/(div i))) xs (indices xs) |> f64.maximum + in pval_brownian_motion_max x + +-- `N` is padded length. +-- `nm1` is number of non-nan values excluding inital zero. +let boundary confidence N nm1: [N]f64 = + let n = nm1 + 1 + -- conf*(1 + 2*t) with t in [0,1]. + let div = f64.i64 n - 1 + in map (\i -> if i < n + then confidence + (2*confidence*(f64.i64 i))/div + else f64.nan + ) (iota N) + +-- Map distributed stable history computation. +entry mhistory_roc [m][N][k] level confidence + (X: [N][k]f64) (ys: [m][N]f64) = + let (rocs, Nbar, nns) = rcusum (reverse X) (map reverse ys) + let pvals = map2 sctest rocs nns + let n = Nbar - k + 1 + let bounds = map (boundary confidence n) nns + -- index of first time roc crosses the boundary + let inds = + map2 (\roc bound -> + let nm1 = n - 1 + let roc = roc[1:] :> [nm1]f64 + let bound = bound[1:] :> [nm1]f64 + in map3 (\i r b -> + if f64.abs r > b + then i + else i64.highest + ) (iota nm1) roc bound + |> reduce_comm i64.min i64.highest + ) rocs bounds + in map3 (\ind nn pval -> + let chk = !(f64.isnan pval) && pval < level && ind != i64.highest + let y_start = if chk then nn - ind else 0 + in y_start + ) inds nns pvals + +-- Not faster, even though I reduce memory access. +-- Compiler seems to be better at fusing kernels than me. +entry mhistory_roc_inline [m][N][k] level confidence + (X: [N][k]f64) (ys: [m][N]f64) = + -- RCUSUM + -- Empircal fluctuation process containing recursive residuals. + -- Outputs recursive CUSUM and number of non nan values _excluding_ + -- the prepended zero for each `y` in `ys`. + let (wTs, Nbar, ns) = mrecresid (reverse X) (map reverse ys) + let ws = transpose wTs + let Nmkp1 = Nbar-k+1 + in map2 (\w npk -> -- INNER SIZE Nmkp1 (can be split into Nmkp1 and Nmkp1-1) + -- RCUSUM CONT. + -- Standardize and insert 0 in front. + let n = npk - k -- num non nan recursive residuals + let s = sample_sd_nan w n + let fr = s * f64.sqrt(f64.i64 n) + -- TODO never use prepended 0, but changing this results in + -- significantly poorer performance. Some weird bug I think. + let sdized = map (\j -> if j == 0 then 0 else w[j-1]/fr) (iota Nmkp1) + let roc = scan (+) 0f64 sdized + -- SCTEST + -- Structural change test for Brownian motion. + -- `num_non_nan` is without the initial zero in process. + let nf64 = f64.i64 n + let xs = roc[1:] + let div i = 1 + (f64.i64 (2*i+2)) / nf64 + let x = f64.maximum <| map2 (\i x -> f64.abs (x/(div i))) (indices xs) xs + let pval = pval_brownian_motion_max x + -- BOUNDARY + -- conf*(1 + 2*t) with t in [0,1]. + -- INDEX OF CROSSING + let ind = map2 (\i r -> + if f64.abs r > (confidence + (2*confidence*(f64.i64 i+1))/nf64) + then i + else i64.highest + ) (indices xs) xs + |> reduce_comm i64.min i64.highest + -- INDEX OF HISTORY START + let chk = !(f64.isnan pval) && pval < level && ind != i64.highest + let y_start = if chk then n - ind else 0 + in y_start + ) ws ns diff --git a/bfast/monitor/opencl/futhark/recresid.fut b/bfast/monitor/opencl/futhark/recresid.fut new file mode 100644 index 0000000..fc007c7 --- /dev/null +++ b/bfast/monitor/opencl/futhark/recresid.fut @@ -0,0 +1,100 @@ +import "lib/github.com/nhey/lm/lm" +import "helpers" + +module lm = lm_f64 + +let mean_abs [n] (xs: [n]f64) = + (reduce (+) 0 (map f64.abs xs)) / (f64.i64 n) + +-- BFAST strucchange-cpp armadillo version of approx. equal, which is +-- an absolute difference |x - y| <= tol. +let approx_equal x y tol = + (mean_abs (map2 (-) x y)) <= tol + +let filter_nan_pad = filterPadWithKeys ((!) <-< f64.isnan) f64.nan + +-- Map-distributed `recresid`. +entry mrecresid_nn [m][N][k] (Xs_nn: [m][N][k]f64) (ys_nn: [m][N]f64) = + let tol = f64.sqrt(f64.epsilon) / (f64.i64 k) + + -- Initialise recursion by fitting on first `k` observations. + let (X1s, betas, ranks) = + map2 (\X_nn y_nn -> + let model = lm.fit (transpose X_nn[:k, :]) y_nn[:k] + in (model.cov_params, model.params, model.rank) + ) Xs_nn ys_nn |> unzip3 + + let num_recresids_padded = N - k + let rets = replicate (m*num_recresids_padded) 0 + |> unflatten num_recresids_padded m + + let loop_body (r: i64) (X1: [k][k]f64) (beta: [k]f64) + (X_nn: [N][k]f64) (y_nn: [N]f64) = + -- Compute recursive residual + let x = X_nn[r, :] + let d = matvecmul_row X1 x + let fr = 1 + (dotprod x d) + let resid = y_nn[r] - dotprod x beta + let recresid_r = resid / f64.sqrt(fr) + -- Update formulas + -- X1 = X1 - ddT/fr + -- beta = beta + X1 x * resid + let X1 = map2 (\d1 -> map2 (\d2 x -> x - (d1*d2)/fr) d) d X1 + let beta = map2 (+) beta (map (dotprod x >-> (*resid)) X1) + in (X1, beta, recresid_r) + + -- Map is interchanged so that it is inside the sequential loop. + let (_, r', X1s, betas, _, retsT) = + loop (check, r, X1s, betas, ranks, rets_r) = (true,k,X1s,betas,ranks,rets) + while check && r < N - 1 do + let (checks, X1s, betas, ranks, recresids_r) = unzip5 <| + map5 (\X1 beta X_nn y_nn rank -> + let (_, beta, recresid_r) = loop_body r X1 beta X_nn y_nn + -- Check numerical stability (rectify if unstable) + let (check, X1, beta, rank) = + -- We check update formula value against full OLS fit + let rp1 = r+1 + -- NOTE We only need the transposed versions for the + -- first few iterations; I think it is more efficient + -- to transpose locally here because the matrix will + -- most definitely fit entirely in scratchpad memory. + -- Also we get to read from the array 1-strided. + let model = lm.fit (transpose X_nn[:rp1, :]) y_nn[:rp1] + -- Check that this and previous fit is full rank. + -- R checks nans in fitted parameters to same effect. + -- Also, yes it really is necessary to check all this. + let nona = !(f64.isnan recresid_r) && rank == k + && model.rank == k + let check = !(nona && approx_equal model.params beta tol) + -- Stop checking on all-nan ("empty") pixels. + let check = check && !(all f64.isnan y_nn) + in (check, model.cov_params, model.params, model.rank) + in (check, X1, beta, rank, recresid_r) + ) X1s betas Xs_nn ys_nn ranks + let rets_r[r-k, :] = recresids_r + in (reduce_comm (||) false checks, r+1, X1s, betas, ranks, rets_r) + + let (_, _, retsT) = + loop (X1s, betas, rets_r) = (X1s, betas, retsT) for r in (r'.. + map (\i -> if i >= 0 then X[i, :] else replicate k f64.nan) indss_nn[j,:Nbar] + ) (iota m) + -- Subset ys + let ys_nn = ys_nn[:,:Nbar] + + in (mrecresid_nn Xs_nn ys_nn, Nbar, ns) diff --git a/bfast/monitor/python/base.py b/bfast/monitor/python/base.py index 2ac2aab..78f82e4 100644 --- a/bfast/monitor/python/base.py +++ b/bfast/monitor/python/base.py @@ -13,7 +13,10 @@ from sklearn import linear_model from bfast.base import BFASTMonitorBase -from bfast.monitor.utils import compute_end_history, compute_lam, map_indices +from bfast.monitor.utils import ( + compute_end_history, compute_lam, map_indices, compute_lam_brownian +) +from .roc import history_roc class BFASTMonitorPython(BFASTMonitorBase): @@ -23,6 +26,7 @@ class BFASTMonitorPython(BFASTMonitorBase): def __init__(self, start_monitor, + history="all", freq=365, k=3, hfrac=0.25, @@ -42,6 +46,14 @@ def __init__(self, A datetime object specifying the start of the monitoring phase. + history : str, default "all" + Specification of the start of a stable history period. + Can be one of "ROC" and "all". If set to "ROC", a + reverse-ordered CUSUM test will be employed to + automatically detect the start of a stable history + period. If set to "all", the start of the stable + history period is the beginning of the time series. + freq : int, default 365 The frequency for the seasonal model. @@ -81,6 +93,7 @@ def __init__(self, """ def __init__(self, start_monitor, + history="all", freq=365, k=3, hfrac=0.25, @@ -91,6 +104,7 @@ def __init__(self, use_mp=False ): super().__init__(start_monitor, + history, freq, k=k, hfrac=hfrac, @@ -139,23 +153,26 @@ def fit(self, data, dates, n_chunks=None, nan_value=0): # period = data.shape[0] / np.float(self.n) self.lam = compute_lam(data.shape[0], self.hfrac, self.level, self.period) + self.conf_ROC = compute_lam_brownian(self.level) if self.use_mp: print("Python backend is running in parallel using {} threads".format(mp.cpu_count())) y = np.transpose(data, (1, 2, 0)).reshape(data.shape[1] * data.shape[2], data.shape[0]) pool = mp.Pool(mp.cpu_count()) p_map = pool.map(self.fit_single, y) - rval = np.array(p_map, dtype=object).reshape(data.shape[1], data.shape[2], 4) + rval = np.array(p_map, dtype=object).reshape(data.shape[1], data.shape[2], 5) self.breaks = rval[:,:,0].astype(np.int32) self.means = rval[:,:,1].astype(np.float32) self.magnitudes = rval[:,:,2].astype(np.float32) self.valids = rval[:,:,3].astype(np.int32) + self.history_starts = rval[:,:,4].astype(np.int32) else: means_global = np.zeros((data.shape[1], data.shape[2]), dtype=np.float32) magnitudes_global = np.zeros((data.shape[1], data.shape[2]), dtype=np.float32) breaks_global = np.zeros((data.shape[1], data.shape[2]), dtype=np.int32) valids_global = np.zeros((data.shape[1], data.shape[2]), dtype=np.int32) + hists_global = np.zeros((data.shape[1], data.shape[2]), dtype=np.int32) for i in range(data.shape[1]): if self.verbose > 0: @@ -166,16 +183,20 @@ def fit(self, data, dates, n_chunks=None, nan_value=0): (pix_break, pix_mean, pix_magnitude, - pix_num_valid) = self.fit_single(y) + pix_num_valid, + pix_hist_start) = self.fit_single(y) breaks_global[i,j] = pix_break means_global[i,j] = pix_mean magnitudes_global[i,j] = pix_magnitude valids_global[i,j] = pix_num_valid + hists_global[i,j] = pix_hist_start self.breaks = breaks_global self.means = means_global self.magnitudes = magnitudes_global self.valids = valids_global + self.history_starts = hists_global + return self @@ -194,6 +215,21 @@ def fit_single(self, y): """ N = y.shape[0] + # stable history period + hist = 0 + if self.history == "ROC": + Xh = self.X[:, :self.n] + yh = y[:self.n] + nans = np.isnan(yh) + Xh_nn = Xh[:,~nans] + yh_nn = yh[~nans] + hist = history_roc(Xh_nn.astype(np.float64), + yh_nn.astype(np.float64), + self.level, + self.conf_ROC) + # TODO handle ns - hist < num regressors; R sets output to nan. + y[:hist] = np.nan + # compute nan mappings nans = np.isnan(y) num_nans = np.cumsum(nans) @@ -211,7 +247,7 @@ def fit_single(self, y): magnitude = 0.0 if self.verbose > 1: print("WARNING: Not enough observations: ns={ns}, Ns={Ns}".format(ns=ns, Ns=Ns)) - return brk, mean, magnitude, Ns + return brk, mean, magnitude, Ns, hist val_inds = val_inds[ns:] val_inds -= self.n @@ -285,7 +321,7 @@ def fit_single(self, y): else: first_break = -1 - return first_break, mean, magnitude, Ns + return first_break, mean, magnitude, Ns, hist def get_timers(self): """ Returns runtime measurements for the diff --git a/bfast/monitor/python/lm.py b/bfast/monitor/python/lm.py new file mode 100644 index 0000000..53cf774 --- /dev/null +++ b/bfast/monitor/python/lm.py @@ -0,0 +1,100 @@ +import numpy as np +from scipy.linalg import cho_solve, solve_triangular + +def dqrqty(a, qraux, k, y): + n, _ = a.shape + ju = min(k, n-1) + + qty = np.zeros(n) + qty[0:n] = y[0:n] + for j in range ( 0, ju ): + if ( qraux[j] != 0.0 ): + # put qraux on diagonal + ajj = a[j,j] + a[j,j] = qraux[j] + t = - (np.sum(a[j:n,j] * qty[j:n]))/a[j,j] + qty[j:n] = qty[j:n] + t*a[j:n,j] + # revert back to original diagonal + a[j,j] = ajj + return qty + +def dqrdc2(x, ldx, n, p, tol=1e-7): + qraux = np.zeros(p) + work = np.zeros((p,2)) + jpvt = np.arange(p) + if n > 0: + for j in range(0,p): + qraux[j] = np.linalg.norm(x[:,j], 2) + work[j,0] = qraux[j] + work[j,1] = qraux[j] + if work[j,1] == 0.0: + work[j,1] = 1.0 + lup = min(n,p) + k = p + 1 + for l in range(0, lup): + while l+1 < k and qraux[l] < work[l,1]*tol: + lp1 = l+1 + for i in range(n): + t = x[i,l] + for j in range(lp1, p): + x[i, j-1] = x[i,j] + x[i,p-1] = t + i = jpvt[l] + t = qraux[l] + tt = work[l,0] + ttt = work[l,1] + for j in range(lp1, p): + jpvt[j-1] = jpvt[j] + qraux[j-1] = qraux[j] + work[j-1,:] = work[j,:] + jpvt[p-1] = i + qraux[p-1] = t + work[p-1,0] = tt + work[p-1,1] = ttt + k = k -1 + if l+1 != n: + nrmxl = np.linalg.norm(x[l:, l], 2) + if nrmxl != 0.0: + if x[l,l] != 0.0: nrmxl = np.sign(x[l,l]) * abs(nrmxl) + x[l:, l] = x[l:, l] / nrmxl + x[l,l] = 1.0 + x[l,l] + for j in range(l+1, p): + t = -1*np.dot(x[l:,l], x[l:,j])/x[l,l] + x[l:,j] = x[l:,j] + t*x[l:,l] + if qraux[j] != 0.0: + tt = 1.0 - (abs(x[l,j])/qraux[j])**2 + tt = max(tt, 0.0) + t = tt + if abs(t) >= 1e-6: + qraux[j] = qraux[j]*np.sqrt(t) + else: + qraux[j] = np.linalg.norm(x[l+1:,j], 2) + work[j,0] = qraux[j] + qraux[l] = x[l,l] + x[l,l] = -1*nrmxl + + k = min(k-1, n) + return x, k, qraux, jpvt + +def lm(X, y): + n,p = X.shape + y = y.reshape(n) + A, rank, qraux, jpvt = dqrdc2(X.copy(), n, n, p) + r = np.triu(A)[:p, :p] + # Inverting r with cholesky gives (X.T X)^{-1} + cov_params = cho_solve((r[:rank, :rank], False), np.identity(rank)) + # compute parameters + qty = dqrqty(A, qraux, rank, y.copy()) + beta = solve_triangular(r[:rank, :rank], qty[:rank]) + # Pivot fitted parameters to match original order of Xs columns + b = np.zeros(p) + b[:rank] = beta + b[jpvt] = b[range(p)] + scratch = np.zeros((p,p)) + scratch[:rank, :rank] = cov_params + # swap rows + scratch[jpvt, :] = scratch[range(p), :] + # swap columns + scratch[:, jpvt] = scratch[:, range(p)] + cov_params = scratch + return b, cov_params, rank, r, qraux diff --git a/bfast/monitor/python/roc.py b/bfast/monitor/python/roc.py new file mode 100644 index 0000000..2f1959a --- /dev/null +++ b/bfast/monitor/python/roc.py @@ -0,0 +1,98 @@ +import numpy as np +from scipy import stats, optimize +from bfast.monitor.utils import pval_brownian_motion_max +from .lm import lm + +# BFAST strucchange-cpp armadillo version of approx. equal, which is +# an absolute difference |x - y| <= tol. +def approx_equal(x, y, tol): + return np.mean(np.abs(x - y)) <= tol + +def recresid(X, y, tol=None): + n, k = X.shape + assert(n == y.shape[0]) + if n == 0: + return np.array([]) + + if tol is None: + tol = np.sqrt(np.finfo(np.float64).eps) / k + + y = y.reshape(n, 1) + ret = np.zeros(n - k) + + # initialize recursion + yh = y[:k] # k x 1 + Xh = X[:k] # k x k + b, cov_params, rank, _, _ = lm(Xh, yh) + + X1 = cov_params # (X'X)^(-1), k x k + inds = np.isnan(X1) + X1[inds] = 0.0 + bhat = np.nan_to_num(b, 0.0) # k x 1 + + check = True + for r in range(k, n): + prev_rank = rank + # Compute recursive residual + x = X[r] + d = X1 @ x + fr = 1 + np.dot(x, d) + resid = y[r] - np.nansum(x * bhat) # dotprod ignoring nans + ret[r-k] = resid / np.sqrt(fr) + + # Update formulas + X1 = X1 - (np.outer(d, d))/fr + bhat += X1 @ x * resid + + # Check numerical stability (rectify if unstable). + if check: + # We check update formula value against full OLS fit + b, cov_params, rank, _, _ = lm(X[:r+1], y[:r+1]) + # R checks nans in fitted parameters; same as rank. + # Also check on latest recresidual, because fr may + # be nan. + nona = (rank == k and prev_rank == k + and not np.isnan(ret[r-k])) + check = not (nona and approx_equal(b, bhat, tol)) + X1 = cov_params + bhat = np.nan_to_num(b, 0.0) + + return ret + +# Recursive CUSUM process +def efp(X, y): + k, n = X.shape + w = recresid(X.T, y) + sigma = np.std(w, ddof=1) + process = np.cumsum(np.append([0],w))/(sigma*np.sqrt(n-k)) + return process + +# Linear boundary for Brownian motion (limiting process of rec.resid. CUSUM). +def boundary(process, confidence): + n = process.shape[0] + t = np.linspace(0, 1, num=n) + bounds = confidence + (2*confidence*t) # from Zeileis 2002 strucchange paper. + return bounds + +# Structural change test for Brownian motion. +def sctest(process): + x = process[1:] + n = x.shape[0] + j = np.linspace(1/n, 1, num=n) + x = x * 1/(1 + 2*j) + stat = np.max(np.abs(x)) + return pval_brownian_motion_max(stat) + +def history_roc(X, y, alpha, confidence): + if y.shape[0] == 0: return 0 + X_rev = np.flip(X, axis=1) + y_rev = y[::-1] + rcus = efp(X_rev, y_rev) + + pval = sctest(rcus) + y_start = 0 + if not np.isnan(pval) and pval < alpha: + bounds = boundary(rcus, confidence) + inds = (np.abs(rcus[1:]) > bounds[1:]).nonzero()[0] + y_start = rcus.size - np.min(inds) - 1 if inds.size > 0 else 0 + return y_start diff --git a/bfast/monitor/utils.py b/bfast/monitor/utils.py index f69728c..82ba23d 100644 --- a/bfast/monitor/utils.py +++ b/bfast/monitor/utils.py @@ -1,5 +1,6 @@ from datetime import datetime +from scipy import stats, optimize import numpy as np import pandas @@ -486,3 +487,12 @@ def map_indices(dates): indices = np.argwhere(~np.isnan(ts).to_numpy()).T[0] return indices + +# From Brown, Durbin, Evans, 1975. +def pval_brownian_motion_max(x): + Q = lambda x: 1 - stats.norm.cdf(x, loc=0, scale=1) + p = 2 * (Q(3*x) + np.exp(-4*x**2) - np.exp(-4*x**2)*Q(x)) + return p + +def compute_lam_brownian(alpha): + return optimize.brentq(lambda x: pval_brownian_motion_max(x) - alpha, 0, 20) diff --git a/examples/validator.py b/examples/validator.py index 78434df..a9aa5a8 100644 --- a/examples/validator.py +++ b/examples/validator.py @@ -65,6 +65,7 @@ def run_bfast_(backend, # fit BFASTMontiro model model = BFASTMonitor( start_monitor, + history="ROC", freq=freq, k=k, hfrac=hfrac, @@ -73,12 +74,18 @@ def run_bfast_(backend, backend=backend, verbose=0, device_id=0, + detailed_results=True, ) - #data = data[:,:50,:50] + # data = data[:,15:17,12:14] + # for i in range(20): + # for j in range(20): + # if data[-2,i,j] == 3874: # 2769, 3222, [3874, nan] + # print(i,j) + # exit(0) start_time = time.time() if backend == "opencl": - model.fit(data, dates, n_chunks=5, nan_value=-32768) + model.fit(data, dates, n_chunks=25, nan_value=-32768) else: model.fit(data, dates, nan_value=-32768) end_time = time.time() @@ -89,7 +96,8 @@ def run_bfast_(backend, means = model.means magnitudes = model.magnitudes valids = model.valids - return breaks, means, magnitudes, valids + history_starts = model.history_starts + return breaks, means, magnitudes, valids, history_starts def compare(name, arr_p, arr_o): @@ -113,12 +121,41 @@ def compare(name, arr_p, arr_o): def run_bfast(backend): return run_bfast_(backend) +def run_bfast_cached(backend): + @cached(backend) + def fun(backend): + return run_bfast_(backend) + return fun(backend) + + +def test(quantifier, pred, actual, expect, rel_err=False): + stmt = pred(actual, expect) + check = quantifier(stmt) + if check: + print("\033[92m PASSED \033[0m") + else: + print("\033[91m FAILED \033[0m") + inds = np.where(~stmt) + print("| Num. differences", np.sum(~stmt)) + print("| Expected", expect[inds]) + print("| Actual ", actual[inds]) + if rel_err: + print("| Relative absolute error") + rel_err = np.abs((expect - actual)/expect) + rel_err = rel_err[~np.isnan(rel_err)] + per_err = rel_err * 100 + print("| Max error {:10.5e} ({:.4f}%)".format(np.max(rel_err), + np.max(per_err))) + print("| Min error {:10.5e} ({:.4f}%)".format(np.min(rel_err), + np.min(per_err))) + print("| Mean error {:10.5e} ({:.4f}%)".format(np.mean(rel_err), + np.mean(per_err))) if __name__ == "__main__": # breaks_p, means_p, magnitudes_p, valids_p = run_bfast("python") - breaks_p, means_p, magnitudes_p, valids_p = run_bfast("python-mp") - breaks_o, means_o, magnitudes_o, valids_o = run_bfast("opencl") + breaks_p, means_p, magnitudes_p, valids_p, hist_p = run_bfast_cached("python-mp") + breaks_o, means_o, magnitudes_o, valids_o, hist_o = run_bfast("opencl") # compare("breaks", breaks_p, breaks_o) # compare("means", means_p, means_o) @@ -178,3 +215,14 @@ def run_bfast(backend): plt.clf() plt.imshow(valids_diff, cmap="Greys") plt.savefig("valids_diff.png") + + print("np.all(breaks_o == breaks_p):", end="") + test(np.all, np.equal, breaks_o, breaks_p) + print("np.all(hist_o == hist_p):", end="") + test(np.all, np.equal, hist_o, hist_p) + print("np.all(np.isclose(means_o, means_p)):", end="") + test(np.all, np.isclose, means_o, means_p, rel_err=True) + print("np.all(np.isclose(magnitudes_o, magnitudes_p)):", end="") + test(np.all, np.isclose, magnitudes_o, magnitudes_p, rel_err=True) + print("np.all(valids_o == valids_p):", end="") + test(np.all, np.equal, valids_o, valids_p)