Etiquetas

, , , ,

Este artículo es el primer manuscrito del capítulo Soluciones por hardware del libro que estoy escribiendo Principios de concurrencia para programadores. Todo el código de los ejemplos están en Github.

Soluciones por hardware

Hasta ahora hemos visto soluciones al problema de la exclusión mutua sin soporte de hardware y con solo registros de lectura-escritura atómicos, todos ellos son spinlocks pero muy ineficientes. Por un lado por el consumo de memoria, tanto para la solución de dos procesos (Dekker y Peterson) como para N procesos (Panadería) el número de registros necesarios es proporcional al número de procesos máximos que sincronizarán[31]. Esta necesidad de memoria impone una sobrecarga importante para mantener la consistencia de la memoria caché, además del consumo de CPU por la espera activa todos los procesos involucrados tienen que acceder al mismo rango de memoria que los demás. Es una penalización muy importante para sistemas con varios procesadores o núcleos. Por otro lado, si se tiene un único procesador el avance es tan lento que lo que tarda décimas en uno puede tomar horas en otros[32] en el caso de que exista mucha competencia (contention) porque varios procesos desean entrar a la sección crítica de forma casi simultánea.

Desde el inicio se buscaron soluciones por hardware que permitiesen implementar algoritmos de sincronización de forma mucho más eficiente.

Deshabilitar interrupciones

Si el problema fundamental es el intercalado de instrucciones generado por las interrupciones al procesador ¿por qué no deshabilitarlas?. Aunque es una solución que se usa en casos muy específicos en sistemas operativos[33] no es una solución genérica segura: si los procesos pueden deshabilitar las interrupciones entonces pueden tomar control del sistema[34]. Por lo tanto no es una solución válida para procesos de usuario.

Aún al nivel del núcleo del sistema operativo presenta dificultades: la complicación de deshabilitar todas las interrupciones en todos los procesadores y el riesgo de perderlas. Por esta dificultad de deshabilitar las interrupciones en varios procesadores simultáneamente tampoco es una solución para asegurar exclusión mutua dado que un registro puede ser modificado por un procesador diferente sin que haya habido intercalación en el mismo procesador.

Sincronización por hardware

Por ello se buscaron otras alternativas que en general son conocidas como primitivas de sincronización de hardware. Son instrucciones del procesador que leen y modifican el valor de uno o varios registros sin ser interrumpidas y asegurando la coherencia de caché (introducen barreras de memoria). Hay muchas variantes de estas instrucciones, los fabricantes diseñaron e implementaron sus propias instrucciones. No es fácil decidir cuál de ellas es la que implementarán en silicio, casi todas las arquitecturas ofrecen un conjunto variado: getAndAdd, testAndSet (TAS), swap, compareAndSwap (CAS), etc.

Pero antes de estudiarlas estudiaremos las definiciones de registros que proporcionen modelos simples y genéricos de consistencia por encima de las interrupciones o mecanismos de consistencia de caché.

Tipos de registros

El término registro no se refiere solo a los registros del procesador. Puede involucrar a zonas de memoria RAM, memoria caché o en general a un objeto compartido. A nivel de hardware los hilos se comunican leyendo y escribiendo en memoria compartida, desde ese punto de vista la comunicación se hace vía registros de lectura-escritura. Un registro de este tipo encapsula dos métodos, read y store. O get y write cuando nos referimos a objetos compartidos en general y que pueden ser implementados en lenguajes de alto nivel.

Registros seguros

Los registros con las propiedades de consistencia más débiles son los registros seguros, su nombre es un mero accidente histórico, son muy inseguros ([Herlihy12]). Este tipo de registros solo aseguran que se retorna el último valor escrito si no hay operaciones get y write concurrentes, caso contrario podrá devolver cualquier valor aceptable en el rango del registro. Por ejemplo, si se trata de un único byte y haya procesos concurrentes que escriben solo 0 o 1 el get de un registro seguro podría devolver cualquier valor entre 0 y 255 ya que todos están en el rango de 8 bits.

Registros regulares

Una alternativa intermedia son los registros regulares donde una operación de lectura concurrente con escrituras puede retornar alguno de los valores que están siendo escritos. En un caso similar al anterior, el get de este tipo de registros solo devolvería 0 o 1. Se dice que estos registros solo aseguran consistencia por inactividad (quiescent consistency) ya que solo aseguran retornar el mismo valor después de un período de inactividad entre el último write y el siguiente get.

Registros atómicos

Los registros de procesadores modernos que usamos para la implementación de los algoritmos de exclusión mutua son registros atómicos, generalmente se tratan de registros de un único bytes, una palabra, enteros de varios bytes, o referencias a memoria u objetos que cumplen las siguientes dos condiciones ([Lamport2]):

  1. El método get retorna el último valor escrito, si una lectura retorna un valor y no hubo otra escritura intermedia la siguiente lectura será el mismo valor.
  2. Si hay varios write concurrentes el valor que retornará el siguiente get es uno de los valores del write no un rango de posibles valores. Por ejemplo, si un write concurrente es el número 0 y otro es el número 1000, el get retornará 0 o 1000.

Primitivas de hardware y registros Read-Modify-Write

Como hemos observado en la implementación del algoritmo de la panadería, el requerimiento de registros atómicos para solucionar los problemas de exclusión mutua crece linealmente con el número de hilos o procesos involucrados. Tiene otros problemas, son spinlocks sobre muchos registros que introducen mucha presión sobre grandes zonas de la caché, son muy ineficientes. El otro problema hemos visto es que obliga a la especificación manual -no siempre sencilla ni eficiente- para asegurar la consistencia secuencial.

Por ello no es de extrañar que desde el principio se introdujeron instrucciones atómicas del procesador que permitieron implementar algoritmos de sincronización más sencillos y eficientes. Este tipo de operaciones pueden ser expresadas como construcciones de registros denominados genéricamente registros Read-Modify-Write o RMW. Los registros RMW son interesantes porque pueden implementarse en construcciones de lenguajes o como instrucciones de procesador, sus propiedades son similares.

REGISTROS RMW NO TRIVIALES

Los registros RMW que proveen operaciones adicionales a la función identidad se denominan no triviales. Maurice Herlihy demostró que los registros no triviales tienen un consenso de al menos 2 (o son declase 2). Todas las instrucciones mencionadas anterioremente (a excepción de la identidad, obviamente) implementan registros RMW no triviales.

De todas las implementaciones, compareAndSwap (CAS) es la más potente. Herlihy también demostró que pertenece a la clase de consenso N (o infinito). Este tipo de instrucciones son, en palabras del autor,a la computación asincrónica el equivalente de las máquinas de Turing de la computación secuencial. Afortunadamente la mayoría de procesadores implementan CAS o la similar load-linked/store-conditional (o LL/SC, disponible en las arquitecturas PowerPC, MIPS, Alpha y ARM).

Hay varios tipos de instrucciones que implementan registros RMW:

  • get: Retorna el valor del registro, se denomina también función identidad.
  • getAndSet: Asigna un nuevo valor al registro y retorna el anterior.
  • getAndAdd: Incrementa el valor del registro en el valor indicado y retorna el valor anterior.
  • testAndSet: verifica el valor del registro, si es cero asigna el nuyevo valor (habitualmente 1, por ejemplo en IBM 360 el registro es binario o booleano, solo admite 0 y 1).
  • swap: Intercambia el valor de dos registros.
  • compareAndSwap: Compara el valor del registro con otro, si son iguales asigna un nuevo y retorna el anterior.

Los fabricantes proveen conjuntos de operaciones diferentes, para facilitar la programación y portabilidad los compiladores proveen macros que generan la secuencias de instrucciones que implementan o simulan las operaciones en cada arquitectura. Por ejemplo GCC tenía los macros __sync_* que en las últimas versiones fueron reemplazados por nuevos que están más cercanos al modelo de memoria del C11 y C++11.

IMPORTANTE

A partir de aquí usaremos las primitivas atómicas de las últimas versiones de GCC. Para probar los ejemplos en C aseguraros de tener versiones nuevas del GCC, estos ejemplos fueron probados con la versión 4.9.

getAndSet

Usaremos una variable global mutex que estará inicializada a cero que indica que no hay procesos en la sección crítica. En la entrada de la sección crítica se almacena 1 y se verifica si el valor anterior era 0 (es decir, no había ningún proceso en la sección crítica). Si era diferente a cero esperará hasta que lo sea.

La función lock() es la entrada a la sección crítica y unlock() la salida.

        mutex = 0

def lock():
    while getAndSet(mutex, 1) != 0:
        pass

def unlock():
    mutex = 0

En “getAndSet” está el código en C implementado con el macro __atomic_exchange_n. A pesar de su nombre no es la instrucción swap sino un equivalente para getAndSet.

getAndAdd

Se puede implementar exclusión mutua con un algoritmo muy similar al de la panadería, cada proceso obtiene un número y espera a u turno, solo que esta vez la obtención del siguiente número es atómica y por lo tanto no se necesita un array de números ni hacer un bucle de controles adicionales.

Usaremos dos variables, number para el siguiente número y turn para indicar a qué número le corresponde entrar a la sección crítica.

        number = 0
        turn = 0

def lock():
    """ current is a local variable """
    current = getAndAdd(number, 1)
    while current != turn:
        pass

def unlock():
    getAndAdd(turn, 1)

El código en C está implementado con el macro __atomic_fetch_add y en Go con atomic.AddUint32.[35] A diferencia de la implementación con getAndSet esta implementación asegura que no se producen esperas infinitas ya que el número que elige cada proceso es único y creciente, aunque hay que tener en cuenta que el valor de number llegará a un máximo y rotará. Losspinlocks de este tipo son también llamados Ticket locks y son muy usados en el núcleo de Linux, aseguran que no se producen esperas infinitas y que los procesos entran a la sección crítica en orden FIFO (fairness).

testAndSet

La instrucción testAndSet o TAS fue la instrucción más usada para control de concurrencia hasta la década de 1970 cuando fue reemplazada por operaciones que permiten niveles (clase) de consenso más elevados. La implementación consiste de una variable entera binaria (o booleana) que puede tomar valores 0 y 1. La instrucción solo recibe un argumento, la dirección de memoria. Si el valor de la dirección de memoria es 0 le asigna 1 y retorna 1 (o true), caso contrario retorna 0 (o _false).

def testAndSet(register):
    if register == 0:
        register = 1
        return 0

    return 1

La implementación de exclusión mutua con TAS es muy similar a getAndSet:

        mutex = 0

def lock():
    while testAndSet(mutex) == 0:
        pass

def unlock():
    mutex = 0

El código en C está implementado con el macro __atomic_test_and_set.

Swap

Esta instrucción intercambia atómicamente dos posiciones de memoria, usualmente enteros de 32 o 64 bits[36]. El algoritmo de la instrucción es tan sencillo como parece:

def swap(register1, register2):
    tmp = register1
    register1 = register2
    register2 = tmp

El algoritmo de exclusión mutua con swap:

        mutex = 0

def lock():
    local = 1
    while local != 0:
        swap(mutex, local)

def unlock():
    mutex = 0

La implementación en C es con el macro __atomic_exchange de las últimas versiones de GCC. En Go se pueden usar las funciones atómicas implementadas en el paquete sync/atomic, por ejemplo con atomic.SwapInt32 [37].

Compare&Swap

Esta instrucción, o CAS, es la más común[38] y la que provee el mayor nivel de consenso (ver nota Registros RMW no triviales)[39]. La instrucción trabaja con tres valores:

Registro
La dirección de memoria cuyo valores se comparará y asignará un nuevo valor si corresponde.
Nuevo valor
El valor que se asignará al registro.
Valor a comparar
Si el valor del registro es igual a éste entonces se le asignará, caso contrario se copia el valor del registro al valor a comparar.

En la versión moderna[40] de macros atómicos las dos versiones son __atomic_compare_exchange_n y __atomic_compare_exchange_n, ambas retornan un booleano si se pudo hacer el cambio, lo único que cambia es la forma de los parámetros (en el último caso son todos punteros). El algoritmo de estas instrucciones es:

def compareAndSwap(register, expected, desired):
    if registro == expected:
        registro = desired
        return True
    else:
        expected = register
        return False

La implementación de exclusión mutua en C es sencilla, necesitamos una variable local porque hay que pasar un puntero y ambas instrucciones copiarán el valor de mutex a la posición indicada por el puntero:

        mutex = 0

def lock():
    local = 0
    while not compareAndSwap(mutex, local, 1):
        local = 0

def unlock():
    mutex = 0

La instrucción CompareAndSwapInt32 en en Go es algo diferente y más similar al antiguo macro de GCC, los argumentos del valor esperado y el nuevo no se pasan por puntero sino por valor:

func lock() {
    for ! atomic.CompareAndSwapInt32(&mutex, 0, 1) {}
}

El problema ABA

CAS tiene un problema conocido, el problema ABA. Aunque no se presenta en casos sencillos como el de exclusión mutua sino en casos de intercalados donde un proceso lee el valor A y cede la CPU a otro proceso, otro modifica el registro con el valor B y vuelve a poner el mismo valor A antes que el primero se vuelva a ejecutar. Éste ejecutará la instrucción CAS sin habernotado el cambio.

Un caso práctico: tenemos implementada una pila de estructuras node, es simplemente un puntero al siguiente elemento (next) y una estructura que guarda los datos (o payload, su estructura interna nos es irrelevante):

struct node {
    struct node *next;
    struct node_data data;
};

Las funciones push y pop agregan y quitan elementos de la pila. Push recibe como argumentos el puntero a la variable cabecera de la pila y el puntero al nodo a añadir. Pop solo recibe el puntero a la cabeza de la pila y devuelve el puntero al primer elemento de la pila o NULL si está vacía. A continuación el código en C simplificado de ambas funciones.

void push(struct node **head, struct node *e) {
    e->next = *head;     ❶
    while (! CAS(head, &e->next, &e); ❷
}


struct node *pop(struct node **head) {
    struct node *result, *orig;

    orig = *head;        ❸
    do {
        if (! orig) {
            return NULL; ❹
        }
    } while (! CAS(head, &orig, &orig->next); ❺

    return orig; ❻
}
push: El nodo siguiente al nodo a insertar será el apuntado por la cabecera.
push: Si la cabecera no fue modificada se hará el cambio y ahora apuntará al nuevo nodo e. Si por el contrario head fue modificada, el nuevo valor de head se copia a e->next (ahora apuntará al elemento nuevo que apuntaba head) y se volverá a intentar. Cuando se haya podido hacer el swap head apuntará correctamente a e y e->next al elemento que estaba antes.
pop: Se hace una copia de la cabecera.
pop: Si es NULL la pila está vacía y retorna el mismo valor. Recordad que CAS copia el valor anterior de head en orig, por lo que podría darse el caso que sea NULL, de allí que la comparación esté dentro del bucle do... while.
pop: Si por el contrario la cabecera apuntaba a un nodo y ésta no fue modificada se hará el cambio y la cebecera apuntará al siguiente nodo. Si por el contrario fue modificada se hará una copia del último valor a orig y se volverá a intentar.
pop: Se retorna el puntero al nodo que antes apuntaba la cabecera.

Este algoritmo funciona sin problemas, de hecho es un algoritmo correcto para gestionar una pila concurrente…​ solo si es imposible eliminar un nodo y volver a insertar otro nuevo con la misma dirección de memoria. Con CAS es imposible saber si otro proceso ha modificado y vuelto a poner el mismo valor que copiamos (en este caso orig). Supongamos que tenemos una pila con tres nodos que comienzan en la direcciones 10, 20 y 30:

head -> [10] -> [20] -> [30]

El proceso P1 acaba de ejecutar orig = *head; dentro de pop y es interrumpido. Otro u otros procesos eliminan dos elementos de la pila:

head -> [30]

Y luego uno de ellos inserta un nuevo nodo con una dirección de memoria usada previamente:

head -> [10] -> [30]

Cuando P1 continúe su ejecución CAS hará el cambio ya que la dirección es también 10. El problema es que era una copia antigua que apuntaba antes a [20] por lo que dejará la cabecera apuntando a un nodo que ya no existe y los siguientes habrán quedado descolgados de la pila:

head -> ¿20?    [30]

Este caso es muy habitual si usamos malloc para cada nuevo nodo que insertamos y luego el free cuando lo eliminamos de la lista[41]. El siguiente programa en C usa estas funciones en cuatro hilos diferentes, cada uno de ellos ejecuta repetidamente el siguiente código:

e = malloc(sizeof(struct node));
e->data.tid = tid;
e->data.c = i;
push(&head, e);     ❶
e = pop(&head);     ❷
if (e) {
    e->next = NULL; ❸
    free(e);
} else {
    printf("Error, stack empty\n"); ❹
}
Se agrega el elemento nuevo a la pila, la memoria de éste fue obtenida con el malloc de la línea anterior.
Inmediatamente se lo elimina de la lista. El resultado nunca debería ser NULL ya que siempre debería haber al menos un elemento: todos los hilos primero agregan y luego lo quitan.
Antes de liberar la memoria del elemento recién eliminado se pone next en NULL. No debería hacer falta pero lo hacemos por seguridad y para que observéis claramente que los errores son por el problema ABA.
Si no pudo obtener un elemento de la lista es un error y lo indicamos.

Si lo ejecutáis veréis que en todos los casos da el error de la pila vacía y/o de error por intentar liberar dos veces la misma memoria.

Error, stack empty
*** Error in `./stack_cas_malloc': free(): invalid pointer: 0x00007fcc700008b0 ***
Aborted (core dumped)

En sistemas con un único procesador, como en Raspberry 1, quizás necesites de varias ejecuciones o aumentar el número de operaciones en OPERATIONS para que aparezca el error. Es uno de los problemas inherentes de la programación concurrente, a veces la probabilidad de que ocurra el error es muy baja y hace más difícil detectarlos. Algunas implementaciones de malloc no retornan las direcciones usadas recientemente por lo que quizás no observes el error de doble liberación del mismo puntero. Podemos forzar al reuso de direcciones recientes mediante una segunda pila.

En vez de liberar la memoria de los nodos con el free los insertamos en una segunda lista free_nodes, los nodos que se eliminan de la lista head son insertados en la lista de libres. En vez de asignar memoria con malloc cada vez que se crea un nuevo nodo se busca primero de la lista de libres y se lo reusa. El programa ejecutará repetidamente el siguiente código:

e = pop(&free_nodes);     ❶
if (! e) {
    e = malloc(sizeof(struct node)); ❷
    printf("malloc\n");
}
e->data.tid = tid;
e->data.c = i;
push(&head, e);           ❸
e = pop(&head);           ❹
if (e) {
    push(&free_nodes, e); ❺
} else {
    printf("Error, stack empty\n"); ❻
}
Obtiene un nodo de la lista de libres.
La lista de libres estaba vacía, se solicita memoria. En la siguiente línea se imprime, debería haber como máximo tantos malloc como hilos.
Se agrega el elemento a la pila de head.
Se elimina un elemento de la pila de head.
Se se pudo obtener el elemento se agrega el elemento a la pila de libres.
La lista estaba vacía, es un error.

La ejecución del programa dará numerosos errores de de la pila vacía y se harán también más malloc de los que debería. Es consecuencia del problema ABA.

Compare&Swap etiquetado

Una solución para el problema ABA es el usar bits adicionales como etiquetas para identificar una transacción (tagged CAS). Para ello algunas arquitecturas introdujeron instrucciones CASque permiten la verificación e intercambio de más de una palabra[42], por ejemplo Intel con las instrucciones cmpxchg8b y cmpxchg16b dobles que permiten trabajar con estructuras de 64 y 128 bit, en vez de solo registros atómicos de 32 o 64 bits. En nuestro caso necesitamos hacerlo solo para verificar el intercambio de las cabeceras por lo que usaremos la estructura node_head para ambas.

struct node_head {
    struct node *node; ❶
    uintptr_t aba;     ❷
};

struct node_head stack_head; ❸
struct node_head free_nodes;
El puntero al nodo que contiene los datos.
Será usada como etiqueta, un contador que se incrementará en cada transacción. Es un entero del mismo tamaño que los punteros (32 o 64 bits según la arquitectura),
Los punteros a las pilas no serán un simple puntero sino la estructura con el puntero y la etiqueta.

El código completo en C está en stack_cas_tagged.c, pero analicemos el funcionamiento de push.

void push(struct node_head *head, struct node *e) {
    struct node_head orig, next;

    __atomic_load(head, &orig);  ❶
    do {
        next.aba = orig.aba + 1; ❷
        next.node = e;
        e->next = orig.node;     ❸
    } while (!CAS(head, &orig, &next); ❹
}
Al tratarse de una estructura no es un registro atómico mas bien un registro seguro, debemos asegurar que se hace una copia atómica de head a orig.
next tendrá los datos de head después del CAS, en este incrementamos el valor de aba.
El nodo siguiente de nuevo nodo es el que está ahora en la cola.
Se intenta el intercambio, solo se hará si tanto el puntero al nodo y el entero aba son idénticos a los copiados en orig. Si entre <1> y <4> el valor de head es cambiado por otros procesos el valor de aba habrá cambiado (será un valor mayor) por lo que CAS retornará falso aunque el puntero al nodo sea el mismo.

Load-link/store-conditional (LL/SC)

compareAndSwap es la más potente de las operaciones atómicas anteriores ya que permite el consenso con infinitos procesos (consenso de clase N). Sin embargo en algunas arquitecturas RISC (PowerPC, Alpha, MIPS y ARM) diseñaron una técnica diferente para implementar registros RMW, es tan potente que puede emular a cualquiera de las anteriores: el LL/SC. De hecho, si has compilado los programas de ejemplos en algunas de esas arquitecturas (por ejemplo en una Raspberry) el compilador habrá reemplazado por llamadas a esas operaciones por una serie de instrucciones con LL/SC que las emulan.

El diseño de LL/SC es muy ingenioso, se basa en dos operaciones diferentes que trabajan en cooperación con la gestión de caché. Una es similar a la tradicional cargar (load) una dirección de memoria en un registro: LWARX en PowerPC, LL en MIPS, LDREX en ARM. La otra a la de almacenar (store) un registro en una dirección de memoria: STWC en PowerPC, SC en MIPS y STREX en ARM. El matiz importante es que ambas están enlazadas, la ejecución de la segunda es condicional si el registro objetivo no fue modificado desde la ejecución de la primera. Tomemos por LDREX y STREX de la arquitectura ARM.

LDREX
Carga una dirección de memoria en un registro y etiqueta o marca esa dirección como de acceso exclusivo. Luego puede ejecutarse cualquier número de instrucciones hasta el STREX.
STREX
Almacena el valor de un registro en una dirección de memoria pero solo si esa dirección ha sido reservada anteriormente con un LDREX y no ha sido modificada por ningún otro proceso. Por ejemplo la siguiente instrucción :

El siguiente código carga el contenido de la dirección indicada por r0 en el registro r1 y marca esa dirección[43]:

ldrex   r1, [r0]     ❶
...
strex   r2, r1, [r0] 
Carga el contenido de la dirección indicada por r0 en el registro r1 y marca esa dirección[44]
Almacena el valor del registro r1 en la dirección apuntada por r0 si y solo sí esa dirección no fue modificada por otro proceso. Si se almacenó se pone r2 en 0 caso contrario en 1.

Vale la pena analizar algunas de las emulaciones de instrucciones atómicas[45], por ejemplo getAndAdd y compareAndSwap:

getAndAdd

.L1:
    ldrex   r1, [r0]     ❶
    add     r1, r1, #1   ❷
    strex   r2, r1, [r0] ❸
    cmp     r2, #0
    bne     .L1 
Carga la dirección especificada por r0 en r1.
Incrementa en 1.
Almacena condicionalmente la suma.
Si falló vuelve a intentarlo cargando el nuevo valor.

compareAndSwap

    ldr     r0, [r2]     ❶
.L1
    ldrex   r1, [r3]     ❷
    cmp     r1, r0
    bne     .L2          ❸
    strex   lr, ip, [r3] ❹
    cmp     lr, #0
    bne     .L1          ❺
.L2
    ...
Carga el contenido de la primera dirección en r0.
Carga el contenido de la segunda dirección en r1.
El resultado de la comparación es falso, sale del CAS.
Intenta almacenar el nuevo valor en la dirección indicada por r3 (es decir, hace el swap).
Si no se pudo almacenar vuelve a intentarlo.

LL/SC y ABA

Las implementaciones en hardware de las instrucciones LL/SC tiene algunos problemas que afectan a la eficiencia. El resultado del store condicional puede retornar error[46] espurio por cambios de contexto, emisiones broadcast en el bus de caché, actualizaciones en la misma línea de caché o incluso otras operaciones de lectura o escritura no relacionadas entre el load y elstore. Por eso la recomendación general es que el fragmento de código dentro de una sección exclusiva sea breve y que se minimicen los almacenamientos a memoria.

La mayor ventaja de las instrucciones LL/SC es que no sufren del problema ABA, el primer cambio ya invalidaría el store condicional posterior. Cuando analizamos el problema ABA vimos cómo se puede reproducir el problema con un par de colas, una para los nodos y la otra para los que quedan libres. El algoritmo usa el macro atómico para compareAndSwap y cuando se traduce a ensamblador para arquitecturas como ARM se traduce a código que emula el compareAndSwap. En una arquitectura con LL/SC es mejor implementarlo directamente con esas instrucciones, pero a menos que lo hagas con los compiladores de los fabricantes no contamos con los macros adecuados[47], por lo que debemos recurrir a ensamblador para hacerlo.

Vamos a ello.

LL/SC en ensamblador nativo

Dividimos el código en dos partes. La de C es similar al ejemplo anterior con doble pila pero sin la implementación de las funciones pop() y push(). Éstas están implementadas en ensamblador de ARM[48] y trabajan con la misma estructura de pila anterior.

El código es bastante sencillo de entender, vamos a ver analizar en detalle la función pop() que es la más breve de ambas:

pop(). 

pop:
    push    {ip, lr}
1:
    ldrex   r1, [r0]     ❶
    cmp     r1, #0
    beq     2f           ❷
    ldr     r2, [r1]     ❸
    strex   ip, r2, [r0] ❹
    cmp     ip, #0
    bne     1b           ❺
2:
    mov     r0, r1       ❻
    pop     {ip, pc}
Carga LL del primer argumento de la función (head), la dirección del primer elemento de la lista puntero[49].
En la línea anterior se compara si es igual a cero, de ser así es porque la cola está vacía, sale del bucle para devolver el puntero NULL.
Carga en r2 el puntero del siguiente elemento[50] de la lista, la dirección de e→next de la estructura del nodo.
Almacena el siguiente elemento en head.
Copia el contenido de r1 a r0, que es el valor devuelto por la función.

Una vez conocidas las características y posibilidades de LL/SC es relativamente sencillo simular las otras operaciones atómicas y quizás aún más sencillo implementar el algoritmo directamente basado en LL/SC. La dificultad es que no es habitual contar con macros genéricos debido a que en arquitecturas sin LL/SC es muy complicado simular estas operaciones con instrucciones CAS, por lo que habrá que recurrir a ensamblador y además con una versión para cada plataforma que lo implemente.

Pero si se hace correctamente además de evitar el problema ABA se puede hacer mucho más eficiente. Los siguientes son los tiempos de ejecución de los últimos algoritmos vistos en una Raspberry 1.

Tabla 1. Comparación de tiempos en Raspberry 1

Programa Tiempo de reloj
Pila con malloc, CAS (problema ABA) 8.6 seg
Doble pila, CAS (problema ABA) 4.9 seg
Doble pila CAS etiquetado (sin ABA) 10.0 seg
Doble pila con LL/SC (ensamblador, sin ABA) 2.3 seg


La implementación con LL/SC nativo es más de dos veces más rápido que el siguiente más rápido, que sufre del problema ABA[51] y más de cuatro veces más rápido que la simulación deCAS etiquetado.

Recapitulación

En este capítulo hemos visto las instrucciones por hardware esenciales, tanto para sistemas operativos como lenguajes, para construir primitivas de sincronización de más alto nivel. Las técnicas que usan estas primitivas -directa o indirectamente- son llamados spinlocks. Las hemos analizado desde las más básicas hasta las más potentes como CAS y LL/SC. Aunque comenzamos solo con el objetivo de resolver el problema fundamental de sincronización entre procesos -exclusión mutua- hemos introducido el uso de las mismas para problemas más sofisticados, como el CAS etiquetado y el uso de LL/SC para gestión de pilas concurrentes.

No hay instrucciones de hardware unificadas para todas las arquitecturas, tampoco una estandarización a nivel de lenguajes de programación. Esa es la razón por la que los compiladores implementan sus propios macros atómicos que luego son convertidos a funciones más complejas que simulan a las instrucciones o registros RMW definidos por el macro. Lo vimos claramente con la arquitectura ARM, todas las operaciones se simulan con LL/SC. La inversa es más complicada -sino imposible- por lo que habitualmente no se cuentan con esos macros[52]y hay que recurrir al ensamblador para poder aprovechar las capacidades de nativas de cada procesador, muy habitual en los sistemas operativos[53].

De todas maneras los spinlocks basados en instrucciones por hardware son fundamentales y se requieren algoritmos muy eficientes sobre todo para multiprocesadores o núcleos. Además de solucionar problemas la exclusión mutua interesa gestionar estructuras concurrentes habituales (pilas, listas, lectores-escritores, etc.) que minimicen el impacto sobre el sistema de caché. Este será el tema del siguiente capítulo.

Hay que decirlo, por las dudas

En todos los ejemplos de exclusión mutua vistos hasta ahora la sección crítica consistía solo en incrementar un contador compartido. Es perfecto para mostrar que una instrucción y operación aritmética que en apariencia son tan simples también son víctimas del acceso concurrente desorganizado. Pero espero que os hayáis dado cuenta que no hace falta recurrir a un spinlock para hacerlo correctamente, que ya hay instrucciones de hardware que lo hacen de forma eficiente, como el getAndAdd o addAndGet. Por ejemplo en C:

for (i=0; i < max; i++) {
    c = __atomic_add_fetch(&counter, 1, __ATOMIC_RELAXED);
}

O en Go:

for i := 0; i < max; i++ {
    c = atomic.AddInt32(&counter, 1)
}

[31] Está demostrado ([Herlihy12]) que dichos algoritmos son óptimos en cuestión de espacio

[32] Como pasa en la Raspberry 1.

[33] Como local_irq_disable() o local_irq_enable() en Linux.

[34] Deshabilita la cualidad de apropiativo (o preemptive) del scheduler.

[35] Estrictamente no es getAndAdd sino addAndGet, devuelve el valor después de sumar, pero son equivalentes, solo hay que cambiar la inicialización de la variable turn.

[36] No todas las arquitecturas la tienen, en Intel es XCHG para enteros de 32 bits.

[37] Esta función no estaba disponible en Go para ARM hasta 2013, si la pruebas en una Raspberry asegúrate de tener una versión de Go moderna.

[38] Es la que se usa en la arquitectura Intel/AMD.

[39] Aunque sufre el problema ABA.

[40] En los antiguos macros de GCC las instrucciones equivalentes son __sync_bool_compare_and_swap y __sync_val_compare_and_swap respectivamente. La diferencia fundamental es que no se modifica el registro del valor a comparar.

[41] Las implementaciones de malloc suelen reusar las direcciones de los elementos que acaban de ser liberados.

[42] Los registros atómicos explicados antes.

[43] En ARM se etiqueta en el sistema del monitor de acceso exclusivo, en otras arquitecturas se asocia un bit del TLB o de memoria caché.

[44] En ARM se etiqueta en el sistema del monitor de acceso exclusivo, en otras arquitecturas se asocia un bit del TLB o de memoria caché.

[45] Si quieres presumir has de llamarles “implementaciones de registros RMW“.

[46] No implica que falle el algoritmo implementado, solo que fuerza que se haga otro bucle de lectura y escritura.

[47] Al menos no en GCC.

[48] Para que funcione en una Raspberry, agradezco a Sergio L. Pascual por ayudarme a mejorar y probar el código.

[49] Recordad que el primer argumento de la función es la dirección del puntero, es decir un puntero a puntero.

[50] Dado que next es el primer campo del nodo su dirección coincide con la del nodo, por eso no hay desplazamieno en el código ensamblador cuando leemos o modificamos next.

[51] Y por lo tanto incorrecto.

[52] Salvo los compiladores de los propios fabricantes que los incluyen en sus compiladores propietarios, en ARM se llaman intrinsics

[53] Por ejemplo en Linux se usa el ensamblador inline, ASM().

Bibliografía

Código fuente

Instrucciones por hardware

getAndSet

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define MAX_COUNT 10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

int mutex = 0;

int counter = 0;

void lock() {
    while(__atomic_exchange_n(&mutex, 1, __ATOMIC_SEQ_CST));
}

void unlock() {
    mutex = 0;
}

void *count(void *ptr) {
    long i, max = MAX_COUNT/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;

    for (i=0; i < max; i++) {
        lock();
        counter += 1; // The global variable, i.e. the critical section
        unlock();
    }

    printf("End %d counter: %d\n", tid, counter);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }

    printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
    return 0;
}

getAndAdd

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define MAX_COUNT 10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

// Used for get&add, the first is the number obtained by a process
// The second is the next turn
int number = 0;
int turn = 0;

int counter = 0;

void lock() {
    int current;

    current = __atomic_fetch_add(&number, 1, __ATOMIC_RELAXED);
    while(current != turn);
}

void unlock() {
    __atomic_fetch_add(&turn, 1, __ATOMIC_SEQ_CST);
}

void *count(void *ptr) {
    long i, max = MAX_COUNT/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;

    for (i=0; i < max; i++) {
        lock();
        counter += 1; // The global variable, i.e. the critical section
        unlock();
    }

    printf("End %d counter: %d\n", tid, counter);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }

    printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
    return 0;
}
/* Implements mutual exclusion with atomic.AddUint32 */

package main

import (
    "fmt"
    "runtime"
    "sync/atomic"
)

const (
    MAX_COUNT  = 10000000
    GOROUTINES = 4
)

var (
    counter int
    number uint32
    turn uint32 = 1 // AddUint32 returns the new value
)

func lock() {
    var current uint32

    current = atomic.AddUint32(&number, 1)
    /* We have to use LoadUint32 because there is no volatile vars
       and turn is read from the local cache */
    for current != atomic.LoadUint32(&turn) {}
}

func unlock() {
    atomic.AddUint32(&turn, 1)
}

func run(id, counts int, done chan bool) {
    for i := 0; i < counts; i++ {
        lock()
        counter++
        unlock()
    }
    fmt.Printf("End %d counter: %d\n", id, counter)
    done <- true
}

func main() {
    runtime.GOMAXPROCS(GOROUTINES)
    done := make(chan bool, 1)

    for i := 0; i < GOROUTINES; i++ {
        go run(i, MAX_COUNT/GOROUTINES, done)
    }

    for i := 0; i < GOROUTINES; i++ {
        <-done
    }

    fmt.Printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
}

testAndSet

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define MAX_COUNT 10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

char mutex = 0;

int counter = 0;

void lock() {
    while(__atomic_test_and_set(&mutex, __ATOMIC_RELAXED));
}

void unlock() {
    mutex = 0;
    // Recomended is __atomic_clear(&mutex,...) but we already have cache coherence
}

void *count(void *ptr) {
    long i, max = MAX_COUNT/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;

    for (i=0; i < max; i++) {
        lock();
        counter += 1; // The global variable, i.e. the critical section
        unlock();
    }

    printf("End %d counter: %d\n", tid, counter);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }

    printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
    return 0;
}

Swap

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define MAX_COUNT 10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

int mutex = 0;

int counter = 0;

void lock(int i) {
    int local = 1;
    do {
        __atomic_exchange(&mutex, &local, &local, __ATOMIC_RELAXED);
    } while (local);
}

void unlock(int i) {
    mutex = 0;
}

void *count(void *ptr) {
    long i, max = MAX_COUNT/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;

    for (i=0; i < max; i++) {
        lock(tid);
        counter += 1; // The global variable, i.e. the critical section
        unlock(tid);
    }

    printf("End %d counter: %d\n", tid, counter);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }

    printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
    return 0;
}
/* Implements mutual exclusion with atomic.SwapInt32 */

package main

import (
    "fmt"
    "runtime"
    "sync/atomic"
)

const (
    MAX_COUNT  = 10000000
    GOROUTINES = 4
)

var (
    counter int
    mutex int32
)

func lock() {
    for atomic.SwapInt32(&mutex, 1) == 1 {} // While it returns 1
}

func unlock() {
    mutex = 0
}

func run(id, counts int, done chan bool) {
    for i := 0; i < counts; i++ {
        lock()
        counter++
        unlock()
    }
    fmt.Printf("End %d counter: %d\n", id, counter)
    done <- true
}

func main() {
    runtime.GOMAXPROCS(GOROUTINES)
    done := make(chan bool, 1)

    for i := 0; i < GOROUTINES; i++ {
        go run(i, MAX_COUNT/GOROUTINES, done)
    }

    for i := 0; i < GOROUTINES; i++ {
        <-done
    }

    fmt.Printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
}

compareAndSwap

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define MAX_COUNT 10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

int mutex = 0;

int counter = 0;

void lock(int i) {
    int local;
    do {
        local = 0;
    } while (! __atomic_compare_exchange_n(&mutex, &local, 1, 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}

void unlock(int i) {
    mutex = 0;
}

void *count(void *ptr) {
    long i, max = MAX_COUNT/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;

    for (i=0; i < max; i++) {
        lock(tid);
        counter += 1; // The global variable, i.e. the critical section
        unlock(tid);
    }

    printf("End %d counter: %d\n", tid, counter);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }

    printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
    return 0;
}
/* Implements mutual exclusion with atomic.CompareAndSwapInt32 */

package main

import (
    "fmt"
    "runtime"
    "sync/atomic"
)

const (
    MAX_COUNT  = 10000000
    GOROUTINES = 4
)

var (
    counter = 0
    mutex int32 = 0
)

func lock() {
    for ! atomic.CompareAndSwapInt32(&mutex, 0, 1) {}
}

func unlock() {
    mutex = 0
}

func run(id, counts int, done chan bool) {
    for i := 0; i < counts; i++ {
        lock()
        counter++
        unlock()
    }
    fmt.Printf("End %d counter: %d\n", id, counter)
    done <- true
}

func main() {
    runtime.GOMAXPROCS(GOROUTINES)
    done := make(chan bool, 1)

    for i := 0; i < GOROUTINES; i++ {
        go run(i, MAX_COUNT/GOROUTINES, done)
    }

    for i := 0; i < GOROUTINES; i++ {
        <-done
    }

    fmt.Printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
}

ABA

// This code implements an FAULTY concurrent stack
// It's suffers of the ABA problem it will generate a Segmentation Fault

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define OPERATIONS       10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

struct node_data {
    int tid;
    int c;
};

struct node {
    struct node *next;
    struct node_data data;
};

struct node *head = NULL;

void push(struct node **head, struct node *e) {
    e->next = *head;
    while (! __atomic_compare_exchange(head, &e->next, &e, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
}

struct node *pop(struct node **head) {
    struct node *old_head;

    old_head = *head;
    do {
        if (! old_head) {
            return NULL;
        }
    } while (! __atomic_compare_exchange(head, &old_head, &old_head->next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));

    return old_head;
}

void *add_elements(void *ptr) {
    long i, elems = OPERATIONS/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;
    struct node *e;

    for (i=0; i < elems; i++) {
        e = malloc(sizeof(struct node));
        e->data.tid = tid;
        e->data.c = i;
        push(&head, e);
        // Pop an element and add it to the free list
        e = pop(&head);
        if (e) {
            e->next = NULL;
            free(e);
        } else {
            printf("Error in %d it shouldn't be empty\n", tid);
        }
    }

    printf("End %d\n", tid);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];
    struct node *e;

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }
}
// This code implements an FAULTY concurrent stack
// It's suffers of the ABA problem it will generate a Segmentation Fault

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define OPERATIONS       10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

struct node_data {
    int tid;
    int c;
};

struct node {
    struct node *next;
    struct node_data data;
};

struct node *head = NULL;
struct node *free_nodes = NULL;

void push(struct node **head, struct node *e) {
    e->next = *head;
    while (! __atomic_compare_exchange(head, &e->next, &e, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
}

struct node *pop(struct node **head) {
    struct node *old_head;

    old_head = *head;
    do {
        if (! old_head) {
            return NULL;
        }
    } while (! __atomic_compare_exchange(head, &old_head, &old_head->next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));

    return old_head;
}

void *add_elements(void *ptr) {
    long i, elems = OPERATIONS/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;
    struct node *e;

    for (i=0; i < elems; i++) {
        // Get an element from the free list
        e = pop(&free_nodes);
        if (! e) {
            // It was empty, request memory
            e = malloc(sizeof(struct node));
            printf("%d malloc\n", tid);
        }
        e->data.tid = tid;
        e->data.c = i;
        push(&head, e);
        // Pop an element and add it to the free list
        e = pop(&head);
        if (e) {
            e->next = NULL;
            push(&free_nodes, e);
        } else {
            printf("Error in %d it shouldn't be empty\n", tid);
        }
    }

    printf("End %d\n", tid);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];
    struct node *e;

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }
}
// This code implements the right concurrent stack with ABA check
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <stdint.h>


#define NUM_THREADS      4
#define OPERATIONS       10000000

// Just used to send the index of the id
struct tdata {
    int tid;
};

struct node_data {
    int tid;
    int c;
};

struct node_head {
    struct node *node;
    uintptr_t aba;
};

struct node {
    struct node_data data;
    struct node *next;
};

struct node_head stack_head;
struct node_head free_nodes;

void push(struct node_head *head, struct node *e) {
    struct node_head old_head, next;

    // The structure is not an "atomic register", we must force the atomic load
    __atomic_load(head, &old_head, __ATOMIC_RELAXED);
    do {
        next.aba = old_head.aba + 1;
        next.node = e;
        e->next = old_head.node;
    } while (! __atomic_compare_exchange(head, &old_head, &next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
}


struct node *pop(struct node_head *head) {
    struct node_head old_head, next;

    // The structure is not an "atomic register", we must force the atomic load
    __atomic_load(head, &old_head, __ATOMIC_SEQ_CST);
    do {
        if (! old_head.node) {
            return NULL;
        }
        next.aba = old_head.aba + 1;
        next.node = old_head.node->next;
    } while (! __atomic_compare_exchange(head, &old_head, &next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));

    return old_head.node;
}

void *add_elements(void *ptr) {
    long i, elems = OPERATIONS/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;
    struct node *e;

    for (i=0; i < elems; i++) {
        e = pop(&free_nodes); // Get an element from the free list
        if (! e) { // If there is none, reserve memory
            e = malloc(sizeof(struct node));
             printf("%d malloc\n", tid);
        }
        e->data.tid = tid;
        e->data.c = i;
        push(&stack_head, e);
        // Pop an element and add it to the free list
        e = pop(&stack_head);
        if (e) {
            push(&free_nodes, e);
        } else {
            printf("Error in %d it shouldn't be empty\n", tid);
        }
    }

    printf("End %d\n", tid);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];
    struct node *e, *tmp;

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }

    return 0;
}

LL/SC

// Implementation of a concurrent stack for ARM with LL/SC

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>


#define NUM_THREADS      4
#define OPERATIONS       10000000


// Just used to send the index of the id
struct tdata {
    int tid;
};

struct node_data {
    int tid;
    int c;
};

struct node {
    struct node *next;
    struct node_data data;
};

// Defined in the assembly code stack_llsc.s
void push(struct node **, struct node *);
struct node *pop(struct node **);


struct node *head = NULL;
struct node *free_nodes = NULL;

void *add_elements(void *ptr) {
    long i, elems = OPERATIONS/NUM_THREADS;
    int tid = ((struct tdata *) ptr)->tid;
    struct node *e;

    for (i=0; i < elems; i++) {
        // Get an element from the free list
        e = pop(&free_nodes);
        if (! e) {
            // It was empty, request memory
            e = malloc(sizeof(struct node));
            printf("%d malloc\n", tid);
        }
        e->data.tid = tid;
        e->data.c = i;
        push(&head, e);
        // Pop an element and add it to the free list
        e = pop(&head);
        if (e) {
            e->next = NULL;
            push(&free_nodes, e);
        } else {
            printf("Error in %d it shouldn't be empty\n", tid);
        }
    }

    printf("End %d\n", tid);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc, i;
    struct tdata id[NUM_THREADS];

    for(i=0; i<NUM_THREADS; i++){
        id[i].tid = i;
        rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    for(i=0; i<NUM_THREADS; i++){
        pthread_join(threads[i], NULL);
    }
}
	.align	4
	.global	push
	.type	push, %function
push:
	push	{ip, lr}
1:
	ldr	r3, [r0]
	str	r3, [r1] @@ To avoid storing e->next inside ldrex/strex
	@ mcr	p15, 0, r0, c7, c10, 5 @@ It shouldn't be needed
	ldrex	r2, [r0]
	cmp	r2, r3
	bne	1b
	strex	ip, r1, [r0]
	cmp	ip, #0
	bne	1b
	mcr	p15, 0, r0, c7, c10, 5

	pop	{ip, pc}

	.align	4
	.global	pop
	.type	pop, %function
pop:
	push	{ip, lr}
1:
	ldrex	r1, [r0]
	cmp	r1, #0
	beq	2f
	ldr	r2, [r1]
	strex	ip, r2, [r0]
	cmp	ip, #0
	bne	1b
	mcr	p15, 0, r0, c7, c10, 5
2:
	mov	r0, r1
	pop	{ip, pc}