
Custom Processors Example

This example demonstrates how to use and configure custom processors in the OpenTelemetry Collector to transform, filter, and enrich telemetry data.

Overview

Processors modify telemetry data as it flows through the collector. Common use cases:

  • Adding resource attributes
  • Filtering data
  • Sampling
  • Transforming data
  • Redacting sensitive information

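Processors sit between receivers and exporters inside a pipeline. A minimal sketch of that wiring (components here are illustrative; substitute the ones you actually run):

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  # Data flows through processors in the order they are listed in the pipeline
  batch:

exporters:
  debug:
    verbosity: basic

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
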
Prerequisites

  • OpenTelemetry Collector configured
  • Understanding of YAML configuration

Common Processors

1. Resource Processor

Add or modify resource attributes:

processors:
  resource:
    attributes:
      # Add service name
      - key: service.name
        value: MyApplication
        action: upsert

      # Add environment (reads the ENVIRONMENT env var, defaulting to "production";
      # the ${env:VAR:-default} form requires a recent collector release)
      - key: deployment.environment
        value: ${env:ENVIRONMENT:-production}
        action: upsert

      # Add version
      - key: service.version
        value: ${env:VERSION:-1.0.0}
        action: upsert

      # Add custom attributes
      - key: team.name
        value: platform-team
        action: upsert

      # Delete attribute
      - key: unwanted.attribute
        action: delete

2. Attributes Processor

Modify span/metric/log attributes:

processors:
  attributes:
    actions:
      # Add attribute
      - key: user.id
        value: ${env:USER_ID}
        action: insert

      # Update attribute
      - key: http.status_code
        from_attribute: status.code
        action: update

      # Delete sensitive data
      - key: password
        action: delete
      - key: credit_card
        action: delete
      - key: ssn
        action: delete

      # Extract new attributes from an attribute's value
      # (the regex runs against the value of `key`; each named capture
      # group becomes a new attribute, and group names cannot contain dots)
      - key: http.request.method
        action: extract
        pattern: ^(?P<http_method>GET|POST|PUT|DELETE|PATCH)

      # Convert an attribute's type
      - key: http.status_code
        action: convert
        converted_type: int

      # Hash sensitive values
      - key: user.email
        action: hash

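The attributes processor has no case-conversion action. If you need to normalize case, one option is the transform processor with an OTTL statement; a minimal sketch, assuming the transform processor is included in your collector distribution:

processors:
  transform:
    trace_statements:
      - context: span
        statements:
          # Lowercase http.method, e.g. "GET" -> "get"
          - set(attributes["http.method"], ConvertCase(attributes["http.method"], "lower"))
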
3. Filter Processor

The filter processor drops telemetry that matches any of the configured OTTL conditions, so write conditions that describe the data you want to discard:

processors:
  filter:
    # Ignore (rather than fail on) conditions that error at runtime
    error_mode: ignore

    # Filter traces
    traces:
      span:
        # Drop successful spans (keep only errors)
        - 'attributes["http.status_code"] < 400'
        # Drop spans shorter than 1 second (times are in nanoseconds)
        - 'end_time_unix_nano - start_time_unix_nano < 1000000000'
        # Drop spans that are not from a specific service
        - 'resource.attributes["service.name"] != "MyService"'

    # Filter metrics
    metrics:
      metric:
        # Drop every metric except the error counter
        - 'name != "errors.total"'
      datapoint:
        # Drop non-production data points (metric attributes live on data points)
        - 'attributes["environment"] != "production"'

    # Filter logs
    logs:
      log_record:
        # Drop anything below ERROR severity (severity_number 17 = ERROR)
        - 'severity_number < 17'
        # Drop logs that are not from the database component
        - 'attributes["component"] != "database"'

4. Probabilistic Sampler

Reduce data volume with sampling:

processors:
  probabilistic_sampler:
    # Sample 10% of traces
    sampling_percentage: 10.0
    hash_seed: 42

5. Tail Sampling Processor

Advanced sampling based on conditions:

processors:
  tail_sampling:
    decision_wait: 10s
    num_traces: 100
    expected_new_traces_per_sec: 10
    policies:
      # Always sample errors
      - name: error-policy
        type: status_code
        status_code:
          status_codes: [ERROR]

      # Sample slow requests
      - name: latency-policy
        type: latency
        latency:
          threshold_ms: 1000

      # Sample based on a boolean attribute
      - name: important-policy
        type: boolean_attribute
        boolean_attribute:
          key: important
          value: true

      # Sample specific routes via a string attribute
      - name: endpoint-policy
        type: string_attribute
        string_attribute:
          key: http.route
          values: ["/api/users", "/api/orders"]
          invert_match: false

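Policies can also be combined: the and policy samples a trace only if every sub-policy matches. A sketch that keeps only slow error traces (policy names are illustrative):

processors:
  tail_sampling:
    policies:
      - name: slow-errors
        type: and
        and:
          and_sub_policy:
            # Trace must contain an error...
            - name: errors
              type: status_code
              status_code:
                status_codes: [ERROR]
            # ...and exceed the latency threshold
            - name: slow
              type: latency
              latency:
                threshold_ms: 1000
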
6. Batch Processor

Optimize data export:

processors:
  batch:
    # Timeout before sending batch
    timeout: 1s
    # Send batch when this many items collected
    send_batch_size: 1024
    # Maximum batch size
    send_batch_max_size: 2048
    # Metadata keys to include
    metadata_keys:
      - tenant.id
      - environment

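If different pipelines need different batching behavior, you can declare multiple named instances of the same processor type (the type/name convention also used for exporters such as otlp/jaeger in the complete example below) and reference each one from its own pipeline. A sketch:

processors:
  batch/traces:
    timeout: 1s
    send_batch_size: 1024
  batch/metrics:
    timeout: 10s
    send_batch_size: 4096

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch/traces]
      exporters: [otlp/jaeger]
    metrics:
      receivers: [otlp]
      processors: [batch/metrics]
      exporters: [prometheus]
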
7. Memory Limiter Processor

Prevent memory exhaustion:

processors:
  memory_limiter:
    # Memory limit in MiB
    limit_mib: 400
    # Spike limit (temporary increase)
    spike_limit_mib: 100
    # How often to check
    check_interval: 5s

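When the collector runs in a container with a memory limit, the memory limiter can also be configured as percentages of available memory instead of fixed MiB values. A sketch:

processors:
  memory_limiter:
    check_interval: 1s
    # Percentage of total available memory
    limit_percentage: 80
    spike_limit_percentage: 25
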
Complete Example

Production Configuration

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Memory management
  memory_limiter:
    limit_mib: 400
    spike_limit_mib: 100
    check_interval: 5s

  # Add resource attributes
  resource:
    attributes:
      - key: service.name
        value: MyApplication
        action: upsert
      - key: deployment.environment
        value: production
        action: upsert
      - key: service.version
        value: 1.0.0
        action: upsert

  # Redact sensitive data
  attributes:
    actions:
      - key: password
        action: delete
      - key: credit_card
        action: delete
      - key: api_key
        action: delete
      - key: user.email
        action: hash

  # Filter data (conditions describe what to DROP)
  filter:
    error_mode: ignore
    traces:
      span:
        # Drop spans that are both successful and fast, keeping errors and slow requests
        - 'attributes["http.status_code"] < 400 and end_time_unix_nano - start_time_unix_nano < 1000000000'
    logs:
      log_record:
        # Drop logs below ERROR severity
        - 'severity_number < 17'

  # Sample traces
  probabilistic_sampler:
    sampling_percentage: 10.0

  # Batch for efficiency
  batch:
    timeout: 1s
    send_batch_size: 1024
    send_batch_max_size: 2048

exporters:
  debug:
    verbosity: basic
  prometheus:
    endpoint: 0.0.0.0:8889
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true  # typical for a plaintext Jaeger OTLP endpoint

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, resource, attributes, filter, probabilistic_sampler, batch]
      exporters: [debug, otlp/jaeger]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, resource, batch]
      exporters: [debug, prometheus]
    logs:
      receivers: [otlp]
      processors: [memory_limiter, resource, attributes, filter, batch]
      exporters: [debug]

Processor Order

Processors run in the order they appear in a pipeline's processors list. Recommended ordering (see the pipeline snippet after this list):

  1. Memory Limiter should be first (prevents memory issues)
  2. Resource should be early (adds context for filtering)
  3. Attributes before filtering (modify before filtering)
  4. Filter before sampling (filter unwanted data first)
  5. Sampling before batching (reduce data volume)
  6. Batch should be last (optimize export)

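Put together, a traces pipeline that follows this ordering looks like:

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, resource, attributes, filter, probabilistic_sampler, batch]
      exporters: [otlp/jaeger]
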
Custom Processor Examples

Add Request ID

Copy an existing attribute into a new one. Note that from_attribute reads another attribute on the same span; the span's trace ID itself is not an attribute, so this assumes an upstream component already set a trace_id attribute:

processors:
  attributes:
    actions:
      - key: request.id
        from_attribute: trace_id
        action: insert

Normalize HTTP Status Codes

Copy a legacy status.code attribute into http.status_code (from_attribute and value are mutually exclusive; upsert creates the key if it does not already exist):

processors:
  attributes:
    actions:
      - key: http.status_code
        from_attribute: status.code
        action: upsert

Extract Service Name from Path

Extract the first path segment of http.target into a new attribute. The regex runs against the value of key, and each named capture group becomes a new attribute; because group names cannot contain dots, the result lands in a flat attribute (here service_prefix) rather than in the resource-level service.name:

processors:
  attributes:
    actions:
      - key: http.target
        action: extract
        pattern: ^/(?P<service_prefix>[^/]+)

Troubleshooting

Processor Not Working

Problem: A processor does not appear to take effect

Solution:

  1. Check processor order
  2. Verify processor is in pipeline
  3. Check collector logs for errors
  4. Use the debug exporter to inspect data before and after the processor (see the sketch below)

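For example, a temporary pipeline that sends everything to the debug exporter at detailed verbosity makes it easy to confirm what a processor actually emits (the /verbose and /debug names are illustrative):

exporters:
  debug/verbose:
    verbosity: detailed   # prints full telemetry payloads to the collector log

service:
  pipelines:
    traces/debug:
      receivers: [otlp]
      processors: [resource, attributes]   # the processors under test
      exporters: [debug/verbose]
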
High Memory Usage

Problem: Memory limiter dropping data

Solution:

  • Increase limit_mib
  • Reduce batch sizes
  • Enable sampling
  • Filter data earlier

Data Loss

Problem: Data not appearing in backends

Solution:

  • Check filter conditions (they may be dropping more data than intended)
  • Verify sampling percentage
  • Check exporter queue depths
  • Review processor logs

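Exporter queueing and retry settings are a common source of silent drops. Most exporters built on the collector's exporter helper accept sending_queue and retry_on_failure settings; a sketch with illustrative values:

exporters:
  otlp/jaeger:
    endpoint: jaeger:4317
    sending_queue:
      enabled: true
      queue_size: 5000     # items buffered before the exporter starts dropping
      num_consumers: 10
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_elapsed_time: 300s
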
Further Reading