zoobzio December 12, 2025 Edit this page

Background Worker Observability

Instrument background job processing with aperture.

Signals and Keys

package signals

import "github.com/zoobz-io/capitan"

// Job lifecycle signals. Each is created with a dotted signal name and a
// short human-readable description. The worker emits JobStarted,
// JobCompleted and JobFailed from Process, and JobRetrying from
// ProcessWithRetry.
var (
    JobStarted   = capitan.NewSignal("job.started", "Job started processing")
    JobCompleted = capitan.NewSignal("job.completed", "Job completed successfully")
    JobFailed    = capitan.NewSignal("job.failed", "Job failed")
    JobRetrying  = capitan.NewSignal("job.retrying", "Job retry scheduled")
)

// Queue event signals. QueueEnqueue is emitted by Queue.Enqueue for every
// submitted job; QueueDepth is emitted periodically by the queue's depth
// monitor and carries the current depth as a field.
var (
    QueueEnqueue = capitan.NewSignal("queue.enqueue", "Job enqueued")
    QueueDepth   = capitan.NewSignal("queue.depth", "Queue depth measurement")
)

// Field keys attached to the signals above. The string passed to each
// constructor is the field name; the schema refers to fields by these names
// (for example ValueKey "duration" and "depth", CorrelationKey "job_id").
//
// Duration carries a time.Duration, RetryCount an int and Depth an int64;
// the remaining keys carry strings.
var (
    JobID      = capitan.NewStringKey("job_id")
    JobType    = capitan.NewStringKey("job_type")
    QueueName  = capitan.NewStringKey("queue")
    Duration   = capitan.NewDurationKey("duration")
    ErrorMsg   = capitan.NewStringKey("error")
    RetryCount = capitan.NewIntKey("retry_count")
    Depth      = capitan.NewInt64Key("depth")
)

Configuration

package main

import "github.com/zoobz-io/aperture"

// workerSchema returns the aperture schema for the background worker: four
// metric definitions, one trace definition and a log whitelist, all keyed by
// signal name.
func workerSchema() aperture.Schema {
    // Two counters (no ValueKey), one histogram fed from the "duration"
    // field and one gauge fed from the "depth" field.
    metrics := []aperture.MetricSchema{
        {
            Signal:      "job.completed",
            Name:        "jobs_completed_total",
            Type:        "counter",
            Description: "Total jobs completed successfully",
        },
        {
            Signal:      "job.failed",
            Name:        "jobs_failed_total",
            Type:        "counter",
            Description: "Total jobs failed",
        },
        {
            Signal:      "job.completed",
            Name:        "job_duration_ms",
            Type:        "histogram",
            ValueKey:    "duration",
            Description: "Job processing duration",
        },
        {
            Signal:   "queue.depth",
            Name:     "queue_depth",
            Type:     "gauge",
            ValueKey: "depth",
        },
    }

    // A single span definition: job.started opens it, job.completed closes
    // it, and the two are matched on the job_id field.
    // NOTE(review): job.failed is not listed as an End signal here; confirm
    // how spans for failed jobs are expected to terminate.
    traces := []aperture.TraceSchema{
        {
            Start:          "job.started",
            End:            "job.completed",
            CorrelationKey: "job_id",
            SpanName:       "job_processing",
        },
    }

    // Only the four job lifecycle signals are whitelisted for logging; the
    // queue signals are absent from the list.
    logs := &aperture.LogSchema{
        Whitelist: []string{
            "job.started",
            "job.completed",
            "job.failed",
            "job.retrying",
        },
    }

    return aperture.Schema{
        Metrics: metrics,
        Traces:  traces,
        Logs:    logs,
    }
}

Worker Implementation

package worker

import (
    "context"
    "time"

    "github.com/zoobz-io/capitan"
    "myapp/signals"
)

// Job is a unit of work handed to a Worker.
type Job struct {
    ID   string // emitted as the job_id field on every job signal
    Type string // selects the handler in processJob ("email", "report")
    Data any    // job payload; not read by any code shown in this example
}

// Worker processes jobs and emits lifecycle signals for each one.
type Worker struct {
    cap       *capitan.Capitan // emitter used for all job signals
    queueName string           // reported as the queue field on every signal
}

// NewWorker builds a Worker that emits through cap and labels its signals
// with queueName.
func NewWorker(cap *capitan.Capitan, queueName string) *Worker {
    w := new(Worker)
    w.cap = cap
    w.queueName = queueName
    return w
}

// Process runs one job and reports its lifecycle as signals.
//
// JobStarted is emitted before the job runs. Afterwards exactly one of
// JobCompleted or JobFailed is emitted, both carrying the elapsed duration;
// JobFailed additionally carries the error text. The error returned by the
// job handler is passed back to the caller unchanged.
func (w *Worker) Process(ctx context.Context, job Job) error {
    began := time.Now()

    w.cap.Emit(ctx, signals.JobStarted,
        signals.JobID.Field(job.ID),
        signals.JobType.Field(job.Type),
        signals.QueueName.Field(w.queueName),
    )

    jobErr := w.processJob(ctx, job)
    elapsed := time.Since(began)

    // Success path first: report completion and stop.
    if jobErr == nil {
        w.cap.Emit(ctx, signals.JobCompleted,
            signals.JobID.Field(job.ID),
            signals.JobType.Field(job.Type),
            signals.QueueName.Field(w.queueName),
            signals.Duration.Field(elapsed),
        )
        return nil
    }

    // Failure path: same fields plus the error message.
    w.cap.Emit(ctx, signals.JobFailed,
        signals.JobID.Field(job.ID),
        signals.JobType.Field(job.Type),
        signals.QueueName.Field(w.queueName),
        signals.Duration.Field(elapsed),
        signals.ErrorMsg.Field(jobErr.Error()),
    )
    return jobErr
}

// processJob dispatches a job to the handler for its Type and returns that
// handler's error.
func (w *Worker) processJob(ctx context.Context, job Job) error {
    switch kind := job.Type; kind {
    case "report":
        return w.generateReport(ctx, job)
    case "email":
        return w.sendEmail(ctx, job)
    }

    // Any other job type has no handler and is reported as a success.
    return nil
}

Queue with Depth Monitoring

package queue

import (
    "context"
    "sync"
    "time"

    "github.com/zoobz-io/capitan"
    "myapp/signals"
    "myapp/worker"
)

// Queue is a bounded, channel-backed job queue that periodically reports its
// depth as a QueueDepth signal.
type Queue struct {
    name  string             // reported as the queue field on every signal
    cap   *capitan.Capitan   // emitter used for queue signals
    jobs  chan worker.Job    // buffered channel holding pending jobs
    mu    sync.RWMutex       // guards depth
    depth int64              // enqueued-minus-dequeued counter
    stop  context.CancelFunc // cancels the depth-monitoring goroutine
}

// NewQueue creates a queue called name whose buffer holds up to size jobs and
// starts a background goroutine that emits the queue depth at a fixed
// interval. Call Close to stop that goroutine once the queue is no longer
// needed.
func NewQueue(cap *capitan.Capitan, name string, size int) *Queue {
    // The monitor gets its own cancellable context so that its lifetime is
    // tied to the queue rather than to the whole process.
    ctx, cancel := context.WithCancel(context.Background())

    q := &Queue{
        name: name,
        cap:  cap,
        jobs: make(chan worker.Job, size),
        stop: cancel,
    }

    // Start depth monitoring
    go q.monitorDepth(ctx)

    return q
}

// Close stops the depth-monitoring goroutine started by NewQueue. It does not
// close the job channel, so pending jobs can still be dequeued. Close is safe
// to call more than once and on a Queue that was not built by NewQueue.
func (q *Queue) Close() {
    if q.stop != nil {
        q.stop()
    }
}

// Enqueue counts the job towards the queue depth, emits a QueueEnqueue
// signal describing it, and then sends it on the job channel. The send
// blocks while the channel buffer is full.
func (q *Queue) Enqueue(ctx context.Context, job worker.Job) {
    // Bump the depth counter under the write lock.
    func() {
        q.mu.Lock()
        defer q.mu.Unlock()
        q.depth++
    }()

    idField := signals.JobID.Field(job.ID)
    typeField := signals.JobType.Field(job.Type)
    queueField := signals.QueueName.Field(q.name)
    q.cap.Emit(ctx, signals.QueueEnqueue, idField, typeField, queueField)

    q.jobs <- job
}

// Dequeue blocks until a job is available on the channel, decrements the
// depth counter and returns the job.
func (q *Queue) Dequeue() worker.Job {
    next := <-q.jobs

    q.mu.Lock()
    defer q.mu.Unlock()
    q.depth--

    return next
}

// monitorDepth emits a QueueDepth signal with the current depth every ten
// seconds until ctx is cancelled.
func (q *Queue) monitorDepth(ctx context.Context) {
    tick := time.NewTicker(10 * time.Second)
    defer tick.Stop()

    done := ctx.Done()
    for {
        select {
        case <-tick.C:
            // Snapshot the counter under the read lock, then emit outside it.
            q.mu.RLock()
            current := q.depth
            q.mu.RUnlock()

            q.cap.Emit(ctx, signals.QueueDepth,
                signals.QueueName.Field(q.name),
                signals.Depth.Field(current),
            )
        case <-done:
            return
        }
    }
}

Retry Handler

package worker

import (
    "context"
    "time"

    "myapp/signals"
)

// ProcessWithRetry runs job through Process, retrying up to maxRetries times
// after the first attempt with exponential backoff (1s, 2s, 4s, ...).
//
// Before each retry a JobRetrying signal is emitted carrying the 1-based
// retry number and the error text of the attempt that just failed. It returns
// nil as soon as an attempt succeeds, otherwise the error from the last
// attempt that ran. A negative maxRetries is treated as zero, so the job is
// always attempted at least once. If ctx is cancelled, no further retries are
// scheduled and the backoff wait is abandoned immediately; the most recent
// job error is returned in that case as well.
func (w *Worker) ProcessWithRetry(ctx context.Context, job Job, maxRetries int) error {
    // Without this clamp a negative budget skips the loop entirely and the
    // function reports success for a job that never ran.
    if maxRetries < 0 {
        maxRetries = 0
    }

    var lastErr error

    for attempt := 0; attempt <= maxRetries; attempt++ {
        lastErr = w.Process(ctx, job)
        if lastErr == nil {
            return nil
        }

        // Out of retries: fall through to return the last error.
        if attempt == maxRetries {
            break
        }

        // Do not announce a retry that will not happen.
        if ctx.Err() != nil {
            return lastErr
        }

        // Emit retry signal
        w.cap.Emit(ctx, signals.JobRetrying,
            signals.JobID.Field(job.ID),
            signals.JobType.Field(job.Type),
            signals.QueueName.Field(w.queueName),
            signals.RetryCount.Field(attempt+1),
            signals.ErrorMsg.Field(lastErr.Error()),
        )

        // Exponential backoff that wakes up early on cancellation instead of
        // sleeping through it.
        backoff := time.NewTimer(time.Duration(1<<attempt) * time.Second)
        select {
        case <-backoff.C:
        case <-ctx.Done():
            backoff.Stop()
            return lastErr
        }
    }

    return lastErr
}

Main Application

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/zoobz-io/aperture"
    "github.com/zoobz-io/capitan"
    "myapp/queue"
    "myapp/worker"
)

// main wires the example together: it connects aperture to the default
// capitan instance, applies the worker schema, starts ten workers that drain
// a shared queue, enqueues one hundred email jobs and then blocks forever.
func main() {
    ctx := context.Background()

    // Setup providers (as in HTTP example)
    // ...

    cap := capitan.Default()

    // Fail fast if aperture cannot be constructed; using ap after a failed
    // New would otherwise blow up on the next line. The check happens before
    // any defer is registered so that log.Fatalf does not skip cleanup.
    ap, err := aperture.New(cap, logProvider, meterProvider, traceProvider)
    if err != nil {
        log.Fatalf("creating aperture: %v", err)
    }

    // Deferred in this order so that ap.Close runs before cap.Shutdown.
    // NOTE: main blocks on `select {}` below, so these deferred calls only
    // run if that is replaced with a real shutdown path.
    defer cap.Shutdown()
    ap.Apply(workerSchema())
    defer ap.Close()

    // Create queue and workers
    q := queue.NewQueue(cap, "default", 1000)

    // Start workers
    for i := 0; i < 10; i++ {
        w := worker.NewWorker(cap, "default")
        go func() {
            for {
                job := q.Dequeue()
                // The error is intentionally dropped here: every failed
                // attempt has already been reported by Process as a
                // job.failed signal.
                _ = w.ProcessWithRetry(ctx, job, 3)
            }
        }()
    }

    // Enqueue jobs
    for i := 0; i < 100; i++ {
        q.Enqueue(ctx, worker.Job{
            ID:   fmt.Sprintf("job-%d", i),
            Type: "email",
            Data: nil,
        })
    }

    select {} // Run forever
}

Resulting Telemetry

Metrics

jobs_completed_total{job_type="email", queue="default"} 95
jobs_failed_total{job_type="email", queue="default"} 5
job_duration_ms_bucket{job_type="email", queue="default", le="100"} 50
job_duration_ms_bucket{job_type="email", queue="default", le="500"} 90
queue_depth{queue="default"} 23

Logs

{
  "timestamp": "2025-01-15T10:30:00Z",
  "severity": "ERROR",
  "body": "Job failed",
  "attributes": {
    "capitan.signal": "job.failed",
    "job_id": "job-42",
    "job_type": "email",
    "queue": "default",
    "error": "connection refused"
  }
}

Traces

Span: job_processing
  TraceID: def456...
  Duration: 234ms
  Attributes:
    - job_id: job-42
    - job_type: email
    - queue: default