Custom Processors Example¶
This example demonstrates how to use and configure custom processors in the OpenTelemetry Collector to transform, filter, and enrich telemetry data.
Overview¶
Processors modify telemetry data as it flows through the collector; they sit between receivers and exporters in each pipeline (see the sketch after this list). Common use cases:
- Adding resource attributes
- Filtering data
- Sampling
- Transforming data
- Redacting sensitive information
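Processors run in the order they are listed in a pipeline. A minimal sketch, assuming an OTLP receiver and exporter are defined elsewhere in the configuration:

service:
  pipelines:
    traces:
      receivers: [otlp]
      # Processors execute left to right
      processors: [memory_limiter, batch]
      exporters: [otlp]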
Prerequisites¶
- OpenTelemetry Collector configured
- Understanding of YAML configuration
Common Processors¶
1. Resource Processor¶
Add or modify resource attributes:
processors:
  resource:
    attributes:
      # Add service name
      - key: service.name
        value: MyApplication
        action: upsert
      # Add environment (env var with a default)
      - key: deployment.environment
        value: ${env:ENVIRONMENT:-production}
        action: upsert
      # Add version
      - key: service.version
        value: ${env:VERSION:-1.0.0}
        action: upsert
      # Add custom attributes
      - key: team.name
        value: platform-team
        action: upsert
      # Delete an attribute
      - key: unwanted.attribute
        action: delete
2. Attributes Processor¶
Modify span/metric/log attributes:
processors:
  attributes:
    actions:
      # Add attribute (only if the key does not already exist)
      - key: user.id
        value: ${env:USER_ID}
        action: insert
      # Update attribute from another attribute
      - key: http.status_code
        from_attribute: status.code
        action: update
      # Delete sensitive data
      - key: password
        action: delete
      - key: credit_card
        action: delete
      - key: ssn
        action: delete
      # Extract values with a regex; each named capture
      # group becomes a new attribute
      - key: http.url
        action: extract
        pattern: ^(?P<url_scheme>[^:]+)://(?P<url_host>[^/]+)
      # Convert an attribute's type
      - key: http.status_code
        action: convert
        converted_type: int
      # Hash sensitive values
      - key: user.email
        action: hash
3. Filter Processor¶
Filter telemetry based on conditions:
processors:
  filter:
    # Matching telemetry is DROPPED: any condition that evaluates
    # to true removes the item, so phrase conditions as what to remove.
    error_mode: ignore
    # Filter traces
    traces:
      span:
        # Drop fast, successful spans (keeps errors and spans over 1s)
        - 'attributes["http.status_code"] < 400 and (end_time_unix_nano - start_time_unix_nano) < 1000000000'
        # Drop spans from other services (keeps only MyService)
        - 'resource.attributes["service.name"] != "MyService"'
    # Filter metrics
    metrics:
      metric:
        # Drop every metric except errors.total
        - 'name != "errors.total"'
      datapoint:
        # Drop data points without the production label
        - 'attributes["environment"] != "production"'
    # Filter logs
    logs:
      log_record:
        # Drop logs below ERROR (severity_number 17)
        - 'severity_number < 17'
        # Drop logs not from the database component
        - 'attributes["component"] != "database"'
4. Probabilistic Sampler¶
Reduce data volume with sampling:
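The probabilistic sampler keeps a fixed percentage of traces, hashing the trace ID so the decision is consistent across collector instances:

processors:
  probabilistic_sampler:
    # Keep roughly 10% of traces
    sampling_percentage: 10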
5. Tail Sampling Processor¶
Advanced sampling based on conditions:
processors:
  tail_sampling:
    # Wait this long after a trace's first span before deciding
    decision_wait: 10s
    # Number of traces held in memory while awaiting a decision
    num_traces: 100
    expected_new_traces_per_sec: 10
    policies:
      # Always sample errors
      - name: error-policy
        type: status_code
        status_code:
          status_codes: [ERROR]
      # Sample slow requests
      - name: latency-policy
        type: latency
        latency:
          threshold_ms: 1000
      # Sample based on a boolean attribute
      - name: important-policy
        type: boolean_attribute
        boolean_attribute:
          key: important
          value: true
      # Sample specific routes
      - name: endpoint-policy
        type: string_attribute
        string_attribute:
          key: http.route
          values: ["/api/users", "/api/orders"]
          invert_match: false
6. Batch Processor¶
Optimize data export:
processors:
  batch:
    # Send a partial batch after this timeout
    timeout: 1s
    # Send a batch once this many items are collected
    send_batch_size: 1024
    # Hard upper bound on batch size
    send_batch_max_size: 2048
    # Batch separately per value of these client-metadata keys
    metadata_keys:
      - tenant.id
      - environment
7. Memory Limiter Processor¶
Prevent memory exhaustion:
processors:
  memory_limiter:
    # How often memory usage is checked
    check_interval: 5s
    # Hard memory limit in MiB
    limit_mib: 400
    # Expected max spike between checks
    # (soft limit = limit_mib - spike_limit_mib)
    spike_limit_mib: 100
Complete Example¶
Production Configuration¶
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Memory management
  memory_limiter:
    check_interval: 5s
    limit_mib: 400
    spike_limit_mib: 100
  # Add resource attributes
  resource:
    attributes:
      - key: service.name
        value: MyApplication
        action: upsert
      - key: deployment.environment
        value: production
        action: upsert
      - key: service.version
        value: 1.0.0
        action: upsert
  # Redact sensitive data
  attributes:
    actions:
      - key: password
        action: delete
      - key: credit_card
        action: delete
      - key: api_key
        action: delete
      - key: user.email
        action: hash
  # Filter data (matching telemetry is dropped)
  filter:
    traces:
      span:
        # Drop fast, successful spans (keeps errors and slow requests)
        - 'attributes["http.status_code"] < 400 and (end_time_unix_nano - start_time_unix_nano) < 1000000000'
    logs:
      log_record:
        # Drop logs below ERROR
        - 'severity_number < 17'
  # Sample traces
  probabilistic_sampler:
    sampling_percentage: 10.0
  # Batch for efficiency
  batch:
    timeout: 1s
    send_batch_size: 1024
    send_batch_max_size: 2048

exporters:
  debug:
    verbosity: basic
  prometheus:
    endpoint: 0.0.0.0:8889
  otlp/jaeger:
    endpoint: jaeger:4317

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, resource, attributes, filter, probabilistic_sampler, batch]
      exporters: [debug, otlp/jaeger]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, resource, batch]
      exporters: [debug, prometheus]
    logs:
      receivers: [otlp]
      processors: [memory_limiter, resource, attributes, filter, batch]
      exporters: [debug]
Processor Order¶
Processors run in the order they appear in a pipeline's processors list. Important considerations:
- Memory Limiter should be first (protects the collector before any other work is done)
- Resource should be early (adds context that later processors can match on)
- Attributes before Filter (normalize values before conditions evaluate them)
- Filter before sampling (discard unwanted data first)
- Sampling before Batch (reduce data volume before batching)
- Batch should be last (optimizes export)
Custom Processor Examples¶
Add Request ID¶
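No stock processor generates IDs, but an existing request ID can be promoted to a stable attribute. A minimal sketch, assuming your instrumentation records the X-Request-ID header under the semantic-convention key http.request.header.x-request-id (adjust the source key to match your setup):

processors:
  attributes:
    actions:
      # Copy the request ID header into a stable attribute
      - key: request.id
        from_attribute: http.request.header.x-request-id
        action: insert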
Normalize HTTP Status Codes¶
processors:
  attributes:
    actions:
      # Copy the legacy status.code attribute into http.status_code
      - key: http.status_code
        from_attribute: status.code
        action: upsert
      # Store it as an integer
      - key: http.status_code
        action: convert
        converted_type: int
Extract Service Name from Path¶
processors:
  attributes:
    actions:
      # Named capture groups in the pattern become new attributes
      - key: http.target
        action: extract
        pattern: ^/(?P<first_path_segment>[^/]+)
      # Promote the extracted segment to service.name
      - key: service.name
        from_attribute: first_path_segment
        action: upsert
Troubleshooting¶
Processor Not Working¶
Problem: A processor appears to have no effect on the data
Solution:
- Check processor order
- Verify processor is in pipeline
- Check collector logs for errors
- Use the debug exporter to inspect data before/after, as shown below
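For example, temporarily raise the debug exporter's verbosity to print full records:

exporters:
  debug:
    verbosity: detailed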
High Memory Usage¶
Problem: The memory limiter is rejecting data
Solution:
- Increase limit_mib
- Reduce batch sizes
- Enable sampling
- Filter data earlier
Data Loss¶
Problem: Data not appearing in backends
Solution:
- Check filter conditions (may be too restrictive)
- Verify sampling percentage
- Check exporter queue depths
- Review the collector's own logs, as shown below
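The collector's log level can be raised through the standard service telemetry settings to surface processor and exporter errors:

service:
  telemetry:
    logs:
      level: debug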
Related Documentation¶
- OpenTelemetry Collector: OpenTelemetry Collector overview and configuration
- Observability Stacks: Observability stack comparison and selection