tdigest_ch/
lib.rs

1//! A Rust library for estimating quantiles in a stream,
2//! using [ClickHouse t-digest][ClickHouseRefTDigest] data structure.
3//!
4//! The [t-digest][Dunning19] data structure is designed around computing
5//! accurate quantile estimates from streaming data. Two t-digests can be
6//! merged, making the data structure well suited for map-reduce settings.
7//!
8//! [Repository]
9//!
10//! [ClickHouseRefTDigest]: https://clickhouse.com/docs/en/sql-reference/aggregate-functions/reference/quantiletdigest/
11//! [Dunning19]: https://github.com/tdunning/t-digest/blob/main/docs/t-digest-paper/histo.pdf
12//! [Repository]: https://github.com/vivienm/rust-tdigest-ch
13//!
14//! # Examples
15//!
16//! ```
17//! use tdigest_ch::TDigest;
18//!
19//! let mut digest = TDigest::new();
20//!
21//! // Add some elements.
22//! digest.insert(1.0);
23//! digest.insert(2.0);
24//! digest.insert(3.0);
25//!
26//! // Get the median of the distribution.
27//! let quantile = digest.quantile(0.5);
28//! assert_eq!(quantile, 2.0);
29//! ```
30
31use std::{
32    cmp::Ordering,
33    ops::{BitOr, BitOrAssign},
34};
35
36/// Stores the weight of points around their mean value.
37#[derive(Clone, Copy, Debug, PartialEq)]
38struct Centroid {
39    mean: f32,
40    count: usize,
41}
42
43#[cfg(feature = "serde")]
44impl serde::Serialize for Centroid {
45    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
46    where
47        S: serde::Serializer,
48    {
49        (self.mean, self.count).serialize(serializer)
50    }
51}
52
53#[cfg(feature = "serde")]
54impl<'de> serde::Deserialize<'de> for Centroid {
55    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
56    where
57        D: serde::Deserializer<'de>,
58    {
59        let (mean, count) = serde::Deserialize::deserialize(deserializer)?;
60        Ok(Self { mean, count })
61    }
62}
63
64#[derive(Clone, Debug, PartialEq)]
65struct Config {
66    epsilon: f32,
67    max_centroids: usize,
68    max_unmerged: usize,
69}
70
71impl Default for Config {
72    fn default() -> Self {
73        Self {
74            epsilon: 0.01,
75            max_centroids: 2048,
76            max_unmerged: 2048,
77        }
78    }
79}
80
81#[cfg(feature = "serde")]
82impl serde::Serialize for Config {
83    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
84    where
85        S: serde::Serializer,
86    {
87        (self.epsilon, self.max_centroids, self.max_unmerged).serialize(serializer)
88    }
89}
90
91#[cfg(feature = "serde")]
92impl<'de> serde::Deserialize<'de> for Config {
93    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
94    where
95        D: serde::Deserializer<'de>,
96    {
97        let (epsilon, max_centroids, max_unmerged) = serde::Deserialize::deserialize(deserializer)?;
98        Ok(Self {
99            epsilon,
100            max_centroids,
101            max_unmerged,
102        })
103    }
104}
105
106/// A `TDigestBuilder` can be used to create a `TDigest` with custom
107/// configuration.
108///
109/// # Examples
110///
111/// ```
112/// use tdigest_ch::TDigestBuilder;
113///
114/// let mut builder = TDigestBuilder::new();
115/// builder.max_centroids(1024);
116/// builder.max_unmerged(1024);
117///
118/// let digest = builder.build();
119/// ```
120#[derive(Debug)]
121pub struct TDigestBuilder {
122    config: Config,
123}
124
125impl TDigestBuilder {
126    /// Constructs a new `TDigestBuilder`.
127    ///
128    /// This is the same as `TDigest::builder()`.
129    pub fn new() -> Self {
130        Self {
131            config: Config::default(),
132        }
133    }
134
135    /// Returns a `TDigest` that uses this `TDigestBuilder` configuration.
136    pub fn build(self) -> TDigest {
137        let centroids = Vec::with_capacity(self.config.max_centroids);
138        TDigest {
139            config: self.config,
140            centroids,
141            count: 0,
142            unmerged: 0,
143        }
144    }
145
146    /// Sets the compression parameter of the `TDigest`. Defaults to 0.01.
147    pub fn epsilon(&mut self, epsilon: f32) -> &mut Self {
148        self.config.epsilon = epsilon;
149        self
150    }
151
152    /// Sets the maximum number of centroids that the `TDigest` will store.
153    /// Defaults to 2048.
154    pub fn max_centroids(&mut self, max_centroids: usize) -> &mut Self {
155        self.config.max_centroids = max_centroids;
156        self
157    }
158
159    /// Sets the maximum number of unmerged centroids that the `TDigest` will
160    /// store. Defaults to 2048.
161    pub fn max_unmerged(&mut self, max_unmerged: usize) -> &mut Self {
162        self.config.max_unmerged = max_unmerged;
163        self
164    }
165}
166
167impl Default for TDigestBuilder {
168    #[inline]
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174fn interpolate(x: f32, x1: f32, y1: f32, x2: f32, y2: f32) -> f32 {
175    let k = (x - x1) / (x2 - x1);
176    (1. - k) * y1 + k * y2
177}
178
179#[inline]
180fn can_be_merged(l_mean: f64, r_mean: f32) -> bool {
181    l_mean == r_mean as f64 || (!l_mean.is_infinite() && !r_mean.is_infinite())
182}
183
184fn cmp_f32(lhs: f32, rhs: f32) -> Ordering {
185    match lhs.partial_cmp(&rhs) {
186        Some(ordering) => ordering,
187        None => {
188            if lhs.is_nan() {
189                if rhs.is_nan() {
190                    Ordering::Equal
191                } else {
192                    Ordering::Greater
193                }
194            } else {
195                Ordering::Less
196            }
197        }
198    }
199}
200
201/// T-digest data structure for approximating the quantiles of a distribution.
202///
203/// # Examples
204///
205/// ```
206/// use tdigest_ch::TDigest;
207///
208/// let mut digest = TDigest::new();
209///
210/// // Add some elements.
211/// digest.insert(1.0);
212/// digest.insert(2.0);
213/// digest.insert(3.0);
214///
215/// // Get the median of the distribution.
216/// let quantile = digest.quantile(0.5);
217/// assert_eq!(quantile, 2.0);
218/// ```
219#[derive(Clone, Debug, PartialEq)]
220pub struct TDigest {
221    config: Config,
222    centroids: Vec<Centroid>,
223    count: usize,
224    unmerged: usize,
225}
226
227impl TDigest {
228    /// Creates an empty `TDigest`.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use tdigest_ch::TDigest;
234    /// let digest = TDigest::new();
235    /// ```
236    #[must_use]
237    pub fn new() -> Self {
238        Self::builder().build()
239    }
240
241    /// Creates a `TDigestBuilder` to configure a `TDigest`.
242    ///
243    /// This is the same as `TDigestBuilder::new()`.
244    #[inline]
245    pub fn builder() -> TDigestBuilder {
246        TDigestBuilder::new()
247    }
248
249    /// Moves all the elements of `other` into `self`, leaving `other` empty.
250    ///
251    /// # Examples
252    ///
253    /// ```
254    /// use tdigest_ch::TDigest;
255    ///
256    /// let mut a = TDigest::from([-10.0, 1.0, 2.0, 2.0, 3.0]);
257    /// let mut b = TDigest::from([-20.0, 5.0, 43.0]);
258    ///
259    /// a.append(&mut b);
260    ///
261    /// assert_eq!(a.len(), 8);
262    /// assert!(b.is_empty());
263    /// ```
264    pub fn append(&mut self, other: &mut TDigest) {
265        self.bitor_assign(other);
266        other.clear();
267    }
268
269    /// Returns the number of elements in the t-digest.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use tdigest_ch::TDigest;
275    ///
276    /// let mut digest = TDigest::new();
277    /// assert_eq!(digest.len(), 0);
278    /// digest.insert(1.0);
279    /// assert_eq!(digest.len(), 1);
280    /// ```
281    #[inline]
282    pub fn len(&self) -> usize {
283        self.count
284    }
285
286    /// Returns `true` if the t-digest contains no elements.
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use tdigest_ch::TDigest;
292    ///
293    /// let mut digest = TDigest::new();
294    /// assert!(digest.is_empty());
295    /// digest.insert(1.0);
296    /// assert!(!digest.is_empty());
297    /// ```
298    #[inline]
299    pub fn is_empty(&self) -> bool {
300        self.len() == 0
301    }
302
303    /// Clears the t-digest, removing all values.
304    ///
305    /// # Examples
306    ///
307    /// ```
308    /// use tdigest_ch::TDigest;
309    ///
310    /// let mut digest = TDigest::new();
311    /// digest.insert(1.0);
312    /// digest.clear();
313    /// assert!(digest.is_empty());
314    /// ```
315    pub fn clear(&mut self) {
316        self.centroids.clear();
317        self.count = 0;
318        self.unmerged = 0;
319    }
320
321    /// Returns the estimated quantile of the t-digest.
322    ///
323    /// This method expects `self` to be mutable, since the t-digest may be
324    /// compressed. If you require an immutable, shared reference to compute
325    /// quantiles, consider using `quantiles` instead.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use tdigest_ch::TDigest;
331    ///
332    /// let mut digest = TDigest::from([1.0, 2.0, 3.0, 4.0, 5.0]);
333    /// assert_eq!(digest.quantile(0.0), 1.0);
334    /// assert_eq!(digest.quantile(0.5), 3.0);
335    /// assert_eq!(digest.quantile(1.0), 5.0);
336    /// ```
337    pub fn quantile(&mut self, level: f64) -> f32 {
338        self.compress();
339        self.quantile_uncompressed(level)
340    }
341
342    fn quantile_uncompressed(&self, level: f64) -> f32 {
343        // Calculates the quantile q [0, 1] based on the digest.
344        // For an empty digest returns NaN.
345        if self.centroids.is_empty() {
346            return f32::NAN;
347        }
348
349        if self.centroids.len() == 1 {
350            return self.centroids[0].mean;
351        }
352
353        let x = level * self.count as f64;
354        let mut prev_x = 0f64;
355        let mut sum = 0usize;
356        let mut prev = self.centroids[0];
357
358        for c in self.centroids.iter() {
359            let current_x = sum as f64 + c.count as f64 * 0.5;
360
361            if current_x >= x {
362                // Special handling of singletons.
363                let mut left = prev_x;
364                if prev.count == 1 {
365                    left += 0.5;
366                }
367                let mut right = current_x;
368                if c.count == 1 {
369                    right -= 0.5;
370                }
371
372                return {
373                    if x <= left {
374                        prev.mean
375                    } else if x >= right {
376                        c.mean
377                    } else {
378                        interpolate(x as f32, left as f32, prev.mean, right as f32, c.mean)
379                    }
380                };
381            }
382
383            sum += c.count;
384            prev = *c;
385            prev_x = current_x;
386        }
387
388        self.centroids.last().unwrap().mean
389    }
390
391    /// Creates an immutable quantile estimator from the t-digest.
392    ///
393    /// # Examples
394    ///
395    /// ```
396    /// use std::thread;
397    ///
398    /// use tdigest_ch::TDigest;
399    ///
400    /// let mut digest = TDigest::from([1.0, 2.0, 3.0, 4.0, 5.0]);
401    /// let quantiles = digest.quantiles();
402    ///
403    /// thread::scope(|s| {
404    ///     s.spawn(|| {
405    ///         assert_eq!(quantiles.get(0.0), 1.0);
406    ///     });
407    ///     s.spawn(|| {
408    ///         assert_eq!(quantiles.get(0.5), 3.0);
409    ///     });
410    ///     s.spawn(|| {
411    ///         assert_eq!(quantiles.get(1.0), 5.0);
412    ///     });
413    /// });
414    /// ```
415    pub fn quantiles(&mut self) -> Quantiles<'_> {
416        self.compress();
417        Quantiles { digest: self }
418    }
419
420    /// Adds a value to the t-digest.
421    ///
422    /// # Examples
423    ///
424    /// ```
425    /// use tdigest_ch::TDigest;
426    ///
427    /// let mut digest = TDigest::new();
428    ///
429    /// digest.insert(1.0);
430    /// digest.insert(2.0);
431    /// assert_eq!(digest.len(), 2);
432    /// ```
433    #[inline]
434    pub fn insert(&mut self, value: f32) {
435        self.insert_many(value, 1);
436    }
437
438    /// Adds multiple values to the t-digest.
439    ///
440    /// # Examples
441    ///
442    /// ```
443    /// use tdigest_ch::TDigest;
444    ///
445    /// let mut digest = TDigest::new();
446    ///
447    /// digest.insert_many(1.0, 1);
448    /// digest.insert_many(2.0, 2);
449    /// assert_eq!(digest.len(), 3);
450    /// ```
451    pub fn insert_many(&mut self, value: f32, count: usize) {
452        if count == 0 || value.is_nan() {
453            // Count 0 breaks compress() assumptions, NaN breaks sort(). We treat them as no
454            // sample.
455            return;
456        }
457        self.insert_centroid(&Centroid { mean: value, count });
458    }
459
460    fn insert_centroid(&mut self, centroid: &Centroid) {
461        self.count += centroid.count;
462        self.unmerged += 1;
463        self.centroids.push(*centroid);
464        if self.unmerged > self.config.max_unmerged {
465            self.compress();
466        }
467    }
468
469    fn compress(&mut self) {
470        // Performs compression of accumulated centroids
471        // When merging, the invariant is retained to the maximum size of each centroid
472        // that does not exceed `4 q (1 - q) \ delta N`.
473        if self.unmerged > 0 || self.centroids.len() > self.config.max_centroids {
474            self.centroids.sort_by(|l, r| cmp_f32(l.mean, r.mean));
475
476            let mut l_index = 0;
477
478            // Compiler is unable to do this optimization.
479            let count_epsilon_4 = self.count as f64 * self.config.epsilon as f64 * 4.;
480            let mut sum = 0;
481            let (mut l_mean, mut l_count) = {
482                let l = self.centroids.first().unwrap();
483                (l.mean as f64, l.count)
484            };
485            for r_index in 1..self.centroids.len() {
486                let r = self.centroids[r_index];
487                // N.B. We cannot merge all the same values into single centroids because this
488                // will lead to unbalanced compression and wrong results.
489                // For more information see: https://arxiv.org/abs/1902.04023.
490
491                // The ratio of the part of the histogram to l, including the half l to the
492                // entire histogram. That is, what level quantile in position l.
493                let ql = (sum as f64 + l_count as f64 * 0.5) / self.count as f64;
494                let mut err = ql * (1. - ql);
495
496                // The ratio of the portion of the histogram to l, including l and half r to the
497                // entire histogram. That is, what level is the quantile in position r.
498                let qr = (sum as f64 + l_count as f64 + r.count as f64 * 0.5) / self.count as f64;
499                let err2 = qr * (1. - qr);
500
501                if err > err2 {
502                    err = err2;
503                }
504
505                let k = count_epsilon_4 * err;
506
507                // The ratio of the weight of the glued column pair to all values is not
508                // greater, than epsilon multiply by a certain quadratic
509                // coefficient, which in the median is 1 (4 * 1/2 * 1/2), and at
510                // the edges decreases and is approximately equal to the
511                // distance to the edge * 4.
512
513                if l_count as f64 + r.count as f64 <= k && can_be_merged(l_mean, r.mean) {
514                    // It is possible to merge left and right.
515                    // The left column "eats" the right.
516                    l_count += r.count;
517                    if r.mean as f64 != l_mean {
518                        // Handling infinities of the same sign well.
519                        // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower.
520                        l_mean += r.count as f64 * (r.mean as f64 - l_mean) / l_count as f64;
521                    }
522                    self.centroids[l_index] = Centroid {
523                        mean: l_mean as f32,
524                        count: l_count,
525                    };
526                } else {
527                    // Not enough capacity, check the next pair.
528                    // Not l_count, otherwise actual sum of elements will be different.
529                    sum += self.centroids[l_index].count;
530                    l_index += 1;
531
532                    // We skip all the values "eaten" earlier.
533                    while l_index != r_index {
534                        self.centroids[l_index].count = 0;
535                        l_index += 1;
536                    }
537                    (l_mean, l_count) = {
538                        let l = self.centroids[l_index];
539                        (l.mean as f64, l.count)
540                    };
541                }
542            }
543            // Update count, it might be different due to += inaccuracy
544            self.count = sum + l_count;
545
546            // At the end of the loop, all values to the right of l were "eaten".
547            self.centroids.retain(|c| c.count != 0);
548            self.unmerged = 0;
549        }
550
551        // Ensures centroids.size() < max_centroids, independent of unprovable floating
552        // point blackbox above.
553        self.compress_brute();
554    }
555
556    fn compress_brute(&mut self) {
557        if self.centroids.len() <= self.config.max_centroids {
558            return;
559        }
560        let batch_size =  // At least 2.
561            self.centroids.len().div_ceil(self.config.max_centroids);
562        debug_assert!(batch_size >= 2);
563
564        let mut l_index = 0;
565        let mut sum = 0;
566        // We have high-precision temporaries for numeric stability
567        let (mut l_mean, mut l_count) = {
568            let l = self.centroids.first().unwrap();
569            (l.mean as f64, l.count)
570        };
571        let mut batch_pos = 0usize;
572
573        for r_index in 1..self.centroids.len() {
574            let r = self.centroids[r_index];
575            if batch_pos < batch_size - 1 {
576                // The left column "eats" the right. Middle of the batch.
577                l_count += r.count;
578                if r.mean as f64 != l_mean {
579                    // Handling infinities of the same sign well.
580                    // Symmetric algo (M1*C1 + M2*C2)/(C1+C2) is numerically better, but slower.
581                    l_mean += r.count as f64 * (r.mean as f64 - l_mean) / l_count as f64;
582                }
583                self.centroids[l_index] = Centroid {
584                    mean: l_mean as f32,
585                    count: l_count,
586                };
587                batch_pos += 1;
588            } else {
589                // End of the batch, start the next one.
590                if !self.centroids[l_index].mean.is_nan() {
591                    // Skip writing batch result if we compressed something to nan.
592                    // Not l_count, otherwise actual sum of elements will be different.
593                    sum += self.centroids[l_index].count;
594                    l_index += 1;
595                }
596
597                while l_index != r_index {
598                    // We skip all the values "eaten" earlier.
599                    self.centroids[l_index].count = 0;
600                    l_index += 1;
601                }
602                (l_mean, l_count) = {
603                    let l = self.centroids[l_index];
604                    (l.mean as f64, l.count)
605                };
606                batch_pos = 0;
607            }
608        }
609
610        if !self.centroids[l_index].mean.is_nan() {
611            // Update count, it might be different due to += inaccuracy.
612            self.count = sum + l_count;
613        } else {
614            // Skip writing last batch if (super unlikely) it's nan.
615            self.count = sum;
616            self.centroids[l_index].count = 0;
617        }
618        self.centroids.retain(|c| c.count != 0);
619        // Here centroids.len() <= params.max_centroids.
620        debug_assert!(self.centroids.len() <= self.config.max_centroids);
621    }
622}
623
624impl BitOr<&TDigest> for &TDigest {
625    type Output = TDigest;
626
627    /// Returns the union of `self` and `rhs` as a new `TDigest`.
628    ///
629    /// # Examples
630    ///
631    /// ```
632    /// use tdigest_ch::TDigest;
633    ///
634    /// let a = TDigest::from([1.0, 2.0, 3.0]);
635    /// let b = TDigest::from([3.0, 4.0, 5.0]);
636    ///
637    /// let mut c = &a | &b;
638    ///
639    /// assert_eq!(c.len(), 6);
640    /// assert_eq!(c.quantile(0.5), 3.0);
641    /// ```
642    fn bitor(self, rhs: &TDigest) -> TDigest {
643        let mut result = self.clone();
644        result |= rhs;
645        result
646    }
647}
648
649impl BitOrAssign<&TDigest> for TDigest {
650    /// Merges `self` and `rhs` into `self`.
651    ///
652    /// # Examples
653    ///
654    /// ```
655    /// use tdigest_ch::TDigest;
656    ///
657    /// let mut a = TDigest::from([1.0, 2.0, 3.0]);
658    /// let b = TDigest::from([3.0, 4.0, 5.0]);
659    ///
660    /// a |= &b;
661    ///
662    /// assert_eq!(a.len(), 6);
663    /// assert_eq!(a.quantile(0.5), 3.0);
664    /// ```
665    fn bitor_assign(&mut self, rhs: &TDigest) {
666        for c in &rhs.centroids {
667            self.insert_centroid(c);
668        }
669    }
670}
671
672impl Default for TDigest {
673    #[inline]
674    fn default() -> Self {
675        Self::new()
676    }
677}
678
679impl Extend<f32> for TDigest {
680    fn extend<I: IntoIterator<Item = f32>>(&mut self, iter: I) {
681        for value in iter {
682            self.insert(value);
683        }
684    }
685}
686
687impl<const N: usize> From<[f32; N]> for TDigest {
688    /// # Examples
689    ///
690    /// ```
691    /// use tdigest_ch::TDigest;
692    ///
693    /// let digest1 = TDigest::from([1.0, 2.0, 3.0, 4.0]);
694    /// let digest2: TDigest = [1.0, 2.0, 3.0, 4.0].into();
695    /// assert_eq!(digest1, digest2);
696    /// ```
697    fn from(array: [f32; N]) -> Self {
698        let mut digest = TDigest::new();
699        for value in array.iter() {
700            digest.insert(*value);
701        }
702        digest
703    }
704}
705
706impl FromIterator<f32> for TDigest {
707    fn from_iter<I: IntoIterator<Item = f32>>(iter: I) -> Self {
708        let mut digest = TDigest::new();
709        digest.extend(iter);
710        digest
711    }
712}
713
714#[cfg(feature = "serde")]
715impl serde::Serialize for TDigest {
716    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
717    where
718        S: serde::Serializer,
719    {
720        (&self.config, &self.centroids, self.count, self.unmerged).serialize(serializer)
721    }
722}
723
724#[cfg(feature = "serde")]
725impl<'de> serde::Deserialize<'de> for TDigest {
726    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
727    where
728        D: serde::Deserializer<'de>,
729    {
730        let (config, centroids, count, unmerged) = serde::Deserialize::deserialize(deserializer)?;
731        Ok(Self {
732            config,
733            centroids,
734            count,
735            unmerged,
736        })
737    }
738}
739
740/// Estimates quantiles of a t-digest.
741///
742/// This `struct` is created by the [`quantiles`] method on [`TDigest`]. See its
743/// documentation for more.
744///
745/// [`quantiles`]: TDigest::quantiles
746#[derive(Debug)]
747pub struct Quantiles<'a> {
748    digest: &'a TDigest,
749}
750
751impl Quantiles<'_> {
752    /// Returns the estimated quantile of the t-digest.
753    ///
754    /// # Examples
755    ///
756    /// ```
757    /// use tdigest_ch::TDigest;
758    ///
759    /// let mut digest = TDigest::from([1.0, 2.0, 3.0, 4.0, 5.0]);
760    /// let quantiles = digest.quantiles();
761    /// assert_eq!(quantiles.get(0.0), 1.0);
762    /// assert_eq!(quantiles.get(0.5), 3.0);
763    /// assert_eq!(quantiles.get(1.0), 5.0);
764    /// ```
765    pub fn get(&self, level: f64) -> f32 {
766        self.digest.quantile_uncompressed(level)
767    }
768}