Python RxPY Reactive Programming — Deep Dive
RxPY 3.x Architecture
RxPY 3 restructured the API around pipe() composition. Instead of method chaining on Observable objects, you import operators as standalone functions and compose them:
import reactivex as rx
from reactivex import operators as ops
source = rx.of(1, 2, 3, 4, 5)
source.pipe(
    ops.filter(lambda x: x % 2 == 0),
    ops.map(lambda x: x * 10),
).subscribe(
    on_next=lambda v: print(f"Received: {v}"),
    on_error=lambda e: print(f"Error: {e}"),
    on_completed=lambda: print("Complete"),
)
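# Output: Received: 20, Received: 40, Complete
This design keeps each operator a standalone, importable function, which makes pipelines easier to type-check and reuse. The reactivex package follows this pattern throughout; reactivex is the package name from version 4 onward, while RxPY 3 shipped as rx with the same pipe()-based API.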
Creating Observables
From Existing Data
import reactivex as rx
# From individual values passed as arguments
rx.of(1, 2, 3)
# From a list
rx.from_iterable([10, 20, 30])
# Empty, single error, or never-ending
rx.empty()
rx.throw(ValueError("boom"))
rx.never()
Custom Observable with create
import reactivex as rx
def my_source(observer, scheduler):
    try:
        observer.on_next(1)
        observer.on_next(2)
        observer.on_next(3)
        observer.on_completed()
    except Exception as e:
        observer.on_error(e)

source = rx.create(my_source)
Interval and Timer
import reactivex as rx
from reactivex import operators as ops

# Emit incrementing integers every 500ms
rx.interval(0.5).pipe(
    ops.take(5),  # Stop after 5 emissions
)
# Single emission after 2 seconds
rx.timer(2.0)
Operator Deep Dive
flat_map vs switch_map
flat_map (mergeMap in RxJS) subscribes to every inner Observable as soon as it arrives. If a new inner Observable arrives before the previous one finishes, both stay subscribed and their emissions interleave:
source.pipe(
    ops.flat_map(lambda x: rx.of(x * 10).pipe(ops.delay(1.0))),
)
# All inner streams run concurrently
Switching cancels the previous inner Observable when a new one arrives, which is critical for typeahead search where only the latest query matters. RxJS calls this operator switchMap; in RxPY the equivalent operator is flat_map_latest:
keystrokes.pipe(
    ops.debounce(0.3),
    ops.distinct_until_changed(),
    ops.flat_map_latest(lambda query: search_api(query)),
)
This automatically unsubscribes from stale searches. With plain flat_map, old results could arrive after new ones and overwrite them.
scan: Running Aggregation
scan is like reduce but emits every intermediate result — useful for maintaining running state:
clicks.pipe(
    ops.scan(lambda count, _: count + 1, 0),
)
# Emits: 1, 2, 3, 4, ... (running click count)
combine_latest: Reactive Joins
Emits a tuple of the latest values from multiple streams whenever any stream emits, once every stream has produced at least one value:
temperature = rx.interval(2.0).pipe(ops.map(lambda _: read_temp()))
humidity = rx.interval(3.0).pipe(ops.map(lambda _: read_humidity()))
rx.combine_latest(temperature, humidity).pipe(
    ops.map(lambda values: {
        "temp": values[0],
        "humidity": values[1],
        "comfort": compute_index(values[0], values[1]),
    }),
)
Writing Custom Operators
Custom operators are functions that take an Observable and return an Observable:
from reactivex import Observable
import reactivex as rx
from reactivex import operators as ops
def moving_average(window_size: int):
    def _operator(source: Observable):
        return source.pipe(
            ops.buffer_with_count(window_size, 1),
            ops.filter(lambda buf: len(buf) == window_size),
            ops.map(lambda buf: sum(buf) / len(buf)),
        )
    return _operator

# Usage
sensor_data.pipe(
    moving_average(5),
).subscribe(on_next=lambda avg: print(f"Moving avg: {avg:.2f}"))
For more control, you can implement the operator from scratch:
import time

import reactivex as rx
from reactivex import Observable, Observer

def rate_limit(max_per_second: float):
    min_interval = 1.0 / max_per_second

    def _operator(source: Observable):
        def subscribe(observer: Observer, scheduler=None):
            last_emit = float("-inf")  # Guarantees the first value passes

            def on_next(value):
                nonlocal last_emit
                now = time.monotonic()
                # Drop values that arrive too soon after the last emission
                if now - last_emit >= min_interval:
                    last_emit = now
                    observer.on_next(value)

            return source.subscribe(
                on_next=on_next,
                on_error=observer.on_error,
                on_completed=observer.on_completed,
                scheduler=scheduler,
            )
        return rx.create(subscribe)
    return _operator
Asyncio Integration
RxPY works with asyncio through the AsyncIOScheduler:
import asyncio
import reactivex as rx
from reactivex.scheduler.eventloop import AsyncIOScheduler
from reactivex import operators as ops
async def main():
    loop = asyncio.get_running_loop()
    scheduler = AsyncIOScheduler(loop=loop)
    source = rx.interval(1.0, scheduler=scheduler).pipe(
        ops.take(5),
        ops.map(lambda i: i ** 2),
    )
    # The future bridges the callback-based subscription back into async code
    future = loop.create_future()
    source.subscribe(
        on_next=lambda v: print(f"Value: {v}"),
        on_completed=lambda: future.set_result(True),
        on_error=lambda e: future.set_exception(e),
        scheduler=scheduler,
    )
    await future

asyncio.run(main())
For wrapping async functions as Observables:
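import asyncio
import reactivex as rx
from reactivex import operators as ops
from reactivex.disposable import Disposable

def from_coroutine(coro_fn, *args):
    def subscribe(observer, scheduler=None):
        async def run():
            try:
                result = await coro_fn(*args)
                observer.on_next(result)
                observer.on_completed()
            except Exception as e:
                observer.on_error(e)

        # Needs a running event loop; the coroutine starts on subscription
        task = asyncio.ensure_future(run())
        # Cancel the in-flight coroutine if the subscriber disposes early
        return Disposable(task.cancel)
    return rx.create(subscribe)

# Usage (fetch_user and fetch_orders are async functions defined elsewhere)
from_coroutine(fetch_user, user_id).pipe(
    ops.flat_map(lambda user: from_coroutine(fetch_orders, user.id)),
)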
Backpressure Strategies
RxPY Observables are push-based — the producer controls the speed. When producers are faster than consumers, you need backpressure strategies:
Lossy strategies (drop data you can’t process):
- throttle_first: emit only the first value in each time window
- sample: periodically take the latest value (the role throttleLast plays in other Rx implementations)
- debounce: only emit after a quiet period
Lossless strategies (buffer data for later):
- buffer_with_time: collect items into batches by time window
- buffer_with_count: collect items into fixed-size batches
- window: like buffer, but emits sub-Observables instead of lists
Hybrid approach with bounded batches:
fast_source.pipe(
    # Flush every second or every 500 items, whichever comes first
    ops.buffer_with_time_or_count(1.0, 500),
    ops.filter(lambda batch: len(batch) > 0),
    ops.flat_map(lambda batch: process_batch(batch)),
)
Error Handling Patterns
Retry with Exponential Backoff
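import reactivex as rx
from reactivex import operators as ops

def retry_with_backoff(max_retries=3, initial_delay=1.0, retry_on=Exception):
    # retry_on limits retries to one error type; other errors propagate at once
    def _operator(source):
        def attempt(retries_used):
            def on_failure(err, _):
                if retries_used >= max_retries or not isinstance(err, retry_on):
                    return rx.throw(err)
                # Wait initial_delay, then 2x, 4x, ... before resubscribing
                wait = initial_delay * 2 ** retries_used
                return rx.timer(wait).pipe(
                    ops.flat_map(lambda _: attempt(retries_used + 1))
                )
            return source.pipe(ops.catch(on_failure))
        return attempt(0)
    return _operator

# Retry connection errors up to 3 times, waiting 2s, 4s, then 8s
api_call.pipe(
    retry_with_backoff(max_retries=3, initial_delay=2.0, retry_on=ConnectionError),
)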
Fallback Values
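primary_source.pipe(
    ops.timeout(5.0),
    # Switch to the fallback only if the primary errors or times out;
    # on_error_resume_next would also run it after a normal completion
    ops.catch(fallback_source),
)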
Per-Item Error Isolation
source.pipe(
    ops.flat_map(lambda item: process(item).pipe(
        ops.catch(lambda err, _: rx.of({"error": str(err), "item": item})),
    )),
)
Real-World Pattern: IoT Sensor Pipeline
import reactivex as rx
from reactivex import operators as ops
def sensor_pipeline(raw_readings):
    return raw_readings.pipe(
        # Remove outliers
        ops.filter(lambda r: -50 <= r["temp"] <= 150),
        # Extract the numeric reading so it can be averaged
        ops.map(lambda r: r["temp"]),
        # Smooth with 5-reading moving average (moving_average is defined above)
        moving_average(5),
        # Only emit when the value changes by at least 0.5 degrees
        ops.distinct_until_changed(
            comparer=lambda prev, curr: abs(prev - curr) < 0.5
        ),
        # Batch for database writes
        ops.buffer_with_time(10.0),
        ops.filter(lambda batch: len(batch) > 0),
        # Write batch to database (write_to_db must return an Observable)
        ops.flat_map(lambda batch: write_to_db(batch)),
    )
Testing Reactive Pipelines
RxPY includes TestScheduler for deterministic testing of time-dependent operators:
from reactivex import operators as ops
from reactivex.testing import TestScheduler, ReactiveTest

on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed

def test_debounce_pipeline():
    scheduler = TestScheduler()
    source = scheduler.create_hot_observable(
        on_next(210, "a"),
        on_next(220, "b"),  # Replaces "a" (within debounce window)
        on_next(500, "c"),  # Emitted after gap
        on_completed(600),
    )

    def create():
        # Virtual time is measured in ticks, so the debounce window is 100 ticks
        return source.pipe(ops.debounce(100, scheduler=scheduler))

    results = scheduler.start(create)
    assert results.messages == [
        on_next(320, "b"),  # "b" emitted 100 ticks after the last keystroke
        on_next(600, "c"),  # Flushed when the source completes
        on_completed(600),
    ]
Performance Considerations
- Subscription overhead: Each operator creates a new Observable and subscription chain. For hot paths processing millions of events per second, consider combining multiple simple operators into one custom operator.
- Scheduler selection: The default CurrentThreadScheduler is synchronous and avoids thread overhead. Switch to NewThreadScheduler or AsyncIOScheduler only when you need concurrency.
- Memory: buffer and window operators accumulate items in memory. Always use bounded variants (buffer_with_count, buffer_with_time) rather than unbounded collection.
- Disposal: Always dispose of subscriptions when done. Leaked subscriptions keep the entire operator chain alive in memory.
RxPY vs Alternatives
| Feature | RxPY | asyncio Streams | Trio channels |
|---|---|---|---|
| Operator library | 100+ built-in | Manual | Manual |
| Backpressure | Push-based (lossy) | Pull-based | Pull-based |
| Time operators | Built-in | Manual timers | Manual |
| Learning curve | Steep | Moderate | Moderate |
| Community size | Medium | Large | Small |
RxPY excels when you need complex stream transformations with time-based operations. For simple producer-consumer patterns, asyncio queues are simpler and more Pythonic.
One thing to remember: RxPY’s real strength isn’t any single operator — it’s that you can compose dozens of operators into a declarative pipeline where each step is independently testable, and the framework handles subscription lifecycle, error propagation, and scheduling for you.