
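About
Build real-time conversational AI voice engines using async worker pipelines, streaming transcription, LLM agents, and TTS synthesis, with interrupt handling and multi-provider support.
name: voice-ai-engine-development
description: "Build real-time conversational AI voice engines using async worker pipelines, streaming transcription, LLM agents, and TTS synthesis, with interrupt handling and multi-provider support"
risk: unknown
source: community
date_added: "2026-02-27"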
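Voice AI Engine Development
Overview
This skill guides you through building production-grade voice AI engines with real-time conversational capabilities. A voice AI engine enables natural, two-way conversation between a user and an AI agent through streaming audio processing, speech-to-text transcription, LLM-generated responses, and text-to-speech synthesis.
The core architecture is an async, queue-based worker pipeline. Each component runs independently and communicates with its neighbors through asyncio.Queue objects, which enables concurrent processing, interrupt handling, and real-time streaming at every stage.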
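When to Use This Skill
Use this skill when:
- Building real-time voice conversation systems
- Implementing voice assistants or chatbots
- Creating voice-driven customer service agents
- Developing voice AI applications that must handle interruptions
- Integrating multiple transcription, LLM, or TTS providers
- Working with streaming audio processing pipelines
- The user mentions Vocode, voice engines, or conversational AI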
Core Architecture Principles
Worker Pipeline Pattern
Every voice AI engine follows this pipeline:
```text
Audio In → Transcriber → Agent → Synthesizer → Audio Out
           (Worker 1) (Worker 2)  (Worker 3)
```
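Key benefits:
- Decoupling: each worker only knows about its input and output queues
- Concurrency: all workers run concurrently on a single asyncio event loop
- Backpressure: bounded queues (asyncio.Queue(maxsize=N)) make a fast stage wait for a slower one; the default unbounded queue absorbs rate differences by growing instead
- Interruptibility: any stage can be stopped mid-stream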
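The pipeline shape and the backpressure point can be sketched with plain asyncio.Queue objects. This is a minimal, self-contained illustration rather than an engine: `source`, `stage`, and `sink` are hypothetical stand-ins for the transcriber, agent, and synthesizer, and `maxsize=2` is an arbitrary bound. Backpressure only exists because the queues are bounded; with the default `maxsize=0`, `put()` never waits.

```python
import asyncio

SENTINEL = None  # Marks end of stream

async def source(out_q: asyncio.Queue, items: list[str]) -> None:
    """Hypothetical producer standing in for incoming audio."""
    for item in items:
        # put() suspends while out_q is full, so a slow downstream stage
        # throttles the producer instead of letting the queue grow (backpressure)
        await out_q.put(item)
    await out_q.put(SENTINEL)

async def stage(in_q: asyncio.Queue, out_q: asyncio.Queue, fn, delay: float) -> None:
    """Hypothetical worker: it only knows its input and output queues."""
    while (item := await in_q.get()) is not SENTINEL:
        await asyncio.sleep(delay)  # Simulate slow work such as a network call
        await out_q.put(fn(item))
    await out_q.put(SENTINEL)  # Propagate end of stream downstream

async def sink(in_q: asyncio.Queue, results: list[str]) -> None:
    while (item := await in_q.get()) is not SENTINEL:
        results.append(item)

async def run_pipeline(items: list[str], maxsize: int = 2) -> list[str]:
    # Bounded queues; maxsize=0 (the default) would make them unbounded
    q1, q2, q3 = (asyncio.Queue(maxsize=maxsize) for _ in range(3))
    results: list[str] = []
    # All four coroutines run concurrently on one event loop
    await asyncio.gather(
        source(q1, items),
        stage(q1, q2, str.upper, delay=0.01),       # "transcriber"
        stage(q2, q3, lambda s: s + "!", delay=0),  # "agent"
        sink(q3, results),                          # "synthesizer / output"
    )
    return results

if __name__ == "__main__":
    print(asyncio.run(run_pipeline(["hello", "world"])))  # ['HELLO!', 'WORLD!']
```

Ending the stream with a sentinel that each stage forwards lets every worker exit cleanly once upstream is done, without any shared flag.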
Base Worker Pattern
Every worker follows this pattern:
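```python
import asyncio
import logging

class BaseWorker:
    def __init__(self, input_queue: asyncio.Queue, output_queue: asyncio.Queue):
        self.input_queue = input_queue    # asyncio.Queue to consume from
        self.output_queue = output_queue  # asyncio.Queue to produce to
        self.active = False
        self._task = None

    def start(self):
        """Start the worker's processing loop (call from inside a running event loop)"""
        self.active = True
        # Keep a reference: the event loop holds only weak references to tasks,
        # and the handle is needed later to cancel the loop
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        """Main processing loop - runs until terminated"""
        while self.active:
            item = await self.input_queue.get()  # Block until an item arrives
            try:
                await self.process(item)  # Process the item
            except Exception:
                # Log and keep going so one bad item does not kill the whole stage
                logging.exception("%s failed to process an item", type(self).__name__)

    async def process(self, item):
        """Override this - does the actual work"""
        raise NotImplementedError

    def terminate(self):
        """Stop the worker"""
        self.active = False
        # The flag alone cannot wake a loop that is suspended in input_queue.get()
        if self._task is not None:
            self._task.cancel()
```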
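Setting a flag alone does not wake a worker that is suspended in `input_queue.get()`; the worker has to be cancelled or sent a message. One alternative to cancellation, sketched below, is a shutdown sentinel, which lets the worker finish every item queued before it and then exit. `_STOP`, `SentinelWorker`, `UpperWorker`, and `stop()` are hypothetical names for this illustration. This approach suits a graceful shutdown; for an interrupt, where queued work must be discarded, cancellation is the better fit.

```python
import asyncio

_STOP = object()  # Hypothetical shutdown sentinel; never a real work item

class SentinelWorker:
    """Minimal worker that drains its queue, then exits on _STOP."""

    def __init__(self, input_queue: asyncio.Queue, output_queue: asyncio.Queue):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self._task = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self) -> None:
        while True:
            item = await self.input_queue.get()
            if item is _STOP:
                break  # Everything queued before the sentinel has been processed
            await self.output_queue.put(await self.process(item))

    async def process(self, item):
        raise NotImplementedError

    async def stop(self) -> None:
        """Graceful shutdown: enqueue the sentinel and wait for the loop to exit."""
        await self.input_queue.put(_STOP)
        if self._task is not None:
            await self._task

class UpperWorker(SentinelWorker):
    async def process(self, item: str) -> str:
        return item.upper()

async def demo() -> list[str]:
    inq, outq = asyncio.Queue(), asyncio.Queue()
    worker = UpperWorker(inq, outq)
    worker.start()
    for word in ["hi", "there"]:
        await inq.put(word)
    await worker.stop()  # Returns only after "hi" and "there" are processed
    return [outq.get_nowait() for _ in range(outq.qsize())]

if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['HI', 'THERE']
```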
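Component Implementation Guide
1. Transcriber (Audio → Text)
Purpose: converts incoming audio chunks into text transcriptions
Interface requirements: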
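```python
import asyncio

# TranscriberConfig, AudioChunk, and Transcription are defined elsewhere (not shown)
class BaseTranscriber(BaseWorker):
    def __init__(self, config: "TranscriberConfig"):
        # input_queue receives AudioChunk objects; output_queue produces Transcription objects
        super().__init__(asyncio.Queue(), asyncio.Queue())
        self.config = config

    async def process(self, audio_chunk: "AudioChunk"):
        """Convert audio to text; partial results may arrive, but only finals go downstream"""
        transcription = await self._transcribe(audio_chunk)
        if transcription.is_final:
            await self.output_queue.put(transcription)

    async def _transcribe(self, audio_chunk: "AudioChunk") -> "Transcription":
        """Provider-specific; implemented by subclasses"""
        raise NotImplementedError
```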
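Streaming transcribers emit many partial (interim) results before a final one. Only finals need to reach the agent, but partials still carry a useful signal: the first non-empty partial is an early sign that the user has started speaking, which is exactly when an interrupt should fire. The sketch below assumes a minimal `Transcription` data class and a hypothetical `on_speech_start` callback, and it routes results from a scripted list instead of a real provider.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

@dataclass
class Transcription:
    text: str
    is_final: bool
    confidence: float = 1.0

async def route_results(
    results: list[Transcription],
    output_queue: asyncio.Queue,
    on_speech_start: Callable[[], Awaitable[None]],
) -> None:
    """Forward non-empty finals downstream; fire on_speech_start once per utterance."""
    speaking = False
    for result in results:
        if result.text and not speaking:
            speaking = True
            await on_speech_start()  # e.g. trigger interrupt handling early
        if result.is_final:
            if result.text:
                await output_queue.put(result)
            speaking = False  # The next non-empty result starts a new utterance

async def demo():
    events: list[str] = []
    out: asyncio.Queue = asyncio.Queue()

    async def on_speech_start() -> None:
        events.append("speech_start")

    scripted = [
        Transcription("hel", is_final=False),
        Transcription("hello the", is_final=False),
        Transcription("hello there", is_final=True),
        Transcription("", is_final=False),  # Silence
        Transcription("bye", is_final=True),
    ]
    await route_results(scripted, out, on_speech_start)
    finals = [out.get_nowait().text for _ in range(out.qsize())]
    return events, finals

if __name__ == "__main__":
    print(asyncio.run(demo()))  # (['speech_start', 'speech_start'], ['hello there', 'bye'])
```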
Provider integration pattern:
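```python
import asyncio
import json
from urllib.parse import urlencode

import websockets

class DeepgramTranscriber(BaseTranscriber):
    """Real-time streaming transcription via WebSocket"""

    async def connect(self):
        params = urlencode({
            "model": self.config.model,
            "language": self.config.language,
            "encoding": "linear16",
            "sample_rate": 16000,
            "endpointing": self.config.endpointing_ms,
        })
        self.ws = await websockets.connect(
            f"wss://api.deepgram.com/v1/listen?{params}",
            # Deepgram expects "Authorization: Token <key>"; config.api_key is an assumed field.
            # additional_headers is the websockets >= 14 name (older versions: extra_headers)
            additional_headers={"Authorization": f"Token {self.config.api_key}"},
        )
        self._receive_task = asyncio.create_task(self._receive_loop())

    async def process(self, audio_chunk):
        await self.ws.send(audio_chunk.data)  # Raw 16 kHz linear16 PCM bytes

    async def _receive_loop(self):
        async for msg in self.ws:
            result = json.loads(msg)
            # Interim results and non-result messages (e.g. metadata) are skipped
            if not result.get("is_final"):
                continue
            alt = result["channel"]["alternatives"][0]
            if alt["transcript"]:  # Finals can be empty during silence
                await self.output_queue.put(
                    Transcription(text=alt["transcript"],
                                  confidence=alt["confidence"],
                                  is_final=True)
                )
```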
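URL construction and message parsing can be separated from the socket so that they are testable offline. This is a sketch that assumes Deepgram result messages carry `is_final` and `channel.alternatives[0].transcript`/`confidence`, as the transcriber code reads them. `build_listen_url` and `parse_message` are hypothetical helper names, `"nova-2"` is only an example model value, and the sample message in the demo is an abbreviated, hand-written example, not captured output.

```python
import json
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlencode

@dataclass
class Transcription:
    text: str
    confidence: float
    is_final: bool

def build_listen_url(model: str, language: str, endpointing_ms: int,
                     sample_rate: int = 16000) -> str:
    """Query parameters mirror the ones used in DeepgramTranscriber.connect()."""
    params = {
        "model": model,
        "language": language,
        "encoding": "linear16",
        "sample_rate": sample_rate,
        "endpointing": endpointing_ms,
    }
    return "wss://api.deepgram.com/v1/listen?" + urlencode(params)

def parse_message(raw: str) -> Optional[Transcription]:
    """Return a final, non-empty Transcription, or None for anything else."""
    msg = json.loads(raw)
    if not msg.get("is_final"):  # Interim results and metadata messages
        return None
    alt = msg["channel"]["alternatives"][0]
    if not alt["transcript"]:    # Finals can be empty during silence
        return None
    return Transcription(alt["transcript"], alt["confidence"], True)

if __name__ == "__main__":
    print(build_listen_url("nova-2", "en-US", 300))
    sample = json.dumps({
        "type": "Results",
        "is_final": True,
        "channel": {"alternatives": [{"transcript": "hello there", "confidence": 0.98}]},
    })
    print(parse_message(sample))
```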
2. Agent (Text → Response Text)
Purpose: processes transcriptions and generates AI responses
Streaming response pattern:
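```python
import asyncio

class StreamingAgent(BaseWorker):
    def __init__(self, system_prompt: str, llm_client, model: str = "gpt-4o-mini"):
        super().__init__(asyncio.Queue(), asyncio.Queue())
        self.conversation_history = []
        self.system_prompt = system_prompt
        self.llm_client = llm_client  # Assumed to be an openai.AsyncOpenAI-style client
        self.model = model            # Assumed default; chat.completions.create requires a model
        self.current_task = None

    def cancel_current(self):
        """Cancel any in-progress generation (interrupt handling)"""
        if self.current_task and not self.current_task.done():
            self.current_task.cancel()

    async def process(self, transcription: "Transcription"):
        self.cancel_current()  # A new user turn interrupts the previous reply
        self.conversation_history.append({"role": "user", "content": transcription.text})
        self.current_task = asyncio.create_task(self._generate())

    async def _generate(self):
        # If cancelled, CancelledError propagates and the partial reply is not recorded
        stream = await self.llm_client.chat.completions.create(
            model=self.model,
            messages=[{"role": "system", "content": self.system_prompt}] + self.conversation_history,
            stream=True,
        )
        full_response = ""  # Whole reply, kept for the conversation history
        sentence = ""       # Text not yet sent to the synthesizer
        async for chunk in stream:
            if not chunk.choices:
                continue
            token = chunk.choices[0].delta.content or ""
            full_response += token
            sentence += token
            # Send each sentence as it completes for low-latency TTS. Testing
            # `token in ".!?\n"` would also match "" and miss tokens like "here."
            if sentence.rstrip(" ").endswith((".", "!", "?", "\n")):
                await self.output_queue.put(AgentResponse(text=sentence, is_final=False))
                sentence = ""
        if sentence:  # Trailing text without closing punctuation
            await self.output_queue.put(AgentResponse(text=sentence, is_final=True))
        self.conversation_history.append({"role": "assistant", "content": full_response})
```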
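Sentence-boundary detection on streamed tokens is easy to get subtly wrong: a membership test like `token in ".!?\n"` is also true for the empty string (which streamed deltas often are), and it misses tokens that carry punctuation plus other text, such as `"world."` or `". "`. A more robust approach is to buffer text and check whether the buffer ends a sentence. The sketch below expresses that as a pure function so it can be tested without an LLM. `chunk_sentences` is a hypothetical helper, and its rule (split after `.`, `!`, `?`, or a newline) is a simplification that will also split after abbreviations such as "e.g.".

```python
from typing import Iterable, Iterator

SENTENCE_END = (".", "!", "?", "\n")

def chunk_sentences(tokens: Iterable[str]) -> Iterator[str]:
    """Group streamed tokens into sentence-sized chunks for TTS."""
    buffer = ""
    for token in tokens:
        if not token:  # Empty deltas carry no text; skip them
            continue
        buffer += token
        # Ignore trailing spaces so "Hi! " still counts as a finished sentence
        if buffer.rstrip(" ").endswith(SENTENCE_END):
            yield buffer
            buffer = ""
    if buffer:  # Trailing text without final punctuation
        yield buffer

if __name__ == "__main__":
    tokens = ["Sure", "", ",", " I can", " help.", " What", " time", " works?", " Thanks"]
    print(list(chunk_sentences(tokens)))
    # ['Sure, I can help.', ' What time works?', ' Thanks']
```

Checking the accumulated buffer rather than the individual token means the split point no longer depends on how the model happens to tokenize punctuation.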
3. Synthesizer (Text → Audio)
Purpose: converts AI response text into speech audio
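```python
import asyncio
from contextlib import aclosing  # Python 3.10+
import aiohttp

class BaseSynthesizer(BaseWorker):
    def __init__(self, config: "SynthesizerConfig"):
        super().__init__(asyncio.Queue(), asyncio.Queue())
        self.config = config
        self._generation = 0  # Incremented on every interrupt

    async def process(self, agent_response: "AgentResponse"):
        if not agent_response.text:
            return
        generation = self._generation
        # Forward audio as it arrives rather than buffering the whole reply
        async with aclosing(self._synthesize(agent_response.text)) as chunks:
            async for chunk in chunks:
                if generation != self._generation:  # Interrupted mid-stream
                    break
                await self.output_queue.put(chunk)

    def cancel_current(self):
        """Stop the utterance currently being synthesized"""
        self._generation += 1

class ElevenLabsSynthesizer(BaseSynthesizer):
    async def _synthesize(self, text: str):
        """Async generator yielding AudioChunk objects as the HTTP stream arrives"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"https://api.elevenlabs.io/v1/text-to-speech/{self.config.voice_id}/stream",
                # Ask for raw PCM (e.g. pcm_24000) so sample_rate describes the bytes
                params={"output_format": f"pcm_{self.config.sample_rate}"},
                json={"text": text, "model_id": self.config.model_id},
                headers={"xi-api-key": self.config.api_key},
            ) as resp:
                resp.raise_for_status()
                async for data in resp.content.iter_chunked(4096):
                    yield AudioChunk(data=data, sample_rate=self.config.sample_rate)
```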
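For raw 16-bit mono PCM, the playback duration of each chunk follows directly from its byte count: duration = bytes / (sample_rate × bytes_per_sample × channels). This arithmetic does not hold for compressed streams such as MP3, so it assumes the TTS provider was asked for a PCM output format. The helpers below are a sketch of that arithmetic (`pcm_duration_ms` and `pcm_bytes_for` are hypothetical names). For example, a 4096-byte chunk at 24 kHz holds 2048 samples, about 85 ms of audio, and a 20 ms input frame at 16 kHz is 640 bytes.

```python
def pcm_duration_ms(num_bytes: int, sample_rate: int,
                    sample_width: int = 2, channels: int = 1) -> float:
    """Playback duration of a raw PCM buffer in milliseconds."""
    bytes_per_second = sample_rate * sample_width * channels
    return 1000 * num_bytes / bytes_per_second

def pcm_bytes_for(duration_ms: float, sample_rate: int,
                  sample_width: int = 2, channels: int = 1) -> int:
    """Bytes needed for duration_ms of raw PCM, rounded down to whole frames."""
    frame_size = sample_width * channels
    frames = int(sample_rate * duration_ms / 1000)
    return frames * frame_size

if __name__ == "__main__":
    print(round(pcm_duration_ms(4096, 24000), 1))  # 85.3
    print(pcm_duration_ms(4096, 16000))            # 128.0
    print(pcm_bytes_for(20, 16000))                # 640
```

Chunk duration matters for interrupts. Any audio already handed to the output device keeps playing after a flush, so smaller chunks shorten the worst-case overrun.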
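Interrupt Handling
Interrupt handling (barge-in) is the most critical feature of a voice AI engine: when the user starts speaking, the AI must stop immediately: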
```python
class ConversationOrchestrator:
    def __init__(self, transcriber, agent, synthesizer, audio_output):
        self.transcriber = transcriber
        self.agent = agent
        self.synthesizer = synthesizer
        self.audio_output = audio_output  # Assumed to expose flush() (not shown)

    async def handle_interrupt(self):
        """Called when the user starts speaking while the AI is outputting audio"""
        # None of these steps awaits, so no other coroutine runs in between them
        # 1. Stop audio playback immediately
        self.audio_output.flush()
        # 2. Cancel in-progress synthesis
        self.synthesizer.cancel_current()
        # 3. Cancel in-progress LLM generation
        self.agent.cancel_current()
        # 4. Drop queued text and audio that belong to the interrupted reply
        self._flush_queues()

    def _flush_queues(self):
        for q in [self.agent.output_queue, self.synthesizer.output_queue]:
            while not q.empty():
                q.get_nowait()
```
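Something has to notice that the user is speaking before `handle_interrupt` can run. A transcriber's first partial result is one possible trigger. Another is a local voice-activity check on the incoming audio, which reacts before any transcript arrives. The sketch below uses the simplest such technique, an RMS energy threshold over 16-bit little-endian PCM frames, with a run-length requirement to ignore clicks. The class name, the threshold of 500, and the 3-frame requirement are hypothetical values for illustration; production systems typically use a trained VAD model instead.

```python
import math
import struct

def frame_rms(frame: bytes) -> float:
    """Root-mean-square amplitude of 16-bit little-endian mono PCM."""
    n = len(frame) // 2
    if n == 0:
        return 0.0
    samples = struct.unpack(f"<{n}h", frame[: n * 2])
    return math.sqrt(sum(s * s for s in samples) / n)

class EnergySpeechDetector:
    """Reports a speech onset after `min_frames` consecutive loud frames."""

    def __init__(self, threshold: float = 500.0, min_frames: int = 3):
        self.threshold = threshold    # Hypothetical value; tune per microphone
        self.min_frames = min_frames  # Debounce: ignore isolated clicks
        self._loud_run = 0
        self.speaking = False

    def feed(self, frame: bytes) -> bool:
        """Return True only on the frame where a new speech onset is detected."""
        if frame_rms(frame) >= self.threshold:
            self._loud_run += 1
        else:
            self._loud_run = 0
            self.speaking = False
        if not self.speaking and self._loud_run >= self.min_frames:
            self.speaking = True
            return True
        return False

if __name__ == "__main__":
    silence = struct.pack("<4h", 0, 0, 0, 0)
    loud = struct.pack("<4h", 2000, -2000, 2000, -2000)
    detector = EnergySpeechDetector()
    print([detector.feed(f) for f in [silence, loud, loud, loud, loud, silence]])
    # [False, False, False, True, False, False]
```

In an orchestrator, a `True` result while the AI is playing audio would be the point to await `handle_interrupt()`.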
Full Engine Assembly
```python
from __future__ import annotations

# EngineConfig and AudioOutputStream are assumed to be defined elsewhere (not shown)
async def create_voice_engine(config: EngineConfig) -> ConversationOrchestrator:
    transcriber = DeepgramTranscriber(config.transcriber)
    agent = StreamingAgent(config.system_prompt, config.llm_client)
    synthesizer = ElevenLabsSynthesizer(config.synthesizer)
    audio_output = AudioOutputStream(config.audio_output)

    # Wire the pipeline: each stage consumes the previous stage's output queue
    agent.input_queue = transcriber.output_queue
    synthesizer.input_queue = agent.output_queue
    audio_output.input_queue = synthesizer.output_queue

    orchestrator = ConversationOrchestrator(transcriber, agent, synthesizer, audio_output)

    # Open the Deepgram WebSocket before any audio is sent to it
    await transcriber.connect()

    # Start all workers (create_task requires the running event loop)
    transcriber.start()
    agent.start()
    synthesizer.start()
    audio_output.start()
    return orchestrator
```
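To check the wiring without network access or API keys, stub workers can stand in for the providers. This self-contained sketch follows the same pattern: a worker loop over `input_queue`, and wiring by assigning one stage's `output_queue` as the next stage's `input_queue`. `Worker`, `StubTranscriber`, `EchoAgent`, `StubSynthesizer`, and `run_stub_engine` are hypothetical names, and the example encodes text as bytes to stand in for real audio.

```python
import asyncio

class Worker:
    def __init__(self):
        self.input_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()
        self._task = None

    def start(self):
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        while True:
            item = await self.input_queue.get()
            await self.output_queue.put(await self.process(item))

    def terminate(self):
        if self._task:
            self._task.cancel()

class StubTranscriber(Worker):
    async def process(self, audio: bytes) -> str:
        return audio.decode()  # Pretend the bytes are speech

class EchoAgent(Worker):
    async def process(self, text: str) -> str:
        return f"You said: {text}"

class StubSynthesizer(Worker):
    async def process(self, text: str) -> bytes:
        return text.encode()  # Pretend the bytes are audio

async def run_stub_engine(utterances: list[bytes]) -> list[bytes]:
    transcriber, agent, synthesizer = StubTranscriber(), EchoAgent(), StubSynthesizer()
    # Same wiring as create_voice_engine
    agent.input_queue = transcriber.output_queue
    synthesizer.input_queue = agent.output_queue
    workers = [transcriber, agent, synthesizer]
    for w in workers:
        w.start()
    for audio in utterances:
        await transcriber.input_queue.put(audio)
    # Collect one output per input; the timeout makes broken wiring fail fast
    out = [await asyncio.wait_for(synthesizer.output_queue.get(), timeout=1.0)
           for _ in utterances]
    for w in workers:
        w.terminate()
    return out

if __name__ == "__main__":
    print(asyncio.run(run_stub_engine([b"hello", b"bye"])))
    # [b'You said: hello', b'You said: bye']
```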
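Production Considerations
- Latency budget: target < 1 s end to end (transcription 200 ms + LLM first token 300 ms + TTS first chunk 200 ms = 700 ms, leaving about 300 ms of headroom for network transport and other overhead)
- Audio format: use 16 kHz, 16-bit PCM for transcription input; use 24 kHz for TTS output for better quality
- WebSocket management: implement reconnection logic and heartbeats
- Memory management: cap conversation history length with a sliding window
- Error recovery: each worker should handle its own errors without crashing the whole pipeline
- Metrics: track per-stage latency, interruption rate, and transcription accuracy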
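The memory-management point can be made concrete with a sliding window over the chat history. This is a minimal sketch, assuming OpenAI-style `{"role", "content"}` messages with the system prompt stored separately (as in `StreamingAgent`). `trim_history` and its `max_messages` limit are hypothetical. The function keeps the most recent messages, then skips forward to a user message so the window never opens with an orphaned assistant reply. A token-based budget would be more precise than a message count.

```python
def trim_history(history: list[dict], max_messages: int) -> list[dict]:
    """Keep at most max_messages recent messages, starting on a user turn."""
    if max_messages <= 0:
        return []  # history[-0:] would return the whole list, so guard explicitly
    window = history[-max_messages:]
    # Drop leading non-user messages so the window starts with a user message
    start = 0
    while start < len(window) and window[start]["role"] != "user":
        start += 1
    return window[start:]

if __name__ == "__main__":
    history = [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello!"},
        {"role": "user", "content": "Book a table"},
        {"role": "assistant", "content": "For how many?"},
        {"role": "user", "content": "Two"},
    ]
    print([m["content"] for m in trim_history(history, 4)])
    # ['Book a table', 'For how many?', 'Two']
```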
Compatible Tools
Claude Code, Cursor
Tags
AI & Machine Learning