Real-Time Voice Streaming and Stateful Buffering
Last year I wanted to build a live transcriber/fact extractor to learn more about the process of building such a system. To my surprise, downloading and using a Whisper model in Python is pretty straightforward.
But as with any side project, the hard part is always something unexpected. In this case, it was streaming audio processing. I managed to get something working last year, but honestly it was a hot mess of threads and queues. This week I revisited the repo and revised my approach (and open sourced it here!).
The challenge with streaming audio processing is knowing when to start listening and when to stop. The Whisper model has some VAD (voice activity detection) built in, but I needed to reduce load on the CPU. Sending every chunk of audio to the model would slow the system down, so I needed to minimise unnecessary calls to it.
So here is a walkthrough of the buffering solution I used that could detect arbitrary speech onset, capture the full utterance with natural padding, and handle the messy reality of human speech (pauses, false starts, and varying speech lengths).
Architecture Overview
The solution splits into two main components:
- VADAudioBuffer: A stateful buffer that implements the “listening logic” using WebRTC’s Voice Activity Detector
- VoiceStreamer: A threading wrapper that bridges audio I/O (via sounddevice) with your processing pipeline
The Buffering Logic: Lookback, Trigger, and Padding
The heart of the system is the VADAudioBuffer.process() method, which
implements a state machine with three distinct phases:
Phase 1: The Lookback Ring Buffer (Pre-trigger)
Before any speech is detected, audio flows into a ring buffer
(self.ring_buffer) that stores the last 300ms of audio (configurable via
padding_duration_s). This is crucial because human speech doesn’t start
abruptly at the detection threshold. There are often breaths, plosives, or soft
beginnings that the VAD might initially classify as “non-speech.”
    # Simplified logic
    self.ring_buffer.append((frame, is_speech))
    if not self.triggered:
        num_voiced = sum(1 for _, s in self.ring_buffer if s)
        if num_voiced > 0.9 * self.ring_buffer.maxlen:
            self.load_buffer()  # Captures ring_buffer contents + new frames

Phase 2: Active Recording (Triggered State)
Once triggered, the buffer switches modes. Instead of overwriting old data, it
accumulates frames in self.voiced_frames. The system stays in this state until
it detects sustained silence: specifically, when more than 90% of the frames in
the ring buffer are non-speech.
This is where the max_duration_s safety limit becomes important. Without it, a
continuous noise (air conditioning, road noise mistaken for speech) could
accumulate indefinitely. The system forces a flush when max_frames is reached.
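To make these thresholds concrete, here is a small sketch of the frame arithmetic under the defaults from the full listing further down (16 kHz mono int16 audio, 30ms frames, 300ms padding, 5s maximum). The helper name frame_budget is hypothetical and not part of the repo, and it works in integer milliseconds to sidestep float rounding.

```python
import math


def frame_budget(sample_rate=16000, frame_ms=30, padding_ms=300, max_ms=5000):
    """Derive frame sizes and state-machine thresholds (illustrative helper)."""
    samples_per_frame = sample_rate * frame_ms // 1000
    bytes_per_frame = samples_per_frame * 2  # int16 = 2 bytes per sample
    ring_len = padding_ms // frame_ms        # frames held by the ring buffer
    max_frames = max_ms // frame_ms          # forced flush at this many frames
    # The check is `count > 0.9 * ring_len`, so the smallest passing count is:
    min_count = math.floor(0.9 * ring_len) + 1
    return {
        "samples_per_frame": samples_per_frame,
        "bytes_per_frame": bytes_per_frame,
        "ring_len": ring_len,
        "max_frames": max_frames,
        "min_count": min_count,
    }


budget = frame_budget()
print(budget)
```

With a 10-frame ring buffer, "more than 90%" works out to all 10 frames, so under these defaults the trigger needs a solid 300ms of detected speech, and the release needs a solid 300ms of silence.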
Phase 3: Post-Processing and Padding
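When silence is detected (or max duration hit), flush_buffer() combines all
accumulated frames into a single byte string, converts it from int16 back to
float32 (via post_process()), and returns the NumPy array. Utterances no longer
than min_duration_s are discarded, and None is returned instead.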
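The int16 round trip is easy to get subtly wrong, so here is a minimal sketch of it in isolation. The function names are hypothetical, and the np.clip call is my own addition (an assumption, not something the original code does) to guard against samples outside [-1, 1] wrapping around when cast to int16.

```python
import numpy as np


def float_to_pcm16(audio: np.ndarray) -> bytes:
    """Convert float32 samples in [-1, 1] to raw int16 bytes for the VAD."""
    clipped = np.clip(audio.flatten(), -1.0, 1.0)  # assumption: guard overflow
    return (clipped * 32767).astype(np.int16).tobytes()


def pcm16_to_float(pcm: bytes) -> np.ndarray:
    """Convert raw int16 bytes back to float32 samples."""
    return np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0


samples = np.array([0.0, 0.5, -0.5, 1.0, -1.0], dtype=np.float32)
pcm = float_to_pcm16(samples)
restored = pcm16_to_float(pcm)
print(len(pcm), restored.dtype)
```

The round trip is lossy by a small quantisation error (well under 1e-4 here), which should be negligible for speech-to-text.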
Notice that the final utterance includes:
- The lookback frames (captured speech onset)
- All voiced frames during the triggered state
- Natural padding at the end (the silence that triggered the stop)
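The three phases can be simulated without a microphone or a VAD. The sketch below is a simplified stand-in, not the repo's implementation: it takes a list of per-frame booleans in place of real VAD output, tracks only frame indices rather than audio bytes, and leaves out the max/min duration limits. The function name segment is hypothetical.

```python
import collections


def segment(flags, ring_len=10, threshold=0.9):
    """Return (start, end) frame index pairs, end exclusive, per utterance.

    `flags` is a sequence of booleans standing in for per-frame VAD output.
    """
    ring = collections.deque(maxlen=ring_len)
    triggered = False
    start = 0
    utterances = []
    for i, is_speech in enumerate(flags):
        ring.append(is_speech)
        if not triggered:
            if sum(ring) > threshold * ring_len:
                triggered = True
                start = i - len(ring) + 1  # include the lookback frames
                ring.clear()
        else:
            unvoiced = len(ring) - sum(ring)
            if unvoiced > threshold * ring_len:
                utterances.append((start, i + 1))  # includes trailing silence
                triggered = False
                ring.clear()
    return utterances


# 5 silent frames, 30 speech frames, 15 silent frames
speech = [False] * 5 + [True] * 30 + [False] * 15
# A short burst (e.g. a cough) that never fills the ring buffer
cough = [False] * 5 + [True] * 4 + [False] * 20
# Speech with a 5-frame pause in the middle, shorter than the padding
paused = [True] * 15 + [False] * 5 + [True] * 15 + [False] * 15

print(segment(speech), segment(cough), segment(paused))
```

The first case yields one utterance covering the speech plus ten frames of trailing silence, the cough yields nothing, and the short pause stays inside a single utterance.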
The Streaming Layer: Threading and Generators
The VoiceStreamer class solves the I/O blocking problem. Audio capture happens
in a callback thread (via sounddevice), but VAD processing happens in a
dedicated worker thread. This separation prevents audio dropouts if your
processing callback (e.g., speech-to-text) takes variable time.
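Stripped of the audio specifics, this is a bounded producer/consumer queue with a stop event. Here is a minimal stdlib-only sketch of that pattern, with a plain loop standing in for the sounddevice callback. The run_pipeline helper and its parameters are hypothetical names for illustration.

```python
import queue
import threading


def run_pipeline(chunks, process, maxsize=100):
    """Feed `chunks` through a worker thread; collect non-None results."""
    audio_queue = queue.Queue(maxsize=maxsize)
    stop = threading.Event()
    results = []

    def worker():
        while not stop.is_set():
            try:
                chunk = audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue  # wake up regularly to re-check the stop event
            try:
                out = process(chunk)  # may be slow; producer is not blocked
                if out is not None:
                    results.append(out)
            finally:
                audio_queue.task_done()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    dropped = 0
    for chunk in chunks:  # stands in for the audio callback
        try:
            audio_queue.put_nowait(chunk)
        except queue.Full:
            dropped += 1  # drop rather than block the producer

    audio_queue.join()  # wait until every queued chunk has been processed
    stop.set()
    thread.join()
    return results, dropped


results, dropped = run_pipeline(
    range(10), lambda x: x * 2 if x % 2 == 0 else None
)
print(results, dropped)
```

The key design choice is put_nowait on the producer side: when the worker falls behind, audio is dropped instead of stalling the capture thread.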
The class implements the context manager protocol and yields results as a generator:
    with VoiceStreamer(record_device=0, callback=my_stt_function) as streamer:
        for result in streamer:
            print(result)

Writing this with asyncio instead of generators is definitely an option, but I
was already pretty far into unexplored territory. I didn’t want to get too lost
in the weeds before getting something working!
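For anyone unfamiliar with the pattern, here is a toy sketch of a context manager whose __enter__ hands back a generator fed by a background thread. ResultStream is a hypothetical stand-in, not the real class, and the None sentinel that ends iteration is an assumption added so the example terminates on its own (a live stream would keep going until interrupted).

```python
import queue
import threading


class ResultStream:
    """Toy stand-in for a streamer: __enter__ returns a generator."""

    def __init__(self, items):
        self._items = list(items)
        self._results = queue.Queue()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._produce, daemon=True)

    def _produce(self):
        for item in self._items:
            self._results.put(item)
        self._results.put(None)  # sentinel: nothing more to come

    def __enter__(self):
        self._thread.start()
        return self._iter_results()

    def _iter_results(self):
        while not self._stop.is_set():
            try:
                item = self._results.get(timeout=0.1)
            except queue.Empty:
                continue  # re-check the stop event, then poll again
            if item is None:
                return
            yield item

    def __exit__(self, *_exc):
        self._stop.set()
        self._thread.join()
        return False  # do not swallow exceptions


with ResultStream(["hello", "world"]) as stream:
    collected = list(stream)
print(collected)
```

Polling the queue with a short timeout is what lets the generator notice the stop event, rather than blocking forever on an empty queue.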
Edge Cases and Tuning
This architecture handles several real-world scenarios gracefully:
- False starts: A cough or a chair scrape rarely fills the ring buffer with speech frames. Because triggering requires more than 90% of the ring buffer to be voiced, short bursts are ignored, and when real speech does trigger, the lookback frames keep its onset from being clipped.
- Pauses in speech: The padding duration (300ms) allows for natural pauses between words. If the pause exceeds the padding, the buffer flushes, and a new utterance starts when speech resumes.
- Continuous noise: The max duration limit (max_duration_s, 5 seconds by default) forces a flush, so the buffer cannot grow without bound.
The Complete Picture
import collections
import logging
import queue
import threading
from typing import Any, Callable

import numpy as np
import sounddevice as sd
import webrtcvad

logger = logging.getLogger(__name__)


class VADAudioBuffer:
    sample_rate = 16000
    frame_duration_s = 0.03

    def __init__(
        self,
        padding_duration_s=0.3,
        lookback_s=0.5,
        max_duration_s=5.0,
        min_duration_s=0.3,
    ):
        self.padding_duration_s = padding_duration_s
        self.lookback_s = lookback_s
        self.max_duration_s = max_duration_s
        self.min_duration_s = min_duration_s
        self.frame_size = int(self.sample_rate * self.frame_duration_s)
        self.lookback_frames = int(self.lookback_s / self.frame_duration_s)
        self.padding_frames = int(
            self.padding_duration_s / self.frame_duration_s
        )
        self.max_frames = int(self.max_duration_s / self.frame_duration_s)
        self.min_frames = int(self.min_duration_s / self.frame_duration_s)
        self.vad = webrtcvad.Vad(1)
        self.triggered = False
        self.voiced_frames = []
        self.ring_buffer = collections.deque(maxlen=self.padding_frames)

    def process(self, data: np.ndarray) -> np.ndarray | None:
        audio_np = None
        frame_size_bytes = self.frame_size * 2  # 2 bytes per int16 sample

        # data is mono float32 from the stream, flatten in case it's (N, 1)
        data_flat = data.flatten()
        # Validate in samples: the input is a float32 array, not raw bytes
        if len(data_flat) % self.frame_size != 0:
            raise ValueError(
                f"Chunk must be a multiple of {self.frame_size} samples "
                f"(got {len(data_flat)})"
            )

        # Convert float32 audio to int16 for VAD processing
        chunk = (data_flat * 32767).astype(np.int16).tobytes()

        for i in range(0, len(chunk), frame_size_bytes):
            frame = chunk[i : i + frame_size_bytes]
            is_speech = self.vad.is_speech(frame, self.sample_rate)
            self.ring_buffer.append((frame, is_speech))
            if not self.triggered:
                num_voiced = sum(1 for _, s in self.ring_buffer if s)
                if num_voiced > 0.9 * self.ring_buffer.maxlen:
                    self.load_buffer()
            else:
                self.voiced_frames.append(frame)
                num_unvoiced = sum(1 for _, s in self.ring_buffer if not s)
                # Force flush when max duration is reached
                if len(self.voiced_frames) >= self.max_frames:
                    audio_np = self.flush_buffer()
                elif num_unvoiced > 0.9 * self.ring_buffer.maxlen:
                    audio_np = self.flush_buffer()
                    self.unload_buffer()
        return audio_np

    def post_process(self, utterance: bytes) -> np.ndarray:
        # int16 PCM bytes back to float32
        audio_np = np.frombuffer(utterance, dtype=np.int16)
        audio_np = audio_np.astype(np.float32)
        audio_np /= 32768.0
        return audio_np

    def unload_buffer(self):
        logger.debug("Untriggered")
        self.triggered = False
        self.ring_buffer.clear()

    def load_buffer(self):
        logger.debug("Triggered")
        self.triggered = True
        # Seed the utterance with the lookback frames
        self.voiced_frames = [f for f, _ in self.ring_buffer]
        self.ring_buffer.clear()

    def flush_buffer(self) -> np.ndarray | None:
        # Too short to be a real utterance: discard it
        if len(self.voiced_frames) <= self.min_frames:
            return None
        utterance = b"".join(self.voiced_frames)
        audio_np = self.post_process(utterance)
        self.voiced_frames = []
        return audio_np


class VoiceStreamer:
    model_size = "small.en"

    def __init__(
        self,
        record_device: int,
        # Optional: with no callback, raw utterances are yielded instead
        callback: Callable[[np.ndarray], Any | None] | None = None,
    ):
        self.record_device = record_device
        self.callback = callback
        self.vad_buffer = VADAudioBuffer()
        self.input_stream = sd.InputStream(
            samplerate=self.vad_buffer.sample_rate,
            blocksize=2
            * int(
                self.vad_buffer.sample_rate * self.vad_buffer.frame_duration_s
            ),
            dtype=np.float32,
            channels=1,
            device=self.record_device,
            callback=self.stream_callback,
        )
        self.worker_thread = threading.Thread(
            target=self.worker,
            daemon=True,
        )
        self.stop_signal = threading.Event()
        self.audio_queue = queue.Queue(maxsize=100)
        self.result_queue = queue.Queue()

    def __enter__(self):
        logger.info("Starting transcriber")
        self.input_stream.start()
        self.worker_thread.start()
        return self.gen()

    def gen(self):
        try:
            yield from self.get_result()
        except KeyboardInterrupt:
            logger.info("Keyboard interrupt received, shutting down...")
            pass  # Exit the iterator, GOTO __exit__

    def get_result(self):
        while not self.stop_signal.is_set():
            try:
                text = self.result_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            yield text

    def stream_callback(self, data, _frames, _time, status):
        if status:
            logger.info("Audio status: %s", status)
        try:
            # Copy: the input buffer is only valid during the callback
            self.audio_queue.put_nowait(data.copy())
        except queue.Full:
            pass  # Drop audio if overflowing

    def worker(self):
        while not self.stop_signal.is_set():
            try:
                data = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            logger.debug("Got data")
            utterance = self.vad_buffer.process(data)
            if utterance is None:
                continue
            logger.info("Got utterance")
            if self.callback is None:
                # No callback: hand the raw utterance to the consumer
                self.result_queue.put(utterance)
                continue
            result = self.callback(utterance)
            if result is not None:
                logger.info("Got result")
                self.result_queue.put(result)

    def __exit__(self, *_):
        logger.info("Ending transcriber")
        self.stop_signal.set()
        self.input_stream.stop()
        self.input_stream.close()  # Release the audio device
        self.worker_thread.join(timeout=1.0)


def main():
    results = []
    with VoiceStreamer(record_device=0, callback=None) as streamer:
        for result in streamer:
            results.append(result)
    return results


if __name__ == "__main__":
    main()