Python Water Quality Monitoring — Deep Dive
System architecture
A production water quality monitoring system integrates hardware communication, time-series storage, analytics, and alerting:
Sensors (Modbus/SDI-12) → Data Logger → MQTT Broker
→ Python Ingestion Service → TimescaleDB/InfluxDB
→ Analytics Engine (anomaly detection, WQI)
→ Dashboard (Grafana/Plotly Dash) + Alerts (SMS/Email/Webhook)
Sensor data ingestion via MQTT
Most water quality sensors communicate through data loggers that publish to an MQTT broker:
import json
import paho.mqtt.client as mqtt
from datetime import datetime, timezone
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
class WaterQualityIngester:
def __init__(
self,
mqtt_broker: str,
mqtt_topic: str,
influx_url: str,
influx_token: str,
influx_org: str,
influx_bucket: str,
):
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_message = self._on_message
self.mqtt_client.connect(mqtt_broker, 1883)
self.mqtt_client.subscribe(mqtt_topic)
self.influx = InfluxDBClient(
url=influx_url, token=influx_token, org=influx_org
)
self.write_api = self.influx.write_api(write_options=SYNCHRONOUS)
self.bucket = influx_bucket
self.org = influx_org
def _on_message(self, client, userdata, msg):
payload = json.loads(msg.payload.decode())
# Validate and transform sensor data
station_id = payload.get("station_id", "unknown")
timestamp = datetime.fromisoformat(
payload.get("timestamp", datetime.now(timezone.utc).isoformat())
)
parameters = {
"temperature_c": payload.get("temp"),
"ph": payload.get("ph"),
"dissolved_oxygen_mgl": payload.get("do"),
"turbidity_ntu": payload.get("turbidity"),
"conductivity_us": payload.get("conductivity"),
"chlorophyll_ugl": payload.get("chlorophyll"),
}
# Write to InfluxDB
for param, value in parameters.items():
if value is not None:
point = (
Point("water_quality")
.tag("station", station_id)
.field(param, float(value))
.time(timestamp, WritePrecision.S)
)
self.write_api.write(self.bucket, self.org, point)
# Run real-time checks
self._check_thresholds(station_id, parameters, timestamp)
def _check_thresholds(self, station_id, params, timestamp):
alerts = []
if params.get("ph") and (params["ph"] < 6.0 or params["ph"] > 9.5):
alerts.append(f"pH critical: {params['ph']:.1f}")
if params.get("dissolved_oxygen_mgl") and params["dissolved_oxygen_mgl"] < 3.0:
alerts.append(f"DO critical: {params['dissolved_oxygen_mgl']:.1f} mg/L")
if params.get("turbidity_ntu") and params["turbidity_ntu"] > 100:
alerts.append(f"Turbidity high: {params['turbidity_ntu']:.0f} NTU")
if alerts:
self._send_alert(station_id, alerts, timestamp)
def _send_alert(self, station_id, alerts, timestamp):
message = (
f"⚠️ Water Quality Alert — Station {station_id}\n"
f"Time: {timestamp.isoformat()}\n"
+ "\n".join(f"• {a}" for a in alerts)
)
# Send via configured channels (email, SMS, webhook)
print(message) # Replace with actual notification
def run(self):
self.mqtt_client.loop_forever()
Data quality control
Raw sensor data requires rigorous QC before analysis. Sensor drift, biofouling, and communication errors produce invalid readings:
import pandas as pd
import numpy as np
def quality_control_pipeline(
df: pd.DataFrame,
parameter: str,
physical_limits: tuple[float, float],
spike_threshold: float = 3.0,
gap_max_hours: float = 4.0
) -> pd.DataFrame:
"""Apply multi-stage QC to water quality time series."""
df = df.copy()
df["qc_flag"] = "valid"
# Stage 1: Physical range check
out_of_range = (
(df[parameter] < physical_limits[0]) |
(df[parameter] > physical_limits[1])
)
df.loc[out_of_range, "qc_flag"] = "range_fail"
# Stage 2: Spike detection (rate of change)
diff = df[parameter].diff().abs()
rolling_std = df[parameter].rolling("6h", min_periods=10).std()
spikes = diff > (spike_threshold * rolling_std)
df.loc[spikes & (df["qc_flag"] == "valid"), "qc_flag"] = "spike"
# Stage 3: Flat-line detection (sensor stuck)
rolling_range = df[parameter].rolling("2h", min_periods=20).apply(
lambda x: x.max() - x.min()
)
flat = rolling_range < 0.001 * abs(df[parameter].median())
df.loc[flat & (df["qc_flag"] == "valid"), "qc_flag"] = "flatline"
# Stage 4: Gap filling (linear interpolation for short gaps only)
valid_mask = df["qc_flag"] == "valid"
df[f"{parameter}_qc"] = df[parameter].where(valid_mask)
df[f"{parameter}_qc"] = df[f"{parameter}_qc"].interpolate(
method="time", limit=int(gap_max_hours * 4) # assuming 15-min data
)
return df
ML-based anomaly detection
Threshold-based alerts miss subtle multi-parameter anomalies. Isolation Forest detects unusual combinations:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
class WaterAnomalyDetector:
def __init__(self, contamination: float = 0.02):
self.scaler = StandardScaler()
self.model = IsolationForest(
contamination=contamination,
n_estimators=200,
random_state=42
)
self.feature_cols = [
"temperature_c", "ph", "dissolved_oxygen_mgl",
"turbidity_ntu", "conductivity_us"
]
def fit(self, historical_data: pd.DataFrame):
"""Train on historical normal-condition data."""
features = historical_data[self.feature_cols].dropna()
# Add derived features
features = self._add_derived(features, historical_data)
scaled = self.scaler.fit_transform(features)
self.model.fit(scaled)
def predict(self, new_data: pd.DataFrame) -> pd.Series:
"""Score new readings. Returns -1 for anomalies, 1 for normal."""
features = new_data[self.feature_cols].copy()
features = self._add_derived(features, new_data)
# Handle missing values
features = features.fillna(features.median())
scaled = self.scaler.transform(features)
scores = self.model.decision_function(scaled)
labels = self.model.predict(scaled)
return pd.DataFrame({
"anomaly_score": scores,
"is_anomaly": labels == -1
}, index=new_data.index)
def _add_derived(self, features, source_df):
"""Add rate-of-change and cross-parameter features."""
for col in self.feature_cols:
if col in source_df.columns:
features[f"{col}_rate"] = source_df[col].diff()
# DO saturation depends on temperature
if "temperature_c" in source_df and "dissolved_oxygen_mgl" in source_df:
do_sat = 14.62 - 0.3898 * source_df["temperature_c"]
features["do_saturation_pct"] = (
source_df["dissolved_oxygen_mgl"] / do_sat * 100
)
return features.fillna(0)
Water Quality Index computation
The CCME WQI is widely used because it handles variable numbers of parameters and missing data gracefully:
import numpy as np
def compute_ccme_wqi(
measurements: dict[str, list[float]],
objectives: dict[str, tuple[float, float]] # parameter: (min, max)
) -> float:
"""Compute Canadian Council WQI from measurements and objectives.
Args:
measurements: {parameter_name: [measured_values]}
objectives: {parameter_name: (min_acceptable, max_acceptable)}
"""
total_tests = 0
failed_tests = 0
failed_params = set()
excursions = []
for param, values in measurements.items():
if param not in objectives:
continue
obj_min, obj_max = objectives[param]
for val in values:
total_tests += 1
if val < obj_min or val > obj_max:
failed_tests += 1
failed_params.add(param)
if val < obj_min:
excursion = (obj_min - val) / obj_min
else:
excursion = (val - obj_max) / obj_max
excursions.append(excursion)
if total_tests == 0:
return 0.0
total_params = len(measurements)
# F1: Scope — percentage of parameters that fail
f1 = len(failed_params) / total_params * 100
# F2: Frequency — percentage of individual tests that fail
f2 = failed_tests / total_tests * 100
# F3: Amplitude — amount by which failing tests exceed objectives
nse = sum(excursions) / total_tests if excursions else 0
f3 = nse / (0.01 * nse + 0.01) if nse > 0 else 0
# Combine into WQI
wqi = 100 - (np.sqrt(f1**2 + f2**2 + f3**2) / 1.732)
return max(0, min(100, wqi))
Trend analysis and seasonal decomposition
Long-term monitoring data reveals seasonal patterns and multi-year trends:
from statsmodels.tsa.seasonal import STL
import pandas as pd
def decompose_water_quality(
series: pd.Series,
period_days: int = 365
) -> dict:
"""Decompose water quality time series into trend, seasonal, residual."""
# Resample to daily for stable decomposition
daily = series.resample("1D").mean().interpolate(limit=7)
stl = STL(daily, period=period_days, robust=True)
result = stl.fit()
# Compute trend direction over last year
recent_trend = result.trend.iloc[-365:]
trend_slope = np.polyfit(range(len(recent_trend)), recent_trend.values, 1)[0]
return {
"trend": result.trend,
"seasonal": result.seasonal,
"residual": result.resid,
"trend_slope_per_year": trend_slope * 365,
"seasonal_amplitude": result.seasonal.max() - result.seasonal.min(),
}
Tradeoffs
Sensor cost vs. parameter coverage: A basic multiparameter sonde (pH, DO, turbidity, conductivity, temperature) costs $3,000-8,000. Adding nutrient sensors (nitrate, phosphate) doubles the price. Satellite-based monitoring is free but limited to surface indicators (chlorophyll, turbidity) at lower temporal resolution.
Sampling frequency vs. battery life: Remote stations on solar/battery power must balance measurement frequency against energy consumption. 15-minute intervals are standard; 1-minute intervals drain batteries in days but catch rapid contamination events.
Real-time vs. accuracy: Field sensors are less accurate than laboratory instruments. Turbidity sensors measure ±5% in the field vs. ±1% in the lab. The value of continuous monitoring comes from temporal resolution, not individual measurement precision.
Model retraining frequency: Water quality patterns shift with seasons, land-use changes, and climate trends. Anomaly detection models should be retrained quarterly or whenever a sustained regime change is detected.
One thing to remember: Building a water quality monitoring system in Python means engineering the full stack — MQTT ingestion, rigorous QC, multi-parameter anomaly detection, and trend analysis — where data quality control is the unglamorous but essential foundation that determines whether alerts are trustworthy.
See Also
- Python Biodiversity Tracking How Python helps scientists count and protect every kind of animal and plant on Earth — from whales to wildflowers.
- Python Crop Disease Detection How Python looks at photos of plants and figures out if they're sick — like a doctor for crops.
- Python Deforestation Detection How Python spots disappearing forests from space — catching illegal logging and land clearing as it happens.
- Python Drone Image Processing How Python turns hundreds of overlapping drone photos into detailed maps and 3D models of the ground below.
- Python Ocean Data Analysis How Python explores the world's oceans through data — tracking currents, temperatures, and marine life without getting wet.