En la entrada sobre la varianza, vimos que podíamos tener problemas de estabilidad numérica si intentábamos calcular la varianza en una sola pasada sobre los datos, usando inocentemente la definición matemática. La solución de entonces fue usar un algoritmo de dos pasos: calcular la media en el primer paso, y en el segundo, calcular la varianza de la muestra menos la media. Había otra posibilidad: usar el primer valor de la secuencia como estimado malo de la media, y restar ese valor a las sucesivas muestras.
¿Podríamos hacer algo mejor si corrigiésemos el estimado de la media sobre la marcha? Resulta que se puede, y el primero en darse cuenta fue Welford, allá por el 1962. Donald Knuth incluyó el algoritmo en el segundo tomo de The Art of Computer Programming. El algoritmo original sólo calculaba la media y la varianza sobre la marcha, pero Timothy Terriberry, en 2007, lo amplió para que calculase momentos superiores. Este algoritmo está implementado, por ejemplo, en Math.Net Numerics, aunque la implementación es mejorable.
¿Qué nos da?
En AUSTRA, la clase que implementa este algoritmo se llama Accumulator
. Hay también una clase simplificada, SimpleAccumulator
, que sólo calcula los dos primeros momentos, con el beneficio evidente de tener que ejecutar menos trabajo.
La definición de Accumulator
, junto con su constructor principal y los campos y propiedades de almacenamiento, es la siguiente:
/// Calculates statistics by adding samples.
public sealed class Accumulator
{
///Minimum value.
private double min = double.PositiveInfinity;
///Maximum value.
private double max = double.NegativeInfinity;
///Estimated mean.
private double m1;
///Accumulated second moment.
private double m2;
///Accumulated third moment.
private double m3;
///Accumulated fourth moment.
private double m4;
///Gets the total number of samples.
public long Count { get; private set; }
///Creates an empty accumulator.
public Accumulator() { }
/* … */
}
La información que nos interesa se obtiene a través de estos campos, por medio de propiedades calculadas:
///Returns the minimum value.
public double Minimum => Count > 0 ? min : double.NaN;
///Returns the maximum value.
public double Maximum => Count > 0 ? max : double.NaN;
///Gets the sample mean.
public double Mean => Count > 0 ? m1 : double.NaN;
///Gets the unbiased variance.
public double Variance =>
Count < 2 ? double.NaN : m2 / (Count - 1);
///Gets the unbiased standard deviation.
public double StandardDeviation =>
Count < 2 ? double.NaN : Sqrt(m2 / (Count - 1));
///Gets the unbiased population skewness.
public double Skewness =>
Count < 3
? double.NaN
: Count * m3 * Sqrt(m2 / (Count - 1))
/ (m2 * m2 * (Count - 2)) * (Count - 1);
///Gets the unbiased population kurtosis.
public double Kurtosis =>
Count < 4
? double.NaN
: ((double)Count * Count - 1)
/ ((Count - 2) * (Count - 3))
* (Count * m4 / (m2 * m2) - 3 + 6.0 / (Count + 1));
He omitido, por brevedad, el cálculo de propiedades como PopulationVariance
, PopulationSkewness
y demás. De todas maneras, están disponibles en el código de Austra, y en el propio código de Math.NET Numerics.
¿Qué le damos?
Para alimentar al acumulador, hay que pasarle las muestras, al menos en principio, de una en una, por medio de un método que hemos nombrado Add
:
/// Adds a sample to this accumulator.
/// The new sample.
public void Add(double sample)
{
++Count;
double d = sample - m1, s = d / Count;
double t = d * s * (Count - 1);
m1 += s;
m4 += (t * s * (Count * (Count - 3 + 3)
+ 6 * s * m2 - 4 * m3) * s;
m3 += (t * (Count - 2) - 3 * m2) * s;
m2 += t;
if (sample < min) min = sample;
if (sample > max) max = sample;
}
Hay algunas pequeñas mejoras en el código anterior, respecto al original. Hay algunas multiplicaciones menos, y está todo preparado por si quisiéramos usar alguna instrucción de fusión de multiplicación y suma. No las he usado porque tengo algunas dudas sobre la eficiencia en .NET Core. Es cierto que siempre tienes la ventaja de la mayor exactitud, pero ya veremos dónde sí se usan (en pocas palabras: donde realmente importan).
Combinando acumuladores
Lo mejor de todo es que podemos combinar los valores en dos acumuladores independientes y generar un acumulador de los datos conjuntos. Esto nos permitiría, por ejemplo, dividir una muestra grande en cuatro partes, calcular cuatro acumuladores en paralelo, y luego mezclarlos en el resultado final.
public static Accumulator operator +(
Accumulator a1, Accumulator a2)
{
if (a1.Count == 0) return a2;
if (a2.Count == 0) return a1;
long n = a1.Count + a2.Count, n2 = n * n;
double d = a2.m1 - a1.m1, d2 = d * d;
double d3 = d2 * d, d4 = d2 * d2;
double m1 = (a1.Count * a1.m1 + a2.Count * a2.m1) / n;
double m2 = a1.m2 + a2.m2 + d2 * a1.Count * a2.Count / n;
double m3 = a1.m3 + a2.m3
+ d3 * a1.Count * a2.Count * (a1.Count - a2.Count) / n2
+ 3 * d * (a1.Count * a2.m2 - a2.Count * a1.m2) / n;
double m4 = a1.m4 + a2.m4 + d4 * a1.Count * a2.Count
* (a1.Count * (a1.Count - a2.Count)
+ a2.Count * a2.Count) / (n2 * n)
+ 6 * d2 * (a1.Count * a1.Count * a2.m2
+ a2.Count * a2.Count * a1.m2) / n2
+ 4 * d * (a1.Count * a2.m3 - a2.Count * a1.m3) / n;
return new() {
Count = n,
m1 = m1, m2 = m2, m3 = m3, m4 = m4,
min = Min(a1.min, a2.min),
max = Max(a1.max, a2.max),
};
}
El código es complicado, y es fácil equivocarse copiando y pegando (ya me ha pasado). De todas maneras, es una clase que es fácil de testear.
El primer impulso es dejarlo aquí, y confiar en el paralelismo con tareas para cuando queremos acelerar el código. Mi problema con esto es que, cuando escribo código para una librería, prefiero realizar la aceleración básica con código vectorial (AVX o lo que esté disponible). ¿Por qué? Pues porque por experiencia, el programador que usa luego la biblioteca prefiere tener la opción del paralelismo por tareas para su propio código. Es cierto que las tareas se combinan más o menos bien en .NET, gracias al thread-pool que obtienes del entorno de ejecución, sin esforzarte demasiado; en Java, todo es más complicado con sus malditos executors.
Drum roll, please...
Prefiero, por lo tanto, ganar todo lo que pueda en paralelismo en una librería a golpe de instrucciones SIMD. Y esto es precisamente lo que hacemos en Accumulator
con el siguiente método y algunos más que lo llaman:
public unsafe void Add(double* samples, int size)
{
int i = 0;
if (Avx.IsSupported && size >= 16)
{
var vMin = Vector256.Create(double.PositiveInfinity);
var vMax = Vector256.Create(double.NegativeInfinity);
var vM1 = Vector256<double>.Zero;
var vM2 = Vector256<double>.Zero;
var vM3 = Vector256<double>.Zero;
var vM4 = Vector256<double>.Zero;
var v3 = Vector256.Create(3.0);
var v4 = Vector256.Create(4.0);
var v6 = Vector256.Create(6.0);
long c = 0;
for (int top = size & CommonMatrix.AVX_MASK;
i < top; i += 4)
{
c++;
var vSample = Avx.LoadVector256(samples + i);
vMin = Avx.Min(vMin, vSample);
vMax = Avx.Max(vMax, vSample);
var vd = Avx.Subtract(vSample, vM1);
var vs = Avx.Divide(vd,
Vector256.Create((double)c));
var vt = Avx.Multiply(Avx.Multiply(vd, vs),
Vector256.Create((double)(c - 1)));
vM1 = Avx.Add(vM1, vs);
var t1 = Avx.Multiply(Avx.Multiply(vt, vs),
Vector256.Create((double)(c * (c - 3) + 3)));
var t2 = Avx.Multiply(Avx.Multiply(vs, vM2), v6);
var t3 = Avx.Multiply(v4, vM3);
vM4 = vM4.MultiplyAdd(Avx.Subtract(
Avx.Add(t1, t2), t3), vs);
t1 = Avx.Multiply(vt,
Vector256.Create((double)(c - 2)));
t2 = Avx.Multiply(vM2, v3);
vM3 = vM3.MultiplyAdd(Avx.Subtract(t1, t2), vs);
vM2 = Avx.Add(vM2, vt);
}
var acc01 = Mix(c,
vM1.ToScalar(), vM2.ToScalar(),
vM3.ToScalar(), vM4.ToScalar(),
vM1.GetElement(1), vM2.GetElement(1),
vM3.GetElement(1), vM4.GetElement(1));
var acc23 = Mix(c,
vM1.GetElement(2), vM2.GetElement(2),
vM3.GetElement(2), vM4.GetElement(2),
vM1.GetElement(3), vM2.GetElement(3),
vM3.GetElement(3), vM4.GetElement(3));
var a = Mix(c + c,
acc01.m1, acc01.m2, acc01.m3, acc01.m4,
acc23.m1, acc23.m2, acc23.m3, acc23.m4);
if (Count == 0)
(Count, m1, m2, m3, m4)
= (4 * c, a.m1, a.m2, a.m3, a.m4);
else
{
long acCnt = 4 * c, n = Count + acCnt, n2 = n * n;
double d = a.m1 - m1, d2 = d * d;
double d3 = d2 * d, d4 = d2 * d2;
double nm1 = (Count * m1 + acCnt * a.m1) / n;
double nm2 = m2 + a.m2 + d2 * Count * acCnt / n;
double nm3 = m3 + a.m3
+ d3 * Count * acCnt * (Count - acCnt) / n2
+ 3 * d * (Count * a.m2 - acCnt * m2) / n;
m4 += a.m4 + d4 * Count * acCnt
* (Count * (Count - acCnt)
+ acCnt * acCnt) / (n2 * n)
+ 6 * d2 * (Count * Count * a.m2
+ acCnt * acCnt * m2) / n2
+ 4 * d * (Count * a.m3 - acCnt * m3) / n;
(m1, m2, m3, Count) = (nm1, nm2, nm3, n);
}
min = Min(min, vMin.Min());
max = Max(max, vMax.Max());
}
for (; i < size; ++i)
Add(samples[i]);
static (double m1, double m2, double m3, double m4) Mix(
long c,
double a1, double a2, double a3, double a4,
double b1, double b2, double b3, double b4)
{
long n = c + c, n2 = n * n;
double d = b1 - a1, d2 = d * d, d4 = d2 * d2;
return (
(a1 + b1) / 2,
a2 + b2 + d2 * c / 2,
a3 + b3 + 3 * d * (b2 - a2) / 2,
a4 + b4 + d4 * c / 8 + 3 * d2 * (b2 + a2) / 2
+ 2 * d * (b3 - a3));
}
}
Observe que la precondición para aprovechar las instrucciones vectoriales es tener todo un array de muestras a nuestra disposición. Si nos diesen un IEnumerable<double>
, tendríamos que hacer maniobras como materializar las muestras en grupos de cuatro, en un array, y alimentar así al animalito vectorial.
El código es relativamente sencillo, si miramos con atención. La parte AVX prácticamente repite el código del Add
escalar. Por cada campo de Accumulator
hay un vector de doble precisión. La excepción es la propiedad Count
, y la tratamos diferente porque para los cuatro acumuladores virtuales que maneja el método, la cantidad de muestras es siempre la misma.
Esto es una ventaja cuando tenemos que mezclar los resultados de los cuatro acumuladores. La función interna estática Mix
aprovecha la igualdad de los contadores para simplificar algebraicamente algunas fórmula. Observe, por ejemplo, que la fórmula para el m3
combinado es más sencilla, al anularse uno de los términos.
Una vez que hemos mezclado los cuatro acumuladores parciales, mezclamos el resultado, a su vez, con los valores que pueda haber ya en el propio acumulador (si los hubiera). Aquí no podemos simplificar tanto, porque los contadores nuevos y antiguos pueden ser muy diferentes, aunque en el caso en el que el acumulador inicial no tuviese muestras, es todo más simple.
Si quiere hacerse una idea de cuánto mejora este tipo de procesamiento vectorial, los benchmarks que he ejecutado me dan casi cinco veces más velocidad. Es extraño, porque yo esperaría una mejora de 4x, pero puede deberse a que aquí sí hacemos uso de las instrucciones FMA vectoriales, cuando están disponibles. Las instrucciones FMA están escondidas en los métodos de extensión MultiplyAdd
que presenté en esta entrada.
Por cierto, la niña de la imagen de la entrada tiene poco que ver con el algoritmo, pero estoy usando imágenes generadas por AI, entre otros motivos, para evitar problemas de derechos de autor. En este caso, le pedí a la AI que generase una niña perdida e indefensa en un universo digital simulado. En parte, la AI me hizo caso; en parte, ignoró la petición. Pero el resultado me gusta, y ahí lo tiene.