Files
mstore/vendor/github.com/dylibso/observe-sdk/go/bucket.go
T
2026-03-13 19:02:42 +02:00

87 lines
2.1 KiB
Go

package observe
import (
"log"
"sync"
"time"
)
// Flusher receives batches of TraceEvents drained from an EventBucket.
// scheduleFlush calls Flush once per batch and logs any error it returns.
type Flusher interface {
Flush(events []TraceEvent) error
}
// EventBucket is a bucket for outgoing TraceEvents.
// A flush is scheduled only when the bucket goes from empty to one item,
// so an event waits at most about flushPeriod before its flush starts.
// Flushed TraceEvents are handed to the Flusher in batches of batchSize.
type EventBucket struct {
mu sync.Mutex // guards bucket
wg sync.WaitGroup // tracks flush goroutines so Wait can block on them
bucket []TraceEvent // events collected since the last flush drained the bucket
flushPeriod time.Duration // how long a scheduled flush sleeps before draining
batchSize int // number of events passed to each Flush call (last batch may be smaller)
}
// NewEventBucket returns an empty EventBucket that flushes its events
// flushPeriod after the first one arrives, in chunks of batchSize.
func NewEventBucket(batchSize int, flushPeriod time.Duration) *EventBucket {
	b := new(EventBucket)
	b.batchSize = batchSize
	b.flushPeriod = flushPeriod
	return b
}
// addEvent appends e to the bucket. When e is the first event to land in an
// empty bucket, a flush to f is scheduled; later events ride along with the
// flush that is already pending.
func (b *EventBucket) addEvent(e TraceEvent, f Flusher) {
	b.mu.Lock()
	first := len(b.bucket) == 0
	b.bucket = append(b.bucket, e)
	b.mu.Unlock()

	// A non-empty bucket already has a flush on its way.
	if !first {
		return
	}
	b.scheduleFlush(f)
}
// Wait blocks until every flush goroutine currently registered with the
// bucket's wait group has finished.
func (b *EventBucket) Wait() {
b.wg.Wait()
}
// scheduleFlush starts a goroutine that sleeps for flushPeriod, then drains
// the bucket and passes its contents to f in chunks of at most batchSize
// events. Events added while the goroutine sleeps are part of the same flush.
// An error returned by f.Flush is logged and the remaining chunks are still
// attempted.
func (b *EventBucket) scheduleFlush(f Flusher) {
	// Register with the wait group before the goroutine starts. Calling Add
	// from inside the goroutine races with Wait: a caller could see a zero
	// counter and return before this flush has been registered at all.
	b.wg.Add(1)

	go func() {
		defer b.wg.Done()

		// Let events accumulate for flushPeriod before draining.
		time.Sleep(b.flushPeriod)

		// Take ownership of the pending events and leave the bucket empty so
		// that the next addEvent schedules a fresh flush.
		b.mu.Lock()
		pending := b.bucket
		b.bucket = nil
		b.mu.Unlock()

		// A zero batchSize would never advance the loop and a negative one
		// would yield an invalid slice range, so send everything at once.
		size := b.batchSize
		if size <= 0 {
			size = len(pending)
		}

		// Flush the drained events in chunks of size.
		for start := 0; start < len(pending); start += size {
			end := start + size
			if end > len(pending) {
				end = len(pending)
			}
			// TODO retry logic?
			if err := f.Flush(pending[start:end]); err != nil {
				log.Println(err)
			}
		}
	}()
}