Dans ce billet, je vous expliquerai comment nous avons conçu un système en temps réel avec un temps de latence inférieur à la seconde :
un temps de latence inférieur à la seconde
Un débit de plus de 100 000 événements/seconde
Fiabilité à l'échelle
En utilisant Scala/Python et des outils open-source
Le besoin de l'entreprise
À Improving, nous avions besoin de construire un pipeline ETL de niveau entreprise pour traiter des données de télémétrie massives et non structurées (JSON, XML) provenant de sources multiples. Les exigences étaient les suivantes
Temps de latence inférieur à la seconde entre l'ingestion de l'événement et la disponibilité de la requête
Haute disponibilité et tolérance aux pannes
Capacité à évoluer jusqu'à 10 To/jour de données d'événements brutes
Flexibilité des schémas pour prendre en charge les schémas IoT en constante évolution.
Pile technologique (sur site)
Nous avons choisi des technologies éprouvées et évolutives :
Conception du système sur site :
Kafka : L'épine dorsale du temps réel
Thèmes partitionnés pour le parallélisme (plus de 100 partitions)
Compression activée (LZ4)
Facteur de réplication = 3 pour la tolérance aux pannes
Les producteurs envoient des événements au format Avro avec Schema Registry
Conception des clés pour une faible latence:
Kafka est conçu pour une faible latence batch linger.ms = 1mset acks=1 pour privilégier la vitesse (avec redondance au niveau de la couche de stockage).
spark.streaming.backpressure.enabled=true
Streaming structuré Spark : la magie de la latence en dessous de la seconde
Nos tâches de streaming PySpark ont été finement réglées pour le micro-batching :
Taille du lot : Micro-lots dynamiques, plafonnés à ~1 seconde de données
Agrégations avec état : activées avec des filigranes
Points de contrôle : Répertoire de points de contrôle basé sur HDFS pour exactement une fois
Pour éviter la surcharge de sérialisation, nous avons analysé les messages à l'aide de la méthode fastavro dans les UDF et nous avons écrit la sortie à l'aide de DataFrameWriter avec le format format parquet (idéal pour Hive).
Hive : Stockage évolutif avec des recherches rapides
Tables partitionnées externes (par dt_skey, fournisseur)
Parquet compressé Snappy pour optimiser le débit d'E/S et permettre des lectures rapides en colonnes avec pushdown des prédicats dans Hive.
Moteur Tez optimisé avec : Filtres de Bloom, exécution vectorisée, élagage des partitions
Chaque lot Spark écrit directement dans HDFSet Hive récupère les nouvelles données via la réparation de partition, par exemple en déclenchant l'opération MSCK REPAIR TABLE
ou ALTER TABLE ADD PARTITION
(déclenché par Airflow).
Optimisations clés
1. Réduction de la surcharge des micro-lots
L'optimisation de spark.sql.shuffle.partitions et la réutilisation des sérialiseurs ont permis de maintenir chaque micro-lot en dessous de 50 ms.
Paramètres de réglage de Spark : executor-cores=5 mémoire de l'exécuteur=8G num-executors=50 spark.sql.shuffle.partitions=600 → optimisé sur la base du nombre de partitions Kafka spark.memory.fraction=0.8 → permet d'avoir plus de mémoire pour les shuffles et la mise en cache.
2. Joints de diffusion pour les données de référence
L'un des principaux facteurs de perte de performance dans les systèmes distribués tels que Spark est l'utilisation de jointures lourdes. Pour atténuer ce problème, nous diffusons de petits ensembles de données de référence. Nous avons remplacé les jointures de type shuffle par des jointures de type broadcast lorsque les tables de consultation étaient < 100MB.
broadcast_df = spark.read.parquet("/hdfs/path/ref_data").cache() ref_broadcast = F.broadcast(broadcast_df)
joined_df = streaming_df.join(ref_broadcast, "region_code", "left")
Spark envoie le petit fichier
ref_data
à tous les exécuteurs.Il n'est pas nécessaire de mélanger le grand ensemble de données en continu.
Résultat : Aucun brassage, jointure plus rapide, latence inférieure à la milliseconde.
3. Lectures parallèles de Kafka
Chaque partition Kafka a été mappée à un thread d'exécution Spark dédié (pas de goulets d'étranglement liés à la copartition).
Configuration de Kafka pour un débit élevé - Partitionnement des sujets : 100+ partitions pour répartir l'ingestion - Facteur de réplication : 3 (pour le basculement) - Compression : lz4 pour une latence plus faible et un compactage décent - Taille des messages : plafonnée à 1 Mo pour éviter les blocages
4. Zéro pause GC
Spark est configuré avec G1GC et un réglage de la mémoire pour éviter les blocages de la JVM pendant les pics d'ingestion. Problème : Les pauses du Garbage Collection et la pression sur la mémoire ralentissent l'achèvement des micro-lots.
Exemple Utilisation de G1GC et set : -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent
5. Traitement de l'asymétrie des données
Utilisation du salage et du repartitionnement pour éviter les tâches massives de brassage de données sur les types d'événements rares.
Exemple de salage et de repartitionnement : from pyspark.sql.functions import when, concat_ws, floor, rand
# Appliquer le sel uniquement à la clé asymétrique salted_df = df.withColumn( "event_type_salted", when(col("event_type") == "DELIVERED", concat_ws("_", col("event_type"), floor(rand() * 10).cast("int")) ).otherwise(col("event_type")) )
# Repartitionner pour répartir la charge plus uniformément repartitioned = salted_df.repartition(10, "event_type_salted")
6. Compactage de petits fichiers (Hive + HDFS)
Parquet + Snappy comme format par défaut pour toutes les sorties Spark batch/streaming. L'écriture de micro-batchs toutes les quelques millisecondes crée beaucoup de petits fichiers parquetqui :
ralentissent les requêtes Hive
Stressent le NameNode de HDFS
Entraînent une surcharge de mémoire
Optimisation : - Exécuter un travail de compactage quotidien/roulant à l'aide de Spark pour fusionner les fichiers. - Utiliser une logique de mise en bacs et de partitionnement adaptée à Hive (éviter un trop grand nombre de petites partitions). - Combiner la sortie en utilisant .coalesce(n) ou .repartition(n) avant d'écrire.
Surveillance et observabilité
Kafka : surveillé à l'aide d'exportateurs Prometheus et d'alertes en cas de retard.
Spark : intégré avec Grafana pour les métriques de mémoire d'exécuteur/CPU
Délai de bout en bout : mesuré entre l'horodatage de l'écriture Kafka et l'horodatage de l'écriture Hive, stocké dans la base de données des métriques.
Nous avons construit un tableau de bord de latence pour inspecter visuellement où les goulots d'étranglement se produisent, ce qui est utile pour un réglage proactif.
Tests et analyses comparatives
Avant la mise en production, nous avons - simulé une ingestion de 10 To/jour à l'aide de producteurs Kafka - Comparaison avec 3 ans de données historiques - Effectué des tests de stress avec des événements synthétiques en rafale (3x la charge normale).
Leçons apprises
Il faut toujours tester la latence de bout en bout, et pas seulement les performances par couche.
Hive peut être suffisamment en temps réel si vous partitionnez intelligemment et écrivez efficacement.
On-prem n'est pas synonyme de lenteur avec le bon réglage, l'open-source peut rivaliser avec les piles cloud-natives.
Spark Structured Streaming peut absolument fournir des pipelines à faible latence sur site.
La stratégie de partitionnement compte plus que la puissance de calcul
Le bon vieux Hive peut encore briller lorsqu'il est bien réglé
Les mécanismes de contre-pression de Kafka sont des bouées de sauvetage en cas de pic de charge.
Conclusion
Chez Improving, nous pensons que l'ingénierie de la performance est un sport d'équipe. Concevoir un pipeline on-prem de 8-10 To/jour avec une latence inférieure à la seconde était ambitieux, mais en utilisant Kafka + Spark + Hive avec des réglages intelligents, nous avons réussi.
Si vous construisez des pipelines ETL à grande échelle ou modernisez des systèmes on-prem, n'hésitez pas à nous contacter !