Python Delayed Task Execution — Deep Dive
Redis Sorted Set Scheduler
The sorted set pattern is the workhorse of custom delayed execution. Items are scored by their Unix execution timestamp; a poller periodically checks for due items.
import json
import time
import uuid

import redis  # redis-py; pass a redis.Redis instance as redis_client


class RedisDelayedQueue:
    def __init__(self, redis_client, name='delayed_tasks',
                 poll_interval=1.0):
        self.r = redis_client
        self.name = name
        self.processing = f"{name}:processing"
        self.poll_interval = poll_interval

    def schedule(self, payload, delay_seconds=0, execute_at=None):
        task_id = str(uuid.uuid4())
        if execute_at is None:
            execute_at = time.time() + delay_seconds
        task = {
            'id': task_id,
            'payload': payload,
            'scheduled_at': time.time(),
            'execute_at': execute_at,
        }
        # Member is the serialized task; score is the execution time
        self.r.zadd(self.name, {json.dumps(task): execute_at})
        return task_id

    def cancel(self, task_id):
        """Cancel a scheduled task by ID."""
        # Scan for the task: O(n), but acceptable for
        # occasional cancellations
        for member, _score in self.r.zscan_iter(self.name):
            task = json.loads(member)
            if task['id'] == task_id:
                self.r.zrem(self.name, member)
                return True
        return False

    def poll_due(self, batch_size=10):
        """Atomically fetch due tasks."""
        now = time.time()
        # Lua script for atomic pop of due items: each item leaves the
        # sorted set and enters the processing set in one step
        script = """
        local items = redis.call('ZRANGEBYSCORE',
            KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
        if #items > 0 then
            for _, item in ipairs(items) do
                redis.call('ZREM', KEYS[1], item)
                redis.call('SADD', KEYS[2], item)
            end
        end
        return items
        """
        items = self.r.eval(
            script, 2, self.name, self.processing,
            str(now), str(batch_size)
        )
        return [json.loads(item) for item in (items or [])]

    def ack(self, task):
        # Relies on json.dumps reproducing the stored member exactly,
        # so the task dict must not be modified before ack is called
        self.r.srem(self.processing, json.dumps(task))

    def reclaim_stale(self, timeout=300):
        """Re-schedule tasks stuck in processing."""
        # Age is measured from execute_at because the processing set
        # does not record when a task was claimed. A task that was
        # already overdue by more than `timeout` when it was polled
        # will look stale immediately, so pick the timeout with care.
        members = self.r.smembers(self.processing)
        reclaimed = 0
        for raw in members:
            task = json.loads(raw)
            age = time.time() - task.get('execute_at', 0)
            if age > timeout:
                self.r.srem(self.processing, raw)
                # Re-schedule for immediate execution
                self.r.zadd(self.name, {raw: time.time()})
                reclaimed += 1
        return reclaimed
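To experiment with the scoring idea without a Redis server, the following in-memory analogue may help. It is only a sketch: the class name InMemoryDelayedQueue and the explicit now argument are assumptions made for illustration, and a heap stands in for the sorted set.

```python
import heapq
import itertools


class InMemoryDelayedQueue:
    """Heap-based stand-in for the sorted set: entries are ordered by execute_at."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so payloads are never compared

    def schedule(self, payload, execute_at):
        heapq.heappush(self._heap, (execute_at, next(self._counter), payload))

    def poll_due(self, now, batch_size=10):
        """Pop up to batch_size payloads whose execute_at is <= now."""
        due = []
        while self._heap and self._heap[0][0] <= now and len(due) < batch_size:
            _, _, payload = heapq.heappop(self._heap)
            due.append(payload)
        return due


queue = InMemoryDelayedQueue()
queue.schedule("send-email", execute_at=105.0)
queue.schedule("expire-cart", execute_at=101.0)
queue.schedule("retry-webhook", execute_at=130.0)

first_batch = queue.poll_due(now=110.0)   # the two tasks due by t=110, earliest first
second_batch = queue.poll_due(now=110.0)  # nothing else is due yet
```

Passing the clock in as an argument keeps the example deterministic; the Redis version reads time.time() instead.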
Running the Poller
import asyncio
import json
import time


async def delayed_queue_worker(queue, handler, workers=5):
    semaphore = asyncio.Semaphore(workers)

    async def process(task):
        async with semaphore:
            try:
                await handler(task['payload'])
                queue.ack(task)
            except Exception:
                # Task failed: remove it from the processing set first,
                # while the dict still matches the stored member
                queue.ack(task)
                retries = task.get('retries', 0)
                # Exponential backoff: 5s, 10s, 20s, ... capped at 300s
                retry_delay = min(300, 2 ** retries * 5)
                task['retries'] = retries + 1
                # Re-add the same task (same ID, updated retry count)
                # so the backoff keeps growing across attempts
                execute_at = time.time() + retry_delay
                task['execute_at'] = execute_at
                queue.r.zadd(queue.name, {json.dumps(task): execute_at})

    while True:
        # poll_due is a blocking Redis call, so it stalls the event
        # loop briefly on each iteration
        due_tasks = queue.poll_due(batch_size=workers)
        if due_tasks:
            await asyncio.gather(
                *[process(task) for task in due_tasks]
            )
        else:
            await asyncio.sleep(queue.poll_interval)
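The backoff expression in the worker is easier to reason about when pulled out on its own. The helper below is a sketch; the name retry_delay and the base and cap parameters are illustrative, with defaults matching the constants used in the worker.

```python
def retry_delay(retries, base=5, cap=300):
    """Delay in seconds before the next attempt, doubling per retry up to cap."""
    return min(cap, (2 ** retries) * base)


# Delays for the first eight attempts
delays = [retry_delay(n) for n in range(8)]
```

The delay doubles from 5 seconds and reaches the 300-second cap on the seventh attempt (retries=6), after which it stays flat.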
Timer Wheel Implementation
For systems managing millions of timers (connection timeouts, session expiry), sorted sets become expensive at O(log n) per operation. Timer wheels provide O(1) insertion and O(1) expiry detection.
import time


class TimerWheel:
    def __init__(self, tick_duration_ms=100, wheel_size=1024):
        self.tick_duration = tick_duration_ms / 1000
        self.wheel_size = wheel_size
        self.wheel = [[] for _ in range(wheel_size)]
        self.current_tick = 0
        self.start_time = time.monotonic()

    def _time_to_tick(self, expiry):
        # Derive the slot from the absolute expiry time so it uses the
        # same tick numbering as advance(), even if advance() runs late.
        # Deriving it from current_tick + delay can place a timer one
        # slot early, which delays it by a full revolution.
        ticks = int((expiry - self.start_time) / self.tick_duration)
        # Never target a slot that advance() has already passed
        ticks = max(ticks, self.current_tick)
        return ticks % self.wheel_size

    def schedule(self, callback, delay_seconds, *args, **kwargs):
        if delay_seconds > self.wheel_size * self.tick_duration:
            raise ValueError(
                f"Delay exceeds wheel capacity: "
                f"{self.wheel_size * self.tick_duration}s"
            )
        expiry = time.monotonic() + delay_seconds
        slot = self._time_to_tick(expiry)
        self.wheel[slot].append((expiry, callback, args, kwargs))

    def advance(self):
        """Call periodically to fire due timers."""
        now = time.monotonic()
        elapsed_ticks = int(
            (now - self.start_time) / self.tick_duration
        )
        while self.current_tick < elapsed_ticks:
            slot = self.current_tick % self.wheel_size
            remaining = []
            for expiry, callback, args, kwargs in self.wheel[slot]:
                if now >= expiry:
                    callback(*args, **kwargs)
                else:
                    # Not due yet: keep it for the next revolution
                    remaining.append((expiry, callback, args, kwargs))
            self.wheel[slot] = remaining
            self.current_tick += 1
Timer wheels are used in Kafka, Netty, and the Linux kernel. For Python, they’re useful when managing 100K+ concurrent timers.
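The wheel above rejects delays longer than one revolution. A common way around that limit is to store a "remaining rounds" counter with each entry. The sketch below illustrates the idea under simplifying assumptions: the class name RoundsTimerWheel is made up for this example, delays are expressed in whole ticks, and the caller drives the wheel with tick() instead of a wall clock.

```python
class RoundsTimerWheel:
    """Tick-driven wheel; each entry stores how many full revolutions remain."""

    def __init__(self, wheel_size=8):
        self.wheel_size = wheel_size
        self.wheel = [[] for _ in range(wheel_size)]
        self.current_tick = 0

    def schedule(self, callback, delay_ticks):
        if delay_ticks < 1:
            raise ValueError("delay_ticks must be at least 1")
        slot = (self.current_tick + delay_ticks) % self.wheel_size
        # Number of times the slot is visited before the timer is due
        rounds = (delay_ticks - 1) // self.wheel_size
        self.wheel[slot].append([rounds, callback])

    def tick(self):
        """Advance one tick and fire entries in the new slot with no rounds left."""
        self.current_tick += 1
        slot = self.current_tick % self.wheel_size
        remaining = []
        for entry in self.wheel[slot]:
            if entry[0] == 0:
                entry[1]()
            else:
                entry[0] -= 1
                remaining.append(entry)
        self.wheel[slot] = remaining


fired = []
wheel = RoundsTimerWheel(wheel_size=8)
wheel.schedule(lambda: fired.append("short"), delay_ticks=3)
wheel.schedule(lambda: fired.append("long"), delay_ticks=20)

fire_ticks = {}
for _ in range(20):
    before = len(fired)
    wheel.tick()
    for name in fired[before:]:
        fire_ticks[name] = wheel.current_tick
```

Insertion stays O(1), but each tick now touches every entry in its slot, including those that still have rounds left, so very long delays add per-tick work.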
Celery ETA Internals
Redis Backend
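With Redis, Celery has no dedicated delay queue for ETA tasks. The broker delivers the message to a worker right away; the worker holds it in memory and runs it once the ETA arrives, acknowledging it only at that point. While the message is unacknowledged, the Redis transport tracks it in its unacked bookkeeping (including the unacked_index sorted set). If the message stays unacknowledged longer than broker_transport_options['visibility_timeout'] seconds (default: 3600), it is restored and redelivered to another worker.
The implication: an ETA further out than the visibility timeout can lead to the task being delivered, and executed, more than once. Keep visibility_timeout above your longest ETA, or use another mechanism for long delays. Timing depends on the worker’s in-process timer and on how busy the worker is, so don’t expect sub-second precision.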
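Celery's documentation warns that, with the Redis transport, a task whose ETA or countdown exceeds the visibility timeout can be redelivered. A configuration along the following lines raises the timeout; treat it as a sketch, since the app name, broker URL, task, and the 12-hour value are placeholders chosen for illustration.

```python
from celery import Celery

# App name and broker URL are placeholders for illustration.
app = Celery("tasks", broker="redis://localhost:6379/0")

# Keep the visibility timeout above the longest ETA/countdown you schedule;
# here 12 hours, expressed in seconds.
app.conf.broker_transport_options = {"visibility_timeout": 12 * 60 * 60}


@app.task
def send_reminder(user_id):
    print(f"reminding {user_id}")


# Somewhere in application code: run roughly two hours from now.
# send_reminder.apply_async(args=[42], countdown=2 * 60 * 60)
```

A larger visibility timeout also means a message held by a crashed worker takes longer to be redelivered, so the value is a trade-off and not something to maximize blindly.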
RabbitMQ Backend
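With RabbitMQ, recent Celery releases can use native delayed delivery built on message TTLs and dead-letter exchanges (in Celery 5.5 and later this applies when quorum queues are in use). The message is published to a delay queue with a TTL derived from the delay. When the TTL expires, RabbitMQ dead-letters the message toward the actual consumer queue. Because the broker, not a worker’s in-memory timer, tracks the delay, ETA tasks do not occupy worker memory while they wait. With classic queues, the worker holds ETA tasks in memory, as it does with Redis.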
ETA Gotchas
- Clock skew — ETA uses UTC timestamps. If the scheduler and worker have different clocks, tasks fire at the wrong time. Use NTP.
- Queue depth — ETA tasks in Redis count against the sorted set, not the queue. A million scheduled tasks can cause Redis memory issues.
- Worker restart — ETA tasks are re-evaluated on worker restart. If a worker was down during the ETA window, the task fires immediately on restart (which is usually correct).
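NTP addresses skew between machines, but a related mistake is building the ETA from a naive local datetime. The helper below is a small sketch (the name eta_in and its now parameter are illustrative) that always produces a timezone-aware UTC value suitable for passing as an eta.

```python
from datetime import datetime, timedelta, timezone


def eta_in(seconds, now=None):
    """Return a timezone-aware UTC datetime `seconds` from now."""
    now = now or datetime.now(timezone.utc)
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return now.astimezone(timezone.utc) + timedelta(seconds=seconds)


# A fixed clock keeps the example deterministic
fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
eta = eta_in(90, now=fixed_now)
```

Rejecting naive datetimes outright is a deliberate choice here: silently assuming a timezone is how off-by-hours scheduling bugs tend to slip in.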
Database-Backed Scheduling
For the highest durability, store schedules in your database:
import uuid

from sqlalchemy import (
    Column, String, DateTime, JSON, Boolean, func
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ScheduledTask(Base):
    __tablename__ = 'scheduled_tasks'

    id = Column(String, primary_key=True)
    task_name = Column(String, nullable=False)
    payload = Column(JSON)
    execute_at = Column(DateTime, nullable=False, index=True)
    claimed_by = Column(String, nullable=True)
    claimed_at = Column(DateTime, nullable=True)
    completed = Column(Boolean, default=False)
    created_at = Column(DateTime, server_default=func.now())


class DatabaseScheduler:
    # Assumes session_factory was built with expire_on_commit=False so
    # that id and payload stay readable after the session closes.
    def __init__(self, session_factory, worker_id):
        self.session_factory = session_factory
        self.worker_id = worker_id

    def schedule(self, task_name, payload, execute_at):
        task_id = str(uuid.uuid4())
        with self.session_factory() as session:
            task = ScheduledTask(
                id=task_id,
                task_name=task_name,
                payload=payload,
                execute_at=execute_at,
            )
            session.add(task)
            session.commit()
        # Return the local value; the ORM object is detached by now
        return task_id

    def claim_due(self, batch_size=10):
        """Atomically claim due tasks for this worker."""
        with self.session_factory() as session:
            # SELECT ... FOR UPDATE SKIP LOCKED
            # ensures no two workers claim the same task
            tasks = (
                session.query(ScheduledTask)
                .filter(
                    ScheduledTask.execute_at <= func.now(),
                    ScheduledTask.claimed_by.is_(None),
                    ScheduledTask.completed.is_(False),
                )
                .order_by(ScheduledTask.execute_at)
                .with_for_update(skip_locked=True)
                .limit(batch_size)
                .all()
            )
            for task in tasks:
                task.claimed_by = self.worker_id
                task.claimed_at = func.now()
            session.commit()
            return tasks

    def complete(self, task_id):
        with self.session_factory() as session:
            task = session.get(ScheduledTask, task_id)
            task.completed = True
            session.commit()

    def release(self, task_id):
        """Clear the claim so another worker can pick the task up."""
        with self.session_factory() as session:
            task = session.get(ScheduledTask, task_id)
            task.claimed_by = None
            task.claimed_at = None
            session.commit()
The FOR UPDATE SKIP LOCKED clause is critical for multiple workers polling the same table — it prevents lock contention and duplicate claiming.
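Not every database supports SKIP LOCKED (SQLite, for example, does not). A different technique that works more widely is an optimistic claim: a conditional UPDATE that only succeeds if the row is still unclaimed. The sketch below uses the standard-library sqlite3 module and a trimmed-down version of the table; the function name claim_due and the explicit now argument are assumptions for illustration, not part of the scheduler above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scheduled_tasks ("
    " id TEXT PRIMARY KEY, execute_at REAL NOT NULL, claimed_by TEXT)"
)
conn.executemany(
    "INSERT INTO scheduled_tasks (id, execute_at) VALUES (?, ?)",
    [("a", 10.0), ("b", 20.0), ("c", 99.0)],
)
conn.commit()


def claim_due(conn, worker_id, now, batch_size=10):
    """Claim due tasks one at a time; the WHERE clause guards against races."""
    candidates = conn.execute(
        "SELECT id FROM scheduled_tasks"
        " WHERE execute_at <= ? AND claimed_by IS NULL"
        " ORDER BY execute_at LIMIT ?",
        (now, batch_size),
    ).fetchall()
    claimed = []
    for (task_id,) in candidates:
        cursor = conn.execute(
            "UPDATE scheduled_tasks SET claimed_by = ?"
            " WHERE id = ? AND claimed_by IS NULL",
            (worker_id, task_id),
        )
        if cursor.rowcount == 1:  # 0 means another worker got there first
            claimed.append(task_id)
    conn.commit()
    return claimed


first = claim_due(conn, "worker-1", now=50.0)
second = claim_due(conn, "worker-2", now=50.0)
```

Compared with SKIP LOCKED, this costs one UPDATE per task and workers may race for the same candidates, but a task is never claimed twice.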
Hybrid Architecture
Production systems often combine approaches:
Short delays (< 5 min) → asyncio.sleep or Redis sorted set
Medium delays (5 min - 1h) → Redis sorted set with persistence
Long delays (> 1 hour) → Database-backed scheduler
Recurring schedules → Celery Beat or APScheduler
The reason: short delays need low latency (Redis), long delays need durability (database), and recurring schedules need cron-like semantics.
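As a rough illustration, the routing table above can be expressed as a small dispatch function. The function name choose_backend and the returned labels are hypothetical; the thresholds are the ones listed in the table.

```python
def choose_backend(delay_seconds, recurring=False):
    """Pick a scheduling backend label from the delay, following the table above."""
    if recurring:
        return "beat"              # Celery Beat or APScheduler
    if delay_seconds < 5 * 60:
        return "redis"             # asyncio.sleep or Redis sorted set
    if delay_seconds <= 60 * 60:
        return "redis-persistent"  # Redis sorted set with persistence
    return "database"              # database-backed scheduler


choices = [choose_backend(d) for d in (30, 300, 3600, 7200)]
```

Keeping the decision in one function means the thresholds can be tuned in a single place as latency and durability requirements change.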
Exactly-Once Delivery for Delayed Tasks
The hardest problem: ensuring a delayed task runs exactly once, even across failures.
async def exactly_once_delayed(scheduler, handler):
    # 1. Claim with atomic lock
    claimed = scheduler.claim_due(batch_size=1)
    if not claimed:
        return None
    task = claimed[0]
    try:
        # 2. Run the handler while this worker holds the claim
        result = await handler(task.payload)
        # 3. Mark complete only after the handler succeeds
        scheduler.complete(task.id)
        return result
    except Exception:
        # 4. Release claim so another worker can retry
        #    (release() is expected to clear claimed_by and claimed_at)
        scheduler.release(task.id)
        raise
True exactly-once requires the handler’s side effects and the completion marker to be in the same transaction. When the handler talks to external systems, use the idempotency key pattern (see python-message-deduplication) to make retries safe.
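When the side effect lives in the same database as the completion marker, the two can share a transaction. The sketch below shows that idea with the standard-library sqlite3 module; the table names, the handle_once function, and the use of the task ID as the idempotency key are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_tasks (task_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE emails_sent (task_id TEXT, recipient TEXT)")
conn.commit()


def handle_once(conn, task_id, recipient):
    """Apply the side effect and record the task ID in one transaction."""
    try:
        with conn:  # commits on success, rolls back on any exception
            conn.execute(
                "INSERT INTO processed_tasks (task_id) VALUES (?)", (task_id,)
            )
            conn.execute(
                "INSERT INTO emails_sent (task_id, recipient) VALUES (?, ?)",
                (task_id, recipient),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: the task ID was already recorded


# Simulate the same task being delivered three times
results = [handle_once(conn, "task-1", "a@example.com") for _ in range(3)]
sent = conn.execute("SELECT COUNT(*) FROM emails_sent").fetchone()[0]
```

The primary key on processed_tasks does the deduplication: a repeated delivery fails the first INSERT, the transaction rolls back, and the side effect is not applied again.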
One thing to remember: For delayed execution, durability and timing precision pull in opposite directions. In-memory solutions are precise but fragile; database solutions are durable but imprecise. Choose based on which failure mode your application can tolerate — a task running slightly late, or a task not running at all.
See Also
- Python Dead Letter Queues: What happens to messages that can't be delivered, and why Python systems need a lost-and-found box.
- Python Distributed Locks: How Python programs take turns with shared resources, like a bathroom door lock, but for computers.
- Python Fan Out Fan In Pattern: How Python splits big jobs into small pieces, runs them all at once, then puts the results back together.
- Python Message Deduplication: Why computer messages sometimes get delivered twice, and how Python stops them from doing double damage.
- Python Priority Queue Patterns: Why some tasks cut the line in Python, and how priority queues decide who goes first.