Comment identifier les goulots d’étranglement avec cProfile
Il était 2h du matin quand notre API de recommandations s’est mise à répondre en 15 secondes au lieu des 500ms habituelles. Notre petit service Python qui gérait tranquillement 50 requêtes par seconde pour notre marketplace de produits artisanaux venait de se prendre un pic de trafic inattendu suite à un passage TV. 200 utilisateurs simultanés, et tout s’effondrait.
Articles connexes: Comment implémenter MFA dans vos API Python
J’ai passé les deux heures suivantes à regarder nos métriques Grafana sans comprendre où était le problème. CPU à 90%, mémoire stable, base de données PostgreSQL qui répondait normalement. Le code semblait pourtant identique à la veille. C’est là que j’ai découvert cProfile, et surtout appris à m’en servir correctement.
Ce qui m’a frappé, c’est que 90% des articles sur le profilage Python montrent comment lancer python -m cProfile script.py
et s’arrêtent là. Mais en production, avec FastAPI, des utilisateurs réels et des données qui changent, c’est une autre histoire. Après six mois à optimiser notre stack (Python 3.11, FastAPI, PostgreSQL sur un VPS OVH), j’ai développé une méthodologie qui nous a permis de passer de 15s à 180ms de temps de réponse.
Dans cet article, je partage ma méthode step-by-step pour diagnostiquer les problèmes de performance, les trois techniques de profilage qui m’ont sauvé la mise, et surtout les quatre pièges classiques que j’ai appris à éviter.
Anatomie d’un problème de performance
Le contexte : notre API de recommandations
Notre service recommande des produits artisanaux basés sur l’historique d’achat et les préférences utilisateur. Rien de révolutionnaire : une API FastAPI qui expose un endpoint /recommendations/{user_id}
, connectée à une base PostgreSQL avec environ 50 000 produits et 5 000 utilisateurs actifs.
Voici les métriques que j’ai collectées ce fameux soir :
# Métriques observées (novembre 2024)
- Temps de réponse P95 : 15.2s (objectif : <500ms)
- CPU utilization : 92% constant
- Memory usage : 1.8GB (limite container : 2GB)
- Throughput : 12 req/s (objectif : 50 req/s)
- Erreurs timeout : 45% des requêtes
Ma première erreur a été de lancer cProfile sur l’ensemble de l’application sans hypothèse. Résultat : 200MB de données de profiling incompréhensibles et aucune piste claire. J’ai perdu une heure précieuse.
Ma méthodologie en quatre étapes
Après cette nuit blanche, j’ai développé une approche systématique :
1. Observation méthodique
Avant de profiler quoi que ce soit, je collecte les métriques infrastructure ET business. Dans notre cas : temps de réponse par endpoint, utilisation CPU/mémoire, mais aussi nombre de produits retournés par recommandation, taille moyenne des réponses JSON.
2. Formulation d’hypothèses
Avec l’équipe (on est trois développeurs), on liste 2-3 zones suspectes. Pour nous c’était : l’algorithme de scoring des produits, les requêtes SQL, ou la sérialisation JSON des réponses.

3. Profilage ciblé
Plutôt que de profiler toute l’app, je profile séparément chaque zone suspecte avec des données réalistes.
4. Validation des gains
Chaque optimisation est mesurée avec des métriques avant/après sur un environnement de staging identique à la production.
Le principe clé que j’ai appris : plus vous profilez large, moins vous trouvez de signal utile. Cette leçon m’a pris six mois à assimiler complètement.
Articles connexes: Comment combiner Gin et Python pour des API ultra-rapides
cProfile en action : setup pratique
Configuration pour FastAPI
Voici le middleware de profilage que j’utilise maintenant en production :
# profiling_middleware.py
import cProfile
import pstats
import time
from contextlib import contextmanager
from pathlib import Path
@contextmanager
def profile_request(request_id: str, threshold_seconds: float = 1.0):
"""Profile une requête si elle dépasse le seuil de temps"""
profiler = cProfile.Profile()
start_time = time.time()
profiler.enable()
try:
yield profiler
finally:
profiler.disable()
duration = time.time() - start_time
# Sauvegarde seulement si la requête est lente
if duration > threshold_seconds:
profile_dir = Path("profiles")
profile_dir.mkdir(exist_ok=True)
stats = pstats.Stats(profiler)
stats.dump_stats(profile_dir / f"{request_id}_{duration:.2f}s.prof")
# Log pour debug
print(f"Profil sauvé pour requête {request_id}: {duration:.2f}s")
# Integration FastAPI
from fastapi import FastAPI, Request
import uuid
app = FastAPI()
@app.middleware("http")
async def profiling_middleware(request: Request, call_next):
# Activation conditionnelle via header
should_profile = request.headers.get("X-Enable-Profiling") == "true"
if should_profile:
request_id = str(uuid.uuid4())[:8]
with profile_request(request_id):
response = await call_next(request)
return response
else:
return await call_next(request)
Point crucial : j’ai appris à mes dépens qu’activer cProfile partout peut ralentir l’application de 30%. Ma solution actuelle utilise trois modes selon l’environnement.
Les trois modes de profilage
Mode développement (local)
Profilage complet avec des datasets réalistes. J’utilise snakeviz pour la visualisation :
# Génération du profil
python -m cProfile -o recommendation.prof main.py
# Visualisation interactive
snakeviz recommendation.prof
Mode staging (pré-production)
Échantillonnage intelligent : profilage automatique des requêtes qui dépassent 2 secondes, avec agrégation des stats sur 24h.
# Configuration staging
PROFILING_THRESHOLD = 2.0 # secondes
PROFILING_SAMPLE_RATE = 0.05 # 5% des requêtes
Mode production (critique)
Activation manuelle via feature flag, durée limitée à 10 minutes maximum, profilage ciblé sur des utilisateurs spécifiques.
La règle que j’applique : 80% de vos problèmes de performance viennent de 20% de votre code, mais ces 20% ne sont jamais où vous pensez qu’ils sont.
Interpréter les résultats : au-delà des métriques de base
Notre découverte surprenante
Voici le profil de notre endpoint de recommandations avant optimisation :

# Top 10 fonctions par temps cumulé (cumtime)
ncalls tottime cumtime filename:lineno(function)
1 0.000 12.847 recommendation_service.py:45(get_recommendations)
847 0.023 11.234 database.py:78(execute_query)
847 8.901 8.901 psycopg2/cursor.py:308(execute)
2341 1.456 2.103 json_encoder.py:23(encode_product)
847 0.890 1.456 models/product.py:156(__init__)
5023 0.234 0.789 utils/scoring.py:67(calculate_score)
Le coupable n’était pas l’algorithme de machine learning comme on le pensait, mais 847 requêtes SQL individuelles au lieu d’une seule requête optimisée. Un classique problème N+1 que notre ORM SQLAlchemy masquait bien.
Ma grille d’analyse en quatre métriques
1. cumtime vs tottime : l’indicateur clé
– cumtime
élevé + tottime
faible = problème dans les sous-fonctions
– cumtime
faible + tottime
élevé = algorithme inefficace
– Les deux élevés = boucle coûteuse
Dans notre cas : cumtime=11.234s
et tottime=0.023s
pour execute_query
indiquait clairement un problème de volume d’appels.
2. ncalls : le révélateur de patterns
847 appels à execute_query
pour une seule recommandation = problème N+1 évident.
3. Ratio cumtime/ncalls : coût par appel
– > 0.1s par appel = investigation prioritaire
– < 0.001s mais ncalls > 10000 = optimisation de masse nécessaire
4. filename:lineno : localisation précise
J’utilise cette info pour créer des liens directs vers le code dans nos outils de monitoring.
Articles connexes: Comment envoyer des alertes Slack automatisées avec Python
Les trois patterns de performance que je traque
Pattern 1 : Le N+1 caché
# Code problématique découvert
async def get_recommendations(user_id: int) -> List[Product]:
recommendation_ids = await get_recommendation_ids(user_id)
products = []
# 847 requêtes SQL au lieu d'une seule !
for product_id in recommendation_ids:
product = await db.get_product(product_id)
products.append(await enrich_product(product))
return products
# Solution optimisée
async def get_recommendations(user_id: int) -> List[Product]:
recommendation_ids = await get_recommendation_ids(user_id)
# Une seule requête avec jointures
products = await db.get_products_batch(recommendation_ids)
return [await enrich_product(p) for p in products]
Pattern 2 : La sérialisation coûteuse
2.1s perdues sur json.dumps()
d’objets complexes, résolu avec un cache de sérialisation :
from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def serialize_product(product_hash: str, product_data: dict) -> str:
"""Cache la sérialisation des produits par hash"""
return json.dumps(product_data, cls=CustomProductEncoder)
def get_product_hash(product: Product) -> str:
"""Génère un hash basé sur les données pertinentes"""
key_data = f"{product.id}_{product.updated_at}_{product.price}"
return hashlib.md5(key_data.encode()).hexdigest()
Pattern 3 : Les imports cachés
0.8s perdues sur des imports dynamiques dans des boucles :
# Problématique
def process_products(products):
results = []
for product in products:
# Import à chaque itération !
from utils.complex_scorer import calculate_advanced_score
score = calculate_advanced_score(product)
results.append((product, score))
return results
# Solution : import au niveau module
from utils.complex_scorer import calculate_advanced_score
def process_products(products):
return [(p, calculate_advanced_score(p)) for p in products]
Ma technique du « profil différentiel » : je compare toujours les profils avant/après chaque optimisation. Cette approche m’a évité trois régressions majeures en 2024.
Techniques avancées et intégration
Profilage conditionnel intelligent
Voici ma solution de profilage adaptatif, développée après six mois d’itérations :

# smart_profiler.py
import random
import time
from contextlib import contextmanager
from typing import Optional, Callable
class SmartProfiler:
def __init__(self, threshold_ms: int = 1000, sample_rate: float = 0.01):
self.threshold = threshold_ms / 1000
self.sample_rate = sample_rate
self.active_profiles = {}
def should_profile_request(self, request_path: str) -> bool:
"""Décide si on doit profiler cette requête"""
# Toujours profiler les endpoints critiques lents
critical_paths = ["/recommendations/", "/search/"]
if any(path in request_path for path in critical_paths):
return random.random() < self.sample_rate * 10 # 10x plus de chances
return random.random() < self.sample_rate
@contextmanager
def profile_if_slow(self, request_id: str, max_duration: int = 30):
"""Profile avec timeout automatique"""
if len(self.active_profiles) >= 5: # Limite concurrentielle
yield None
return
profiler = cProfile.Profile()
start_time = time.time()
self.active_profiles[request_id] = start_time
try:
profiler.enable()
yield profiler
finally:
profiler.disable()
duration = time.time() - start_time
if request_id in self.active_profiles:
del self.active_profiles[request_id]
# Sauvegarde conditionnelle
if duration > self.threshold:
self._save_profile(profiler, request_id, duration)
def _save_profile(self, profiler: cProfile.Profile,
request_id: str, duration: float):
"""Sauvegarde avec métadonnées enrichies"""
stats = pstats.Stats(profiler)
# Enrichissement avec contexte
metadata = {
"request_id": request_id,
"duration": duration,
"timestamp": time.time(),
"top_functions": self._extract_top_functions(stats, limit=5)
}
profile_path = f"profiles/{request_id}_{duration:.2f}s.prof"
stats.dump_stats(profile_path)
# Sauvegarde métadonnées séparément
import json
with open(f"profiles/{request_id}_metadata.json", "w") as f:
json.dump(metadata, f, indent=2)
Profilage par segments critiques
Au lieu de profiler toute la requête, je profile séparément chaque couche :
class SegmentedProfiler:
def __init__(self):
self.segments = {}
@contextmanager
def profile_segment(self, segment_name: str):
"""Profile un segment spécifique de la requête"""
profiler = cProfile.Profile()
profiler.enable()
start_time = time.time()
try:
yield
finally:
profiler.disable()
duration = time.time() - start_time
self.segments[segment_name] = {
"duration": duration,
"profiler": profiler
}
def get_segment_report(self) -> dict:
"""Génère un rapport par segment"""
report = {}
for name, data in self.segments.items():
stats = pstats.Stats(data["profiler"])
report[name] = {
"duration": data["duration"],
"top_functions": self._get_top_functions(stats, 3)
}
return report
# Utilisation dans l'endpoint
@app.get("/recommendations/{user_id}")
async def get_recommendations(user_id: int):
profiler = SegmentedProfiler()
with profiler.profile_segment("database"):
user_data = await get_user_preferences(user_id)
products = await get_candidate_products()
with profiler.profile_segment("algorithm"):
scores = calculate_recommendation_scores(user_data, products)
with profiler.profile_segment("serialization"):
response = format_recommendations(scores)
# Log du rapport pour analyse
if profiler.segments:
logger.info("Segment timing", extra=profiler.get_segment_report())
return response
Optimisations concrètes et résultats
Cas d’étude : de 15s à 180ms
Optimisation #1 : Résolution du problème N+1
# Avant : 847 requêtes individuelles (8.9s)
async def get_product_details(product_ids: List[int]) -> List[Product]:
products = []
for pid in product_ids:
product = await db.execute(
"SELECT * FROM products WHERE id = $1", pid
)
products.append(product)
return products
# Après : 1 requête avec jointures (0.12s)
async def get_product_details(product_ids: List[int]) -> List[Product]:
query = """
SELECT p.*, c.name as category_name, AVG(r.rating) as avg_rating
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN reviews r ON p.id = r.product_id
WHERE p.id = ANY($1)
GROUP BY p.id, c.name
"""
return await db.fetch_all(query, product_ids)
Impact mesuré : -8.8s (-74% du temps total)
Optimisation #2 : Cache de sérialisation intelligent
# Cache avec invalidation automatique
from datetime import datetime, timedelta
class ProductSerializationCache:
def __init__(self, ttl_minutes: int = 30):
self.cache = {}
self.ttl = timedelta(minutes=ttl_minutes)
def get_serialized(self, product: Product) -> Optional[str]:
cache_key = f"{product.id}_{product.updated_at}"
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if datetime.now() - timestamp < self.ttl:
return cached_data
else:
del self.cache[cache_key]
return None
def set_serialized(self, product: Product, serialized: str):
cache_key = f"{product.id}_{product.updated_at}"
self.cache[cache_key] = (serialized, datetime.now())
# Nettoyage périodique
if len(self.cache) > 1000:
self._cleanup_expired()
# Utilisation
cache = ProductSerializationCache()
def serialize_product(product: Product) -> str:
cached = cache.get_serialized(product)
if cached:
return cached
serialized = json.dumps({
"id": product.id,
"name": product.name,
"price": float(product.price),
"category": product.category.name,
"rating": product.avg_rating
}, ensure_ascii=False)
cache.set_serialized(product, serialized)
return serialized
Impact mesuré : -1.8s (-15% du temps restant)
Optimisation #3 : Vectorisation de l’algorithme de scoring
import numpy as np
# Avant : boucles Python (1.2s pour 847 produits)
def calculate_scores_old(user_prefs: dict, products: List[Product]) -> List[float]:
scores = []
for product in products:
score = 0.0
for category, weight in user_prefs.items():
if product.category == category:
score += weight * product.rating
scores.append(score)
return scores
# Après : opérations vectorisées NumPy (0.08s)
def calculate_scores_vectorized(user_prefs: dict, products: List[Product]) -> np.ndarray:
# Préparation des matrices
product_categories = np.array([p.category_id for p in products])
product_ratings = np.array([p.rating for p in products])
# Matrice des préférences utilisateur
pref_matrix = np.zeros(len(products))
for category_id, weight in user_prefs.items():
mask = product_categories == category_id
pref_matrix[mask] = weight
# Calcul vectorisé
scores = pref_matrix * product_ratings
return scores
Impact mesuré : -1.2s (-30% du temps de calcul)
Articles connexes: Surveiller vos pipelines Airflow pour prévenir les échecs coûteux
Métriques finales (décembre 2024)
Comparaison avant/après optimisation :
Métrique | Avant | Après | Amélioration |
---|---|---|---|
Temps de réponse P95 | 15.2s | 180ms | -98.8% |
Throughput | 12 req/s | 65 req/s | +440% |
CPU utilization | 92% | 45% | -51% |
Erreurs timeout | 45% | <1% | -97% |
Impact business concret :
– Coût serveur : -30% (downgrade de 4 à 2 vCPU)
– Taux de conversion : +25% (moins d’abandons)
– Satisfaction utilisateur : 4.2/5 → 4.7/5
Pièges évités et leçons apprises
Piège #1 : Optimiser les mauvaises métriques
J’ai d’abord focalisé sur tottime
au lieu de cumtime
, passant à côté du vrai problème.

Piège #2 : Profilage uniquement en développement
Les données de test (100 produits) ne révélaient pas le problème N+1 visible avec 50 000 produits.
Piège #3 : Ignorer l’impact du profilage
Activer cProfile partout ralentissait l’app de 30%, faussant les mesures.
Piège #4 : Ne pas mesurer l’impact business
Les optimisations techniques doivent se traduire par des gains mesurables : conversion, satisfaction, coûts.
Méthodologie et prochaines étapes
Ma checklist de profilage performance
Phase 1 : Diagnostic (1-2 jours)
– [ ] Métriques baseline établies avec outils monitoring
– [ ] Hypothèses formulées et priorisées en équipe
– [ ] Environnement de profilage configuré et testé
Phase 2 : Investigation (2-3 jours)
– [ ] Profilage ciblé des hot paths identifiés
– [ ] Analyse comparative des résultats par segment
– [ ] Identification des quick wins vs. optimisations lourdes
Phase 3 : Optimisation (1-2 semaines)
– [ ] Implémentation des fixes prioritaires avec tests
– [ ] Validation des gains en staging puis production
– [ ] Documentation des changements et monitoring continu
Outils recommandés
Stack de profilage complète :
– cProfile + snakeviz : Profilage détaillé avec visualisation
– py-spy : Profilage sans overhead en production
– memory_profiler : Analyse de la consommation mémoire
– line_profiler : Profilage ligne par ligne pour les cas complexes
Intégration monitoring :
# Configuration Prometheus pour métriques custom
from prometheus_client import Histogram, Counter
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'Request duration',
['method', 'endpoint', 'status']
)
SLOW_REQUESTS = Counter(
'slow_requests_total',
'Number of slow requests',
['endpoint', 'duration_bucket']
)
Cette méthodologie m’a permis d’optimiser huit services différents en 2024, avec une amélioration moyenne de 70% des temps de réponse. Le prochain défi : appliquer ces techniques au profilage mémoire avec memory_profiler pour nos services de traitement d’images.
À Propos de l’Auteur : Pierre Dubois est un ingénieur logiciel senior passionné par le partage de solutions d’ingénierie pratiques et d’insights techniques approfondis. Tout le contenu est original et basé sur une expérience réelle de projets. Les exemples de code sont testés dans des environnements de production et suivent les bonnes pratiques actuelles de l’industrie.