Python Watchdog File Monitoring — Deep Dive

Understanding OS-level file notification

inotify on Linux

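Linux’s inotify system provides per-file and per-directory watches. Individual watches do not consume file descriptors (each inotify instance uses one descriptor), but every watch takes kernel memory and counts against a per-user limit, fs.inotify.max_user_watches. The default was 8192 on older kernels; recent kernels (5.11 and later) derive it from available memory. For large directory trees, you may need to increase it: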

# Check current limit
cat /proc/sys/fs/inotify/max_user_watches

# Increase it (lasts until reboot; set fs.inotify.max_user_watches in /etc/sysctl.conf to persist)
echo 65536 | sudo tee /proc/sys/fs/inotify/max_user_watches

Watchdog creates one inotify watch per directory when recursive=True. A project with 5,000 subdirectories needs 5,000 watches. If you hit the limit, Watchdog does not fall back to polling: adding the watch fails with an OSError (errno ENOSPC, "inotify watch limit reached"). Catch and log that error at startup so an exhausted limit is not mistaken for an unrelated crash.
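
To estimate whether a tree fits under the limit before scheduling a recursive watch, a rough sketch could count directories and compare the total against the kernel setting. The helper names (count_directories, read_inotify_limit, fits_under_limit) and the 80% headroom are assumptions for illustration, and the check ignores watches already held by other processes of the same user.

```python
import os
from pathlib import Path

INOTIFY_LIMIT_FILE = "/proc/sys/fs/inotify/max_user_watches"

def count_directories(root):
    """Count root plus every subdirectory beneath it.

    os.walk does not descend into symlinked directories by default.
    """
    return 1 + sum(len(dirnames) for _, dirnames, _ in os.walk(root))

def read_inotify_limit(limit_file=INOTIFY_LIMIT_FILE):
    """Return the per-user watch limit, or None where the file is absent (non-Linux)."""
    try:
        return int(Path(limit_file).read_text().strip())
    except (OSError, ValueError):
        return None

def fits_under_limit(root, headroom=0.8):
    """True if the tree needs no more watches than `headroom` of the limit."""
    limit = read_inotify_limit()
    if limit is None:
        return True  # Nothing to check on this platform
    return count_directories(root) <= limit * headroom
```

Running fits_under_limit on the target directory at startup gives an early warning instead of a failure partway through scheduling.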

FSEvents on macOS

macOS FSEvents operates at the volume level. It does not use per-directory watches, so there is no watch limit to tune. However, FSEvents can batch events, meaning multiple rapid changes may arrive as a single notification with a slight delay. Watchdog handles this transparently, but expect events to arrive in bursts rather than one at a time on macOS.

ReadDirectoryChangesW on Windows

Windows provides change notifications per directory handle. Recursive watching is supported natively, making it efficient. The main caveat: some file operations that appear atomic (like saving in certain editors) actually create a temporary file, delete the original, and rename the temp file — generating delete, create, and move events instead of a single modify event.

The duplicate event problem

The most common production issue with Watchdog is duplicate events. A single file save can trigger multiple on_modified events because:

  1. The editor writes new content (modify event)
  2. The editor updates the file’s metadata/timestamp (another modify event)
  3. Some editors write to a temp file then rename (create + delete + move events)
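
One way to fold the temp-file-then-rename pattern from item 3 into a single logical change is to treat the destination of a move as the file that changed. The sketch below is an assumption about how you might normalize events, not part of Watchdog; changed_path is a hypothetical helper, and the SimpleNamespace objects only stand in for real Watchdog events, which expose the same event_type, src_path, dest_path and is_directory attributes.

```python
from types import SimpleNamespace

def changed_path(event):
    """Return the path whose content changed, or None if the event can be ignored."""
    if event.is_directory:
        return None
    if event.event_type == "moved":
        # Temp-file-then-rename save: the destination is the file the user saved
        return event.dest_path
    if event.event_type in ("created", "modified"):
        return event.src_path
    return None  # deleted, opened, closed, ...

# Stand-in events for an editor that saves via a temp file (illustrative paths)
save_sequence = [
    SimpleNamespace(event_type="created", src_path="/tmp/.doc.swp",
                    dest_path="", is_directory=False),
    SimpleNamespace(event_type="deleted", src_path="/tmp/doc.txt",
                    dest_path="", is_directory=False),
    SimpleNamespace(event_type="moved", src_path="/tmp/.doc.swp",
                    dest_path="/tmp/doc.txt", is_directory=False),
]
paths = [changed_path(e) for e in save_sequence]
```

Feeding the returned path, rather than the raw event, into a debouncer means the rename lands on the same key as an ordinary modify of the saved file.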

Debouncing solution

import threading
from watchdog.events import FileSystemEventHandler

class DebouncedHandler(FileSystemEventHandler):
    def __init__(self, callback, delay=0.5):
        self.callback = callback
        self.delay = delay
        self._timers = {}
        self._lock = threading.Lock()

    def on_modified(self, event):
        if event.is_directory:
            return
        self._debounce(event.src_path, event)

    def on_created(self, event):
        if event.is_directory:
            return
        self._debounce(event.src_path, event)

    def _debounce(self, key, event):
        with self._lock:
            if key in self._timers:
                self._timers[key].cancel()
            timer = threading.Timer(self.delay, self._fire, args=[key, event])
            self._timers[key] = timer
            timer.start()

    def _fire(self, key, event):
        with self._lock:
            self._timers.pop(key, None)
        self.callback(event)

This waits 500ms after the last event for a given file before calling the callback. If multiple events arrive within that window, only the last one triggers processing.

Event coalescing

For high-throughput scenarios (watching a directory where hundreds of files arrive per second), per-file debouncing is expensive because every pending file gets its own timer thread. Instead, coalesce events into batches:

import queue
import threading
import time
from watchdog.events import FileSystemEventHandler

class BatchHandler(FileSystemEventHandler):
    def __init__(self, process_batch, interval=2.0):
        self.queue = queue.Queue()
        self.process_batch = process_batch
        self.interval = interval
        self._start_consumer()

    def on_created(self, event):
        if not event.is_directory:
            self.queue.put(event)

    def _start_consumer(self):
        def consume():
            while True:
                # Block until the first event of the next batch arrives
                batch = [self.queue.get()]
                # Let the rest of the burst accumulate before draining
                time.sleep(self.interval)
                while True:
                    try:
                        batch.append(self.queue.get_nowait())
                    except queue.Empty:
                        break
                # Deduplicate by path, keeping the first event for each
                seen = set()
                unique = []
                for e in batch:
                    if e.src_path not in seen:
                        seen.add(e.src_path)
                        unique.append(e)
                self.process_batch(unique)

        # Daemon thread: it will not keep the process alive at shutdown
        thread = threading.Thread(target=consume, daemon=True)
        thread.start()

Multi-directory watching

You can schedule multiple directories on the same observer:

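from watchdog.observers import Observer

# upload_handler, config_handler and log_handler are your own handler instances
observer = Observer()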
observer.schedule(upload_handler, "/data/uploads", recursive=False)
observer.schedule(config_handler, "/etc/myapp", recursive=False)
observer.schedule(log_handler, "/var/log/myapp", recursive=True)
observer.start()

Each schedule call returns a watch object you can use to unschedule later:

watch = observer.schedule(handler, "/data/uploads")
# Later...
observer.unschedule(watch)

This is useful for dynamic configuration: your app reads a config file that lists directories to watch, and you can add or remove watches at runtime.
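
A minimal sketch of that runtime reconfiguration, assuming you keep a dict that maps each watched path to the watch object returned by schedule(). The function name sync_watches and the dict layout are hypothetical; the only Watchdog calls used are observer.schedule() and observer.unschedule().

```python
def sync_watches(observer, handler, watches, desired_paths, recursive=False):
    """Bring `watches` (a dict of path -> watch object) in line with `desired_paths`.

    `observer` is expected to offer schedule() and unschedule(), as
    watchdog.observers.Observer does.
    """
    desired = set(desired_paths)

    # Drop watches for paths that are no longer configured
    for path in sorted(set(watches) - desired):
        observer.unschedule(watches.pop(path))

    # Add watches for newly configured paths
    for path in sorted(desired - set(watches)):
        watches[path] = observer.schedule(handler, path, recursive=recursive)

    return watches
```

Call it each time the config file is reloaded, passing the same dict back in so that only the difference is scheduled or unscheduled.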

Building a production daemon

A robust file-watching daemon needs more than just an Observer loop:

import signal
import logging
import sys
from watchdog.observers import Observer

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger(__name__)

class GracefulWatcher:
    def __init__(self, handler, path, recursive=True):
        self.observer = Observer()
        self.observer.schedule(handler, path, recursive=recursive)
        self._stopping = False  # Set once a shutdown signal arrives
        self._setup_signals()

    def _setup_signals(self):
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, signum, frame):
        log.info("Received signal %s, shutting down...", signum)
        self._stopping = True
        self.observer.stop()

    def run(self):
        log.info("Starting file watcher...")
        self.observer.start()
        # Join with a timeout so signal handlers get a chance to run
        while self.observer.is_alive():
            self.observer.join(timeout=1.0)
        if not self._stopping:
            # The observer thread exited without a shutdown request
            log.error("Observer thread died unexpectedly")
            sys.exit(1)
        log.info("Watcher stopped cleanly")

Key production concerns:

  • Signal handling — clean shutdown on SIGTERM (what systemd sends)
  • Logging — structured logs for debugging events in production
  • Error recovery — if the observer thread dies, the main thread should detect it and either restart or exit with a non-zero code
  • Systemd integration — run as a systemd service with Type=simple and Restart=on-failure

Handling symlinks

Watchdog follows symlinks by default on some platforms but not others. For consistent behavior, resolve the path yourself before scheduling:

import os

real_path = os.path.realpath("/data/current")  # Resolve symlinks
observer.schedule(handler, real_path, recursive=True)

If the symlink target changes (common in deployment patterns), you need to detect the symlink change and reschedule the watch on the new target.
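
Detecting the retarget can be as simple as re-resolving the link on a timer and comparing the result with the path currently being watched. The helper below is a sketch under that assumption; retarget_if_changed is a hypothetical name, and how often to call it is left to the application.

```python
import os

def retarget_if_changed(link_path, current_target):
    """Return the new real path if `link_path` now resolves elsewhere, else None."""
    resolved = os.path.realpath(link_path)
    return resolved if resolved != current_target else None

# Possible use inside a periodic check (observer, handler, watch and real_path
# come from your own setup):
#
#   new_target = retarget_if_changed("/data/current", real_path)
#   if new_target is not None:
#       observer.unschedule(watch)
#       watch = observer.schedule(handler, new_target, recursive=True)
#       real_path = new_target
```

Watching the symlink's parent directory non-recursively is an alternative trigger for the same check, since replacing the link changes that directory.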

Performance characteristics

Scenario                       | Linux (inotify)                    | macOS (FSEvents) | Polling fallback
100 files, 1 change/sec        | ~0% CPU                            | ~0% CPU          | ~1% CPU
10,000 files, 100 changes/sec  | ~1% CPU                            | ~1% CPU          | ~15% CPU
100,000 files, rare changes    | ~0% CPU (one watch per directory)  | ~0% CPU          | ~30% CPU

The polling fallback checks every file’s modification time at a configurable interval (default 1 second). For large trees, this becomes expensive. Always prefer native backends in production.
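
To see why polling cost grows with tree size, consider a simplified snapshot-and-compare loop. This is an illustration of the general technique only, not Watchdog's actual polling implementation; take_snapshot and diff_snapshots are hypothetical names.

```python
import os

def take_snapshot(root):
    """Map every file under `root` to its modification time (one stat call per file)."""
    snapshot = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                snapshot[path] = os.stat(path).st_mtime_ns
            except OSError:
                continue  # File vanished between listing and stat
    return snapshot

def diff_snapshots(old, new):
    """Return (created, deleted, modified) path sets between two snapshots."""
    created = set(new) - set(old)
    deleted = set(old) - set(new)
    modified = {p for p in set(old) & set(new) if old[p] != new[p]}
    return created, deleted, modified
```

Every polling cycle walks the whole tree and stats every file even when nothing has changed, which is where the CPU cost on large trees comes from.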

Testing Watchdog code

Testing file watchers is tricky because events are asynchronous. A reliable pattern:

import tempfile
import time
from pathlib import Path
from watchdog.observers import Observer

def test_handler_fires_on_create():
    events = []
    # MyHandler stands in for the handler under test; it must accept a callback
    handler = MyHandler(callback=lambda e: events.append(e))

    with tempfile.TemporaryDirectory() as tmpdir:
        observer = Observer()
        observer.schedule(handler, tmpdir)
        observer.start()

        # Give the observer time to start
        time.sleep(0.2)

        # Trigger an event
        Path(tmpdir, "test.txt").write_text("hello")

        # Wait for event propagation
        time.sleep(1.0)

        observer.stop()
        observer.join()

    assert len(events) >= 1
    assert "test.txt" in events[0].src_path

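The time.sleep() calls give the observer thread time to start and to deliver the event, since delivery is asynchronous. Fixed sleeps are a trade-off: too short and the test flakes on slow CI runners, too long and the suite drags. Increase them for CI, or poll for the expected event with a deadline instead of sleeping for a fixed time.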
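
A deadline-based wait is one way to make such tests less sensitive to machine speed. The helper below is a sketch; wait_for is a hypothetical name, and the default timeout and interval are arbitrary.

```python
import time

def wait_for(predicate, timeout=5.0, interval=0.05):
    """Poll `predicate` until it returns a truthy value or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return bool(predicate())  # One last check at the deadline
```

In the test above, the one-second sleep could then become assert wait_for(lambda: len(events) >= 1), which returns as soon as the event arrives and only waits the full timeout on failure.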

Alternatives and when to use them

  • asyncio + aionotify — if your application is already async and you want non-blocking file watching
  • inotifywait (CLI) — for quick shell-script-based watches without Python
  • systemd.path — for triggering systemd services on file changes, no Python needed
  • fswatch — cross-platform CLI tool, useful when you want file watching without writing code

Watchdog remains the best choice when you need file watching embedded in a Python application with cross-platform support.

The one thing to remember: Production Watchdog requires debouncing, proper signal handling, and awareness of OS-specific event quirks — the basic example from the docs is a starting point, not a production-ready solution.
