← Week 2: Implementation

Day 13: Instrumentation

Phase 7 · Oct 5, 2026

Agenda (2–3 hours)

  • Implement (90 min): Add full OTel instrumentation — traces, metrics, and structured JSON logs — to all three services
  • Test (60 min): Submit 10 tasks; verify the full trace from API → Worker → DLQ Processor appears in Jaeger; verify all metrics appear in Prometheus
  • Review (30 min): Verify trace_id is present in every log line; verify semantic convention attributes on all spans
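
The Review step can be partly automated. The following is a minimal sketch, not a full JSON validator: it assumes each log line is compact JSON and that the logging layer writes the ID as a top-level `"trace_id":"<32 hex chars>"` pair. The function names `lines_missing_trace_id` and `has_trace_id` are hypothetical, and the key name or nesting may differ in your setup.

```rust
/// Returns the 1-based line numbers of non-empty log lines that do not
/// contain a well-formed `"trace_id":"<32 hex chars>"` pair.
/// Naive substring check; assumes compact JSON with no space after the colon.
pub fn lines_missing_trace_id(log: &str) -> Vec<usize> {
    log.lines()
        .enumerate()
        .filter(|(_, line)| !line.trim().is_empty())
        .filter(|(_, line)| !has_trace_id(line))
        .map(|(i, _)| i + 1)
        .collect()
}

/// True when the line carries a trace_id made of exactly 32 hex digits.
pub fn has_trace_id(line: &str) -> bool {
    let key = "\"trace_id\":\"";
    match line.find(key) {
        Some(pos) => {
            // Take everything up to the closing quote of the value.
            let rest = &line[pos + key.len()..];
            let id: String = rest.chars().take_while(|c| *c != '"').collect();
            id.len() == 32 && id.chars().all(|c| c.is_ascii_hexdigit())
        }
        None => false,
    }
}
```

Feeding the captured output of all three services through `lines_missing_trace_id` and expecting an empty result is one way to turn the Review step into a repeatable check.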

Metrics Registration

pub struct WorkerMetrics {
    tasks_processed: Counter<u64>,
    task_duration:   Histogram<f64>,
    tasks_dead:      Counter<u64>,
    queue_depth:     ObservableGauge<u64>,
}

impl WorkerMetrics {
    pub fn new(meter: &Meter) -> Self {
        Self {
            tasks_processed: meter.u64_counter("tasks_processed_total").build(),
            task_duration:   meter.f64_histogram("task_processing_duration_seconds")
                .with_boundaries(vec![0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 30.0])
                .build(),
            tasks_dead:      meter.u64_counter("tasks_dead_total").build(),
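            // NOTE: an observable gauge only reports values from a registered callback;
            // add .with_callback(...) before .build() once a queue-depth source exists.
            queue_depth:     meter.u64_observable_gauge("sqs_queue_depth").build(),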
        }
    }
}

Trace + Log Correlation

#[instrument(
    skip(self, req),  // skip names must match the function's actual parameters
    fields(
        task_id = tracing::field::Empty,
        task_type = %req.task_type,
        rpc.system = "grpc",
        rpc.method = "SubmitTask",
    )
)]
async fn submit_task(&self, req: SubmitTaskRequest) -> Result<SubmitTaskResponse, AppError> {
    let task_id = generate_task_id();
    Span::current().record("task_id", &task_id);  // set field after generation

    info!(task_id=%task_id, "task submitted");     // trace_id is attached by the logging layer, not passed here

    // ... implementation ...

    self.metrics.tasks_processed.add(1, &[
        KeyValue::new("task_type", req.task_type),
        KeyValue::new("status", "submitted"),
    ]);

    Ok(SubmitTaskResponse { task_id, status: TaskStatus::Pending })
}

SQS Trace Propagation (Reminder)

// Producer: inject trace context into message attributes
let mut carrier = HashMap::new();
global::get_text_map_propagator(|p| p.inject_context(&cx, &mut carrier));

sqs.send_message()
    .message_attributes(carrier_to_sqs_message_attributes(carrier))
    .send().await?;

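// Consumer: extract the parent context and continue the trace
let parent_cx = global::get_text_map_propagator(|p|
    p.extract(&sqs_message_attributes_to_carrier(&msg)));

// start_with_context makes the producer's span the parent of process_task
let span = tracer.start_with_context("process_task", &parent_cx);
let _guard = opentelemetry::trace::mark_span_as_active(span);  // active until _guard drops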
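
To make concrete what travels in the carrier, here is a minimal std-only sketch of the W3C `traceparent` entry, assuming the globally registered propagator is the W3C Trace Context one. `SpanCtx`, `inject`, and `extract` are hypothetical stand-ins for illustration, not the OTel SDK's types or functions.

```rust
use std::collections::HashMap;

/// Minimal stand-in for a span context (illustration only, not the OTel type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpanCtx {
    pub trace_id: u128,
    pub span_id: u64,
    pub sampled: bool,
}

/// Write the context into the carrier as a version-00 `traceparent` entry:
/// 00-<32 hex trace id>-<16 hex span id>-<2 hex flags>.
pub fn inject(ctx: &SpanCtx, carrier: &mut HashMap<String, String>) {
    let flags: u8 = if ctx.sampled { 1 } else { 0 };
    carrier.insert(
        "traceparent".to_string(),
        format!("00-{:032x}-{:016x}-{:02x}", ctx.trace_id, ctx.span_id, flags),
    );
}

/// Parse the `traceparent` entry back into a context.
/// Returns None when the entry is missing or not in the expected shape.
pub fn extract(carrier: &HashMap<String, String>) -> Option<SpanCtx> {
    let value = carrier.get("traceparent")?;
    let parts: Vec<&str> = value.split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    if parts[1].len() != 32 || parts[2].len() != 16 || parts[3].len() != 2 {
        return None;
    }
    if !parts.iter().all(|p| p.chars().all(|c| c.is_ascii_hexdigit())) {
        return None;
    }
    let trace_id = u128::from_str_radix(parts[1], 16).ok()?;
    let span_id = u64::from_str_radix(parts[2], 16).ok()?;
    let flags = u8::from_str_radix(parts[3], 16).ok()?;
    if trace_id == 0 || span_id == 0 {
        return None; // all-zero IDs are invalid
    }
    Some(SpanCtx { trace_id, span_id, sampled: flags & 0x01 == 1 })
}
```

Because the carrier is a plain string map, each entry can be copied into one SQS message attribute of type String on the producer side and read back into a map on the consumer side. In the real services the registered propagator does this formatting and parsing for you.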

Key Takeaways

  • Span::current().record(field, value) fills in fields that are only known after the function starts; the field must first be declared as tracing::field::Empty in #[instrument]
  • OTel histogram boundaries should match your expected latency distribution — too-coarse buckets hide the P95
  • trace_id in every log line is the single most valuable correlation investment in the project
  • Emit metrics at the business level (tasks_processed_total), not just at the infrastructure level (db_calls_total)
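
The point about bucket boundaries can be illustrated with a small std-only sketch. It mimics explicit-bucket counting (upper bounds inclusive) and reports which bucket contains a given quantile. The function names and the sample workload in the usage note are hypothetical, and a real backend may interpolate within the bucket rather than report its bounds.

```rust
/// Count samples into buckets defined by sorted upper boundaries.
/// Bucket i holds values in (boundaries[i-1], boundaries[i]];
/// the final bucket collects everything above the last boundary.
pub fn bucket_counts(boundaries: &[f64], samples: &[f64]) -> Vec<u64> {
    let mut counts = vec![0u64; boundaries.len() + 1];
    for &s in samples {
        let idx = boundaries
            .iter()
            .position(|&b| s <= b)
            .unwrap_or(boundaries.len());
        counts[idx] += 1;
    }
    counts
}

/// Returns the (lower, upper) bounds of the bucket containing quantile `q`.
/// All the histogram can say is that the quantile lies somewhere in this range.
pub fn quantile_bucket(boundaries: &[f64], counts: &[u64], q: f64) -> (f64, f64) {
    let total: u64 = counts.iter().sum();
    if total == 0 {
        return (0.0, f64::INFINITY); // no data, no information
    }
    let rank = (q * total as f64).ceil() as u64;
    let mut seen = 0u64;
    for (i, &c) in counts.iter().enumerate() {
        seen += c;
        if seen >= rank {
            let lower = if i == 0 { 0.0 } else { boundaries[i - 1] };
            let upper = boundaries.get(i).copied().unwrap_or(f64::INFINITY);
            return (lower, upper);
        }
    }
    (0.0, f64::INFINITY)
}
```

For a hypothetical workload of 90 tasks at 0.2 s and 10 tasks at 0.45 s, the boundaries from the Metrics Registration section place every sample in the (0.1, 0.5] bucket, so the P95 is only known to lie in that range. With boundaries of 0.1, 0.2, 0.3, 0.4, 0.5 the same data narrows the P95 to (0.4, 0.5].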

Tomorrow: integration tests — testing the full system end-to-end.