diff --git a/README.md b/README.md index 4089d6f..a7b3460 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,14 @@ Tdigest OCaml implementation of the T-Digest algorithm. +```ocaml +let td = + Tdigest.create () + |> Tdigest.add_list [ 10.0; 11.0; 12.0; 13.0 ] +in +Tdigest.p_ranks td [ 9.; 10.; 11.; 12.; 13.; 14. ] (* [ Some 0; Some 0.125; Some 0.375; Some 0.625; Some 0.875; Some 1 ] *) +``` + The T-Digest is a data structure and algorithm for constructing an approximate distribution for a collection of real numbers presented as a stream. The median of a list of medians is not necessarily equal to the median of the whole dataset. The median (p50), p95, and p99 are critical measures that are expensive to compute due to their requirement of having the entire **sorted** dataset present in one place. @@ -11,12 +19,26 @@ The T-Digest can estimate percentiles or quantiles extremely accurately even at Additionally, the T-Digest is concatenable, making it a good fit for distributed systems. The internal state of a T-Digest can be exported as a binary string, and the concatenation of any number of those strings can then be imported to form a new T-Digest. +```ocaml +let combined = Tdigest.merge [ td1; td2; td3 ] in +``` + +A T-Digest's state can be stored in a database `VARCHAR`/`TEXT` column and multiple such states can be merged by concatenating strings: +```sql +SELECT + STRING_AGG(M.tdigest_state) AS concat_state +FROM my_table AS M +``` +```ocaml +let combined = Tdigest.of_string concat_state in +``` + Links: - [A simple overview of the T-Digest](https://dataorigami.net/blogs/napkin-folding/19055451-percentile-and-quantile-estimation-of-big-data-the-t-digest) - [A walkthrough of the algorithm by its creator](https://mapr.com/blog/better-anomaly-detection-t-digest-whiteboard-walkthrough/) - [The white paper](https://github.com/tdunning/t-digest/blob/master/docs/t-digest-paper/histo.pdf) -This library started off as a port of [Will Welch's JavaScript implementation](https://github.com/welch/tdigest), down to the unit tests. However some modifications have been made to adapt it to OCaml, the most important one being immutability. As such, almost every function in the `Tdigest` module return a new `Tdigest.t`, including "reading" ones since they may trigger expensive intermediate computations worth caching. +This library started off as a port of [Will Welch's JavaScript implementation](https://github.com/welch/tdigest), down to the unit tests. However some modifications have been made to adapt it to OCaml, the most important one being immutability. As such, almost every function in the `Tdigest` module return a new `Tdigest.t`, including "reading" ones since they may trigger intermediate computations worth caching. ## Usage diff --git a/src/tdigest.ml b/src/tdigest.ml index a68fc50..2ebb1ba 100644 --- a/src/tdigest.ml +++ b/src/tdigest.ml @@ -29,7 +29,6 @@ type settings = { cx: cx; k_delta: float option; } -[@@deriving sexp] type centroid = { mean: float; @@ -37,7 +36,6 @@ type centroid = { mean_cumn: float; n: float; } -[@@deriving sexp] let empty_centroid = { mean = 0.0; n = 0.0; cumn = 0.0; mean_cumn = 0.0 } @@ -59,7 +57,6 @@ type t = { last_cumulate: float; stats: stats; } -[@@deriving sexp] type info = { count: int; @@ -97,7 +94,8 @@ let create ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) () = | Automatic x when is_positive x -> k | Automatic 0.0 -> invalid_arg - "TDigest k parameter cannot be zero, set to Tdigest.Manual to disable automatic compression." + "TDigest.create: k parameter cannot be zero, set to Tdigest.Manual to disable automatic \ + compression." | Automatic x -> invalid_argf "TDigest k parameter must be positive, but was %f" x () in let cx = @@ -106,9 +104,9 @@ let create ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) () = | Growth x when is_positive x -> cx | Growth 0.0 -> invalid_arg - "TDigest cx parameter cannot be zero, set to Tdigest.Always to disable caching of cumulative \ - totals." - | Growth x -> invalid_argf "TDigest cx parameter must be positive, but was %f" x () + "TDigest.create: cx parameter cannot be zero, set to Tdigest.Always to disable caching of \ + cumulative totals." + | Growth x -> invalid_argf "TDigest.create: cx parameter must be positive, but was %f" x () in { settings = { delta; k; cx; k_delta = get_k_delta (k, delta) }; @@ -269,9 +267,13 @@ let digest ?(n = 1) td ~mean = rebuild ~auto:true td.settings td.stats (weights_of_td td) | _ -> td -let add ?(n = 1) ~data td = digest td ~n ~mean:data +let add ?(n = 1) ~data td = + if Int.(n <= 0) then invalid_arg "Tdigest.add: n <= 0"; + digest td ~n ~mean:data -let add_list ?(n = 1) xs td = List.fold xs ~init:td ~f:(fun acc mean -> digest acc ~n ~mean) +let add_list ?(n = 1) xs td = + if Int.(n <= 0) then invalid_arg "Tdigest.add_list: n <= 0"; + List.fold xs ~init:td ~f:(fun acc mean -> digest acc ~n ~mean) let compress ?delta td = match delta with @@ -313,7 +315,7 @@ let parse_float str pos = |> float_of_bits let of_string ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) str = - if Int.(String.length str % 16 <> 0) then invalid_arg "Invalid string length for Tdigest.of_string"; + if Int.(String.length str % 16 <> 0) then invalid_arg "Tdigest.of_string: invalid string length"; let settings = { delta; k; cx; k_delta = get_k_delta (k, delta) } in let table = Table.create () in let rec loop = function @@ -327,6 +329,33 @@ let of_string ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) str = loop 0; weights_of_table table |> rebuild ~auto:true settings empty_stats +module Export = struct + type td_t = t + + type settings = { + delta: delta; + k: k; + cx: cx; + } + [@@deriving sexp] + + type t = { + settings: settings; + state: string; + stats: stats; + } + [@@deriving sexp] + + let create ({ settings = { delta; k; cx; _ }; stats; _ } as td : td_t) = + { settings = { delta; k; cx }; state = to_string td |> snd; stats } +end + +let sexp_of_t td = Export.create td |> [%sexp_of: Export.t] + +let t_of_sexp sexp = + let Export.{ settings = { delta; k; cx }; state; stats } = [%of_sexp: Export.t] sexp in + { (of_string ~delta ~k ~cx state) with stats } + let merge ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) tds = let settings = { delta; k; cx; k_delta = get_k_delta (k, delta) } in let table = Table.create () in diff --git a/src/tdigest.mli b/src/tdigest.mli index 5d193cb..55db71e 100644 --- a/src/tdigest.mli +++ b/src/tdigest.mli @@ -48,6 +48,7 @@ type info = { compress_count: int; auto_compress_count: int; } +[@@deriving sexp] (** [Tdigest.create ?delta ?k ?cx ()] diff --git a/test/test_tdigest.ml b/test/test_tdigest.ml index 3a5c9bd..528d06e 100644 --- a/test/test_tdigest.ml +++ b/test/test_tdigest.ml @@ -212,16 +212,17 @@ let%expect_test "percentiles" = () let%expect_test "serialization" = - (* identical after recreating *) - let xs = List.init 10 ~f:(fun _i -> Random.float 1.) in - let td = Tdigest.create () |> Tdigest.add_list xs in - let td1, export = Tdigest.to_string td in - if String.length export <> 160 then failwith "export length <> 160"; - let td2 = Tdigest.of_string export in - check td1; - check td2; - [%expect - {| + (* identical after recreating (to_string/of_string) *) + let () = + let xs = List.init 10 ~f:(fun _i -> Random.float 1.) in + let td = Tdigest.create () |> Tdigest.add_list xs in + let td1, export = Tdigest.to_string td in + if String.length export <> 160 then failwith "export length <> 160"; + let td2 = Tdigest.of_string export in + check td1; + check td2; + [%expect + {| (((mean 0.11359872617230203) (n 1)) ((mean 0.14207889800896825) (n 1)) ((mean 0.26565242651667725) (n 1)) ((mean 0.32088115017788221) (n 1)) ((mean 0.41519876713081461) (n 1)) ((mean 0.43630556398799153) (n 1)) @@ -232,6 +233,39 @@ let%expect_test "serialization" = ((mean 0.41519876713081461) (n 1)) ((mean 0.43630556398799153) (n 1)) ((mean 0.45062527388924589) (n 1)) ((mean 0.56883568253605243) (n 1)) ((mean 0.64030838064885942) (n 1)) ((mean 0.96035328719918678) (n 1))) |}] + in + + (* identical after recreating (sexp) *) + let () = + let xs = List.init 10 ~f:(fun _i -> Random.float 1.) in + let td1 = Tdigest.create () |> Tdigest.add_list xs in + let td2 = [%sexp_of: Tdigest.t] td1 |> [%of_sexp: Tdigest.t] in + print_endline (sprintf !"%{sexp#hum: Tdigest.t}" td1); + print_endline (sprintf !"%{sexp#hum: Tdigest.t}" td2); + check td1; + check td2; + [%expect + {| + ((settings ((delta (Merging 0.01)) (k (Automatic 25)) (cx (Growth 1.1)))) + (state + "y\142\169\223\026\255\208?\000\000\000\000\000\000\240?\181\251T\028\003\017\220?\000\000\000\000\000\000\240?L\193\172/_\213\220?\000\000\000\000\000\000\240?>\t[\140\144\246\220?\000\000\000\000\000\000\240?\016<\007\197.-\224?\000\000\000\000\000\000\240?\160T2\154\179\029\226?\000\000\000\000\000\000\240?\152\193,U\239m\226?\000\000\000\000\000\000\240?E\138\192\255\\)\236?\000\000\000\000\000\000\240?\243\210YPe\217\236?\000\000\000\000\000\000\240?\"\025\132\145\202n\238?\000\000\000\000\000\000\240?") + (stats ((cumulates_count 10) (compress_count 0) (auto_compress_count 0)))) + ((settings ((delta (Merging 0.01)) (k (Automatic 25)) (cx (Growth 1.1)))) + (state + "y\142\169\223\026\255\208?\000\000\000\000\000\000\240?\181\251T\028\003\017\220?\000\000\000\000\000\000\240?L\193\172/_\213\220?\000\000\000\000\000\000\240?>\t[\140\144\246\220?\000\000\000\000\000\000\240?\016<\007\197.-\224?\000\000\000\000\000\000\240?\160T2\154\179\029\226?\000\000\000\000\000\000\240?\152\193,U\239m\226?\000\000\000\000\000\000\240?E\138\192\255\\)\236?\000\000\000\000\000\000\240?\243\210YPe\217\236?\000\000\000\000\000\000\240?\"\025\132\145\202n\238?\000\000\000\000\000\000\240?") + (stats ((cumulates_count 10) (compress_count 0) (auto_compress_count 0)))) + (((mean 0.26557037202858386) (n 1)) ((mean 0.43853833929818659) (n 1)) + ((mean 0.45052318244690492) (n 1)) ((mean 0.45254911142923848) (n 1)) + ((mean 0.50551546556551052) (n 1)) ((mean 0.56612568012737441) (n 1)) + ((mean 0.57591978679379263) (n 1)) ((mean 0.88004922820648146) (n 1)) + ((mean 0.90153756803064622) (n 1)) ((mean 0.95102432652564439) (n 1))) + (((mean 0.26557037202858386) (n 1)) ((mean 0.43853833929818659) (n 1)) + ((mean 0.45052318244690492) (n 1)) ((mean 0.45254911142923848) (n 1)) + ((mean 0.50551546556551052) (n 1)) ((mean 0.56612568012737441) (n 1)) + ((mean 0.57591978679379263) (n 1)) ((mean 0.88004922820648146) (n 1)) + ((mean 0.90153756803064622) (n 1)) ((mean 0.95102432652564439) (n 1))) |}] + in + () let%expect_test "merge" = (* incorporates all points *)