ProbableOdyssey | Blake Cook

Real-Time Voice Streaming and Stateful Buffering

· 6 min read · 1158 words

Table of Contents

Last year I wanted to build a live transcriber/fact extractor to learn more about the process of building such a system. To my surprise, downloading and using a Whisper model in Python is pretty straightforward

But like with any side project — the hard part is always something unexpected. In this case, it was streaming audio processing. I manged to get something working last year, but honestly it was a hot mess of threads and queues. This week I revisited this repo and revised my approach (and open sourced it here!).

The challenge with streaming audio processing is knowing when to start listening and when to stop. The Whisper model has some VAD (voice activity detection) built in, but I needed to reduce load on the CPU. Sending any chunks of audio would slow the system down, so I needed to minimise unnecessary calls to the model.

So here is a walkthrough of the buffering solution I used that could detect arbitrary speech onset, capture the full utterance with natural padding, and handle the messy reality of human speech (pauses, false starts, and varying speech lengths).

Architecture Overview

The solution splits into two main components:

  1. VADAudioBuffer: A stateful buffer that implements the “listening logic” using WebRTC’s Voice Activity Detector
  2. VoiceStreamer: A threading wrapper that bridges audio I/O (via sounddevice) with your processing pipeline

The Buffering Logic: Lookback, Trigger, and Padding

The heart of the system is the VADAudioBuffer.process() method, which implements a state machine with three distinct phases:

Phase 1: The Lookback Ring Buffer (Pre-trigger)

Before any speech is detected, audio flows into a ring buffer (self.ring_buffer) that stores the last 300ms of audio (configurable via padding_duration_s). This is crucial because human speech doesn’t start abruptly at the detection threshold. There are often breaths, plosives, or soft beginnings that the VAD might initially miss as “non-speech.”

# Simplified logic
self.ring_buffer.append((frame, is_speech))
if not self.triggered:
    num_voiced = sum(1 for _, s in self.ring_buffer if s)
    if num_voiced > 0.9 * self.ring_buffer.maxlen:
        self.trigger()  # Captures ring_buffer contents + new frames

Phase 2: Active Recording (Triggered State)

Once triggered, the buffer switches modes. Instead of overwriting old data, it accumulates frames in self.voiced_frames. The system stays in this state until it detects sustained silence—specifically, when 90% of the ring buffer contains non-speech frames.

This is where the max_duration_s safety limit becomes important. Without it, a continuous noise (air conditioning, road noise mistaken for speech) could accumulate indefinitely. The system forces a flush when max_frames is reached.

Phase 3: Post-Processing and Padding

When silence is detected (or max duration hit), flush_buffer() combines all accumulated frames into a single byte string, converts it from int16 back to float32, and returns the numpy array.

Notice that the final utterance includes:

  1. The lookback frames (captured speech onset)
  2. All voiced frames during the triggered state
  3. Natural padding at the end (the silence that triggered the stop)

The Streaming Layer: Threading and Generators

The VoiceStreamer class solves the I/O blocking problem. Audio capture happens in a callback thread (via sounddevice), but VAD processing happens in a dedicated worker thread. This separation prevents audio dropouts if your processing callback (e.g., speech-to-text) takes variable time.

The class implements the context manager protocol and yields results as a generator:

with VoiceStreamer(record_device=0, callback=my_stt_function) as streamer:
    for result in streamer:
        print(result)

Writing this with asyncio instead of generators is definitely an option, but I was already pretty far into unexplored territory. I didn’t want to get too lost in the weeds before getting something working!

Edge Cases and Tuning

This architecture handles several real-world scenarios gracefully:

The Complete Picture

import collections
import logging
import queue
import threading
from typing import Any, Callable

import numpy as np
import sounddevice as sd
import webrtcvad

logger = logging.getLogger(__name__)


class VADAudioBuffer:
    sample_rate = 16000
    frame_duration_s = 0.03

    def __init__(
        self,
        padding_duration_s=0.3,
        lookback_s=0.5,
        max_duration_s=5.0,
        min_duration_s=0.3,
    ):
        self.padding_duration_s = padding_duration_s
        self.lookback_s = lookback_s
        self.max_duration_s = max_duration_s
        self.min_duration_s = min_duration_s

        self.frame_size = int(self.sample_rate * self.frame_duration_s)
        self.lookback_frames = int(self.lookback_s / self.frame_duration_s)
        self.padding_frames = int(
            self.padding_duration_s / self.frame_duration_s
        )
        self.max_frames = int(self.max_duration_s / self.frame_duration_s)
        self.min_frames = int(self.min_duration_s / self.frame_duration_s)

        self.vad = webrtcvad.Vad(1)

        self.triggered = False
        self.voiced_frames = []
        self.ring_buffer = collections.deque(maxlen=self.padding_frames)

    def process(self, data: np.ndarray) -> np.ndarray | None:
        audio_np = None
        frame_size_bytes = self.frame_size * 2  # 2 bytes per int16 sample
        if len(data) % frame_size_bytes != 0:
            raise ValueError(
                f"Chunk must be a mutliple of {frame_size_bytes} samples "
                f"(got {len(data)})"
            )

        # Convert float32 audio to int16 for VAD processing
        # data is mono float32 from the stream, flatten in case it's (N, 1)
        data_flat = data.flatten()
        chunk = (data_flat * 32767).astype(np.int16).tobytes()

        for i in range(0, len(chunk), frame_size_bytes):
            frame = chunk[i : i + frame_size_bytes]
            is_speech = self.vad.is_speech(frame, self.sample_rate)

            self.ring_buffer.append((frame, is_speech))

            if not self.triggered:
                num_voiced = sum(1 for _, s in self.ring_buffer if s)

                if num_voiced > 0.9 * self.ring_buffer.maxlen:
                    logger.debug("Triggered")
                    self.load_buffer()

            else:
                self.voiced_frames.append(frame)
                num_unvoiced = sum(1 for _, s in self.ring_buffer if not s)

                # Force flush when max duration is reached
                if len(self.voiced_frames) >= self.max_frames:
                    audio_np = self.flush_buffer()

                elif num_unvoiced > 0.9 * self.ring_buffer.maxlen:
                    audio_np = self.flush_buffer()

                    self.unload_buffer()

        return audio_np

    def post_process(self, utterance: bytes) -> np.ndarray:
        audio_np = np.frombuffer(utterance, dtype=np.int16)
        audio_np = audio_np.astype(np.float32)
        audio_np /= 32768.0
        return audio_np

    def unload_buffer(self):
        logger.debug("Untriggered")
        self.triggered = False
        self.ring_buffer.clear()

    def load_buffer(self):
        logger.debug("Triggered")
        self.triggered = True
        self.voiced_frames = [f for f, _ in self.ring_buffer]
        self.ring_buffer.clear()

    def flush_buffer(self) -> np.ndarray:
        if len(self.voiced_frames) <= self.min_frames:
            return None

        utterance = b"".join(self.voiced_frames)
        audio_np = self.post_process(utterance)

        self.voiced_frames = []
        return audio_np


class VoiceStreamer:
    model_size = "small.en"

    def __init__(
        self,
        record_device: int,
        callback: Callable[[np.ndarray], Any | None] | None,
    ):
        self.record_device = record_device
        self.callback = callback

        self.vad_buffer = VADAudioBuffer()

        self.input_stream = sd.InputStream(
            samplerate=self.vad_buffer.sample_rate,
            blocksize=2
            * int(
                self.vad_buffer.sample_rate * self.vad_buffer.frame_duration_s
            ),
            dtype=np.float32,
            channels=1,
            device=self.record_device,
            callback=self.stream_callback,
        )

        self.worker_thread = threading.Thread(
            target=self.worker,
            daemon=True,
        )

        self.stop_signal = threading.Event()
        self.audio_queue = queue.Queue(maxsize=100)
        self.result_queue = queue.Queue()

    def __enter__(self):
        logger.info("Starting transcriber")
        self.input_stream.start()
        self.worker_thread.start()
        return self.gen()

    def gen(self):
        try:
            yield from self.get_result()
        except KeyboardInterrupt:
            logger.info("Keyboard interrupt received, shutting down...")
            pass  # Exit the iterator, GOTO __exit__

    def get_result(self):
        while not self.stop_signal.is_set():
            try:
                text = self.result_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            yield text

    def stream_callback(self, data, _frames, _time, status):
        if status:
            logger.info("Audio status: {status}")
        try:
            self.audio_queue.put_nowait(data)
        except queue.Full:
            pass  # Drop audio if overflowing

    def worker(self):
        while not self.stop_signal.is_set():
            try:
                data = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            logger.debug("Got data")
            utterance = self.vad_buffer.process(data)
            if utterance is None:
                continue

            logger.info("Got utterance")
            if self.callback is None:
                self.result_queue.put(utterance)

            result = self.callback(utterance)
            if result is not None:
                logger.info("Got result")
                self.result_queue.put(result)

    def __exit__(self, *_):
        logger.info("Ending transcriber")
        self.input_stream.stop()
        self.stop_signal.set()


def main():
    results = []
    with VoiceStreamer(record_device=0) as streamer:
        for result in streamer:
            results.append(result)

    return results


if __name__ == "__main__":
    main()

Reply to this post by email blZake@proZbableodyssey.blog (remove Z characters) ↪