Files
mstore/vendor/github.com/dylibso/observe-sdk/go/adapter.go
T
2026-03-13 19:02:42 +02:00

147 lines
3.7 KiB
Go

package observe
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/tetratelabs/wazero"
trace "go.opentelemetry.io/proto/otlp/trace/v1"
)
// The primary interface that every Adapter needs to follow
// Start() and Stop() can just call the implementations on AdapterBase
// or provide some custom logic. HandleTraceEvent is called after
// an invocation of a wasm module is done and all events are collected.
type Adapter interface {
Start(context.Context)
Stop(wait bool)
HandleTraceEvent(e TraceEvent)
}
// The payload that contains all the Events
// from a single wasm module invocation
type TraceEvent struct {
Events []Event
TelemetryId TelemetryId
AdapterMeta interface{}
}
// Shared implementation for all Adapters
type AdapterBase struct {
TraceEvents chan TraceEvent
stop chan bool
eventBucket *EventBucket
flusher Flusher
}
func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, opts *Options) (*TraceCtx, error) {
if opts == nil {
opts = NewDefaultOptions()
}
return newTraceCtx(ctx, a.TraceEvents, r, wasm, opts)
}
func NewAdapterBase(batchSize int, flushPeriod time.Duration) AdapterBase {
bucket := NewEventBucket(batchSize, flushPeriod)
return AdapterBase{
TraceEvents: make(chan TraceEvent, 100),
eventBucket: bucket,
}
}
func (b *AdapterBase) SetFlusher(f Flusher) {
b.flusher = f
}
func (b *AdapterBase) HandleTraceEvent(te TraceEvent) {
b.eventBucket.addEvent(te, b.flusher)
}
func (b *AdapterBase) Start(ctx context.Context, a Adapter) {
b.stop = make(chan bool)
go func() {
for {
select {
case <-ctx.Done():
log.Println("context cancelled")
return
case event := <-b.TraceEvents:
a.HandleTraceEvent(event)
case <-b.stop:
return
}
}
}()
}
// Stops the adapter and waits for all flushes to complete.
// Set wait parameter to false if you don't want to wait
func (b *AdapterBase) Stop(wait bool) {
b.stop <- true
if wait {
b.eventBucket.Wait()
}
}
// MakeOtelCallSpans recursively constructs call spans in open telemetry format
func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceId string) []*trace.Span {
name := event.FunctionName()
span := NewOtelSpan(traceId, parentId, name, event.Time, event.Time.Add(event.Duration))
span.Attributes = append(span.Attributes, NewOtelKeyValueString("function-name", fmt.Sprintf("function-call-%s", name)))
spans := []*trace.Span{span}
for _, ev := range event.Within() {
if call, ok := ev.(CallEvent); ok {
spans = append(spans, b.MakeOtelCallSpans(call, span.SpanId, traceId)...)
}
if alloc, ok := ev.(MemoryGrowEvent); ok {
kv := NewOtelKeyValueInt64("allocation", int64(alloc.MemoryGrowAmount()))
i, existing := GetOtelAttrFromSpan("allocation", span)
if existing != nil {
span.Attributes[i] = AddOtelKeyValueInt64(kv, existing)
} else {
span.Attributes = append(span.Attributes, kv)
}
}
if tags, ok := ev.(SpanTagsEvent); ok {
for _, tag := range tags.Tags {
parts := strings.Split(tag, ":")
if len(parts) != 2 {
log.Printf("Invalid tag: %s\n", tag)
continue
}
kv := NewOtelKeyValueString(parts[0], parts[1])
span.Attributes = append(span.Attributes, kv)
}
}
}
return spans
}
// Definition of how to filter our Spans to reduce noise
type SpanFilter struct {
MinDuration time.Duration
}
// Specify options to change what or how the adapter receives ObserveEvents
type Options struct {
SpanFilter *SpanFilter
ChannelBufferSize int
}
// Create a default configuration
func NewDefaultOptions() *Options {
return &Options{
ChannelBufferSize: 1024,
SpanFilter: &SpanFilter{
MinDuration: time.Microsecond * 20,
},
}
}