Background Image
TECHNOLOGIE

Evolution du schéma Kafka avec Java Spring Boot et Protobuf

Scott Strzynski

Consultant principal

February 7, 2023 | 10 Lecture minute

Dans ce blog, je vais démontrer l'évolution des schémas Kafka avec Java, Spring Boot et Protobuf. Cette application est destinée à servir de tutoriel, il y aura donc des cas où un remaniement pourrait se produire. J'ai essayé de la garder simple et compréhensible. Le code consiste en deux producteurs Kafka et deux consommateurs. Dans une situation réelle, votre consommateur pourrait se trouver dans une autre application.

Pour exécuter cette application localement, vous devrez avoir un registre Kafka/Zookeeper/Schema local en cours d'exécution. Le code est disponible à l'adressehttp://github.com/hal90210/spring-kafka-protobuf 

Description de l'application

Cette application est un exemple simple de publication de messages sur un site web et d'autorisation de commentaires sur ces messages. Il s'agit d'une application incomplète en ce sens qu'une fois qu'un message est consommé, rien ne se passe. Une application réelle pourrait persister dans une base de données, ou écrire dans une KTable et plus tard joindre le message aux commentaires postant un grand modèle canonique sur un sujet différent. Ces deux possibilités sortent du cadre de ce blog.

Ce que je vais démontrer, c'est comment faire évoluer votre modèle de domaine en utilisant Protobuf. Je commencerai par une v1 du schéma, puis j'ajouterai et supprimerai des champs, je publierai les deux versions du message sur un sujet et je ferai en sorte que deux consommateurs différents traitent les deux messages en toute sécurité.

Note : J'ai inclus les fichiers Java générés par le compilateur Protobuf, mais vous devriez normalement avoir un repo séparé et un processus de construction pour cela.

Le schéma

  • WebsiteMessage

  • Auteur

  • Commentaire

Le code

Dans cette application, il y a deux Rest Controllers qui gèrent les requêtes Post pour le schéma v1 ou v2. Selon le point de terminaison, WebsiteMessageProducer.java ou WebsiteMessageProducerV2.java est appelé pour produire et publier un message Protobuf vers Kafka.

Les deux consommateurs, WebsiteMessageListener.java et WebsiteMessageListenerV2.java, écoutent le même sujet avec des identifiants de groupe différents.

Définitions des messages Protobuf

À l'aide d'un IDL (langage de définition d'interface), Protobuf vous permet de créer des schémas de messages simples ou complexes. Dans cet exemple d'application, nous modéliserons un site web qui permet aux personnes (author.proto) de publier des messages (websiteMessage.proto) et aux autres personnes de commenter (comment.proto) ce message. Il s'agit d'un schéma simple, mais il met en évidence quelques caractéristiques importantes de Protobuf.

websiteMessage.proto 

syntaxe = "proto3" ;

option java_package = "com.improving.springprotobuf.generated.v1.model" ;

option java_outer_classname = "WebsiteMessagePayload" ;

import "google/protobuf/timestamp.proto" ;

import "comment.proto" ;

import "author.proto" ;

message WebsiteMessage {

string id = 1 ;

string content = 2 ;

AuthorMessage author = 3 ;

répété CommentMessage comments = 4 ;

string topic = 5 ;

}

Cette définition comporte deux champs obligatoires : syntaxe et message.

Le champ syntaxe indique la version de Protobuf que vous utilisez, dans ce cas proto3.

Le champ message est la définition du schéma de votre message. Il peut contenir des types de variables scalaires, des messages personnalisés, des types de messages Protobuf, des variables répétées (Lists) ou des variables enum. Voir la définition complète : https://developers.google.com/protocol-buffers/docs/proto3 

Les messages personnalisés peuvent être inclus dans le même fichier ou importés d'autres fichiers. Pour vos messages dans des fichiers externes, l'importation utilise un chemin relatif au fichier actuel.

Chaque variable déclare un type, un nom et un index utilisé en interne dans ce message. Cet index ne peut être modifié une fois que vous avez commencé à produire des messages avec lui. C'est la valeur qui est sérialisée dans le message au lieu du nom du champ. Je parlerai de la modification de votre schéma dans une section ultérieure.

Le type répété indique simplement que le champ peut contenir de 0 à n des types spécifiés dans une structure de type liste.

Options

Les options vous permettent de définir la structure des classes Java générées.

option java_package = "com.improving.springprotobuf.generated.v1.model" ; 

Indique au protocole à quel paquetage appartiennent les classes générées.

option java_outer_classname = "WebsiteMessagePayload" ; 

Le message de premier niveau est contenu dans une classe externe, le nom de cette classe externe est défini par défaut dans le protocole comme suitvotreNomDeMessageOuterClass.java. Utilisez cette option pour le modifier à votre guise, car vous devrez y faire référence dans votre code.

Classes Java générées par Protobuf

L'ajout du bloc protoc à notre build.gradle permet au compilateur protobuf de construire des classes Java qui représentent le schéma des fichiers .proto pendant la tâche de construction de gradle.

 protobuf {

// Configure l'exécutable protoc

protoc {

artifact = 'com.google.protobuf:protoc:3.0.0'

generatedFilesBaseDir= "$projectDir/src"

}

}

Cette configuration ajoutera les classes java au répertoire src/java de votre projet.

Note : Pour les utilisateurs d'IntelliJ, vous pouvez obtenir des erreurs dans les fichiers .proto qui traitent des fichiers importés. Pour y remédier, vous devrez modifier l'endroit où l'IDE recherche ces fichiers.

Sur un Mac
  • Allez dans IntelliJ IDEA > Preferences > Language & Frameworks > Protocol Buffers

  • Décocher la case Configurer automatiquement

  • Ajouter ce cheminfile:///votre chemin vers le projet/spring-kafka-protobuf/src/main/proto

  • Cliquez sur Ok

Vous pouvez stocker vos fichiers .proto n'importe où, mais le compilateur de protocoles s'attend à ce qu'ils se trouvent sous src/main/proto.

WebsiteMessageProducer.java

Il s'agit du producteur de messages Kafka v1. Il importe les classes Java générées par Protobuf

import com.improving.springprotobuf.generated.v1.model.Comment.CommentMessage ; 

import com.improving.springprotobuf.generated.v1.model.Author.AuthorMessage; ; 

import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage ; 

Les classes générées permettent de créer des objets en utilisant le modèle de construction. Ici, le code crée un Author et un WebsiteMessage en le publiant avec un modèle kafka.

// Normalement, on rechercherait l'auteur par son identifiant, mais il s'agit ici d'un exemple d'application.

AuthorMessagemessageAuthor =AuthorMessage

.newBuilder()

.setFirstName(message.getAuthor().getFirstName())

.setLastName(message.getAuthor().getLastName())

.setId(UUID.randomUUID().toString())

.build() ;

AuthorMessagecommentAuthor =AuthorMessage

.newBuilder()

.setFirstName("Dark")

.setLastName("Maul")

.setId(UUID.randomUUID().toString())

.build() ;

CommentMessage comment =

MessageCommentaire

.newBuilder()

.setContent("Excellent article")

.setAuthor(commentAuthor)

.build() ;

// Montre l'ajout d'un commentaire

WebsiteMessagewMsg = WebsiteMessage

.newBuilder()

.setId(UUID.randomUUID().toString())

.setContent(message.getContent())

.setAuthor(messageAuthor)

.setTopic("test-topic")

.addComments(commentaire)

.build() ;

WebsiteMessageListener.java 

Le consommateur utilise les mêmes classes Java générées pour désérialiser le message Kafka. Le type du message est com.google.protobuf.DynamicMessage. Les classes générées par Protobuf vous permettent d'instancier une instance à partir d'un tableau d'octets, comme illustré ci-dessous.

import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage ;

import com.google.protobuf.DynamicMessage ;

import com.google.protobuf.InvalidProtocolBufferException ;

...

 @Service

public class WebsiteMessageListener implements MessageConsumer {

@KafkaListener(topics = "website-messages", groupId= "group_id_v1")

public void processEvent(ConsumerRecord<String, DynamicMessage> record) {

WebsiteMessagemessage=null ;

try {

message =WebsiteMessage

.newBuilder()

.build()

.getParserForType()

.parseFrom(record.value().toByteArray()) ;

} catch (InvalidProtocolBufferException e) {

e.printStackTrace() ;

return ;

}

 

logger.info(String.format("#### -> Consumingmessage -> %s", message.toString())) ;

logger.info(String.format("#### -> Topic -> %s", message.getTopic())) ;

}

}

Évolution du schéma

Pour démontrer l'évolution du schéma, j'ai un peu triché. J'ai modifié le nom d'importation dans les fichiers .proto pour générer deux paquets distincts, v1 et v2. Cela simule deux applications différentes utilisant deux versions différentes des classes générées.

Modifications 

J'ai supprimé le champ topic de WebsiteMessage. Pour s'assurer que personne n'utilise le numéro/nom du champ, vous devez les ajouter au bloc réservé.

J'ai également ajouté un champ Enum nommé Version. Les Enums doivent toujours avoir une valeur nulle, c'est la valeur par défaut si aucune n'est fournie. Ici, je l'ai nommé UNKNOWN, mais il pourrait être nommé n'importe comment.

syntaxe = "proto3" ;

option java_package= "com.improving.springprotobuf.generated.v2.model" ;

option java_outer_classname= "WebsiteMessagePayload" ;

import "google/protobuf/timestamp.proto" ;

import "comment.proto" ;

import "author.proto" ;

message WebsiteMessage {

réservé 5 ;

réservé "topic" ;

string id = 1 ;

string content = 2 ;

AuthorMessage auteur = 3 ;

répété CommentMessage comments = 4 ;

Version version= 6 ;

}

enum Version {

UNKNOWN = 0 ;

V1 = 1 ;

V2 = 2 ;

}

Compatibilité ascendante et descendante

Les modifications que j'ai apportées et le code Java qui en découle démontrent la compatibilité ascendante et descendante.

On parle de compatibilité ascendante lorsqu'un producteur crée un message avec une version plus ancienne du schéma et qu'un consommateur utilisant une version plus récente du schéma peut désérialiser le message en toute sécurité. Voir WebsiteMessageProducer.java produisant un message v1 et WebsiteMessageListenerV2.java consommant ce message avec un schéma v2.

On parle de compatibilité ascendante lorsqu'un producteur crée un message avec une version plus récente du schéma et qu'un consommateur utilisant une version plus ancienne du schéma peut désérialiser le message en toute sécurité. Voir WebsiteMessageProducerV2.java produisant un message v2 et WebsiteMessageListener.java consommant ce message avec un schéma v1.

WebsiteMessageProducerV2.java 

Ce producteur démontrera l'utilisation d'une version plus récente du schéma. Dans cette classe, nous ajoutons le champ version du schéma v2

import com.improving.springprotobuf.generated.v2.model.Author ;

import com.improving.springprotobuf.generated.v2.model.Comment.CommentMessage; ;

import

com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload ;

import

com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload.Version ;

...

@Service

public class WebsiteMessageProducerV2 implements MessageProducer {

...

// Normalement, l'auteur serait recherché par son identifiant, mais il s'agit ici d'un exemple d'application.

        AuthorMessagemessageAuthor =AuthorMessage

.newBuilder()

.setFirstName(message.getAuthor().getFirstName())

.setLastName(message.getAuthor().getLastName())

.setId(UUID.randomUUID().toString())

.build() ;

Author.AuthorMessagecommentAuthor =Author.AuthorMessage

.newBuilder()

.setFirstName("Cela")

.setLastName("UnCommentateur")

.setId(UUID.randomUUID().toString())

.build() ;

CommentMessage comment =CommentMessage.newBuilder().setContent("Premier !").setAuthor(commentAuthor).build() ;

CommentMessage comment2 =CommentMessage.newBuilder().setContent("Oops I guess I wasn't first").setAuthor(commentAuthor).build() ;

// Montre l'ajout de plusieurs commentaires en un seul appel de méthode

        WebsiteMessagewMsg=WebsiteMessage

.newBuilder()

.setId(UUID.randomUUID().toString())

.setContent(message.getContent())

.setAuthor(messageAuthor)

.setVersion(Version.V2)

.addAllComments(Arrays.asList(comment, comment2))

.build() ;

 

this.kafkaTemplate.send(TOPIC, wMsg.getId(), wMsg) ;

}

}

Exécution locale

Il y a un docker-compose.yaml qui va démarrer zookeeper, kafka, schema registry et l'application dans leurs conteneurs respectifs. Vous devrez d'abord compiler le jar. Lancez donc

  • ./gradlew clean build

  • docker-compose build

  • docker-compose up -d

Logging

docker logs -f spring-kafka-protobuf_app_1

Poster du Json pour créer des messages Kafka

Il y a deux contrôleurs qui vont gérer les messages v1 et v2.

Message V1

curl -X POST http://localhost:8080/v2/message/publish \N- 'Content-Type : application' -H 'Content-Type : application'.

-H 'Content-Type : application/json' \N -d '{"content" : application/json")

-d '{"content" : "Regardez, c'est un message v2",

"author" : {

"firstName" : "Scott", "lastName" : "Strzynski"

}

}'

Voyons ce que chaque consommateur enregistre. Puisque ce message a un schéma v1, nous nous attendons à ce que le sujet apparaisse, et pas de Version parce que c'est un concept v2.

Pour montrer la compatibilité ascendante/descendante, les deux consommateurs écoutent le même sujet avec des identifiants de groupe de consommateurs différents.

Ici, le consommateur v1 imprime le message Protobuf désérialisé et inclut le champ v1 topic :

WebsiteMessageListener :#### Consuming message

id : "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"

content : "hey this is a v1 message"

auteur {

id: "07312922-5398-43e1-93b5-9fa12a24502c"

firstName : "Scott"

lastName : "Strzynski"

}

commentaires {

content : "Excellent article"

author {

id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"

firstName : "Darth"

lastName : "Maul"

}

}

topic : "test-topic"

Ici, l'auditeur V2 affiche la compatibilité ascendante, puisque le sujet du schéma v2 a été supprimé, il ne l'affiche pas. De même, comme Version a été ajouté dans la version 2, mais n'a pas été défini dans le producteur de la version 1, il affiche la valeur zéro par défaut.

 WebsiteMessageListenerV2 :#### Message consommé

id : "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"

content : "hey this is a v1 message"

auteur {

id: "07312922-5398-43e1-93b5-9fa12a24502c"

firstName : "Scott"

lastName : "Strzynski"

}

commentaires {

content : "Excellent article"

author {

id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"

firstName : "Darth"

lastName : "Maul"

}

}

WebsiteMessageListenerV2 :#### Version du message : INCONNUE

Message V2

Dans ce message, le contrôleur v2 va traiter la requête post et construire un message V2 avec Version, et sans sujet.

curl -X POST http://localhost:8080/v2/message/publish \N- -H 'Content-Type : application, application et sujet'.

-H 'Content-Type : application/json' \N- -d '{"content" : ")

-d '{"content" : "Regardez, c'est un message v2",

"author" : {

"firstName" : "Scott", "lastName" : "Strzynski"

}

}'

Vous pouvez voir dans les journaux que la version correcte est maintenant affichée et qu'il n'y a pas de champ de sujet.

WebsiteMessageListenerV2 :#### Message consommé

id: "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"

content : "Regardez, c'est un message v2"

auteur {

id: "85a8a789-c30d-495e-a187-d702610a8150"

firstName : "Scott"

lastName : "Strzynski"

}

commentaires {

content : "First !"

author {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName : "Ça"

lastName : "UnCommentateur"

}

}

commentaires {

content : "Oops I guess I wasn\'t first" (Oups, je crois que je n'étais pas le premier)

author {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName : "That"

lastName : "UnCommentateur"

}

}

version : V2

L'écouteur v1 affichant la compatibilité ascendante n'affiche pas cette version et une chaîne vide est la valeur du sujet.

WebsiteMessageListener :#### Message de consommation

id : "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"

content : "Regardez, c'est un message v2"

auteur {

id: "85a8a789-c30d-495e-a187-d702610a8150"

firstName : "Scott"

lastName : "Strzynski"

}

commentaires {

content : "First !"

author {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName : "Ça"

lastName : "UnCommentateur"

}

}

commentaires {

content : "Oops I guess I wasn\'t first" (Oups, je crois que je n'étais pas le premier)

author {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName : "That"

lastName : "UnCommentateur"

}

}

WebsiteMessageListener :#### Sujet :

Registre des schémas

Le schéma Protobuf est ajouté au registre des schémas comme Avro.

http://localhost:8081/subjects

[

"author.proto",

"commentaire.proto",

"google/protobuf/timestamp.proto",

"website-messages-value"

]

http://localhost:8081/subjects/website-messages-value/versions

[

1,

2

]

http://localhost:8081/subjects/website-messages-value/versions/2

{

"subject" : "website-messages-value",

"version" : 2,

"id" : 4,

"schemaType" : "PROTOBUF",

"references" : [

{

"name" : "google/protobuf/timestamp.proto",

"subject" : "google/protobuf/timestamp.proto",

"version" : 1

},

{

"name" : "commentaire.proto",

"subject" : "commentaire.proto",

"version" : 1

},

{

"name" : "auteur.proto",

"subject" : "auteur.proto",

"version" : 1

}

],

"schema" : "syntax = \N "proto3\N";\Nnimport \N "google/protobuf/timestamp.proto\N";\Nimport \N "comment.proto\N";\Nimport \N "author.proto\N";\Noptionjava_package = \N "com.improving.springprotobuf.generated.v2.model";\Noptionjava_outer_classname = \N "WebsiteMessagePayload";\NMessageWebsiteMessage {\n réservé 5 à 6;\Nréservé \N "topic";\Nchaîne id = 1;\Nchaîne content = 2;\N.../.../.../.../.../.../.../.../.../.../.../.../.../.../.../.../...AuthorMessage author = 3;\n répété .CommentMessage comments = 4;\n .Version version = 6;\n}\nenum Version {\n UNKNOWN = 0;\n V1 = 1;\n V2 = 2;\n}\n"

}

Conclusion

L'utilisation de Protobuf permet au développeur de construire un schéma évolutif comme Avro. A la différence d'Avro, il utilise un IDL au lieu d'un JSON. Spring Boot et Gradle fournissent un bon cadre pour permettre au développeur d'écrire des applications rapidement et de ne pas s'inquiéter des problèmes liés à un schéma changeant.

Technologie
Données

Dernières réflexions

Explorez nos articles de blog et laissez-vous inspirer par les leaders d'opinion de nos entreprises.
Blog Image - Unveiling the Future of AI at Google Cloud Next 24 -1
AI/ML

Unveiling the Future of AI at Google Cloud Next ‘24

Get firsthand insights from Improving into the innovation brewing around artificial intelligence and cloud computing at Google Cloud Next '24.