Background Image
TECHNOLOGIE

Dévoiler les drapeaux de fonctionnalités aux consommateurs Kafka

Image - Matt Schroeder

May 31, 2022 | 6 Lecture minute

Les indicateurs de fonctionnalité sont un outil permettant d'activer ou de désactiver stratégiquement une fonctionnalité au moment de l'exécution. Ils sont souvent utilisés pour piloter différentes expériences utilisateur, mais peuvent également être utiles dans les systèmes de données en temps réel. Dans ce billet, nous allons utiliser les feature flags pour mettre en pause et reprendre un Kafka Consumer à la volée.

Vous vous demandez peut-être pourquoi ne pas simplement arrêter le(s) consommateur(s) pour "mettre en pause" le traitement ? Cette approche est efficace à 100 % et permet d'atteindre l'objectif final qui est d'arrêter la consommation. Cependant, elle présente des inconvénients lorsqu'il s'agit d'une architecture de consommateurs à grande échelle.

  • Rééquilibrages - En fonction de la configuration de votre groupe de consommateurs et du nombre de partitions, le processus de rééquilibrage qui est déclenché lorsqu'un consommateur quitte le groupe peut s'avérer pénible et chronophage.

  • Manque de granularité - si le consommateur lit plusieurs sujets, vous arrêtez la consommation de tout. Il n'y a pas d'intermédiaire.

  • Traitement réactif - Pour réactiver le traitement, tous les consommateurs doivent redémarrer et rééquilibrer les partitions. Avec plus de 100 instances de consommateurs, cela peut prendre beaucoup de temps.

Avant d'aborder la manière d'incorporer les indicateurs de fonctionnalité, nous devrions examiner la fonctionnalité qui existe déjà dans les bibliothèques client Kafka pour mettre en pause et reprendre le traitement de manière dynamique. Nous sommes déjà plus proches de l'objectif final que vous ne l'avez peut-être réalisé.

Mettre en pause les consommateurs Kafka

Le client Kafka Consumer dispose de fonctionnalités permettant de récupérer des partitions assignées, de mettre en pause des partitions spécifiques, de récupérer des partitions actuellement en pause et de reprendre des partitions spécifiques. Ces méthodes sont tout ce dont vous avez besoin pour mettre en pause et reprendre la consommation de manière dynamique.

Examinons ces opérations de plus près.

  • KafkaConsumer.assignment() : renvoie les partitions de sujet actuellement assignées sous la forme d'un ensemble de TopicPartition. La TopicPartition contient deux propriétés principales, le nom du sujet sous la forme d'une chaîne et la partition sous la forme d'un nombre entier.

  • KafkaConsumer.pause(Set of TopicPartition) : suspend la recherche dans les partitions de sujets fournies. L'interrogation du consommateur se poursuit, ce qui est essentiel pour maintenir le consommateur en vie et éviter les rééquilibrages. Cependant, les sondages futurs ne renverront pas d'enregistrements des partitions en pause jusqu'à ce qu'ils aient été repris.

  • KafkaConsumer.paused() : renvoie les partitions topic qui sont actuellement assignées et dans un état de pause suite à l'utilisation de la méthode pause.

  • KafkaConsumer.resume(Set of TopicPartition) : reprend la recherche à partir des partitions de sujet fournies.

Option 1 : Mettre le monde en pause

La manière la plus simple de mettre le traitement en pause est de tout mettre en pause. L'extrait ci-dessous l'illustre.

// obtenir toutes les partitions attribuées au consommateur Ensemble<TopicPartition> assignedPartitions = consumer.assignment() ;

// mettre en pause toutes les partitions attribuées Set<TopicPartition> pausedPartitions = consumer.pause(assignedPartitions) ;

// reprendre toutes les partitions en pause consumer.resume(consumer.paused()) ;

// on peut aussi reprendre consumer.assignment() puisque tout est en pause consumer.resume(consumer.assignment()) ;

Option 2 : Pause sélective

Il est utile d'avoir le gros bouton rouge pour arrêter tout le traitement montré précédemment, mais il serait plus utile de pouvoir contrôler les choses à un niveau plus granulaire.

L'extrait suivant montre à quoi pourrait ressembler la mise en pause des partitions pour un seul sujet.

// supposons que le consommateur est abonné à demo.topic.name &amp;&amp; another.topic.name String pausedTopic = "demo.topic.name" ;

// // PAUSE

// filtrer les partitions assignées pour le sujet en pause

Set<TopicPartition> partitionsToPause = consumer.assignment() .stream()

.filter(tp -&gt; pausedTopic.equals(tp.topic())) .collect(toSet()) ;

Set<TopicPartition> pausedPartitions = consumer.pause(partitionsToPause) ;

// REPRISE //

// filtrer les partitions en pause pour le sujet repris Set<TopicPartition> partitionsToResume = consumer.paused()

.stream() .filter(tp -&gt; pausedTopic.equals(tp.topic())) .collect(toSet()) ;

consumer.resume(partitionsToResume) ;

Libérer les drapeaux

Après avoir vu le dernier extrait, vous pouvez probablement deviner comment les indicateurs de fonctionnalité entrent en jeu pour fournir dynamiquement la variable "pausedTopic" de l'extrait.

Avec une simple convention de dénomination des indicateurs de fonctionnalité, telle que topic_{topic-name-here} l'application peut extraire tous les drapeaux et ne filtrer que ceux qui l'intéressent.

Le pseudo-code ci-dessous montre à quoi cela pourrait ressembler.

// deux sujets qui intéressent ce consommateur List assignedTopics = ["demo.topic.one", "demo.topic.two"]

// tous les indicateurs de caractéristiques List flags = [

"topic_demo.topic.one", "ui-feature-flag", "topic_important.topic"

]

// les drapeaux qui intéressent réellement ce consommateur List assignedTopicFlags = toggles

.stream() .filter(t -&gt; t.startsWith('topic_') &amp;&amp; assignedTopics.contains(getTopicName(t)) .collect(toList()) ;

// assignedTopicFlags = ["topic_demo.topic.one"]

Maintenant que nous pouvons imaginer comment cela fonctionne, examinons l'intégration d'Unleash, une solution open-source d'indicateur de fonctionnalité. Vous pouvez remplacer Unleash par n'importe quel produit de signalisation.

Unleash

Il y a de nombreuses options sur le marché, mais Unleash est une plateforme open-source de gestion des fonctionnalités qui a bien fonctionné pour moi. Elle a gagné suffisamment de terrain pour que Gitlab l'offre maintenant comme un service intégré et depuis Gitlab 13.5, elle est disponible dans tous les niveaux. Une liste complète des fonctionnalités est disponible ici.

Connecter des applications à Unleash est simple. Sélectionnez le SDK Unleash qui correspond à votre application et configurez l'URL Unleash et l'ID d'instance. Si vous utilisez Gitlab, il y a des instructions claires sur l'intégration des feature flags avec votre application.

Voici un exemple de configuration du client Unleash dans une application Java Spring Boot.

/** * Configurer le client qui se connecte à Unleash si unleash.enabled est "true" **/ @Bean @ConditionalOnProperty(value = "unleash.enabled", havingValue = "true") Unleash defaultUnleash() {

UnleashConfig config = UnleashConfig.builder() .unleashAPI("&lt;http://unleash-api.com&gt;") .instanceId("asdf-123") .appName("unleash-demo") .environment("development") .fetchTogglesInterval(10)

.subscriber(new Log4JSubscriber()) .build() ;

return new Unleash(config) ; }

/** * Si unleash.enabled est faux, le bean ci-dessus ne sera pas créé. * Dans ce scénario, le client FakeUnleash sera injecté dans le contexte de l'application pour être utilisé. Ceci est utile pour le développement local. **/ @Bean @ConditionalOnMissingBean(Unleash.class) Unleash fakeUnleash() {

return new FakeUnleash(config) ; }

Récupération des drapeaux de fonctionnalité

Nous avons examiné la fonctionnalité de pause/reprise et la manière dont une simple convention de nommage des indicateurs de fonctionnalité peut être utilisée pour cibler les sujets qui doivent être mis en pause/repris. Lions maintenant les deux et examinons les stratégies de récupération et d'application des bascules.

Option 1 : Boucle de sondage

L'option la plus simple est d'intégrer les contrôles de drapeaux dans la boucle de sondage. Unleash rafraîchira les états des drapeaux dans un fil d'exécution en arrière-plan, de sorte que chaque vérification atteindra le cache en regardant les dernières valeurs et en voyant ce qui, le cas échéant, doit être mis en pause. L'avantage de cette approche est que tout se passe directement dans le thread d'interrogation, ce qui est important puisque le consommateur n'est pas thread-safe.

// consommateur simplifié try {

while (true) { // récupère les dernières valeurs de drapeaux et les applique en conséquence applyFeatureFlags(consumer) ;

ConsumerRecords<String, String> records = consumer.poll(100) ; for (ConsumerRecord<String, String> record : records) {

// logique

}

} } finally {

consumer.close() ; }

Option 2 : Abonnement à l'événement UnleashEvent

Unleash dispose d'une conception événementielle interne qui permet de s'abonner facilement aux événements déclenchés après le rafraîchissement des toggles. Il s'agit de la représentation la plus récente de l'état des drapeaux, car l'événement est déclenché immédiatement après la mise à jour du cache.

Comme mentionné dans l'option 1, les consommateurs ne sont pas à l'abri des threads, vous devez donc gérer les opérations du consommateur de manière appropriée et exécuter les opérations de pause du consommateur sur le thread d'interrogation.

Le principal avantage de cette option est qu'elle n'encombre pas la boucle de sondage avec des responsabilités. Les sondages se poursuivent et si les choses sont en pause, ils ne récupèrent pas les enregistrements. Si les choses sont actives, ils récupèreront les enregistrements.

/** * Abonné personnalisé qui sera notifié sur tous les UnleashEvents. * L'abonné Log4JS étendu déconnectera les événements qui ne sont pas * gérés par cette extension. **/ public class KafkaUnleashSubscriber extends Log4JSubscriber {

@Override public void on(@NotNull UnleashEvent event event) {

// beaucoup d'événements passent ici, nous ne nous intéressons qu'aux réponses if (event instance of FeatureToggleResponse) {

// l'événement contient TOUS les bascules, filtrez celles qui vous intéressent

// appliquer les bascules au consommateur

}

}

}

N'oubliez pas le RebalanceListener

Le dernier élément à prendre en compte est le ConsumerRebalanceListener. Lorsque plusieurs consommateurs s'exécutent dans un groupe, chacun d'entre eux est responsable de la mise en pause des partitions qui lui sont attribuées. Si un consommateur meurt, les partitions qui lui ont été attribuées seront automatiquement rééquilibrées au profit des autres consommateurs du groupe. Cependant, le consommateur qui reçoit les partitions nouvellement assignées n'a pas connaissance de leur état précédent (en pause/actif) et elles seront donc actives après l'assignation.

Le RebalanceListener est le point d'ancrage du cycle de vie du rééquilibrage pour mettre en pause les partitions avant que la consommation ne commence et pour éviter de consommer accidentellement un thème qui devrait être mis en pause.

Avec tous les composants qui ont déjà été construits, il devrait être assez simple de créer un écouteur qui lie le tout et maintient les partitions en pause si elles en ont besoin.

Technologie
Données

Dernières réflexions

Explorez nos articles de blog et laissez-vous inspirer par les leaders d'opinion de nos entreprises.
Blog Image - Unveiling the Future of AI at Google Cloud Next 24 -1
AI/ML

Unveiling the Future of AI at Google Cloud Next ‘24

Get firsthand insights from Improving into the innovation brewing around artificial intelligence and cloud computing at Google Cloud Next '24.