Python sched Scheduler — Deep Dive
Implementation internals
The sched.scheduler class is surprisingly small: about 130 lines in CPython. Internally it keeps its events in a heap (a plain list managed with heapq) ordered by (time, priority, sequence) and guards that list with a reentrant lock (threading.RLock).
The Event namedtuple
Event = namedtuple('Event', 'time, priority, sequence, action, argument, kwargs')
The sequence field is an auto-incrementing counter that breaks ties when time and priority are identical, so tied events run in the order they were scheduled. A heap gives no stability guarantee on its own, so without the counter the relative order of tied events would be arbitrary.
The run() loop
Simplified pseudocode for scheduler.run():
def run(self, blocking=True):
    # Cache attributes in locals
    lock = self._lock
    q = self._queue
    delayfunc = self.delayfunc
    timefunc = self.timefunc
    while True:
        with lock:
            if not q:
                break
            # Peek at the earliest event without removing it
            time, priority, sequence, action, argument, kwargs = q[0]
            now = timefunc()
            if time > now:
                delay = True
            else:
                heapq.heappop(q)  # the event is due: remove it
                delay = False
        # The lock is released here, before sleeping or running the action
        if delay:
            if not blocking:
                return time - now
            delayfunc(time - now)
        else:
            action(*argument, **kwargs)
Key observations:
- The lock is released before calling delayfunc or the action. This allows other threads to schedule new events while the scheduler is sleeping or executing a callback.
- After sleeping, the loop re-checks the queue from the top. A newly inserted event with an earlier time will be found and processed first, although a sleep that is already in progress is not cut short.
- In non-blocking mode (blocking=False), run() returns the time until the next event instead of sleeping. This integrates with external event loops.
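The re-check behaviour can be seen in a minimal sketch, assuming a hypothetical FakeClock in place of the real clock so that the run is instant and deterministic. A callback inserts a new event whose deadline falls before an event that was already queued.

```python
import sched

class FakeClock:
    """Hypothetical stand-in for time.time/time.sleep; not part of sched."""
    def __init__(self):
        self.t = 0.0
    def time(self):
        return self.t
    def sleep(self, seconds):
        self.t += seconds

clock = FakeClock()
s = sched.scheduler(clock.time, clock.sleep)
ran = []

def first():
    ran.append(("first", clock.time()))
    # Added while run() is in progress, with a deadline before "last"
    s.enterabs(2, 1, lambda: ran.append(("inserted", clock.time())))

s.enterabs(1, 1, first)
s.enterabs(5, 1, lambda: ran.append(("last", clock.time())))
s.run()
print(ran)
```

Because run() peeks at the head of the queue on every iteration, the inserted event runs at t=2, ahead of the event that was queued for t=5.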
Thread safety details
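Since Python 3.3, sched.scheduler can be used safely from multiple threads: enter, enterabs, and cancel all take the internal _lock, which is a threading.RLock. There is a subtle caveat, though. run() releases the lock before calling delayfunc, so another thread can modify the queue while the scheduler sleeps. The scheduler only notices the change when delayfunc returns and the loop re-reads the queue, so an event added with an earlier deadline than the one being waited on will run late.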
import sched, time, threading

s = sched.scheduler(time.time, time.sleep)

def worker():
    # This is safe: enter() acquires the lock
    s.enter(1, 1, print, argument=("from worker",))

# Schedule from main thread
s.enter(5, 1, print, argument=("from main",))

# Schedule from another thread, racing with the start of run()
t = threading.Thread(target=worker)
t.start()

# Processes both events; "from worker" prints late if run() is already sleeping
s.run()
t.join()
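One way to avoid the late wake-up is to pass a delay function that another thread can interrupt. The following is only a sketch under stated assumptions: interruptible_sleep and enter_and_wake are hypothetical helpers, not part of sched, and every thread must schedule through enter_and_wake for the wake-up to happen.

```python
import sched
import threading
import time

wakeup = threading.Event()

def interruptible_sleep(delay):
    # Returns early if another thread calls wakeup.set()
    wakeup.wait(delay)
    wakeup.clear()

s = sched.scheduler(time.monotonic, interruptible_sleep)

def enter_and_wake(delay, priority, action, argument=()):
    """Hypothetical helper: schedule an event, then interrupt the current sleep."""
    event = s.enter(delay, priority, action, argument)
    wakeup.set()
    return event

start = time.monotonic()
fired = []

def record(label):
    fired.append((label, time.monotonic() - start))

s.enter(0.4, 1, record, argument=("main",))
# After 0.05 s, another thread adds an event due 0.05 s later
threading.Timer(0.05, enter_and_wake, args=(0.05, 1, record, ("worker",))).start()
s.run()
print(fired)
```

The event is entered before the flag is set, and the flag is cleared before run() re-reads the queue, so a wake-up cannot be lost between the two steps.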
Non-blocking mode for integration
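The blocking parameter (added in Python 3.3) controls whether run() sleeps. With blocking=False, run() executes any events that are already due and then returns immediately with the delay until the next event, or None if the queue is empty: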
import sched, time
s = sched.scheduler(time.time, time.sleep)
s.enter(10, 1, print, argument=("hello",))
remaining = s.run(blocking=False)
print(f"Next event in {remaining:.1f} seconds")
# You can now do other work and call run() again later
This is essential for integrating sched with GUI event loops, asyncio, or game loops where you can’t block the main thread.
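The two possible return values are easier to see with a controlled clock. This sketch assumes a hypothetical FakeClock whose time is advanced by hand, standing in for whatever the host loop does between calls.

```python
import sched

class FakeClock:
    """Hypothetical stand-in for time.time/time.sleep; not part of sched."""
    def __init__(self):
        self.t = 0.0
    def time(self):
        return self.t
    def sleep(self, seconds):
        self.t += seconds

clock = FakeClock()
s = sched.scheduler(clock.time, clock.sleep)
ran = []
s.enterabs(10, 1, ran.append, argument=("hello",))

first = s.run(blocking=False)   # nothing is due yet: returns the delay
ran_after_first = list(ran)

clock.t = 10.0                  # pretend the host loop let 10 seconds pass
second = s.run(blocking=False)  # runs the due event; the queue is now empty

print(first, ran_after_first, second, ran)
```

A host loop can use the returned delay as its next timeout and treat None as "nothing scheduled".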
Async integration pattern
import asyncio
import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)

async def async_scheduler_loop():
    while True:
        # Runs any due callbacks synchronously, then reports the next delay
        remaining = scheduler.run(blocking=False)
        if remaining is None:
            await asyncio.sleep(1)  # check for new events periodically
        else:
            await asyncio.sleep(min(remaining, 1))

# Schedule events from sync code, process them from async loop
scheduler.enter(5, 1, print, argument=("async-compatible!",))
asyncio.run(async_scheduler_loop())  # loops until cancelled or interrupted
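The loop above never returns on its own. For scripts and tests, a bounded variant that exits once the queue is empty can be handy. The sketch below is one possible shape; drain and its poll_interval parameter are hypothetical names, not part of sched or asyncio.

```python
import asyncio
import sched
import time

async def drain(scheduler, poll_interval=0.05):
    """Hypothetical helper: run due events until the queue is empty."""
    while True:
        remaining = scheduler.run(blocking=False)
        if remaining is None:
            return  # nothing left to run
        # Sleep cooperatively, but wake often enough to see new events
        await asyncio.sleep(min(remaining, poll_interval))

results = []
demo = sched.scheduler(time.monotonic, time.sleep)
demo.enter(0.1, 1, results.append, argument=("late",))
demo.enter(0.02, 1, results.append, argument=("early",))
asyncio.run(drain(demo))
print(results)
```

Callbacks still run synchronously inside the event loop, so they should be short.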
Simulation and testing patterns
The pluggable time/delay functions make sched excellent for discrete-event simulation:
import sched

class SimClock:
    def __init__(self):
        self.now = 0.0
        self.log = []

    def time(self):
        return self.now

    def sleep(self, duration):
        # Advance simulated time instead of actually waiting
        self.now += duration

clock = SimClock()
sim = sched.scheduler(clock.time, clock.sleep)

def arrive(customer_id):
    clock.log.append((clock.now, f"Customer {customer_id} arrives"))
    # Schedule service completion
    service_time = 2.0 + customer_id * 0.5
    sim.enter(service_time, 1, depart, argument=(customer_id,))

def depart(customer_id):
    clock.log.append((clock.now, f"Customer {customer_id} departs"))

# Schedule arrivals
for i in range(5):
    sim.enterabs(i * 3.0, 1, arrive, argument=(i,))

sim.run()
for t, msg in clock.log:
    print(f"t={t:5.1f}: {msg}")
This runs instantly regardless of simulated time spans. The same pattern works for network simulations, queueing theory models, and game AI testing.
Deterministic testing
Replace time.time and time.sleep with controlled functions to make scheduled tests deterministic:
import sched

class FakeClock:
    def __init__(self):
        self.t = 1000.0

    def time(self):
        return self.t

    def sleep(self, s):
        self.t += s

def test_retry_with_backoff():
    clock = FakeClock()
    s = sched.scheduler(clock.time, clock.sleep)
    attempts = []

    def attempt():
        attempts.append(clock.time())
        if len(attempts) < 3:
            backoff = 2 ** len(attempts)
            s.enter(backoff, 1, attempt)

    s.enter(0, 1, attempt)
    s.run()
    assert len(attempts) == 3
    assert attempts[1] - attempts[0] == 2.0  # 2^1
    assert attempts[2] - attempts[1] == 4.0  # 2^2
Building a production-grade scheduler on sched
While raw sched isn’t production-ready, it can serve as a foundation. Here’s a pattern that adds persistence and graceful shutdown:
import sched
import time
import threading
import json
import os

class PersistentScheduler:
    def __init__(self, state_file="scheduler_state.json"):
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.state_file = state_file
        self._running = False
        self._thread = None
        self._events = {}  # id -> event details
        self._callbacks = {}  # name -> callable, filled in by restore()
        self._counter = 0
        self._state_lock = threading.Lock()  # guards _events and the state file

    def schedule(self, delay, callback_name, args=()):
        abs_time = time.time() + delay
        # Record the event before queueing it, so a fast dispatch cannot race us
        with self._state_lock:
            self._counter += 1
            eid = f"evt_{self._counter}"
            self._events[eid] = {
                "time": abs_time,
                "callback": callback_name,
                "args": list(args),  # must be JSON-serializable
            }
        self.scheduler.enterabs(
            abs_time, 1, self._dispatch,
            argument=(eid, callback_name, args)
        )
        self._save_state()
        return eid

    def _dispatch(self, eid, callback_name, args):
        # Look up and call the registered callback
        cb = self._callbacks.get(callback_name)
        if cb:
            cb(*args)
        with self._state_lock:
            self._events.pop(eid, None)
        self._save_state()

    def _save_state(self):
        with self._state_lock:
            with open(self.state_file, "w") as f:
                json.dump(self._events, f)

    def restore(self, callbacks):
        self._callbacks = callbacks
        if os.path.exists(self.state_file):
            with open(self.state_file) as f:
                saved = json.load(f)
            now = time.time()
            for eid, info in saved.items():
                delay = max(0, info["time"] - now)  # overdue events run at once
                self.scheduler.enter(
                    delay, 1, self._dispatch,
                    argument=(eid, info["callback"], tuple(info["args"]))
                )
                self._events[eid] = info
                # Keep new IDs from colliding with restored ones
                self._counter = max(self._counter, int(eid.rsplit("_", 1)[1]))

    def start(self):
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()

    def _run_loop(self):
        while self._running:
            # A non-blocking run keeps the loop responsive to stop()
            delay = self.scheduler.run(blocking=False)
            time.sleep(0.5 if delay is None else min(delay, 0.5))

    def stop(self):
        self._running = False
        # Wait for the worker, unless stop() was called from a callback
        if self._thread and self._thread is not threading.current_thread():
            self._thread.join()
        self._save_state()
This adds crash recovery (events persist to JSON) and background execution. It’s still single-process, but suitable for small services and CLI tools.
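Crash recovery only helps if the state file itself survives a crash that happens in the middle of a write. One common approach, sketched here as an assumption and not as part of the class above, is to write to a temporary file and then rename it over the real one. save_state_atomically is a hypothetical helper name.

```python
import json
import os
import tempfile

def save_state_atomically(path, events):
    """Hypothetical helper: write to a temp file, then rename it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file next to the target so the rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(events, f)
            f.flush()
            os.fsync(f.fileno())  # push the data to disk before renaming
        os.replace(tmp_path, path)  # readers see the old file or the new one
    except BaseException:
        os.unlink(tmp_path)
        raise

demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "scheduler_state.json")
demo_events = {"evt_1": {"time": 0.0, "callback": "noop", "args": []}}
save_state_atomically(demo_path, demo_events)
with open(demo_path) as f:
    loaded = json.load(f)
print(loaded)
```

With this shape, a crash during json.dump leaves a stray .tmp file behind but does not truncate the previous state.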
Comparison with other scheduling tools
| Feature | sched | APScheduler | Celery Beat | asyncio |
|---|---|---|---|---|
| Built-in | ✅ | ❌ | ❌ | ✅ |
| Persistence | ❌ | ✅ (multiple backends) | ✅ (DB) | ❌ |
| Distributed | ❌ | Limited | ✅ | ❌ |
| Custom time functions | ✅ | ❌ | ❌ | ✅ (loop.time) |
| Cron expressions | ❌ | ✅ | ✅ | ❌ |
| Non-blocking mode | ✅ | ✅ | N/A | ✅ |
| Thread-safe | ✅ | ✅ | N/A | N/A (single-thread) |
sched’s distinguishing feature is the pluggable clock: the time and delay functions are plain constructor arguments, so swapping in a simulated clock takes one line.
Performance characteristics
The internal queue uses heapq, so:
- enter/enterabs: O(log n)
- cancel: O(n), because it calls list.remove and then re-heapifies the queue
- run, per event: O(log n) for the heappop
For most applications with fewer than 10,000 scheduled events, performance is not a concern. If cancel performance matters, consider maintaining a cancellation set and lazy-deleting events (similar to the heapq priority queue pattern).
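The lazy-deletion idea can be sketched as a thin wrapper. LazyCancelScheduler, its integer tokens, and FakeClock are all hypothetical names for illustration; the wrapper marks a token as cancelled in O(1) and skips the action when the event eventually fires.

```python
import sched

class FakeClock:
    """Hypothetical stand-in for time.time/time.sleep; not part of sched."""
    def __init__(self):
        self.t = 0.0
    def time(self):
        return self.t
    def sleep(self, seconds):
        self.t += seconds

class LazyCancelScheduler:
    """Hypothetical wrapper: cancel() marks a token instead of touching the heap."""
    def __init__(self, timefunc, delayfunc):
        self._s = sched.scheduler(timefunc, delayfunc)
        self._cancelled = set()
        self._next_token = 0

    def enter(self, delay, priority, action, argument=(), kwargs=None):
        token = self._next_token
        self._next_token += 1
        self._s.enter(delay, priority, self._fire,
                      argument=(token, action, argument, kwargs or {}))
        return token

    def cancel(self, token):
        self._cancelled.add(token)  # O(1); the event stays in the heap

    def _fire(self, token, action, argument, kwargs):
        if token in self._cancelled:
            self._cancelled.discard(token)  # forget the token once it has fired
            return
        action(*argument, **kwargs)

    def run(self, blocking=True):
        return self._s.run(blocking)

clock = FakeClock()
lazy = LazyCancelScheduler(clock.time, clock.sleep)
ran = []
keep = lazy.enter(1, 1, ran.append, ("kept",))
drop = lazy.enter(2, 1, ran.append, ("dropped",))
lazy.cancel(drop)
lazy.run()
print(ran, clock.time())
```

The trade-off is that a cancelled event still occupies the heap and the scheduler still sleeps until its deadline, as the final clock value shows. Cancelling a token after its event has already run leaves the token in the set, so a real implementation would need to handle that case.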
Pitfalls
- Callback exceptions kill the scheduler. If a callback raises, run() propagates the exception and stops processing remaining events. The remaining events stay queued, so calling run() again resumes them. Wrap callbacks in try/except or use a dispatching wrapper.
- Sleep precision. time.sleep is not precise to the millisecond on most operating systems. Events may fire slightly late. For sub-millisecond precision, you’d need a busy-wait loop, but that defeats the purpose of a scheduler.
- No missed-event recovery. If the process crashes, unprocessed events are lost. Add persistence (as shown above) for critical applications.
- GIL interaction. The scheduler releases its lock during delayfunc, but callbacks run one at a time in the thread that called run(), under the GIL like all Python code. CPU-heavy or slow callbacks delay subsequent events.
- Queue drift with repeating events. If you reschedule via s.enter(5, ...) inside a callback, the actual interval is 5 seconds + callback execution time. For fixed-rate scheduling, use enterabs(last_time + interval, ...) instead.
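Two of these pitfalls, callback exceptions and queue drift, can be sketched together. The names safe, tick, broken, and FakeClock are hypothetical, and the callback simulates one second of work by advancing the fake clock.

```python
import sched

class FakeClock:
    """Hypothetical stand-in for time.time/time.sleep; not part of sched."""
    def __init__(self):
        self.t = 0.0
    def time(self):
        return self.t
    def sleep(self, seconds):
        self.t += seconds

clock = FakeClock()
s = sched.scheduler(clock.time, clock.sleep)
errors, ticks = [], []

def safe(action):
    """Hypothetical wrapper: record exceptions instead of letting run() stop."""
    def wrapper(*args, **kwargs):
        try:
            return action(*args, **kwargs)
        except Exception as exc:
            errors.append(exc)
    return wrapper

def tick(due, remaining, interval=5.0):
    ticks.append(clock.time())
    clock.sleep(1.0)  # simulate one second of work inside the callback
    if remaining > 1:
        next_due = due + interval  # based on the planned time, not on "now"
        s.enterabs(next_due, 1, safe(tick), argument=(next_due, remaining - 1))

def broken():
    raise ValueError("boom")

s.enterabs(1.0, 1, safe(broken))
s.enterabs(5.0, 1, safe(tick), argument=(5.0, 3))
s.run()
print(ticks, errors)
```

The failing callback is recorded and the later events still run. The ticks land exactly five seconds apart even though each callback takes a second. Rescheduling with enter(5, ...) after the work would space them six seconds apart instead.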
The one thing to remember: sched’s pluggable clock makes it uniquely powerful for simulation and testing — pair it with persistence and error handling, and it becomes a surprisingly capable scheduler without any external dependencies.