Comment optimiser vos API avec Go et Python : Retour d’expérience sur une architecture hybride
Il y a trois mois, notre équipe de développement s’est retrouvée face à un défi familier : notre API Python (FastAPI 0.104) qui gérait les profils utilisateur de notre application web commençait à montrer ses limites. Avec 15 000 requêtes par minute en période de pointe, nous observions une latence p95 à 850ms sur les endpoints de recherche et une consommation mémoire de 2,1GB pour 50 000 utilisateurs actifs.
Articles connexes: Pourquoi analyser vos logs en temps réel avec Python
Plutôt que de partir sur une réécriture complète en Go – solution souvent préconisée mais risquée – j’ai expérimenté une approche hybride qui nous a permis de réduire de 73% notre temps de réponse tout en conservant la vélocité de développement Python là où elle était pertinente.
L’approche « Selective Rewrite » : identifier les goulots critiques
Analyse des patterns de trafic
Mon premier réflexe a été d’analyser précisément où se situaient nos goulots d’étranglement. Après une semaine de monitoring avec Prometheus, les résultats étaient clairs :
/api/search/users
: 45% du trafic total, CPU-intensive/api/analytics/dashboard
: 23% du trafic, calculs mathématiques lourds/api/users/profile
: 18% du trafic, I/O intensif vers PostgreSQL- Autres endpoints : 14% du trafic, logique métier complexe
Insight technique #1 : Plutôt que de migrer l’ensemble de l’API, j’ai découvert qu’appliquer la règle des 80/20 était plus pragmatique. Les deux premiers endpoints représentaient 68% de la charge serveur mais seulement 20% de la complexité métier.
Critères de décision pour la migration
J’ai établi une grille de décision basée sur trois métriques :

type EndpointAnalysis struct {
Path string
RequestsPerSec float64
CPUIntensity float64 // 0-1 scale
BusinessLogic float64 // 0-1 complexity scale
}
func (e EndpointAnalysis) ShouldMigrateToGo() bool {
// Candidat Go si throughput élevé ET logique simple
return e.RequestsPerSec > 1000 &&
e.CPUIntensity > 0.7 &&
e.BusinessLogic < 0.4
}
Cette approche m’a permis d’identifier que seuls 3 endpoints sur 15 justifiaient une migration vers Go.
Architecture d’intégration : le pattern API Gateway
Configuration Traefik pour le routage intelligent
J’ai opté pour Traefik 2.10 comme reverse proxy, permettant de router transparently selon les patterns d’URL :
Articles connexes: Comment tester vos Webhooks Python efficacement
# traefik.yml
api:
dashboard: true
insecure: true
entryPoints:
web:
address: ":8080"
providers:
file:
filename: /etc/traefik/dynamic.yml
watch: true
# dynamic.yml
http:
routers:
high-performance-routes:
rule: "Path(`/api/search/users`) || Path(`/api/analytics/dashboard`)"
service: go-backend
middlewares:
- rate-limit-high
business-logic-routes:
rule: "PathPrefix(`/api/users`) || PathPrefix(`/api/orders`)"
service: python-backend
middlewares:
- rate-limit-standard
services:
go-backend:
loadBalancer:
servers:
- url: "http://localhost:8081"
healthCheck:
path: "/health"
interval: "10s"
python-backend:
loadBalancer:
servers:
- url: "http://localhost:8000"
healthCheck:
path: "/health"
interval: "10s"
middlewares:
rate-limit-high:
rateLimit:
burst: 100
average: 50
rate-limit-standard:
rateLimit:
burst: 50
average: 20
Leçon apprise : Le routage basé sur les patterns d’URL s’est révélé beaucoup plus maintenable que les tentatives de routage par headers personnalisés que j’avais initialement envisagées.
Communication inter-services avec gestion d’erreurs robuste
Pour les cas où le service Go doit interroger la logique métier Python, j’ai implémenté un client HTTP avec retry et circuit breaker :
package client
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
type PythonClient struct {
baseURL string
httpClient *http.Client
retries int
}
func NewPythonClient(baseURL string) *PythonClient {
return &PythonClient{
baseURL: baseURL,
httpClient: &http.Client{
Transport: &http.Transport{
MaxIdleConns: 50,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
},
Timeout: 5 * time.Second,
},
retries: 3,
}
}
type UserProfile struct {
ID string `json:"id"`
Name string `json:"name"`
Preferences map[string]string `json:"preferences"`
CreatedAt time.Time `json:"created_at"`
}
func (c *PythonClient) GetUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
url := fmt.Sprintf("%s/users/%s", c.baseURL, userID)
var lastErr error
for attempt := 0; attempt <= c.retries; attempt++ {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
lastErr = err
if attempt < c.retries {
time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
continue
}
return nil, fmt.Errorf("after %d retries: %w", c.retries, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("user not found: %s", userID)
}
if resp.StatusCode >= 500 {
lastErr = fmt.Errorf("server error: %d", resp.StatusCode)
if attempt < c.retries {
time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
continue
}
return nil, lastErr
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
var profile UserProfile
if err := json.NewDecoder(resp.Body).Decode(&profile); err != nil {
return nil, fmt.Errorf("decoding response: %w", err)
}
return &profile, nil
}
return nil, lastErr
}
Optimisations performance : cache multi-niveau et batching
Système de cache hiérarchique
Insight technique #2 : L’implémentation d’un cache multi-niveau a été notre principale optimisation. J’ai découvert qu’une stratégie à trois niveaux permettait d’optimiser à la fois la latence et la cohérence des données :

package cache
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
type MultiLevelCache struct {
// L1: In-memory cache (données chaudes)
memCache map[string]cacheItem
memMutex sync.RWMutex
// L2: Redis cache (données tièdes)
redis *redis.Client
// L3: Fallback vers Python API
pythonClient *PythonClient
}
type cacheItem struct {
value interface{}
expiresAt time.Time
}
func NewMultiLevelCache(redisAddr string, pythonClient *PythonClient) *MultiLevelCache {
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
PoolSize: 20,
MinIdleConns: 5,
})
cache := &MultiLevelCache{
memCache: make(map[string]cacheItem),
redis: rdb,
pythonClient: pythonClient,
}
// Nettoyage périodique du cache mémoire
go cache.cleanupExpiredItems()
return cache
}
func (c *MultiLevelCache) GetUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
key := fmt.Sprintf("user_profile:%s", userID)
// L1: Vérification cache mémoire
if profile := c.getFromMemory(key); profile != nil {
return profile, nil
}
// L2: Vérification Redis
if profile := c.getFromRedis(ctx, key); profile != nil {
// Mise en cache L1 pour les prochains accès
c.setInMemory(key, profile, 30*time.Second)
return profile, nil
}
// L3: Fallback vers API Python
profile, err := c.pythonClient.GetUserProfile(ctx, userID)
if err != nil {
return nil, err
}
// Mise en cache L2 et L1
go c.setInRedis(context.Background(), key, profile, 5*time.Minute)
c.setInMemory(key, profile, 30*time.Second)
return profile, nil
}
func (c *MultiLevelCache) getFromMemory(key string) *UserProfile {
c.memMutex.RLock()
defer c.memMutex.RUnlock()
if item, exists := c.memCache[key]; exists && time.Now().Before(item.expiresAt) {
if profile, ok := item.value.(*UserProfile); ok {
return profile
}
}
return nil
}
func (c *MultiLevelCache) setInMemory(key string, value *UserProfile, ttl time.Duration) {
c.memMutex.Lock()
defer c.memMutex.Unlock()
c.memCache[key] = cacheItem{
value: value,
expiresAt: time.Now().Add(ttl),
}
}
func (c *MultiLevelCache) getFromRedis(ctx context.Context, key string) *UserProfile {
val, err := c.redis.Get(ctx, key).Result()
if err != nil {
return nil
}
var profile UserProfile
if err := json.Unmarshal([]byte(val), &profile); err != nil {
return nil
}
return &profile
}
func (c *MultiLevelCache) cleanupExpiredItems() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
c.memMutex.Lock()
now := time.Now()
for key, item := range c.memCache {
if now.After(item.expiresAt) {
delete(c.memCache, key)
}
}
c.memMutex.Unlock()
}
}
Batch processing pour réduire la latence réseau
Insight technique #3 : Pour les endpoints nécessitant plusieurs appels vers Python, j’ai implémenté un système de batching qui groupe les requêtes :
type BatchProcessor struct {
requests chan BatchRequest
client *PythonClient
maxBatch int
maxWaitTime time.Duration
}
type BatchRequest struct {
UserID string
Response chan BatchResponse
}
type BatchResponse struct {
Profile *UserProfile
Error error
}
func NewBatchProcessor(client *PythonClient) *BatchProcessor {
bp := &BatchProcessor{
requests: make(chan BatchRequest, 1000),
client: client,
maxBatch: 50,
maxWaitTime: 10 * time.Millisecond,
}
go bp.processBatches()
return bp
}
func (bp *BatchProcessor) GetUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
respChan := make(chan BatchResponse, 1)
select {
case bp.requests <- BatchRequest{UserID: userID, Response: respChan}:
case <-ctx.Done():
return nil, ctx.Err()
}
select {
case resp := <-respChan:
return resp.Profile, resp.Error
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (bp *BatchProcessor) processBatches() {
ticker := time.NewTicker(bp.maxWaitTime)
defer ticker.Stop()
batch := make([]BatchRequest, 0, bp.maxBatch)
for {
select {
case req := <-bp.requests:
batch = append(batch, req)
if len(batch) >= bp.maxBatch {
bp.executeBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
bp.executeBatch(batch)
batch = batch[:0]
}
}
}
}
func (bp *BatchProcessor) executeBatch(batch []BatchRequest) {
// Construction de la requête batch vers Python
userIDs := make([]string, len(batch))
for i, req := range batch {
userIDs[i] = req.UserID
}
profiles, err := bp.client.GetUserProfilesBatch(context.Background(), userIDs)
// Distribution des réponses
for i, req := range batch {
var profile *UserProfile
if err == nil && i < len(profiles) {
profile = profiles[i]
}
select {
case req.Response <- BatchResponse{Profile: profile, Error: err}:
default:
// Channel fermé, requête abandonnée
}
}
}
Monitoring et métriques de performance
Dashboard de surveillance
Pour surveiller cette architecture hybride, j’ai mis en place un dashboard Grafana avec les métriques clés :
# prometheus.yml - Configuration monitoring
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'go-api'
static_configs:
- targets: ['localhost:8081']
metrics_path: /metrics
- job_name: 'python-api'
static_configs:
- targets: ['localhost:8000']
metrics_path: /metrics
rule_files:
- "alert_rules.yml"
# alert_rules.yml
groups:
- name: api_performance
rules:
- alert: HighLatencyDetected
expr: histogram_quantile(0.95, http_request_duration_seconds) > 0.5
for: 2m
labels:
severity: warning
annotations:
summary: "API latency is high"
description: "95th percentile latency is {{ $value }}s"
- alert: CrossServiceErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) > 0.1
for: 1m
labels:
severity: critical
Résultats mesurés après 2 mois
Les métriques de performance ont confirmé l’efficacité de l’approche :
Articles connexes: Tester vos modèles d’intelligence artificielle avec Python : mode d’emploi
Latence (p95) :
– Avant : 850ms (Python pur)
– Après : 127ms (architecture hybride)
– Amélioration : 85%
Cache hit ratios :
– L1 (mémoire) : 34% des requêtes
– L2 (Redis) : 53% des requêtes
– L3 (Python API) : 13% des requêtes
– Hit ratio global : 87%

Utilisation ressources :
– CPU moyen : -61%
– Mémoire : -43%
– Connexions TCP actives : -45%
Retour d’expérience et recommandations
Ce qui a bien fonctionné
La migration progressive endpoint par endpoint s’est révélée cruciale. Nous avons déployé avec des feature flags permettant un rollback instantané, et validé chaque migration avec des tests de charge k6 reproduisant nos patterns de trafic réels.
Le choix de conserver Python pour les endpoints à forte logique métier était judicieux : notre vélocité de développement sur ces parties est restée identique, et nous avons évité de réécrire des algorithmes complexes déjà testés.
Pièges évités de justesse
Négligence des timeouts : Nos premiers déploiements n’avaient pas de timeouts appropriés entre services. Nous avons frôlé une cascade de failures lors d’un pic de trafic inattendu. La configuration de timeouts agressifs (5s max) et de circuit breakers a résolu ce problème.
Articles connexes: Surveiller vos pipelines Airflow pour prévenir les échecs coûteux
Sur-optimisation des mauvaises métriques : J’ai initialement passé deux semaines à optimiser l’allocation mémoire en Go, gagnant 3MB par instance. Pendant ce temps, un simple ajustement des pools de connexions HTTP aurait apporté 10x plus d’amélioration.

Recommandations pour des équipes similaires
Cette approche hybride est pertinente si :
– Votre trafic dépasse 500 req/s sur des endpoints spécifiques
– Votre équipe peut investir 2-3 mois dans la migration
– Vous avez identifié des goulots clairement CPU-bound
– Votre logique métier Python est stable et bien testée
Pour des applications avec moins de trafic ou une logique métier en constante évolution, rester sur Python avec des optimisations ciblées (async/await, caching, profiling) sera probablement plus rentable.
Prochaines étapes : Nous évaluons l’intégration de composants Rust pour notre pipeline de traitement de données en batch, qui représente actuellement notre dernier goulot d’étranglement significatif.
Cette architecture hybride nous a permis d’obtenir les performances de Go là où c’était critique, tout en conservant la productivité Python pour le développement de nouvelles fonctionnalités. Un compromis pragmatique qui a transformé notre architecture sans révolution technique majeure.
À Propos de l’Auteur : Pierre Dubois est un ingénieur logiciel senior passionné par le partage de solutions d’ingénierie pratiques et d’insights techniques approfondis. Tout le contenu est original et basé sur une expérience réelle de projets. Les exemples de code sont testés dans des environnements de production et suivent les bonnes pratiques actuelles de l’industrie.