Categorías
C#

Xoshiro256**

Aunque parezca que el título de la entrada lo tecleó mi gato, es en realidad el nombre de un algoritmo de generación de números aleatorio, inventado por Sebastiano Vigna y David Blackman (enlaces al final).

El nombre es una combinación de las operaciones principales del algoritmo: xor, shift, rotate. En realidad, hay toda una familia de algoritmos similares, con nombres que se parecen mucho. El xoshiro256** es, simplemente, el que ha adoptado .NET Core desde la versión 6 (implementación aquí).

Xoshiro256** es muy rápido, es robusto (aunque no te recomiendan que lo uses en aplicaciones de criptografía), y utiliza muy poca memoria. Como se puede ver en la implementación de .NET 8, sólo necesita cuatro variables de tipo ulong. Es decir, 32 bytes por cada instancia del generador.

Xoshiro vectorial

Austra utiliza generadores de números aleatorios a diestra y siniestra. ¿Es fácil escribir una versión de este algoritmo usando instrucciones AVX? La respuesta es: no, a no ser que uses directamente AVX512F. El problema es que este algoritmo realiza una rotación. Es curioso que los lenguajes modernos de programación no te den directamente esta operación como parte de los operadores de bit. C# tiene un >> y un <<, por ejemplo, pero las rotaciones tienes que buscarlas en la clase BitOperations. De todos modos, el problema es que AVX/AVX2 no tiene una operación SIMD de rotación. Podríamos simularla con shifts y máscaras, pero perderíamos parte de las ventajas de la vectorización. Hay implementaciones en GitHub de generadores aleatorios con AVX2, pero no dan una ganancia de velocidad destacable.

Hay otro problema con el que hay que tener sumo cuidado. Pongamos como ejemplo el constructor de la clase DVector que genera un vector con valores aleatorios:

/// 
/// Creates a vector filled with a uniform distribution generator.
/// 
/// Size of the vector.
/// A random number generator.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public DVector(int size, Random rnd);

Omito la implementación para simplificar. Lo que quiero subrayar es que este constructor recibe una instancia explícita de la clase Random. ¿Por qué? Pues porque al cliente de la librería puede necesitar resultados repetibles. Es decir, puede que para que un vector «aleatorio» tenga siempre los mismos resultados, podría ser que la instancia de Random se inicializase siempre con la misma semilla. Si no nos interesa usar una semilla, casi siempre utilizaremos Random.Shared, que es una propiedad estática de Random, que es única para cada hilo y que se crea por demanda.

Podríamos complicarnos la vida al escribir la versión vectorizada de este constructor, y averiguar cómo recuperar la semilla del generador que nos pasa el cliente para crear un generador vectorial con esa misma semilla. Pero:

  1. No tengo claro que recuperar esa semilla sea tarea fácil
  2. El generador aleatorio vectorial que he programado no tiene, de momento, la posibilidad de inicializarse con una semilla (y no me parece tampoco sencillo).

Por lo tanto, las rutinas que se han acelerado con el generador vectorial comprueban si el generador que han recibido del cliente es Random.Shared. Eso lo interpretamos como que al cliente no le interesa la repetibilidad de los resultados. Naturalmente, hay que verificar antes si AVX512F está disponible en el ordenador.

Cuando usamos el generador vectorial, los tiempos de ejecución se reducen a una cuarta o quinta parte. No se reducen a la octava parte mecánicamente porque en la mayoría de estas rutinas pesa más la asignación de memoria y el posterior llenado de la misma. Pero bajar el tiempo de ejecución a la cuarta parte me parece meritorio.

Enlaces

Estos son los enlaces a la clase de Austra.Library que genera ocho números de golpe por llamada, y a la página de Sebastiano Vigna, el autor del algoritmo que hemos vectorizado. Mi implementación, con toda seguridad, está abierta a mejoras. Por ejemplo, no me convence la forma en que tengo que convertir un valor ulong en otro de tipo double, porque no es vectorial. Debe existir algo más eficiente, pero de momento no lo he encontrado. De todas maneras, ahí tiene mi implementación, por si le es útil en alguno de sus proyectos.

Queda también pendiente la vectorización de números aleatorios provenientes de una distribución normal. Austra utiliza el método de Box-Muller, pero este método exige el cálculo de un logaritmo y de senos y cosenos. No es tarea imposible, pero tengo primero que proporcionar una rutina que genere vectorialmente senos y cosenos y, quizás antes, decidir si me interesa realmente el método de Box-Muller, o si merece la pena ir directamente a una implementación vectorial del algoritmo del zigurat. Todo, en su debido momento.

Categorías
C#

Austra PolySolve

C’est la vie: elle est dure et souvent courte.

Es imposible escribir sobre ecuaciones algebraicas sin mencionar a Évariste Galois, quien no sólo cerró una larga historia de intentos de solución de este tipo de ecuaciones, sino que además tuvo una vida corta y trágica.

Dicen que los egipcios y los babilonios eran capaces de resolver las ecuaciones de segundo grado y, por supuesto, las lineales. Las de tercer y cuarto grado tuvieron que esperar al Renacimiento Italiano. Y luego, la teoría se estancó: nadie era capaz de resolver una ecuación de quinto grado general; sólo algunas versiones restringidas.

Lagrange estuvo a punto de demostrar que las ecuaciones de quinto grado y superiores no tenían una solución general. Fue Ruffini quien lo consiguió, aunque con algunos pequeños errores, que más tarde corrigió Abel. De todos modos, la teoría que Galois puso por escrito en 1830, cuando sólo tenía 18 años, tenía un alcance mucho mayor, y ofrecía una estructura más completa y versátil para estudiar las ecuaciones algebraicas. Mientras que el teorema de Ruffini-Abel se centraba en la solubilidad de ecuaciones algebraicas por medio de funciones elementales (como exponentes, logaritmos, etc.), la criatura que inventó nuestro héroe introdujo los llamados grupos de Galois para analizar la solubilidad y las simetrías en un marco más general. No solo abordó la solubilidad de ecuaciones, sino que la teoría es aplicable también en otras áreas de las matemáticas, como la teoría de números y la geometría algebraica.

Por desgracia, el artículo presentado en 1830 por nuestro héroe no tuvo el éxito que merecía. Cauchy le pasó la patata caliente a Poisson, y Poisson no entendió ni papa del tema. El rechazo cabreó a Galois, pero aprovechó para rescribir la demostración, y fue esta modificación la que finalmente fue reconocida, entre otros, por el propio Cauchy. Eso sí: tras la muerte de Galois…

Galois tenía una cabecita muy loca, le pirraba la política, y para colmo, estaba algo deprimido por la muerte de su padre. 1832 fue un año difícil para el chico. Estuvo en prisión un par de veces, por ciscarse en Louis Philippe, el penúltimo rey de Francia. Al salir de la cárcel por segunda vez, se enredó en un duelo absurdo, teóricamente por una coquette, aunque no es descartable que todo fuese una trampa de sus enemigos políticos. Se pasó la noche anterior al duelo escribiendo una carta sobre sus últimos avances matemáticos. Al día siguiente, interpuso su abdomen en la trayectoria de una bala, y sus contrincantes lo dejaron tirado sobre la hierba como a un chien. Un transeúnte lo vio y lo llevó al hospital, pero al día siguiente se reunió con su Creador, probablemente por culpa de una peritonitis.

And all the king’s horses and all the king’s men, couldn’t put Évariste together again.

Raíces reales

Mi curiosidad por estos temas viene de cuando tenía unos diez u once años: encontré la solución razonada de las ecuaciones de segundo grado, en un libro de electrónica, y me dio por intentar resolver por mi cuenta el problema de las cúbicas. No lo conseguí. Tropecé por casualidad con la sustitución de Vieta, pero no conseguí algo mucho más sencillo: cómo eliminar el término cuadrático, que suele ser el primer paso de la solución. Pero compré un libro que explicaba la fórmula cúbica y la cuártica, y me convertí en un friqui de las mates.

Volví a enredar con ecuaciones algebraicas en 2005, cuando me dio por probar si se podía escribir un ray tracer decente en C#. Es bastante frecuente tener que resolver ecuaciones de tercer y cuarto grado para calcular intersecciones entre rayos de luz y determinados tipos de objetos. La particularidad es que, en este contexto, sólo se necesitan las soluciones reales. Cuando las cosas se ponen feas, existe una técnica para encontrar las raíces reales de cualquier polinomio utilizando las secuencias de Sturm. Naturalmente, este algoritmo es una sólo una aproximación iterativa.

Raíces complejas, todas

Cuando estás escribiendo una librería como AUSTRA, te interesa resolver el problema más general, que es encontrar todas las raíces, ya sean complejas o reales, de un polinomio arbitrario. ¿Se acuerda de los valores propios? El método que utilizo en AUSTRA está basado en ellos.

Supongamos que queremos resolver la ecuación:

$$c_0 + c_1x + c_2x^2 + \cdots + c_{n-1}x^{n-1} + x^n$$El término de mayor grado está normalizado para que su coeficiente sea la unidad. Ahora formamos la siguiente matriz, conocida como «matriz de Frobenius»:

$$F=\pmatrix{0&0&0&\cdots&0&-c_0\cr
1&0&0&\cdots&0&-c_1\cr
0&1&0&\cdots&0&-c_2\cr
\vdots&\vdots&\vdots&\ddots&\vdots&\vdots\cr
0&0&0&\cdots&1&-c_{n-1}}$$Nos planteamos entonces encontrar los valores propios de $F$, que deben cumplir esta igualdad:

$$F\vec{v} = \lambda\vec{v}$$donde $\vec{v}$ es uno de los vectores propios. Si reordenamos los términos, nos encontramos con esto:

$$(F-\lambda I)\vec{v}=0$$donde $I$ es la matriz identidad. Para que esta igualdad se cumpla, el determinante de $(F-\lambda I)$ debe ser igual a cero. Y resulta que el determinante de $(F-\lambda I)$ es, precisamente, la ecuación original. Qué listo era Frobenius.

AUSTRA tiene un método muy eficiente para calcular valores propios, incluso en casos como estos, en los que la matriz no es simétrica. Por lo tanto, para resolver un polinomio primero lo normalizamos, luego creamos su matriz de Frobenius, y finalmente calculamos sus valores propios. La función global polySolve es la que se encarga de la implementación, en el lenguaje funcional de AUSTRA. En la aplicación de consola, podemos teclear lo siguiente:

> set v = [5, 4, 3, 2, 1]
ans ∊ ℝ(5)
5  4  3  2  1
> polysolve(v)
ans ∊ ℂ(4)
<0,137832; 0,678154>   <-0,537832; 0,358285>
<0,137832; -0,678154>  <-0,537832; -0,358285>

polySolve puede recibir tanto un vector con los coeficientes, como los coeficientes sueltos. En este caso, estamos resolviendo la ecuación de cuarto grado $5x^4+4x^3+3x^2+2x+1=0$, y el resultado son cuatro números complejos, conjugados a pares.

¿Quiere comprobar que las raíces son realmente soluciones de la ecuación? Hagamos esto entonces:

> polysolve(v).map(c => polyeval(c, v))
ans ∊ ℂ(4)
<-1,33227E-15; -7,77156E-16>   <-1,33227E-15; 4,44089E-16>
 <-1,33227E-15; 7,77156E-16>  <-1,33227E-15; -4,44089E-16>

polyEval sirve para evaluar un polinomio para un argumento complejo o real, y el método map crea un nuevo vector complejo calculando sus entradas con una función lambda, al estilo del método Select de LINQ. Incluso tenemos una función poliDerivative que, con los mismos argumentos que polyEval, evalúa la derivada del polinomio que le pasamos en la coordenada que le digamos. Esto, a su vez, es muy conveniente para buscar raíces reales con el método de Newton-Raphson… que también ofrece AUSTRA (función solve, a secas).

¿Librería o lenguaje?

Por supuesto, todo esto sería igual de fácil, eficiente y elegante, o quizás un poco más, si simplemente enchufásemos el package Austra.Library a un proyecto en .NET Core y utilizásemos directamente las clases. Pero he querido mostrar este ejemplo en el lenguaje de fórmulas de AUSTRA como demostración de un caso de uso importante para el lenguaje: es una forma rápida y sencilla de poner a prueba la funcionalidad de la librería.

Y hay más casos de uso, que explicaré más adelante.

Categorías
C#

El Gran Secreto de los Complejos

Al grano: el Gran Secreto de los Números Complejos es que, si quieres utilizar instrucciones AVX para acelerar los cálculos, la mejor forma de representarlos no es la que todos imaginamos: la parte real y, a continuación, la parte imaginaria.

Partamos de una regla básica de las instrucciones vectoriales:

  • Es mejor manejar estructuras de arrays que arrays de estructuras.

Observe que ésta es una píldora difícil de tragar en la Programación Orientada a Objetos. Complex es una clase que ya está (bien) definida en System.Numerics, pero para simplificar la explicación, voy a fingir que la definimos nosotros. Con la POO en mente, comenzaríamos definiendo la estructura, junto con sus métodos, y la haríamos probablemente implementar algunas interfaces, por completitud, en este plan:

public readonly struct Complex
{
    public double Real { get; }
    public double Imaginary { get; }

    public Complex(double re, double im) =>
        (Real, Imaginary) = (re, im);

    // Y así, sucesivamente…
}

Si quisiéramos entonces un vector de números complejos, haríamos algo parecido a esto:

public readonly struct ComplexVector
{
    private readonly Complex[] values;

    public unsafe ComplexVector(Complex[] values) =>
        this.values = values;

    // Y así, sucesivamente…
}

Pues bien, ahora llego yo (o la sacrosanta Realidad, si lo prefiere hacer menos personal) y le digo que la mejor forma de programar un vector de complejos, al menos si queremos acelerarlo con AVX, es la siguiente:

public readonly struct ComplexVector
{
    private readonly double[] re;
    private readonly double[] im;

    // Omito verificaciones de igual longitud
    // para simplificar el ejemplo.
    public ComplexVector(double[] re, double[] im) =>
        (this.re, this.im) = (re, im);

    public unsafe ComplexVector(Complex[] values)
    {
        this.re = new double[values.Length];
        this.im = new double[values.Length];
        fixed (double* p = re, q = im)
            for (int i = 0; i < values.Length; i++)
                (p[i], q[i]) = values[i];
    }

    // Y así, sucesivamente…
}

Podemos dejar la estructura Complex original: nos es útil. Pero al representar la lista de complejos, es mejor que cada campo vaya en su propia lista. Es cierto también que deberíamos utilizar AVX para convertir un array tradicional de complejos en un vector: existe la posibilidad, pero no lo voy a mostrar aquí, para simplificar. He omitido también un método de extensión que he añadido en una clase estática para poder "deconstruir" fácilmente un complejo en sus componentes. No tiene mucha trascendencia, pero ahí va, para que no haya tantos espacios en blanco:

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Deconstruct(
    this Complex cmp, out double re, out double im) =>
    (re, im) = (cmp.Real, cmp.Imaginary);

¿Cómo se da cuenta uno?

¿Cómo se da cuenta uno de estas cosas? StackOverflow está lleno de consejos de este tipo, escritos por personas que ya lo han sufrido en sus carnes. Pero a uno no se le enciende la bombilla hasta que se pega con su propia pared. En este caso, fue intentando acelerar una Transformada Discreta de Fourier para AUSTRA. El código sin acelerar usaba los complejos como se han usado casi toda la vida: cada real con su parte imaginaria. A priori, cuando uno no conoce bien AVX, se imagina que será relativamente sencillo manejar dos números complejos dentro de un vector de cuatro valores reales, y que el conjunto de instrucciones va a estar de tu parte. Craso error.

Al final, tuve que limitarme a acelerar algunas partes que, oportunistamente, "se dejaban", casi siempre con código SSE y vectores de sólo 128 bits. Lo normal, cuando he acelerado otros algoritmos, ha sido reducir los tiempos de ejecuciones de cuatro hasta incluso ocho o diez veces. En este caso, la mejora sólo ha sido la mitad, en la mayoría de los casos, y en algunos, la tercera parte. Como resultado, tengo pendiente replantearme todo el asunto de la Transformada Discreta de Fourier, pero utilizando listas paralelas para los reales y los imaginarios.

Quiero que vea, de todas maneras, lo sencillo que es usar la técnica de estructura de listas en vez de listas de estructuras. El siguiente método es el producto escalar de dos vectores complejos, representados como Dios manda:

public static unsafe Complex operator *(
    ComplexVector v1, ComplexVector v2)
{
    if (v1.Length != v2.Length)
        throw new VectorLengthException();
    fixed (double* pr = v1.re, pi = v1.im,
                   qr = v2.re, qi = v2.im)
    {
        double sumRe = 0, sumIm = 0;
        int i = 0, size = v1.Length;
        if (Avx.IsSupported)
        {
            Vector256<double> accRe = Vector256<double>.Zero;
            Vector256<double> accIm = Vector256<double>.Zero;
            for (int top = size & ~3; i < top; i += 4)
            {
                var vpr = Avx.LoadVector256(pr + i);
                var vpi = Avx.LoadVector256(pi + i);
                var vqr = Avx.LoadVector256(qr + i);
                var vqi = Avx.LoadVector256(qi + i);
                accRe = Avx.Add(accRe,
                    Avx.Multiply(vpr, vqr)
                       .MultiplyAdd(vpi, vqi));
                accIm = Avx.Add(accIm,
                    Avx.Multiply(vpi, vqr)
                       .MultiplySub(vpr, vqi));
            }
            sumRe = accRe.Sum();
            sumIm = accIm.Sum();
        }
        for (; i < size; i++)
        {
            sumRe += pr[i] * qr[i] + pi[i] * qi[i];
            sumIm += pi[i] * qr[i] - pr[i] * qi[i];
        }
        return new(sumRe, sumIm);
    }
}

Si no lo recuerda del álgebra, cuando se trata de vectores complejos, el producto escalar usa la conjugada del segundo operando. Esto es: la parte imaginaria del segundo operando invierte su signo. Esto es lo que permite que el producto escalar de un vector consigo mismo sea un valor real.

El código vectorial tiene una correspondencia uno a uno con el código escalar que maneja la parte de los arrays que puede sobrar al final. Si repasa la entrada anterior, la del algoritmo de Welford, notará el mismo patrón: a pesar de que el algoritmo escalar es bastante oscuro, es relativamente sencillo convertir esa parte en código vectorial. La parte más complicada de la entrada pasada era cuando había que mezclar los cuatro acumuladores individuales. Es el mismo problema que hemos visto ya unas cuantas veces cuando calculamos un producto escalar: acumular es sencillo. Lo complicado es sumar después los cuatro acumuladores.

El Misterioso Constructor Postergado

Para no dejar demasiados cabos sueltos, aquí tiene una posible implementación del código que reparte partes reales e imaginarias a sus respectivos arrays. Mi primer impulso fue utilizar Avx2.GatherVector: leer cuatro partes reales saltándome las imaginarias, y luego leer cuatro partes imaginarias. Pero, por desgracia, el tiempo de ejecución del constructor se disparó al doble. No hay forma humana de predecir estas cosas, que no sea prueba, benchmark y error.

La versión que funciona, y que reduce casi a la mitad el tiempo de la versión de más arriba, lee cuatro complejos en dos vectores de 256 bits. Lo primero que hace es usar Avx.Shuffle para "barajar las cartas" y juntar todas las partes reales en un mismo vector de 256 bits, y las imaginarias en otro. No me pida que calcule estas cosas de memorias. Cuando me tocan estos marrones, tengo todavía que pillar un cuaderno y lápiz, e irme a páginas como ésta, para repasar los diagramas. He visto también que el Shuffle se puede sustituir también por llamadas a UnpackHigh/UnpackLoad. Es probable que estas llamadas den mejores tiempos, pero no me ha dado tiempo a hacer la prueba.

El problema de Shuffle (y de las alternativas mencionadas) es que te deja los números en el orden [1, 3, 2, 4]. Si no es importante respetar el orden, se pueden quedar así. Pero si hay que reordenar los elementos, hay que usar Avx2.Permute4x64 para ello. En general, AVX intenta, dentro de lo posible, no pasar valores de una mitad a la otra mitad del vector. Hay que usar cosas introducidas en AVX2 para conseguirlo. Por ese motivo, el constructor verifica si Avx2.IsSupported antes de lanzarse al río:

public unsafe ComplexVector(Complex[] values)
    : this(values.Length)
{
    fixed (double* p = re, q = im)
    fixed (Complex* r = values)
    {
        int i = 0;
        if (Avx2.IsSupported)
        {
            for (int top = values.Length & ~7; i < top; i += 4)
            {
                var v1 = Avx.LoadVector256((double*)(r+i));
                var v2 = Avx.LoadVector256((double*)(r+i+2));
                Avx.Store(p + i, Avx2.Permute4x64(
                    Avx.Shuffle(v1, v2, 0b0000), 0b11011000));
                Avx.Store(q + i, Avx2.Permute4x64(
                    Avx.Shuffle(v1, v2, 0b1111), 0b11011000));
            }
        }
        for (; i < values.Length; i++)
            (p[i], q[i]) = r[i];
    }
}

Números: en mi máquina, un i9-11900K, crear un ComplexVector directamente a partir de un array de 1024 complejos, tardaba más o menos un milisegundo. Con las mejoras AVX2, tarda 650 microsegundos. Casi la mitad. Y lo mejor, para mi gusto, es que no he tenido que usar paralelismo con tareas. El usuario de la librería ya usará ese paralelismo cuando lo considere necesario, y tendrá las manos más libres.

Como regalo, le dejo la conversión inversa: de vector complejo a array de complejos:

public unsafe static explicit
    operator Complex[](ComplexVector v)
{
    Complex[] result = new Complex[v.Length];
    fixed (double* p = v.re, q = v.im)
    fixed (Complex* r = result)
    {
        int i = 0;
        if (Avx2.IsSupported)
        {
            for (int top = v.Length & ~3; i < top; i += 4)
            {
                var vr = Avx.LoadVector256(p + i);
                var vi = Avx.LoadVector256(q + i);
                Avx.Store((double*)(r + i),
                    Avx2.Permute4x64(Avx.Permute2x128(
                    vr, vi, 0b0010_0000), 0b11_01_10_00));
                Avx.Store((double*)(r + i + 2),
                    Avx2.Permute4x64(Avx.Permute2x128(
                    vr, vi, 0b0011_0001), 0b11_01_10_00));
                }
            }
        for (; i < result.Length; i++)
            r[i] = new(p[i], q[i]);
    }
    return result;
}

El tiempo de ejecución se reduce "sólo" a las tres cuartas partes, pero yo creo que merece la pena.

Categorías
C#

El algoritmo de Welford

En la entrada sobre la varianza, vimos que podíamos tener problemas de estabilidad numérica si intentábamos calcular la varianza en una sola pasada sobre los datos, usando inocentemente la definición matemática. La solución de entonces fue usar un algoritmo de dos pasos: calcular la media en el primer paso, y en el segundo, calcular la varianza de la muestra menos la media. Había otra posibilidad: usar el primer valor de la secuencia como estimado malo de la media, y restar ese valor a las sucesivas muestras.

¿Podríamos hacer algo mejor si corrigiésemos el estimado de la media sobre la marcha? Resulta que se puede, y el primero en darse cuenta fue Welford, allá por el 1962. Donald Knuth incluyó el algoritmo en el segundo tomo de The Art of Computer Programming. El algoritmo original sólo calculaba la media y la varianza sobre la marcha, pero Timothy Terriberry, en 2007, lo amplió para que calculase momentos superiores. Este algoritmo está implementado, por ejemplo, en Math.Net Numerics, aunque la implementación es mejorable.

¿Qué nos da?

En AUSTRA, la clase que implementa este algoritmo se llama Accumulator. Hay también una clase simplificada, SimpleAccumulator, que sólo calcula los dos primeros momentos, con el beneficio evidente de tener que ejecutar menos trabajo.

La definición de Accumulator, junto con su constructor principal y los campos y propiedades de almacenamiento, es la siguiente:

/// Calculates statistics by adding samples.
public sealed class Accumulator
{
    ///Minimum value.
    private double min = double.PositiveInfinity;
    ///Maximum value.
    private double max = double.NegativeInfinity;
    ///Estimated mean.
    private double m1;
    ///Accumulated second moment.
    private double m2;
    ///Accumulated third moment.
    private double m3;
    ///Accumulated fourth moment.
    private double m4;

    ///Gets the total number of samples.
    public long Count { get; private set; }

    ///Creates an empty accumulator.
    public Accumulator() { }

    /* … */
}

La información que nos interesa se obtiene a través de estos campos, por medio de propiedades calculadas:

///Returns the minimum value.
public double Minimum => Count > 0 ? min : double.NaN;
///Returns the maximum value.
public double Maximum => Count > 0 ? max : double.NaN;
///Gets the sample mean.
public double Mean => Count > 0 ? m1 : double.NaN;
///Gets the unbiased variance.
public double Variance =>
    Count < 2 ? double.NaN : m2 / (Count - 1);
///Gets the unbiased standard deviation.
public double StandardDeviation =>
    Count < 2 ? double.NaN : Sqrt(m2 / (Count - 1));
///Gets the unbiased population skewness.
public double Skewness =>
    Count < 3
    ? double.NaN
    : Count * m3 * Sqrt(m2 / (Count - 1))
        / (m2 * m2 * (Count - 2)) * (Count - 1);
///Gets the unbiased population kurtosis.
public double Kurtosis =>
    Count < 4
    ? double.NaN
    : ((double)Count * Count - 1)
        / ((Count - 2) * (Count - 3))
        * (Count * m4 / (m2 * m2) - 3 + 6.0 / (Count + 1));

He omitido, por brevedad, el cálculo de propiedades como PopulationVariance, PopulationSkewness y demás. De todas maneras, están disponibles en el código de Austra, y en el propio código de Math.NET Numerics.

¿Qué le damos?

Para alimentar al acumulador, hay que pasarle las muestras, al menos en principio, de una en una, por medio de un método que hemos nombrado Add:

/// Adds a sample to this accumulator.
/// The new sample.
public void Add(double sample)
{
    ++Count;
    double d = sample - m1, s = d / Count;
    double t = d * s * (Count - 1);
    m1 += s;
    m4 += (t * s * (Count * (Count - 3 + 3)
        + 6 * s * m2 - 4 * m3) * s;
    m3 += (t * (Count - 2) - 3 * m2) * s;
    m2 += t;
    if (sample < min) min = sample;
    if (sample > max) max = sample;
}

Hay algunas pequeñas mejoras en el código anterior, respecto al original. Hay algunas multiplicaciones menos, y está todo preparado por si quisiéramos usar alguna instrucción de fusión de multiplicación y suma. No las he usado porque tengo algunas dudas sobre la eficiencia en .NET Core. Es cierto que siempre tienes la ventaja de la mayor exactitud, pero ya veremos dónde sí se usan (en pocas palabras: donde realmente importan).

Combinando acumuladores

Lo mejor de todo es que podemos combinar los valores en dos acumuladores independientes y generar un acumulador de los datos conjuntos. Esto nos permitiría, por ejemplo, dividir una muestra grande en cuatro partes, calcular cuatro acumuladores en paralelo, y luego mezclarlos en el resultado final.

public static Accumulator operator +(
    Accumulator a1, Accumulator a2)
{
    if (a1.Count == 0) return a2;
    if (a2.Count == 0) return a1;

    long n = a1.Count + a2.Count, n2 = n * n;
    double d = a2.m1 - a1.m1, d2 = d * d;
    double d3 = d2 * d, d4 = d2 * d2;
    double m1 = (a1.Count * a1.m1 + a2.Count * a2.m1) / n;
    double m2 = a1.m2 + a2.m2 + d2 * a1.Count * a2.Count / n;
    double m3 = a1.m3 + a2.m3
        + d3 * a1.Count * a2.Count * (a1.Count - a2.Count) / n2
        + 3 * d * (a1.Count * a2.m2 - a2.Count * a1.m2) / n;
    double m4 = a1.m4 + a2.m4 + d4 * a1.Count * a2.Count
            * (a1.Count * (a1.Count - a2.Count)
                + a2.Count * a2.Count) / (n2 * n)
        + 6 * d2 * (a1.Count * a1.Count * a2.m2
            + a2.Count * a2.Count * a1.m2) / n2
        + 4 * d * (a1.Count * a2.m3 - a2.Count * a1.m3) / n;
    return new() {
        Count = n,
        m1 = m1, m2 = m2, m3 = m3, m4 = m4,
        min = Min(a1.min, a2.min),
        max = Max(a1.max, a2.max),
    };
}

El código es complicado, y es fácil equivocarse copiando y pegando (ya me ha pasado). De todas maneras, es una clase que es fácil de testear.

El primer impulso es dejarlo aquí, y confiar en el paralelismo con tareas para cuando queremos acelerar el código. Mi problema con esto es que, cuando escribo código para una librería, prefiero realizar la aceleración básica con código vectorial (AVX o lo que esté disponible). ¿Por qué? Pues porque por experiencia, el programador que usa luego la biblioteca prefiere tener la opción del paralelismo por tareas para su propio código. Es cierto que las tareas se combinan más o menos bien en .NET, gracias al thread-pool que obtienes del entorno de ejecución, sin esforzarte demasiado; en Java, todo es más complicado con sus malditos executors.

Drum roll, please...

Prefiero, por lo tanto, ganar todo lo que pueda en paralelismo en una librería a golpe de instrucciones SIMD. Y esto es precisamente lo que hacemos en Accumulator con el siguiente método y algunos más que lo llaman:

public unsafe void Add(double* samples, int size)
{
    int i = 0;
    if (Avx.IsSupported && size >= 16)
    {
        var vMin = Vector256.Create(double.PositiveInfinity);
        var vMax = Vector256.Create(double.NegativeInfinity);
        var vM1 = Vector256<double>.Zero;
        var vM2 = Vector256<double>.Zero;
        var vM3 = Vector256<double>.Zero;
        var vM4 = Vector256<double>.Zero;
        var v3 = Vector256.Create(3.0);
        var v4 = Vector256.Create(4.0);
        var v6 = Vector256.Create(6.0);
        long c = 0;
        for (int top = size & CommonMatrix.AVX_MASK;
             i < top; i += 4)
        {
            c++;
            var vSample = Avx.LoadVector256(samples + i);
            vMin = Avx.Min(vMin, vSample);
            vMax = Avx.Max(vMax, vSample);
            var vd = Avx.Subtract(vSample, vM1);
            var vs = Avx.Divide(vd,
                Vector256.Create((double)c));
            var vt = Avx.Multiply(Avx.Multiply(vd, vs),
                Vector256.Create((double)(c - 1)));
            vM1 = Avx.Add(vM1, vs);
            var t1 = Avx.Multiply(Avx.Multiply(vt, vs),
                Vector256.Create((double)(c * (c - 3) + 3)));
            var t2 = Avx.Multiply(Avx.Multiply(vs, vM2), v6);
            var t3 = Avx.Multiply(v4, vM3);
            vM4 = vM4.MultiplyAdd(Avx.Subtract(
                Avx.Add(t1, t2), t3), vs);
            t1 = Avx.Multiply(vt,
                Vector256.Create((double)(c - 2)));
            t2 = Avx.Multiply(vM2, v3);
            vM3 = vM3.MultiplyAdd(Avx.Subtract(t1, t2), vs);
            vM2 = Avx.Add(vM2, vt);
        }
        var acc01 = Mix(c,
            vM1.ToScalar(), vM2.ToScalar(),
            vM3.ToScalar(), vM4.ToScalar(),
            vM1.GetElement(1), vM2.GetElement(1),
            vM3.GetElement(1), vM4.GetElement(1));
        var acc23 = Mix(c,
            vM1.GetElement(2), vM2.GetElement(2),
            vM3.GetElement(2), vM4.GetElement(2),
            vM1.GetElement(3), vM2.GetElement(3),
            vM3.GetElement(3), vM4.GetElement(3));
        var a = Mix(c + c,
            acc01.m1, acc01.m2, acc01.m3, acc01.m4,
            acc23.m1, acc23.m2, acc23.m3, acc23.m4);
        if (Count == 0)
            (Count, m1, m2, m3, m4)
                = (4 * c, a.m1, a.m2, a.m3, a.m4);
        else
        {
            long acCnt = 4 * c, n = Count + acCnt, n2 = n * n;
            double d = a.m1 - m1, d2 = d * d;
            double d3 = d2 * d, d4 = d2 * d2;

            double nm1 = (Count * m1 + acCnt * a.m1) / n;
            double nm2 = m2 + a.m2 + d2 * Count * acCnt / n;
            double nm3 = m3 + a.m3
                + d3 * Count * acCnt * (Count - acCnt) / n2
                + 3 * d * (Count * a.m2 - acCnt * m2) / n;
            m4 += a.m4 + d4 * Count * acCnt
                    * (Count * (Count - acCnt) 
                        + acCnt * acCnt) / (n2 * n)
                + 6 * d2 * (Count * Count * a.m2
                    + acCnt * acCnt * m2) / n2
                + 4 * d * (Count * a.m3 - acCnt * m3) / n;
            (m1, m2, m3, Count) = (nm1, nm2, nm3, n);
        }
        min = Min(min, vMin.Min());
        max = Max(max, vMax.Max());
    }
    for (; i < size; ++i)
        Add(samples[i]);

    static (double m1, double m2, double m3, double m4) Mix(
        long c,
        double a1, double a2, double a3, double a4,
        double b1, double b2, double b3, double b4)
    {
        long n = c + c, n2 = n * n;
        double d = b1 - a1, d2 = d * d, d4 = d2 * d2;
        return (
            (a1 + b1) / 2,
            a2 + b2 + d2 * c / 2,
            a3 + b3 + 3 * d * (b2 - a2) / 2,
            a4 + b4 + d4 * c / 8 + 3 * d2 * (b2 + a2) / 2
               + 2 * d * (b3 - a3));
    }
}

Observe que la precondición para aprovechar las instrucciones vectoriales es tener todo un array de muestras a nuestra disposición. Si nos diesen un IEnumerable<double>, tendríamos que hacer maniobras como materializar las muestras en grupos de cuatro, en un array, y alimentar así al animalito vectorial.

El código es relativamente sencillo, si miramos con atención. La parte AVX prácticamente repite el código del Add escalar. Por cada campo de Accumulator hay un vector de doble precisión. La excepción es la propiedad Count, y la tratamos diferente porque para los cuatro acumuladores virtuales que maneja el método, la cantidad de muestras es siempre la misma.

Esto es una ventaja cuando tenemos que mezclar los resultados de los cuatro acumuladores. La función interna estática Mix aprovecha la igualdad de los contadores para simplificar algebraicamente algunas fórmula. Observe, por ejemplo, que la fórmula para el m3 combinado es más sencilla, al anularse uno de los términos.

Una vez que hemos mezclado los cuatro acumuladores parciales, mezclamos el resultado, a su vez, con los valores que pueda haber ya en el propio acumulador (si los hubiera). Aquí no podemos simplificar tanto, porque los contadores nuevos y antiguos pueden ser muy diferentes, aunque en el caso en el que el acumulador inicial no tuviese muestras, es todo más simple.

Si quiere hacerse una idea de cuánto mejora este tipo de procesamiento vectorial, los benchmarks que he ejecutado me dan casi cinco veces más velocidad. Es extraño, porque yo esperaría una mejora de 4x, pero puede deberse a que aquí hacemos uso de las instrucciones FMA vectoriales, cuando están disponibles. Las instrucciones FMA están escondidas en los métodos de extensión MultiplyAdd que presenté en esta entrada.

Por cierto, la niña de la imagen de la entrada tiene poco que ver con el algoritmo, pero estoy usando imágenes generadas por AI, entre otros motivos, para evitar problemas de derechos de autor. En este caso, le pedí a la AI que generase una niña perdida e indefensa en un universo digital simulado. En parte, la AI me hizo caso; en parte, ignoró la petición. Pero el resultado me gusta, y ahí lo tiene.

Categorías
C#

La la, how their life goes on

Si insisto tanto en estos temas de instrucciones vectoriales, es parar preparar a mis lectores potenciales para que usen estas técnicas cuando lo crean necesario. Naturalmente, las primeras explicaciones van a ser del tipo heroico: todo hay que hacerlo a mano, razonando desde los primeros principios. Pero, como decía Molly Jones, la mujer de Desmond Jones, la la, how their life goes on: la vida sigue su curso, y lo normal en la vida de un programador es usar las técnicas que todos conocemos y apreciamos para ahorrarnos esfuerzo.

Lo primero para que todos nos ahorremos trabajo, por supuesto, consiste en usar estas cosas a través de una librería bien probada. Ya existen unas cuantas de estas, pero estoy creando Austra porque hay un nicho muy concreto, y porque la librería tiene méritos propios. La idea es hacer de Austra un proyecto open source, por supuesto. En este momento está en GitHub, pero no es por ahora un repositorio público porque la aplicación de «pruebas» está basada en WPF y DevExpress. O me compro personalmente una licencia comercial, o cambio los controles por algo gratuito. No es sencillo. Hay cosas como Avalonia o Uno Platform, que además, son multiplataformas, pero son bastante pobres en componentes. Y no quiero ni oír hablar de .NET MAUI: es un fracaso, de momento. Antes que usar .NET MAUI, prefiero «degradar» la aplicación a Windows Forms (tengo un editor de código bastante bueno) y currarme los gráficos y las cosas que falten. Pero a eso vamos: a que Austra esté disponible gratuitamente como open source, que cualquier que quiera pueda colaborar, que haya un conjunto de tests y benchmarks exhaustivo, etc, etc.

Pero no se escribe un post para explicar lo obvio. Mi verdadero objetivo ahora es explicar un par de técnicas muy tontas que bajan el listón del heroísmo al escribir este tipo de código, y que todos conocemos, pero que nos puede dar miedo usar a primeras, porque no sabes cómo va a interferir en la generación de código de .NET 7 y .NET 8 (que tiene sus cagadas, como todo).

Primer ejemplo: resulta que mucho de estos algoritmos algebraicos calculan el producto escalar de dos zonas de memoria en muchos casos. Hay una clase Vector con su correspondiente producto escalar, e incluso un ComplexVector, pero es mejor tener una rutina que maneje directamente zonas de memoria con un tamaño predeterminado. Austra tiene una clase estática CommonMatrix que precisamente implementa estas cosas. Ésta es la última versión del método en cuestión:

/// <summary>
/// Computes the dot product of two double arrays.
/// </summary>
/// <param name="p">Pointer to the first array.</param>
/// <param name="q">Pointer to the second array.</param>
/// <param name="size">Number of items in each array.</param>
/// <returns>A sum of products.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public unsafe static double DotProduct(
    double* p, double* q, int size)
{
    double sum;
    int i = 0;
    if (Avx.IsSupported)
    {
        Vector256<double> acc = Vector256<double>.Zero;
        for (int top = size & AVX_MASK; i < top; i += 4)
            acc = acc.MultiplyAdd(p + i, q + i);
        sum = acc.Sum();
    }
    else
        sum = 0;
    for (; i < size; i++)
        sum += p[i] * q[i];
    return sum;
}

A primera vista, lo único raro aquí es el atributo que fuerza un inlining agresivo. Pero hay un par de cosillas más que no son métodos habituales de AVX. Los muestro aparte aquí, porque son métodos de extensión de la misma clase CommonMatrix:

/// <summary>Sums all the elements in a vector.</summary>
/// <param name="v">
/// A intrinsics vector with four doubles.
/// </param>
/// <returns>The total value.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static double Sum(this Vector256<double> v)
{ 
    v = Avx.HorizontalAdd(v, v);
    return v.ToScalar() + v.GetElement(2);
}

Resulta que sumar los cuatro elementos de un vector de cuatro dobles no es moco de pavo. Hay teorías que pululan por la Internet sobre cuál es la forma eficiente de lograrlo. Yo no estoy seguro aún de que la mía sea la más eficiente, pero gracias al encapsulamiento en un método separado, si descubro algo mejor, tendré que tocar el código en un único lugar. Obvio, ¿no? Pero hasta comprobar que el JIT de .NET Core no se volvía loco, no me atreví a dar este paso. Ya he comprobado que no pasan cosas raras.

Es curioso, sin embargo, que el mínimo y el máximo de un vector se calcule más eficientemente con un método como el siguiente:

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static double Max(this Vector256<double> v)
{
    var x = Sse2.Max(v.GetLower(), v.GetUpper());
    return Math.Max(x.ToScalar(), x.GetElement(1));
}

En este caso, no parece que la transición temporal a una instrucción SSE haga mucho daño al código que la rodea. De todas maneras, observe que para la reducción final al escalar hay que ir a por el segundo elemento directamente. Cosas de Intel.

Éste es otro método que usa el código original, omitiendo esta vez los comentarios XML:

internal unsafe static Vector256<double> MultiplyAdd(
    this Vector256<double> summand,
    double* multiplicand,
    double* multiplier) =>
    Fma.IsSupported
        ? Fma.MultiplyAdd(
            Avx.LoadVector256(multiplicand),
            Avx.LoadVector256(multiplier),
            summand)
        : Avx.Add(summand, Avx.Multiply(
            Avx.LoadVector256(multiplicand),
            Avx.LoadVector256(multiplier)));

Hay un par de variantes más que usan vectores directamente en vez de direcciones, y variantes adicionales para MultiplySubtract y MultiplyAddNegated, que sons métodos FMA. Estas aparentes tonterías ahorran líneas y líneas de código. En mi caso, como tengo memoria fotográfica, prefiero que el código fuente sea lo más pequeño posible para poder recordarlo más adelante.

Categorías
C#

Multiplicar por la traspuesta

Una operación muy frecuente con matrices consiste en multiplicar una matriz por la traspuesta de otra. La implementación más sencilla de esto sería trasponer la segunda matriz y luego multiplicar la primera matriz por la traspuesta. Lamentablemente, trasponer una matriz es poco eficiente. No hay un patrón sencillo de acceso a la caché que sea bueno al mismo tiempo: hay que descomponer la matriz en trozos pequeños… en pocas palabras, un lío. Y resulta que hay una forma más sencilla de conseguirlo: invertir la posición de los índices al acceder a la segunda matriz. El código «naïve» para la multiplicación directa de dos matrices se resumía en un triple bucle:

for (int i = 0; i < m; i++)
    for (int j = 0; j < p; j++)
    {
        double d = 0;
        for (int k = 0; k < n; k++)
            d += a[i, k] * b[k, j];
        result[i, j] = d;
    }

En la entrada sobre multiplicación de matrices, invertíamos el orden de los dos bucles interiores para mejorar el rendimiento de la caché de memoria. Además, añadíamos punteros e instrucciones AVX. Para la nueva rutina, lo que invertimos son los índices de la segunda matrix y, en consecuencia, renunciamos al truco de intercambiar los bucles internos. Perdemos algo de velocidad pero, en cualquier caso, el resultado es mejor que trasponer explícitamente y luego multiplicar.

La rutina que finalmente está incluida en Austra es la siguiente:

public unsafe Matrix MultiplyTranspose(Matrix m)
{
    Contract.Requires(IsInitialized);
    Contract.Requires(m.IsInitialized);
    if (Cols != m.Cols)
        throw new MatrixSizeException();
    Contract.Ensures(Contract.Result<Matrix>().Rows == Rows);
    Contract.Ensures(Contract.Result<Matrix>().Cols == m.Rows);

    int r = Rows;
    int n = Cols;
    int c = m.Rows;
    double[,] result = new double[r, c];
    fixed (double* pA = values, pB = m.values, pC = result)
    {
        int lastBlockIndex = n & CommonMatrix.AVX_MASK;
        double* pAi = pA;
        double* pCi = pC;
        for (int i = 0; i < r; i++)
        {
            double* pBj = pB;
            for (int j = 0; j < c; j++)
            {
                int k = 0;
                double acc = 0;
                if (Avx.IsSupported)
                {
                    Vector256<double> sum = Vector256<double>.Zero;
                    for (; k < lastBlockIndex; k += 4)
                        sum = Avx.Add(
                            left: Avx.Multiply(
                                left: Avx.LoadVector256(pAi + k),
                                right: Avx.LoadVector256(pBj + k)),
                            right: sum);
                    sum = Avx.HorizontalAdd(sum, sum);
                    acc = sum.ToScalar() + sum.GetElement(2);
                }
                for (; k < n; k++)
                    acc += pAi[k] * pBj[k];
                pCi[j] = acc;
                pBj += n;
            }
            pAi += n;
            pCi += c;
        }
    }
    return result;
}

Una parte importante del truco es hacer que el lenguaje que implementa Austra reconozca la multiplicación por la traspuesta y sepa usar la operación más eficiente. Pero eso es fácil de conseguir, y lo explicaré en otro momento.

Categorías
FinTech

Austra

Otro pequeño interludio: si os interesan estas cosas, echad un vistazo a la ayuda online de Austra. Es un lenguaje de expresiones orientado al manejo de series financieras y modelos econométricos. Está todavía en desarrollo, pero ya va tomando forma. Entre otras cosas interesantes, permite usar funciones lambda, y la librería, Austra.Library, utiliza AVX2 para optimizar las operaciones sobre matrices, vectores y series temporales. Hasta 200/300 dimensiones, es generalmente más eficiente que las alternativas, incluso las que usan Intel MKL. A partir de ese punto, naturalmente, Intel MKL es más eficiente. En cualquier caso, tengo la posibilidad de mejorar la implementación para ese volumen de información. Y como última alternativa, puedo incorporar Intel MKL como una opción más; opcional, por supuesto, porque esta librería tiene licencias de pago.

Mi idea es, en el momento en que esté suficientemente maduro, convertir parte del código en open source. Aparte de añadir más algoritmos a la librería básica, quiero dar alternativas al analizador del lenguaje. Ahora mismo, el lenguaje genera árboles de expresiones, que a su vez generan código nativo sobre la marcha. Para una línea de comandos, sin embargo, puede que esto sea excesivo. No voy a destruir el compilador actual, pero quiero subir, antes de abrir el código, un intérprete más modesto que, al mismo tiempo, seguramente será más eficiente para el propósito de la herramienta.

Categorías
C# FinTech

Cholesky

Esta va a ser una entrada «utilitaria»: poca explicación, pero aportando código fuente, por si alguien lo necesita. ¿Recuerda la entrada en la que presenté la descomposición de Cholesky? En aquel momento, no incluí el algoritmo de la descomposición, porque quería experimentar un poco con la implementación. Ya lo he hecho, y estoy más o menos satisfecho con el resultado.

Antes de ver el código, le explico el contexto. Este es un método de una estructura Matrix, que encapsula solamente una matriz bidimensional de valores flotantes (campo values). Dentro de la estructura se definen todos los operadores y métodos que podéis imaginar. En paralelo, defino otra estructura, LowerMatrix, para representar matrices triangulares inferiores. No hay relaciones de herencia, al tratarse de estructura, pero la clase de matrices triangulares permite definir métodos y operadores más eficientes. Es importante tener en cuenta que LowerMatrix gasta exactamente la misma memoria que Matrix. La ventaja está en esos métodos que evitan procesar las dos mitades de la matriz.

También hace falta saber que he definido operadores de conversión implícitos que transforman un array bidimensional en uno u otro tipo de matrices. Este operador es el que me permite evitar un constructor explícito para devolver un valor en el método:

public unsafe LowerMatrix Cholesky()
{
    int n = Rows;
    double[,] dest = new double[n, n];
    double[,] src = values;
    double* tmp = stackalloc double[n + n];

    // First column is special.
    double ajj = src[0, 0];
    if (ajj <= 0)
    {
        dest[0, 0] = double.NaN;
        return dest;
    }
    dest[0, 0] = ajj = Math.Sqrt(ajj);
    double r = 1 / ajj;
    n--;
    for (int i = 1; i <= n; i++)
        dest[i, 0] = src[i, 0] * r;
    for (int j = 1; j <= n; j++)
    {
        // Compute the diagonal cell.
        double v = 0.0;
        for (int i = 0; i < j; i++)
        {
            double a = dest[j, i];
            v += a * a;
        }
        ajj = src[j, j] - v;
        if (ajj <= 0)
        {
            dest[j, j] = double.NaN;
            return dest;
        }
        dest[j, j] = ajj = Math.Sqrt(ajj);

        // Compute the other cells of column J.
        if (j < n)
        {
            r = 1 / ajj;
            for (int i = 0; i < j; i++)
                tmp[i] = dest[j, i];
            for (int i = j; i < n; i++)
            {
                v = 0.0;
                for (int k = 0; k < j; k++)
                    v += dest[i + 1, k] * tmp[k];
                tmp[i] = v;
            }
            for (int i = j; i < n; i++)
                dest[i + 1, j] = (src[i + 1, j] - tmp[i]) * r;
        }
    }
    return dest;
}

Sobre el algoritmo, en sí: el algoritmo de Cholesky puede fallar cuando la matriz de origen no es positiva semidefinida. Mi implementación detecta ese caso al calcular las raíces cuadradas… y se limita a parar, poniendo un NaN en la celda diagonal donde se ha detectado el problema. Esto significa que el método, en la práctica, asume que la matriz es positiva semidefinida. En mi código, tengo un segundo método, TryCholesky, que devuelve un valor lógico para ver si la conversión fue posible, y retorna la matriz transformada como parámetro de salida.

Desde el punto de vista de un programador de C#, el único detalle interesante es el uso de stackalloc para reservar un array de memoria en la pila, en vez de usar memoria dinámica. Esto es lo que obliga a declarar el método con unsafe.

En rendimiento, el método es más rápido que la «versión base» de la librería que he visto que es más rápida usando sólo C# (usando Intel MKL, no hay competencia posible). Me refiero a la versión base porque, para matrices grandes, las librerías serias suelen dividir la matriz en bloques que se pueden procesar en paralelo. Este código, como ve, no usa threads, instrucciones SIMD y sólo utiliza punteros para la caché. En menos palabras: todo es mejorable.

Categorías
FinTech

Varianza

Todo el mundo sabe lo que es una media (me refiero a la media estadística, por supuesto, no a las otras), por lo que no tiene mucho sentido escribir una entrada sobre medias. Conceptualmente, sin embargo, la media es el primer elemento de una serie de valores que caracterizan a las series aleatorias, y que se conocen como momentos. Si nos saltamos el primer momento, aterrizamos en el segundo momento estadístico, que es la varianza o, cuando tenemos una distribución multivariante, la matriz de covarianza.

Definición

Casi todo el mundo tiene también claro cómo se define la varianza, pero refresquémoslo, por si acaso. Supongamos que la media de una serie, o de una variable aleatoria, se representa como $\mathbb{E}[X]$. Entonces, la varianza se puede definir así:
$$
Var(X)=\mathbb{E}[(X-\mathbb{E}[X])^2]
$$Sí, estamos usando la media dentro de otra media. Según esta definición, literalmente, tenemos primero que conocer la media de la variable aleatoria. Entonces, tenemos que ver cuánto se desvía la variable aleatoria de esa media. Esto es, se mide el grado de «desparrame» de la variable. No podemos medir directamente la media de la resta que hemos mencionado, porque los valores negativos se compensarían con los valores positivos, y obtendríamos siempre cero. La manera de evitar esta compensación es elevar la resta al cuadrado.

Si masajeamos un poco la definición, podemos transformarla de esta manera:
$$
Var(X)=\mathbb{E}[X^2] – \mathbb{E}[X]^2
$$La transformación es fácil de realizar, y no la incluyo por brevedad. La nueva fórmula viene a decir que la varianza es la diferencia entre la media del cuadrado y el cuadrado de la media. Enseguida veremos la gran utilidad de esta definición alternativa.

Por cierto, a partir de la varianza se define la desviación estándar, que es simplemente la raíz cuadrada de la varianza. ¿Para qué necesitamos la raíz cuadrada? Principalmente, por las unidades de medida. Si la variable aleatoria X representa euros, la media estará expresada como euros, pero la varianza estará en euros al cuadrado. La desviación estándar, en cambio, vuelve a estar en euros. Probablemente tenga otros usos y beneficios, pero ahora mismo no los recuerdo.

Más definiciones

Si un matemático leyese lo que acabo de escribir, seguramente me pegaría una somanta de palos. Y con razón. He manejado con demasiada alegría los términos «muestra aleatoria» y «variable aleatoria», pero si hubiese sido más riguroso, la introducción habría sido infinita. Ahora aclaro lo que hace falta aclarar desde el punto de vista práctico.

La diferencia que tenemos que conocer es la de «varianza» contra «varianza de una muestra». La definición anterior es válida para una variable aleatoria definida analíticamente, o para fenómenos en los que disponemos de todos los datos. En la vida real, lo que solemos tener es una muestra de una serie, no todos los elementos. En ese caso, lo que podemos hacer pragmáticamente es calcular un «estimado» de la varianza a partir de la muestra.

El ajuste, de todas maneras, es sencillo:
$$
Var_M[X] = Var(X) \cdot N / (N – 1)
$$donde N es el tamaño de la muestra. Este es el motivo por el que Excel tiene dos funciones, VAR y VAR.P, para la varianza. La primera es la varianza de una muestra, y la segunda es la varianza «completa» de una población.

Implementación

Escribo esta entrada, además de para que sirva de referencia a entradas posteriores, porque quiero explicar un pequeño truco de implementación que he visto pasar por alto muchas veces.

Vamos a suponer que tenemos una muestra aleatoria en un array. Para calcular la media, hacemos un bucle y vamos sumando las entradas. Si utilizo la primera definición que hemos dado de la varianza, necesitamos hacer dos pasadas sobre el array. La primera, para calcular la media, y la segunda, para calcular la media de los cuadrados de las diferencias respecto a la media. ¿No sería mejor hacer una sola pasada? Si tenemos los datos en un array, no es tan acuciante, pero si los datos son grandes, o vienen de un enumerador, nos interesa hacerlo todo en una pasada.

Es en estos casos en los que la segunda fórmula equivalente es importante: podemos calcular simultáneamente, en una sola pasada, la media y la media de los cuadrados, y luego restar al segundo valor el cuadrado del primero. Algo así:

double sumX = 0, sumX2 = 0;
int total = 0;
foreach (double v in source)
{
    sumX += v;
    sumX2 += v * v;
    total++;
}
double mean = sumX / total;
double variance = (sumX2 / total - mean * mean);

Si quisiéramos la varianza de una muestra (es decir, si no tuviésemos todos los valores de la población) tendríamos que ajustar la varianza calculada multiplicándola por total y dividiéndola por total - 1.

De todos modos, el problema principal con el código anterior es otro, mucho más sutil. Las variables sumX y sumX2 pueden llegar a tener valores muy altos. Y al restar los dos valores se puede producir una cancelación catastrófica que nos haga perder la precisión del resultado. Ese problema no lo tiene el algoritmo en dos pasadas.

La solución, sin embargo, es muy sencilla. Resulta que:
$$
Var[X] = Var[X – a]
$$Es decir: si le restamos una constante arbitraria a la variable aleatoria, la «dispersión» o varianza de la misma no varía. Lo ideal sería que la constante en cuestión fuese la media de la muestra, pero para eso necesitaríamos dos pasadas. Lo que haremos es llegar a un compromiso: como no podemos tener la media, nos conformaremos con un valor representativo de la muestra. ¿Qué tal si elegimos el primero?

double sumX = 0, sumX2 = 0, mean = 0, first = 0;
bool hasFirst = false;
int total = 0;
foreach (double v in source)
{
    if (!hasFirst)
    {
        hasFirst = true;
        first = v;
    }
    else
    {
        double v1 = v - first;
        sumX += v1;
        sumX2 += v1 * v1;
    }
    mean += v; 
    total++;
}
mean /= total;
double variance = (sumX2 - sumX * sumX / total) / total;

He puesto un else dentro del bucle porque la primera suma no aporta nada a los acumuladores. Por esto, también he tenido que modificar el cálculo de la media al final. Idealmente, el primer elemento de la muestra debería estar lo más cercano posible a la media. Si sabemos que la media va a estar alrededor de cero, además, podemos olvidar esta precaución.

Quizás alguien crea que la instrucción condicional dentro del bucle lo puede ralentizar un poco. No lo he medido para ver si ocurre, pero si eso fuese importante, lo que podríamos hacer es desarrollar el bucle en dos trozos:

var enumerator = source.GetEnumerator();
if (enumerator.MoveNext()
{
    double first = enumerator.Current;
    double sumX = 0, sumX2 = 0, mean = first;
    int total = 1;
    while (enumerator.MoveNext())
    {
        double v = enumerator.Current - first;
        sumX += v;
        sumX2 += v * v;
        mean += enumerator.Current;
        total++;
    }
    mean /= total;
    double variance = (sumX2 - sumX * sumX / total) / total;
}

Me falta un else en el fragmento anterior, para cuando la secuencia está vacía, pero me da pereza ponerlo. También habría que llamar a Dispose().

Concurrencia

Los otros trucos interesantes, que no voy a tratar en esta entrada, tienen que ver con la posibilidad de repartir el cálculo entre varios hilos, cuando el tamaño de la serie lo amerite. No hay grandes complicaciones a la vista, pero hay que tener cuidado al mezclar los valores obtenidos en cada hilo. Si la serie está en un array, es quizás más sencillo, porque se pueden repartir a priori los rangos entre tareas. Lo recomendable, en cualquier caso, es utilizar una de las sobrecargas de Parallel.ForEach que permita el uso de variables de estados. Permítame que me haga el sueco y pase de página en este punto.

Categorías
FinTech

La transformación de Box-Muller

En casi todos los fenómenos aleatorios, ya pertenezcan a la física, la genética o las finanzas, la distribución normal, o de Gauss-Laplace (la de la famosa curva de la campana) juega un papel importante. Sin embargo, .NET no ofrece de serie una clase, o un método, que genere valores aleatorios pertenecientes a esta distribución. Podemos utilizar una librería de terceros, por supuesto. Pero no está de más conocer alternativas, sobre todo para aplicaciones pequeñas o pruebas de concepto, en los que no merezca la pena usar algo más completo.

El problema a resolver es: teniendo como punto de partida un generador de números aleatorios que utilice una distribución uniforme, como la clase Random, ¿cómo podemos transformarlos para obtener la distribución normal? Lo primero es ponernos de acuerdo sobre los parámetros de la distribución normal que generaremos. Hay dos parámetros: la media y la varianza. Pero podemos ceñirnos a una distribución con media igual a cero y varianza igual a uno. Es fácil cambiar de parámetros desplazando y estirando los números que vamos a generar.

¿Cuál es el algoritmo adecuado para transformar una distribución uniforme en una normal? La respuesta es el llamado algoritmo del zigurat, que realiza un muestreo por regiones. El enlace anterior incluye código en C#. Pero existe un algoritmo mucho más sencillo, que se conoce como la transformación de Box-Muller. Esta transformación convierte dos valores aleatorios $u$ y $v$, pertenecientes a una distribución uniforme sobre el intervalo [0, 1], en otros dos valores aleatorios, a los que llamaremos $x$ e $y$, pertenecientes a una normal con media cero y varianza uno. Las fórmulas necesarias son estas:
$$
\eqalign{x&=\sqrt{-2 \ln u} \cos 2\pi v\cr
y&=\sqrt{-2 \ln u} \sin 2\pi v}
$$Existen métodos alternativos, como el de Marsaglia, que evitan las funciones trigonométricas, pero al precio de descartar algunas muestras. Antes de recomendar el método original de Box-Muller, he hecho la prueba en un Core i7-4770, y no he encontrado diferencias significativas entre ambos métodos:

  1. Probablemente, los procesadores más o menos modernos (el mío es un Intel Core de cuarta generación, que ya tiene su edad) penalicen más los saltos que las funciones trigonométricas.
  2. Además, la función Random de .NET utiliza internamente un algoritmo relativamente bueno, pero que tiene su propio coste.

La manera más sencilla de implementar un generador de números aleatorios con las características anteriores sea probablemente utilizar un iterador basado en un bucle infinito.

public static IEnumerable<double> BoxMuller()
{
    Random rnd = new Random();
    while (true)
    {
        double u = Math.Log(1 - rnd.NextDouble());
        double r = Math.Sqrt(-u - u);
        double v = 2 * Math.PI * rnd.NextDouble();
        yield return Math.Cos(v) * r;
        yield return Math.Sin(v) * r;
    }
}

Por supuesto, esta es la implementación más tonta posible: la instancia que contiene las variables de estado de la iteración pertenece a una clase y ocupa memoria dinámica. Además, es bastante probable que el compilador llame a la propiedad Current y al método MoveNext a través del tipo de interfaz IEnumerator, con lo que se trataría de llamadas virtuales. Pero existen técnicas sencillas para resolver estos dos problemas, aunque las explicaré en otro momento. Si tiene prisa, puede mirar como la clase List implementa internamente su iterador (se utiliza una estructura). He hecho la prueba y, al menos en .NET Core, la ganancia en velocidad no es significativa.

La imagen de la entrada, por cierto, es una representación ficticia de la famosa torre de Babel. Quizás habría sido más apropiado usar una imagen de un zigurat, pero pensándolo mejor, la forma de la torre se parece un poco a la campana de Gauss.