Meilisearch Integration in Python — Deep Dive
Meilisearch’s simplicity makes getting started trivial, but production deployments need careful attention to index design, data synchronization, multi-tenancy, and the new hybrid search capabilities.
1) Index design strategies
Single index vs multi-index
Use a single index when all documents share the same schema and search context. Use separate indices when document types differ significantly:
import meilisearch
client = meilisearch.Client('http://localhost:7700', 'master-key')
# Separate indices for different content types
products_index = client.index('products')
articles_index = client.index('articles')
users_index = client.index('users')
# Configure each independently
products_index.update_searchable_attributes(['name', 'description', 'brand'])
products_index.update_filterable_attributes(['category', 'price', 'in_stock'])
articles_index.update_searchable_attributes(['title', 'body', 'tags'])
articles_index.update_filterable_attributes(['published_at', 'author', 'category'])
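One way to keep per-index configuration reviewable is to declare it as data and apply it in a single pass. The sketch below assumes the Python SDK's `update_settings` method, which accepts a dict keyed by setting name (`searchableAttributes`, `filterableAttributes`). `INDEX_SETTINGS` and `apply_index_settings` are hypothetical names introduced here for illustration.

```python
# Hypothetical declarative settings map; the attribute lists mirror the ones above.
INDEX_SETTINGS = {
    "products": {
        "searchableAttributes": ["name", "description", "brand"],
        "filterableAttributes": ["category", "price", "in_stock"],
    },
    "articles": {
        "searchableAttributes": ["title", "body", "tags"],
        "filterableAttributes": ["published_at", "author", "category"],
    },
}


def apply_index_settings(client, settings_by_index=INDEX_SETTINGS):
    """Send each index its settings in one update_settings call.

    `client` is expected to behave like meilisearch.Client: client.index(uid)
    returns an object exposing update_settings(dict). The enqueued task
    objects are returned keyed by index uid so the caller can wait on them.
    """
    tasks = {}
    for uid, settings in settings_by_index.items():
        tasks[uid] = client.index(uid).update_settings(settings)
    return tasks
```

Keeping the map in version control makes settings changes show up in code review instead of living only on the server.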
Federated search (multi-index search)
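The multi-search endpoint, available since Meilisearch v1.1, runs queries against several indices in a single request and returns one result list per query. Merging those lists into a single ranked list (true federated search) needs the federation option added in v1.10. The basic form looks like this: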
results = client.multi_search([
    {"indexUid": "products", "q": "python", "limit": 5},
    {"indexUid": "articles", "q": "python", "limit": 5},
    {"indexUid": "users", "q": "python", "limit": 3}
])
for index_result in results['results']:
    print(f"\n{index_result['indexUid']}:")
    for hit in index_result['hits']:
        print(f"  {hit.get('name') or hit.get('title')}")
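When the per-index result lists need to be shown as one list, they can be flattened on the client. The sketch below assumes the response shape used above (a `results` list whose entries carry `indexUid` and `hits`). `flatten_multi_search` is a hypothetical helper, and it interleaves hits round-robin instead of ranking them against each other.

```python
from itertools import zip_longest


def flatten_multi_search(response):
    """Interleave hits from each index round-robin, tagging each hit with its source index."""
    per_index = [
        [{**hit, "_index": result["indexUid"]} for hit in result.get("hits", [])]
        for result in response.get("results", [])
    ]
    merged = []
    # zip_longest pads shorter lists with None, which is filtered out below
    for row in zip_longest(*per_index):
        merged.extend(hit for hit in row if hit is not None)
    return merged
```

Round-robin keeps every index visible near the top of the list, but it does not compare relevance across indices.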
2) Hybrid search with embeddings
Meilisearch v1.6+ supports vector search alongside keyword search, combining its ranking-rule-based keyword matching (Meilisearch does not use BM25) with semantic similarity:
# Any index handle works here; 'articles' is used for illustration
index = client.index('articles')
# Configure embedder
index.update_embedders({
    "default": {
        "source": "userProvided",
        "dimensions": 384
    }
})
# Index with embeddings
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')  # produces 384-dimensional vectors
docs_with_vectors = []
# `documents` is your own list of dicts with at least 'id' and 'title'
for doc in documents:
    text = f"{doc['title']} {doc.get('description', '')}"
    embedding = model.encode(text).tolist()
    doc['_vectors'] = {"default": embedding}
    docs_with_vectors.append(doc)
task = index.add_documents(docs_with_vectors)
client.wait_for_task(task.task_uid)
# Hybrid search: combines keyword + semantic
# With a userProvided embedder, the query vector must be supplied explicitly
query_vector = model.encode("stories about time travel").tolist()
results = index.search("time travel", {
    "hybrid": {
        "semanticRatio": 0.5,  # 0 = pure keyword, 1 = pure semantic
        "embedder": "default"
    },
    "vector": query_vector
})
Tune semanticRatio per use case: product search benefits from lower ratios (keywords matter for SKUs/brands), while article search benefits from higher ratios (meaning matters more than exact words).
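The per-use-case tuning described above can be sketched as a small lookup that builds the search parameters. The preset values here are hypothetical starting points, not recommendations from Meilisearch, and `hybrid_search_params` is a name introduced for this example.

```python
# Hypothetical starting points; tune them against real queries and click data.
SEMANTIC_RATIO_PRESETS = {
    "product": 0.2,  # exact SKUs and brand names dominate
    "article": 0.7,  # meaning matters more than exact wording
}


def hybrid_search_params(use_case, query_vector=None, embedder="default", limit=20):
    """Build the search-parameter dict for a hybrid query.

    Unknown use cases fall back to an even 0.5 split between keyword and
    semantic ranking.
    """
    ratio = SEMANTIC_RATIO_PRESETS.get(use_case, 0.5)
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("semanticRatio must be between 0 and 1")
    params = {
        "hybrid": {"semanticRatio": ratio, "embedder": embedder},
        "limit": limit,
    }
    # Only needed for userProvided embedders, where the caller embeds the query
    if query_vector is not None:
        params["vector"] = list(query_vector)
    return params
```

The returned dict can be passed as the second argument to `index.search`, for example `index.search("time travel", hybrid_search_params("article", query_vector))`.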
3) Multi-tenant architecture
Tenant tokens for data isolation
Meilisearch supports tenant tokens that restrict search to specific filtered subsets:
import meilisearch
import json
import base64
import hmac
import hashlib
import time
def create_tenant_token(api_key_uid, api_key, tenant_id, expires_at=None):
    """Create a token that restricts search to a specific tenant."""
    header = base64.urlsafe_b64encode(json.dumps(
        {"alg": "HS256", "typ": "JWT"}
    ).encode()).rstrip(b'=').decode()
    payload_data = {
        "searchRules": {
            # Applies to every index; tenant_id must be a filterable attribute
            "*": {"filter": f"tenant_id = {tenant_id}"}
        },
        "apiKeyUid": api_key_uid,
        "exp": expires_at or int(time.time()) + 3600
    }
    payload = base64.urlsafe_b64encode(
        json.dumps(payload_data).encode()
    ).rstrip(b'=').decode()
    signature = base64.urlsafe_b64encode(
        hmac.new(api_key.encode(), f"{header}.{payload}".encode(), hashlib.sha256).digest()
    ).rstrip(b'=').decode()
    return f"{header}.{payload}.{signature}"
# Generate the token on your backend (api_key_uid and search_api_key come from
# your own key configuration), then hand it to the client, which can only see
# its own tenant's data
tenant_client = meilisearch.Client(
    'http://localhost:7700',
    create_tenant_token(api_key_uid, search_api_key, tenant_id=42)
)
results = tenant_client.index('documents').search("quarterly report")
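When debugging tenant tokens it helps to decode one and confirm the signature and search rules are what you expect. The sketch below uses only the standard library and assumes the HS256 layout produced above. `inspect_tenant_token` is a hypothetical helper for local checks, not part of the Meilisearch SDK.

```python
import base64
import hashlib
import hmac
import json


def _b64url_decode(segment):
    """Decode an unpadded base64url segment by restoring its padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def inspect_tenant_token(token, api_key):
    """Return the decoded payload if the HS256 signature matches, else raise ValueError."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("token must have three dot-separated segments")
    header, payload, signature = parts
    expected = hmac.new(
        api_key.encode(), f"{header}.{payload}".encode(), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking timing information about the signature
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("signature does not match the supplied API key")
    return json.loads(_b64url_decode(payload))
```

Run this only on the backend, since it needs the API key that signed the token.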
Index-per-tenant
For strict isolation with different schemas per tenant, create separate indices:
def get_tenant_index(client, tenant_id):
    index_name = f"docs_{tenant_id}"
    try:
        return client.get_index(index_name)
    except meilisearch.errors.MeilisearchApiError as e:
        # Only create the index when it is missing; re-raise auth or other errors
        if e.code != 'index_not_found':
            raise
        task = client.create_index(index_name, {"primaryKey": "id"})
        client.wait_for_task(task.task_uid)
        idx = client.index(index_name)
        idx.update_filterable_attributes(['category', 'created_at'])
        return idx
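Because tenant identifiers end up in the index uid, it is worth validating them first: index uids may contain only letters, digits, hyphens, and underscores. The sketch below is one way to do that. `tenant_index_uid` and the `docs` prefix default are illustrative names.

```python
import re

# Index uids may contain only a-z, A-Z, 0-9, hyphens and underscores.
_UID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def tenant_index_uid(tenant_id, prefix="docs"):
    """Build the per-tenant index uid, rejecting characters Meilisearch would refuse."""
    uid = f"{prefix}_{tenant_id}"
    if not _UID_PATTERN.fullmatch(uid):
        raise ValueError(f"invalid index uid: {uid!r}")
    return uid
```

Rejecting a bad identifier up front gives a clearer error than a failed index-creation task later on.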
4) Data synchronization pipeline
Keep Meilisearch in sync with your primary database:
import asyncio
from datetime import datetime, timedelta
class MeilisearchSyncer:
    def __init__(self, db, meili_client, index_name, batch_size=1000):
        self.db = db
        self.index = meili_client.index(index_name)
        self.client = meili_client
        self.batch_size = batch_size
        self.last_sync = None
    async def incremental_sync(self):
        """Sync only changed documents since last run."""
        since = self.last_sync or datetime.utcnow() - timedelta(hours=1)
        # Capture the start time first so rows updated while this run is in
        # progress are picked up by the next run instead of being skipped
        sync_started = datetime.utcnow()
        # Fetch changed records from primary DB
        changed = await self.db.fetch(
            "SELECT * FROM articles WHERE updated_at > $1 ORDER BY updated_at",
            since
        )
        # Batch upsert to Meilisearch (the SDK calls are synchronous and
        # block the event loop while they wait)
        for i in range(0, len(changed), self.batch_size):
            batch = changed[i:i + self.batch_size]
            docs = [self._transform(row) for row in batch]
            task = self.index.add_documents(docs)
            self.client.wait_for_task(task.task_uid, timeout_in_ms=60000)
        # Handle deletions, even when no rows changed
        deleted_ids = await self.db.fetch(
            "SELECT id FROM deleted_articles WHERE deleted_at > $1", since
        )
        if deleted_ids:
            task = self.index.delete_documents([str(r['id']) for r in deleted_ids])
            self.client.wait_for_task(task.task_uid, timeout_in_ms=60000)
        self.last_sync = sync_started
        return len(changed)
    def _transform(self, row):
        return {
            "id": str(row['id']),
            "title": row['title'],
            "content": row['content'],
            "category": row['category'],
            "published_at": row['published_at'].isoformat(),
        }
Run incremental syncs every 30-60 seconds for near-real-time freshness. Run a full reindex weekly to catch any drift.
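The periodic schedule can be sketched as a small asyncio loop. This assumes a syncer object with an async `incremental_sync()` method that returns a count, like the one above. `run_sync_loop` and its `max_runs` parameter are hypothetical and exist mainly to make the loop testable.

```python
import asyncio


async def run_sync_loop(syncer, interval_seconds=30.0, max_runs=None):
    """Call syncer.incremental_sync() repeatedly, sleeping between runs.

    A failed run is recorded and the loop keeps going, so one bad batch does
    not stop later syncs. With max_runs=None the loop runs until cancelled.
    Returns (total documents synced, list of exceptions seen).
    """
    runs, synced, errors = 0, 0, []
    while max_runs is None or runs < max_runs:
        try:
            synced += await syncer.incremental_sync()
        except Exception as exc:  # broad on purpose: keep the loop alive
            errors.append(exc)
        runs += 1
        # Skip the final sleep when a run limit has been reached
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_seconds)
    return synced, errors
```

In production you would likely log each exception and alert on repeated failures instead of only collecting them.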
5) Performance tuning
Payload size limits
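Meilisearch has a default payload limit of about 100MB per request (configurable with the --http-payload-size-limit option). For large batch imports, split the data into chunks: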
def chunked_import(index, documents, chunk_size=10000):
    """Import in chunks to stay within payload limits."""
    tasks = []
    for i in range(0, len(documents), chunk_size):
        chunk = documents[i:i + chunk_size]
        task = index.add_documents(chunk)
        tasks.append(task.task_uid)
    # Wait for all tasks (via the index handle, so no global client is needed)
    for uid in tasks:
        status = index.wait_for_task(uid, timeout_in_ms=120000)
        if status.status == 'failed':
            raise RuntimeError(f"Task {uid} failed: {status.error}")
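A fixed document count does not bound the request size when documents vary widely in length. A possible alternative is to chunk by estimated JSON size, sketched below. `chunk_by_bytes` and its 50MB default are assumptions for illustration, and the estimate is based on `json.dumps` defaults, so leave headroom below the server limit.

```python
import json


def chunk_by_bytes(documents, max_bytes=50 * 1024 * 1024):
    """Yield lists of documents whose JSON array encoding stays at or under max_bytes."""
    chunk, size = [], 2  # 2 bytes for the enclosing "[" and "]"
    for doc in documents:
        doc_size = len(json.dumps(doc).encode("utf-8"))
        if doc_size + 2 > max_bytes:
            raise ValueError("a single document exceeds max_bytes")
        # json.dumps separates array items with ", " (2 bytes)
        added = doc_size + (2 if chunk else 0)
        if chunk and size + added > max_bytes:
            yield chunk
            chunk, size = [], 2
            added = doc_size
        chunk.append(doc)
        size += added
    if chunk:
        yield chunk
```

Each yielded chunk can be passed to `index.add_documents` in place of the fixed-size slices.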
Search performance
- Reduce searchable attributes — fewer searchable fields means faster indexing and search
- Use attributesToRetrieve to return only needed fields:
results = index.search("python", {
    "attributesToRetrieve": ["title", "id", "rating"],
    "limit": 20
})
- Pre-compute filterable attributes — only attributes marked filterable can be used in filters; keep the list minimal
Distinct attribute
When results may contain many near-duplicates (the same product in different colors), set a distinct attribute so that only one document per group is returned:
index.update_distinct_attribute('product_group_id')
6) Monitoring and health checks
def meilisearch_health_check(client):
    try:
        health = client.health()
        stats = client.get_all_stats()
        report = {
            "status": health['status'],
            "indexes": {}
        }
        for name, index_stats in stats['indexes'].items():
            report["indexes"][name] = {
                "documents": index_stats['numberOfDocuments'],
                "indexing": index_stats['isIndexing'],
                "field_distribution": index_stats.get('fieldDistribution', {})
            }
        # Check for stuck tasks
        tasks = client.get_tasks({"statuses": ["processing", "enqueued"], "limit": 100})
        # `results` holds at most `limit` tasks, so prefer the reported total
        # when the SDK version exposes it
        results = getattr(tasks, 'results', [])
        report["pending_tasks"] = getattr(tasks, 'total', len(results))
        return report
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
Alert on: health endpoint returning non-200, task queue growing beyond 1000, indexing taking longer than expected (indicates resource starvation), and disk usage approaching limits.
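Two of these conditions can be checked directly from the report dict built above. The sketch below assumes that report shape and that a healthy instance reports the status `available`. `evaluate_alerts` is a hypothetical helper, and indexing duration and disk usage would need data from outside the report.

```python
def evaluate_alerts(report, max_pending_tasks=1000):
    """Turn a health report dict into a list of human-readable alert strings."""
    alerts = []
    status = report.get("status")
    if status != "available":
        detail = report.get("error", "no details")
        alerts.append(f"health status is {status!r}: {detail}")
    pending = report.get("pending_tasks", 0)
    if pending > max_pending_tasks:
        alerts.append(f"{pending} pending tasks exceeds limit of {max_pending_tasks}")
    return alerts
```

An empty list means nothing needs attention; otherwise each string can be forwarded to whatever paging or chat channel you already use.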
One thing to remember: Meilisearch’s power comes from its opinionated defaults — fight the urge to over-configure, lean into its strengths for user-facing search, and pair it with a reliable sync pipeline from your source-of-truth database.