Skip to content

Commit

Permalink
Optimized/fixed s-exp, added n<=0 validations
Browse files Browse the repository at this point in the history
  • Loading branch information
SGrondin committed Jan 14, 2023
1 parent 4421c6d commit 479962c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 21 deletions.
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ Tdigest

OCaml implementation of the T-Digest algorithm.

```ocaml
let td =
Tdigest.create ()
|> Tdigest.add_list [ 10.0; 11.0; 12.0; 13.0 ]
in
Tdigest.p_ranks td [ 9.; 10.; 11.; 12.; 13.; 14. ] (* [ Some 0; Some 0.125; Some 0.375; Some 0.625; Some 0.875; Some 1 ] *)
```

The T-Digest is a data structure and algorithm for constructing an approximate distribution for a collection of real numbers presented as a stream.

The median of a list of medians is not necessarily equal to the median of the whole dataset. The median (p50), p95, and p99 are critical measures that are expensive to compute due to their requirement of having the entire **sorted** dataset present in one place.
Expand All @@ -11,12 +19,26 @@ The T-Digest can estimate percentiles or quantiles extremely accurately even at

Additionally, the T-Digest is concatenable, making it a good fit for distributed systems. The internal state of a T-Digest can be exported as a binary string, and the concatenation of any number of those strings can then be imported to form a new T-Digest.

```ocaml
let combined = Tdigest.merge [ td1; td2; td3 ] in
```

A T-Digest's state can be stored in a database `VARCHAR`/`TEXT` column and multiple such states can be merged by concatenating strings:
```sql
SELECT
STRING_AGG(M.tdigest_state) AS concat_state
FROM my_table AS M
```
```ocaml
let combined = Tdigest.of_string concat_state in
```

Links:
- [A simple overview of the T-Digest](https://dataorigami.net/blogs/napkin-folding/19055451-percentile-and-quantile-estimation-of-big-data-the-t-digest)
- [A walkthrough of the algorithm by its creator](https://mapr.com/blog/better-anomaly-detection-t-digest-whiteboard-walkthrough/)
- [The white paper](https://github.com/tdunning/t-digest/blob/master/docs/t-digest-paper/histo.pdf)

This library started off as a port of [Will Welch's JavaScript implementation](https://github.com/welch/tdigest), down to the unit tests. However some modifications have been made to adapt it to OCaml, the most important one being immutability. As such, almost every function in the `Tdigest` module return a new `Tdigest.t`, including "reading" ones since they may trigger expensive intermediate computations worth caching.
This library started off as a port of [Will Welch's JavaScript implementation](https://github.com/welch/tdigest), down to the unit tests. However some modifications have been made to adapt it to OCaml, the most important one being immutability. As such, almost every function in the `Tdigest` module return a new `Tdigest.t`, including "reading" ones since they may trigger intermediate computations worth caching.

## Usage

Expand Down
49 changes: 39 additions & 10 deletions src/tdigest.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ type settings = {
cx: cx;
k_delta: float option;
}
[@@deriving sexp]

type centroid = {
mean: float;
cumn: float;
mean_cumn: float;
n: float;
}
[@@deriving sexp]

let empty_centroid = { mean = 0.0; n = 0.0; cumn = 0.0; mean_cumn = 0.0 }

Expand All @@ -59,7 +57,6 @@ type t = {
last_cumulate: float;
stats: stats;
}
[@@deriving sexp]

type info = {
count: int;
Expand Down Expand Up @@ -97,7 +94,8 @@ let create ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) () =
| Automatic x when is_positive x -> k
| Automatic 0.0 ->
invalid_arg
"TDigest k parameter cannot be zero, set to Tdigest.Manual to disable automatic compression."
"TDigest.create: k parameter cannot be zero, set to Tdigest.Manual to disable automatic \
compression."
| Automatic x -> invalid_argf "TDigest k parameter must be positive, but was %f" x ()
in
let cx =
Expand All @@ -106,9 +104,9 @@ let create ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) () =
| Growth x when is_positive x -> cx
| Growth 0.0 ->
invalid_arg
"TDigest cx parameter cannot be zero, set to Tdigest.Always to disable caching of cumulative \
totals."
| Growth x -> invalid_argf "TDigest cx parameter must be positive, but was %f" x ()
"TDigest.create: cx parameter cannot be zero, set to Tdigest.Always to disable caching of \
cumulative totals."
| Growth x -> invalid_argf "TDigest.create: cx parameter must be positive, but was %f" x ()
in
{
settings = { delta; k; cx; k_delta = get_k_delta (k, delta) };
Expand Down Expand Up @@ -269,9 +267,13 @@ let digest ?(n = 1) td ~mean =
rebuild ~auto:true td.settings td.stats (weights_of_td td)
| _ -> td

let add ?(n = 1) ~data td = digest td ~n ~mean:data
let add ?(n = 1) ~data td =
if Int.(n <= 0) then invalid_arg "Tdigest.add: n <= 0";
digest td ~n ~mean:data

let add_list ?(n = 1) xs td = List.fold xs ~init:td ~f:(fun acc mean -> digest acc ~n ~mean)
let add_list ?(n = 1) xs td =
if Int.(n <= 0) then invalid_arg "Tdigest.add_list: n <= 0";
List.fold xs ~init:td ~f:(fun acc mean -> digest acc ~n ~mean)

let compress ?delta td =
match delta with
Expand Down Expand Up @@ -313,7 +315,7 @@ let parse_float str pos =
|> float_of_bits

let of_string ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) str =
if Int.(String.length str % 16 <> 0) then invalid_arg "Invalid string length for Tdigest.of_string";
if Int.(String.length str % 16 <> 0) then invalid_arg "Tdigest.of_string: invalid string length";
let settings = { delta; k; cx; k_delta = get_k_delta (k, delta) } in
let table = Table.create () in
let rec loop = function
Expand All @@ -327,6 +329,33 @@ let of_string ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) str =
loop 0;
weights_of_table table |> rebuild ~auto:true settings empty_stats

module Export = struct
type td_t = t

type settings = {
delta: delta;
k: k;
cx: cx;
}
[@@deriving sexp]

type t = {
settings: settings;
state: string;
stats: stats;
}
[@@deriving sexp]

let create ({ settings = { delta; k; cx; _ }; stats; _ } as td : td_t) =
{ settings = { delta; k; cx }; state = to_string td |> snd; stats }
end

let sexp_of_t td = Export.create td |> [%sexp_of: Export.t]

let t_of_sexp sexp =
let Export.{ settings = { delta; k; cx }; state; stats } = [%of_sexp: Export.t] sexp in
{ (of_string ~delta ~k ~cx state) with stats }

let merge ?(delta = default_delta) ?(k = default_k) ?(cx = default_cx) tds =
let settings = { delta; k; cx; k_delta = get_k_delta (k, delta) } in
let table = Table.create () in
Expand Down
1 change: 1 addition & 0 deletions src/tdigest.mli
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type info = {
compress_count: int;
auto_compress_count: int;
}
[@@deriving sexp]

(**
[Tdigest.create ?delta ?k ?cx ()]
Expand Down
54 changes: 44 additions & 10 deletions test/test_tdigest.ml
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,17 @@ let%expect_test "percentiles" =
()

let%expect_test "serialization" =
(* identical after recreating *)
let xs = List.init 10 ~f:(fun _i -> Random.float 1.) in
let td = Tdigest.create () |> Tdigest.add_list xs in
let td1, export = Tdigest.to_string td in
if String.length export <> 160 then failwith "export length <> 160";
let td2 = Tdigest.of_string export in
check td1;
check td2;
[%expect
{|
(* identical after recreating (to_string/of_string) *)
let () =
let xs = List.init 10 ~f:(fun _i -> Random.float 1.) in
let td = Tdigest.create () |> Tdigest.add_list xs in
let td1, export = Tdigest.to_string td in
if String.length export <> 160 then failwith "export length <> 160";
let td2 = Tdigest.of_string export in
check td1;
check td2;
[%expect
{|
(((mean 0.11359872617230203) (n 1)) ((mean 0.14207889800896825) (n 1))
((mean 0.26565242651667725) (n 1)) ((mean 0.32088115017788221) (n 1))
((mean 0.41519876713081461) (n 1)) ((mean 0.43630556398799153) (n 1))
Expand All @@ -232,6 +233,39 @@ let%expect_test "serialization" =
((mean 0.41519876713081461) (n 1)) ((mean 0.43630556398799153) (n 1))
((mean 0.45062527388924589) (n 1)) ((mean 0.56883568253605243) (n 1))
((mean 0.64030838064885942) (n 1)) ((mean 0.96035328719918678) (n 1))) |}]
in

(* identical after recreating (sexp) *)
let () =
let xs = List.init 10 ~f:(fun _i -> Random.float 1.) in
let td1 = Tdigest.create () |> Tdigest.add_list xs in
let td2 = [%sexp_of: Tdigest.t] td1 |> [%of_sexp: Tdigest.t] in
print_endline (sprintf !"%{sexp#hum: Tdigest.t}" td1);
print_endline (sprintf !"%{sexp#hum: Tdigest.t}" td2);
check td1;
check td2;
[%expect
{|
((settings ((delta (Merging 0.01)) (k (Automatic 25)) (cx (Growth 1.1))))
(state
"y\142\169\223\026\255\208?\000\000\000\000\000\000\240?\181\251T\028\003\017\220?\000\000\000\000\000\000\240?L\193\172/_\213\220?\000\000\000\000\000\000\240?>\t[\140\144\246\220?\000\000\000\000\000\000\240?\016<\007\197.-\224?\000\000\000\000\000\000\240?\160T2\154\179\029\226?\000\000\000\000\000\000\240?\152\193,U\239m\226?\000\000\000\000\000\000\240?E\138\192\255\\)\236?\000\000\000\000\000\000\240?\243\210YPe\217\236?\000\000\000\000\000\000\240?\"\025\132\145\202n\238?\000\000\000\000\000\000\240?")
(stats ((cumulates_count 10) (compress_count 0) (auto_compress_count 0))))
((settings ((delta (Merging 0.01)) (k (Automatic 25)) (cx (Growth 1.1))))
(state
"y\142\169\223\026\255\208?\000\000\000\000\000\000\240?\181\251T\028\003\017\220?\000\000\000\000\000\000\240?L\193\172/_\213\220?\000\000\000\000\000\000\240?>\t[\140\144\246\220?\000\000\000\000\000\000\240?\016<\007\197.-\224?\000\000\000\000\000\000\240?\160T2\154\179\029\226?\000\000\000\000\000\000\240?\152\193,U\239m\226?\000\000\000\000\000\000\240?E\138\192\255\\)\236?\000\000\000\000\000\000\240?\243\210YPe\217\236?\000\000\000\000\000\000\240?\"\025\132\145\202n\238?\000\000\000\000\000\000\240?")
(stats ((cumulates_count 10) (compress_count 0) (auto_compress_count 0))))
(((mean 0.26557037202858386) (n 1)) ((mean 0.43853833929818659) (n 1))
((mean 0.45052318244690492) (n 1)) ((mean 0.45254911142923848) (n 1))
((mean 0.50551546556551052) (n 1)) ((mean 0.56612568012737441) (n 1))
((mean 0.57591978679379263) (n 1)) ((mean 0.88004922820648146) (n 1))
((mean 0.90153756803064622) (n 1)) ((mean 0.95102432652564439) (n 1)))
(((mean 0.26557037202858386) (n 1)) ((mean 0.43853833929818659) (n 1))
((mean 0.45052318244690492) (n 1)) ((mean 0.45254911142923848) (n 1))
((mean 0.50551546556551052) (n 1)) ((mean 0.56612568012737441) (n 1))
((mean 0.57591978679379263) (n 1)) ((mean 0.88004922820648146) (n 1))
((mean 0.90153756803064622) (n 1)) ((mean 0.95102432652564439) (n 1))) |}]
in
()

let%expect_test "merge" =
(* incorporates all points *)
Expand Down

0 comments on commit 479962c

Please sign in to comment.