Pipeline Nodes and Hooks
参照元: LiveKit Agents Documentation ロードマップ: 学習ロードマップ
GitHub参照元(docsのコードブロックはJSレンダリングのため取得不可):
What(何についてか)
LiveKit Agentsのパイプライン処理において、各処理の境界点(ノード)にカスタムロジックを差し込む仕組み。
Agent サブクラスで対象メソッドをoverrideし、デフォルト動作を保持したい部分は Agent.default.<node-name>() に委譲する。
Why(なぜ必要か)
音声エージェントの実運用では、「特定のモデルを差し替えたい」「発音を制御したい」「RAGを注入したい」「PIIをマスクしたい」といった要件が生じる。 これらをコアの処理フローを壊さずに実装できるよう、差し込み点が体系的に定義されている。
How(どう動くか)
graph TD U["ユーザー音声入力"] --> STT["stt_node\n音声→テキスト変換"] STT --> UTC["on_user_turn_completed\nRAG注入/フィルタ/キャンセル"] UTC --> LLM["llm_node\n推論・ツールコール"] LLM --> TTS["tts_node\nテキスト→音声変換"] TTS --> TN["transcription_node\n転写テキスト整形"] TN --> U2["ユーザーへ出力"] RT["Realtimeモデル"] --> RAON["realtime_audio_output_node\n音声加工"] RAON --> TN OE["on_enter\nセッション開始時"] -.-> Agent OX["on_exit\nエージェント切替前"] -.-> Agent
Lifecycle Hooks
on_enter エージェントがセッションのアクティブエージェントになった直後に呼ばれる。 挨拶・状態初期化・ログ開始などの「登場時処理」を定義する。
async def on_enter(self) -> None:
await self.say("こんにちは!何でも聞いてください。")on_exit エージェントが別エージェントに制御を渡す直前に呼ばれる(Workflowによるハンドオフ時のみ)。 セッション終了(ユーザー切断)では発火しない点に注意。
async def on_exit(self) -> None:
await self.say("担当を引き継ぎます。")on_user_turn_completed
ユーザーのターンが終わり、LLMが返答する前に呼ばれる。
パラメータは2つ:turn_ctx(ユーザー最新メッセージを含まない履歴)と new_message(ユーザーの最新発話)。
このメソッドが終わってから new_message が turn_ctx に追加される。そのため、new_message 自体を編集すると以降のチャット履歴に永続する。
turn_ctx.add_message() での追加は当該ターン限りで永続しない。
async def on_user_turn_completed(
self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
) -> None:
# RAG: 最新発話に関連するドキュメントを注入(今のターン限り)
docs = await retrieve(new_message.text_content)
turn_ctx.add_message(role="user", content=docs)
# PIIマスク: new_messageの編集は永続する
new_message.content = mask_pii(new_message.content)
# 応答キャンセル(例: push-to-talk)
# raise StopResponse()Realtimeモデルで使う場合は、ターン検出をエージェント側で設定する必要がある。
STT-LLM-TTS Pipeline Nodes
stt_node 音声フレームをテキストに変換するノード。STTがストリーミング非対応の場合、LiveKitが自動でVADをラップする。 カスタムSTT差し替えや音声前処理が必要な場合のみoverride。基本はデフォルトのまま。
def stt_node(
self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent | str]:
return Agent.default.stt_node(self, audio, model_settings)llm_node
チャットコンテキストをもとに推論し、返答またはツールコールを生成するノード。
yieldできる型は str(テキスト)/ llm.ChatChunk(ツールコール・メタデータ含む)/ FlushSentinel(ストリーム区切り)の3種。
カスタムLLMプロバイダーへの差し替えが主な用途。静的なプロンプト追加は Agent(instructions=...) や update_instructions() で行うべきで、llm_nodeのoverride対象ではない。
def llm_node(
self,
chat_ctx: llm.ChatContext,
tools: list[llm.Tool],
model_settings: ModelSettings,
) -> AsyncIterable[llm.ChatChunk | str | FlushSentinel]:
return Agent.default.llm_node(self, chat_ctx, tools, model_settings)tts_node LLMのテキスト出力を音声フレームに変換するノード。TTSがストリーミング非対応の場合、sentence tokenizerで自動分割する。 発音ルールの適用や独自TTSエンジンの差し替えが主な用途。
class MyAgent(Agent):
async def tts_node(self, text: AsyncIterable[str], model_settings):
async def pronunciation_corrected(source):
async for chunk in source:
chunk = chunk.replace("LiveKit", "ライブキット")
chunk = chunk.replace("LLM", "エルエルエム")
yield chunk
async for frame in Agent.default.tts_node(
self, pronunciation_corrected(text), model_settings
):
yield frameRealtime Model Node
realtime_audio_output_node Realtimeモデルが出力した音声をユーザーへ届ける前に加工するノード。 Realtimeモデルはテキストを経由しないため、発音ルールの適用はできず、音声レベルでの加工(音量調整など)のみ可能。
Transcription Node
transcription_node エージェントの発話テキストをユーザーへ転送する前に整形するノード。 フォーマット整理・句読点修正・不要文字の除去のほか、TTS aligned transcriptionのタイムスタンプ取得にも使う。
Key Concepts
| 用語 | 説明 |
|---|---|
| node | パイプライン処理の境界点。overrideで差し込みロジックを実装する |
| Agent.default | デフォルト実装への委譲先。super() ではなくこちらを使う |
| on_user_turn_completed | ユーザー発話確定〜LLM推論前の唯一のフック。RAG・フィルタ・キャンセル制御 |
| turn_ctx追加 | 今のターン限りの補助情報注入(永続しない) |
| new_message編集 | ユーザー発話自体の変更。以降のチャット履歴に永続する |
| FlushSentinel | llm_nodeのストリーム区切りを示すシグナル |
実装判断の軸
| やりたいこと | 使うべきフック/ノード |
|---|---|
| 挨拶・初期化 | on_enter |
| ハンドオフ前の後処理 | on_exit |
| RAG注入・PIIマスク・応答キャンセル | on_user_turn_completed |
| 発音ルール・TTS差し替え | tts_node |
| カスタムLLMプロバイダー差し替え | llm_node |
| Realtime音声の音量調整 | realtime_audio_output_node |
| エージェント発話テキストの整形 | transcription_node |
| 静的な指示・言語設定 | Agent(instructions=...) |
普段の開発で積極的に触るのは on_enter / on_exit / on_user_turn_completed の3つ。
STT/LLM/TTSの各ノードはカスタムプロバイダー差し替えや特殊要件が生じた時だけoverrideする。