Categorías
C#

Entran una matriz y un vector en un bar

… y claro, al rato sale un vector «transformado».

Esta entrada no es, aunque pueda parecerlo, un ripio de la anterior. Algorítmicamente, transformar un vector con una matriz se parece mucho a una sucesión de productos escalares. Pero resulta que el producto escalar, al menos hasta AVX2, tiene su truco. Vamos a comenzar por la implementación más tonta:

public static double[] Mult(double[,] a, double[] x)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    double[] b = new double[m];
    for (int = 0; i < m; i++)
    {
        double d = 0;
        for (int j = 0; j < n; j++)
            d += a[i, j] * x[j];
        b[i] = d;
    }
    return b;
}

Recordemos que tenemos un «handicap» autoimpuesto por representar las matrices como arrays bidimensionales de C#. Pero esta vez no voy a dar la brasa con los punteros, que ya sabemos que resuelven este problema sin pestañear. Esta es la implementación final que necesitamos, con soporte opcional de AVX para cuando esté disponible y merezca la pena:

public static unsafe double[] Mult(double[,] a, double[] x)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    double[] b = new double[m];
    int lastBlockIndex = n - (n % 4);
    fixed (double* pA = a)
    fixed (double* pX = x)
    fixed (double* pB = b)
    {
        double* pA1 = pA;
        double* pB1 = pB;
        if (n >= 12 && Avx2.IsSupported)
            for (int i = 0; i < m; i++)
            {
                int j = 0;
                var v = Vector256<double>.Zero;
                while (j < lastBlockIndex)
                {
                    v = Avx.Add(
                        v,
                        Avx.Multiply(
                            Avx.LoadVector256(pA1 + j),
                            Avx.LoadVector256(pX + j)));
                    j += 4;
                }
                v = Avx.HorizontalAdd(v, v);
                double d = v.ToScalar() + v.GetElement(2);
                for (; j < n; j++)
                    d += pA1[j] * pX[j];
                *pB1 = d;
                pA1 += n;
                pB1++;
            }
        else
            for (int i = 0; i < m; i++)
            {
                int j = 0;
                double d = 0;
                while (j < lastBlockIndex)
                {
                    d += (*(pA1 + j) * *(pX + j)) +
                        (*(pA1 + j + 1) * *(pX + j + 1)) +
                        (*(pA1 + j + 2) * *(pX + j + 2)) +
                        (*(pA1 + j + 3) * *(pX + j + 3));
                    j += 4;
                }
                for (; j < n; j++)
                     d += pA1[j] * pX[j];
                *pB1 = d;
                pA1 += n;
                pB1++;
            }
    }
    return b;
}

Esta vez, el código SIMD sólo se usa cuando hay doce o más elementos en el vector. La cifra la he elegido experimentando en mi i7-4770. Puede que en otros ordenadores, el umbral sea más bajo incluso.

Tengo que explicar cómo se implementa un producto escalar con SIMD, porque no es muy evidente. Uno diría que hay que acumular un escalar en una variable global al bucle… pero no hay ninguna operación SIMD que calcule directamente la suma de las cuatro multiplicaciones necesarias. La explicación oficial es que una suma de ese tipo destrozaría el paralelismo de la CPU. Y yo me lo creo, de veras. La consecuencia es que necesitamos acumular las multiplicaciones en cuatro variables; es decir, en un vector que hace de acumulador.

Las cosas se ponen de color hormiga cuando terminamos el bucle y tenemos entonces que sumar los cuatro elementos del vector acumulador. Analicemos las líneas 27 y 28 del listado anterior. Según mis experimentos, es la forma más rápida de conseguirlo. HorizontalAdd, cuando se trata de Vector256<double>, suma el primer elemento con el segundo, y lo almacena por partida doble en el primer y segundo elemento. A la vez, suma el tercero y el cuarto y hace lo mismo para guardar el resultado. Los métodos de extensión ToScalar() y GetElement() acceden entonces directamente al primer y tercer elemento y los suma. Mantengo la llamada inicial a HorizontalAdd porque, teóricamente, puede hacer dos de las sumas en paralelo, pero puedes experimentar a ver qué pasa si accedes directamente a los cuatro elementos y los sumas como toda la vida. A mí ya se me ha acabado la partida de tiempo libre para este experimento.

La razón para la controversia es que, en realidad, Internet está lleno de recomendaciones para hacer esta suma final de esta otra manera:

v = Avx2.Permute4x64(
    Avx.HorizontalAdd(v, v),
    0b00_10_01_11);
double d = Avx.HorizontalAdd(v, v).ToScalar();
// v = Avx.HorizontalAdd(v, v);
// double d = v.ToScalar() + v.GetElement(2);

Es decir: se llama dos veces a HorizontalAdd, pasando entre medias por una permutación entre escalares. En la arquitectura Haswell, al menos, esto funciona más lento que mi solución.

Si multiplico una matriz aleatoria de 64×64 por un vector de 64 elementos, obtengo estas cifras:

Method Mean Error StdDev Median
MultVector 5.762 μs 0.1142 μs 0.2227 μs 5.646 μs
FMultVector 1.814 μs 0.0320 μs 0.0416 μs 1.818 μs

No está mal, aunque no conseguimos tanta ventaja como con la multiplicación entre matrices. La versión con punteros y sin SIMD tampoco va mal, pero queda muy claro que el SIMD acelera este código. De paso, ya tenemos un patrón de código para productos escalares (y para cosas más raras como multiplicar un vector de sensibilidad delta-gamma por un escenario histórico: cosas de la valoración de productos financieros).

Por cierto, el mejor chiste que conozco sobre gente que entra en un bar tiene que ver con la Mecánica Cuántica. Dice así: entra el Gato de Schrödinger en un bar… y no entra.

Categorías
C#

Multiplicación de matrices

Supongamos que queremos multiplicar un par de matrices, $A$ y $B$. Digamos que la primera tiene dimensiones $m\times n$ y que la segunda es $n\times p$. La coincidencia entre columnas de la primera y filas de la segunda es condición necesaria para que podamos multiplicarlas.

Si me piden que escriba de carrerilla un método para esta multiplicación, esto es lo que se me ocurre:

public static double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] result = new double[m, p];
    for (int i = 0; i < m; i++)
        for (int j = 0; j < p; j++)
        {
            double d = 0;
            for (int k = 0; k < n; k++)
                d += a[i, k] * b[k, j];
            result[i, j] = d;
        }
    return result;
}

He utilizado matrices bidimensionales de C# porque acceder a sus elementos individuales es sencillo. Internamente, C# las almacena en una sola memoria contigua de memoria, fila por fila.

El código que he mostrado no es una maravilla. Para empezar, cada vez que decimos algo como a[i, k], el compilador tiene que multiplicar la variable i por el número de columnas y por los ocho bytes que tiene un flotante de doble precisión. Hacerlo una vez no es problema… pero tenemos tres bucles anidados. Eso tiene que doler. Si en vez de C# escribiésemos esto en C++, el compilador podría sustituir un montón de multiplicaciones por sumas. RyuJIT ha mejorado muchísimo, pero no tanto.

C#, además, es un lenguaje mucho más seguro que C++, pero esta seguridad nos cuesta un montón de verificaciones de rango para poder indexar. Recordemos, además, que cada acceso necesita dos índices.

Y hay un tercer problema, mucho más sutil: cuando las matrices son grandes, el código anterior machaca la caché de la CPU sin piedad. Toma un folio de papel y haz el experimento: dibuja dos matrices, y ve numerando las celdas siguiendo el orden en que las usa el algoritmo.

La clase Unsafe

Llegados a este punto, tenemos dos alternativas: o marcamos el método como unsafe y usamos directamente punteros de C#, o intentamos evitarlo haciendo uso de la clase Unsafe, de System.Runtime.CompilerServices. Vamos a comenzar por esta última. De paso, voy a invertir el orden de los dos bucles más internos, para ver qué conseguimos con ello. Este es el código modificado, y suele funcionar el doble de rápido, o un poco más:

public static double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] c= new double[m, p];
    ref double rA = ref a[0, 0];
    ref double rB = ref b[0, 0];
    ref double rC = ref c[0, 0];
    for (int i = 0; i < m; i++)
    {
        ref double rAi = ref Unsafe.Add(ref rA, i * n);
        ref double rCi = ref Unsafe.Add(ref rC, i * n);
        for (int k = 0; k < n; k++)
        {
            double d = Unsafe.Add(ref rAi, k);
            int kp = k * p;
            for (int j = 0; j < p; j++)
                Unsafe.Add(ref rCi, j) +=
                    d * Unsafe.Add(ref rB, kp + j);
        }
    }
    return c;
}

La regla principal del uso de Unsafe.Add es que si inicializamos así:

ref double rA = ref a[0, 0];

entonces el acceso a a[i, j] debe parecerse a esto:

Unsafe.Add(ref rA, i * n + j) = 42;

Esa multiplicación es un problema del que ya advertimos. En nuestro código lo paliamos moviendo la multiplicación al inicio del bucle donde se le da valor al índice de la fila. Mi apaño no es la palabra definitiva: le dejo como ejercicio la eliminación total de esas multiplicaciones.

Ahora hay que prestar atención, sobre todo, al patrón de acceso a memoria que se produce en el bucle más interno. En el algoritmo inicial, acumulábamos todos los términos de un elemento de la matriz final en el bucle interno, y asignábamos su suma de golpe a la celda del resultado. Esta variante, sin embargo, no parece tan buena. Tenemos que asumir que, al reservar memoria para la matriz, todas sus entradas valen cero (y es así). Luego, cada celda del resultado se va rellenando por pasos, no de una vez. Puede que esto sea bueno para la caché de la CPU, pero no me queda tan claro que sea bueno para el compilador de C#.

Pero lo que nos interesa realmente es que ahora ejecutamos el siguiente patrón de cálculo:

  1. Tenemos dos zonas de memoria consecutiva.
  2. Leemos algo de la primera zona.
  3. Lo transformamos como sea.
  4. Lo asignamos a la celda equivalente en la segunda zona de memoria.

Instrucciones SIMD

Ese patrón de actividad es el típico algoritmo «vectorial» que podemos acelerar utilizando operaciones SIMD. Tenemos dos opciones:

  • Utilizar System.Numerics.Vector, que se adapta automáticamente a cualquier máquina que soporte SIMD, e incluso ofrece una alternativa cuando no existe ese soporte. Este tipo funciona también para .NET Framework, a través de un paquete.
  • Si podemos usar .NET Core 3.1, podemos ir directamente a las clases declaradas en System.Runtime.Intrinsics y System.Runtime.Intrinsics.X86. Es un poco más complicado y no está bien documentado, pero da resultados ligeramente mejores.

Vamos a ir directamente por la segunda vía. Vamos a optimizar las CPUs que soporten el conjunto de instrucciones AVX, haremos algo más en el caso en que soporte el conjunto FMA (que mezcla multiplicaciones y sumas en una misma operación) y, de todas maneras, habilitaremos código de respaldo para cuando el procesador no soporte SIMD.

Cuando hay soporte para instrucciones AVX, podemos procesar hasta cuatro variables de tipo double de una tacada. Para ello tenemos que utilizar el tipo de estructura Vector256, que tiene capacidad para cuatro elementos. La forma más sencilla de inicializar estos vectores es utilizando punteros, por lo que vamos a tener que declarar nuestro método unsafe y pasarnos directamente a los punteros.

public static unsafe double[,] Mult(double[,] a, double[,] b)
{
    int m = a.GetLength(0);
    int n = a.GetLength(1);
    int p = b.GetLength(1);
    double[,] c = new double[m, p];
    int lastBlockIndex = p - (p % 4);
    fixed (double* pA = a)
    fixed (double* pB = b)
    fixed (double* pC = c)
    {
        double* pAi = pA;
        double* pCi = pC;
        for (int i = 0; i < m; i++)
        {
            double* pBk = pB;
            for (int k = 0; k < n; k++)
            {
                double d = *(pAi + k);
                if (Avx.IsSupported)
                {
                    int j = 0;
                    var vd = Vector256.Create(d);
                    while (j < lastBlockIndex)
                    {
                        if (Fma.IsSupported)
                            Avx.Store(pCi + j,
                                Fma.MultiplyAdd(
                                Avx.LoadVector256(pBk + j),
                                vd,
                                Avx.LoadVector256(pCi + j)));
                        else
                            Avx.Store(pCi + j,
                                Avx.Add(
                                Avx.LoadVector256(pCi + j),
                                Avx.Multiply(
                                Avx.LoadVector256(pBk + j),
                                vd)));
                        j += 4;
                    }
                    while (j < p)
                    {
                        pCi[j] += d * pBk[j];
                        j++;
                    }
                }
                else
                {
                    for (int j = 0; j < p; j++)
                        pCi[j] += d * pBk[j];
                }
                pBk += p;
            }
            pAi += n;
            pCi += p;
        }
    }
    return c;
}

Observaciones:

  1. Lo peor de trabajar con SIMD es tener que lidiar con vectores que no son múltiplos exactos del tamaño del vector básico. Nuestros vectores básicos tienen cuatro elementos. Si tenemos un vector de 75 elementos, necesitaremos un bucle de 18 repeticiones que procese cuatro elementos por vez, para una mierdecilla de bucle final que maneje los 3 elementos que nos sobran.
  2. Aunque la llamada a Avx.IsSupported está metida dentro de dos bucles anidados, no se preocupe: el compilador JIT la trata como una constante en tiempo de generación de código nativo, y no cuesta nada. Si no se soporta AVX, el compilador JIT solamente genera el código de la cláusula else, que funciona sobre cualquier arquitectura.
  3. Ojo: ese código «para cualquier máquina» podría optimizarse echando mano de la técnica de loop unrolling. Pero mi política en estos casos es: si no tienes una máquina decente, jódete.
  4. En el ejemplo anterior, cuando intercambiamos el orden de los bucles más internos, teníamos un valor escalar que sacábamos fuera del tercer bucle. Pero SIMD no ofrece instrucciones para multiplicar un vector por un escalar: tenemos que convertir ese escalar en todo un vector y utilizar la instrucción de multiplicación más general. No es grave, de todos modos.
  5. Si, además de AVX, la máquina soporta el conjunto FMA de instrucciones, podemos utilizar el método MultiplyAdd para acelerar un poco el algoritmo. Pero con esto hay que tener cuidado: a * b + c puede dar resultados diferentes si se hacen las dos operaciones por separado o a la vez. Si se hacen a la vez, aumenta la exactitud de la operación al existir menos redondeos. Pero el efecto secundario es que los cálculos con y sin esa opción dan resultados ligeramente diferentes. Tenemos que decidir cuándo es aceptable que exista esa diferencia y cuándo no. En cualquier caso, tengamos presente que el resultado de MultiplyAdd es más preciso.

Benchmark.NET

Para estar seguro de las ganancias en velocidad, he utilizado el package Benchmark.NET para generar las pruebas. Estos son los resultados:

Method Mean Error StdDev
MultMatrix 4,482.3 μs 88.75 μs 138.17 μs
UMultMatrix 1,895.2 μs 37.87 μs 63.26 μs
FMultMatrix 506.3 μs 3.44 μs 2.87 μs

La mejora por el uso de SIMD es cerca de cuatro veces, porque es el número de operaciones simultáneas que permite esta arquitectura en particular. Con AVX512 tendríamos vectores de ocho valores, pero necesitaríamos procesadores mucho más modernos, y de momento .NET Core no lo soporta.

Para esta prueba, he utilizado matrices de 128×128. He probado también con matrices de 8×8 e incluso de 4×4. La ganancia no es tan espectacular, pero en total se consigue una cuarta parte del tiempo de ejecución respecto al algoritmo más sencillo.