Categorías
C# Insights

Smart boy’s optimizations

Decía el gran Donald Knuth algo así como que premature optimization is the root of all evil. Santificado sea su nombre…

Calidad de código

Una vez citadas las sagradas escrituras, debo reconocer que mi lado hereje cumple otros mandamientos:

  • Peor que la optimización prematura, es no optimizar nunca. Una vez escuché una mala excusa sobre un programa que tardaba 25 horas en cargar un fichero: «es que nadie me dijo que tenía que ser rápido». Los ordenadores existen, precisamente, para hacer las cosas más rápidamente. No sólo porque es interés directo del usuario del programa que éste termine antes, sino que, además, le interesa el ahorro en electricidad y en el desgaste del propio aparato.
  • Si estás escribiendo una aplicación, te puedes permitir el lujo de esperar a que funcione para buscar los puntos críticos de eficiencia. Pero si estás escribiendo una librería, que se va a utilizar en formas que aún no sospechas… mejor que todo vaya como la seda desde el principio.
  • La mayoría de las optimizaciones (yo diría más bien mejoras) caen en una categoría que yo llamo «mejoras de chico listo», y tienen que ver con la calidad del código que cada programador puede generar sin esfuerzo adicional.

Las optimizaciones de chico listo, por supuesto, dependen de la experiencia del programador, de lo bien que le funcione la memoria y de lo bien que se le dé la detección de patrones, por lo que se trata de una categoría difícil de delimitar. Programar es un arte.

Un ejemplo

En cualquier caso, el propósito de esta entrada es mostrarle algunas de las optimizaciones que he aprendido mirando el código fuente de .NET. Yo las tengo ya en mi memoria de trabajo: las aplico automáticamente cuando detecto que son aplicables.

Trabajando con una implementación de la función erf, tropecé con este código, que evalúa un polinomio en un punto, usando los coeficientes de una tabla:

private static double Evaluate(double z, double[] coefficients)
{
    if (coefficients == null)
        throw new ArgumentNullException(nameof(coefficients));
    int n = coefficients.Length;
    if (n == 0)
        return 0;
    double sum = coefficients[n - 1];
    for (int i = n - 2; i >= 0; --i)
    {
        sum *= z;
        sum += coefficients[i];
    }
    return sum;
}

Esta función se ejecuta varias veces, con distintos coeficientes. Un ejemplo de tabla de coeficientes es ésta:

static readonly double[] ErvInvImpAn =
{
    -0.000508781949658280665617, -0.00836874819741736770379,
    0.0334806625409744615033, -0.0126926147662974029034,
    -0.0365637971411762664006, 0.0219878681111168899165,
    0.00822687874676915743155, -0.00538772965071242932965
};

Este método es un método privado de una clase, y una rápida ojeada me confirmó que las tablas que se le pasan son siempre no nulas, y con longitud mayor que cero. ¿A qué vienen las dos comprobaciones iniciales? Respuesta: es uno de los problemas que causa la «modularidad». Escribes software que no sabes cómo se puede usar, y lo proteges de las cosas más inverosímiles. Pero si es un método privado, tanta precaución sobra. Empezamos por esta simplificación, para ir haciendo boca y verlo todo más claro:

private static double Evaluate(double z, double[] coefficients)
{
    int n = coefficients.Length;
    double sum = coefficients[n - 1];
    for (int i = n - 2; i >= 0; --i)
    {
        sum *= z;
        sum += coefficients[i];
    }
    return sum;
}

El siguiente paso seguramente le sorprenderá: sustituyo la tabla de coeficientes, que ahora es un campo estático de sólo lectura, por esto:

static ReadOnlySpan<double> ErvInvImpAn => new[]
{
    -0.000508781949658280665617, -0.00836874819741736770379,
    0.0334806625409744615033, -0.0126926147662974029034,
    -0.0365637971411762664006, 0.0219878681111168899165,
    0.00822687874676915743155, -0.00538772965071242932965
};

Sorprendente, ¿verdad? Es un truco poco conocido, pero que Microsoft usa a diestra y siniestra en el código de .NET Core. Por razones que en parte se me escapan, el compilador de C# y el JIT transforman esta construcción en una zona de datos dentro de los metadatos del código IL. Y el JIT lo maneja más eficientemente. No hay mucha lógica en que tengamos que usar precisamente un ReadOnlySpan<double>, o que haya que convertir el campo en una propiedad de sólo lectura. Se trata de una marca, o un guiño de complicidad, que utilizan el JIT y el compilador para generar código más eficiente.

Esto me obliga a crear una nueva versión del amigo Evaluate que acepte un ReadOnlySpan<double> como origen de sus coeficientes. Esta es la nueva versión, con dos optimizaciones adicionales:

private static double Evaluate(
    double z, ReadOnlySpan<double> coeffs)
{
    int n = coeffs.Length;
    ref double rd = ref MemoryMarshal.GetReference(coeffs);
    double sum = Unsafe.Add(ref rd, n - 1);
    for (int i = n - 2; i >= 0; --i)
        sum = Math.FusedMultiplyAdd(
            z, sum, Unsafe.Add(ref rd, i));
    return sum;
}

De las dos nuevas mejoras, la más sencilla es el uso de Math.FusedMultiplyAdd: un método de la clase Math que combina la multiplicación y la suma en una sola instrucción de la CPU, y puede darnos más velocidad y precisión. En este caso, además, he medido que realmente sea ventajosa, porque no siempre lo es.

El segundo cambio tiene dos partes. Como el bucle for utilizado no es un bucle convencional, el JIT actual no puede deducir que no habrán referencias fuera de rango para eliminar las comprobaciones de los índices en tiempo de ejecución. El bucle es descendente, y ni siquiera comienza por el último elemento. No le podemos exigir tanto al JIT.

Lo primero que hago es pedir una managed reference a la primera celda de la tabla de coeficientes:

    ref double rd = ref MemoryMarshal.GetReference(coeffs);
    // Equivalente a:
    // ref double rd = ref coeffs[0];

Esto es más o menos parecido a pedir un puntero al inicio de la tabla. En realidad, C# nos permitiría pedir un puntero al inicio de la tabla, pero el precio sería «fijar» la tabla en memoria para que el recolector de basura no vaya a pensar que no la estamos usando. El puntero que conseguimos con esta técnica es uno que el recolector de basura puede identificar y tener en cuenta. Y la forma normal de pedirlo es la que muestro en los comentarios del fragmento. ¿Por qué no la he usado? Pues porque implicaría una comprobación de rango innecesaria: el JIT generaría una comparación y un salto para verificar si la tabla no está vacía. Para evitarlo, uso MemoryMarshal.GetReference, que es otro truco sucio de Microsoft, para conseguir un puntero al inicio de un array sin costes ocultos.

Lo que sigue es más sencillo: utilizo el método Add de la clase Unsafe para llegar a cada una de las celdas que contienen los coeficientes. Sí, todo es un poco enrevesado, pero una vez que te lo aprendes, no te cuesta nada escribir estas cosas de carrerilla. Me siento en el deber de contárselas. Ya usted decidirá si merece la pena o no usarlas en su propio código cuando lo crea necesario. No son cosas para usar en una aplicación que tienes que escribir en tres meses. Pero creo que tienen un lugar en una librería de código.

Y hay más, claro

Hay montones de trucos similares en el código fuente de .NET. Por ejemplo, imagine que hay que tiene que hacer una comprobación de rango de un índice:

if (0 <= index && index < length) ...

Dos comparaciones, y dos saltos. Las comparaciones son lo de menos. Los dos saltos ralentizan todo. ¿Qué hace Microsoft en estos casos?

if ((uint)index < length) ...

La variable index suele ser un entero con signo. No cuesta nada pedir que el compilador la trate, momentáneamente, como un entero del mismo tamaño, pero sin signo. Si el índice fuese negativo, al tratarlo como un entero sin signo, el valor sería inevitablemente superior al de length. Una sola comparación, y un único salto potencial.

Veamos una variante derivada de este truco. El analizador lexical de Austra tiene que comprobar muchas veces si un carácter es un dígito decimal:

if ('0' <= ch && ch <= '9') ...

La forma más eficiente, sin embargo, es la siguiente:

if ((uint)(ch - '0') < 10u) ...

He introducido una resta, que se ejecuta eficientemente, y he quitado un salto potencial.

De todas maneras, una de mis optimizaciones de chico listo preferidas es muy sencilla. En vez de escribir:

x * x - y * y

un servidor prefiere:

(x + y) * (x - y)

Y es que en la segunda expresión hay una suma de más, pero una multiplicación de menos.

Es agradable tener un cerebro cargado, y estar dispuestos a usarlo.

Categorías
C#

El algoritmo de Welford

En la entrada sobre la varianza, vimos que podíamos tener problemas de estabilidad numérica si intentábamos calcular la varianza en una sola pasada sobre los datos, usando inocentemente la definición matemática. La solución de entonces fue usar un algoritmo de dos pasos: calcular la media en el primer paso, y en el segundo, calcular la varianza de la muestra menos la media. Había otra posibilidad: usar el primer valor de la secuencia como estimado malo de la media, y restar ese valor a las sucesivas muestras.

¿Podríamos hacer algo mejor si corrigiésemos el estimado de la media sobre la marcha? Resulta que se puede, y el primero en darse cuenta fue Welford, allá por el 1962. Donald Knuth incluyó el algoritmo en el segundo tomo de The Art of Computer Programming. El algoritmo original sólo calculaba la media y la varianza sobre la marcha, pero Timothy Terriberry, en 2007, lo amplió para que calculase momentos superiores. Este algoritmo está implementado, por ejemplo, en Math.Net Numerics, aunque la implementación es mejorable.

¿Qué nos da?

En AUSTRA, la clase que implementa este algoritmo se llama Accumulator. Hay también una clase simplificada, SimpleAccumulator, que sólo calcula los dos primeros momentos, con el beneficio evidente de tener que ejecutar menos trabajo.

La definición de Accumulator, junto con su constructor principal y los campos y propiedades de almacenamiento, es la siguiente:

/// Calculates statistics by adding samples.
public sealed class Accumulator
{
    ///Minimum value.
    private double min = double.PositiveInfinity;
    ///Maximum value.
    private double max = double.NegativeInfinity;
    ///Estimated mean.
    private double m1;
    ///Accumulated second moment.
    private double m2;
    ///Accumulated third moment.
    private double m3;
    ///Accumulated fourth moment.
    private double m4;

    ///Gets the total number of samples.
    public long Count { get; private set; }

    ///Creates an empty accumulator.
    public Accumulator() { }

    /* … */
}

La información que nos interesa se obtiene a través de estos campos, por medio de propiedades calculadas:

///Returns the minimum value.
public double Minimum => Count > 0 ? min : double.NaN;
///Returns the maximum value.
public double Maximum => Count > 0 ? max : double.NaN;
///Gets the sample mean.
public double Mean => Count > 0 ? m1 : double.NaN;
///Gets the unbiased variance.
public double Variance =>
    Count < 2 ? double.NaN : m2 / (Count - 1);
///Gets the unbiased standard deviation.
public double StandardDeviation =>
    Count < 2 ? double.NaN : Sqrt(m2 / (Count - 1));
///Gets the unbiased population skewness.
public double Skewness =>
    Count < 3
    ? double.NaN
    : Count * m3 * Sqrt(m2 / (Count - 1))
        / (m2 * m2 * (Count - 2)) * (Count - 1);
///Gets the unbiased population kurtosis.
public double Kurtosis =>
    Count < 4
    ? double.NaN
    : ((double)Count * Count - 1)
        / ((Count - 2) * (Count - 3))
        * (Count * m4 / (m2 * m2) - 3 + 6.0 / (Count + 1));

He omitido, por brevedad, el cálculo de propiedades como PopulationVariance, PopulationSkewness y demás. De todas maneras, están disponibles en el código de Austra, y en el propio código de Math.NET Numerics.

¿Qué le damos?

Para alimentar al acumulador, hay que pasarle las muestras, al menos en principio, de una en una, por medio de un método que hemos nombrado Add:

/// Adds a sample to this accumulator.
/// The new sample.
public void Add(double sample)
{
    ++Count;
    double d = sample - m1, s = d / Count;
    double t = d * s * (Count - 1);
    m1 += s;
    m4 += (t * s * (Count * (Count - 3 + 3)
        + 6 * s * m2 - 4 * m3) * s;
    m3 += (t * (Count - 2) - 3 * m2) * s;
    m2 += t;
    if (sample < min) min = sample;
    if (sample > max) max = sample;
}

Hay algunas pequeñas mejoras en el código anterior, respecto al original. Hay algunas multiplicaciones menos, y está todo preparado por si quisiéramos usar alguna instrucción de fusión de multiplicación y suma. No las he usado porque tengo algunas dudas sobre la eficiencia en .NET Core. Es cierto que siempre tienes la ventaja de la mayor exactitud, pero ya veremos dónde sí se usan (en pocas palabras: donde realmente importan).

Combinando acumuladores

Lo mejor de todo es que podemos combinar los valores en dos acumuladores independientes y generar un acumulador de los datos conjuntos. Esto nos permitiría, por ejemplo, dividir una muestra grande en cuatro partes, calcular cuatro acumuladores en paralelo, y luego mezclarlos en el resultado final.

public static Accumulator operator +(
    Accumulator a1, Accumulator a2)
{
    if (a1.Count == 0) return a2;
    if (a2.Count == 0) return a1;

    long n = a1.Count + a2.Count, n2 = n * n;
    double d = a2.m1 - a1.m1, d2 = d * d;
    double d3 = d2 * d, d4 = d2 * d2;
    double m1 = (a1.Count * a1.m1 + a2.Count * a2.m1) / n;
    double m2 = a1.m2 + a2.m2 + d2 * a1.Count * a2.Count / n;
    double m3 = a1.m3 + a2.m3
        + d3 * a1.Count * a2.Count * (a1.Count - a2.Count) / n2
        + 3 * d * (a1.Count * a2.m2 - a2.Count * a1.m2) / n;
    double m4 = a1.m4 + a2.m4 + d4 * a1.Count * a2.Count
            * (a1.Count * (a1.Count - a2.Count)
                + a2.Count * a2.Count) / (n2 * n)
        + 6 * d2 * (a1.Count * a1.Count * a2.m2
            + a2.Count * a2.Count * a1.m2) / n2
        + 4 * d * (a1.Count * a2.m3 - a2.Count * a1.m3) / n;
    return new() {
        Count = n,
        m1 = m1, m2 = m2, m3 = m3, m4 = m4,
        min = Min(a1.min, a2.min),
        max = Max(a1.max, a2.max),
    };
}

El código es complicado, y es fácil equivocarse copiando y pegando (ya me ha pasado). De todas maneras, es una clase que es fácil de testear.

El primer impulso es dejarlo aquí, y confiar en el paralelismo con tareas para cuando queremos acelerar el código. Mi problema con esto es que, cuando escribo código para una librería, prefiero realizar la aceleración básica con código vectorial (AVX o lo que esté disponible). ¿Por qué? Pues porque por experiencia, el programador que usa luego la biblioteca prefiere tener la opción del paralelismo por tareas para su propio código. Es cierto que las tareas se combinan más o menos bien en .NET, gracias al thread-pool que obtienes del entorno de ejecución, sin esforzarte demasiado; en Java, todo es más complicado con sus malditos executors.

Drum roll, please...

Prefiero, por lo tanto, ganar todo lo que pueda en paralelismo en una librería a golpe de instrucciones SIMD. Y esto es precisamente lo que hacemos en Accumulator con el siguiente método y algunos más que lo llaman:

public unsafe void Add(double* samples, int size)
{
    int i = 0;
    if (Avx.IsSupported && size >= 16)
    {
        var vMin = Vector256.Create(double.PositiveInfinity);
        var vMax = Vector256.Create(double.NegativeInfinity);
        var vM1 = Vector256<double>.Zero;
        var vM2 = Vector256<double>.Zero;
        var vM3 = Vector256<double>.Zero;
        var vM4 = Vector256<double>.Zero;
        var v3 = Vector256.Create(3.0);
        var v4 = Vector256.Create(4.0);
        var v6 = Vector256.Create(6.0);
        long c = 0;
        for (int top = size & CommonMatrix.AVX_MASK;
             i < top; i += 4)
        {
            c++;
            var vSample = Avx.LoadVector256(samples + i);
            vMin = Avx.Min(vMin, vSample);
            vMax = Avx.Max(vMax, vSample);
            var vd = Avx.Subtract(vSample, vM1);
            var vs = Avx.Divide(vd,
                Vector256.Create((double)c));
            var vt = Avx.Multiply(Avx.Multiply(vd, vs),
                Vector256.Create((double)(c - 1)));
            vM1 = Avx.Add(vM1, vs);
            var t1 = Avx.Multiply(Avx.Multiply(vt, vs),
                Vector256.Create((double)(c * (c - 3) + 3)));
            var t2 = Avx.Multiply(Avx.Multiply(vs, vM2), v6);
            var t3 = Avx.Multiply(v4, vM3);
            vM4 = vM4.MultiplyAdd(Avx.Subtract(
                Avx.Add(t1, t2), t3), vs);
            t1 = Avx.Multiply(vt,
                Vector256.Create((double)(c - 2)));
            t2 = Avx.Multiply(vM2, v3);
            vM3 = vM3.MultiplyAdd(Avx.Subtract(t1, t2), vs);
            vM2 = Avx.Add(vM2, vt);
        }
        var acc01 = Mix(c,
            vM1.ToScalar(), vM2.ToScalar(),
            vM3.ToScalar(), vM4.ToScalar(),
            vM1.GetElement(1), vM2.GetElement(1),
            vM3.GetElement(1), vM4.GetElement(1));
        var acc23 = Mix(c,
            vM1.GetElement(2), vM2.GetElement(2),
            vM3.GetElement(2), vM4.GetElement(2),
            vM1.GetElement(3), vM2.GetElement(3),
            vM3.GetElement(3), vM4.GetElement(3));
        var a = Mix(c + c,
            acc01.m1, acc01.m2, acc01.m3, acc01.m4,
            acc23.m1, acc23.m2, acc23.m3, acc23.m4);
        if (Count == 0)
            (Count, m1, m2, m3, m4)
                = (4 * c, a.m1, a.m2, a.m3, a.m4);
        else
        {
            long acCnt = 4 * c, n = Count + acCnt, n2 = n * n;
            double d = a.m1 - m1, d2 = d * d;
            double d3 = d2 * d, d4 = d2 * d2;

            double nm1 = (Count * m1 + acCnt * a.m1) / n;
            double nm2 = m2 + a.m2 + d2 * Count * acCnt / n;
            double nm3 = m3 + a.m3
                + d3 * Count * acCnt * (Count - acCnt) / n2
                + 3 * d * (Count * a.m2 - acCnt * m2) / n;
            m4 += a.m4 + d4 * Count * acCnt
                    * (Count * (Count - acCnt) 
                        + acCnt * acCnt) / (n2 * n)
                + 6 * d2 * (Count * Count * a.m2
                    + acCnt * acCnt * m2) / n2
                + 4 * d * (Count * a.m3 - acCnt * m3) / n;
            (m1, m2, m3, Count) = (nm1, nm2, nm3, n);
        }
        min = Min(min, vMin.Min());
        max = Max(max, vMax.Max());
    }
    for (; i < size; ++i)
        Add(samples[i]);

    static (double m1, double m2, double m3, double m4) Mix(
        long c,
        double a1, double a2, double a3, double a4,
        double b1, double b2, double b3, double b4)
    {
        long n = c + c, n2 = n * n;
        double d = b1 - a1, d2 = d * d, d4 = d2 * d2;
        return (
            (a1 + b1) / 2,
            a2 + b2 + d2 * c / 2,
            a3 + b3 + 3 * d * (b2 - a2) / 2,
            a4 + b4 + d4 * c / 8 + 3 * d2 * (b2 + a2) / 2
               + 2 * d * (b3 - a3));
    }
}

Observe que la precondición para aprovechar las instrucciones vectoriales es tener todo un array de muestras a nuestra disposición. Si nos diesen un IEnumerable<double>, tendríamos que hacer maniobras como materializar las muestras en grupos de cuatro, en un array, y alimentar así al animalito vectorial.

El código es relativamente sencillo, si miramos con atención. La parte AVX prácticamente repite el código del Add escalar. Por cada campo de Accumulator hay un vector de doble precisión. La excepción es la propiedad Count, y la tratamos diferente porque para los cuatro acumuladores virtuales que maneja el método, la cantidad de muestras es siempre la misma.

Esto es una ventaja cuando tenemos que mezclar los resultados de los cuatro acumuladores. La función interna estática Mix aprovecha la igualdad de los contadores para simplificar algebraicamente algunas fórmula. Observe, por ejemplo, que la fórmula para el m3 combinado es más sencilla, al anularse uno de los términos.

Una vez que hemos mezclado los cuatro acumuladores parciales, mezclamos el resultado, a su vez, con los valores que pueda haber ya en el propio acumulador (si los hubiera). Aquí no podemos simplificar tanto, porque los contadores nuevos y antiguos pueden ser muy diferentes, aunque en el caso en el que el acumulador inicial no tuviese muestras, es todo más simple.

Si quiere hacerse una idea de cuánto mejora este tipo de procesamiento vectorial, los benchmarks que he ejecutado me dan casi cinco veces más velocidad. Es extraño, porque yo esperaría una mejora de 4x, pero puede deberse a que aquí hacemos uso de las instrucciones FMA vectoriales, cuando están disponibles. Las instrucciones FMA están escondidas en los métodos de extensión MultiplyAdd que presenté en esta entrada.

Por cierto, la niña de la imagen de la entrada tiene poco que ver con el algoritmo, pero estoy usando imágenes generadas por AI, entre otros motivos, para evitar problemas de derechos de autor. En este caso, le pedí a la AI que generase una niña perdida e indefensa en un universo digital simulado. En parte, la AI me hizo caso; en parte, ignoró la petición. Pero el resultado me gusta, y ahí lo tiene.

Categorías
C#

Entran una matriz y un vector en un bar

… y claro, al rato sale un vector «transformado».

Esta entrada no es, aunque pueda parecerlo, un ripio de la anterior. Algorítmicamente, transformar un vector con una matriz se parece mucho a una sucesión de productos escalares. Pero resulta que el producto escalar, al menos hasta AVX2, tiene su truco. Vamos a comenzar por la implementación más tonta:

public static double[] Mult(double[,] a, double[] x)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    double[] b = new double[m];
    for (int = 0; i < m; i++)
    {
        double d = 0;
        for (int j = 0; j < n; j++)
            d += a[i, j] * x[j];
        b[i] = d;
    }
    return b;
}

Recordemos que tenemos un «handicap» autoimpuesto por representar las matrices como arrays bidimensionales de C#. Pero esta vez no voy a dar la brasa con los punteros, que ya sabemos que resuelven este problema sin pestañear. Esta es la implementación final que necesitamos, con soporte opcional de AVX para cuando esté disponible y merezca la pena:

public static unsafe double[] Mult(double[,] a, double[] x)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    double[] b = new double[m];
    int lastBlockIndex = n - (n % 4);
    fixed (double* pA = a)
    fixed (double* pX = x)
    fixed (double* pB = b)
    {
        double* pA1 = pA;
        double* pB1 = pB;
        if (n >= 12 && Avx2.IsSupported)
            for (int i = 0; i < m; i++)
            {
                int j = 0;
                var v = Vector256<double>.Zero;
                while (j < lastBlockIndex)
                {
                    v = Avx.Add(
                        v,
                        Avx.Multiply(
                            Avx.LoadVector256(pA1 + j),
                            Avx.LoadVector256(pX + j)));
                    j += 4;
                }
                v = Avx.HorizontalAdd(v, v);
                double d = v.ToScalar() + v.GetElement(2);
                for (; j < n; j++)
                    d += pA1[j] * pX[j];
                *pB1 = d;
                pA1 += n;
                pB1++;
            }
        else
            for (int i = 0; i < m; i++)
            {
                int j = 0;
                double d = 0;
                while (j < lastBlockIndex)
                {
                    d += (*(pA1 + j) * *(pX + j)) +
                        (*(pA1 + j + 1) * *(pX + j + 1)) +
                        (*(pA1 + j + 2) * *(pX + j + 2)) +
                        (*(pA1 + j + 3) * *(pX + j + 3));
                    j += 4;
                }
                for (; j < n; j++)
                     d += pA1[j] * pX[j];
                *pB1 = d;
                pA1 += n;
                pB1++;
            }
    }
    return b;
}

Esta vez, el código SIMD sólo se usa cuando hay doce o más elementos en el vector. La cifra la he elegido experimentando en mi i7-4770. Puede que en otros ordenadores, el umbral sea más bajo incluso.

Tengo que explicar cómo se implementa un producto escalar con SIMD, porque no es muy evidente. Uno diría que hay que acumular un escalar en una variable global al bucle… pero no hay ninguna operación SIMD que calcule directamente la suma de las cuatro multiplicaciones necesarias. La explicación oficial es que una suma de ese tipo destrozaría el paralelismo de la CPU. Y yo me lo creo, de veras. La consecuencia es que necesitamos acumular las multiplicaciones en cuatro variables; es decir, en un vector que hace de acumulador.

Las cosas se ponen de color hormiga cuando terminamos el bucle y tenemos entonces que sumar los cuatro elementos del vector acumulador. Analicemos las líneas 27 y 28 del listado anterior. Según mis experimentos, es la forma más rápida de conseguirlo. HorizontalAdd, cuando se trata de Vector256<double>, suma el primer elemento con el segundo, y lo almacena por partida doble en el primer y segundo elemento. A la vez, suma el tercero y el cuarto y hace lo mismo para guardar el resultado. Los métodos de extensión ToScalar() y GetElement() acceden entonces directamente al primer y tercer elemento y los suma. Mantengo la llamada inicial a HorizontalAdd porque, teóricamente, puede hacer dos de las sumas en paralelo, pero puedes experimentar a ver qué pasa si accedes directamente a los cuatro elementos y los sumas como toda la vida. A mí ya se me ha acabado la partida de tiempo libre para este experimento.

La razón para la controversia es que, en realidad, Internet está lleno de recomendaciones para hacer esta suma final de esta otra manera:

v = Avx2.Permute4x64(
    Avx.HorizontalAdd(v, v),
    0b00_10_01_11);
double d = Avx.HorizontalAdd(v, v).ToScalar();
// v = Avx.HorizontalAdd(v, v);
// double d = v.ToScalar() + v.GetElement(2);

Es decir: se llama dos veces a HorizontalAdd, pasando entre medias por una permutación entre escalares. En la arquitectura Haswell, al menos, esto funciona más lento que mi solución.

Si multiplico una matriz aleatoria de 64×64 por un vector de 64 elementos, obtengo estas cifras:

Method Mean Error StdDev Median
MultVector 5.762 μs 0.1142 μs 0.2227 μs 5.646 μs
FMultVector 1.814 μs 0.0320 μs 0.0416 μs 1.818 μs

No está mal, aunque no conseguimos tanta ventaja como con la multiplicación entre matrices. La versión con punteros y sin SIMD tampoco va mal, pero queda muy claro que el SIMD acelera este código. De paso, ya tenemos un patrón de código para productos escalares (y para cosas más raras como multiplicar un vector de sensibilidad delta-gamma por un escenario histórico: cosas de la valoración de productos financieros).

Por cierto, el mejor chiste que conozco sobre gente que entra en un bar tiene que ver con la Mecánica Cuántica. Dice así: entra el Gato de Schrödinger en un bar… y no entra.

Categorías
C#

Multiplicación de matrices

Supongamos que queremos multiplicar un par de matrices, $A$ y $B$. Digamos que la primera tiene dimensiones $m\times n$ y que la segunda es $n\times p$. La coincidencia entre columnas de la primera y filas de la segunda es condición necesaria para que podamos multiplicarlas.

Si me piden que escriba de carrerilla un método para esta multiplicación, esto es lo que se me ocurre:

public static double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] result = new double[m, p];
    for (int i = 0; i < m; i++)
        for (int j = 0; j < p; j++)
        {
            double d = 0;
            for (int k = 0; k < n; k++)
                d += a[i, k] * b[k, j];
            result[i, j] = d;
        }
    return result;
}

He utilizado matrices bidimensionales de C# porque acceder a sus elementos individuales es sencillo. Internamente, C# las almacena en una sola memoria contigua de memoria, fila por fila.

El código que he mostrado no es una maravilla. Para empezar, cada vez que decimos algo como a[i, k], el compilador tiene que multiplicar la variable i por el número de columnas y por los ocho bytes que tiene un flotante de doble precisión. Hacerlo una vez no es problema… pero tenemos tres bucles anidados. Eso tiene que doler. Si en vez de C# escribiésemos esto en C++, el compilador podría sustituir un montón de multiplicaciones por sumas. RyuJIT ha mejorado muchísimo, pero no tanto.

C#, además, es un lenguaje mucho más seguro que C++, pero esta seguridad nos cuesta un montón de verificaciones de rango para poder indexar. Recordemos, además, que cada acceso necesita dos índices.

Y hay un tercer problema, mucho más sutil: cuando las matrices son grandes, el código anterior machaca la caché de la CPU sin piedad. Toma un folio de papel y haz el experimento: dibuja dos matrices, y ve numerando las celdas siguiendo el orden en que las usa el algoritmo.

La clase Unsafe

Llegados a este punto, tenemos dos alternativas: o marcamos el método como unsafe y usamos directamente punteros de C#, o intentamos evitarlo haciendo uso de la clase Unsafe, de System.Runtime.CompilerServices. Vamos a comenzar por esta última. De paso, voy a invertir el orden de los dos bucles más internos, para ver qué conseguimos con ello. Este es el código modificado, y suele funcionar el doble de rápido, o un poco más:

public static double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] c= new double[m, p];
    ref double rA = ref a[0, 0];
    ref double rB = ref b[0, 0];
    ref double rC = ref c[0, 0];
    for (int i = 0; i < m; i++)
    {
        ref double rAi = ref Unsafe.Add(ref rA, i * n);
        ref double rCi = ref Unsafe.Add(ref rC, i * n);
        for (int k = 0; k < n; k++)
        {
            double d = Unsafe.Add(ref rAi, k);
            int kp = k * p;
            for (int j = 0; j < p; j++)
                Unsafe.Add(ref rCi, j) +=
                    d * Unsafe.Add(ref rB, kp + j);
        }
    }
    return c;
}

La regla principal del uso de Unsafe.Add es que si inicializamos así:

ref double rA = ref a[0, 0];

entonces el acceso a a[i, j] debe parecerse a esto:

Unsafe.Add(ref rA, i * n + j) = 42;

Esa multiplicación es un problema del que ya advertimos. En nuestro código lo paliamos moviendo la multiplicación al inicio del bucle donde se le da valor al índice de la fila. Mi apaño no es la palabra definitiva: le dejo como ejercicio la eliminación total de esas multiplicaciones.

Ahora hay que prestar atención, sobre todo, al patrón de acceso a memoria que se produce en el bucle más interno. En el algoritmo inicial, acumulábamos todos los términos de un elemento de la matriz final en el bucle interno, y asignábamos su suma de golpe a la celda del resultado. Esta variante, sin embargo, no parece tan buena. Tenemos que asumir que, al reservar memoria para la matriz, todas sus entradas valen cero (y es así). Luego, cada celda del resultado se va rellenando por pasos, no de una vez. Puede que esto sea bueno para la caché de la CPU, pero no me queda tan claro que sea bueno para el compilador de C#.

Pero lo que nos interesa realmente es que ahora ejecutamos el siguiente patrón de cálculo:

  1. Tenemos dos zonas de memoria consecutiva.
  2. Leemos algo de la primera zona.
  3. Lo transformamos como sea.
  4. Lo asignamos a la celda equivalente en la segunda zona de memoria.

Instrucciones SIMD

Ese patrón de actividad es el típico algoritmo «vectorial» que podemos acelerar utilizando operaciones SIMD. Tenemos dos opciones:

  • Utilizar System.Numerics.Vector, que se adapta automáticamente a cualquier máquina que soporte SIMD, e incluso ofrece una alternativa cuando no existe ese soporte. Este tipo funciona también para .NET Framework, a través de un paquete.
  • Si podemos usar .NET Core 3.1, podemos ir directamente a las clases declaradas en System.Runtime.Intrinsics y System.Runtime.Intrinsics.X86. Es un poco más complicado y no está bien documentado, pero da resultados ligeramente mejores.

Vamos a ir directamente por la segunda vía. Vamos a optimizar las CPUs que soporten el conjunto de instrucciones AVX, haremos algo más en el caso en que soporte el conjunto FMA (que mezcla multiplicaciones y sumas en una misma operación) y, de todas maneras, habilitaremos código de respaldo para cuando el procesador no soporte SIMD.

Cuando hay soporte para instrucciones AVX, podemos procesar hasta cuatro variables de tipo double de una tacada. Para ello tenemos que utilizar el tipo de estructura Vector256, que tiene capacidad para cuatro elementos. La forma más sencilla de inicializar estos vectores es utilizando punteros, por lo que vamos a tener que declarar nuestro método unsafe y pasarnos directamente a los punteros.

public static unsafe double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] c = new double[m, p];
    int lastBlockIndex = p - (p % 4);
    fixed (double* pA = a)
    fixed (double* pB = b)
    fixed (double* pC = c)
    {
        double* pAi = pA;
        double* pCi = pC;
        for (int i = 0; i < m; i++)
        {
            double* pBk = pB;
            for (int k = 0; k < n; k++)
            {
                double d = *(pAi + k);
                if (Avx.IsSupported)
                {
                    int j = 0;
                    var vd = Vector256.Create(d);
                    while (j < lastBlockIndex)
                    {
                        if (Fma.IsSupported)
                            Avx.Store(pCi + j,
                                Fma.MultiplyAdd(
                                Avx.LoadVector256(pBk + j),
                                vd,
                                Avx.LoadVector256(pCi + j)));
                        else
                            Avx.Store(pCi + j,
                                Avx.Add(
                                Avx.LoadVector256(pCi + j),
                                Avx.Multiply(
                                Avx.LoadVector256(pBk + j),
                                vd)));
                        j += 4;
                    }
                    while (j < p)
                    {
                        pCi[j] += d * pBk[j];
                        j++;
                    }
                }
                else
                {
                    for (int j = 0; j < p; j++)
                        pCi[j] += d * pBk[j];
                }
                pBk += p;
            }
            pAi += n;
            pCi += p;
        }
    }
    return c;
}

Observaciones:

  1. Lo peor de trabajar con SIMD es tener que lidiar con vectores que no son múltiplos exactos del tamaño del vector básico. Nuestros vectores básicos tienen cuatro elementos. Si tenemos un vector de 75 elementos, necesitaremos un bucle de 18 repeticiones que procese cuatro elementos por vez, para una mierdecilla de bucle final que maneje los 3 elementos que nos sobran.
  2. Aunque la llamada a Avx.IsSupported está metida dentro de dos bucles anidados, no se preocupe: el compilador JIT la trata como una constante en tiempo de generación de código nativo, y no cuesta nada. Si no se soporta AVX, el compilador JIT solamente genera el código de la cláusula else, que funciona sobre cualquier arquitectura.
  3. Ojo: ese código «para cualquier máquina» podría optimizarse echando mano de la técnica de loop unrolling. Pero mi política en estos casos es: si no tienes una máquina decente, jódete.
  4. En el ejemplo anterior, cuando intercambiamos el orden de los bucles más internos, teníamos un valor escalar que sacábamos fuera del tercer bucle. Pero SIMD no ofrece instrucciones para multiplicar un vector por un escalar: tenemos que convertir ese escalar en todo un vector y utilizar la instrucción de multiplicación más general. No es grave, de todos modos.
  5. Si, además de AVX, la máquina soporta el conjunto FMA de instrucciones, podemos utilizar el método MultiplyAdd para acelerar un poco el algoritmo. Pero con esto hay que tener cuidado: a * b + c puede dar resultados diferentes si se hacen las dos operaciones por separado o a la vez. Si se hacen a la vez, aumenta la exactitud de la operación al existir menos redondeos. Pero el efecto secundario es que los cálculos con y sin esa opción dan resultados ligeramente diferentes. Tenemos que decidir cuándo es aceptable que exista esa diferencia y cuándo no. En cualquier caso, tengamos presente que el resultado de MultiplyAdd es más preciso.

Benchmark.NET

Para estar seguro de las ganancias en velocidad, he utilizado el package Benchmark.NET para generar las pruebas. Estos son los resultados:

Method Mean Error StdDev
MultMatrix 4,482.3 μs 88.75 μs 138.17 μs
UMultMatrix 1,895.2 μs 37.87 μs 63.26 μs
FMultMatrix 506.3 μs 3.44 μs 2.87 μs

La mejora por el uso de SIMD es cerca de cuatro veces, porque es el número de operaciones simultáneas que permite esta arquitectura en particular. Con AVX512 tendríamos vectores de ocho valores, pero necesitaríamos procesadores mucho más modernos, y de momento .NET Core no lo soporta.

Para esta prueba, he utilizado matrices de 128×128. He probado también con matrices de 8×8 e incluso de 4×4. La ganancia no es tan espectacular, pero en total se consigue una cuarta parte del tiempo de ejecución respecto al algoritmo más sencillo.