Python Real-Time Audio Streaming — Deep Dive
Architecture of a real-time audio system
A production real-time audio pipeline has three layers:
- Audio I/O thread — High-priority, callback-driven, talks to hardware via PortAudio. Does minimal work: copies data to/from shared buffers.
- Processing thread — Pulls from input ring buffer, applies DSP (filters, effects, analysis), pushes to output ring buffer.
- Control/UI thread — Main Python thread handling user interaction, parameter changes, visualization.
```
Mic → [Audio Thread] → Ring Buffer → [Processing Thread] → Ring Buffer → [Audio Thread] → Speaker
                                              ↕
                                     [UI/Control Thread]
```
This separation ensures the audio thread never blocks on processing, and processing never blocks on UI.
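The control thread still has to hand parameter changes to the processing thread without giving it a lock to wait on. One way to do this is to publish a new immutable parameter object and have the processing thread read the reference once per block. The sketch below assumes standard CPython, where rebinding a single attribute is atomic with respect to other threads. `Params` and `Processor` are illustrative names, not part of any library.

```python
import threading
from dataclasses import dataclass, replace

import numpy as np


@dataclass(frozen=True)
class Params:
    gain: float = 1.0
    muted: bool = False


class Processor:
    def __init__(self):
        self.params = Params()

    def set_params(self, **changes):
        """Control thread: build a new Params, then publish it with one rebinding.

        Assumes a single control thread; two concurrent writers could lose an update.
        """
        self.params = replace(self.params, **changes)

    def process(self, block: np.ndarray) -> np.ndarray:
        """Processing thread: take one snapshot of the parameters per block."""
        p = self.params
        if p.muted:
            return np.zeros_like(block)
        return block * p.gain


proc = Processor()
ui = threading.Thread(target=proc.set_params, kwargs={"gain": 0.5})
ui.start()
ui.join()
print(proc.process(np.ones(4, dtype=np.float32)))  # [0.5 0.5 0.5 0.5]
```

Because a block is processed entirely with one `Params` snapshot, a change never takes effect halfway through a block.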
Lock-free ring buffer
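Standard queue.Queue uses locks. If the audio thread has to wait on a lock held by a lower-priority thread, it can miss its deadline (priority inversion). A single-producer, single-consumer (SPSC) ring buffer needs no lock. Only the writer advances the write position, only the reader advances the read position, and each side publishes its new position only after copying the samples. Python exposes no atomic operations or memory barriers, so the version below depends on that discipline. For threads in one process, it also depends on standard CPython's GIL to make each position load and store indivisible: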
```python
import ctypes
from multiprocessing import RawArray, RawValue

import numpy as np


class LockFreeRingBuffer:
    """Single-producer, single-consumer ring buffer without locks (threads in one process)."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        # Shared float32 storage, viewed as a NumPy array for vectorized copies
        self._buffer = np.frombuffer(RawArray(ctypes.c_float, capacity), dtype=np.float32)
        # Free-running counters; RawValue has no lock (Value() would add one)
        self._write_pos = RawValue(ctypes.c_long, 0)
        self._read_pos = RawValue(ctypes.c_long, 0)

    def write(self, data: np.ndarray) -> int:
        """Write samples. Returns number written (may be < len(data) if full)."""
        w = self._write_pos.value
        r = self._read_pos.value
        n = min(len(data), self.capacity - (w - r))
        start = w % self.capacity
        first = min(n, self.capacity - start)  # part that fits before the end
        self._buffer[start:start + first] = data[:first]
        self._buffer[:n - first] = data[first:n]  # wrapped-around part, if any
        self._write_pos.value = w + n  # publish only after the copy
        return n

    def read(self, count: int) -> np.ndarray:
        """Read up to count samples. Returns what's available."""
        w = self._write_pos.value
        r = self._read_pos.value
        n = min(count, w - r)
        start = r % self.capacity
        first = min(n, self.capacity - start)
        result = np.empty(n, dtype=np.float32)
        result[:first] = self._buffer[start:start + first]
        result[first:] = self._buffer[:n - first]
        self._read_pos.value = r + n  # free the space only after the copy
        return result
```
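For production use, consider a ring buffer implemented in C, such as the cffi-based `pa_ringbuffer` module, which wraps PortAudio's own ring buffer. Inside the callback, the `status` argument (a `sounddevice.CallbackFlags` object) reports input overflows and output underflows.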
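The free-running counters let the buffer distinguish "full" from "empty" without a lock or a spare slot. The fill level is always `write_pos - read_pos`, and only the slot index is reduced modulo the capacity. A small sketch of that arithmetic follows. The helper names are illustrative, not part of any library.

```python
def fill_level(write_pos: int, read_pos: int) -> int:
    """Samples written but not yet read."""
    return write_pos - read_pos


def free_space(write_pos: int, read_pos: int, capacity: int) -> int:
    """Samples that can be written before the buffer is full."""
    return capacity - fill_level(write_pos, read_pos)


def slot(pos: int, capacity: int) -> int:
    """Physical index in the backing array for a free-running position."""
    return pos % capacity


# Capacity 8: after writing 10 samples and reading 6, the counters have
# passed the end of the array once, yet the arithmetic still works.
cap, w, r = 8, 10, 6
print(fill_level(w, r), free_space(w, r, cap), slot(w, cap), slot(r, cap))  # 4 4 2 6
# Same slot (0) for both counters, but full and empty are still distinguishable
print(fill_level(8, 0), fill_level(8, 8))  # 8 0
```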
Overlap-add processing
Many DSP operations (FFT-based filtering, spectral effects) require windows larger than one callback block. The overlap-add method handles this:
```python
import numpy as np


class OverlapAddProcessor:
    def __init__(self, fft_size=2048, hop_size=512, process_fn=None):
        self.fft_size = fft_size
        self.hop_size = hop_size
        self.process_fn = process_fn or (lambda s: s)
        self.input_buffer = np.zeros(fft_size, dtype=np.float32)
        self.output_buffer = np.zeros(fft_size, dtype=np.float32)
        # Periodic Hann window (overlaps cleanly at hop = fft_size / 4)
        self.window = np.hanning(fft_size + 1)[:-1].astype(np.float32)
        # Analysis x synthesis windows overlap-add to sum(w^2) / hop; undo that gain
        self.norm = np.float32(hop_size / np.sum(self.window ** 2))

    def process_block(self, block: np.ndarray) -> np.ndarray:
        """Process a hop_size block, returning hop_size output samples.

        The output lags the input by fft_size - hop_size samples.
        """
        # Shift input buffer and append new samples
        self.input_buffer[:-self.hop_size] = self.input_buffer[self.hop_size:]
        self.input_buffer[-self.hop_size:] = block
        # Window and FFT
        spectrum = np.fft.rfft(self.input_buffer * self.window)
        # Process in frequency domain
        processed_spectrum = self.process_fn(spectrum)
        # IFFT, synthesis window, gain correction, then overlap-add
        processed = np.fft.irfft(processed_spectrum, n=self.fft_size)
        self.output_buffer += processed * self.window * self.norm
        # Extract the finished samples and shift
        output = self.output_buffer[:self.hop_size].copy()
        self.output_buffer[:-self.hop_size] = self.output_buffer[self.hop_size:]
        self.output_buffer[-self.hop_size:] = 0
        return output
```
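This processor applies a Hann window at both analysis and synthesis, with a hop of fft_size / 4 (75% overlap). The shifted, squared windows then add up to a constant 1.5 rather than 1, so an unnormalized overlap-add output is 1.5 times too loud. The following quick numerical check assumes a periodic Hann window:

```python
import numpy as np

fft_size, hop_size = 2048, 512
# Periodic Hann: drop the last point of an (N + 1)-point symmetric window
window = np.hanning(fft_size + 1)[:-1]

# Overlap-add the squared window (analysis x synthesis) at every hop offset
total = np.zeros(fft_size)
for shift in range(0, fft_size, hop_size):
    total += np.roll(window ** 2, shift)

print(total.min(), total.max())  # both 1.5, up to floating-point rounding
print(hop_size / np.sum(window ** 2))  # matching correction factor, 1 / 1.5
```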
Example: real-time spectral filter
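```python
import numpy as np
import sounddevice as sd

def bandpass_filter(spectrum, low_bin=10, high_bin=200):
    """Zero out bins outside the passband (~215 Hz to 4.3 kHz at 44.1 kHz, fft_size=2048)."""
    result = np.zeros_like(spectrum)
    result[low_bin:high_bin] = spectrum[low_bin:high_bin]
    return result

ola = OverlapAddProcessor(fft_size=2048, hop_size=512, process_fn=bandpass_filter)

def audio_callback(indata, outdata, frames, time, status):
    # frames == hop_size because the stream's blocksize is fixed to 512 below
    outdata[:, 0] = ola.process_block(indata[:, 0])

with sd.Stream(samplerate=44100, blocksize=512, channels=1, callback=audio_callback):
    sd.sleep(10_000)  # process live audio for 10 seconds
```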
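The passband is specified in FFT bins, not hertz. Bin k of an N-point real FFT sits at k · sr / N Hz, so converting a frequency range to bins takes one line. The helper below assumes the 44.1 kHz, 2048-point setup above. `hz_to_bin` is an illustrative name.

```python
def hz_to_bin(freq_hz: float, samplerate: int = 44100, fft_size: int = 2048) -> int:
    """Nearest rfft bin index for a frequency (bin spacing = samplerate / fft_size)."""
    return round(freq_hz * fft_size / samplerate)


# Telephone voice band, roughly 300-3400 Hz
low_bin, high_bin = hz_to_bin(300), hz_to_bin(3400) + 1  # +1: slice end is exclusive
print(low_bin, high_bin)  # 14 159
```

The bins can then be bound into the filter, for example `process_fn=lambda s: bandpass_filter(s, low_bin, high_bin)`.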
Multi-threaded DSP pipeline
For heavy processing (ML inference, multi-stage effects), separate audio I/O from DSP:
```python
import queue
import threading

import sounddevice as sd


class AudioPipeline:
    def __init__(self, sr=44100, blocksize=512):
        self.sr = sr
        self.blocksize = blocksize
        # The callback never waits for space or data (put_nowait/get_nowait), but
        # Queue still takes a short internal lock; a lock-free ring buffer avoids even that
        self.input_q = queue.Queue(maxsize=32)
        self.output_q = queue.Queue(maxsize=32)
        self._running = False

    def _audio_callback(self, indata, outdata, frames, time, status):
        try:
            self.input_q.put_nowait(indata[:, 0].copy())
        except queue.Full:
            pass  # Drop the block rather than block the audio thread
        try:
            outdata[:, 0] = self.output_q.get_nowait()
        except queue.Empty:
            outdata.fill(0)  # Silence on underrun

    def _processing_thread(self):
        while self._running:
            try:
                block = self.input_q.get(timeout=0.1)
                self.output_q.put(self.process(block), timeout=0.1)
            except (queue.Empty, queue.Full):
                continue

    def process(self, block):
        """Override this with your DSP logic; must return len(block) samples."""
        return block

    def start(self):
        self._running = True
        self._proc_thread = threading.Thread(target=self._processing_thread, daemon=True)
        self._proc_thread.start()
        self._stream = sd.Stream(
            samplerate=self.sr, blocksize=self.blocksize,
            channels=1, callback=self._audio_callback,
        )
        self._stream.start()

    def stop(self):
        self._stream.stop()
        self._stream.close()
        self._running = False
        self._proc_thread.join()
```
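Every block waiting in a queue adds latency. With the defaults above (512-sample blocks at 44.1 kHz), each queued block adds about 11.6 ms, so a full output queue of 32 blocks delays playback by roughly 370 ms. The arithmetic is shown below as a small helper. `queue_latency_ms` is an illustrative name.

```python
def queue_latency_ms(depth_blocks: int, blocksize: int = 512, samplerate: int = 44100) -> float:
    """Delay added by depth_blocks blocks sitting in a FIFO, in milliseconds."""
    return depth_blocks * blocksize / samplerate * 1000


print(round(queue_latency_ms(1), 1))   # 11.6
print(round(queue_latency_ms(2), 1))   # 23.2
print(round(queue_latency_ms(32), 1))  # 371.5
```

For live monitoring, a much smaller `maxsize` keeps this delay down, at the cost of less slack before the callback hits an underrun.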
Network audio streaming
UDP streaming (lowest latency)
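```python
import queue
import socket

import numpy as np

TARGET_IP = "192.168.1.100"  # placeholder: the receiver's address
PORT = 5000

# Sender: one datagram per block; use as the callback of an sd.InputStream
send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def stream_to_network(indata, frames, time, status):
    send_sock.sendto(indata.tobytes(), (TARGET_IP, PORT))

# Receiver: fills the queue that the playback callback reads from
output_q = queue.Queue(maxsize=32)

def receive_audio():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", PORT))
    while True:
        # 4096 bytes fits 1024 mono float32 frames; size it to your blocksize
        data, addr = sock.recvfrom(4096)
        output_q.put(np.frombuffer(data, dtype=np.float32))
```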
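UDP has no retransmission, so a lost packet becomes a gap in the output (silence, if the playback callback fills underruns with zeros). For voice chat this is usually acceptable. For music, use RTP, whose sequence numbers and timestamps let the receiver reorder packets and absorb network jitter in a jitter buffer.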
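A jitter buffer holds a few packets before playback starts, puts them back in sequence order, and substitutes silence for anything that never arrives. The sketch below shows the idea with a hypothetical 4-byte sequence-number header packed with `struct`. It is not RTP (there are no timestamps, no SSRC, and no sequence wrap-around handling), only the minimum needed to reorder packets.

```python
import struct

import numpy as np

# Hypothetical wire format: 4-byte big-endian sequence number + float32 samples
HEADER = struct.Struct("!I")


def make_packet(seq: int, samples: np.ndarray) -> bytes:
    return HEADER.pack(seq) + samples.astype(np.float32).tobytes()


def parse_packet(data: bytes):
    (seq,) = HEADER.unpack_from(data)
    return seq, np.frombuffer(data, dtype=np.float32, offset=HEADER.size)


class JitterBuffer:
    """Reorders packets and conceals losses with silence (no seq wrap-around)."""

    def __init__(self, blocksize: int, depth: int = 3):
        self.blocksize = blocksize
        self.depth = depth        # packets to collect before playback starts
        self.packets = {}         # seq -> samples
        self.next_seq = None      # None until playback starts

    def push(self, data: bytes) -> None:
        seq, samples = parse_packet(data)
        # Packets older than the playback position arrived too late: drop them
        if self.next_seq is None or seq >= self.next_seq:
            self.packets[seq] = samples

    def pop(self):
        """Next block for playback, or None while still prefilling."""
        if self.next_seq is None:
            if len(self.packets) < self.depth:
                return None
            self.next_seq = min(self.packets)
        block = self.packets.pop(self.next_seq, None)
        self.next_seq += 1
        if block is None:  # lost, late, or not here yet: play silence
            return np.zeros(self.blocksize, dtype=np.float32)
        return block


jb = JitterBuffer(blocksize=4, depth=2)
for seq in (1, 0, 3):  # packet 2 is lost; 0 and 1 arrive out of order
    jb.push(make_packet(seq, np.full(4, seq, dtype=np.float32)))
print([float(jb.pop()[0]) for _ in range(4)])  # [0.0, 1.0, 0.0, 3.0]
```

The `depth` parameter trades latency for robustness. Each extra packet of prefill adds one block period of delay but tolerates one more block of arrival jitter.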
WebSocket streaming
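```python
import asyncio
import numpy as np
import sounddevice as sd
import websockets

async def stream_audio(websocket):
    with sd.InputStream(samplerate=16000, channels=1, blocksize=1024, dtype="float32") as stream:
        while True:
            # stream.read() blocks, so run it in a worker thread to keep the event loop free
            data, _ = await asyncio.to_thread(stream.read, 1024)
            await websocket.send(data.tobytes())

# Server receives and processes
async def handle_client(websocket):
    async for message in websocket:
        samples = np.frombuffer(message, dtype=np.float32)
        # Process: speech recognition, analysis, etc.

async def main(host="0.0.0.0", port=8765):  # example host and port
    async with websockets.serve(handle_client, host, port):
        await asyncio.Future()  # run until cancelled

# Sender (SERVER is a placeholder): async with websockets.connect("ws://SERVER:8765") as ws: await stream_audio(ws)
```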
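Both ends must agree on the sample format, because `np.frombuffer` cannot tell float32 bytes from any other bytes. If the consumer expects 16-bit PCM (a common input format for speech recognizers), converting before sending also halves the bandwidth: 16 kHz mono is 64 kB/s as float32 but 32 kB/s as int16. The conversion sketch below assumes samples in the range [-1.0, 1.0].

```python
import numpy as np


def float_to_pcm16(samples: np.ndarray) -> bytes:
    """float32 in [-1, 1] -> little-endian int16 bytes (values outside are clipped)."""
    clipped = np.clip(samples, -1.0, 1.0)
    return (clipped * 32767).astype("<i2").tobytes()


def pcm16_to_float(data: bytes) -> np.ndarray:
    """Little-endian int16 bytes -> float32 in [-1, 1]."""
    return np.frombuffer(data, dtype="<i2").astype(np.float32) / 32767


block = np.array([0.0, 0.5, -1.0, 2.0], dtype=np.float32)
payload = float_to_pcm16(block)
print(len(payload))  # 8: two bytes per sample
```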
GStreamer integration
For production media pipelines, GStreamer handles codecs, network protocols, and buffering:
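```python
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst

Gst.init(None)
pipeline = Gst.parse_launch(
    "autoaudiosrc ! audioconvert ! audioresample ! "
    "opusenc ! rtpopuspay ! udpsink host=192.168.1.100 port=5000"  # placeholder receiver
)
pipeline.set_state(Gst.State.PLAYING)
# The pipeline runs in GStreamer's own threads; keep the process alive
try:
    GLib.MainLoop().run()
finally:
    pipeline.set_state(Gst.State.NULL)
```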
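On the receiving machine, a matching pipeline depayloads and decodes the stream, with `rtpjitterbuffer` handling reordering and timing. Because `udpsrc` cannot infer the format of raw RTP packets, the caps must be stated explicitly. The values below assume rtpopuspay's defaults (payload type 96, 48 kHz clock), and the 50 ms jitter-buffer latency is an example value. `receiver_description` and `run_receiver` are illustrative names, and GStreamer is imported only when `run_receiver()` is called.

```python
def receiver_description(port: int = 5000, jitter_ms: int = 50) -> str:
    """gst-launch style description for receiving the Opus/RTP stream above."""
    caps = "application/x-rtp,media=audio,clock-rate=48000,encoding-name=OPUS,payload=96"
    return (
        f'udpsrc port={port} caps="{caps}" ! '
        f"rtpjitterbuffer latency={jitter_ms} ! rtpopusdepay ! opusdec ! "
        "audioconvert ! audioresample ! autoaudiosink"
    )


def run_receiver(port: int = 5000) -> None:
    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import GLib, Gst

    Gst.init(None)
    pipeline = Gst.parse_launch(receiver_description(port))
    pipeline.set_state(Gst.State.PLAYING)
    try:
        GLib.MainLoop().run()
    finally:
        pipeline.set_state(Gst.State.NULL)
```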
Latency profiling
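Measure actual round-trip latency with a loopback test. Counting samples between the emitted impulse and its detection gives sample-accurate results, whereas timestamps taken when the callback runs would only resolve whole blocks:
```python
import numpy as np


class LatencyProfiler:
    """Round-trip latency via an impulse, timed by counting samples."""

    def __init__(self, samplerate=48000, max_pulses=100, threshold=0.5):
        self.samplerate = samplerate
        self.max_pulses = max_pulses
        self.threshold = threshold
        self.frame = 0             # frames elapsed since the stream started
        self.pulse_frame = None    # frame at which the pending impulse was written
        self.next_pulse = 0        # earliest frame for the next impulse
        self.latencies = []        # round-trip times in ms

    def callback(self, indata, outdata, frames, time_info, status):
        outdata.fill(0)
        # Detect the pending pulse in the input, to the exact sample
        if self.pulse_frame is not None:
            hits = np.flatnonzero(np.abs(indata[:, 0]) > self.threshold)
            if hits.size:
                arrival = self.frame + hits[0]
                self.latencies.append((arrival - self.pulse_frame) * 1000 / self.samplerate)
                self.pulse_frame = None
        # Send a new impulse at least 250 ms after the last one so its echoes die out
        if (self.pulse_frame is None and self.frame >= self.next_pulse
                and len(self.latencies) < self.max_pulses):
            outdata[0, 0] = 1.0
            self.pulse_frame = self.frame
            self.next_pulse = self.frame + self.samplerate // 4
        self.frame += frames
```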
Connect output to input (loopback cable or virtual audio device) and measure the distribution of round-trip times.
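Once the profiler has collected its pulses, the spread matters as much as the average, because the worst case decides whether you hear glitches. In the sketch below, `summarize_latencies` is an illustrative name. The stream setup appears in comments because it needs a real loopback. `sd.Stream` and its `latency` attribute (the input and output latency that PortAudio reports) come from sounddevice.

```python
import numpy as np


def summarize_latencies(latencies_ms):
    """Distribution summary for a list of round-trip times in milliseconds."""
    x = np.asarray(latencies_ms, dtype=float)
    return {
        "count": int(x.size),
        "min": float(x.min()),
        "median": float(np.median(x)),
        "p95": float(np.percentile(x, 95)),
        "max": float(x.max()),
        "std": float(x.std()),
    }


# With a real loopback (needs sounddevice and the LatencyProfiler class):
#   profiler = LatencyProfiler(samplerate=48000)
#   with sd.Stream(samplerate=48000, channels=1, callback=profiler.callback) as s:
#       sd.sleep(30_000)
#       print("reported (input, output) latency in s:", s.latency)
#   print(summarize_latencies(profiler.latencies))
```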
Platform-specific optimizations
| Platform | Best API | Notes |
|---|---|---|
| Linux | JACK | Professional low-latency, requires JACK server |
| Linux | PipeWire | Modern replacement for JACK + PulseAudio |
| macOS | CoreAudio | Default, excellent latency |
| Windows | WASAPI Exclusive | Bypasses mixer, lowest latency |
| Windows | ASIO | Professional audio interfaces |
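Configure sounddevice to use a specific host API by making one of its devices the default:
```python
import sounddevice as sd
# Find JACK devices (by index) and make the first one the default
jack_devices = [i for i, d in enumerate(sd.query_devices()) if 'JACK' in sd.query_hostapis(d['hostapi'])['name']]
sd.default.device = jack_devices[0]
```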
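The same lookup works for any host API in the table. On Windows, exclusive mode is requested per stream by passing `sd.WasapiSettings(exclusive=True)` as `extra_settings`, and the device must belong to the WASAPI host API. In the sketch below, `devices_for_hostapi` and `open_wasapi_exclusive_output` are illustrative helpers. The first takes the lists returned by `sd.query_devices()` and `sd.query_hostapis()`, so it can be tested without audio hardware. The 48 kHz stereo format is an assumption, and exclusive mode may reject formats the device does not support natively.

```python
def devices_for_hostapi(devices, hostapis, name_fragment):
    """Indices of devices whose host API name contains name_fragment."""
    return [i for i, d in enumerate(devices) if name_fragment in hostapis[d["hostapi"]]["name"]]


def open_wasapi_exclusive_output(callback, samplerate=48000, blocksize=256):
    """Windows only: output stream on the first WASAPI output device, bypassing the mixer."""
    import sounddevice as sd

    devices = sd.query_devices()
    idx = next(i for i in devices_for_hostapi(devices, sd.query_hostapis(), "WASAPI")
               if devices[i]["max_output_channels"] > 0)
    return sd.OutputStream(
        device=idx, samplerate=samplerate, blocksize=blocksize, channels=2,
        callback=callback, extra_settings=sd.WasapiSettings(exclusive=True),
    )
```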
Performance budget
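For a 256-sample buffer at 44.1 kHz (256 / 44100 ≈ 5.8 ms period), rough per-block costs are listed below. These are order-of-magnitude figures that vary with hardware and library versions: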
| Operation | Typical time | Budget fraction |
|---|---|---|
| Callback overhead | 0.01 ms | 0.2% |
| NumPy multiply (256 samples) | 0.005 ms | 0.1% |
| FFT (2048 point) | 0.05 ms | 0.9% |
| Simple IIR filter (256 samples) | 0.02 ms | 0.3% |
| PyTorch inference (small model) | 1–3 ms | 17–52% |
| Total work per block (target) | < 4 ms | < 70% |
Stay under 70% of the buffer period to absorb OS scheduling jitter.
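To check a processing function against this budget, time it over many blocks and compare the worst case, not only the mean, to 70% of the period. The sketch below uses `time.perf_counter`, and its function names are illustrative. A warm loop like this likely understates the cost of cache misses and scheduling delays inside a real callback.

```python
import time

import numpy as np


def period_ms(blocksize: int, samplerate: int) -> float:
    """Length of one buffer period in milliseconds."""
    return blocksize / samplerate * 1000


def check_budget(process, blocksize=256, samplerate=44100, runs=1000, target=0.70):
    """Time process() on a random block; report mean and worst case vs the target."""
    block = np.random.default_rng(0).standard_normal(blocksize).astype(np.float32)
    times = []
    for _ in range(runs):
        t0 = time.perf_counter()
        process(block)
        times.append((time.perf_counter() - t0) * 1000)
    budget = period_ms(blocksize, samplerate) * target
    worst = max(times)
    return {"mean_ms": sum(times) / runs, "worst_ms": worst,
            "budget_ms": budget, "ok": worst < budget}


print(round(period_ms(256, 44100), 2))  # 5.8
print(check_budget(lambda b: b * 0.5))
```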
One thing to remember: Production real-time audio in Python requires separating I/O from processing, using lock-free data structures, keeping callbacks minimal, and profiling latency — the audio thread’s timing contract is the non-negotiable constraint around which everything else is designed.