Etiquetas

, ,

Este es un truco en Go muy útil y eficiente para hacer tareas de “mantenimiento” en estructuras de forma concurrente (y seguramente paralela).

Supongamos que tenéis una lista de elementos, a esta lista se agregan u obtienen elementos pero hay que mantener solamente los últimos N, donde N puede estar basado en una límite temporal.

Las operaciones sobre la lista deben responder muy rápidamente por lo que no es posible llamar a la función que elimina los más antiguos porque tiene un coste computacional (O(n) o O(log n) en el mejor de los casos) que retrasaría la respuesta.

Hay dos técnicas muy habituales en Go:

  1. Lanzar al principio una goroutine que periódicamente se ejecute después de una espera con time.Sleep() o usar el time.Ticker que ya hace ambas cosas. El problema con esta solución es que se ejecuta y consume CPU aunque no haya actividad.
  2. Lanzar la goroutine cada vez que hay actividad, por ejemplo al agregar u obtener elementos de la lista. El problema, además del pequeño overhead de lanzar la goroutine, es que si hay muchas operaciones se creará un número idénticos de goroutines y seguramente cada una de ellas tendrá que esperar que la otra acabe (se necesitan mutex, la estructura es compartida entre varios hilos de ejecución) y el trabajo se estará haciendo trabajo de más sin ninguna utilidad.

Una tercera forma muy eficiente, algunos le llaman actor (por el modelo de actores). Como en el caso 1 anterior, se lanza una goroutine -el actor– cuando se crea la estructura (o la interfaz), esta rutina sólo espera recibir un mensaje de un canal específico para ella. Cuando recibe un mensaje hace su trabajo y vuelve a esperar por uno nuevo.

func (s *Serie) actor() {
for {
<-s.aChannel
s.Lock()
..
s.Unlock()
}
}

Pero tiene su truco. En primer lugar el canal se crea sin buffer (el default en Go), las funciones que operan sobre la estructura envían un mensaje asíncrono.

func (s *Serie) callActor() {
select {
case s.aChannel &lt;- true:
default:
}
}

Si el actor está ocupado procesando el mensaje será descartado, por lo que no hará dos o más veces la misma tarea si recibió mensajes cuando estaba ocupado. Es más, se puede hacer que el actor no se ejecute más de una vez cada x tiempo. Para ello basta agregar un time.Sleep() al actor.

func (s *Serie) actor() {
for {
<-s.aChannel
s.Lock()
..
s.Unlock()
time.Sleep(someDuration)
}
}

Esta solución es eficiente, sólo hay una rutina que hace la tarea, nunca bloquea a los procesos “importantes” y se puede limitar la frecuencia de ejecución.

A continuación el código de la pequeña interfaz para mantener caché de una serie temporal ordenada y con sólo los elementos a partir de un límite.

package main
import (
"sync"
"time"
)
type SerieElement struct {
ts time.Time
data *BookingData
}
type Serie struct {
sync.RWMutex
ids map[string]bool
elements []*SerieElement
maxCache time.Duration
lastTs time.Time
removeCh chan bool
}
func NewSerie(maxCache time.Duration) (s *Serie) {
s = &Serie{
elements: make([]*SerieElement, 0),
maxCache: maxCache,
ids: make(map[string]bool),
removeCh: make(chan bool),
}
go s.removeOldActor()
return
}
func (s *Serie) Append(b *BookingData, ts time.Time) {
s.Lock()
defer s.Unlock()
defer s.removeOld()
_, ok := s.ids[b.id]
if ok {
// Element already inserted
return
}
e := &SerieElement{
ts: ts,
data: b,
}
s.ids[e.data.id] = true
s.elements = append(s.elements, e)
if ts.After(s.lastTs) {
s.lastTs = ts
}
}
func (s *Serie) Get(fromTime time.Time) (elements []*SerieElement) {
s.RLock()
defer s.RUnlock()
defer s.removeOld()
first := len(s.elements)
for i := first 1; i >= 0; i {
if s.elements[i].ts.After(fromTime) {
first = i
} else {
break
}
}
if first < len(s.elements) {
elements = s.elements[first:]
}
return
}
func (s *Serie) removeOld() {
// Asynchronous, the Actor will ignore if there are frequent calls
select {
case s.removeCh <- true:
default:
}
}
func (s *Serie) removeOldActor() {
for {
<-s.removeCh
s.Lock()
now := time.Now()
olds := 0
for _, e := range s.elements {
if e.ts.Before(now.Add(s.maxCache)) {
delete(s.ids, e.data.id)
olds++
} else {
break
}
}
if olds > 0 {
s.elements = s.elements[olds:]
}
s.Unlock()
time.Sleep(5 * time.Second) // To ignore frequent and innecesary calls
}
}

view raw
cache.go
hosted with ❤ by GitHub

Usé la misma para un demonio con mucha concurrencia y que debía ser muy eficiente. Mientras más cargado estaba el sistema debía medir con mayor precisión el uso de CPU. Para ello necesitaba medirlo a intervalos variables y cada vez que lo necesitaba. Pero debía también asegurar que transcurriese un tiempo mínimo entre ejecuciones para obtener valores fiables de las mediciones de los jiffies de CPU.

func (broker *Broker) updateCPUIdle() {
select {
case broker.cpuUpdateChannel <- true:
default:
}
}
func (broker *Broker) cpuIdleUpdater() {
// Initial values required to get stats
minPeriod := 20 * time.Millisecond
idle, total := getCPUSample()
broker.idleJiffies = idle
broker.totalJiffies = total
broker.updatedJiffies = time.Now()
for {
<-broker.cpuUpdateChannel
now := time.Now()
if broker.updatedJiffies.Add(minPeriod).After(now) {
continue
}
idle, total := getCPUSample()
if total == broker.totalJiffies {
continue
}
alpha := math.Min(1, 0.5*float64(now.Sub(broker.updatedJiffies)/time.Millisecond)/1000)
broker.CPUIdle = float64(idlebroker.idleJiffies) / float64(totalbroker.totalJiffies)
broker.CPUIdleAvg = (1alpha)*broker.CPUIdleAvg + alpha*broker.CPUIdle
broker.idleJiffies = idle
broker.totalJiffies = total
broker.updatedJiffies = now
// Don't update CPU so frequently, waiting here make updateCPUIdle fails to write
time.Sleep(minPeriod)
}
}

view raw
cpuidle.go
hosted with ❤ by GitHub