Este es un truco en Go muy útil y eficiente para hacer tareas de «mantenimiento» en estructuras de forma concurrente (y seguramente paralela).
Supongamos que tenéis una lista de elementos, a esta lista se agregan u obtienen elementos pero hay que mantener solamente los últimos N, donde N puede estar basado en una límite temporal.
Las operaciones sobre la lista deben responder muy rápidamente por lo que no es posible llamar a la función que elimina los más antiguos porque tiene un coste computacional (O(n) o O(log n) en el mejor de los casos) que retrasaría la respuesta.
Hay dos técnicas muy habituales en Go:
- Lanzar al principio una goroutine que periódicamente se ejecute después de una espera con time.Sleep() o usar el time.Ticker que ya hace ambas cosas. El problema con esta solución es que se ejecuta y consume CPU aunque no haya actividad.
- Lanzar la goroutine cada vez que hay actividad, por ejemplo al agregar u obtener elementos de la lista. El problema, además del pequeño overhead de lanzar la goroutine, es que si hay muchas operaciones se creará un número idénticos de goroutines y seguramente cada una de ellas tendrá que esperar que la otra acabe (se necesitan mutex, la estructura es compartida entre varios hilos de ejecución) y el trabajo se estará haciendo trabajo de más sin ninguna utilidad.
Una tercera forma muy eficiente, algunos le llaman actor (por el modelo de actores). Como en el caso 1 anterior, se lanza una goroutine -el actor– cuando se crea la estructura (o la interfaz), esta rutina sólo espera recibir un mensaje de un canal específico para ella. Cuando recibe un mensaje hace su trabajo y vuelve a esperar por uno nuevo.
func (s *Serie) actor() { for { <-s.aChannel s.Lock() .. s.Unlock() } }
Pero tiene su truco. En primer lugar el canal se crea sin buffer (el default en Go), las funciones que operan sobre la estructura envían un mensaje asíncrono.
func (s *Serie) callActor() { select { case s.aChannel <- true: default: } }
Si el actor está ocupado procesando el mensaje será descartado, por lo que no hará dos o más veces la misma tarea si recibió mensajes cuando estaba ocupado. Es más, se puede hacer que el actor no se ejecute más de una vez cada x tiempo. Para ello basta agregar un time.Sleep() al actor.
func (s *Serie) actor() { for { <-s.aChannel s.Lock() .. s.Unlock() time.Sleep(someDuration) } }
Esta solución es eficiente, sólo hay una rutina que hace la tarea, nunca bloquea a los procesos «importantes» y se puede limitar la frecuencia de ejecución.
A continuación el código de la pequeña interfaz para mantener caché de una serie temporal ordenada y con sólo los elementos a partir de un límite.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"sync" | |
"time" | |
) | |
type SerieElement struct { | |
ts time.Time | |
data *BookingData | |
} | |
type Serie struct { | |
sync.RWMutex | |
ids map[string]bool | |
elements []*SerieElement | |
maxCache time.Duration | |
lastTs time.Time | |
removeCh chan bool | |
} | |
func NewSerie(maxCache time.Duration) (s *Serie) { | |
s = &Serie{ | |
elements: make([]*SerieElement, 0), | |
maxCache: maxCache, | |
ids: make(map[string]bool), | |
removeCh: make(chan bool), | |
} | |
go s.removeOldActor() | |
return | |
} | |
func (s *Serie) Append(b *BookingData, ts time.Time) { | |
s.Lock() | |
defer s.Unlock() | |
defer s.removeOld() | |
_, ok := s.ids[b.id] | |
if ok { | |
// Element already inserted | |
return | |
} | |
e := &SerieElement{ | |
ts: ts, | |
data: b, | |
} | |
s.ids[e.data.id] = true | |
s.elements = append(s.elements, e) | |
if ts.After(s.lastTs) { | |
s.lastTs = ts | |
} | |
} | |
func (s *Serie) Get(fromTime time.Time) (elements []*SerieElement) { | |
s.RLock() | |
defer s.RUnlock() | |
defer s.removeOld() | |
first := len(s.elements) | |
for i := first – 1; i >= 0; i— { | |
if s.elements[i].ts.After(fromTime) { | |
first = i | |
} else { | |
break | |
} | |
} | |
if first < len(s.elements) { | |
elements = s.elements[first:] | |
} | |
return | |
} | |
func (s *Serie) removeOld() { | |
// Asynchronous, the Actor will ignore if there are frequent calls | |
select { | |
case s.removeCh <- true: | |
default: | |
} | |
} | |
func (s *Serie) removeOldActor() { | |
for { | |
<-s.removeCh | |
s.Lock() | |
now := time.Now() | |
olds := 0 | |
for _, e := range s.elements { | |
if e.ts.Before(now.Add(–s.maxCache)) { | |
delete(s.ids, e.data.id) | |
olds++ | |
} else { | |
break | |
} | |
} | |
if olds > 0 { | |
s.elements = s.elements[olds:] | |
} | |
s.Unlock() | |
time.Sleep(5 * time.Second) // To ignore frequent and innecesary calls | |
} | |
} |
Usé la misma para un demonio con mucha concurrencia y que debía ser muy eficiente. Mientras más cargado estaba el sistema debía medir con mayor precisión el uso de CPU. Para ello necesitaba medirlo a intervalos variables y cada vez que lo necesitaba. Pero debía también asegurar que transcurriese un tiempo mínimo entre ejecuciones para obtener valores fiables de las mediciones de los jiffies de CPU.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (broker *Broker) updateCPUIdle() { | |
select { | |
case broker.cpuUpdateChannel <- true: | |
default: | |
} | |
} | |
func (broker *Broker) cpuIdleUpdater() { | |
// Initial values required to get stats | |
minPeriod := 20 * time.Millisecond | |
idle, total := getCPUSample() | |
broker.idleJiffies = idle | |
broker.totalJiffies = total | |
broker.updatedJiffies = time.Now() | |
for { | |
<-broker.cpuUpdateChannel | |
now := time.Now() | |
if broker.updatedJiffies.Add(minPeriod).After(now) { | |
continue | |
} | |
idle, total := getCPUSample() | |
if total == broker.totalJiffies { | |
continue | |
} | |
alpha := math.Min(1, 0.5*float64(now.Sub(broker.updatedJiffies)/time.Millisecond)/1000) | |
broker.CPUIdle = float64(idle–broker.idleJiffies) / float64(total–broker.totalJiffies) | |
broker.CPUIdleAvg = (1–alpha)*broker.CPUIdleAvg + alpha*broker.CPUIdle | |
broker.idleJiffies = idle | |
broker.totalJiffies = total | |
broker.updatedJiffies = now | |
// Don't update CPU so frequently, waiting here make updateCPUIdle fails to write | |
time.Sleep(minPeriod) | |
} | |
} |