Categorías
C#

El algoritmo de Welford

En la entrada sobre la varianza, vimos que podíamos tener problemas de estabilidad numérica si intentábamos calcular la varianza en una sola pasada sobre los datos, usando inocentemente la definición matemática. La solución de entonces fue usar un algoritmo de dos pasos: calcular la media en el primer paso, y en el segundo, calcular la varianza de la muestra menos la media. Había otra posibilidad: usar el primer valor de la secuencia como estimado malo de la media, y restar ese valor a las sucesivas muestras.

¿Podríamos hacer algo mejor si corrigiésemos el estimado de la media sobre la marcha? Resulta que se puede, y el primero en darse cuenta fue Welford, allá por el 1962. Donald Knuth incluyó el algoritmo en el segundo tomo de The Art of Computer Programming. El algoritmo original sólo calculaba la media y la varianza sobre la marcha, pero Timothy Terriberry, en 2007, lo amplió para que calculase momentos superiores. Este algoritmo está implementado, por ejemplo, en Math.Net Numerics, aunque la implementación es mejorable.

¿Qué nos da?

En AUSTRA, la clase que implementa este algoritmo se llama Accumulator. Hay también una clase simplificada, SimpleAccumulator, que sólo calcula los dos primeros momentos, con el beneficio evidente de tener que ejecutar menos trabajo.

La definición de Accumulator, junto con su constructor principal y los campos y propiedades de almacenamiento, es la siguiente:

/// Calculates statistics by adding samples.
public sealed class Accumulator
{
    ///Minimum value.
    private double min = double.PositiveInfinity;
    ///Maximum value.
    private double max = double.NegativeInfinity;
    ///Estimated mean.
    private double m1;
    ///Accumulated second moment.
    private double m2;
    ///Accumulated third moment.
    private double m3;
    ///Accumulated fourth moment.
    private double m4;

    ///Gets the total number of samples.
    public long Count { get; private set; }

    ///Creates an empty accumulator.
    public Accumulator() { }

    /* … */
}

La información que nos interesa se obtiene a través de estos campos, por medio de propiedades calculadas:

///Returns the minimum value.
public double Minimum => Count > 0 ? min : double.NaN;
///Returns the maximum value.
public double Maximum => Count > 0 ? max : double.NaN;
///Gets the sample mean.
public double Mean => Count > 0 ? m1 : double.NaN;
///Gets the unbiased variance.
public double Variance =>
    Count < 2 ? double.NaN : m2 / (Count - 1);
///Gets the unbiased standard deviation.
public double StandardDeviation =>
    Count < 2 ? double.NaN : Sqrt(m2 / (Count - 1));
///Gets the unbiased population skewness.
public double Skewness =>
    Count < 3
    ? double.NaN
    : Count * m3 * Sqrt(m2 / (Count - 1))
        / (m2 * m2 * (Count - 2)) * (Count - 1);
///Gets the unbiased population kurtosis.
public double Kurtosis =>
    Count < 4
    ? double.NaN
    : ((double)Count * Count - 1)
        / ((Count - 2) * (Count - 3))
        * (Count * m4 / (m2 * m2) - 3 + 6.0 / (Count + 1));

He omitido, por brevedad, el cálculo de propiedades como PopulationVariance, PopulationSkewness y demás. De todas maneras, están disponibles en el código de Austra, y en el propio código de Math.NET Numerics.

¿Qué le damos?

Para alimentar al acumulador, hay que pasarle las muestras, al menos en principio, de una en una, por medio de un método que hemos nombrado Add:

/// Adds a sample to this accumulator.
/// The new sample.
public void Add(double sample)
{
    ++Count;
    double d = sample - m1, s = d / Count;
    double t = d * s * (Count - 1);
    m1 += s;
    m4 += (t * s * (Count * (Count - 3 + 3)
        + 6 * s * m2 - 4 * m3) * s;
    m3 += (t * (Count - 2) - 3 * m2) * s;
    m2 += t;
    if (sample < min) min = sample;
    if (sample > max) max = sample;
}

Hay algunas pequeñas mejoras en el código anterior, respecto al original. Hay algunas multiplicaciones menos, y está todo preparado por si quisiéramos usar alguna instrucción de fusión de multiplicación y suma. No las he usado porque tengo algunas dudas sobre la eficiencia en .NET Core. Es cierto que siempre tienes la ventaja de la mayor exactitud, pero ya veremos dónde sí se usan (en pocas palabras: donde realmente importan).

Combinando acumuladores

Lo mejor de todo es que podemos combinar los valores en dos acumuladores independientes y generar un acumulador de los datos conjuntos. Esto nos permitiría, por ejemplo, dividir una muestra grande en cuatro partes, calcular cuatro acumuladores en paralelo, y luego mezclarlos en el resultado final.

public static Accumulator operator +(
    Accumulator a1, Accumulator a2)
{
    if (a1.Count == 0) return a2;
    if (a2.Count == 0) return a1;

    long n = a1.Count + a2.Count, n2 = n * n;
    double d = a2.m1 - a1.m1, d2 = d * d;
    double d3 = d2 * d, d4 = d2 * d2;
    double m1 = (a1.Count * a1.m1 + a2.Count * a2.m1) / n;
    double m2 = a1.m2 + a2.m2 + d2 * a1.Count * a2.Count / n;
    double m3 = a1.m3 + a2.m3
        + d3 * a1.Count * a2.Count * (a1.Count - a2.Count) / n2
        + 3 * d * (a1.Count * a2.m2 - a2.Count * a1.m2) / n;
    double m4 = a1.m4 + a2.m4 + d4 * a1.Count * a2.Count
            * (a1.Count * (a1.Count - a2.Count)
                + a2.Count * a2.Count) / (n2 * n)
        + 6 * d2 * (a1.Count * a1.Count * a2.m2
            + a2.Count * a2.Count * a1.m2) / n2
        + 4 * d * (a1.Count * a2.m3 - a2.Count * a1.m3) / n;
    return new() {
        Count = n,
        m1 = m1, m2 = m2, m3 = m3, m4 = m4,
        min = Min(a1.min, a2.min),
        max = Max(a1.max, a2.max),
    };
}

El código es complicado, y es fácil equivocarse copiando y pegando (ya me ha pasado). De todas maneras, es una clase que es fácil de testear.

El primer impulso es dejarlo aquí, y confiar en el paralelismo con tareas para cuando queremos acelerar el código. Mi problema con esto es que, cuando escribo código para una librería, prefiero realizar la aceleración básica con código vectorial (AVX o lo que esté disponible). ¿Por qué? Pues porque por experiencia, el programador que usa luego la biblioteca prefiere tener la opción del paralelismo por tareas para su propio código. Es cierto que las tareas se combinan más o menos bien en .NET, gracias al thread-pool que obtienes del entorno de ejecución, sin esforzarte demasiado; en Java, todo es más complicado con sus malditos executors.

Drum roll, please...

Prefiero, por lo tanto, ganar todo lo que pueda en paralelismo en una librería a golpe de instrucciones SIMD. Y esto es precisamente lo que hacemos en Accumulator con el siguiente método y algunos más que lo llaman:

public unsafe void Add(double* samples, int size)
{
    int i = 0;
    if (Avx.IsSupported && size >= 16)
    {
        var vMin = Vector256.Create(double.PositiveInfinity);
        var vMax = Vector256.Create(double.NegativeInfinity);
        var vM1 = Vector256<double>.Zero;
        var vM2 = Vector256<double>.Zero;
        var vM3 = Vector256<double>.Zero;
        var vM4 = Vector256<double>.Zero;
        var v3 = Vector256.Create(3.0);
        var v4 = Vector256.Create(4.0);
        var v6 = Vector256.Create(6.0);
        long c = 0;
        for (int top = size & CommonMatrix.AVX_MASK;
             i < top; i += 4)
        {
            c++;
            var vSample = Avx.LoadVector256(samples + i);
            vMin = Avx.Min(vMin, vSample);
            vMax = Avx.Max(vMax, vSample);
            var vd = Avx.Subtract(vSample, vM1);
            var vs = Avx.Divide(vd,
                Vector256.Create((double)c));
            var vt = Avx.Multiply(Avx.Multiply(vd, vs),
                Vector256.Create((double)(c - 1)));
            vM1 = Avx.Add(vM1, vs);
            var t1 = Avx.Multiply(Avx.Multiply(vt, vs),
                Vector256.Create((double)(c * (c - 3) + 3)));
            var t2 = Avx.Multiply(Avx.Multiply(vs, vM2), v6);
            var t3 = Avx.Multiply(v4, vM3);
            vM4 = vM4.MultiplyAdd(Avx.Subtract(
                Avx.Add(t1, t2), t3), vs);
            t1 = Avx.Multiply(vt,
                Vector256.Create((double)(c - 2)));
            t2 = Avx.Multiply(vM2, v3);
            vM3 = vM3.MultiplyAdd(Avx.Subtract(t1, t2), vs);
            vM2 = Avx.Add(vM2, vt);
        }
        var acc01 = Mix(c,
            vM1.ToScalar(), vM2.ToScalar(),
            vM3.ToScalar(), vM4.ToScalar(),
            vM1.GetElement(1), vM2.GetElement(1),
            vM3.GetElement(1), vM4.GetElement(1));
        var acc23 = Mix(c,
            vM1.GetElement(2), vM2.GetElement(2),
            vM3.GetElement(2), vM4.GetElement(2),
            vM1.GetElement(3), vM2.GetElement(3),
            vM3.GetElement(3), vM4.GetElement(3));
        var a = Mix(c + c,
            acc01.m1, acc01.m2, acc01.m3, acc01.m4,
            acc23.m1, acc23.m2, acc23.m3, acc23.m4);
        if (Count == 0)
            (Count, m1, m2, m3, m4)
                = (4 * c, a.m1, a.m2, a.m3, a.m4);
        else
        {
            long acCnt = 4 * c, n = Count + acCnt, n2 = n * n;
            double d = a.m1 - m1, d2 = d * d;
            double d3 = d2 * d, d4 = d2 * d2;

            double nm1 = (Count * m1 + acCnt * a.m1) / n;
            double nm2 = m2 + a.m2 + d2 * Count * acCnt / n;
            double nm3 = m3 + a.m3
                + d3 * Count * acCnt * (Count - acCnt) / n2
                + 3 * d * (Count * a.m2 - acCnt * m2) / n;
            m4 += a.m4 + d4 * Count * acCnt
                    * (Count * (Count - acCnt) 
                        + acCnt * acCnt) / (n2 * n)
                + 6 * d2 * (Count * Count * a.m2
                    + acCnt * acCnt * m2) / n2
                + 4 * d * (Count * a.m3 - acCnt * m3) / n;
            (m1, m2, m3, Count) = (nm1, nm2, nm3, n);
        }
        min = Min(min, vMin.Min());
        max = Max(max, vMax.Max());
    }
    for (; i < size; ++i)
        Add(samples[i]);

    static (double m1, double m2, double m3, double m4) Mix(
        long c,
        double a1, double a2, double a3, double a4,
        double b1, double b2, double b3, double b4)
    {
        long n = c + c, n2 = n * n;
        double d = b1 - a1, d2 = d * d, d4 = d2 * d2;
        return (
            (a1 + b1) / 2,
            a2 + b2 + d2 * c / 2,
            a3 + b3 + 3 * d * (b2 - a2) / 2,
            a4 + b4 + d4 * c / 8 + 3 * d2 * (b2 + a2) / 2
               + 2 * d * (b3 - a3));
    }
}

Observe que la precondición para aprovechar las instrucciones vectoriales es tener todo un array de muestras a nuestra disposición. Si nos diesen un IEnumerable<double>, tendríamos que hacer maniobras como materializar las muestras en grupos de cuatro, en un array, y alimentar así al animalito vectorial.

El código es relativamente sencillo, si miramos con atención. La parte AVX prácticamente repite el código del Add escalar. Por cada campo de Accumulator hay un vector de doble precisión. La excepción es la propiedad Count, y la tratamos diferente porque para los cuatro acumuladores virtuales que maneja el método, la cantidad de muestras es siempre la misma.

Esto es una ventaja cuando tenemos que mezclar los resultados de los cuatro acumuladores. La función interna estática Mix aprovecha la igualdad de los contadores para simplificar algebraicamente algunas fórmula. Observe, por ejemplo, que la fórmula para el m3 combinado es más sencilla, al anularse uno de los términos.

Una vez que hemos mezclado los cuatro acumuladores parciales, mezclamos el resultado, a su vez, con los valores que pueda haber ya en el propio acumulador (si los hubiera). Aquí no podemos simplificar tanto, porque los contadores nuevos y antiguos pueden ser muy diferentes, aunque en el caso en el que el acumulador inicial no tuviese muestras, es todo más simple.

Si quiere hacerse una idea de cuánto mejora este tipo de procesamiento vectorial, los benchmarks que he ejecutado me dan casi cinco veces más velocidad. Es extraño, porque yo esperaría una mejora de 4x, pero puede deberse a que aquí hacemos uso de las instrucciones FMA vectoriales, cuando están disponibles. Las instrucciones FMA están escondidas en los métodos de extensión MultiplyAdd que presenté en esta entrada.

Por cierto, la niña de la imagen de la entrada tiene poco que ver con el algoritmo, pero estoy usando imágenes generadas por AI, entre otros motivos, para evitar problemas de derechos de autor. En este caso, le pedí a la AI que generase una niña perdida e indefensa en un universo digital simulado. En parte, la AI me hizo caso; en parte, ignoró la petición. Pero el resultado me gusta, y ahí lo tiene.

Categorías
C#

Entran una matriz y un vector en un bar

… y claro, al rato sale un vector «transformado».

Esta entrada no es, aunque pueda parecerlo, un ripio de la anterior. Algorítmicamente, transformar un vector con una matriz se parece mucho a una sucesión de productos escalares. Pero resulta que el producto escalar, al menos hasta AVX2, tiene su truco. Vamos a comenzar por la implementación más tonta:

public static double[] Mult(double[,] a, double[] x)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    double[] b = new double[m];
    for (int = 0; i < m; i++)
    {
        double d = 0;
        for (int j = 0; j < n; j++)
            d += a[i, j] * x[j];
        b[i] = d;
    }
    return b;
}

Recordemos que tenemos un «handicap» autoimpuesto por representar las matrices como arrays bidimensionales de C#. Pero esta vez no voy a dar la brasa con los punteros, que ya sabemos que resuelven este problema sin pestañear. Esta es la implementación final que necesitamos, con soporte opcional de AVX para cuando esté disponible y merezca la pena:

public static unsafe double[] Mult(double[,] a, double[] x)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    double[] b = new double[m];
    int lastBlockIndex = n - (n % 4);
    fixed (double* pA = a)
    fixed (double* pX = x)
    fixed (double* pB = b)
    {
        double* pA1 = pA;
        double* pB1 = pB;
        if (n >= 12 && Avx2.IsSupported)
            for (int i = 0; i < m; i++)
            {
                int j = 0;
                var v = Vector256<double>.Zero;
                while (j < lastBlockIndex)
                {
                    v = Avx.Add(
                        v,
                        Avx.Multiply(
                            Avx.LoadVector256(pA1 + j),
                            Avx.LoadVector256(pX + j)));
                    j += 4;
                }
                v = Avx.HorizontalAdd(v, v);
                double d = v.ToScalar() + v.GetElement(2);
                for (; j < n; j++)
                    d += pA1[j] * pX[j];
                *pB1 = d;
                pA1 += n;
                pB1++;
            }
        else
            for (int i = 0; i < m; i++)
            {
                int j = 0;
                double d = 0;
                while (j < lastBlockIndex)
                {
                    d += (*(pA1 + j) * *(pX + j)) +
                        (*(pA1 + j + 1) * *(pX + j + 1)) +
                        (*(pA1 + j + 2) * *(pX + j + 2)) +
                        (*(pA1 + j + 3) * *(pX + j + 3));
                    j += 4;
                }
                for (; j < n; j++)
                     d += pA1[j] * pX[j];
                *pB1 = d;
                pA1 += n;
                pB1++;
            }
    }
    return b;
}

Esta vez, el código SIMD sólo se usa cuando hay doce o más elementos en el vector. La cifra la he elegido experimentando en mi i7-4770. Puede que en otros ordenadores, el umbral sea más bajo incluso.

Tengo que explicar cómo se implementa un producto escalar con SIMD, porque no es muy evidente. Uno diría que hay que acumular un escalar en una variable global al bucle… pero no hay ninguna operación SIMD que calcule directamente la suma de las cuatro multiplicaciones necesarias. La explicación oficial es que una suma de ese tipo destrozaría el paralelismo de la CPU. Y yo me lo creo, de veras. La consecuencia es que necesitamos acumular las multiplicaciones en cuatro variables; es decir, en un vector que hace de acumulador.

Las cosas se ponen de color hormiga cuando terminamos el bucle y tenemos entonces que sumar los cuatro elementos del vector acumulador. Analicemos las líneas 27 y 28 del listado anterior. Según mis experimentos, es la forma más rápida de conseguirlo. HorizontalAdd, cuando se trata de Vector256<double>, suma el primer elemento con el segundo, y lo almacena por partida doble en el primer y segundo elemento. A la vez, suma el tercero y el cuarto y hace lo mismo para guardar el resultado. Los métodos de extensión ToScalar() y GetElement() acceden entonces directamente al primer y tercer elemento y los suma. Mantengo la llamada inicial a HorizontalAdd porque, teóricamente, puede hacer dos de las sumas en paralelo, pero puedes experimentar a ver qué pasa si accedes directamente a los cuatro elementos y los sumas como toda la vida. A mí ya se me ha acabado la partida de tiempo libre para este experimento.

La razón para la controversia es que, en realidad, Internet está lleno de recomendaciones para hacer esta suma final de esta otra manera:

v = Avx2.Permute4x64(
    Avx.HorizontalAdd(v, v),
    0b00_10_01_11);
double d = Avx.HorizontalAdd(v, v).ToScalar();
// v = Avx.HorizontalAdd(v, v);
// double d = v.ToScalar() + v.GetElement(2);

Es decir: se llama dos veces a HorizontalAdd, pasando entre medias por una permutación entre escalares. En la arquitectura Haswell, al menos, esto funciona más lento que mi solución.

Si multiplico una matriz aleatoria de 64×64 por un vector de 64 elementos, obtengo estas cifras:

Method Mean Error StdDev Median
MultVector 5.762 μs 0.1142 μs 0.2227 μs 5.646 μs
FMultVector 1.814 μs 0.0320 μs 0.0416 μs 1.818 μs

No está mal, aunque no conseguimos tanta ventaja como con la multiplicación entre matrices. La versión con punteros y sin SIMD tampoco va mal, pero queda muy claro que el SIMD acelera este código. De paso, ya tenemos un patrón de código para productos escalares (y para cosas más raras como multiplicar un vector de sensibilidad delta-gamma por un escenario histórico: cosas de la valoración de productos financieros).

Por cierto, el mejor chiste que conozco sobre gente que entra en un bar tiene que ver con la Mecánica Cuántica. Dice así: entra el Gato de Schrödinger en un bar… y no entra.

Categorías
C#

Multiplicación de matrices

Supongamos que queremos multiplicar un par de matrices, $A$ y $B$. Digamos que la primera tiene dimensiones $m\times n$ y que la segunda es $n\times p$. La coincidencia entre columnas de la primera y filas de la segunda es condición necesaria para que podamos multiplicarlas.

Si me piden que escriba de carrerilla un método para esta multiplicación, esto es lo que se me ocurre:

public static double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] result = new double[m, p];
    for (int i = 0; i < m; i++)
        for (int j = 0; j < p; j++)
        {
            double d = 0;
            for (int k = 0; k < n; k++)
                d += a[i, k] * b[k, j];
            result[i, j] = d;
        }
    return result;
}

He utilizado matrices bidimensionales de C# porque acceder a sus elementos individuales es sencillo. Internamente, C# las almacena en una sola memoria contigua de memoria, fila por fila.

El código que he mostrado no es una maravilla. Para empezar, cada vez que decimos algo como a[i, k], el compilador tiene que multiplicar la variable i por el número de columnas y por los ocho bytes que tiene un flotante de doble precisión. Hacerlo una vez no es problema… pero tenemos tres bucles anidados. Eso tiene que doler. Si en vez de C# escribiésemos esto en C++, el compilador podría sustituir un montón de multiplicaciones por sumas. RyuJIT ha mejorado muchísimo, pero no tanto.

C#, además, es un lenguaje mucho más seguro que C++, pero esta seguridad nos cuesta un montón de verificaciones de rango para poder indexar. Recordemos, además, que cada acceso necesita dos índices.

Y hay un tercer problema, mucho más sutil: cuando las matrices son grandes, el código anterior machaca la caché de la CPU sin piedad. Toma un folio de papel y haz el experimento: dibuja dos matrices, y ve numerando las celdas siguiendo el orden en que las usa el algoritmo.

La clase Unsafe

Llegados a este punto, tenemos dos alternativas: o marcamos el método como unsafe y usamos directamente punteros de C#, o intentamos evitarlo haciendo uso de la clase Unsafe, de System.Runtime.CompilerServices. Vamos a comenzar por esta última. De paso, voy a invertir el orden de los dos bucles más internos, para ver qué conseguimos con ello. Este es el código modificado, y suele funcionar el doble de rápido, o un poco más:

public static double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] c= new double[m, p];
    ref double rA = ref a[0, 0];
    ref double rB = ref b[0, 0];
    ref double rC = ref c[0, 0];
    for (int i = 0; i < m; i++)
    {
        ref double rAi = ref Unsafe.Add(ref rA, i * n);
        ref double rCi = ref Unsafe.Add(ref rC, i * n);
        for (int k = 0; k < n; k++)
        {
            double d = Unsafe.Add(ref rAi, k);
            int kp = k * p;
            for (int j = 0; j < p; j++)
                Unsafe.Add(ref rCi, j) +=
                    d * Unsafe.Add(ref rB, kp + j);
        }
    }
    return c;
}

La regla principal del uso de Unsafe.Add es que si inicializamos así:

ref double rA = ref a[0, 0];

entonces el acceso a a[i, j] debe parecerse a esto:

Unsafe.Add(ref rA, i * n + j) = 42;

Esa multiplicación es un problema del que ya advertimos. En nuestro código lo paliamos moviendo la multiplicación al inicio del bucle donde se le da valor al índice de la fila. Mi apaño no es la palabra definitiva: le dejo como ejercicio la eliminación total de esas multiplicaciones.

Ahora hay que prestar atención, sobre todo, al patrón de acceso a memoria que se produce en el bucle más interno. En el algoritmo inicial, acumulábamos todos los términos de un elemento de la matriz final en el bucle interno, y asignábamos su suma de golpe a la celda del resultado. Esta variante, sin embargo, no parece tan buena. Tenemos que asumir que, al reservar memoria para la matriz, todas sus entradas valen cero (y es así). Luego, cada celda del resultado se va rellenando por pasos, no de una vez. Puede que esto sea bueno para la caché de la CPU, pero no me queda tan claro que sea bueno para el compilador de C#.

Pero lo que nos interesa realmente es que ahora ejecutamos el siguiente patrón de cálculo:

  1. Tenemos dos zonas de memoria consecutiva.
  2. Leemos algo de la primera zona.
  3. Lo transformamos como sea.
  4. Lo asignamos a la celda equivalente en la segunda zona de memoria.

Instrucciones SIMD

Ese patrón de actividad es el típico algoritmo «vectorial» que podemos acelerar utilizando operaciones SIMD. Tenemos dos opciones:

  • Utilizar System.Numerics.Vector, que se adapta automáticamente a cualquier máquina que soporte SIMD, e incluso ofrece una alternativa cuando no existe ese soporte. Este tipo funciona también para .NET Framework, a través de un paquete.
  • Si podemos usar .NET Core 3.1, podemos ir directamente a las clases declaradas en System.Runtime.Intrinsics y System.Runtime.Intrinsics.X86. Es un poco más complicado y no está bien documentado, pero da resultados ligeramente mejores.

Vamos a ir directamente por la segunda vía. Vamos a optimizar las CPUs que soporten el conjunto de instrucciones AVX, haremos algo más en el caso en que soporte el conjunto FMA (que mezcla multiplicaciones y sumas en una misma operación) y, de todas maneras, habilitaremos código de respaldo para cuando el procesador no soporte SIMD.

Cuando hay soporte para instrucciones AVX, podemos procesar hasta cuatro variables de tipo double de una tacada. Para ello tenemos que utilizar el tipo de estructura Vector256, que tiene capacidad para cuatro elementos. La forma más sencilla de inicializar estos vectores es utilizando punteros, por lo que vamos a tener que declarar nuestro método unsafe y pasarnos directamente a los punteros.

public static unsafe double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] c = new double[m, p];
    int lastBlockIndex = p - (p % 4);
    fixed (double* pA = a)
    fixed (double* pB = b)
    fixed (double* pC = c)
    {
        double* pAi = pA;
        double* pCi = pC;
        for (int i = 0; i < m; i++)
        {
            double* pBk = pB;
            for (int k = 0; k < n; k++)
            {
                double d = *(pAi + k);
                if (Avx.IsSupported)
                {
                    int j = 0;
                    var vd = Vector256.Create(d);
                    while (j < lastBlockIndex)
                    {
                        if (Fma.IsSupported)
                            Avx.Store(pCi + j,
                                Fma.MultiplyAdd(
                                Avx.LoadVector256(pBk + j),
                                vd,
                                Avx.LoadVector256(pCi + j)));
                        else
                            Avx.Store(pCi + j,
                                Avx.Add(
                                Avx.LoadVector256(pCi + j),
                                Avx.Multiply(
                                Avx.LoadVector256(pBk + j),
                                vd)));
                        j += 4;
                    }
                    while (j < p)
                    {
                        pCi[j] += d * pBk[j];
                        j++;
                    }
                }
                else
                {
                    for (int j = 0; j < p; j++)
                        pCi[j] += d * pBk[j];
                }
                pBk += p;
            }
            pAi += n;
            pCi += p;
        }
    }
    return c;
}

Observaciones:

  1. Lo peor de trabajar con SIMD es tener que lidiar con vectores que no son múltiplos exactos del tamaño del vector básico. Nuestros vectores básicos tienen cuatro elementos. Si tenemos un vector de 75 elementos, necesitaremos un bucle de 18 repeticiones que procese cuatro elementos por vez, para una mierdecilla de bucle final que maneje los 3 elementos que nos sobran.
  2. Aunque la llamada a Avx.IsSupported está metida dentro de dos bucles anidados, no se preocupe: el compilador JIT la trata como una constante en tiempo de generación de código nativo, y no cuesta nada. Si no se soporta AVX, el compilador JIT solamente genera el código de la cláusula else, que funciona sobre cualquier arquitectura.
  3. Ojo: ese código «para cualquier máquina» podría optimizarse echando mano de la técnica de loop unrolling. Pero mi política en estos casos es: si no tienes una máquina decente, jódete.
  4. En el ejemplo anterior, cuando intercambiamos el orden de los bucles más internos, teníamos un valor escalar que sacábamos fuera del tercer bucle. Pero SIMD no ofrece instrucciones para multiplicar un vector por un escalar: tenemos que convertir ese escalar en todo un vector y utilizar la instrucción de multiplicación más general. No es grave, de todos modos.
  5. Si, además de AVX, la máquina soporta el conjunto FMA de instrucciones, podemos utilizar el método MultiplyAdd para acelerar un poco el algoritmo. Pero con esto hay que tener cuidado: a * b + c puede dar resultados diferentes si se hacen las dos operaciones por separado o a la vez. Si se hacen a la vez, aumenta la exactitud de la operación al existir menos redondeos. Pero el efecto secundario es que los cálculos con y sin esa opción dan resultados ligeramente diferentes. Tenemos que decidir cuándo es aceptable que exista esa diferencia y cuándo no. En cualquier caso, tengamos presente que el resultado de MultiplyAdd es más preciso.

Benchmark.NET

Para estar seguro de las ganancias en velocidad, he utilizado el package Benchmark.NET para generar las pruebas. Estos son los resultados:

Method Mean Error StdDev
MultMatrix 4,482.3 μs 88.75 μs 138.17 μs
UMultMatrix 1,895.2 μs 37.87 μs 63.26 μs
FMultMatrix 506.3 μs 3.44 μs 2.87 μs

La mejora por el uso de SIMD es cerca de cuatro veces, porque es el número de operaciones simultáneas que permite esta arquitectura en particular. Con AVX512 tendríamos vectores de ocho valores, pero necesitaríamos procesadores mucho más modernos, y de momento .NET Core no lo soporta.

Para esta prueba, he utilizado matrices de 128×128. He probado también con matrices de 8×8 e incluso de 4×4. La ganancia no es tan espectacular, pero en total se consigue una cuarta parte del tiempo de ejecución respecto al algoritmo más sencillo.