Python Water Quality Monitoring — Deep Dive

System architecture

A production water quality monitoring system integrates hardware communication, time-series storage, analytics, and alerting:

Sensors (Modbus/SDI-12) → Data Logger → MQTT Broker
    → Python Ingestion Service → TimescaleDB/InfluxDB
        → Analytics Engine (anomaly detection, WQI)
            → Dashboard (Grafana/Plotly Dash) + Alerts (SMS/Email/Webhook)

Sensor data ingestion via MQTT

Most water quality sensors communicate through data loggers that publish to an MQTT broker:

import json
import paho.mqtt.client as mqtt
from datetime import datetime, timezone
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

class WaterQualityIngester:
    def __init__(
        self,
        mqtt_broker: str,
        mqtt_topic: str,
        influx_url: str,
        influx_token: str,
        influx_org: str,
        influx_bucket: str,
    ):
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.on_message = self._on_message
        self.mqtt_client.connect(mqtt_broker, 1883)
        self.mqtt_client.subscribe(mqtt_topic)
        
        self.influx = InfluxDBClient(
            url=influx_url, token=influx_token, org=influx_org
        )
        self.write_api = self.influx.write_api(write_options=SYNCHRONOUS)
        self.bucket = influx_bucket
        self.org = influx_org
    
    def _on_message(self, client, userdata, msg):
        payload = json.loads(msg.payload.decode())
        
        # Validate and transform sensor data
        station_id = payload.get("station_id", "unknown")
        timestamp = datetime.fromisoformat(
            payload.get("timestamp", datetime.now(timezone.utc).isoformat())
        )
        
        parameters = {
            "temperature_c": payload.get("temp"),
            "ph": payload.get("ph"),
            "dissolved_oxygen_mgl": payload.get("do"),
            "turbidity_ntu": payload.get("turbidity"),
            "conductivity_us": payload.get("conductivity"),
            "chlorophyll_ugl": payload.get("chlorophyll"),
        }
        
        # Write to InfluxDB
        for param, value in parameters.items():
            if value is not None:
                point = (
                    Point("water_quality")
                    .tag("station", station_id)
                    .field(param, float(value))
                    .time(timestamp, WritePrecision.S)
                )
                self.write_api.write(self.bucket, self.org, point)
        
        # Run real-time checks
        self._check_thresholds(station_id, parameters, timestamp)
    
    def _check_thresholds(self, station_id, params, timestamp):
        alerts = []
        
        if params.get("ph") and (params["ph"] < 6.0 or params["ph"] > 9.5):
            alerts.append(f"pH critical: {params['ph']:.1f}")
        
        if params.get("dissolved_oxygen_mgl") and params["dissolved_oxygen_mgl"] < 3.0:
            alerts.append(f"DO critical: {params['dissolved_oxygen_mgl']:.1f} mg/L")
        
        if params.get("turbidity_ntu") and params["turbidity_ntu"] > 100:
            alerts.append(f"Turbidity high: {params['turbidity_ntu']:.0f} NTU")
        
        if alerts:
            self._send_alert(station_id, alerts, timestamp)
    
    def _send_alert(self, station_id, alerts, timestamp):
        message = (
            f"⚠️ Water Quality Alert — Station {station_id}\n"
            f"Time: {timestamp.isoformat()}\n"
            + "\n".join(f"• {a}" for a in alerts)
        )
        # Send via configured channels (email, SMS, webhook)
        print(message)  # Replace with actual notification
    
    def run(self):
        self.mqtt_client.loop_forever()

Data quality control

Raw sensor data requires rigorous QC before analysis. Sensor drift, biofouling, and communication errors produce invalid readings:

import pandas as pd
import numpy as np

def quality_control_pipeline(
    df: pd.DataFrame,
    parameter: str,
    physical_limits: tuple[float, float],
    spike_threshold: float = 3.0,
    gap_max_hours: float = 4.0
) -> pd.DataFrame:
    """Apply multi-stage QC to water quality time series."""
    df = df.copy()
    df["qc_flag"] = "valid"
    
    # Stage 1: Physical range check
    out_of_range = (
        (df[parameter] < physical_limits[0]) |
        (df[parameter] > physical_limits[1])
    )
    df.loc[out_of_range, "qc_flag"] = "range_fail"
    
    # Stage 2: Spike detection (rate of change)
    diff = df[parameter].diff().abs()
    rolling_std = df[parameter].rolling("6h", min_periods=10).std()
    spikes = diff > (spike_threshold * rolling_std)
    df.loc[spikes & (df["qc_flag"] == "valid"), "qc_flag"] = "spike"
    
    # Stage 3: Flat-line detection (sensor stuck)
    rolling_range = df[parameter].rolling("2h", min_periods=20).apply(
        lambda x: x.max() - x.min()
    )
    flat = rolling_range < 0.001 * abs(df[parameter].median())
    df.loc[flat & (df["qc_flag"] == "valid"), "qc_flag"] = "flatline"
    
    # Stage 4: Gap filling (linear interpolation for short gaps only)
    valid_mask = df["qc_flag"] == "valid"
    df[f"{parameter}_qc"] = df[parameter].where(valid_mask)
    df[f"{parameter}_qc"] = df[f"{parameter}_qc"].interpolate(
        method="time", limit=int(gap_max_hours * 4)  # assuming 15-min data
    )
    
    return df

ML-based anomaly detection

Threshold-based alerts miss subtle multi-parameter anomalies. Isolation Forest detects unusual combinations:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np

class WaterAnomalyDetector:
    def __init__(self, contamination: float = 0.02):
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=200,
            random_state=42
        )
        self.feature_cols = [
            "temperature_c", "ph", "dissolved_oxygen_mgl",
            "turbidity_ntu", "conductivity_us"
        ]
    
    def fit(self, historical_data: pd.DataFrame):
        """Train on historical normal-condition data."""
        features = historical_data[self.feature_cols].dropna()
        
        # Add derived features
        features = self._add_derived(features, historical_data)
        
        scaled = self.scaler.fit_transform(features)
        self.model.fit(scaled)
    
    def predict(self, new_data: pd.DataFrame) -> pd.Series:
        """Score new readings. Returns -1 for anomalies, 1 for normal."""
        features = new_data[self.feature_cols].copy()
        features = self._add_derived(features, new_data)
        
        # Handle missing values
        features = features.fillna(features.median())
        scaled = self.scaler.transform(features)
        
        scores = self.model.decision_function(scaled)
        labels = self.model.predict(scaled)
        
        return pd.DataFrame({
            "anomaly_score": scores,
            "is_anomaly": labels == -1
        }, index=new_data.index)
    
    def _add_derived(self, features, source_df):
        """Add rate-of-change and cross-parameter features."""
        for col in self.feature_cols:
            if col in source_df.columns:
                features[f"{col}_rate"] = source_df[col].diff()
        
        # DO saturation depends on temperature
        if "temperature_c" in source_df and "dissolved_oxygen_mgl" in source_df:
            do_sat = 14.62 - 0.3898 * source_df["temperature_c"]
            features["do_saturation_pct"] = (
                source_df["dissolved_oxygen_mgl"] / do_sat * 100
            )
        
        return features.fillna(0)

Water Quality Index computation

The CCME WQI is widely used because it handles variable numbers of parameters and missing data gracefully:

import numpy as np

def compute_ccme_wqi(
    measurements: dict[str, list[float]],
    objectives: dict[str, tuple[float, float]]  # parameter: (min, max)
) -> float:
    """Compute Canadian Council WQI from measurements and objectives.
    
    Args:
        measurements: {parameter_name: [measured_values]}
        objectives: {parameter_name: (min_acceptable, max_acceptable)}
    """
    total_tests = 0
    failed_tests = 0
    failed_params = set()
    excursions = []
    
    for param, values in measurements.items():
        if param not in objectives:
            continue
        obj_min, obj_max = objectives[param]
        
        for val in values:
            total_tests += 1
            if val < obj_min or val > obj_max:
                failed_tests += 1
                failed_params.add(param)
                
                if val < obj_min:
                    excursion = (obj_min - val) / obj_min
                else:
                    excursion = (val - obj_max) / obj_max
                excursions.append(excursion)
    
    if total_tests == 0:
        return 0.0
    
    total_params = len(measurements)
    
    # F1: Scope — percentage of parameters that fail
    f1 = len(failed_params) / total_params * 100
    
    # F2: Frequency — percentage of individual tests that fail
    f2 = failed_tests / total_tests * 100
    
    # F3: Amplitude — amount by which failing tests exceed objectives
    nse = sum(excursions) / total_tests if excursions else 0
    f3 = nse / (0.01 * nse + 0.01) if nse > 0 else 0
    
    # Combine into WQI
    wqi = 100 - (np.sqrt(f1**2 + f2**2 + f3**2) / 1.732)
    return max(0, min(100, wqi))

Trend analysis and seasonal decomposition

Long-term monitoring data reveals seasonal patterns and multi-year trends:

from statsmodels.tsa.seasonal import STL
import pandas as pd

def decompose_water_quality(
    series: pd.Series,
    period_days: int = 365
) -> dict:
    """Decompose water quality time series into trend, seasonal, residual."""
    # Resample to daily for stable decomposition
    daily = series.resample("1D").mean().interpolate(limit=7)
    
    stl = STL(daily, period=period_days, robust=True)
    result = stl.fit()
    
    # Compute trend direction over last year
    recent_trend = result.trend.iloc[-365:]
    trend_slope = np.polyfit(range(len(recent_trend)), recent_trend.values, 1)[0]
    
    return {
        "trend": result.trend,
        "seasonal": result.seasonal,
        "residual": result.resid,
        "trend_slope_per_year": trend_slope * 365,
        "seasonal_amplitude": result.seasonal.max() - result.seasonal.min(),
    }

Tradeoffs

Sensor cost vs. parameter coverage: A basic multiparameter sonde (pH, DO, turbidity, conductivity, temperature) costs $3,000-8,000. Adding nutrient sensors (nitrate, phosphate) doubles the price. Satellite-based monitoring is free but limited to surface indicators (chlorophyll, turbidity) at lower temporal resolution.

Sampling frequency vs. battery life: Remote stations on solar/battery power must balance measurement frequency against energy consumption. 15-minute intervals are standard; 1-minute intervals drain batteries in days but catch rapid contamination events.

Real-time vs. accuracy: Field sensors are less accurate than laboratory instruments. Turbidity sensors measure ±5% in the field vs. ±1% in the lab. The value of continuous monitoring comes from temporal resolution, not individual measurement precision.

Model retraining frequency: Water quality patterns shift with seasons, land-use changes, and climate trends. Anomaly detection models should be retrained quarterly or whenever a sustained regime change is detected.

One thing to remember: Building a water quality monitoring system in Python means engineering the full stack — MQTT ingestion, rigorous QC, multi-parameter anomaly detection, and trend analysis — where data quality control is the unglamorous but essential foundation that determines whether alerts are trustworthy.

pythonenvironmental-scienceiotdata-sciencetime-series

See Also