• About Us
  • Disclaimer
  • Contact Us
  • Privacy Policy
Monday, June 15, 2026
mGrowTech
No Result
View All Result
  • Technology And Software
    • Account Based Marketing
    • Channel Marketing
    • Marketing Automation
      • Al, Analytics and Automation
      • Ad Management
  • Digital Marketing
    • Social Media Management
    • Google Marketing
  • Direct Marketing
    • Brand Management
    • Marketing Attribution and Consulting
  • Mobile Marketing
  • Event Management
  • PR Solutions
  • Technology And Software
    • Account Based Marketing
    • Channel Marketing
    • Marketing Automation
      • Al, Analytics and Automation
      • Ad Management
  • Digital Marketing
    • Social Media Management
    • Google Marketing
  • Direct Marketing
    • Brand Management
    • Marketing Attribution and Consulting
  • Mobile Marketing
  • Event Management
  • PR Solutions
No Result
View All Result
mGrowTech
No Result
View All Result
Home Al, Analytics and Automation

How to Design a Fully Streaming Voice Agent with End-to-End Latency Budgets, Incremental ASR, LLM Streaming, and Real-Time TTS

Josh by Josh
January 20, 2026
in Al, Analytics and Automation
0
How to Design a Fully Streaming Voice Agent with End-to-End Latency Budgets, Incremental ASR, LLM Streaming, and Real-Time TTS


In this tutorial, we build an end-to-end streaming voice agent that mirrors how modern low-latency conversational systems operate in real time. We simulate the complete pipeline, from chunked audio input and streaming speech recognition to incremental language model reasoning and streamed text-to-speech output, while explicitly tracking latency at every stage. By working with strict latency budgets and observing metrics such as time to first token and time to first audio, we focus on the practical engineering trade-offs that shape responsive voice-based user experiences. Check out the FULL CODES here.

import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import List, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt


@dataclass
class LatencyMetrics:
   audio_chunk_received: float = 0.0
   asr_started: float = 0.0
   asr_partial: float = 0.0
   asr_complete: float = 0.0
   llm_started: float = 0.0
   llm_first_token: float = 0.0
   llm_complete: float = 0.0
   tts_started: float = 0.0
   tts_first_chunk: float = 0.0
   tts_complete: float = 0.0


   def get_time_to_first_audio(self) -> float:
       return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0


   def get_total_latency(self) -> float:
       return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0


@dataclass
class LatencyBudgets:
   asr_processing: float = 0.1
   asr_finalization: float = 0.3
   llm_first_token: float = 0.5
   llm_token_generation: float = 0.02
   tts_first_chunk: float = 0.2
   tts_chunk_generation: float = 0.05
   time_to_first_audio: float = 1.0


class AgentState(Enum):
   LISTENING = "listening"
   PROCESSING_SPEECH = "processing_speech"
   THINKING = "thinking"
   SPEAKING = "speaking"
   INTERRUPTED = "interrupted"

We define the core data structures and state representations that allow us to track latency across the entire voice pipeline. We formalize timing signals for ASR, LLM, and TTS to ensure consistent measurement across all stages. We also establish a clear agent state machine that guides how the system transitions during a conversational turn. Check out the FULL CODES here.

class AudioInputStream:
   def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
       self.sample_rate = sample_rate
       self.chunk_duration_ms = chunk_duration_ms
       self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)


   async def stream_audio(self, text: str) -> AsyncIterator[np.ndarray]:
       chars_per_second = (150 * 5) / 60
       duration_seconds = len(text) / chars_per_second
       num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)


       for _ in range(num_chunks):
           chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
           await asyncio.sleep(self.chunk_duration_ms / 1000)
           yield chunk

We simulate real-time audio input by breaking speech into fixed-duration chunks that arrive asynchronously. We model realistic speaking rates and streaming behavior to mimic live microphone input. We use this stream as the foundation for testing downstream latency-sensitive components. Check out the FULL CODES here.

class StreamingASR:
   def __init__(self, latency_budget: float = 0.1):
       self.latency_budget = latency_budget
       self.silence_threshold = 0.5


   async def transcribe_stream(
       self,
       audio_stream: AsyncIterator[np.ndarray],
       ground_truth: str
   ) -> AsyncIterator[tuple[str, bool]]:
       words = ground_truth.split()
       words_transcribed = 0
       silence_duration = 0.0
       chunk_count = 0


       async for chunk in audio_stream:
           chunk_count += 1
           await asyncio.sleep(self.latency_budget)


           if chunk_count % 3 == 0 and words_transcribed < len(words):
               words_transcribed += 1
               yield " ".join(words[:words_transcribed]), False


           audio_power = np.mean(np.abs(chunk))
           silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0


           if silence_duration >= self.silence_threshold:
               await asyncio.sleep(0.2)
               yield ground_truth, True
               return


       yield ground_truth, True

We implement a streaming ASR module that produces partial transcriptions before emitting a final result. We progressively reveal words to reflect how modern ASR systems operate in real time. We also introduce silence-based finalization to approximate end-of-utterance detection. Check out the FULL CODES here.

class StreamingLLM:
   def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
       self.time_to_first_token = time_to_first_token
       self.tokens_per_second = tokens_per_second


   async def generate_response(self, prompt: str) -> AsyncIterator[str]:
       responses = {
           "hello": "Hello! How can I help you today?",
           "weather": "The weather is sunny with a temperature of 72°F.",
           "time": "The current time is 2:30 PM.",
           "default": "I understand. Let me help you with that."
       }


       response = responses["default"]
       for key in responses:
           if key in prompt.lower():
               response = responses[key]
               break


       await asyncio.sleep(self.time_to_first_token)


       for word in response.split():
           yield word + " "
           await asyncio.sleep(1.0 / self.tokens_per_second)


class StreamingTTS:
   def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
       self.time_to_first_chunk = time_to_first_chunk
       self.chars_per_second = chars_per_second


   async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
       first_chunk = True
       buffer = ""


       async for text in text_stream:
           buffer += text
           if len(buffer) >= 20 or first_chunk:
               if first_chunk:
                   await asyncio.sleep(self.time_to_first_chunk)
                   first_chunk = False


               duration = len(buffer) / self.chars_per_second
               yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
               buffer = ""
               await asyncio.sleep(duration * 0.5)

In this snippet, we model a streaming language model and a streaming text-to-speech engine working together. We generate responses token by token to capture time-to-first-token behavior. We then convert incremental text into audio chunks to simulate early and continuous speech synthesis. Check out the FULL CODES here.

class StreamingVoiceAgent:
   def __init__(self, latency_budgets: LatencyBudgets):
       self.budgets = latency_budgets
       self.audio_stream = AudioInputStream()
       self.asr = StreamingASR(latency_budgets.asr_processing)
       self.llm = StreamingLLM(
           latency_budgets.llm_first_token,
           1.0 / latency_budgets.llm_token_generation
       )
       self.tts = StreamingTTS(
           latency_budgets.tts_first_chunk,
           1.0 / latency_budgets.tts_chunk_generation
       )
       self.state = AgentState.LISTENING
       self.metrics_history: List[LatencyMetrics] = []


   async def process_turn(self, user_input: str) -> LatencyMetrics:
       metrics = LatencyMetrics()
       start_time = time.time()


       metrics.audio_chunk_received = time.time() - start_time
       audio_gen = self.audio_stream.stream_audio(user_input)


       metrics.asr_started = time.time() - start_time
       async for text, final in self.asr.transcribe_stream(audio_gen, user_input):
           if final:
               metrics.asr_complete = time.time() - start_time
               transcription = text


       metrics.llm_started = time.time() - start_time
       response = ""
       async for token in self.llm.generate_response(transcription):
           if not metrics.llm_first_token:
               metrics.llm_first_token = time.time() - start_time
           response += token


       metrics.llm_complete = time.time() - start_time
       metrics.tts_started = time.time() - start_time


       async def text_stream():
           for word in response.split():
               yield word + " "


       async for _ in self.tts.synthesize_stream(text_stream()):
           if not metrics.tts_first_chunk:
               metrics.tts_first_chunk = time.time() - start_time


       metrics.tts_complete = time.time() - start_time
       self.metrics_history.append(metrics)
       return metrics

We orchestrate the full voice agent by wiring audio input, ASR, LLM, and TTS into a single asynchronous flow. We record precise timestamps at each transition to compute critical latency metrics. We treat each user turn as an isolated experiment to enable systematic performance analysis. Check out the FULL CODES here.

async def run_demo():
   budgets = LatencyBudgets(
       asr_processing=0.08,
       llm_first_token=0.3,
       llm_token_generation=0.02,
       tts_first_chunk=0.15,
       time_to_first_audio=0.8
   )


   agent = StreamingVoiceAgent(budgets)


   inputs = [
       "Hello, how are you today?",
       "What's the weather like?",
       "Can you tell me the time?"
   ]


   for text in inputs:
       await agent.process_turn(text)
       await asyncio.sleep(1)


if __name__ == "__main__":
   asyncio.run(run_demo())

We run the entire system across multiple conversational turns to observe latency consistency and variance. We apply aggressive latency budgets to stress the pipeline under realistic constraints. We use these runs to validate whether the system meets responsiveness targets across interactions.

In conclusion, we demonstrated how a fully streaming voice agent can be orchestrated as a single asynchronous pipeline with clear stage boundaries and measurable performance guarantees. We showed that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency, even when total computation time remains non-trivial. This approach helps us reason systematically about turn-taking, responsiveness, and optimization levers, and it provides a solid foundation for extending the system toward real-world deployments using production ASR, LLM, and TTS models.


Check out the FULL CODES here. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.


Asif Razzaq is the CEO of Marktechpost Media Inc.. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts of over 2 million monthly views, illustrating its popularity among audiences.



Source_link

READ ALSO

Databricks Open-Sources Omnigent: A Meta-Harness That Composes, Governs, and Shares AI Agents Across Claude Code, Codex, and Pi

Serving Multiple Users at Once: How Continuous Batching Keeps LLM Inference Efficient

Related Posts

Databricks Open-Sources Omnigent: A Meta-Harness That Composes, Governs, and Shares AI Agents Across Claude Code, Codex, and Pi
Al, Analytics and Automation

Databricks Open-Sources Omnigent: A Meta-Harness That Composes, Governs, and Shares AI Agents Across Claude Code, Codex, and Pi

June 14, 2026
Serving Multiple Users at Once: How Continuous Batching Keeps LLM Inference Efficient
Al, Analytics and Automation

Serving Multiple Users at Once: How Continuous Batching Keeps LLM Inference Efficient

June 14, 2026
How to Build a QwenPaw Agent Workspace with Custom Skills, Model Providers, Console Access, and Streaming API Testing
Al, Analytics and Automation

How to Build a QwenPaw Agent Workspace with Custom Skills, Model Providers, Console Access, and Streaming API Testing

June 14, 2026
The Roadmap for Mastering LLMOps in 2026
Al, Analytics and Automation

The Roadmap for Mastering LLMOps in 2026

June 13, 2026
When it comes to predicting people’s preferences, it pays to consider “the power of three” | MIT News
Al, Analytics and Automation

When it comes to predicting people’s preferences, it pays to consider “the power of three” | MIT News

June 13, 2026
Moonshot AI Releases Kimi K2.7-Code: a Coding Model Reporting +21.8% on Kimi Code Bench v2 Over K2.6
Al, Analytics and Automation

Moonshot AI Releases Kimi K2.7-Code: a Coding Model Reporting +21.8% on Kimi Code Bench v2 Over K2.6

June 13, 2026
Next Post
How AI Models Are Changing Fashion Photography

How AI Models Are Changing Fashion Photography

POPULAR NEWS

Trump ends trade talks with Canada over a digital services tax

Trump ends trade talks with Canada over a digital services tax

June 28, 2025
15 Trending Songs on TikTok in 2025 (+ How to Use Them)

15 Trending Songs on TikTok in 2025 (+ How to Use Them)

June 18, 2025
Communication Effectiveness Skills For Business Leaders

Communication Effectiveness Skills For Business Leaders

June 10, 2025
App Development Cost in Singapore: Pricing Breakdown & Insights

App Development Cost in Singapore: Pricing Breakdown & Insights

June 22, 2025
Comparing the Top 7 Large Language Models LLMs/Systems for Coding in 2025

Comparing the Top 7 Large Language Models LLMs/Systems for Coding in 2025

November 4, 2025

EDITOR'S PICK

10+ Best Email Deliverability Tools for 2025 (Ranked & Compared)

10+ Best Email Deliverability Tools for 2025 (Ranked & Compared)

May 27, 2025

How communicators can get the most out of qualitative feedback

April 6, 2026
My Take on the 8 Best Employee Experience Software

My Take on the 8 Best Employee Experience Software

May 5, 2026
Experience Coachella 2026 live on YouTube

Experience Coachella 2026 live on YouTube

April 4, 2026

About

We bring you the best Premium WordPress Themes that perfect for news, magazine, personal blog, etc. Check our landing page for details.

Follow us

Categories

  • Account Based Marketing
  • Ad Management
  • Al, Analytics and Automation
  • Brand Management
  • Channel Marketing
  • Digital Marketing
  • Direct Marketing
  • Event Management
  • Google Marketing
  • Marketing Attribution and Consulting
  • Marketing Automation
  • Mobile Marketing
  • PR Solutions
  • Social Media Management
  • Technology And Software
  • Uncategorized

Recent Posts

  • NASA’s X-59 Reaches Speed And Altitude Milestones Ahead Of First Quiet Supersonic Flights
  • How to Communicate a Price Increase to Customers
  • New exhibition explores the enduring legacies of the American Civil War in Canada
  • Read Sundar Pichai’s Stanford commencement speech
  • About Us
  • Disclaimer
  • Contact Us
  • Privacy Policy
No Result
View All Result
  • Technology And Software
    • Account Based Marketing
    • Channel Marketing
    • Marketing Automation
      • Al, Analytics and Automation
      • Ad Management
  • Digital Marketing
    • Social Media Management
    • Google Marketing
  • Direct Marketing
    • Brand Management
    • Marketing Attribution and Consulting
  • Mobile Marketing
  • Event Management
  • PR Solutions