Este es un truco en Go muy útil y eficiente para hacer tareas de “mantenimiento” en estructuras de forma concurrente (y seguramente paralela).
Supongamos que tenéis una lista de elementos, a esta lista se agregan u obtienen elementos pero hay que mantener solamente los últimos N, donde N puede estar basado en una límite temporal.
Las operaciones sobre la lista deben responder muy rápidamente por lo que no es posible llamar a la función que elimina los más antiguos porque tiene un coste computacional (O(n) o O(log n) en el mejor de los casos) que retrasaría la respuesta.
Hay dos técnicas muy habituales en Go:
- Lanzar al principio una goroutine que periódicamente se ejecute después de una espera con time.Sleep() o usar el time.Ticker que ya hace ambas cosas. El problema con esta solución es que se ejecuta y consume CPU aunque no haya actividad.
- Lanzar la goroutine cada vez que hay actividad, por ejemplo al agregar u obtener elementos de la lista. El problema, además del pequeño overhead de lanzar la goroutine, es que si hay muchas operaciones se creará un número idénticos de goroutines y seguramente cada una de ellas tendrá que esperar que la otra acabe (se necesitan mutex, la estructura es compartida entre varios hilos de ejecución) y el trabajo se estará haciendo trabajo de más sin ninguna utilidad.
Una tercera forma muy eficiente, algunos le llaman actor (por el modelo de actores). Como en el caso 1 anterior, se lanza una goroutine -el actor– cuando se crea la estructura (o la interfaz), esta rutina sólo espera recibir un mensaje de un canal específico para ella. Cuando recibe un mensaje hace su trabajo y vuelve a esperar por uno nuevo.
func (s *Serie) actor() {
for {
<-s.aChannel
s.Lock()
..
s.Unlock()
}
}
Pero tiene su truco. En primer lugar el canal se crea sin buffer (el default en Go), las funciones que operan sobre la estructura envían un mensaje asíncrono.
func (s *Serie) callActor() {
select {
case s.aChannel <- true:
default:
}
}
Si el actor está ocupado procesando el mensaje será descartado, por lo que no hará dos o más veces la misma tarea si recibió mensajes cuando estaba ocupado. Es más, se puede hacer que el actor no se ejecute más de una vez cada x tiempo. Para ello basta agregar un time.Sleep() al actor.
func (s *Serie) actor() {
for {
<-s.aChannel
s.Lock()
..
s.Unlock()
time.Sleep(someDuration)
}
}
Esta solución es eficiente, sólo hay una rutina que hace la tarea, nunca bloquea a los procesos “importantes” y se puede limitar la frecuencia de ejecución.
A continuación el código de la pequeña interfaz para mantener caché de una serie temporal ordenada y con sólo los elementos a partir de un límite.
package main | |
import ( | |
"sync" | |
"time" | |
) | |
type SerieElement struct { | |
ts time.Time | |
data *BookingData | |
} | |
type Serie struct { | |
sync.RWMutex | |
ids map[string]bool | |
elements []*SerieElement | |
maxCache time.Duration | |
lastTs time.Time | |
removeCh chan bool | |
} | |
func NewSerie(maxCache time.Duration) (s *Serie) { | |
s = &Serie{ | |
elements: make([]*SerieElement, 0), | |
maxCache: maxCache, | |
ids: make(map[string]bool), | |
removeCh: make(chan bool), | |
} | |
go s.removeOldActor() | |
return | |
} | |
func (s *Serie) Append(b *BookingData, ts time.Time) { | |
s.Lock() | |
defer s.Unlock() | |
defer s.removeOld() | |
_, ok := s.ids[b.id] | |
if ok { | |
// Element already inserted | |
return | |
} | |
e := &SerieElement{ | |
ts: ts, | |
data: b, | |
} | |
s.ids[e.data.id] = true | |
s.elements = append(s.elements, e) | |
if ts.After(s.lastTs) { | |
s.lastTs = ts | |
} | |
} | |
func (s *Serie) Get(fromTime time.Time) (elements []*SerieElement) { | |
s.RLock() | |
defer s.RUnlock() | |
defer s.removeOld() | |
first := len(s.elements) | |
for i := first – 1; i >= 0; i— { | |
if s.elements[i].ts.After(fromTime) { | |
first = i | |
} else { | |
break | |
} | |
} | |
if first < len(s.elements) { | |
elements = s.elements[first:] | |
} | |
return | |
} | |
func (s *Serie) removeOld() { | |
// Asynchronous, the Actor will ignore if there are frequent calls | |
select { | |
case s.removeCh <- true: | |
default: | |
} | |
} | |
func (s *Serie) removeOldActor() { | |
for { | |
<-s.removeCh | |
s.Lock() | |
now := time.Now() | |
olds := 0 | |
for _, e := range s.elements { | |
if e.ts.Before(now.Add(–s.maxCache)) { | |
delete(s.ids, e.data.id) | |
olds++ | |
} else { | |
break | |
} | |
} | |
if olds > 0 { | |
s.elements = s.elements[olds:] | |
} | |
s.Unlock() | |
time.Sleep(5 * time.Second) // To ignore frequent and innecesary calls | |
} | |
} |
Usé la misma para un demonio con mucha concurrencia y que debía ser muy eficiente. Mientras más cargado estaba el sistema debía medir con mayor precisión el uso de CPU. Para ello necesitaba medirlo a intervalos variables y cada vez que lo necesitaba. Pero debía también asegurar que transcurriese un tiempo mínimo entre ejecuciones para obtener valores fiables de las mediciones de los jiffies de CPU.
func (broker *Broker) updateCPUIdle() { | |
select { | |
case broker.cpuUpdateChannel <- true: | |
default: | |
} | |
} | |
func (broker *Broker) cpuIdleUpdater() { | |
// Initial values required to get stats | |
minPeriod := 20 * time.Millisecond | |
idle, total := getCPUSample() | |
broker.idleJiffies = idle | |
broker.totalJiffies = total | |
broker.updatedJiffies = time.Now() | |
for { | |
<-broker.cpuUpdateChannel | |
now := time.Now() | |
if broker.updatedJiffies.Add(minPeriod).After(now) { | |
continue | |
} | |
idle, total := getCPUSample() | |
if total == broker.totalJiffies { | |
continue | |
} | |
alpha := math.Min(1, 0.5*float64(now.Sub(broker.updatedJiffies)/time.Millisecond)/1000) | |
broker.CPUIdle = float64(idle–broker.idleJiffies) / float64(total–broker.totalJiffies) | |
broker.CPUIdleAvg = (1–alpha)*broker.CPUIdleAvg + alpha*broker.CPUIdle | |
broker.idleJiffies = idle | |
broker.totalJiffies = total | |
broker.updatedJiffies = now | |
// Don't update CPU so frequently, waiting here make updateCPUIdle fails to write | |
time.Sleep(minPeriod) | |
} | |
} |