
How to Design a Fully Streaming Voice Agent with End-to-End Latency Budgets, Incremental ASR, LLM Streaming, and Real-Time TTS

By Josh
January 20, 2026
In AI, Analytics and Automation


In this tutorial, we build an end-to-end streaming voice agent that mirrors how modern low-latency conversational systems operate in real time. We simulate the complete pipeline, from chunked audio input and streaming speech recognition to incremental language model reasoning and streamed text-to-speech output, while explicitly tracking latency at every stage. By working with strict latency budgets and observing metrics such as time to first token and time to first audio, we focus on the practical engineering trade-offs that shape responsive voice-based user experiences.
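
Before the code, it helps to see how per-stage allowances compose into an end-to-end target. The sketch below is our own back-of-the-envelope illustration, not part of the original code; the numbers mirror the defaults used by the LatencyBudgets dataclass defined later.

# Back-of-the-envelope budget composition (illustrative values, in seconds).
stage_budgets = {
    "asr_finalization": 0.3,   # commit the final transcript after end of speech
    "llm_first_token": 0.5,    # wait for the first response token
    "tts_first_chunk": 0.2,    # synthesize the first audio chunk
}

# Treated as serial stages, these roughly bound the perceived time to first audio.
print(f"Time-to-first-audio target: {sum(stage_budgets.values()):.2f}s")  # 1.00s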

import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import List, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt


@dataclass
class LatencyMetrics:
   audio_chunk_received: float = 0.0
   asr_started: float = 0.0
   asr_partial: float = 0.0
   asr_complete: float = 0.0
   llm_started: float = 0.0
   llm_first_token: float = 0.0
   llm_complete: float = 0.0
   tts_started: float = 0.0
   tts_first_chunk: float = 0.0
   tts_complete: float = 0.0


   def get_time_to_first_audio(self) -> float:
       return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0


   def get_total_latency(self) -> float:
       return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0


@dataclass
class LatencyBudgets:
   asr_processing: float = 0.1
   asr_finalization: float = 0.3
   llm_first_token: float = 0.5
   llm_token_generation: float = 0.02
   tts_first_chunk: float = 0.2
   tts_chunk_generation: float = 0.05
   time_to_first_audio: float = 1.0


class AgentState(Enum):
   LISTENING = "listening"
   PROCESSING_SPEECH = "processing_speech"
   THINKING = "thinking"
   SPEAKING = "speaking"
   INTERRUPTED = "interrupted"

We define the core data structures and state representations that allow us to track latency across the entire voice pipeline. We formalize timing signals for ASR, LLM, and TTS to ensure consistent measurement across all stages. We also establish a clear agent state machine that guides how the system transitions during a conversational turn.
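
As a quick sanity check, here is a minimal sketch of our own (assuming the dataclasses and enum above are defined in the same module) that fills a LatencyMetrics instance by hand and reads back the derived values:

# Timestamps are seconds relative to the start of the turn.
m = LatencyMetrics(
    audio_chunk_received=0.0,
    asr_complete=1.2,
    llm_first_token=1.5,
    tts_first_chunk=1.9,
    tts_complete=3.4,
)
print(f"Time to first audio: {m.get_time_to_first_audio():.2f}s")  # 1.9 - 1.2 = 0.70s
print(f"Total turn latency:  {m.get_total_latency():.2f}s")        # 3.4 - 0.0 = 3.40s
print(f"Initial agent state: {AgentState.LISTENING.value}")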

class AudioInputStream:
   def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
       self.sample_rate = sample_rate
       self.chunk_duration_ms = chunk_duration_ms
       self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)


   async def stream_audio(self, text: str) -> AsyncIterator[np.ndarray]:
       chars_per_second = (150 * 5) / 60  # ~150 words per minute at ~5 characters per word
       duration_seconds = len(text) / chars_per_second
       num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)


       for _ in range(num_chunks):
           chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
           await asyncio.sleep(self.chunk_duration_ms / 1000)
           yield chunk

We simulate real-time audio input by breaking speech into fixed-duration chunks that arrive asynchronously. We model realistic speaking rates and streaming behavior to mimic live microphone input. We use this stream as the foundation for testing downstream latency-sensitive components.
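
To see the chunking behavior in isolation, we can count the chunks a short utterance produces. The probe_audio_stream helper below is our own illustrative addition, assuming AudioInputStream from above is available:

import asyncio

async def probe_audio_stream():
    stream = AudioInputStream(sample_rate=16000, chunk_duration_ms=100)
    n_chunks, n_samples = 0, 0
    async for chunk in stream.stream_audio("Hello, how are you today?"):
        n_chunks += 1
        n_samples += len(chunk)
    # At ~12.5 chars/sec this short phrase yields roughly 2 s of audio in 20 chunks.
    print(f"{n_chunks} chunks, {n_samples} samples (~{n_samples / 16000:.1f}s of audio)")

asyncio.run(probe_audio_stream())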

class StreamingASR:
   def __init__(self, latency_budget: float = 0.1):
       self.latency_budget = latency_budget
       self.silence_threshold = 0.5


   async def transcribe_stream(
       self,
       audio_stream: AsyncIterator[np.ndarray],
       ground_truth: str
   ) -> AsyncIterator[tuple[str, bool]]:
       words = ground_truth.split()
       words_transcribed = 0
       silence_duration = 0.0
       chunk_count = 0


       async for chunk in audio_stream:
           chunk_count += 1
           await asyncio.sleep(self.latency_budget)


           if chunk_count % 3 == 0 and words_transcribed < len(words):
               words_transcribed += 1
               yield " ".join(words[:words_transcribed]), False


           audio_power = np.mean(np.abs(chunk))
           silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0


           if silence_duration >= self.silence_threshold:
               await asyncio.sleep(0.2)
               yield ground_truth, True
               return


       yield ground_truth, True

We implement a streaming ASR module that produces partial transcriptions before emitting a final result. We progressively reveal words to reflect how modern ASR systems operate in real time. We also introduce silence-based finalization to approximate end-of-utterance detection.
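
To watch partial and final hypotheses arrive, we can wire the simulated audio directly into the ASR. The probe_asr helper below is our own sketch, assuming the two classes above are defined in the same module:

import asyncio
import time

async def probe_asr():
    audio = AudioInputStream()
    asr = StreamingASR(latency_budget=0.05)
    utterance = "What's the weather like today"
    start = time.time()
    # Print each hypothesis as it arrives, marking whether it is partial or final.
    async for text, is_final in asr.transcribe_stream(audio.stream_audio(utterance), utterance):
        label = "FINAL  " if is_final else "partial"
        print(f"[{time.time() - start:5.2f}s] {label}: {text}")

asyncio.run(probe_asr())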

class StreamingLLM:
   def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
       self.time_to_first_token = time_to_first_token
       self.tokens_per_second = tokens_per_second


   async def generate_response(self, prompt: str) -> AsyncIterator[str]:
       responses = {
           "hello": "Hello! How can I help you today?",
           "weather": "The weather is sunny with a temperature of 72°F.",
           "time": "The current time is 2:30 PM.",
           "default": "I understand. Let me help you with that."
       }


       response = responses["default"]
       for key in responses:
           if key in prompt.lower():
               response = responses[key]
               break


       await asyncio.sleep(self.time_to_first_token)


       for word in response.split():
           yield word + " "
           await asyncio.sleep(1.0 / self.tokens_per_second)


class StreamingTTS:
   def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
       self.time_to_first_chunk = time_to_first_chunk
       self.chars_per_second = chars_per_second


   async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
       first_chunk = True
       buffer = ""


       async for text in text_stream:
           buffer += text
           if len(buffer) >= 20 or first_chunk:
               if first_chunk:
                   await asyncio.sleep(self.time_to_first_chunk)
                   first_chunk = False


               duration = len(buffer) / self.chars_per_second
               yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
               buffer = ""
               await asyncio.sleep(duration * 0.5)


       if buffer:
           # Flush any leftover text so the tail of the response is also synthesized.
           duration = len(buffer) / self.chars_per_second
           yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1

In this snippet, we model a streaming language model and a streaming text-to-speech engine working together. We generate responses token by token to capture time-to-first-token behavior. We then convert incremental text into audio chunks to simulate early and continuous speech synthesis.
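
To see how early-start synthesis shortens perceived delay, we can pipe the LLM token stream straight into the TTS and log when the first audio chunk appears. The probe_llm_tts helper below is our own illustrative sketch built on the two classes above:

import asyncio
import time

async def probe_llm_tts():
    llm = StreamingLLM(time_to_first_token=0.3, tokens_per_second=50)
    tts = StreamingTTS(time_to_first_chunk=0.2, chars_per_second=15)
    start = time.time()
    first_audio_at = None
    total_samples = 0
    # Feed the token stream directly into the TTS so synthesis starts before generation ends.
    async for audio_chunk in tts.synthesize_stream(llm.generate_response("what's the weather like")):
        if first_audio_at is None:
            first_audio_at = time.time() - start
        total_samples += len(audio_chunk)
    print(f"First audio after {first_audio_at:.2f}s, "
          f"{total_samples / 16000:.2f}s of speech synthesized")

asyncio.run(probe_llm_tts())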

class StreamingVoiceAgent:
   def __init__(self, latency_budgets: LatencyBudgets):
       self.budgets = latency_budgets
       self.audio_stream = AudioInputStream()
       self.asr = StreamingASR(latency_budgets.asr_processing)
       self.llm = StreamingLLM(
           latency_budgets.llm_first_token,
           1.0 / latency_budgets.llm_token_generation
       )
       self.tts = StreamingTTS(
           latency_budgets.tts_first_chunk,
           1.0 / latency_budgets.tts_chunk_generation
       )
       self.state = AgentState.LISTENING
       self.metrics_history: List[LatencyMetrics] = []


   async def process_turn(self, user_input: str) -> LatencyMetrics:
       metrics = LatencyMetrics()
       start_time = time.time()


       metrics.audio_chunk_received = time.time() - start_time
       audio_gen = self.audio_stream.stream_audio(user_input)


       metrics.asr_started = time.time() - start_time
       transcription = user_input  # fall back to the raw input if no final transcript arrives
       async for text, final in self.asr.transcribe_stream(audio_gen, user_input):
           if final:
               metrics.asr_complete = time.time() - start_time
               transcription = text
           elif not metrics.asr_partial:
               metrics.asr_partial = time.time() - start_time


       metrics.llm_started = time.time() - start_time
       response = ""
       async for token in self.llm.generate_response(transcription):
           if not metrics.llm_first_token:
               metrics.llm_first_token = time.time() - start_time
           response += token


       metrics.llm_complete = time.time() - start_time
       metrics.tts_started = time.time() - start_time


       async def text_stream():
           for word in response.split():
               yield word + " "


       async for _ in self.tts.synthesize_stream(text_stream()):
           if not metrics.tts_first_chunk:
               metrics.tts_first_chunk = time.time() - start_time


       metrics.tts_complete = time.time() - start_time
       self.metrics_history.append(metrics)
       return metrics

We orchestrate the full voice agent by wiring audio input, ASR, LLM, and TTS into a single asynchronous flow. We record precise timestamps at each transition to compute critical latency metrics. We treat each user turn as an isolated experiment to enable systematic performance analysis.
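
For a single turn, the recorded timestamps translate into a stage-by-stage report. The single_turn_report helper below is our own sketch, assuming the classes above are available in the same module:

import asyncio

async def single_turn_report():
    agent = StreamingVoiceAgent(LatencyBudgets())
    m = await agent.process_turn("Hello, how are you?")
    # All timestamps are seconds relative to the start of the turn.
    print(f"ASR complete:        {m.asr_complete:.2f}s")
    print(f"LLM first token:     {m.llm_first_token:.2f}s")
    print(f"TTS first chunk:     {m.tts_first_chunk:.2f}s")
    print(f"Time to first audio: {m.get_time_to_first_audio():.2f}s")
    print(f"Total turn latency:  {m.get_total_latency():.2f}s")

asyncio.run(single_turn_report())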

async def run_demo():
   budgets = LatencyBudgets(
       asr_processing=0.08,
       llm_first_token=0.3,
       llm_token_generation=0.02,
       tts_first_chunk=0.15,
       time_to_first_audio=0.8
   )


   agent = StreamingVoiceAgent(budgets)


   inputs = [
       "Hello, how are you today?",
       "What's the weather like?",
       "Can you tell me the time?"
   ]


   for text in inputs:
       metrics = await agent.process_turn(text)
       # Report the key responsiveness metrics for each turn.
       print(f"'{text}' -> first audio: {metrics.get_time_to_first_audio():.2f}s, "
             f"total: {metrics.get_total_latency():.2f}s")
       await asyncio.sleep(1)


if __name__ == "__main__":
   asyncio.run(run_demo())

We run the entire system across multiple conversational turns to observe latency consistency and variance. We apply aggressive latency budgets to stress the pipeline under realistic constraints. We use these runs to validate whether the system meets responsiveness targets across interactions.
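
To turn these runs into an explicit pass/fail check, we can compare each measured turn against the time_to_first_audio budget. The check_budgets helper below is our own sketch, not part of the original code:

import asyncio

async def check_budgets():
    budgets = LatencyBudgets(time_to_first_audio=0.8)
    agent = StreamingVoiceAgent(budgets)
    for text in ["Hello, how are you today?", "What's the weather like?"]:
        m = await agent.process_turn(text)
        ttfa = m.get_time_to_first_audio()
        verdict = "OK" if ttfa <= budgets.time_to_first_audio else "OVER BUDGET"
        print(f"{text!r}: time to first audio {ttfa:.2f}s ({verdict})")

asyncio.run(check_budgets())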

In conclusion, we demonstrated how a fully streaming voice agent can be orchestrated as a single asynchronous pipeline with clear stage boundaries and measurable performance guarantees. We showed that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency, even when total computation time remains non-trivial. This approach helps us reason systematically about turn-taking, responsiveness, and optimization levers, and it provides a solid foundation for extending the system toward real-world deployments using production ASR, LLM, and TTS models.







