Etiquetas
compare&swap, link-load/store-conditional, ll/sc, problema ABA, test&set
Este artículo es el primer manuscrito del capítulo Soluciones por hardware del libro que estoy escribiendo Principios de concurrencia para programadores. Todo el código de los ejemplos están en Github.
Soluciones por hardware
Hasta ahora hemos visto soluciones al problema de la exclusión mutua sin soporte de hardware y con solo registros de lectura-escritura atómicos, todos ellos son spinlocks pero muy ineficientes. Por un lado por el consumo de memoria, tanto para la solución de dos procesos (Dekker y Peterson) como para N procesos (Panadería) el número de registros necesarios es proporcional al número de procesos máximos que sincronizarán[31]. Esta necesidad de memoria impone una sobrecarga importante para mantener la consistencia de la memoria caché, además del consumo de CPU por la espera activa todos los procesos involucrados tienen que acceder al mismo rango de memoria que los demás. Es una penalización muy importante para sistemas con varios procesadores o núcleos. Por otro lado, si se tiene un único procesador el avance es tan lento que lo que tarda décimas en uno puede tomar horas en otros[32] en el caso de que exista mucha competencia (contention) porque varios procesos desean entrar a la sección crítica de forma casi simultánea.
Desde el inicio se buscaron soluciones por hardware que permitiesen implementar algoritmos de sincronización de forma mucho más eficiente.
Si el problema fundamental es el intercalado de instrucciones generado por las interrupciones al procesador ¿por qué no deshabilitarlas?. Aunque es una solución que se usa en casos muy específicos en sistemas operativos[33] no es una solución genérica segura: si los procesos pueden deshabilitar las interrupciones entonces pueden tomar control del sistema[34]. Por lo tanto no es una solución válida para procesos de usuario.
Aún al nivel del núcleo del sistema operativo presenta dificultades: la complicación de deshabilitar todas las interrupciones en todos los procesadores y el riesgo de perderlas. Por esta dificultad de deshabilitar las interrupciones en varios procesadores simultáneamente tampoco es una solución para asegurar exclusión mutua dado que un registro puede ser modificado por un procesador diferente sin que haya habido intercalación en el mismo procesador.
Por ello se buscaron otras alternativas que en general son conocidas como primitivas de sincronización de hardware. Son instrucciones del procesador que leen y modifican el valor de uno o varios registros sin ser interrumpidas y asegurando la coherencia de caché (introducen barreras de memoria). Hay muchas variantes de estas instrucciones, los fabricantes diseñaron e implementaron sus propias instrucciones. No es fácil decidir cuál de ellas es la que implementarán en silicio, casi todas las arquitecturas ofrecen un conjunto variado: getAndAdd, testAndSet (TAS), swap, compareAndSwap (CAS), etc.
Pero antes de estudiarlas estudiaremos las definiciones de registros que proporcionen modelos simples y genéricos de consistencia por encima de las interrupciones o mecanismos de consistencia de caché.
El término registro no se refiere solo a los registros del procesador. Puede involucrar a zonas de memoria RAM, memoria caché o en general a un objeto compartido. A nivel de hardware los hilos se comunican leyendo y escribiendo en memoria compartida, desde ese punto de vista la comunicación se hace vía registros de lectura-escritura. Un registro de este tipo encapsula dos métodos, read y store. O get y write cuando nos referimos a objetos compartidos en general y que pueden ser implementados en lenguajes de alto nivel.
Los registros con las propiedades de consistencia más débiles son los registros seguros, su nombre es un mero accidente histórico, son muy inseguros ([Herlihy12]). Este tipo de registros solo aseguran que se retorna el último valor escrito si no hay operaciones get y write concurrentes, caso contrario podrá devolver cualquier valor aceptable en el rango del registro. Por ejemplo, si se trata de un único byte y haya procesos concurrentes que escriben solo 0
o 1
el get de un registro seguro podría devolver cualquier valor entre 0
y 255
ya que todos están en el rango de 8 bits.
Una alternativa intermedia son los registros regulares donde una operación de lectura concurrente con escrituras puede retornar alguno de los valores que están siendo escritos. En un caso similar al anterior, el get de este tipo de registros solo devolvería 0
o 1
. Se dice que estos registros solo aseguran consistencia por inactividad (quiescent consistency) ya que solo aseguran retornar el mismo valor después de un período de inactividad entre el último write y el siguiente get.
Los registros de procesadores modernos que usamos para la implementación de los algoritmos de exclusión mutua son registros atómicos, generalmente se tratan de registros de un único bytes, una palabra, enteros de varios bytes, o referencias a memoria u objetos que cumplen las siguientes dos condiciones ([Lamport2]):
- El método get retorna el último valor escrito, si una lectura retorna un valor y no hubo otra escritura intermedia la siguiente lectura será el mismo valor.
- Si hay varios write concurrentes el valor que retornará el siguiente get es uno de los valores del write no un rango de posibles valores. Por ejemplo, si un write concurrente es el número
0
y otro es el número1000
, el get retornará0
o1000
.
Como hemos observado en la implementación del algoritmo de la panadería, el requerimiento de registros atómicos para solucionar los problemas de exclusión mutua crece linealmente con el número de hilos o procesos involucrados. Tiene otros problemas, son spinlocks sobre muchos registros que introducen mucha presión sobre grandes zonas de la caché, son muy ineficientes. El otro problema hemos visto es que obliga a la especificación manual -no siempre sencilla ni eficiente- para asegurar la consistencia secuencial.
Por ello no es de extrañar que desde el principio se introdujeron instrucciones atómicas del procesador que permitieron implementar algoritmos de sincronización más sencillos y eficientes. Este tipo de operaciones pueden ser expresadas como construcciones de registros denominados genéricamente registros Read-Modify-Write o RMW. Los registros RMW son interesantes porque pueden implementarse en construcciones de lenguajes o como instrucciones de procesador, sus propiedades son similares.
REGISTROS RMW NO TRIVIALES
Los registros RMW que proveen operaciones adicionales a la función identidad se denominan no triviales. Maurice Herlihy demostró que los registros no triviales tienen un consenso de al menos 2 (o son declase 2). Todas las instrucciones mencionadas anterioremente (a excepción de la identidad, obviamente) implementan registros RMW no triviales.
De todas las implementaciones, compareAndSwap (CAS) es la más potente. Herlihy también demostró que pertenece a la clase de consenso N (o infinito). Este tipo de instrucciones son, en palabras del autor,a la computación asincrónica el equivalente de las máquinas de Turing de la computación secuencial. Afortunadamente la mayoría de procesadores implementan CAS o la similar load-linked/store-conditional (o LL/SC, disponible en las arquitecturas PowerPC, MIPS, Alpha y ARM).
Hay varios tipos de instrucciones que implementan registros RMW:
- get: Retorna el valor del registro, se denomina también función identidad.
- getAndSet: Asigna un nuevo valor al registro y retorna el anterior.
- getAndAdd: Incrementa el valor del registro en el valor indicado y retorna el valor anterior.
- testAndSet: verifica el valor del registro, si es cero asigna el nuyevo valor (habitualmente 1, por ejemplo en IBM 360 el registro es binario o booleano, solo admite 0 y 1).
- swap: Intercambia el valor de dos registros.
- compareAndSwap: Compara el valor del registro con otro, si son iguales asigna un nuevo y retorna el anterior.
Los fabricantes proveen conjuntos de operaciones diferentes, para facilitar la programación y portabilidad los compiladores proveen macros que generan la secuencias de instrucciones que implementan o simulan las operaciones en cada arquitectura. Por ejemplo GCC tenía los macros __sync_*
que en las últimas versiones fueron reemplazados por nuevos que están más cercanos al modelo de memoria del C11 y C++11.
IMPORTANTE
A partir de aquí usaremos las primitivas atómicas de las últimas versiones de GCC. Para probar los ejemplos en C aseguraros de tener versiones nuevas del GCC, estos ejemplos fueron probados con la versión 4.9.
Usaremos una variable global mutex
que estará inicializada a cero que indica que no hay procesos en la sección crítica. En la entrada de la sección crítica se almacena 1
y se verifica si el valor anterior era 0
(es decir, no había ningún proceso en la sección crítica). Si era diferente a cero esperará hasta que lo sea.
La función lock()
es la entrada a la sección crítica y unlock()
la salida.
mutex = 0 def lock(): while getAndSet(mutex, 1) != 0: pass def unlock(): mutex = 0
En “getAndSet” está el código en C implementado con el macro __atomic_exchange_n
. A pesar de su nombre no es la instrucción swap sino un equivalente para getAndSet.
Se puede implementar exclusión mutua con un algoritmo muy similar al de la panadería, cada proceso obtiene un número y espera a u turno, solo que esta vez la obtención del siguiente número es atómica y por lo tanto no se necesita un array de números ni hacer un bucle de controles adicionales.
Usaremos dos variables, number
para el siguiente número y turn
para indicar a qué número le corresponde entrar a la sección crítica.
number = 0 turn = 0 def lock(): """ current is a local variable """ current = getAndAdd(number, 1) while current != turn: pass def unlock(): getAndAdd(turn, 1)
El código en C está implementado con el macro __atomic_fetch_add
y en Go con atomic.AddUint32
.[35] A diferencia de la implementación con getAndSet esta implementación asegura que no se producen esperas infinitas ya que el número que elige cada proceso es único y creciente, aunque hay que tener en cuenta que el valor de number
llegará a un máximo y rotará. Losspinlocks de este tipo son también llamados Ticket locks y son muy usados en el núcleo de Linux, aseguran que no se producen esperas infinitas y que los procesos entran a la sección crítica en orden FIFO (fairness).
La instrucción testAndSet o TAS fue la instrucción más usada para control de concurrencia hasta la década de 1970 cuando fue reemplazada por operaciones que permiten niveles (clase) de consenso más elevados. La implementación consiste de una variable entera binaria (o booleana) que puede tomar valores 0 y 1. La instrucción solo recibe un argumento, la dirección de memoria. Si el valor de la dirección de memoria es 0
le asigna 1
y retorna 1
(o true), caso contrario retorna 0
(o _false).
def testAndSet(register): if register == 0: register = 1 return 0 return 1
La implementación de exclusión mutua con TAS es muy similar a getAndSet:
mutex = 0 def lock(): while testAndSet(mutex) == 0: pass def unlock(): mutex = 0
El código en C está implementado con el macro __atomic_test_and_set
.
Esta instrucción intercambia atómicamente dos posiciones de memoria, usualmente enteros de 32 o 64 bits[36]. El algoritmo de la instrucción es tan sencillo como parece:
def swap(register1, register2): tmp = register1 register1 = register2 register2 = tmp
El algoritmo de exclusión mutua con swap:
mutex = 0 def lock(): local = 1 while local != 0: swap(mutex, local) def unlock(): mutex = 0
La implementación en C es con el macro __atomic_exchange
de las últimas versiones de GCC. En Go se pueden usar las funciones atómicas implementadas en el paquete sync/atomic
, por ejemplo con atomic.SwapInt32
[37].
Esta instrucción, o CAS, es la más común[38] y la que provee el mayor nivel de consenso (ver nota Registros RMW no triviales)[39]. La instrucción trabaja con tres valores:
- Registro
- La dirección de memoria cuyo valores se comparará y asignará un nuevo valor si corresponde.
- Nuevo valor
- El valor que se asignará al registro.
- Valor a comparar
- Si el valor del registro es igual a éste entonces se le asignará, caso contrario se copia el valor del registro al valor a comparar.
En la versión moderna[40] de macros atómicos las dos versiones son __atomic_compare_exchange_n
y __atomic_compare_exchange_n
, ambas retornan un booleano si se pudo hacer el cambio, lo único que cambia es la forma de los parámetros (en el último caso son todos punteros). El algoritmo de estas instrucciones es:
def compareAndSwap(register, expected, desired): if registro == expected: registro = desired return True else: expected = register return False
La implementación de exclusión mutua en C es sencilla, necesitamos una variable local porque hay que pasar un puntero y ambas instrucciones copiarán el valor de mutex a la posición indicada por el puntero:
mutex = 0 def lock(): local = 0 while not compareAndSwap(mutex, local, 1): local = 0 def unlock(): mutex = 0
La instrucción CompareAndSwapInt32
en en Go es algo diferente y más similar al antiguo macro de GCC, los argumentos del valor esperado y el nuevo no se pasan por puntero sino por valor:
func lock() { for ! atomic.CompareAndSwapInt32(&mutex, 0, 1) {} }
CAS tiene un problema conocido, el problema ABA. Aunque no se presenta en casos sencillos como el de exclusión mutua sino en casos de intercalados donde un proceso lee el valor A y cede la CPU a otro proceso, otro modifica el registro con el valor B y vuelve a poner el mismo valor A antes que el primero se vuelva a ejecutar. Éste ejecutará la instrucción CAS sin habernotado el cambio.
Un caso práctico: tenemos implementada una pila de estructuras node, es simplemente un puntero al siguiente elemento (next) y una estructura que guarda los datos (o payload, su estructura interna nos es irrelevante):
struct node { struct node *next; struct node_data data; };
Las funciones push y pop agregan y quitan elementos de la pila. Push recibe como argumentos el puntero a la variable cabecera de la pila y el puntero al nodo a añadir. Pop solo recibe el puntero a la cabeza de la pila y devuelve el puntero al primer elemento de la pila o NULL si está vacía. A continuación el código en C simplificado de ambas funciones.
void push(struct node **head, struct node *e) { e->next = *head; ❶ while (! CAS(head, &e->next, &e); ❷ } struct node *pop(struct node **head) { struct node *result, *orig; orig = *head; ❸ do { if (! orig) { return NULL; ❹ } } while (! CAS(head, &orig, &orig->next); ❺ return orig; ❻ }
❶ | push: El nodo siguiente al nodo a insertar será el apuntado por la cabecera. |
❷ | push: Si la cabecera no fue modificada se hará el cambio y ahora apuntará al nuevo nodo e . Si por el contrario head fue modificada, el nuevo valor de head se copia a e->next (ahora apuntará al elemento nuevo que apuntaba head ) y se volverá a intentar. Cuando se haya podido hacer el swap head apuntará correctamente a e y e->next al elemento que estaba antes. |
❸ | pop: Se hace una copia de la cabecera. |
❹ | pop: Si es NULL la pila está vacía y retorna el mismo valor. Recordad que CAS copia el valor anterior de head en orig , por lo que podría darse el caso que sea NULL, de allí que la comparación esté dentro del bucle do... while . |
❺ | pop: Si por el contrario la cabecera apuntaba a un nodo y ésta no fue modificada se hará el cambio y la cebecera apuntará al siguiente nodo. Si por el contrario fue modificada se hará una copia del último valor a orig y se volverá a intentar. |
❻ | pop: Se retorna el puntero al nodo que antes apuntaba la cabecera. |
Este algoritmo funciona sin problemas, de hecho es un algoritmo correcto para gestionar una pila concurrente… solo si es imposible eliminar un nodo y volver a insertar otro nuevo con la misma dirección de memoria. Con CAS es imposible saber si otro proceso ha modificado y vuelto a poner el mismo valor que copiamos (en este caso orig
). Supongamos que tenemos una pila con tres nodos que comienzan en la direcciones 10, 20 y 30:
head -> [10] -> [20] -> [30]
El proceso P1 acaba de ejecutar orig = *head;
dentro de pop y es interrumpido. Otro u otros procesos eliminan dos elementos de la pila:
head -> [30]
Y luego uno de ellos inserta un nuevo nodo con una dirección de memoria usada previamente:
head -> [10] -> [30]
Cuando P1 continúe su ejecución CAS hará el cambio ya que la dirección es también 10
. El problema es que era una copia antigua que apuntaba antes a [20]
por lo que dejará la cabecera apuntando a un nodo que ya no existe y los siguientes habrán quedado descolgados de la pila:
head -> ¿20? [30]
Este caso es muy habitual si usamos malloc
para cada nuevo nodo que insertamos y luego el free
cuando lo eliminamos de la lista[41]. El siguiente programa en C usa estas funciones en cuatro hilos diferentes, cada uno de ellos ejecuta repetidamente el siguiente código:
e = malloc(sizeof(struct node)); e->data.tid = tid; e->data.c = i; push(&head, e); ❶ e = pop(&head); ❷ if (e) { e->next = NULL; ❸ free(e); } else { printf("Error, stack empty\n"); ❹ }
❶ | Se agrega el elemento nuevo a la pila, la memoria de éste fue obtenida con el malloc de la línea anterior. |
❷ | Inmediatamente se lo elimina de la lista. El resultado nunca debería ser NULL ya que siempre debería haber al menos un elemento: todos los hilos primero agregan y luego lo quitan. |
❸ | Antes de liberar la memoria del elemento recién eliminado se pone next en NULL. No debería hacer falta pero lo hacemos por seguridad y para que observéis claramente que los errores son por el problema ABA. |
❹ | Si no pudo obtener un elemento de la lista es un error y lo indicamos. |
Si lo ejecutáis veréis que en todos los casos da el error de la pila vacía y/o de error por intentar liberar dos veces la misma memoria.
Error, stack empty *** Error in `./stack_cas_malloc': free(): invalid pointer: 0x00007fcc700008b0 *** Aborted (core dumped)
En sistemas con un único procesador, como en Raspberry 1, quizás necesites de varias ejecuciones o aumentar el número de operaciones en OPERATIONS
para que aparezca el error. Es uno de los problemas inherentes de la programación concurrente, a veces la probabilidad de que ocurra el error es muy baja y hace más difícil detectarlos. Algunas implementaciones de malloc
no retornan las direcciones usadas recientemente por lo que quizás no observes el error de doble liberación del mismo puntero. Podemos forzar al reuso de direcciones recientes mediante una segunda pila.
En vez de liberar la memoria de los nodos con el free
los insertamos en una segunda lista free_nodes
, los nodos que se eliminan de la lista head
son insertados en la lista de libres. En vez de asignar memoria con malloc
cada vez que se crea un nuevo nodo se busca primero de la lista de libres y se lo reusa. El programa ejecutará repetidamente el siguiente código:
e = pop(&free_nodes); ❶ if (! e) { e = malloc(sizeof(struct node)); ❷ printf("malloc\n"); } e->data.tid = tid; e->data.c = i; push(&head, e); ❸ e = pop(&head); ❹ if (e) { push(&free_nodes, e); ❺ } else { printf("Error, stack empty\n"); ❻ }
❶ | Obtiene un nodo de la lista de libres. |
❷ | La lista de libres estaba vacía, se solicita memoria. En la siguiente línea se imprime, debería haber como máximo tantos malloc como hilos. |
❸ | Se agrega el elemento a la pila de head . |
❹ | Se elimina un elemento de la pila de head . |
❺ | Se se pudo obtener el elemento se agrega el elemento a la pila de libres. |
❻ | La lista estaba vacía, es un error. |
La ejecución del programa dará numerosos errores de de la pila vacía y se harán también más malloc
de los que debería. Es consecuencia del problema ABA.
Una solución para el problema ABA es el usar bits adicionales como etiquetas para identificar una transacción (tagged CAS). Para ello algunas arquitecturas introdujeron instrucciones CASque permiten la verificación e intercambio de más de una palabra[42], por ejemplo Intel con las instrucciones cmpxchg8b
y cmpxchg16b
dobles que permiten trabajar con estructuras de 64 y 128 bit, en vez de solo registros atómicos de 32 o 64 bits. En nuestro caso necesitamos hacerlo solo para verificar el intercambio de las cabeceras por lo que usaremos la estructura node_head
para ambas.
struct node_head { struct node *node; ❶ uintptr_t aba; ❷ }; struct node_head stack_head; ❸ struct node_head free_nodes;
❶ | El puntero al nodo que contiene los datos. |
❷ | Será usada como etiqueta, un contador que se incrementará en cada transacción. Es un entero del mismo tamaño que los punteros (32 o 64 bits según la arquitectura), |
❸ | Los punteros a las pilas no serán un simple puntero sino la estructura con el puntero y la etiqueta. |
El código completo en C está en stack_cas_tagged.c, pero analicemos el funcionamiento de push.
void push(struct node_head *head, struct node *e) { struct node_head orig, next; __atomic_load(head, &orig); ❶ do { next.aba = orig.aba + 1; ❷ next.node = e; e->next = orig.node; ❸ } while (!CAS(head, &orig, &next); ❹ }
❶ | Al tratarse de una estructura no es un registro atómico mas bien un registro seguro, debemos asegurar que se hace una copia atómica de head a orig . |
❷ | next tendrá los datos de head después del CAS, en este incrementamos el valor de aba . |
❸ | El nodo siguiente de nuevo nodo es el que está ahora en la cola. |
❹ | Se intenta el intercambio, solo se hará si tanto el puntero al nodo y el entero aba son idénticos a los copiados en orig . Si entre <1> y <4> el valor de head es cambiado por otros procesos el valor de aba habrá cambiado (será un valor mayor) por lo que CAS retornará falso aunque el puntero al nodo sea el mismo. |
compareAndSwap es la más potente de las operaciones atómicas anteriores ya que permite el consenso con infinitos procesos (consenso de clase N). Sin embargo en algunas arquitecturas RISC (PowerPC, Alpha, MIPS y ARM) diseñaron una técnica diferente para implementar registros RMW, es tan potente que puede emular a cualquiera de las anteriores: el LL/SC. De hecho, si has compilado los programas de ejemplos en algunas de esas arquitecturas (por ejemplo en una Raspberry) el compilador habrá reemplazado por llamadas a esas operaciones por una serie de instrucciones con LL/SC que las emulan.
El diseño de LL/SC es muy ingenioso, se basa en dos operaciones diferentes que trabajan en cooperación con la gestión de caché. Una es similar a la tradicional cargar (load) una dirección de memoria en un registro: LWARX en PowerPC, LL en MIPS, LDREX en ARM. La otra a la de almacenar (store) un registro en una dirección de memoria: STWC en PowerPC, SC en MIPS y STREX en ARM. El matiz importante es que ambas están enlazadas, la ejecución de la segunda es condicional si el registro objetivo no fue modificado desde la ejecución de la primera. Tomemos por LDREX y STREX de la arquitectura ARM.
- LDREX
- Carga una dirección de memoria en un registro y etiqueta o marca esa dirección como de acceso exclusivo. Luego puede ejecutarse cualquier número de instrucciones hasta el STREX.
- STREX
- Almacena el valor de un registro en una dirección de memoria pero solo si esa dirección ha sido reservada anteriormente con un LDREX y no ha sido modificada por ningún otro proceso. Por ejemplo la siguiente instrucción :
El siguiente código carga el contenido de la dirección indicada por r0
en el registro r1
y marca esa dirección[43]:
ldrex r1, [r0] ❶ ... strex r2, r1, [r0] ❷
❶ | Carga el contenido de la dirección indicada por r0 en el registro r1 y marca esa dirección[44] |
❷ | Almacena el valor del registro r1 en la dirección apuntada por r0 si y solo sí esa dirección no fue modificada por otro proceso. Si se almacenó se pone r2 en 0 caso contrario en 1 . |
Vale la pena analizar algunas de las emulaciones de instrucciones atómicas[45], por ejemplo getAndAdd y compareAndSwap:
getAndAdd.
.L1: ldrex r1, [r0] ❶ add r1, r1, #1 ❷ strex r2, r1, [r0] ❸ cmp r2, #0 bne .L1 ❹
❶ | Carga la dirección especificada por r0 en r1 . |
❷ | Incrementa en 1. |
❸ | Almacena condicionalmente la suma. |
❹ | Si falló vuelve a intentarlo cargando el nuevo valor. |
ldr r0, [r2] ❶ .L1 ldrex r1, [r3] ❷ cmp r1, r0 bne .L2 ❸ strex lr, ip, [r3] ❹ cmp lr, #0 bne .L1 ❺ .L2 ...
❶ | Carga el contenido de la primera dirección en r0 . |
❷ | Carga el contenido de la segunda dirección en r1 . |
❸ | El resultado de la comparación es falso, sale del CAS. |
❹ | Intenta almacenar el nuevo valor en la dirección indicada por r3 (es decir, hace el swap). |
❺ | Si no se pudo almacenar vuelve a intentarlo. |
Las implementaciones en hardware de las instrucciones LL/SC tiene algunos problemas que afectan a la eficiencia. El resultado del store condicional puede retornar error[46] espurio por cambios de contexto, emisiones broadcast en el bus de caché, actualizaciones en la misma línea de caché o incluso otras operaciones de lectura o escritura no relacionadas entre el load y elstore. Por eso la recomendación general es que el fragmento de código dentro de una sección exclusiva sea breve y que se minimicen los almacenamientos a memoria.
La mayor ventaja de las instrucciones LL/SC es que no sufren del problema ABA, el primer cambio ya invalidaría el store condicional posterior. Cuando analizamos el problema ABA vimos cómo se puede reproducir el problema con un par de colas, una para los nodos y la otra para los que quedan libres. El algoritmo usa el macro atómico para compareAndSwap y cuando se traduce a ensamblador para arquitecturas como ARM se traduce a código que emula el compareAndSwap. En una arquitectura con LL/SC es mejor implementarlo directamente con esas instrucciones, pero a menos que lo hagas con los compiladores de los fabricantes no contamos con los macros adecuados[47], por lo que debemos recurrir a ensamblador para hacerlo.
Vamos a ello.
Dividimos el código en dos partes. La de C es similar al ejemplo anterior con doble pila pero sin la implementación de las funciones pop() y push(). Éstas están implementadas en ensamblador de ARM[48] y trabajan con la misma estructura de pila anterior.
El código es bastante sencillo de entender, vamos a ver analizar en detalle la función pop() que es la más breve de ambas:
pop().
pop: push {ip, lr} 1: ldrex r1, [r0] ❶ cmp r1, #0 beq 2f ❷ ldr r2, [r1] ❸ strex ip, r2, [r0] ❹ cmp ip, #0 bne 1b ❺ 2: mov r0, r1 ❻ pop {ip, pc}
❶ | Carga LL del primer argumento de la función (head), la dirección del primer elemento de la lista puntero[49]. |
❷ | En la línea anterior se compara si es igual a cero, de ser así es porque la cola está vacía, sale del bucle para devolver el puntero NULL. |
❸ | Carga en r2 el puntero del siguiente elemento[50] de la lista, la dirección de e→next de la estructura del nodo. |
❹ | Almacena el siguiente elemento en head. |
❺ | Copia el contenido de r1 a r0 , que es el valor devuelto por la función. |
Una vez conocidas las características y posibilidades de LL/SC es relativamente sencillo simular las otras operaciones atómicas y quizás aún más sencillo implementar el algoritmo directamente basado en LL/SC. La dificultad es que no es habitual contar con macros genéricos debido a que en arquitecturas sin LL/SC es muy complicado simular estas operaciones con instrucciones CAS, por lo que habrá que recurrir a ensamblador y además con una versión para cada plataforma que lo implemente.
Pero si se hace correctamente además de evitar el problema ABA se puede hacer mucho más eficiente. Los siguientes son los tiempos de ejecución de los últimos algoritmos vistos en una Raspberry 1.
Tabla 1. Comparación de tiempos en Raspberry 1
Programa | Tiempo de reloj |
---|---|
Pila con malloc, CAS (problema ABA) | 8.6 seg |
Doble pila, CAS (problema ABA) | 4.9 seg |
Doble pila CAS etiquetado (sin ABA) | 10.0 seg |
Doble pila con LL/SC (ensamblador, sin ABA) | 2.3 seg |
La implementación con LL/SC nativo es más de dos veces más rápido que el siguiente más rápido, que sufre del problema ABA[51] y más de cuatro veces más rápido que la simulación deCAS etiquetado.
En este capítulo hemos visto las instrucciones por hardware esenciales, tanto para sistemas operativos como lenguajes, para construir primitivas de sincronización de más alto nivel. Las técnicas que usan estas primitivas -directa o indirectamente- son llamados spinlocks. Las hemos analizado desde las más básicas hasta las más potentes como CAS y LL/SC. Aunque comenzamos solo con el objetivo de resolver el problema fundamental de sincronización entre procesos -exclusión mutua- hemos introducido el uso de las mismas para problemas más sofisticados, como el CAS etiquetado y el uso de LL/SC para gestión de pilas concurrentes.
No hay instrucciones de hardware unificadas para todas las arquitecturas, tampoco una estandarización a nivel de lenguajes de programación. Esa es la razón por la que los compiladores implementan sus propios macros atómicos que luego son convertidos a funciones más complejas que simulan a las instrucciones o registros RMW definidos por el macro. Lo vimos claramente con la arquitectura ARM, todas las operaciones se simulan con LL/SC. La inversa es más complicada -sino imposible- por lo que habitualmente no se cuentan con esos macros[52]y hay que recurrir al ensamblador para poder aprovechar las capacidades de nativas de cada procesador, muy habitual en los sistemas operativos[53].
De todas maneras los spinlocks basados en instrucciones por hardware son fundamentales y se requieren algoritmos muy eficientes sobre todo para multiprocesadores o núcleos. Además de solucionar problemas la exclusión mutua interesa gestionar estructuras concurrentes habituales (pilas, listas, lectores-escritores, etc.) que minimicen el impacto sobre el sistema de caché. Este será el tema del siguiente capítulo.
En todos los ejemplos de exclusión mutua vistos hasta ahora la sección crítica consistía solo en incrementar un contador compartido. Es perfecto para mostrar que una instrucción y operación aritmética que en apariencia son tan simples también son víctimas del acceso concurrente desorganizado. Pero espero que os hayáis dado cuenta que no hace falta recurrir a un spinlock para hacerlo correctamente, que ya hay instrucciones de hardware que lo hacen de forma eficiente, como el getAndAdd o addAndGet. Por ejemplo en C:
for (i=0; i < max; i++) { c = __atomic_add_fetch(&counter, 1, __ATOMIC_RELAXED); }
O en Go:
for i := 0; i < max; i++ { c = atomic.AddInt32(&counter, 1) }
[31] Está demostrado ([Herlihy12]) que dichos algoritmos son óptimos en cuestión de espacio
[32] Como pasa en la Raspberry 1.
[33] Como local_irq_disable() o local_irq_enable() en Linux.
[34] Deshabilita la cualidad de apropiativo (o preemptive) del scheduler.
[35] Estrictamente no es getAndAdd sino addAndGet, devuelve el valor después de sumar, pero son equivalentes, solo hay que cambiar la inicialización de la variable turn.
[36] No todas las arquitecturas la tienen, en Intel es XCHG para enteros de 32 bits.
[37] Esta función no estaba disponible en Go para ARM hasta 2013, si la pruebas en una Raspberry asegúrate de tener una versión de Go moderna.
[38] Es la que se usa en la arquitectura Intel/AMD.
[39] Aunque sufre el problema ABA.
[40] En los antiguos macros de GCC las instrucciones equivalentes son __sync_bool_compare_and_swap
y __sync_val_compare_and_swap
respectivamente. La diferencia fundamental es que no se modifica el registro del valor a comparar.
[41] Las implementaciones de malloc
suelen reusar las direcciones de los elementos que acaban de ser liberados.
[42] Los registros atómicos explicados antes.
[43] En ARM se etiqueta en el sistema del monitor de acceso exclusivo, en otras arquitecturas se asocia un bit del TLB o de memoria caché.
[44] En ARM se etiqueta en el sistema del monitor de acceso exclusivo, en otras arquitecturas se asocia un bit del TLB o de memoria caché.
[45] Si quieres presumir has de llamarles «implementaciones de registros RMW«.
[46] No implica que falle el algoritmo implementado, solo que fuerza que se haga otro bucle de lectura y escritura.
[47] Al menos no en GCC.
[48] Para que funcione en una Raspberry, agradezco a Sergio L. Pascual por ayudarme a mejorar y probar el código.
[49] Recordad que el primer argumento de la función es la dirección del puntero, es decir un puntero a puntero.
[50] Dado que next es el primer campo del nodo su dirección coincide con la del nodo, por eso no hay desplazamieno en el código ensamblador cuando leemos o modificamos next.
[51] Y por lo tanto incorrecto.
[52] Salvo los compiladores de los propios fabricantes que los incluyen en sus compiladores propietarios, en ARM se llaman intrinsics
[53] Por ejemplo en Linux se usa el ensamblador inline, ASM().
Bibliografía
- [Stallings] William Stallings. Operating Systems: Internals and Design Principles (8th Edition), 2014.
- [railroad] A Treasury of Railroad Folklore, B.A. Botkin & A.F. Harlow, p. 381.
- [Peterson] G. L. Peterson. Myths About the Mutual Exclusion Problem, Information Processing Letters 12(3) 1981, 115–116
- [Lamport] Leslie Lamport. A New Solution of Dijkstra’s Concurrent Programming Problem. 1974research.microsoft.com/en-us/um/people/lamport/pubs/bakery.pdf
- [Lamport2] Leslie Lamport. _The mutual exclusion problem – Part I: A theory of interprocess communication. Journal of the ACM. 1986.
- [Barriers] David Howells. Linux Kernel Memory Barriers. www.kernel.org/doc/Documentation/memory-barriers.txt
- [Atomics] GCC Built-in functions for atomic memory access. gcc.gnu.org/onlinedocs/gcc-4.4.3/gcc/Atomic-Builtins.html
- [Atomics_C11] Built-in functions for memory model aware atomic operations. gcc.gnu.org/onlinedocs/gcc-4.9.2/gcc/_005f_005fatomic-Builtins.html#_005f_005fatomic-Builtins
- [Herlihy12] Maurice Herlihy, Nir Shavit. The Art of Multiprocessor Programming. 2012. ISBN 978-0123973375.
- [Herlihy91] Maurice Herlihy. Wait-Free Synchronization. ACM Transactions on Programming Languages and Systems, 1991. cs.brown.edu/~mph/Herlihy91/p124-herlihy.pdf
Código fuente
Instrucciones por hardware
getAndSet
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define MAX_COUNT 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
int mutex = 0;
int counter = 0;
void lock() {
while(__atomic_exchange_n(&mutex, 1, __ATOMIC_SEQ_CST));
}
void unlock() {
mutex = 0;
}
void *count(void *ptr) {
long i, max = MAX_COUNT/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
for (i=0; i < max; i++) {
lock();
counter += 1; // The global variable, i.e. the critical section
unlock();
}
printf("End %d counter: %d\n", tid, counter);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
return 0;
}
getAndAdd
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define MAX_COUNT 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
// Used for get&add, the first is the number obtained by a process
// The second is the next turn
int number = 0;
int turn = 0;
int counter = 0;
void lock() {
int current;
current = __atomic_fetch_add(&number, 1, __ATOMIC_RELAXED);
while(current != turn);
}
void unlock() {
__atomic_fetch_add(&turn, 1, __ATOMIC_SEQ_CST);
}
void *count(void *ptr) {
long i, max = MAX_COUNT/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
for (i=0; i < max; i++) {
lock();
counter += 1; // The global variable, i.e. the critical section
unlock();
}
printf("End %d counter: %d\n", tid, counter);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
return 0;
}
/* Implements mutual exclusion with atomic.AddUint32 */
package main
import (
"fmt"
"runtime"
"sync/atomic"
)
const (
MAX_COUNT = 10000000
GOROUTINES = 4
)
var (
counter int
number uint32
turn uint32 = 1 // AddUint32 returns the new value
)
func lock() {
var current uint32
current = atomic.AddUint32(&number, 1)
/* We have to use LoadUint32 because there is no volatile vars
and turn is read from the local cache */
for current != atomic.LoadUint32(&turn) {}
}
func unlock() {
atomic.AddUint32(&turn, 1)
}
func run(id, counts int, done chan bool) {
for i := 0; i < counts; i++ {
lock()
counter++
unlock()
}
fmt.Printf("End %d counter: %d\n", id, counter)
done <- true
}
func main() {
runtime.GOMAXPROCS(GOROUTINES)
done := make(chan bool, 1)
for i := 0; i < GOROUTINES; i++ {
go run(i, MAX_COUNT/GOROUTINES, done)
}
for i := 0; i < GOROUTINES; i++ {
<-done
}
fmt.Printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
}
testAndSet
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define MAX_COUNT 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
char mutex = 0;
int counter = 0;
void lock() {
while(__atomic_test_and_set(&mutex, __ATOMIC_RELAXED));
}
void unlock() {
mutex = 0;
// Recomended is __atomic_clear(&mutex,...) but we already have cache coherence
}
void *count(void *ptr) {
long i, max = MAX_COUNT/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
for (i=0; i < max; i++) {
lock();
counter += 1; // The global variable, i.e. the critical section
unlock();
}
printf("End %d counter: %d\n", tid, counter);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
return 0;
}
Swap
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define MAX_COUNT 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
int mutex = 0;
int counter = 0;
void lock(int i) {
int local = 1;
do {
__atomic_exchange(&mutex, &local, &local, __ATOMIC_RELAXED);
} while (local);
}
void unlock(int i) {
mutex = 0;
}
void *count(void *ptr) {
long i, max = MAX_COUNT/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
for (i=0; i < max; i++) {
lock(tid);
counter += 1; // The global variable, i.e. the critical section
unlock(tid);
}
printf("End %d counter: %d\n", tid, counter);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
return 0;
}
/* Implements mutual exclusion with atomic.SwapInt32 */
package main
import (
"fmt"
"runtime"
"sync/atomic"
)
const (
MAX_COUNT = 10000000
GOROUTINES = 4
)
var (
counter int
mutex int32
)
func lock() {
for atomic.SwapInt32(&mutex, 1) == 1 {} // While it returns 1
}
func unlock() {
mutex = 0
}
func run(id, counts int, done chan bool) {
for i := 0; i < counts; i++ {
lock()
counter++
unlock()
}
fmt.Printf("End %d counter: %d\n", id, counter)
done <- true
}
func main() {
runtime.GOMAXPROCS(GOROUTINES)
done := make(chan bool, 1)
for i := 0; i < GOROUTINES; i++ {
go run(i, MAX_COUNT/GOROUTINES, done)
}
for i := 0; i < GOROUTINES; i++ {
<-done
}
fmt.Printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
}
compareAndSwap
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define MAX_COUNT 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
int mutex = 0;
int counter = 0;
void lock(int i) {
int local;
do {
local = 0;
} while (! __atomic_compare_exchange_n(&mutex, &local, 1, 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
void unlock(int i) {
mutex = 0;
}
void *count(void *ptr) {
long i, max = MAX_COUNT/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
for (i=0; i < max; i++) {
lock(tid);
counter += 1; // The global variable, i.e. the critical section
unlock(tid);
}
printf("End %d counter: %d\n", tid, counter);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, count, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
return 0;
}
/* Implements mutual exclusion with atomic.CompareAndSwapInt32 */
package main
import (
"fmt"
"runtime"
"sync/atomic"
)
const (
MAX_COUNT = 10000000
GOROUTINES = 4
)
var (
counter = 0
mutex int32 = 0
)
func lock() {
for ! atomic.CompareAndSwapInt32(&mutex, 0, 1) {}
}
func unlock() {
mutex = 0
}
func run(id, counts int, done chan bool) {
for i := 0; i < counts; i++ {
lock()
counter++
unlock()
}
fmt.Printf("End %d counter: %d\n", id, counter)
done <- true
}
func main() {
runtime.GOMAXPROCS(GOROUTINES)
done := make(chan bool, 1)
for i := 0; i < GOROUTINES; i++ {
go run(i, MAX_COUNT/GOROUTINES, done)
}
for i := 0; i < GOROUTINES; i++ {
<-done
}
fmt.Printf("Counter value: %d Expected: %d\n", counter, MAX_COUNT);
}
ABA
// This code implements an FAULTY concurrent stack
// It's suffers of the ABA problem it will generate a Segmentation Fault
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define OPERATIONS 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
struct node_data {
int tid;
int c;
};
struct node {
struct node *next;
struct node_data data;
};
struct node *head = NULL;
void push(struct node **head, struct node *e) {
e->next = *head;
while (! __atomic_compare_exchange(head, &e->next, &e, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
}
struct node *pop(struct node **head) {
struct node *old_head;
old_head = *head;
do {
if (! old_head) {
return NULL;
}
} while (! __atomic_compare_exchange(head, &old_head, &old_head->next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
return old_head;
}
void *add_elements(void *ptr) {
long i, elems = OPERATIONS/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
struct node *e;
for (i=0; i < elems; i++) {
e = malloc(sizeof(struct node));
e->data.tid = tid;
e->data.c = i;
push(&head, e);
// Pop an element and add it to the free list
e = pop(&head);
if (e) {
e->next = NULL;
free(e);
} else {
printf("Error in %d it shouldn't be empty\n", tid);
}
}
printf("End %d\n", tid);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
struct node *e;
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
}
// This code implements an FAULTY concurrent stack
// It's suffers of the ABA problem it will generate a Segmentation Fault
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define OPERATIONS 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
struct node_data {
int tid;
int c;
};
struct node {
struct node *next;
struct node_data data;
};
struct node *head = NULL;
struct node *free_nodes = NULL;
void push(struct node **head, struct node *e) {
e->next = *head;
while (! __atomic_compare_exchange(head, &e->next, &e, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
}
struct node *pop(struct node **head) {
struct node *old_head;
old_head = *head;
do {
if (! old_head) {
return NULL;
}
} while (! __atomic_compare_exchange(head, &old_head, &old_head->next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
return old_head;
}
void *add_elements(void *ptr) {
long i, elems = OPERATIONS/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
struct node *e;
for (i=0; i < elems; i++) {
// Get an element from the free list
e = pop(&free_nodes);
if (! e) {
// It was empty, request memory
e = malloc(sizeof(struct node));
printf("%d malloc\n", tid);
}
e->data.tid = tid;
e->data.c = i;
push(&head, e);
// Pop an element and add it to the free list
e = pop(&head);
if (e) {
e->next = NULL;
push(&free_nodes, e);
} else {
printf("Error in %d it shouldn't be empty\n", tid);
}
}
printf("End %d\n", tid);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
struct node *e;
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
}
// This code implements the right concurrent stack with ABA check
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <stdint.h>
#define NUM_THREADS 4
#define OPERATIONS 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
struct node_data {
int tid;
int c;
};
struct node_head {
struct node *node;
uintptr_t aba;
};
struct node {
struct node_data data;
struct node *next;
};
struct node_head stack_head;
struct node_head free_nodes;
void push(struct node_head *head, struct node *e) {
struct node_head old_head, next;
// The structure is not an "atomic register", we must force the atomic load
__atomic_load(head, &old_head, __ATOMIC_RELAXED);
do {
next.aba = old_head.aba + 1;
next.node = e;
e->next = old_head.node;
} while (! __atomic_compare_exchange(head, &old_head, &next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
}
struct node *pop(struct node_head *head) {
struct node_head old_head, next;
// The structure is not an "atomic register", we must force the atomic load
__atomic_load(head, &old_head, __ATOMIC_SEQ_CST);
do {
if (! old_head.node) {
return NULL;
}
next.aba = old_head.aba + 1;
next.node = old_head.node->next;
} while (! __atomic_compare_exchange(head, &old_head, &next, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
return old_head.node;
}
void *add_elements(void *ptr) {
long i, elems = OPERATIONS/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
struct node *e;
for (i=0; i < elems; i++) {
e = pop(&free_nodes); // Get an element from the free list
if (! e) { // If there is none, reserve memory
e = malloc(sizeof(struct node));
printf("%d malloc\n", tid);
}
e->data.tid = tid;
e->data.c = i;
push(&stack_head, e);
// Pop an element and add it to the free list
e = pop(&stack_head);
if (e) {
push(&free_nodes, e);
} else {
printf("Error in %d it shouldn't be empty\n", tid);
}
}
printf("End %d\n", tid);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
struct node *e, *tmp;
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
return 0;
}
LL/SC
// Implementation of a concurrent stack for ARM with LL/SC
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#define NUM_THREADS 4
#define OPERATIONS 10000000
// Just used to send the index of the id
struct tdata {
int tid;
};
struct node_data {
int tid;
int c;
};
struct node {
struct node *next;
struct node_data data;
};
// Defined in the assembly code stack_llsc.s
void push(struct node **, struct node *);
struct node *pop(struct node **);
struct node *head = NULL;
struct node *free_nodes = NULL;
void *add_elements(void *ptr) {
long i, elems = OPERATIONS/NUM_THREADS;
int tid = ((struct tdata *) ptr)->tid;
struct node *e;
for (i=0; i < elems; i++) {
// Get an element from the free list
e = pop(&free_nodes);
if (! e) {
// It was empty, request memory
e = malloc(sizeof(struct node));
printf("%d malloc\n", tid);
}
e->data.tid = tid;
e->data.c = i;
push(&head, e);
// Pop an element and add it to the free list
e = pop(&head);
if (e) {
e->next = NULL;
push(&free_nodes, e);
} else {
printf("Error in %d it shouldn't be empty\n", tid);
}
}
printf("End %d\n", tid);
pthread_exit(NULL);
}
int main (int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
int rc, i;
struct tdata id[NUM_THREADS];
for(i=0; i<NUM_THREADS; i++){
id[i].tid = i;
rc = pthread_create(&threads[i], NULL, add_elements, (void *) &id[i]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(i=0; i<NUM_THREADS; i++){
pthread_join(threads[i], NULL);
}
}
.align 4
.global push
.type push, %function
push:
push {ip, lr}
1:
ldr r3, [r0]
str r3, [r1] @@ To avoid storing e->next inside ldrex/strex
@ mcr p15, 0, r0, c7, c10, 5 @@ It shouldn't be needed
ldrex r2, [r0]
cmp r2, r3
bne 1b
strex ip, r1, [r0]
cmp ip, #0
bne 1b
mcr p15, 0, r0, c7, c10, 5
pop {ip, pc}
.align 4
.global pop
.type pop, %function
pop:
push {ip, lr}
1:
ldrex r1, [r0]
cmp r1, #0
beq 2f
ldr r2, [r1]
strex ip, r2, [r0]
cmp ip, #0
bne 1b
mcr p15, 0, r0, c7, c10, 5
2:
mov r0, r1
pop {ip, pc}