Python Real-Time Audio Streaming — Deep Dive

Architecture of a real-time audio system

A production real-time audio pipeline has three layers:

  1. Audio I/O thread: high-priority, callback-driven, talks to hardware via PortAudio. Does minimal work: copies data to/from shared buffers.
  2. Processing thread: pulls from the input ring buffer, applies DSP (filters, effects, analysis), pushes to the output ring buffer.
  3. Control/UI thread: the main Python thread, handling user interaction, parameter changes, visualization.

Mic → [Audio Thread] → Ring Buffer → [Processing Thread] → Ring Buffer → [Audio Thread] → Speaker

                                    [UI/Control Thread]

This separation ensures the audio thread never blocks on processing, and processing never blocks on UI.
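
One way the control thread can hand a parameter change to the processing thread without either side waiting is sketched below. SmoothedParameter is a hypothetical helper, not part of any library, and the sketch assumes CPython, where a single attribute assignment is seen whole by other threads. The processing thread ramps toward the new value over one block to avoid clicks.

```python
import numpy as np


class SmoothedParameter:
    """Hypothetical helper: set by the UI thread, ramped by the processing thread."""

    def __init__(self, initial: float):
        self._target = float(initial)   # written by the control thread
        self._current = float(initial)  # touched only by the processing thread

    def set(self, value: float) -> None:
        # One attribute assignment; the processing thread never waits on it.
        self._target = float(value)

    def ramp(self, frames: int) -> np.ndarray:
        """Per-sample values moving linearly from the current value to the target."""
        target = self._target  # read once so the whole ramp uses the same target
        steps = np.arange(1, frames + 1, dtype=np.float32) / frames
        values = self._current + (target - self._current) * steps
        self._current = target
        return values.astype(np.float32)


gain = SmoothedParameter(1.0)
gain.set(0.0)                # e.g. the user presses "mute"
first = gain.ramp(4)         # fades out across the block
second = gain.ramp(4)        # stays at the new value
```

In a processing loop the block would be multiplied by gain.ramp(len(block)) before being pushed to the output buffer.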

Lock-free ring buffer

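Standard queue.Queue uses locks, which can cause priority inversion in the audio thread. A single-producer, single-consumer ring buffer avoids them: only the writer advances the write position, only the reader advances the read position, and each side publishes its new position after it has finished copying. Note that multiprocessing.Value wraps every access in a lock by default, so the positions below use RawValue instead. In CPython the GIL still serializes the Python-level code, so treat this as an illustration of the technique rather than a hard real-time primitive.

import ctypes
from multiprocessing import RawArray, RawValue

import numpy as np

class LockFreeRingBuffer:
    """Single-producer, single-consumer ring buffer that takes no locks."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._buffer = RawArray(ctypes.c_float, capacity)
        # NumPy view over the same memory, so copies are vectorized
        self._view = np.frombuffer(self._buffer, dtype=np.float32)
        # 64-bit counters: c_long is only 32 bits on Windows and would overflow
        self._write_pos = RawValue(ctypes.c_longlong, 0)
        self._read_pos = RawValue(ctypes.c_longlong, 0)

    def write(self, data: np.ndarray) -> int:
        """Write samples. Returns number written (may be < len(data) if full)."""
        w = self._write_pos.value
        r = self._read_pos.value
        available = self.capacity - (w - r)
        n = min(len(data), available)

        # Positions grow without bound; the modulo maps them into the buffer
        idx = (w + np.arange(n)) % self.capacity
        self._view[idx] = data[:n]
        self._write_pos.value = w + n  # publish only after the copy
        return n

    def read(self, count: int) -> np.ndarray:
        """Read up to count samples. Returns what's available."""
        w = self._write_pos.value
        r = self._read_pos.value
        available = w - r
        n = min(count, available)

        idx = (r + np.arange(n)) % self.capacity
        result = self._view[idx]  # fancy indexing returns a copy
        self._read_pos.value = r + n  # release the slots only after the copy
        return result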

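For production use, prefer a ring buffer implemented in native code, such as the cffi-based RingBuffer class from the python-rtmixer and pa-ringbuffer projects, which wraps PortAudio's lock-free ring buffer. sounddevice's CallbackFlags (the status argument passed to the callback) reports input overflows and output underflows, so you can detect when the buffers did not keep up.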

Overlap-add processing

Many DSP operations (FFT-based filtering, spectral effects) require windows larger than one callback block. The overlap-add method handles this:

import numpy as np

class OverlapAddProcessor:
    """Weighted overlap-add: Hann window before the FFT and after the inverse FFT."""

    def __init__(self, fft_size=2048, hop_size=512, process_fn=None):
        self.fft_size = fft_size
        self.hop_size = hop_size
        self.process_fn = process_fn or (lambda s: s)

        self.input_buffer = np.zeros(fft_size, dtype=np.float32)
        self.output_buffer = np.zeros(fft_size, dtype=np.float32)
        # Periodic Hann window: a symmetric window of length N + 1 minus its last sample
        self.window = np.hanning(fft_size + 1)[:-1].astype(np.float32)
        # The window is applied twice, so overlapping frames sum to
        # sum(window**2) / hop_size (1.5 for these defaults); compensate for it
        self.scale = hop_size / float(np.sum(self.window ** 2))

    def process_block(self, block: np.ndarray) -> np.ndarray:
        """Process a hop_size block, returning hop_size output samples."""
        # Shift input buffer and append new samples
        self.input_buffer[:-self.hop_size] = self.input_buffer[self.hop_size:]
        self.input_buffer[-self.hop_size:] = block

        # Window and FFT
        windowed = self.input_buffer * self.window
        spectrum = np.fft.rfft(windowed)

        # Process in frequency domain
        processed_spectrum = self.process_fn(spectrum)

        # IFFT, synthesis window, and gain compensation
        processed = np.fft.irfft(processed_spectrum, n=self.fft_size)
        processed *= self.window * self.scale

        # Add to output buffer
        self.output_buffer += processed

        # Extract the finished hop and shift the rest forward
        output = self.output_buffer[:self.hop_size].copy()
        self.output_buffer[:-self.hop_size] = self.output_buffer[self.hop_size:]
        self.output_buffer[-self.hop_size:] = 0

        return output
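
When a Hann window is applied both before the FFT and after the inverse FFT, the overlapping frames add up to a constant gain rather than to one. The check below is a small sketch of that arithmetic, assuming a periodic Hann window and a hop size that divides the FFT size evenly; overlap_gain is a name chosen here for illustration.

```python
import numpy as np


def overlap_gain(fft_size: int = 2048, hop_size: int = 512) -> np.ndarray:
    """Sum of the squared periodic Hann window over all overlapping frames."""
    # Periodic Hann: drop the last sample of a symmetric window of length N + 1
    window = np.hanning(fft_size + 1)[:-1]
    squared = window ** 2
    # Row k holds the part of frame k that overlaps one output hop;
    # summing the rows gives the total weight applied to each output sample
    return squared.reshape(fft_size // hop_size, hop_size).sum(axis=0)


gain = overlap_gain()
print(gain.min(), gain.max())  # both close to 1.5
```

Because the sum is constant, dividing the output by it (or folding the factor into the synthesis window) restores unity gain for a pass-through process_fn.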

Example: real-time spectral filter

def bandpass_filter(spectrum, low_bin=10, high_bin=200):
    """Zero out frequencies outside the passband."""
    result = np.zeros_like(spectrum)
    result[low_bin:high_bin] = spectrum[low_bin:high_bin]
    return result

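import sounddevice as sd

ola = OverlapAddProcessor(fft_size=2048, hop_size=512, process_fn=bandpass_filter)

def audio_callback(indata, outdata, frames, time, status):
    mono = indata[:, 0]
    processed = ola.process_block(mono)  # requires frames == hop_size
    outdata[:, 0] = processed

# blocksize must equal hop_size so that every callback delivers exactly one hop
stream = sd.Stream(samplerate=44100, blocksize=512, channels=1, callback=audio_callback)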
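
The low_bin and high_bin arguments are FFT bin indices, not frequencies. The helpers below sketch the conversion, assuming a 44.1 kHz sample rate and the 2048-point FFT used above; hz_to_bin and bin_to_hz are illustrative names, not library functions.

```python
def bin_to_hz(bin_index: int, samplerate: float = 44100, fft_size: int = 2048) -> float:
    """Center frequency of an rfft bin."""
    return bin_index * samplerate / fft_size


def hz_to_bin(freq_hz: float, samplerate: float = 44100, fft_size: int = 2048) -> int:
    """Index of the rfft bin closest to freq_hz."""
    return int(round(freq_hz * fft_size / samplerate))


print(bin_to_hz(10), bin_to_hz(200))    # about 215.3 Hz and 4306.6 Hz
print(hz_to_bin(300), hz_to_bin(3400))  # 14 158
```

Under those assumptions the default passband of bins 10 to 200 covers roughly 215 Hz to 4.3 kHz, and a 300 Hz to 3.4 kHz band would use low_bin=14, high_bin=158.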

Multi-threaded DSP pipeline

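For heavy processing (ML inference, multi-stage effects), separate audio I/O from DSP. This version uses queue.Queue for brevity: the callback only calls put_nowait and get_nowait, so it never waits for the processing thread, although the queue still takes a lock internally. With maxsize=32 and 512-sample blocks, each queue can hold up to about 372 ms of audio at 44.1 kHz, so lower maxsize if latency matters more than dropout resistance: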

import queue
import threading

import sounddevice as sd

class AudioPipeline:
    def __init__(self, sr=44100, blocksize=512):
        self.sr = sr
        self.blocksize = blocksize
        self.input_q = queue.Queue(maxsize=32)
        self.output_q = queue.Queue(maxsize=32)
        self._running = False
    
    def _audio_callback(self, indata, outdata, frames, time, status):
        try:
            self.input_q.put_nowait(indata[:, 0].copy())
        except queue.Full:
            pass  # Drop frame rather than block
        
        try:
            outdata[:, 0] = self.output_q.get_nowait()
        except queue.Empty:
            outdata.fill(0)  # Silence on underrun
    
    def _processing_thread(self):
        while self._running:
            try:
                block = self.input_q.get(timeout=0.1)
                processed = self.process(block)
                self.output_q.put(processed, timeout=0.1)
            except queue.Empty:
                continue
            except queue.Full:
                continue
    
    def process(self, block):
        """Override this with your DSP logic."""
        return block
    
    def start(self):
        self._running = True
        self._proc_thread = threading.Thread(target=self._processing_thread, daemon=True)
        self._proc_thread.start()
        self._stream = sd.Stream(
            samplerate=self.sr, blocksize=self.blocksize,
            channels=1, callback=self._audio_callback
        )
        self._stream.start()
    
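    def stop(self):
        self._running = False
        self._stream.stop()
        self._stream.close()  # release the audio device
        self._proc_thread.join(timeout=1.0)  # wait for the worker to exit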

Network audio streaming

UDP streaming (lowest latency)

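import queue
import socket

import numpy as np

TARGET_IP = "192.168.1.100"  # placeholder: address of the receiving machine
PORT = 5000
output_q = queue.Queue(maxsize=32)  # blocks handed to the playback side

# Sender
send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def stream_to_network(indata, frames, time, status):
    # InputStream callback: one datagram per block of float32 samples
    send_sock.sendto(indata.tobytes(), (TARGET_IP, PORT))

# Receiver
def receive_audio():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", PORT))
    while True:
        # The buffer must be at least one datagram: 4096 bytes = 1024 float32 samples
        data, addr = sock.recvfrom(4096)
        samples = np.frombuffer(data, dtype=np.float32)
        output_q.put(samples)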

UDP has no retransmission, so a lost packet becomes a gap of silence, and packets can also arrive late or out of order. For voice chat this is usually acceptable; for music, use RTP with a jitter buffer.
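
To make the idea of jitter buffering concrete, here is a minimal sketch. It assumes a made-up packet layout (a 4-byte sequence number followed by float32 samples) rather than real RTP, and pack_packet, unpack_packet, and JitterBuffer are hypothetical names. It reorders packets, waits for a few blocks before starting playback, and substitutes silence for anything missing.

```python
import struct

import numpy as np

# Hypothetical framing: 4-byte big-endian sequence number, then float32 samples
HEADER = struct.Struct("!I")


def pack_packet(seq: int, samples: np.ndarray) -> bytes:
    return HEADER.pack(seq) + samples.astype(np.float32).tobytes()


def unpack_packet(packet: bytes):
    (seq,) = HEADER.unpack_from(packet)
    samples = np.frombuffer(packet, dtype=np.float32, offset=HEADER.size)
    return seq, samples


class JitterBuffer:
    """Fixed-delay reorder buffer; missing blocks are replaced with silence."""

    def __init__(self, block_size: int, depth: int = 4):
        self.block_size = block_size
        self.depth = depth      # blocks to collect before playback starts (>= 1)
        self.pending = {}       # sequence number -> samples
        self.next_seq = None    # next sequence number to play
        self.started = False

    def push(self, seq: int, samples: np.ndarray) -> None:
        if self.started and seq < self.next_seq:
            return              # arrived too late to be played, drop it
        if not self.started:
            # Playback begins at the lowest sequence number seen while prefilling
            self.next_seq = seq if self.next_seq is None else min(self.next_seq, seq)
        self.pending[seq] = samples

    def pop(self) -> np.ndarray:
        silence = np.zeros(self.block_size, dtype=np.float32)
        if not self.started:
            if len(self.pending) < self.depth:
                return silence  # still prefilling
            self.started = True
        block = self.pending.pop(self.next_seq, None)
        self.next_seq += 1      # advance even if the block never arrived
        return silence if block is None else block
```

The receiver thread would call push(*unpack_packet(data)) and the audio callback would call pop(). The sketch ignores sequence-number wraparound, sends samples in native byte order, and does no synchronization between the two threads, all of which a real implementation would need to address.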

WebSocket streaming

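import asyncio

import numpy as np
import sounddevice as sd
import websockets

async def stream_audio(websocket):
    loop = asyncio.get_running_loop()
    with sd.InputStream(samplerate=16000, channels=1, blocksize=1024) as stream:
        while True:
            # stream.read() blocks, so run it in a worker thread to keep the event loop free
            data, overflowed = await loop.run_in_executor(None, stream.read, 1024)
            await websocket.send(data.tobytes())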

# Server receives and processes
async def handle_client(websocket):
    async for message in websocket:
        samples = np.frombuffer(message, dtype=np.float32)
        # Process: speech recognition, analysis, etc.

GStreamer integration

For production media pipelines, GStreamer handles codecs, network protocols, and buffering:

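import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst

Gst.init(None)
pipeline = Gst.parse_launch(
    "autoaudiosrc ! audioconvert ! audioresample ! "
    "opusenc ! rtpopuspay ! udpsink host=192.168.1.100 port=5000"
)
pipeline.set_state(Gst.State.PLAYING)

# Keep the process alive while the pipeline runs; shut down cleanly on Ctrl+C
loop = GLib.MainLoop()
try:
    loop.run()
except KeyboardInterrupt:
    pass
finally:
    pipeline.set_state(Gst.State.NULL)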

Latency profiling

Measure actual round-trip latency with a loopback test:

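import time

import numpy as np

class LatencyProfiler:
    """Round-trip latency via loopback; resolution is limited to one callback block."""

    def __init__(self):
        self.pulse_sent = None  # perf_counter() timestamp of the pending pulse
        self.latencies = []     # measured round trips in milliseconds

    def callback(self, indata, outdata, frames, time_info, status):
        now = time.perf_counter()

        # Detect the returning pulse (the 0.5 threshold assumes little loopback attenuation)
        if self.pulse_sent is not None and np.max(np.abs(indata)) > 0.5:
            latency_ms = (now - self.pulse_sent) * 1000
            self.latencies.append(latency_ms)
            self.pulse_sent = None

        outdata.fill(0)

        # Send the next pulse once the previous one has been detected
        if self.pulse_sent is None and len(self.latencies) < 100:
            outdata[0, 0] = 1.0  # impulse
            self.pulse_sent = now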

Connect output to input (loopback cable or virtual audio device) and measure the distribution of round-trip times.
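
A short sketch of how the collected round-trip times might be summarized follows. summarize_latencies is a hypothetical helper, and the choice of median, 95th percentile, and maximum is an assumption about which statistics are useful, not something the profiler prescribes.

```python
import numpy as np


def summarize_latencies(latencies_ms):
    """Median, 95th percentile, and worst-case latency in milliseconds."""
    values = np.asarray(latencies_ms, dtype=np.float64)
    if values.size == 0:
        raise ValueError("no latency measurements recorded")
    return {
        "median_ms": float(np.median(values)),
        "p95_ms": float(np.percentile(values, 95)),
        "max_ms": float(values.max()),
    }


# Made-up values for illustration only, not real measurements
print(summarize_latencies([11.6, 11.7, 11.6, 17.4, 11.6]))
```

For real-time work the tail matters more than the median: a single slow round trip is an audible glitch, so the maximum and the upper percentiles are the numbers to watch.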

Platform-specific optimizations

Platform   Best API           Notes
Linux      JACK               Professional low-latency, requires JACK server
Linux      PipeWire           Modern replacement for JACK + PulseAudio
macOS      CoreAudio          Default, excellent latency
Windows    WASAPI Exclusive   Bypasses mixer, lowest latency
Windows    ASIO               Professional audio interfaces

Configure sounddevice to use a specific host API:

import sounddevice as sd

# Find JACK devices
jack_devices = [d for d in sd.query_devices() if 'JACK' in sd.query_hostapis(d['hostapi'])['name']]
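
The same filtering can be written as a small function that also keeps each device's index, which is what sounddevice expects when a device is selected. This is a sketch: devices_for_hostapi is a hypothetical helper, and the lists below are stand-in data shaped like the output of sd.query_devices() and sd.query_hostapis() so the example runs without audio hardware.

```python
def devices_for_hostapi(devices, hostapis, api_name):
    """Return (index, device) pairs whose host API name contains api_name."""
    matches = []
    for index, device in enumerate(devices):
        hostapi_name = hostapis[device["hostapi"]]["name"]
        if api_name.lower() in hostapi_name.lower():
            matches.append((index, device))
    return matches


# Stand-in data, not real query results
fake_hostapis = [{"name": "ALSA"}, {"name": "JACK Audio Connection Kit"}]
fake_devices = [
    {"name": "hw:0,0", "hostapi": 0},
    {"name": "system", "hostapi": 1},
]
print(devices_for_hostapi(fake_devices, fake_hostapis, "jack"))
```

With real hardware, pass sd.query_devices() and sd.query_hostapis() instead, then assign the chosen index to sd.default.device or pass it as the device argument when opening a stream.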

Performance budget

For a 256-sample buffer at 44.1 kHz (5.8 ms period):

Operation                          Typical time   Budget fraction
Callback overhead                  0.01 ms        0.2%
NumPy multiply (256 samples)       0.005 ms       0.1%
FFT (2048 point)                   0.05 ms        0.9%
Simple IIR filter (256 samples)    0.02 ms        0.3%
PyTorch inference (small model)    1–3 ms         17–52%
Safety margin (target < 70%)       < 4 ms         < 70%

Stay under 70% of the buffer period to absorb OS scheduling jitter.
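
The budget arithmetic can be sketched as follows. The helper names are illustrative, and worst_case_ms is only a rough way to time a processing function outside the audio callback; it reports the slowest of several runs because the worst case is what causes dropouts.

```python
import time


def buffer_period_ms(frames: int, samplerate: float) -> float:
    """Time available to process one buffer, in milliseconds."""
    return 1000.0 * frames / samplerate


def budget_fraction(elapsed_ms: float, frames: int, samplerate: float) -> float:
    """Share of the buffer period consumed by elapsed_ms of processing."""
    return elapsed_ms / buffer_period_ms(frames, samplerate)


def worst_case_ms(fn, repeats: int = 100) -> float:
    """Slowest observed run of fn() in milliseconds."""
    worst = 0.0
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        worst = max(worst, (time.perf_counter() - start) * 1000.0)
    return worst


print(f"{buffer_period_ms(256, 44100):.1f} ms")  # 5.8 ms
print(budget_fraction(3.0, 256, 44100) < 0.70)   # True: 3 ms is about 52% of the period
```

Timing the real process function with worst_case_ms and feeding the result to budget_fraction gives a quick check against the 70% target before the code ever runs inside a stream.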

One thing to remember: Production real-time audio in Python requires separating I/O from processing, using lock-free data structures, keeping callbacks minimal, and profiling latency — the audio thread’s timing contract is the non-negotiable constraint around which everything else is designed.
