Python Biodiversity Tracking — Deep Dive
Architecture of a biodiversity monitoring system
A modern monitoring platform integrates multiple data streams into a unified species occurrence database:
Camera Traps → MegaDetector → Species Classifier → Occurrence Records
ARUs → Spectrogram → BirdNET/Custom CNN → Occurrence Records
eDNA Samples → Metabarcoding → Taxonomy Assignment → Occurrence Records
→ Species Occupancy Models → Conservation Status Reports
Camera trap processing with MegaDetector
MegaDetector v5 detects animals in camera trap images with ~95% recall. It outputs bounding boxes, letting species classifiers focus on the animal region:
import torch
import numpy as np
from PIL import Image
from pathlib import Path
class CameraTrapPipeline:
def __init__(
self,
megadetector_path: str,
species_model_path: str,
species_labels: list[str],
confidence_threshold: float = 0.8
):
# MegaDetector for animal detection
self.detector = torch.hub.load(
'ultralytics/yolov5', 'custom',
path=megadetector_path
)
self.detector.conf = 0.2 # Low threshold to catch most animals
# Species classifier (fine-tuned EfficientNet)
self.classifier = torch.jit.load(species_model_path)
self.classifier.eval()
self.species_labels = species_labels
self.confidence_threshold = confidence_threshold
self.transform = torch.nn.Sequential(
# Normalize for ImageNet-pretrained backbone
)
def process_image(self, image_path: str) -> list[dict]:
"""Detect and classify animals in a camera trap image."""
img = Image.open(image_path)
# Step 1: Detect animals
results = self.detector(img)
detections = results.xyxy[0].cpu().numpy()
classifications = []
for det in detections:
x1, y1, x2, y2, conf, cls = det
# Class 0 = animal in MegaDetector
if int(cls) != 0:
continue
# Step 2: Crop and classify species
crop = img.crop((int(x1), int(y1), int(x2), int(y2)))
species, species_conf = self._classify_species(crop)
classifications.append({
"bbox": [float(x1), float(y1), float(x2), float(y2)],
"detection_confidence": float(conf),
"species": species,
"species_confidence": float(species_conf),
"needs_review": species_conf < self.confidence_threshold
})
return classifications
def _classify_species(self, crop: Image.Image) -> tuple[str, float]:
"""Classify a cropped animal image."""
tensor = self._preprocess(crop)
with torch.no_grad():
logits = self.classifier(tensor.unsqueeze(0))
probs = torch.softmax(logits, dim=1).squeeze()
top_idx = probs.argmax().item()
return self.species_labels[top_idx], probs[top_idx].item()
def _preprocess(self, img: Image.Image) -> torch.Tensor:
from torchvision import transforms
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
return preprocess(img)
def batch_process_station(
image_dir: Path,
pipeline: CameraTrapPipeline,
station_id: str
) -> list[dict]:
"""Process all images from a camera trap station."""
results = []
for img_path in sorted(image_dir.glob("*.jpg")):
classifications = pipeline.process_image(str(img_path))
if classifications:
for c in classifications:
results.append({
"station": station_id,
"filename": img_path.name,
"timestamp": extract_exif_datetime(img_path),
**c
})
return results
def extract_exif_datetime(path: Path) -> str:
from PIL.ExifTags import Base
img = Image.open(path)
exif = img.getexif()
dt = exif.get(Base.DateTimeOriginal, "unknown")
return dt
Bioacoustic species identification
Audio monitoring produces continuous recordings that need automated species detection:
import librosa
import numpy as np
import torch
class BioacousticAnalyzer:
def __init__(
self,
model_path: str,
species_labels: list[str],
sample_rate: int = 32000,
segment_duration: float = 3.0,
hop_duration: float = 1.0,
min_confidence: float = 0.5
):
self.model = torch.jit.load(model_path)
self.model.eval()
self.species_labels = species_labels
self.sr = sample_rate
self.segment_samples = int(segment_duration * sample_rate)
self.hop_samples = int(hop_duration * sample_rate)
self.min_confidence = min_confidence
def analyze_recording(self, audio_path: str) -> list[dict]:
"""Detect species in a continuous audio recording."""
audio, sr = librosa.load(audio_path, sr=self.sr, mono=True)
detections = []
for start in range(0, len(audio) - self.segment_samples, self.hop_samples):
segment = audio[start:start + self.segment_samples]
# Compute mel spectrogram
mel_spec = librosa.feature.melspectrogram(
y=segment, sr=self.sr,
n_mels=128, fmin=50, fmax=15000,
n_fft=2048, hop_length=512
)
log_mel = librosa.power_to_db(mel_spec, ref=np.max)
# Normalize to [0, 1]
log_mel = (log_mel - log_mel.min()) / (log_mel.max() - log_mel.min() + 1e-8)
# Classify
tensor = torch.FloatTensor(log_mel).unsqueeze(0).unsqueeze(0)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.sigmoid(logits).squeeze()
# Multi-label: multiple species can vocalize simultaneously
time_sec = start / self.sr
for idx, prob in enumerate(probs):
if prob > self.min_confidence:
detections.append({
"species": self.species_labels[idx],
"confidence": round(prob.item(), 3),
"time_start": round(time_sec, 1),
"time_end": round(time_sec + self.segment_samples / self.sr, 1)
})
return detections
def compute_activity_pattern(
self, detections: list[dict], target_species: str
) -> dict[int, int]:
"""Compute hourly activity pattern for a species."""
hourly_counts = {}
for det in detections:
if det["species"] == target_species:
hour = int(det["time_start"] / 3600) % 24
hourly_counts[hour] = hourly_counts.get(hour, 0) + 1
return hourly_counts
eDNA metabarcoding analysis
Environmental DNA processing bridges molecular biology and bioinformatics:
from pathlib import Path
import subprocess
def run_edna_pipeline(
fastq_dir: Path,
reference_db: Path,
output_dir: Path,
primer_f: str = "GTCGGTAAAACTCGTGCCAGC", # MiFish-U forward
primer_r: str = "CATAGTGGGGTATCTAATCCCAGTTTG", # MiFish-U reverse
min_reads: int = 10,
min_identity: float = 0.97
) -> Path:
"""Run eDNA metabarcoding pipeline using DADA2-style denoising."""
output_dir.mkdir(parents=True, exist_ok=True)
# Step 1: Primer trimming with cutadapt
trimmed_dir = output_dir / "trimmed"
trimmed_dir.mkdir(exist_ok=True)
for fq in fastq_dir.glob("*_R1_*.fastq.gz"):
r2 = str(fq).replace("_R1_", "_R2_")
sample = fq.stem.split("_R1_")[0]
subprocess.run([
"cutadapt",
"-g", primer_f,
"-G", primer_r,
"--discard-untrimmed",
"-o", str(trimmed_dir / f"{sample}_R1.fastq.gz"),
"-p", str(trimmed_dir / f"{sample}_R2.fastq.gz"),
str(fq), r2
], check=True)
# Step 2: Denoising with DADA2 produces ASVs
# (Typically run via R/DADA2 or Python wrapper)
# Step 3: Taxonomy assignment via BLAST
asv_fasta = output_dir / "asvs.fasta"
blast_output = output_dir / "blast_results.tsv"
subprocess.run([
"blastn",
"-query", str(asv_fasta),
"-db", str(reference_db),
"-out", str(blast_output),
"-outfmt", "6 qseqid sseqid pident length stitle",
"-max_target_seqs", "5",
"-perc_identity", str(min_identity * 100)
], check=True)
return blast_output
def parse_edna_results(
blast_file: Path,
read_counts: dict[str, dict[str, int]],
min_reads: int = 10
) -> dict[str, list[str]]:
"""Parse BLAST results into sample-species matrix."""
import pandas as pd
blast = pd.read_csv(
blast_file, sep="\t",
names=["asv", "ref", "identity", "length", "species_name"]
)
# Best hit per ASV
best_hits = blast.sort_values("identity", ascending=False).drop_duplicates("asv")
# Map ASVs to species, filter by read count
sample_species = {}
for sample, asv_counts in read_counts.items():
species_list = []
for asv, count in asv_counts.items():
if count < min_reads:
continue
hit = best_hits[best_hits["asv"] == asv]
if not hit.empty:
species_list.append(hit.iloc[0]["species_name"])
sample_species[sample] = list(set(species_list))
return sample_species
Occupancy modeling
Raw detections don’t account for imperfect detection — just because a camera didn’t photograph a species doesn’t mean it’s absent. Occupancy models estimate true presence probability:
import numpy as np
from scipy.optimize import minimize
def fit_single_season_occupancy(
detection_histories: np.ndarray
) -> dict:
"""Fit single-season occupancy model.
Args:
detection_histories: shape (n_sites, n_surveys)
1 = detected, 0 = not detected, NaN = not surveyed
"""
n_sites, n_surveys = detection_histories.shape
def neg_log_likelihood(params):
psi = 1 / (1 + np.exp(-params[0])) # occupancy (logit)
p = 1 / (1 + np.exp(-params[1])) # detection (logit)
nll = 0.0
for i in range(n_sites):
history = detection_histories[i]
valid = ~np.isnan(history)
detections = history[valid].astype(int)
n_valid = valid.sum()
# Probability of observed history given occupied
prob_history_occ = np.prod(
p**detections * (1 - p)**(1 - detections)
)
# Probability of observed history given not occupied
prob_history_unocc = 1.0 if detections.sum() == 0 else 0.0
likelihood = psi * prob_history_occ + (1 - psi) * prob_history_unocc
nll -= np.log(max(likelihood, 1e-10))
return nll
result = minimize(neg_log_likelihood, [0.0, 0.0], method="Nelder-Mead")
psi = 1 / (1 + np.exp(-result.x[0]))
p = 1 / (1 + np.exp(-result.x[1]))
return {
"occupancy_probability": round(psi, 4),
"detection_probability": round(p, 4),
"naive_occupancy": float(
(detection_histories.nansum(axis=1) > 0).mean()
),
"converged": result.success
}
Naive occupancy (proportion of sites with at least one detection) always underestimates true occupancy. The model corrects for this using repeated surveys at each site.
Tradeoffs
Camera traps vs. acoustic monitors: Cameras identify individual animals (enabling population size estimation via capture-recapture) but miss small, fast, or cryptic species. Acoustic monitors detect more species but rarely identify individuals. The most complete biodiversity assessments use both.
Automated vs. manual identification: AI classifiers process millions of records but make systematic errors (confusing similar species). Expert verification of all records is infeasible. The practical compromise is expert review of a stratified sample plus all low-confidence predictions.
eDNA sensitivity vs. specificity: eDNA can detect species from trace amounts but is vulnerable to contamination. A single DNA molecule from a fish market visitor’s shoe can produce a false detection. Negative controls and minimum read thresholds are essential.
Temporal resolution vs. storage: Continuous acoustic recording at 32kHz produces ~5GB per day per recorder. Networks of 50+ recorders generate terabytes per month. Triggered recording reduces storage but risks missing quiet or unexpected vocalizations.
One thing to remember: Effective biodiversity monitoring in Python requires integrating multiple sensing modalities, accounting for imperfect detection through occupancy modeling, and building workflows where AI handles volume while human expertise handles quality — the statistical rigor behind how you interpret detections matters as much as the detection algorithms themselves.
See Also
- Python Crop Disease Detection How Python looks at photos of plants and figures out if they're sick — like a doctor for crops.
- Python Deforestation Detection How Python spots disappearing forests from space — catching illegal logging and land clearing as it happens.
- Python Drone Image Processing How Python turns hundreds of overlapping drone photos into detailed maps and 3D models of the ground below.
- Python Ocean Data Analysis How Python explores the world's oceans through data — tracking currents, temperatures, and marine life without getting wet.
- Python Precision Agriculture How Python helps farmers give every plant exactly what it needs instead of treating the whole field the same way.