Python Delayed Task Execution — Deep Dive

Redis Sorted Set Scheduler

The sorted set pattern is the workhorse of custom delayed execution. Items are scored by their Unix execution timestamp; a poller periodically checks for due items.

import redis
import json
import time
import uuid
from typing import Optional, Callable

class RedisDelayedQueue:
    """Delayed-task queue backed by a Redis sorted set.

    Tasks are JSON-encoded dicts stored in the sorted set ``name`` with
    their Unix execution timestamp as the score.  ``poll_due`` moves due
    tasks into the plain set ``<name>:processing``; ``ack`` removes them
    from it once handled, and ``reclaim_stale`` puts abandoned ones back on
    the queue.

    ``redis_client`` must expose the redis-py style commands used below:
    ``zadd``, ``zrem``, ``zscan_iter``, ``eval``, ``srem`` and ``smembers``.
    """

    # Pops up to ARGV[2] members scored <= ARGV[1] from the sorted set
    # KEYS[1] and adds them to the set KEYS[2].  Done in one script so the
    # read and the removal happen as a single step on the server.
    _POP_DUE_SCRIPT = """
        local items = redis.call('ZRANGEBYSCORE',
            KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
        if #items > 0 then
            for _, item in ipairs(items) do
                redis.call('ZREM', KEYS[1], item)
                redis.call('SADD', KEYS[2], item)
            end
        end
        return items
        """

    def __init__(self, redis_client, name='delayed_tasks',
                 poll_interval=1.0):
        self.r = redis_client
        self.name = name
        self.processing = f"{name}:processing"
        # Not used by the queue itself; pollers read it to pace themselves.
        self.poll_interval = poll_interval

    def schedule(self, payload, delay_seconds=0,
                 execute_at=None):
        """Queue ``payload`` for later execution and return the new task id.

        Args:
            payload: JSON-serializable data stored with the task.
            delay_seconds: Delay from now, in seconds.  Ignored when
                ``execute_at`` is given.
            execute_at: Absolute Unix timestamp at which the task is due.

        Returns:
            The generated task id (a UUID4 string).
        """
        now = time.time()
        if execute_at is None:
            execute_at = now + delay_seconds
        task = {
            'id': str(uuid.uuid4()),
            'payload': payload,
            'scheduled_at': now,
            'execute_at': execute_at,
        }
        self.r.zadd(self.name, {json.dumps(task): execute_at})
        return task['id']

    def cancel(self, task_id):
        """Remove the pending task with id ``task_id``.

        Scans the whole sorted set, so the cost grows with the number of
        pending tasks.  Tasks already moved to the processing set are not
        affected.

        Returns:
            True if a matching task was found and removed, else False.
        """
        for member, _score in self.r.zscan_iter(self.name):
            if json.loads(member)['id'] == task_id:
                self.r.zrem(self.name, member)
                return True
        return False

    def poll_due(self, batch_size=10):
        """Move up to ``batch_size`` due tasks to the processing set.

        Returns:
            The decoded task dicts, possibly an empty list.
        """
        items = self.r.eval(
            self._POP_DUE_SCRIPT, 2, self.name, self.processing,
            str(time.time()), str(batch_size)
        )
        return [json.loads(item) for item in (items or [])]

    def ack(self, task):
        """Mark ``task`` (a dict returned by ``poll_due``) as finished.

        The processing set stores the serialized task, so the fast path
        re-serializes ``task`` and removes that exact member.  If nothing
        was removed (for example because the caller added or changed a
        field on the dict), fall back to matching on the task id so the
        entry does not linger until ``reclaim_stale`` re-queues it.
        """
        if self.r.srem(self.processing, json.dumps(task)):
            return
        task_id = task.get('id')
        if task_id is None:
            return
        for raw in self.r.smembers(self.processing):
            if json.loads(raw).get('id') == task_id:
                self.r.srem(self.processing, raw)
                return

    def reclaim_stale(self, timeout=300):
        """Re-queue tasks that have been stuck in the processing set.

        A task counts as stale when its ``execute_at`` lies more than
        ``timeout`` seconds in the past.  NOTE: this is measured from the
        scheduled execution time stored in the task, not from the moment
        the task was claimed, so ``timeout`` must exceed the longest
        expected polling delay plus handling time.

        Returns:
            The number of tasks put back on the queue.
        """
        now = time.time()
        reclaimed = 0
        for raw in self.r.smembers(self.processing):
            task = json.loads(raw)
            if now - task.get('execute_at', 0) > timeout:
                # Re-queue first, then drop the processing entry: if we
                # die between the two calls the task exists twice rather
                # than not at all.
                self.r.zadd(self.name, {raw: now})
                self.r.srem(self.processing, raw)
                reclaimed += 1
        return reclaimed

Running the Poller

import asyncio

async def delayed_queue_worker(queue, handler, workers=5):
    """Poll ``queue`` forever and run ``handler`` on each due task's payload.

    Args:
        queue: A ``RedisDelayedQueue``-like object providing ``poll_due``,
            ``ack``, ``poll_interval``, ``name``, ``processing`` and the
            underlying client as ``r``.
        handler: Callable taking the task payload and returning an
            awaitable.
        workers: Maximum number of tasks handled concurrently; also used as
            the poll batch size.

    A task whose handler raises is re-queued under the same id with an
    incremented ``retries`` counter and an exponential backoff of
    ``5 * 2**retries`` seconds, capped at 300.  There is no retry limit.

    NOTE: the queue methods are called synchronously (they are not
    awaited), so a blocking client stalls the event loop for the duration
    of each call.
    """
    semaphore = asyncio.Semaphore(workers)

    async def process(task):
        # Snapshot the member exactly as it sits in the processing set
        # before the handler (or the retry bookkeeping) can alter the dict;
        # SREM only matches the identical serialized string.
        member = json.dumps(task)
        async with semaphore:
            try:
                await handler(task['payload'])
                queue.ack(task)
            except Exception:
                retries = task.get('retries', 0)
                retry_delay = min(300, 2 ** retries * 5)
                execute_at = time.time() + retry_delay
                # Keep the id and carry the retry count forward so the
                # backoff actually grows on repeated failures.
                retry_task = dict(
                    task, retries=retries + 1, execute_at=execute_at
                )
                # Re-queue before removing from processing: a crash in
                # between duplicates the task instead of losing it.
                queue.r.zadd(
                    queue.name, {json.dumps(retry_task): execute_at}
                )
                queue.r.srem(queue.processing, member)

    while True:
        due_tasks = queue.poll_due(batch_size=workers)
        if due_tasks:
            await asyncio.gather(
                *[process(task) for task in due_tasks]
            )
        else:
            await asyncio.sleep(queue.poll_interval)

Timer Wheel Implementation

For systems managing millions of timers (connection timeouts, session expiry), sorted sets become expensive at O(log n) per operation. Timer wheels provide O(1) insertion and O(1) expiry detection.

import time
from collections import defaultdict

class TimerWheel:
    """Single-level timer wheel driven by ``time.monotonic()``.

    Time since construction is divided into ticks of ``tick_duration_ms``
    milliseconds; tick ``k`` maps to slot ``k % wheel_size``.  Each slot
    holds ``(expiry, callback, args, kwargs)`` tuples.  ``advance`` must be
    called periodically; it walks every tick that has fully elapsed since
    the previous call and fires the timers in those slots whose expiry has
    passed.  A timer therefore fires up to one tick (plus the caller's
    polling interval) after its requested delay.
    """

    def __init__(self, tick_duration_ms=100,
                 wheel_size=1024):
        self.tick_duration = tick_duration_ms / 1000
        self.wheel_size = wheel_size
        self.wheel = [[] for _ in range(wheel_size)]
        # Index of the next tick advance() will process.
        self.current_tick = 0
        self.start_time = time.monotonic()

    def _expiry_to_slot(self, expiry):
        """Return the slot for a timer expiring at monotonic time ``expiry``.

        The slot is taken from the absolute tick containing ``expiry``
        rather than from ``current_tick`` plus a truncated delay.
        ``advance`` only processes a tick once it has fully elapsed, so the
        timer is guaranteed to be expired when its slot is visited, even if
        ``advance`` has not been called for a while.  Ticks that were
        already processed are clamped to the next one to be processed.
        """
        tick = int((expiry - self.start_time) / self.tick_duration)
        return max(tick, self.current_tick) % self.wheel_size

    def _time_to_tick(self, delay_seconds):
        """Return the slot for a timer due ``delay_seconds`` from now."""
        return self._expiry_to_slot(time.monotonic() + delay_seconds)

    def schedule(self, callback, delay_seconds,
                 *args, **kwargs):
        """Arrange for ``callback(*args, **kwargs)`` after ``delay_seconds``.

        Raises:
            ValueError: If the delay is longer than one full rotation of
                the wheel (``wheel_size * tick_duration`` seconds).
        """
        if delay_seconds > self.wheel_size * self.tick_duration:
            raise ValueError(
                f"Delay exceeds wheel capacity: "
                f"{self.wheel_size * self.tick_duration}s"
            )
        # Slot and expiry must come from the same clock reading, otherwise
        # the slot could belong to a tick that ends before the expiry.
        expiry = time.monotonic() + delay_seconds
        self.wheel[self._expiry_to_slot(expiry)].append(
            (expiry, callback, args, kwargs)
        )

    def advance(self):
        """Call periodically to fire due timers.

        An exception raised by a callback propagates to the caller.  The
        failing timer is dropped; timers of the same slot that had not run
        yet are moved to the next tick so a later call fires them.
        """
        now = time.monotonic()
        elapsed_ticks = int(
            (now - self.start_time) / self.tick_duration
        )

        while self.current_tick < elapsed_ticks:
            slot = self.current_tick % self.wheel_size
            # Step past this tick and detach its bucket *before* running
            # callbacks: a raising callback then cannot wedge the wheel on
            # this slot, and timers scheduled from inside a callback land
            # in a bucket that will still be visited.
            self.current_tick += 1
            pending, self.wheel[slot] = self.wheel[slot], []

            due = []
            for timer in pending:
                if now >= timer[0]:
                    due.append(timer)
                else:
                    # Not expired yet; re-checked on the next rotation.
                    self.wheel[slot].append(timer)

            attempted = 0
            try:
                for _expiry, callback, args, kwargs in due:
                    attempted += 1
                    callback(*args, **kwargs)
            finally:
                if attempted < len(due):
                    # A callback raised; keep the timers behind it alive.
                    retry_slot = self.current_tick % self.wheel_size
                    self.wheel[retry_slot].extend(due[attempted:])

Timer wheels are used in Kafka, Netty, and the Linux kernel. In Python, they’re useful when managing 100K+ concurrent timers.

Celery ETA Internals

Redis Backend

With Redis, Celery stores ETA tasks in a sorted set (unacked_index). Celery Beat and worker threads poll this set every broker_transport_options['visibility_timeout'] seconds (default: 3600). The actual poll interval for ETA messages is controlled by the worker’s timer precision.

The implication: ETA tasks with Redis have coarse-grained timing. Don’t expect sub-second precision. The default polling is every 5 seconds.

RabbitMQ Backend

With RabbitMQ, Celery uses per-message TTL and dead-letter exchanges. The message is published with a TTL equal to the delay. When the TTL expires, RabbitMQ dead-letters the message to the actual consumer queue. This gives better timing precision (millisecond-level).

ETA Gotchas

  1. Clock skew — ETA uses UTC timestamps. If the scheduler and worker have different clocks, tasks fire at the wrong time. Use NTP.
  2. Queue depth — ETA tasks in Redis count against the sorted set, not the queue. A million scheduled tasks can cause Redis memory issues.
  3. Worker restart — ETA tasks are re-evaluated on worker restart. If a worker was down during the ETA window, the task fires immediately on restart (which is usually correct).

Database-Backed Scheduling

For the highest durability, store schedules in your database:

from sqlalchemy import (
    Column, String, DateTime, JSON, Boolean,
    create_engine, func
)
from sqlalchemy.orm import declarative_base, Session

# Declarative base class shared by the ORM models below.
Base = declarative_base()

class ScheduledTask(Base):
    """ORM row for one delayed task in the ``scheduled_tasks`` table."""

    __tablename__ = 'scheduled_tasks'

    # String primary key; DatabaseScheduler.schedule fills it with a
    # UUID4 string.
    id = Column(String, primary_key=True)
    # Required task name supplied by the caller of schedule().
    task_name = Column(String, nullable=False)
    # Caller-supplied data stored as JSON.
    payload = Column(JSON)
    # When the task becomes due; indexed because claim_due filters on it.
    execute_at = Column(DateTime, nullable=False, index=True)
    # Worker id set by claim_due; NULL while the task is unclaimed.
    claimed_by = Column(String, nullable=True)
    # Set to the database's now() by claim_due; NULL while unclaimed.
    claimed_at = Column(DateTime, nullable=True)
    # Set to True by DatabaseScheduler.complete; claim_due skips such rows.
    completed = Column(Boolean, default=False)
    # Filled in by the database (server-side now()) on insert.
    created_at = Column(DateTime, server_default=func.now())

class DatabaseScheduler:
    """Database-backed delayed-task scheduler built on ``ScheduledTask``.

    Each method opens a short-lived session from ``session_factory`` (a
    callable returning a session usable as a context manager) and commits
    before returning.  ``worker_id`` is recorded on every task this
    instance claims.

    NOTE: a task claimed by a worker that dies before calling ``complete``
    or ``release`` stays claimed; nothing in this class reclaims it.
    """

    def __init__(self, session_factory, worker_id):
        self.session_factory = session_factory
        self.worker_id = worker_id

    def schedule(self, task_name, payload, execute_at):
        """Persist a new task due at ``execute_at`` and return its id."""
        # Generate the id up front so it can be returned without touching
        # the instance again after commit (which expires its attributes).
        task_id = str(uuid.uuid4())
        with self.session_factory() as session:
            session.add(
                ScheduledTask(
                    id=task_id,
                    task_name=task_name,
                    payload=payload,
                    execute_at=execute_at,
                )
            )
            session.commit()
        return task_id

    def claim_due(self, batch_size=10):
        """Atomically claim up to ``batch_size`` due tasks for this worker.

        Returns:
            The claimed ``ScheduledTask`` instances, oldest ``execute_at``
            first.  They are detached from the session but fully loaded, so
            their attributes can be read after this method returns.
        """
        with self.session_factory() as session:
            # SELECT ... FOR UPDATE SKIP LOCKED
            # ensures no two workers claim the same task
            tasks = (
                session.query(ScheduledTask)
                .filter(
                    ScheduledTask.execute_at <= func.now(),
                    ScheduledTask.claimed_by.is_(None),
                    ScheduledTask.completed == False,  # noqa: E712
                )
                .order_by(ScheduledTask.execute_at)
                .with_for_update(skip_locked=True)
                .limit(batch_size)
                .all()
            )
            for task in tasks:
                task.claimed_by = self.worker_id
                task.claimed_at = func.now()
            session.commit()
            # commit() expires the instances (and claimed_at was written
            # as a SQL expression), so reload them while the session is
            # still open; otherwise reading e.g. task.payload after the
            # session closes fails with DetachedInstanceError.
            for task in tasks:
                session.refresh(task)
            return tasks

    def complete(self, task_id):
        """Mark the task as completed.

        Returns:
            True if the task exists and was updated, False if no task has
            that id.
        """
        with self.session_factory() as session:
            task = session.get(ScheduledTask, task_id)
            if task is None:
                return False
            task.completed = True
            session.commit()
            return True

    def release(self, task_id):
        """Give up this worker's claim so the task can be claimed again.

        Only a task that is currently claimed by this worker and not yet
        completed is released.

        Returns:
            True if the claim was cleared, otherwise False.
        """
        with self.session_factory() as session:
            task = session.get(ScheduledTask, task_id)
            if (
                task is None
                or task.completed
                or task.claimed_by != self.worker_id
            ):
                return False
            task.claimed_by = None
            task.claimed_at = None
            session.commit()
            return True

The FOR UPDATE SKIP LOCKED clause is critical for multiple workers polling the same table — it prevents lock contention and duplicate claiming.

Hybrid Architecture

Production systems often combine approaches:

Short delays (< 5 min)     → asyncio.sleep or Redis sorted set
Medium delays (5 min - 1h) → Redis sorted set with persistence
Long delays (> 1 hour)     → Database-backed scheduler
Recurring schedules        → Celery Beat or APScheduler

The reason: short delays need low latency (Redis), long delays need durability (database), and recurring schedules need cron-like semantics.

Exactly-Once Delivery for Delayed Tasks

The hardest problem: ensuring a delayed task runs exactly once, even across failures.

async def exactly_once_delayed(scheduler, handler, task):
    """Claim one due task, run ``handler`` on its payload, then complete it.

    Args:
        scheduler: Object providing ``claim_due(batch_size=...)``,
            ``complete(task_id)`` and ``release(task_id)``.
        handler: Callable taking the task payload and returning an
            awaitable.
        task: Unused; kept for interface compatibility.  The task that is
            processed is whichever one ``scheduler.claim_due`` returns.

    Returns:
        None.  Does nothing when no task is due.

    Raises:
        Exception: Whatever ``handler`` raises, after the claim has been
            released so another worker can retry.
    """
    # 1. Claim with atomic lock
    claimed = scheduler.claim_due(batch_size=1)
    if not claimed:
        return

    claimed_task = claimed[0]
    try:
        # 2. Run the handler; only its failure releases the claim.
        await handler(claimed_task.payload)
    except Exception:
        # 3. Release claim so another worker can retry
        scheduler.release(claimed_task.id)
        raise
    # 4. Mark complete outside the try block: if this call fails after the
    # handler already succeeded, the claim must stay in place, otherwise
    # another worker would pick the task up and run the handler again.
    scheduler.complete(claimed_task.id)

True exactly-once requires the handler’s side effects and the completion marker to be in the same transaction. When the handler talks to external systems, use the idempotency key pattern (see python-message-deduplication) to make retries safe.

One thing to remember: For delayed execution, durability and timing precision pull in opposite directions. In-memory solutions are precise but fragile; database solutions are durable but imprecise. Choose based on which failure mode your application can tolerate — a task running slightly late, or a task not running at all.

Tags: python, scheduling, task-processing

See Also