Python RxPY Reactive Programming — Deep Dive

RxPY 3.x Architecture

RxPY 3 restructured the API around pipe() composition. Instead of method chaining on Observable objects, you import operators as standalone functions and compose them:

import reactivex as rx
from reactivex import operators as ops

source = rx.of(1, 2, 3, 4, 5)

source.pipe(
    ops.filter(lambda x: x % 2 == 0),
    ops.map(lambda x: x * 10),
).subscribe(
    on_next=lambda v: print(f"Received: {v}"),
    on_error=lambda e: print(f"Error: {e}"),
    on_completed=lambda: print("Complete"),
)
# Output: Received: 20, Received: 40, Complete

This design keeps each operator an ordinary function that can be imported, composed, and type-checked on its own. The reactivex package (the import name from RxPY 4 onward; RxPY 3 was imported as rx) follows this pattern throughout.

Creating Observables

From Existing Data

import reactivex as rx

# From individual values passed as arguments
rx.of(1, 2, 3)

# From any iterable (list, tuple, generator)
rx.from_iterable([10, 20, 30])

# Completes immediately, errors immediately, or never emits
rx.empty()
rx.throw(ValueError("boom"))
rx.never()

Custom Observable with create

import reactivex as rx

def my_source(observer, scheduler):
    try:
        observer.on_next(1)
        observer.on_next(2)
        observer.on_next(3)
        observer.on_completed()
    except Exception as e:
        observer.on_error(e)

source = rx.create(my_source)

Interval and Timer

import reactivex as rx
from reactivex import operators as ops

# Emit incrementing integers every 500ms
rx.interval(0.5).pipe(
    ops.take(5)  # Stop after 5 emissions
)

# Single emission after 2 seconds
rx.timer(2.0)

Operator Deep Dive

flat_map vs flat_map_latest (switch_map)

flat_map (mergeMap in RxJS) subscribes to every inner Observable and merges their output. If a new inner Observable arrives before the previous one finishes, both stay subscribed and their emissions interleave:

source.pipe(
    ops.flat_map(lambda x: rx.of(x * 10).pipe(ops.delay(1.0)))
)
# All inner streams run concurrently

Switch semantics unsubscribe from the previous inner Observable when a new one arrives, which is critical for typeahead search where only the latest query matters. The operator with this behavior (switchMap in RxJS) is spelled flat_map_latest in RxPY and is equivalent to map followed by switch_latest:

keystrokes.pipe(
    ops.debounce(0.3),
    ops.distinct_until_changed(),
    ops.flat_map_latest(lambda query: search_api(query))
)

Stale searches are unsubscribed automatically. With plain flat_map, old results could arrive after new ones and overwrite them.

scan: Running Aggregation

scan is like reduce but emits every intermediate result — useful for maintaining running state:

clicks.pipe(
    ops.scan(lambda count, _: count + 1, 0)
)
# Emits: 1, 2, 3, 4, ... (running click count)

combine_latest: Reactive Joins

Combines the latest values from multiple streams whenever any stream emits:

temperature = rx.interval(2.0).pipe(ops.map(lambda _: read_temp()))
humidity = rx.interval(3.0).pipe(ops.map(lambda _: read_humidity()))

rx.combine_latest(temperature, humidity).pipe(
    ops.map(lambda values: {
        "temp": values[0],
        "humidity": values[1],
        "comfort": compute_index(values[0], values[1]),
    })
)

Writing Custom Operators

Custom operators are functions that take an Observable and return an Observable:

from reactivex import Observable
import reactivex as rx
from reactivex import operators as ops

def moving_average(window_size: int):
    def _operator(source: Observable):
        return source.pipe(
            ops.buffer_with_count(window_size, 1),
            ops.filter(lambda buf: len(buf) == window_size),
            ops.map(lambda buf: sum(buf) / len(buf)),
        )
    return _operator

# Usage
sensor_data.pipe(
    moving_average(5),
).subscribe(on_next=lambda avg: print(f"Moving avg: {avg:.2f}"))

For more control, you can implement the operator from scratch:

import time

import reactivex as rx
from reactivex import Observable, Observer

def rate_limit(max_per_second: float):
    min_interval = 1.0 / max_per_second

    def _operator(source: Observable):
        def subscribe(observer: Observer, scheduler=None):
            last_emit = None  # per-subscription state; None until the first value

            def on_next(value):
                nonlocal last_emit
                now = time.monotonic()
                # Forward the value only if enough time has passed; otherwise drop it
                if last_emit is None or now - last_emit >= min_interval:
                    last_emit = now
                    observer.on_next(value)

            return source.subscribe(
                on_next=on_next,
                on_error=observer.on_error,
                on_completed=observer.on_completed,
                scheduler=scheduler,
            )
        return rx.create(subscribe)
    return _operator

Asyncio Integration

RxPY works with asyncio through the AsyncIOScheduler:

import asyncio
import reactivex as rx
from reactivex.scheduler.eventloop import AsyncIOScheduler
from reactivex import operators as ops

async def main():
    loop = asyncio.get_running_loop()
    scheduler = AsyncIOScheduler(loop=loop)

    source = rx.interval(1.0, scheduler=scheduler).pipe(
        ops.take(5),
        ops.map(lambda i: i ** 2),
    )

    future = loop.create_future()

    source.subscribe(
        on_next=lambda v: print(f"Value: {v}"),
        on_completed=lambda: future.set_result(True),
        on_error=lambda e: future.set_exception(e),
        scheduler=scheduler,
    )

    await future

asyncio.run(main())

For wrapping async functions as Observables:

import asyncio

import reactivex as rx
from reactivex import operators as ops

def from_coroutine(coro_fn, *args):
    def subscribe(observer, scheduler):
        async def run():
            try:
                result = await coro_fn(*args)
                observer.on_next(result)
                observer.on_completed()
            except Exception as e:
                observer.on_error(e)

        # Needs a running event loop; the coroutine starts on each subscription
        asyncio.ensure_future(run())

    return rx.create(subscribe)

# Usage (must be subscribed from inside a running event loop)
from_coroutine(fetch_user, user_id).pipe(
    ops.flat_map(lambda user: from_coroutine(fetch_orders, user.id))
)

Backpressure Strategies

RxPY Observables are push-based — the producer controls the speed. When producers are faster than consumers, you need backpressure strategies:

Lossy strategies (drop data you can’t process):

  • throttle_first: emits the first value in each time window and drops the rest
  • sample: periodically emits the latest value (the "last value per window" case)
  • debounce: emits a value only after a quiet period with no newer values

Lossless strategies (buffer data for later):

  • buffer_with_time: collects items into batches by time window
  • buffer_with_count: collects items into fixed-size batches
  • window: like buffer, but emits sub-Observables instead of lists

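Batching approach: collect items per time window, then process each batch as one unit (buffer_with_time_or_count additionally caps the batch size):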

fast_source.pipe(
    ops.buffer_with_time(1.0),       # Batch into 1-second chunks
    ops.filter(lambda batch: len(batch) > 0),
    ops.flat_map(lambda batch: process_batch(batch)),
)

Error Handling Patterns

Retry with Exponential Backoff

import reactivex as rx
from reactivex import operators as ops

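def retry_with_backoff(max_retries=3, initial_delay=1.0, retry_on=(ConnectionError,)):
    """Resubscribe after a failure, doubling the wait before each new attempt."""
    def _operator(source):
        def attempt(n):
            def handler(err, _src):
                # Give up on non-retryable errors or once the retry budget is spent
                if n >= max_retries or not isinstance(err, retry_on):
                    return rx.throw(err)
                wait = initial_delay * 2 ** n  # 1s, 2s, 4s, ...
                # Wait, then subscribe to the source again
                return rx.timer(wait).pipe(ops.flat_map(lambda _: attempt(n + 1)))
            return source.pipe(ops.catch(handler))
        return attempt(0)
    return _operator

# Usage: up to 3 retries on ConnectionError, waiting 1s, 2s, then 4s
api_call.pipe(
    retry_with_backoff(max_retries=3, initial_delay=1.0),
)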

Fallback Values

primary_source.pipe(
    ops.timeout(5.0),
    ops.catch(fallback_source),  # switch to the fallback only if the primary errors or times out
)

Per-Item Error Isolation

source.pipe(
    ops.flat_map(lambda item: process(item).pipe(
        ops.catch(lambda err, _: rx.of({"error": str(err), "item": item}))
    ))
)

Real-World Pattern: IoT Sensor Pipeline

import reactivex as rx
from reactivex import operators as ops

def sensor_pipeline(raw_readings):
    return raw_readings.pipe(
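        # Remove outliers
        ops.filter(lambda r: -50 <= r["temp"] <= 150),

        # Extract the numeric reading so it can be averaged
        ops.map(lambda r: r["temp"]),

        # Smooth with 5-reading moving average (custom operator defined earlier)
        moving_average(5),

        # Only emit when the value moves at least 0.5 degrees from the last emitted one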
        ops.distinct_until_changed(
            comparer=lambda prev, curr: abs(prev - curr) < 0.5
        ),

        # Batch for database writes
        ops.buffer_with_time(10.0),
        ops.filter(lambda batch: len(batch) > 0),

        # Write batch to database
        ops.flat_map(lambda batch: write_to_db(batch)),
    )

Testing Reactive Pipelines

RxPY includes TestScheduler for deterministic testing of time-dependent operators:

from reactivex import operators as ops
from reactivex.testing import TestScheduler, ReactiveTest

on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed

def test_debounce_pipeline():
    scheduler = TestScheduler()

    source = scheduler.create_hot_observable(
        on_next(210, "a"),
        on_next(220, "b"),   # Replaces "a" (within debounce window)
        on_next(500, "c"),   # Emitted after gap
        on_completed(600),
    )

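    def create():
        # TestScheduler runs on virtual time, so the window is 100 ticks, not 0.1
        return source.pipe(ops.debounce(100, scheduler=scheduler))

    results = scheduler.start(create)

    assert results.messages == [
        on_next(320, "b"),   # "b" emitted 100 ticks after the last value in the burst
        on_next(600, "c"),   # "c" leaves at 600, when its window ends and the source completes
        on_completed(600),
    ]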

Performance Considerations

  • Subscription overhead: Each operator creates a new Observable and subscription chain. For hot paths processing millions of events per second, consider combining multiple simple operators into one custom operator.
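  • Scheduler selection: Synchronous sources such as of and from_iterable run on CurrentThreadScheduler and avoid thread overhead, while time-based operators such as interval and delay default to TimeoutScheduler, which uses timer threads. Switch to NewThreadScheduler, ThreadPoolScheduler, or AsyncIOScheduler only when you need concurrency or event-loop integration.
  • Memory: buffer and window operators accumulate items in memory. Size-bounded variants (buffer_with_count, buffer_with_time_or_count) cap how much a batch can hold; buffer_with_time alone grows with the event rate during each window.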
  • Disposal: Always dispose of subscriptions when done. Leaked subscriptions keep the entire operator chain alive in memory.

RxPY vs Alternatives

Feature           RxPY                 asyncio Streams   Trio channels
Operator library  100+ built-in        Manual            Manual
Backpressure      Push-based (lossy)   Pull-based        Pull-based
Time operators    Built-in             Manual timers     Manual
Learning curve    Steep                Moderate          Moderate
Community size    Medium               Large             Small

RxPY excels when you need complex stream transformations with time-based operations. For simple producer-consumer patterns, asyncio queues are simpler and more Pythonic.
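
For comparison, a plain producer-consumer pair with a bounded asyncio.Queue needs only the standard library. In this sketch the queue size, the None sentinel, and the multiply-by-ten "work" are arbitrary choices for illustration.

```python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)  # suspends while the queue is full
    await queue.put(None)   # sentinel: no more items

async def consumer(queue, out):
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0)  # stand-in for real work
        out.append(item * 10)

async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded: the producer cannot run ahead
    out = []
    await asyncio.gather(producer(queue, 5), consumer(queue, out))
    return out

print(asyncio.run(main()))  # [0, 10, 20, 30, 40]
```

The bounded queue gives pull-based backpressure for free: put() waits whenever the consumer falls behind.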

One thing to remember: RxPY’s real strength isn’t any single operator — it’s that you can compose dozens of operators into a declarative pipeline where each step is independently testable, and the framework handles subscription lifecycle, error propagation, and scheduling for you.

