Python Biodiversity Tracking — Deep Dive

Architecture of a biodiversity monitoring system

A modern monitoring platform integrates multiple data streams into a unified species occurrence database:

Camera Traps → MegaDetector → Species Classifier → Occurrence Records
ARUs → Spectrogram → BirdNET/Custom CNN → Occurrence Records
eDNA Samples → Metabarcoding → Taxonomy Assignment → Occurrence Records
    → Species Occupancy Models → Conservation Status Reports

Camera trap processing with MegaDetector

MegaDetector v5 detects animals in camera trap images with ~95% recall. It outputs bounding boxes, letting species classifiers focus on the animal region:

import torch
import numpy as np
from PIL import Image
from pathlib import Path

class CameraTrapPipeline:
    def __init__(
        self,
        megadetector_path: str,
        species_model_path: str,
        species_labels: list[str],
        confidence_threshold: float = 0.8
    ):
        # MegaDetector for animal detection
        self.detector = torch.hub.load(
            'ultralytics/yolov5', 'custom',
            path=megadetector_path
        )
        self.detector.conf = 0.2  # Low threshold to catch most animals
        
        # Species classifier (fine-tuned EfficientNet)
        self.classifier = torch.jit.load(species_model_path)
        self.classifier.eval()
        self.species_labels = species_labels
        self.confidence_threshold = confidence_threshold
        
        self.transform = torch.nn.Sequential(
            # Normalize for ImageNet-pretrained backbone
        )
    
    def process_image(self, image_path: str) -> list[dict]:
        """Detect and classify animals in a camera trap image."""
        img = Image.open(image_path)
        
        # Step 1: Detect animals
        results = self.detector(img)
        detections = results.xyxy[0].cpu().numpy()
        
        classifications = []
        for det in detections:
            x1, y1, x2, y2, conf, cls = det
            
            # Class 0 = animal in MegaDetector
            if int(cls) != 0:
                continue
            
            # Step 2: Crop and classify species
            crop = img.crop((int(x1), int(y1), int(x2), int(y2)))
            species, species_conf = self._classify_species(crop)
            
            classifications.append({
                "bbox": [float(x1), float(y1), float(x2), float(y2)],
                "detection_confidence": float(conf),
                "species": species,
                "species_confidence": float(species_conf),
                "needs_review": species_conf < self.confidence_threshold
            })
        
        return classifications
    
    def _classify_species(self, crop: Image.Image) -> tuple[str, float]:
        """Classify a cropped animal image."""
        tensor = self._preprocess(crop)
        
        with torch.no_grad():
            logits = self.classifier(tensor.unsqueeze(0))
            probs = torch.softmax(logits, dim=1).squeeze()
        
        top_idx = probs.argmax().item()
        return self.species_labels[top_idx], probs[top_idx].item()
    
    def _preprocess(self, img: Image.Image) -> torch.Tensor:
        from torchvision import transforms
        preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
        return preprocess(img)

def batch_process_station(
    image_dir: Path,
    pipeline: CameraTrapPipeline,
    station_id: str
) -> list[dict]:
    """Process all images from a camera trap station."""
    results = []
    
    for img_path in sorted(image_dir.glob("*.jpg")):
        classifications = pipeline.process_image(str(img_path))
        
        if classifications:
            for c in classifications:
                results.append({
                    "station": station_id,
                    "filename": img_path.name,
                    "timestamp": extract_exif_datetime(img_path),
                    **c
                })
    
    return results

def extract_exif_datetime(path: Path) -> str:
    from PIL.ExifTags import Base
    img = Image.open(path)
    exif = img.getexif()
    dt = exif.get(Base.DateTimeOriginal, "unknown")
    return dt

Bioacoustic species identification

Audio monitoring produces continuous recordings that need automated species detection:

import librosa
import numpy as np
import torch

class BioacousticAnalyzer:
    def __init__(
        self,
        model_path: str,
        species_labels: list[str],
        sample_rate: int = 32000,
        segment_duration: float = 3.0,
        hop_duration: float = 1.0,
        min_confidence: float = 0.5
    ):
        self.model = torch.jit.load(model_path)
        self.model.eval()
        self.species_labels = species_labels
        self.sr = sample_rate
        self.segment_samples = int(segment_duration * sample_rate)
        self.hop_samples = int(hop_duration * sample_rate)
        self.min_confidence = min_confidence
    
    def analyze_recording(self, audio_path: str) -> list[dict]:
        """Detect species in a continuous audio recording."""
        audio, sr = librosa.load(audio_path, sr=self.sr, mono=True)
        
        detections = []
        for start in range(0, len(audio) - self.segment_samples, self.hop_samples):
            segment = audio[start:start + self.segment_samples]
            
            # Compute mel spectrogram
            mel_spec = librosa.feature.melspectrogram(
                y=segment, sr=self.sr,
                n_mels=128, fmin=50, fmax=15000,
                n_fft=2048, hop_length=512
            )
            log_mel = librosa.power_to_db(mel_spec, ref=np.max)
            
            # Normalize to [0, 1]
            log_mel = (log_mel - log_mel.min()) / (log_mel.max() - log_mel.min() + 1e-8)
            
            # Classify
            tensor = torch.FloatTensor(log_mel).unsqueeze(0).unsqueeze(0)
            with torch.no_grad():
                logits = self.model(tensor)
                probs = torch.sigmoid(logits).squeeze()
            
            # Multi-label: multiple species can vocalize simultaneously
            time_sec = start / self.sr
            for idx, prob in enumerate(probs):
                if prob > self.min_confidence:
                    detections.append({
                        "species": self.species_labels[idx],
                        "confidence": round(prob.item(), 3),
                        "time_start": round(time_sec, 1),
                        "time_end": round(time_sec + self.segment_samples / self.sr, 1)
                    })
        
        return detections
    
    def compute_activity_pattern(
        self, detections: list[dict], target_species: str
    ) -> dict[int, int]:
        """Compute hourly activity pattern for a species."""
        hourly_counts = {}
        for det in detections:
            if det["species"] == target_species:
                hour = int(det["time_start"] / 3600) % 24
                hourly_counts[hour] = hourly_counts.get(hour, 0) + 1
        return hourly_counts

eDNA metabarcoding analysis

Environmental DNA processing bridges molecular biology and bioinformatics:

from pathlib import Path
import subprocess

def run_edna_pipeline(
    fastq_dir: Path,
    reference_db: Path,
    output_dir: Path,
    primer_f: str = "GTCGGTAAAACTCGTGCCAGC",  # MiFish-U forward
    primer_r: str = "CATAGTGGGGTATCTAATCCCAGTTTG",  # MiFish-U reverse
    min_reads: int = 10,
    min_identity: float = 0.97
) -> Path:
    """Run eDNA metabarcoding pipeline using DADA2-style denoising."""
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # Step 1: Primer trimming with cutadapt
    trimmed_dir = output_dir / "trimmed"
    trimmed_dir.mkdir(exist_ok=True)
    
    for fq in fastq_dir.glob("*_R1_*.fastq.gz"):
        r2 = str(fq).replace("_R1_", "_R2_")
        sample = fq.stem.split("_R1_")[0]
        
        subprocess.run([
            "cutadapt",
            "-g", primer_f,
            "-G", primer_r,
            "--discard-untrimmed",
            "-o", str(trimmed_dir / f"{sample}_R1.fastq.gz"),
            "-p", str(trimmed_dir / f"{sample}_R2.fastq.gz"),
            str(fq), r2
        ], check=True)
    
    # Step 2: Denoising with DADA2 produces ASVs
    # (Typically run via R/DADA2 or Python wrapper)
    
    # Step 3: Taxonomy assignment via BLAST
    asv_fasta = output_dir / "asvs.fasta"
    blast_output = output_dir / "blast_results.tsv"
    
    subprocess.run([
        "blastn",
        "-query", str(asv_fasta),
        "-db", str(reference_db),
        "-out", str(blast_output),
        "-outfmt", "6 qseqid sseqid pident length stitle",
        "-max_target_seqs", "5",
        "-perc_identity", str(min_identity * 100)
    ], check=True)
    
    return blast_output

def parse_edna_results(
    blast_file: Path,
    read_counts: dict[str, dict[str, int]],
    min_reads: int = 10
) -> dict[str, list[str]]:
    """Parse BLAST results into sample-species matrix."""
    import pandas as pd
    
    blast = pd.read_csv(
        blast_file, sep="\t",
        names=["asv", "ref", "identity", "length", "species_name"]
    )
    
    # Best hit per ASV
    best_hits = blast.sort_values("identity", ascending=False).drop_duplicates("asv")
    
    # Map ASVs to species, filter by read count
    sample_species = {}
    for sample, asv_counts in read_counts.items():
        species_list = []
        for asv, count in asv_counts.items():
            if count < min_reads:
                continue
            hit = best_hits[best_hits["asv"] == asv]
            if not hit.empty:
                species_list.append(hit.iloc[0]["species_name"])
        sample_species[sample] = list(set(species_list))
    
    return sample_species

Occupancy modeling

Raw detections don’t account for imperfect detection — just because a camera didn’t photograph a species doesn’t mean it’s absent. Occupancy models estimate true presence probability:

import numpy as np
from scipy.optimize import minimize

def fit_single_season_occupancy(
    detection_histories: np.ndarray
) -> dict:
    """Fit single-season occupancy model.
    
    Args:
        detection_histories: shape (n_sites, n_surveys)
            1 = detected, 0 = not detected, NaN = not surveyed
    """
    n_sites, n_surveys = detection_histories.shape
    
    def neg_log_likelihood(params):
        psi = 1 / (1 + np.exp(-params[0]))  # occupancy (logit)
        p = 1 / (1 + np.exp(-params[1]))     # detection (logit)
        
        nll = 0.0
        for i in range(n_sites):
            history = detection_histories[i]
            valid = ~np.isnan(history)
            detections = history[valid].astype(int)
            n_valid = valid.sum()
            
            # Probability of observed history given occupied
            prob_history_occ = np.prod(
                p**detections * (1 - p)**(1 - detections)
            )
            
            # Probability of observed history given not occupied
            prob_history_unocc = 1.0 if detections.sum() == 0 else 0.0
            
            likelihood = psi * prob_history_occ + (1 - psi) * prob_history_unocc
            nll -= np.log(max(likelihood, 1e-10))
        
        return nll
    
    result = minimize(neg_log_likelihood, [0.0, 0.0], method="Nelder-Mead")
    
    psi = 1 / (1 + np.exp(-result.x[0]))
    p = 1 / (1 + np.exp(-result.x[1]))
    
    return {
        "occupancy_probability": round(psi, 4),
        "detection_probability": round(p, 4),
        "naive_occupancy": float(
            (detection_histories.nansum(axis=1) > 0).mean()
        ),
        "converged": result.success
    }

Naive occupancy (proportion of sites with at least one detection) always underestimates true occupancy. The model corrects for this using repeated surveys at each site.

Tradeoffs

Camera traps vs. acoustic monitors: Cameras identify individual animals (enabling population size estimation via capture-recapture) but miss small, fast, or cryptic species. Acoustic monitors detect more species but rarely identify individuals. The most complete biodiversity assessments use both.

Automated vs. manual identification: AI classifiers process millions of records but make systematic errors (confusing similar species). Expert verification of all records is infeasible. The practical compromise is expert review of a stratified sample plus all low-confidence predictions.

eDNA sensitivity vs. specificity: eDNA can detect species from trace amounts but is vulnerable to contamination. A single DNA molecule from a fish market visitor’s shoe can produce a false detection. Negative controls and minimum read thresholds are essential.

Temporal resolution vs. storage: Continuous acoustic recording at 32kHz produces ~5GB per day per recorder. Networks of 50+ recorders generate terabytes per month. Triggered recording reduces storage but risks missing quiet or unexpected vocalizations.

One thing to remember: Effective biodiversity monitoring in Python requires integrating multiple sensing modalities, accounting for imperfect detection through occupancy modeling, and building workflows where AI handles volume while human expertise handles quality — the statistical rigor behind how you interpret detections matters as much as the detection algorithms themselves.

pythonenvironmental-scienceecologydeep-learningbioacoustics

See Also