tdigest_ch/lib.rs
1//! A Rust library for estimating quantiles in a stream,
2//! using [ClickHouse t-digest][ClickHouseRefTDigest] data structure.
3//!
4//! The [t-digest][Dunning19] data structure is designed around computing
5//! accurate quantile estimates from streaming data. Two t-digests can be
6//! merged, making the data structure well suited for map-reduce settings.
7//!
8//! [Repository]
9//!
10//! [ClickHouseRefTDigest]: https://clickhouse.com/docs/en/sql-reference/aggregate-functions/reference/quantiletdigest/
11//! [Dunning19]: https://github.com/tdunning/t-digest/blob/main/docs/t-digest-paper/histo.pdf
12//! [Repository]: https://github.com/vivienm/rust-tdigest-ch
13//!
14//! # Examples
15//!
16//! ```
17//! use tdigest_ch::TDigest;
18//!
19//! let mut digest = TDigest::new();
20//!
21//! // Add some elements.
22//! digest.insert(1.0);
23//! digest.insert(2.0);
24//! digest.insert(3.0);
25//!
26//! // Get the median of the distribution.
27//! let quantile = digest.quantile(0.5);
28//! assert_eq!(quantile, 2.0);
29//! ```
30
31use std::{
32 cmp::Ordering,
33 ops::{BitOr, BitOrAssign},
34};
35
/// Stores the weight of points around their mean value.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Centroid {
    /// Running average of the samples merged into this centroid.
    mean: f32,
    /// Number of samples (total weight) merged into this centroid.
    count: usize,
}
42
#[cfg(feature = "serde")]
impl serde::Serialize for Centroid {
    // Serialized as a compact `(mean, count)` tuple rather than a named
    // struct; must stay in sync with the `Deserialize` impl below.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        (self.mean, self.count).serialize(serializer)
    }
}
52
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for Centroid {
    // Mirrors the `Serialize` impl: expects a `(mean, count)` tuple.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (mean, count) = serde::Deserialize::deserialize(deserializer)?;
        Ok(Self { mean, count })
    }
}
63
/// Tuning parameters shared by `TDigestBuilder` and `TDigest`.
#[derive(Clone, Debug, PartialEq)]
struct Config {
    /// Compression parameter; `compress` bounds each centroid's weight by
    /// `4 * epsilon * q * (1 - q) * N`.
    epsilon: f32,
    /// Hard cap on the number of stored centroids.
    max_centroids: usize,
    /// Number of unmerged insertions tolerated before an automatic compression.
    max_unmerged: usize,
}
70
71impl Default for Config {
72 fn default() -> Self {
73 Self {
74 epsilon: 0.01,
75 max_centroids: 2048,
76 max_unmerged: 2048,
77 }
78 }
79}
80
#[cfg(feature = "serde")]
impl serde::Serialize for Config {
    // Serialized as an `(epsilon, max_centroids, max_unmerged)` tuple; must
    // stay in sync with the `Deserialize` impl below.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        (self.epsilon, self.max_centroids, self.max_unmerged).serialize(serializer)
    }
}
90
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for Config {
    // Mirrors the `Serialize` impl: expects an
    // `(epsilon, max_centroids, max_unmerged)` tuple.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (epsilon, max_centroids, max_unmerged) = serde::Deserialize::deserialize(deserializer)?;
        Ok(Self {
            epsilon,
            max_centroids,
            max_unmerged,
        })
    }
}
105
/// A `TDigestBuilder` can be used to create a `TDigest` with custom
/// configuration.
///
/// # Examples
///
/// ```
/// use tdigest_ch::TDigestBuilder;
///
/// let mut builder = TDigestBuilder::new();
/// builder.max_centroids(1024);
/// builder.max_unmerged(1024);
///
/// let digest = builder.build();
/// ```
#[derive(Debug)]
pub struct TDigestBuilder {
    /// Configuration accumulated by the setter methods, consumed by `build`.
    config: Config,
}
124
125impl TDigestBuilder {
126 /// Constructs a new `TDigestBuilder`.
127 ///
128 /// This is the same as `TDigest::builder()`.
129 pub fn new() -> Self {
130 Self {
131 config: Config::default(),
132 }
133 }
134
135 /// Returns a `TDigest` that uses this `TDigestBuilder` configuration.
136 pub fn build(self) -> TDigest {
137 let centroids = Vec::with_capacity(self.config.max_centroids);
138 TDigest {
139 config: self.config,
140 centroids,
141 count: 0,
142 unmerged: 0,
143 }
144 }
145
146 /// Sets the compression parameter of the `TDigest`. Defaults to 0.01.
147 pub fn epsilon(&mut self, epsilon: f32) -> &mut Self {
148 self.config.epsilon = epsilon;
149 self
150 }
151
152 /// Sets the maximum number of centroids that the `TDigest` will store.
153 /// Defaults to 2048.
154 pub fn max_centroids(&mut self, max_centroids: usize) -> &mut Self {
155 self.config.max_centroids = max_centroids;
156 self
157 }
158
159 /// Sets the maximum number of unmerged centroids that the `TDigest` will
160 /// store. Defaults to 2048.
161 pub fn max_unmerged(&mut self, max_unmerged: usize) -> &mut Self {
162 self.config.max_unmerged = max_unmerged;
163 self
164 }
165}
166
167impl Default for TDigestBuilder {
168 #[inline]
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
/// Linearly interpolates the `y` value at `x` on the segment between
/// `(x1, y1)` and `(x2, y2)`.
fn interpolate(x: f32, x1: f32, y1: f32, x2: f32, y2: f32) -> f32 {
    let t = (x - x1) / (x2 - x1);
    y1 * (1. - t) + y2 * t
}
178
/// Returns whether two centroid means may be combined: always, unless exactly
/// one side is infinite (equal infinities of the same sign still merge).
#[inline]
fn can_be_merged(l_mean: f64, r_mean: f32) -> bool {
    let r_mean = f64::from(r_mean);
    l_mean == r_mean || !(l_mean.is_infinite() || r_mean.is_infinite())
}
183
/// Total ordering on `f32` for sorting centroids: regular IEEE order for
/// comparable values, with all NaNs comparing equal to each other and
/// greater than every number (so they sort last).
fn cmp_f32(lhs: f32, rhs: f32) -> Ordering {
    lhs.partial_cmp(&rhs).unwrap_or_else(|| {
        // `partial_cmp` is `None` iff at least one operand is NaN.
        match (lhs.is_nan(), rhs.is_nan()) {
            (true, true) => Ordering::Equal,
            (true, false) => Ordering::Greater,
            (false, _) => Ordering::Less,
        }
    })
}
200
/// T-digest data structure for approximating the quantiles of a distribution.
///
/// # Examples
///
/// ```
/// use tdigest_ch::TDigest;
///
/// let mut digest = TDigest::new();
///
/// // Add some elements.
/// digest.insert(1.0);
/// digest.insert(2.0);
/// digest.insert(3.0);
///
/// // Get the median of the distribution.
/// let quantile = digest.quantile(0.5);
/// assert_eq!(quantile, 2.0);
/// ```
#[derive(Clone, Debug, PartialEq)]
pub struct TDigest {
    /// Tuning parameters (epsilon and the centroid limits).
    config: Config,
    /// Stored centroids; may contain unmerged, unsorted entries until
    /// `compress` runs.
    centroids: Vec<Centroid>,
    /// Total weight, i.e. the number of inserted elements.
    count: usize,
    /// Number of centroids pushed since the last compression.
    unmerged: usize,
}
226
227impl TDigest {
228 /// Creates an empty `TDigest`.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use tdigest_ch::TDigest;
234 /// let digest = TDigest::new();
235 /// ```
236 #[must_use]
237 pub fn new() -> Self {
238 Self::builder().build()
239 }
240
241 /// Creates a `TDigestBuilder` to configure a `TDigest`.
242 ///
243 /// This is the same as `TDigestBuilder::new()`.
244 #[inline]
245 pub fn builder() -> TDigestBuilder {
246 TDigestBuilder::new()
247 }
248
249 /// Moves all the elements of `other` into `self`, leaving `other` empty.
250 ///
251 /// # Examples
252 ///
253 /// ```
254 /// use tdigest_ch::TDigest;
255 ///
256 /// let mut a = TDigest::from([-10.0, 1.0, 2.0, 2.0, 3.0]);
257 /// let mut b = TDigest::from([-20.0, 5.0, 43.0]);
258 ///
259 /// a.append(&mut b);
260 ///
261 /// assert_eq!(a.len(), 8);
262 /// assert!(b.is_empty());
263 /// ```
264 pub fn append(&mut self, other: &mut TDigest) {
265 self.bitor_assign(other);
266 other.clear();
267 }
268
    /// Returns the number of elements in the t-digest.
    ///
    /// # Examples
    ///
    /// ```
    /// use tdigest_ch::TDigest;
    ///
    /// let mut digest = TDigest::new();
    /// assert_eq!(digest.len(), 0);
    /// digest.insert(1.0);
    /// assert_eq!(digest.len(), 1);
    /// ```
    #[inline]
    pub fn len(&self) -> usize {
        // Exact total weight, maintained by `insert_centroid` and `compress`.
        self.count
    }
285
286 /// Returns `true` if the t-digest contains no elements.
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// use tdigest_ch::TDigest;
292 ///
293 /// let mut digest = TDigest::new();
294 /// assert!(digest.is_empty());
295 /// digest.insert(1.0);
296 /// assert!(!digest.is_empty());
297 /// ```
298 #[inline]
299 pub fn is_empty(&self) -> bool {
300 self.len() == 0
301 }
302
303 /// Clears the t-digest, removing all values.
304 ///
305 /// # Examples
306 ///
307 /// ```
308 /// use tdigest_ch::TDigest;
309 ///
310 /// let mut digest = TDigest::new();
311 /// digest.insert(1.0);
312 /// digest.clear();
313 /// assert!(digest.is_empty());
314 /// ```
315 pub fn clear(&mut self) {
316 self.centroids.clear();
317 self.count = 0;
318 self.unmerged = 0;
319 }
320
    /// Returns the estimated quantile of the t-digest.
    ///
    /// This method expects `self` to be mutable, since the t-digest may be
    /// compressed. If you require an immutable, shared reference to compute
    /// quantiles, consider using `quantiles` instead.
    ///
    /// Returns NaN for an empty digest.
    ///
    /// # Examples
    ///
    /// ```
    /// use tdigest_ch::TDigest;
    ///
    /// let mut digest = TDigest::from([1.0, 2.0, 3.0, 4.0, 5.0]);
    /// assert_eq!(digest.quantile(0.0), 1.0);
    /// assert_eq!(digest.quantile(0.5), 3.0);
    /// assert_eq!(digest.quantile(1.0), 5.0);
    /// ```
    pub fn quantile(&mut self, level: f64) -> f32 {
        // Merge any pending centroids so the estimate sees all inserted data.
        self.compress();
        self.quantile_uncompressed(level)
    }
341
    /// Estimates the quantile at `level` without compressing first.
    ///
    /// Assumes `self.centroids` is sorted by mean, as `compress` leaves it.
    fn quantile_uncompressed(&self, level: f64) -> f32 {
        // Calculates the quantile q [0, 1] based on the digest.
        // For an empty digest returns NaN.
        if self.centroids.is_empty() {
            return f32::NAN;
        }

        if self.centroids.len() == 1 {
            return self.centroids[0].mean;
        }

        // Target cumulative weight corresponding to `level`.
        let x = level * self.count as f64;
        let mut prev_x = 0f64;
        let mut sum = 0usize;
        let mut prev = self.centroids[0];

        for c in self.centroids.iter() {
            // Cumulative weight at the midpoint of centroid `c`.
            let current_x = sum as f64 + c.count as f64 * 0.5;

            if current_x >= x {
                // Special handling of singletons.
                // A centroid of weight 1 represents an exact sample, so the
                // interpolation window is narrowed by half a unit on that side.
                let mut left = prev_x;
                if prev.count == 1 {
                    left += 0.5;
                }
                let mut right = current_x;
                if c.count == 1 {
                    right -= 0.5;
                }

                return {
                    if x <= left {
                        prev.mean
                    } else if x >= right {
                        c.mean
                    } else {
                        interpolate(x as f32, left as f32, prev.mean, right as f32, c.mean)
                    }
                };
            }

            sum += c.count;
            prev = *c;
            prev_x = current_x;
        }

        // `level` lies beyond the last centroid's midpoint: clamp to the
        // largest mean.
        self.centroids.last().unwrap().mean
    }
390
    /// Creates an immutable quantile estimator from the t-digest.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// use tdigest_ch::TDigest;
    ///
    /// let mut digest = TDigest::from([1.0, 2.0, 3.0, 4.0, 5.0]);
    /// let quantiles = digest.quantiles();
    ///
    /// thread::scope(|s| {
    ///     s.spawn(|| {
    ///         assert_eq!(quantiles.get(0.0), 1.0);
    ///     });
    ///     s.spawn(|| {
    ///         assert_eq!(quantiles.get(0.5), 3.0);
    ///     });
    ///     s.spawn(|| {
    ///         assert_eq!(quantiles.get(1.0), 5.0);
    ///     });
    /// });
    /// ```
    pub fn quantiles(&mut self) -> Quantiles<'_> {
        // Compress up front so the returned estimator can answer queries
        // through a shared reference, without further mutation.
        self.compress();
        Quantiles { digest: self }
    }
419
    /// Adds a value to the t-digest.
    ///
    /// NaN values are ignored.
    ///
    /// # Examples
    ///
    /// ```
    /// use tdigest_ch::TDigest;
    ///
    /// let mut digest = TDigest::new();
    ///
    /// digest.insert(1.0);
    /// digest.insert(2.0);
    /// assert_eq!(digest.len(), 2);
    /// ```
    #[inline]
    pub fn insert(&mut self, value: f32) {
        // Delegates to `insert_many`, which also filters out NaN values.
        self.insert_many(value, 1);
    }
437
438 /// Adds multiple values to the t-digest.
439 ///
440 /// # Examples
441 ///
442 /// ```
443 /// use tdigest_ch::TDigest;
444 ///
445 /// let mut digest = TDigest::new();
446 ///
447 /// digest.insert_many(1.0, 1);
448 /// digest.insert_many(2.0, 2);
449 /// assert_eq!(digest.len(), 3);
450 /// ```
451 pub fn insert_many(&mut self, value: f32, count: usize) {
452 if count == 0 || value.is_nan() {
453 // Count 0 breaks compress() assumptions, NaN breaks sort(). We treat them as no
454 // sample.
455 return;
456 }
457 self.insert_centroid(&Centroid { mean: value, count });
458 }
459
460 fn insert_centroid(&mut self, centroid: &Centroid) {
461 self.count += centroid.count;
462 self.unmerged += 1;
463 self.centroids.push(*centroid);
464 if self.unmerged > self.config.max_unmerged {
465 self.compress();
466 }
467 }
468
469 fn compress(&mut self) {
470 // Performs compression of accumulated centroids
471 // When merging, the invariant is retained to the maximum size of each centroid
472 // that does not exceed `4 q (1 - q) \ delta N`.
473 if self.unmerged > 0 || self.centroids.len() > self.config.max_centroids {
474 self.centroids.sort_by(|l, r| cmp_f32(l.mean, r.mean));
475
476 let mut l_index = 0;
477
478 // Compiler is unable to do this optimization.
479 let count_epsilon_4 = self.count as f64 * self.config.epsilon as f64 * 4.;
480 let mut sum = 0;
481 let (mut l_mean, mut l_count) = {
482 let l = self.centroids.first().unwrap();
483 (l.mean as f64, l.count)
484 };
485 for r_index in 1..self.centroids.len() {
486 let r = self.centroids[r_index];
487 // N.B. We cannot merge all the same values into single centroids because this
488 // will lead to unbalanced compression and wrong results.
489 // For more information see: https://arxiv.org/abs/1902.04023.
490
491 // The ratio of the part of the histogram to l, including the half l to the
492 // entire histogram. That is, what level quantile in position l.
493 let ql = (sum as f64 + l_count as f64 * 0.5) / self.count as f64;
494 let mut err = ql * (1. - ql);
495
496 // The ratio of the portion of the histogram to l, including l and half r to the
497 // entire histogram. That is, what level is the quantile in position r.
498 let qr = (sum as f64 + l_count as f64 + r.count as f64 * 0.5) / self.count as f64;
499 let err2 = qr * (1. - qr);
500
501 if err > err2 {
502 err = err2;
503 }
504
505 let k = count_epsilon_4 * err;
506
507 // The ratio of the weight of the glued column pair to all values is not
508 // greater, than epsilon multiply by a certain quadratic
509 // coefficient, which in the median is 1 (4 * 1/2 * 1/2), and at
510 // the edges decreases and is approximately equal to the
511 // distance to the edge * 4.
512
513 if l_count as f64 + r.count as f64 <= k && can_be_merged(l_mean, r.mean) {
514 // It is possible to merge left and right.
515 // The left column "eats" the right.
516 l_count += r.count;
517 if r.mean as f64 != l_mean {
518 // Handling infinities of the same sign well.
519 // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower.
520 l_mean += r.count as f64 * (r.mean as f64 - l_mean) / l_count as f64;
521 }
522 self.centroids[l_index] = Centroid {
523 mean: l_mean as f32,
524 count: l_count,
525 };
526 } else {
527 // Not enough capacity, check the next pair.
528 // Not l_count, otherwise actual sum of elements will be different.
529 sum += self.centroids[l_index].count;
530 l_index += 1;
531
532 // We skip all the values "eaten" earlier.
533 while l_index != r_index {
534 self.centroids[l_index].count = 0;
535 l_index += 1;
536 }
537 (l_mean, l_count) = {
538 let l = self.centroids[l_index];
539 (l.mean as f64, l.count)
540 };
541 }
542 }
543 // Update count, it might be different due to += inaccuracy
544 self.count = sum + l_count;
545
546 // At the end of the loop, all values to the right of l were "eaten".
547 self.centroids.retain(|c| c.count != 0);
548 self.unmerged = 0;
549 }
550
551 // Ensures centroids.size() < max_centroids, independent of unprovable floating
552 // point blackbox above.
553 self.compress_brute();
554 }
555
556 fn compress_brute(&mut self) {
557 if self.centroids.len() <= self.config.max_centroids {
558 return;
559 }
560 let batch_size = // At least 2.
561 self.centroids.len().div_ceil(self.config.max_centroids);
562 debug_assert!(batch_size >= 2);
563
564 let mut l_index = 0;
565 let mut sum = 0;
566 // We have high-precision temporaries for numeric stability
567 let (mut l_mean, mut l_count) = {
568 let l = self.centroids.first().unwrap();
569 (l.mean as f64, l.count)
570 };
571 let mut batch_pos = 0usize;
572
573 for r_index in 1..self.centroids.len() {
574 let r = self.centroids[r_index];
575 if batch_pos < batch_size - 1 {
576 // The left column "eats" the right. Middle of the batch.
577 l_count += r.count;
578 if r.mean as f64 != l_mean {
579 // Handling infinities of the same sign well.
580 // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower.
581 l_mean += r.count as f64 * (r.mean as f64 - l_mean) / l_count as f64;
582 }
583 self.centroids[l_index] = Centroid {
584 mean: l_mean as f32,
585 count: l_count,
586 };
587 batch_pos += 1;
588 } else {
589 // End of the batch, start the next one.
590 if !self.centroids[l_index].mean.is_nan() {
591 // Skip writing batch result if we compressed something to nan.
592 // Not l_count, otherwise actual sum of elements will be different.
593 sum += self.centroids[l_index].count;
594 l_index += 1;
595 }
596
597 while l_index != r_index {
598 // We skip all the values "eaten" earlier.
599 self.centroids[l_index].count = 0;
600 l_index += 1;
601 }
602 (l_mean, l_count) = {
603 let l = self.centroids[l_index];
604 (l.mean as f64, l.count)
605 };
606 batch_pos = 0;
607 }
608 }
609
610 if !self.centroids[l_index].mean.is_nan() {
611 // Update count, it might be different due to += inaccuracy.
612 self.count = sum + l_count;
613 } else {
614 // Skip writing last batch if (super unlikely) it's nan.
615 self.count = sum;
616 self.centroids[l_index].count = 0;
617 }
618 self.centroids.retain(|c| c.count != 0);
619 // Here centroids.len() <= params.max_centroids.
620 debug_assert!(self.centroids.len() <= self.config.max_centroids);
621 }
622}
623
624impl BitOr<&TDigest> for &TDigest {
625 type Output = TDigest;
626
627 /// Returns the union of `self` and `rhs` as a new `TDigest`.
628 ///
629 /// # Examples
630 ///
631 /// ```
632 /// use tdigest_ch::TDigest;
633 ///
634 /// let a = TDigest::from([1.0, 2.0, 3.0]);
635 /// let b = TDigest::from([3.0, 4.0, 5.0]);
636 ///
637 /// let mut c = &a | &b;
638 ///
639 /// assert_eq!(c.len(), 6);
640 /// assert_eq!(c.quantile(0.5), 3.0);
641 /// ```
642 fn bitor(self, rhs: &TDigest) -> TDigest {
643 let mut result = self.clone();
644 result |= rhs;
645 result
646 }
647}
648
649impl BitOrAssign<&TDigest> for TDigest {
650 /// Merges `self` and `rhs` into `self`.
651 ///
652 /// # Examples
653 ///
654 /// ```
655 /// use tdigest_ch::TDigest;
656 ///
657 /// let mut a = TDigest::from([1.0, 2.0, 3.0]);
658 /// let b = TDigest::from([3.0, 4.0, 5.0]);
659 ///
660 /// a |= &b;
661 ///
662 /// assert_eq!(a.len(), 6);
663 /// assert_eq!(a.quantile(0.5), 3.0);
664 /// ```
665 fn bitor_assign(&mut self, rhs: &TDigest) {
666 for c in &rhs.centroids {
667 self.insert_centroid(c);
668 }
669 }
670}
671
672impl Default for TDigest {
673 #[inline]
674 fn default() -> Self {
675 Self::new()
676 }
677}
678
679impl Extend<f32> for TDigest {
680 fn extend<I: IntoIterator<Item = f32>>(&mut self, iter: I) {
681 for value in iter {
682 self.insert(value);
683 }
684 }
685}
686
687impl<const N: usize> From<[f32; N]> for TDigest {
688 /// # Examples
689 ///
690 /// ```
691 /// use tdigest_ch::TDigest;
692 ///
693 /// let digest1 = TDigest::from([1.0, 2.0, 3.0, 4.0]);
694 /// let digest2: TDigest = [1.0, 2.0, 3.0, 4.0].into();
695 /// assert_eq!(digest1, digest2);
696 /// ```
697 fn from(array: [f32; N]) -> Self {
698 let mut digest = TDigest::new();
699 for value in array.iter() {
700 digest.insert(*value);
701 }
702 digest
703 }
704}
705
706impl FromIterator<f32> for TDigest {
707 fn from_iter<I: IntoIterator<Item = f32>>(iter: I) -> Self {
708 let mut digest = TDigest::new();
709 digest.extend(iter);
710 digest
711 }
712}
713
#[cfg(feature = "serde")]
impl serde::Serialize for TDigest {
    // Serialized as a `(config, centroids, count, unmerged)` tuple; must stay
    // in sync with the `Deserialize` impl below.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        (&self.config, &self.centroids, self.count, self.unmerged).serialize(serializer)
    }
}
723
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for TDigest {
    // Mirrors the `Serialize` impl: expects a
    // `(config, centroids, count, unmerged)` tuple.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (config, centroids, count, unmerged) = serde::Deserialize::deserialize(deserializer)?;
        Ok(Self {
            config,
            centroids,
            count,
            unmerged,
        })
    }
}
739
/// Estimates quantiles of a t-digest.
///
/// This `struct` is created by the [`quantiles`] method on [`TDigest`]. See its
/// documentation for more.
///
/// [`quantiles`]: TDigest::quantiles
#[derive(Debug)]
pub struct Quantiles<'a> {
    /// The digest estimates are computed from; compressed by
    /// `TDigest::quantiles` before this struct is constructed.
    digest: &'a TDigest,
}
750
impl Quantiles<'_> {
    /// Returns the estimated quantile of the t-digest.
    ///
    /// # Examples
    ///
    /// ```
    /// use tdigest_ch::TDigest;
    ///
    /// let mut digest = TDigest::from([1.0, 2.0, 3.0, 4.0, 5.0]);
    /// let quantiles = digest.quantiles();
    /// assert_eq!(quantiles.get(0.0), 1.0);
    /// assert_eq!(quantiles.get(0.5), 3.0);
    /// assert_eq!(quantiles.get(1.0), 5.0);
    /// ```
    pub fn get(&self, level: f64) -> f32 {
        // The digest was already compressed by `TDigest::quantiles`, so a
        // shared reference suffices here.
        self.digest.quantile_uncompressed(level)
    }
}
768}