87 lines
2.1 KiB
Go
87 lines
2.1 KiB
Go
package observe
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type Flusher interface {
|
|
Flush(events []TraceEvent) error
|
|
}
|
|
|
|
// EventBucket is a bucket for outgoing TraceEvents.
|
|
// It only schedules flushes when the bucket goes from empty to 1 item.
|
|
// At most the latency to flush the bucket will be flushPeriod.
|
|
// It will also flush the TraceEvents in batches according to batch size
|
|
type EventBucket struct {
|
|
mu sync.Mutex
|
|
wg sync.WaitGroup
|
|
bucket []TraceEvent
|
|
flushPeriod time.Duration
|
|
batchSize int
|
|
}
|
|
|
|
// NewEventBucket creates an EventBucket
|
|
func NewEventBucket(batchSize int, flushPeriod time.Duration) *EventBucket {
|
|
return &EventBucket{
|
|
flushPeriod: flushPeriod,
|
|
batchSize: batchSize,
|
|
}
|
|
}
|
|
|
|
// addEvent adds a TraceEvent and schedules to flush to Flusher if needed
|
|
func (b *EventBucket) addEvent(e TraceEvent, f Flusher) {
|
|
b.mu.Lock()
|
|
wasEmpty := len(b.bucket) == 0
|
|
b.bucket = append(b.bucket, e)
|
|
b.mu.Unlock()
|
|
// if this is the first event in the bucket,
|
|
// we schedule a flush
|
|
if wasEmpty {
|
|
b.scheduleFlush(f)
|
|
}
|
|
}
|
|
|
|
// Wait will block until all pending flushes are done
|
|
func (b *EventBucket) Wait() {
|
|
b.wg.Wait()
|
|
}
|
|
|
|
// scheduleFlush schedules a goroutine to flush
|
|
// the bucket at some time in the future depending on flushPeriod.
|
|
// Events will continue to build up until the flush comes due
|
|
func (b *EventBucket) scheduleFlush(f Flusher) {
|
|
// we start this routine and immediately wait, we are effectively
|
|
// scheduling the flush to run flushPeriod sections later. In the meantime,
|
|
// events may still be coming into the eventBucket
|
|
go func() {
|
|
// register this flush with the wait group
|
|
defer b.wg.Done()
|
|
b.wg.Add(1)
|
|
|
|
// wait for flushPeriod
|
|
time.Sleep(b.flushPeriod)
|
|
|
|
// move the events out of the EventBucket to a slice
|
|
// and add 1 to the waitgroup
|
|
b.mu.Lock()
|
|
bucket := b.bucket
|
|
b.bucket = nil
|
|
b.mu.Unlock()
|
|
|
|
// flush the bucket in chunks of batchSize
|
|
for i := 0; i < len(bucket); i += b.batchSize {
|
|
j := i + b.batchSize
|
|
if j > len(bucket) {
|
|
j = len(bucket)
|
|
}
|
|
// TODO retry logic?
|
|
err := f.Flush(bucket[i:j])
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
}()
|
|
}
|