Background Worker Observability
Instrument background job processing with aperture.
Signals and Keys
package signals
import "github.com/zoobz-io/capitan"
// Job lifecycle signals. Each one is registered with capitan under the
// dotted name given as the first argument; the second argument is its
// human-readable description.
var (
// JobStarted is the "job.started" signal.
JobStarted = capitan.NewSignal("job.started", "Job started processing")
// JobCompleted is the "job.completed" signal.
JobCompleted = capitan.NewSignal("job.completed", "Job completed successfully")
// JobFailed is the "job.failed" signal.
JobFailed = capitan.NewSignal("job.failed", "Job failed")
// JobRetrying is the "job.retrying" signal.
JobRetrying = capitan.NewSignal("job.retrying", "Job retry scheduled")
)
// Queue event signals, registered with capitan under the dotted name given
// as the first argument.
var (
// QueueEnqueue is the "queue.enqueue" signal.
QueueEnqueue = capitan.NewSignal("queue.enqueue", "Job enqueued")
// QueueDepth is the "queue.depth" signal.
QueueDepth = capitan.NewSignal("queue.depth", "Queue depth measurement")
)
// Field keys used to attach typed values to the signals above. The string
// passed to each constructor is the key name carried on the emitted field.
var (
// JobID is the string key "job_id".
JobID = capitan.NewStringKey("job_id")
// JobType is the string key "job_type".
JobType = capitan.NewStringKey("job_type")
// QueueName is the string key "queue".
QueueName = capitan.NewStringKey("queue")
// Duration is the duration key "duration".
Duration = capitan.NewDurationKey("duration")
// ErrorMsg is the string key "error".
ErrorMsg = capitan.NewStringKey("error")
// RetryCount is the int key "retry_count".
RetryCount = capitan.NewIntKey("retry_count")
// Depth is the int64 key "depth".
Depth = capitan.NewInt64Key("depth")
)
Configuration
package main
import "github.com/zoobz-io/aperture"
// workerSchema returns the aperture schema for the background worker. It
// declares four metrics, one trace, and a log whitelist, all keyed by the
// signal names the worker and queue emit.
func workerSchema() aperture.Schema {
	return aperture.Schema{
		Metrics: []aperture.MetricSchema{
			// Jobs processed
			{
				Signal:      "job.completed",
				Name:        "jobs_completed_total",
				Type:        "counter",
				Description: "Total jobs completed successfully",
			},
			// Jobs failed
			{
				Signal:      "job.failed",
				Name:        "jobs_failed_total",
				Type:        "counter",
				Description: "Total jobs failed",
			},
			// Job duration, read from the "duration" field of job.completed
			{
				Signal:      "job.completed",
				Name:        "job_duration_ms",
				Type:        "histogram",
				ValueKey:    "duration",
				Description: "Job processing duration",
			},
			// Queue depth gauge, read from the "depth" field of queue.depth
			{
				Signal:      "queue.depth",
				Name:        "queue_depth",
				Type:        "gauge",
				ValueKey:    "depth",
				Description: "Number of jobs currently in the queue",
			},
		},
		Traces: []aperture.TraceSchema{
			// Trace job processing, pairing start and end events by job_id.
			// NOTE(review): the span only ends on job.completed; a job that
			// emits job.failed never produces the End signal. Confirm how
			// aperture handles spans whose End signal never arrives.
			{
				Start:          "job.started",
				End:            "job.completed",
				CorrelationKey: "job_id",
				SpanName:       "job_processing",
			},
		},
		Logs: &aperture.LogSchema{
			// Only the job lifecycle signals are listed; queue.enqueue and
			// queue.depth are not in the whitelist.
			Whitelist: []string{
				"job.started",
				"job.completed",
				"job.failed",
				"job.retrying",
			},
		},
	}
}
Worker Implementation
package worker
import (
"context"
"time"
"github.com/zoobz-io/capitan"
"myapp/signals"
)
// Job is a unit of work handed to a Worker.
type Job struct {
// ID identifies the job; Process attaches it to signals as the job_id field.
ID string
// Type selects the handler in processJob ("email" or "report").
Type string
// Data is the job payload; nothing in this file constrains its shape.
Data any
}
// Worker processes jobs and emits lifecycle signals for each one.
type Worker struct {
// cap is the capitan instance the worker emits signals through.
cap *capitan.Capitan
// queueName is attached to every emitted signal as the queue field.
queueName string
}
// NewWorker builds a Worker that emits its signals through cap and tags them
// with queueName.
func NewWorker(cap *capitan.Capitan, queueName string) *Worker {
	w := new(Worker)
	w.cap = cap
	w.queueName = queueName
	return w
}
// Process runs a single job and reports its lifecycle as signals.
//
// It emits job.started before the work begins, then either job.completed or
// job.failed afterwards. Both outcome signals carry the elapsed duration;
// job.failed additionally carries the error text. The error returned by the
// job handler is passed back to the caller unchanged.
func (w *Worker) Process(ctx context.Context, job Job) error {
	began := time.Now()

	w.cap.Emit(ctx, signals.JobStarted,
		signals.JobID.Field(job.ID),
		signals.JobType.Field(job.Type),
		signals.QueueName.Field(w.queueName),
	)

	jobErr := w.processJob(ctx, job)
	elapsed := time.Since(began)

	// Success path first: report completion and stop.
	if jobErr == nil {
		w.cap.Emit(ctx, signals.JobCompleted,
			signals.JobID.Field(job.ID),
			signals.JobType.Field(job.Type),
			signals.QueueName.Field(w.queueName),
			signals.Duration.Field(elapsed),
		)
		return nil
	}

	// Failure path: report the error alongside the usual job fields.
	w.cap.Emit(ctx, signals.JobFailed,
		signals.JobID.Field(job.ID),
		signals.JobType.Field(job.Type),
		signals.QueueName.Field(w.queueName),
		signals.Duration.Field(elapsed),
		signals.ErrorMsg.Field(jobErr.Error()),
	)
	return jobErr
}
// processJob dispatches the job to the handler for its Type. Types other
// than "email" and "report" have no handler and return nil.
func (w *Worker) processJob(ctx context.Context, job Job) error {
	if job.Type == "email" {
		return w.sendEmail(ctx, job)
	}
	if job.Type == "report" {
		return w.generateReport(ctx, job)
	}
	return nil
}
Queue with Depth Monitoring
package queue
import (
"context"
"sync"
"time"
"github.com/zoobz-io/capitan"
"myapp/signals"
"myapp/worker"
)
// Queue is a named job queue that tracks its own depth for telemetry.
type Queue struct {
// name is attached to emitted signals as the queue field.
name string
// cap is the capitan instance the queue emits signals through.
cap *capitan.Capitan
// jobs carries jobs from Enqueue to Dequeue; NewQueue sets its capacity.
jobs chan worker.Job
// mu guards depth.
mu sync.RWMutex
// depth is incremented by Enqueue, decremented by Dequeue, and reported
// by monitorDepth.
depth int64
}
// NewQueue creates a queue called name whose job channel has capacity size,
// and starts the depth monitor in its own goroutine.
//
// The monitor is started with context.Background(), so nothing in this
// constructor provides a way to stop it.
func NewQueue(cap *capitan.Capitan, name string, size int) *Queue {
	q := new(Queue)
	q.name = name
	q.cap = cap
	q.jobs = make(chan worker.Job, size)

	go q.monitorDepth(context.Background())

	return q
}
// Enqueue records the job in the depth counter, emits queue.enqueue, and then
// sends the job on the channel. The send blocks while the channel is full.
func (q *Queue) Enqueue(ctx context.Context, job worker.Job) {
	// Bump the counter inside a small closure so the lock is released via
	// defer before the emit and the (possibly blocking) send.
	func() {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.depth++
	}()

	q.cap.Emit(ctx, signals.QueueEnqueue,
		signals.JobID.Field(job.ID),
		signals.JobType.Field(job.Type),
		signals.QueueName.Field(q.name),
	)

	q.jobs <- job
}
// Dequeue blocks until a job is available, decrements the depth counter, and
// returns the job.
func (q *Queue) Dequeue() worker.Job {
	next := <-q.jobs

	q.mu.Lock()
	defer q.mu.Unlock()
	q.depth--

	return next
}
// monitorDepth emits a queue.depth signal carrying the current depth every
// ten seconds, and returns once ctx is done.
func (q *Queue) monitorDepth(ctx context.Context) {
	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			// Snapshot under the read lock so the emit runs unlocked.
			q.mu.RLock()
			current := q.depth
			q.mu.RUnlock()

			q.cap.Emit(ctx, signals.QueueDepth,
				signals.QueueName.Field(q.name),
				signals.Depth.Field(current),
			)
		case <-ctx.Done():
			return
		}
	}
}
Retry Handler
package worker
import (
"context"
"time"
"myapp/signals"
)
// ProcessWithRetry runs job through Process, retrying on failure up to
// maxRetries additional times with exponential backoff (1s, 2s, 4s, ...).
//
// A job.retrying signal is emitted before each backoff wait. The wait is
// abandoned as soon as ctx is done, in which case ctx.Err() is returned.
// A negative maxRetries is treated as zero so the job always runs at least
// once. It returns nil on the first successful attempt, otherwise the error
// from the last attempt.
func (w *Worker) ProcessWithRetry(ctx context.Context, job Job, maxRetries int) error {
	// Without this clamp a negative budget would skip the loop and report
	// success without ever processing the job.
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		err := w.Process(ctx, job)
		if err == nil {
			return nil
		}
		lastErr = err

		// No retry (and no backoff) after the final attempt.
		if attempt == maxRetries {
			break
		}

		// Emit retry signal
		w.cap.Emit(ctx, signals.JobRetrying,
			signals.JobID.Field(job.ID),
			signals.JobType.Field(job.Type),
			signals.QueueName.Field(w.queueName),
			signals.RetryCount.Field(attempt+1),
			signals.ErrorMsg.Field(err.Error()),
		)

		// Exponential backoff, interruptible by ctx so shutdown is not held
		// up by a sleeping worker.
		backoff := time.NewTimer(time.Duration(1<<attempt) * time.Second)
		select {
		case <-ctx.Done():
			backoff.Stop()
			return ctx.Err()
		case <-backoff.C:
		}
	}
	return lastErr
}
Main Application
package main
import (
"context"
"fmt"
"log"
"github.com/zoobz-io/aperture"
"github.com/zoobz-io/capitan"
"myapp/queue"
"myapp/worker"
)
// main wires capitan to aperture, starts ten workers draining the "default"
// queue, enqueues one hundred email jobs, and then blocks forever.
func main() {
	ctx := context.Background()

	// Setup providers (as in HTTP example)
	// ...

	cap := capitan.Default()

	ap, err := aperture.New(cap, logProvider, meterProvider, traceProvider)
	if err != nil {
		// log.Fatalf exits without running deferred calls, so shut capitan
		// down explicitly before bailing out.
		cap.Shutdown()
		log.Fatalf("creating aperture: %v", err)
	}
	// Deferred in this order so ap.Close runs before cap.Shutdown.
	// NOTE: the select {} at the end of main never returns, so as written
	// these deferred calls only run if that final block is replaced with
	// something that returns.
	defer cap.Shutdown()
	ap.Apply(workerSchema())
	defer ap.Close()

	// Create queue and workers
	q := queue.NewQueue(cap, "default", 1000)

	// Start workers
	for i := 0; i < 10; i++ {
		w := worker.NewWorker(cap, "default")
		go func() {
			for {
				job := q.Dequeue()
				// Failures are already reported through the job.failed and
				// job.retrying signals, so the returned error is dropped.
				_ = w.ProcessWithRetry(ctx, job, 3)
			}
		}()
	}

	// Enqueue jobs
	for i := 0; i < 100; i++ {
		q.Enqueue(ctx, worker.Job{
			ID:   fmt.Sprintf("job-%d", i),
			Type: "email",
			Data: nil,
		})
	}

	select {} // Run forever
}
Resulting Telemetry
Metrics
jobs_completed_total{job_type="email", queue="default"} 95
jobs_failed_total{job_type="email", queue="default"} 5
job_duration_ms_bucket{job_type="email", queue="default", le="100"} 50
job_duration_ms_bucket{job_type="email", queue="default", le="500"} 90
queue_depth{queue="default"} 23
Logs
{
"timestamp": "2025-01-15T10:30:00Z",
"severity": "ERROR",
"body": "Job failed",
"attributes": {
"capitan.signal": "job.failed",
"job_id": "job-42",
"job_type": "email",
"queue": "default",
"error": "connection refused"
}
}
Traces
Span: job_processing
TraceID: def456...
Duration: 234ms
Attributes:
- job_id: job-42
- job_type: email
- queue: default