When developing real-time, multimodal AI applications, monitoring two key factors is crucial: performance (latency) and LLM/TTS usage. Performance impacts user experience, while usage can affect operational costs. Pipecat offers built-in metrics for both, which can be enabled with straightforward configuration options.

Enabling performance metrics

Set enable_metrics=True in PipelineParams when creating a task:
Example config
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        ...
        enable_metrics=True,
        ...
    ),
)
Once enabled, Pipecat logs the following metrics:
Metric             Description
TTFB               Time To First Byte in seconds
Processing Time    Time taken by the service to respond in seconds (deprecated in 0.0.104)
Text Aggregation   Time from first LLM token to first complete sentence in seconds
Sample output
AnthropicLLMService#0 TTFB: 0.8378312587738037
CartesiaTTSService#0 processing time: 0.0005071163177490234
CartesiaTTSService#0 TTFB: 0.17177796363830566
AnthropicLLMService#0 processing time: 2.4927797317504883

Limiting TTFB responses

If you only want the first TTFB measurement for each service, pass report_only_initial_ttfb=True in PipelineParams:
Example config
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        ...
        enable_metrics=True,
        report_only_initial_ttfb=True,
        ...
    ),
)
Note: enable_metrics=True is required for this setting to have an effect.

Disabling initial empty metrics

By default, Pipecat sends an initial MetricsFrame with zero values for all services when the pipeline starts. To disable this behavior:
Example config
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        ...
        enable_metrics=True,
        send_initial_empty_metrics=False,
        ...
    ),
)

Enabling LLM/TTS Usage Metrics

Set enable_usage_metrics=True in PipelineParams when creating a task:
Example config
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        ...
        enable_usage_metrics=True,
        ...
    ),
)
Pipecat will log the following as applicable:
Metric      Description
LLM Usage   Number of prompt and completion tokens used
TTS Usage   Number of characters processed
Sample output
CartesiaTTSService#0 usage characters: 65
AnthropicLLMService#0 prompt tokens: 104, completion tokens: 53
Note: Usage metrics are recorded per interaction and do not represent running totals.

Capturing Metrics Data

When metrics are enabled, Pipecat emits a MetricsFrame for each interaction. The MetricsFrame contains a list of metrics data objects, which can include:
  • TTFBMetricsData — Time To First Byte
  • ProcessingMetricsData — Processing time (deprecated in 0.0.104)
  • LLMUsageMetricsData — LLM token usage
  • TTSUsageMetricsData — TTS character usage
  • TextAggregationMetricsData — Sentence aggregation latency
  • TurnMetricsData — Turn completion predictions
You can access the metrics data by either adding a custom FrameProcessor to your pipeline or adding an observer to monitor MetricsFrames.

Example: Using MetricsLogObserver

The simplest way to log metrics is with the built-in MetricsLogObserver. Pass it as an observer when creating your PipelineTask:
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver

task = PipelineTask(
    pipeline,
    params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
    observers=[MetricsLogObserver()],
)
You can filter which metric types are logged by passing include_metrics:
from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver

observers = [
    MetricsLogObserver(
        include_metrics={LLMUsageMetricsData, TTSUsageMetricsData}
    )
]
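Then pass the filtered observer list to the task as before:
task = PipelineTask(
    pipeline,
    params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
    observers=observers,
)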

Example: Using a Custom FrameProcessor

Create a custom FrameProcessor to handle metrics data. Here’s an example Metrics Processor that can be added to your pipeline after the TTS processor.
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.metrics.metrics import (
    LLMUsageMetricsData,
    ProcessingMetricsData,
    TTFBMetricsData,
    TTSUsageMetricsData,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class MetricsLogger(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            for d in frame.data:
                if isinstance(d, TTFBMetricsData):
                    print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
                elif isinstance(d, ProcessingMetricsData):
                    print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
                elif isinstance(d, LLMUsageMetricsData):
                    tokens = d.value
                    print(
                        f"!!! MetricsFrame: {frame}, prompt_tokens: {tokens.prompt_tokens}, completion_tokens: {tokens.completion_tokens}"
                    )
                elif isinstance(d, TTSUsageMetricsData):
                    print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
        await self.push_frame(frame, direction)
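
To use it, add an instance to your pipeline after the TTS processor. A minimal sketch, where the surrounding processors (transport, stt, llm, tts) are placeholders for whatever your pipeline already contains:
from pipecat.pipeline.pipeline import Pipeline

metrics_logger = MetricsLogger()

# Placeholder pipeline: swap in your own transport and services.
pipeline = Pipeline(
    [
        transport.input(),
        stt,
        llm,
        tts,
        metrics_logger,  # placed after TTS so it sees MetricsFrames from upstream services
        transport.output(),
    ]
)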

Metrics Data Reference

All metrics data classes inherit from MetricsData, which includes processor (the name of the processor that generated the metric) and an optional model field.

TTFBMetricsData

Time To First Byte — measures how long until the first byte of a response is received from a service.
Field   Type    Description
value   float   TTFB measurement in seconds

ProcessingMetricsData (Deprecated)

ProcessingMetricsData is deprecated as of version 0.0.104 and will be removed in a future release.
Measures the total time taken by a service to process a request.
Field   Type    Description
value   float   Processing time measurement in seconds

TextAggregationMetricsData

Measures the time from the first LLM token to the first complete sentence, representing the latency cost of sentence aggregation in the TTS pipeline.
Field   Type    Description
value   float   Aggregation time in seconds

LLMUsageMetricsData

Token usage for an LLM interaction. The value field is an LLMTokenUsage object with:
Field                         Type           Description
prompt_tokens                 int            Number of tokens in the input prompt
completion_tokens             int            Number of tokens in the generated completion
total_tokens                  int            Total tokens used (prompt + completion)
cache_read_input_tokens       Optional[int]  Tokens read from cache, if applicable
cache_creation_input_tokens   Optional[int]  Tokens used to create cache entries
reasoning_tokens              Optional[int]  Reasoning tokens (for reasoning models)
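As an illustration of how these fields might be consumed, the helper below computes a rough per-interaction cost. The function name and per-1K-token prices are placeholders, not actual provider pricing:
from pipecat.metrics.metrics import LLMUsageMetricsData


def estimate_llm_cost(
    data: LLMUsageMetricsData,
    prompt_price_per_1k: float = 0.003,      # placeholder price
    completion_price_per_1k: float = 0.015,  # placeholder price
) -> float:
    usage = data.value  # LLMTokenUsage
    return (
        usage.prompt_tokens / 1000 * prompt_price_per_1k
        + usage.completion_tokens / 1000 * completion_price_per_1k
    )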

TTSUsageMetricsData

Character usage for a TTS interaction.
Field   Type   Description
value   int    Number of characters processed by TTS
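Because usage values are reported per interaction (see the note above), running totals have to be accumulated by the application. A minimal sketch of a processor that keeps running counts, using the field definitions above:
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class UsageTotals(FrameProcessor):
    """Accumulates per-interaction usage metrics into running totals."""

    def __init__(self):
        super().__init__()
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.tts_characters = 0

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            for d in frame.data:
                if isinstance(d, LLMUsageMetricsData):
                    self.prompt_tokens += d.value.prompt_tokens
                    self.completion_tokens += d.value.completion_tokens
                elif isinstance(d, TTSUsageMetricsData):
                    self.tts_characters += d.value
        await self.push_frame(frame, direction)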

TurnMetricsData

Metrics from turn completion prediction, emitted by turn analyzers like Krisp Viva Turn and Smart Turn.
Field                    Type    Description
is_complete              bool    Whether the turn is predicted to be complete
probability              float   Confidence probability of the prediction
e2e_processing_time_ms   float   End-to-end processing time in ms, from VAD speech-to-silence transition to turn completion
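TurnMetricsData travels on the same MetricsFrame path as the other metrics, so it can be logged with the built-in observer by filtering for it. A minimal sketch, assuming TurnMetricsData is exported from pipecat.metrics.metrics alongside the other data classes:
from pipecat.metrics.metrics import TurnMetricsData  # assumed export location
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver

# Log only turn completion prediction metrics.
observers = [MetricsLogObserver(include_metrics={TurnMetricsData})]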

Additional Observers

In addition to MetricsLogObserver, Pipecat provides observers that track higher-level conversational metrics.

UserBotLatencyObserver

Measures the time between when a user stops speaking and when the bot starts speaking.
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver

latency_observer = UserBotLatencyObserver()

@latency_observer.event_handler("on_latency_measured")
async def on_latency_measured(observer, latency_seconds):
    print(f"User-to-bot latency: {latency_seconds:.3f}s")

task = PipelineTask(pipeline, observers=[latency_observer])

TurnTrackingObserver

Tracks conversation turns, emitting events when turns start and end. Handles interruptions and configurable timeouts.
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver

turn_observer = TurnTrackingObserver(turn_end_timeout_secs=2.5)

@turn_observer.event_handler("on_turn_started")
async def on_turn_started(observer, turn_count):
    print(f"Turn {turn_count} started")

@turn_observer.event_handler("on_turn_ended")
async def on_turn_ended(observer, turn_count, duration, was_interrupted):
    status = "interrupted" if was_interrupted else "completed"
    print(f"Turn {turn_count} {status} after {duration:.2f}s")

task = PipelineTask(pipeline, observers=[turn_observer])