Description de l'application
Cette application est un exemple simple de publication de messages sur un site web et d'autorisation de commentaires sur ces messages. Il s'agit d'une application incomplète en ce sens qu'une fois qu'un message est consommé, rien ne se passe. Une application réelle pourrait persister dans une base de données, ou écrire dans une KTable et plus tard joindre le message aux commentaires postant un grand modèle canonique sur un sujet différent. Ces deux possibilités sortent du cadre de ce blog.
Ce que je vais démontrer, c'est comment faire évoluer votre modèle de domaine en utilisant Protobuf. Je commencerai par une v1 du schéma, puis j'ajouterai et supprimerai des champs, je publierai les deux versions du message sur un sujet et je ferai en sorte que deux consommateurs différents traitent les deux messages en toute sécurité.
Note : J'ai inclus les fichiers Java générés par le compilateur Protobuf, mais vous devriez normalement avoir un repo séparé et un processus de construction pour cela.
Le schéma
WebsiteMessage
Auteur
Commentaire
Le code
Dans cette application, il y a deux Rest Controllers qui gèrent les requêtes Post pour le schéma v1 ou v2. Selon le point de terminaison, WebsiteMessageProducer.java ou WebsiteMessageProducerV2.java est appelé pour produire et publier un message Protobuf vers Kafka.
Les deux consommateurs, WebsiteMessageListener.java et WebsiteMessageListenerV2.java, écoutent le même sujet avec des identifiants de groupe différents.
Définitions des messages Protobuf
À l'aide d'un IDL (langage de définition d'interface), Protobuf vous permet de créer des schémas de messages simples ou complexes. Dans cet exemple d'application, nous modéliserons un site web qui permet aux personnes (author.proto) de publier des messages (websiteMessage.proto) et aux autres personnes de commenter (comment.proto) ce message. Il s'agit d'un schéma simple, mais il met en évidence quelques caractéristiques importantes de Protobuf.
websiteMessage.proto
syntaxe = "proto3" ;
option java_package = "com.improving.springprotobuf.generated.v1.model" ;
option java_outer_classname = "WebsiteMessagePayload" ;
import "google/protobuf/timestamp.proto" ;
import "comment.proto" ;
import "author.proto" ;
message WebsiteMessage {
string id = 1 ;
string content = 2 ;
AuthorMessage author = 3 ;
répété CommentMessage comments = 4 ;
string topic = 5 ;
}
Cette définition comporte deux champs obligatoires : syntaxe et message.
Le champ syntaxe indique la version de Protobuf que vous utilisez, dans ce cas proto3.
Le champ message est la définition du schéma de votre message. Il peut contenir des types de variables scalaires, des messages personnalisés, des types de messages Protobuf, des variables répétées (Lists) ou des variables enum. Voir la définition complète : https://developers.google.com/protocol-buffers/docs/proto3
Les messages personnalisés peuvent être inclus dans le même fichier ou importés d'autres fichiers. Pour vos messages dans des fichiers externes, l'importation utilise un chemin relatif au fichier actuel.
Chaque variable déclare un type, un nom et un index utilisé en interne dans ce message. Cet index ne peut être modifié une fois que vous avez commencé à produire des messages avec lui. C'est la valeur qui est sérialisée dans le message au lieu du nom du champ. Je parlerai de la modification de votre schéma dans une section ultérieure.
Le type répété indique simplement que le champ peut contenir de 0 à n des types spécifiés dans une structure de type liste.
Options
Les options vous permettent de définir la structure des classes Java générées.
option java_package = "com.improving.springprotobuf.generated.v1.model" ;
Indique au protocole à quel paquetage appartiennent les classes générées.
option java_outer_classname = "WebsiteMessagePayload" ;
Le message de premier niveau est contenu dans une classe externe, le nom de cette classe externe est défini par défaut dans le protocole comme suitvotreNomDeMessageOuterClass.java. Utilisez cette option pour le modifier à votre guise, car vous devrez y faire référence dans votre code.
Classes Java générées par Protobuf
L'ajout du bloc protoc à notre build.gradle permet au compilateur protobuf de construire des classes Java qui représentent le schéma des fichiers .proto pendant la tâche de construction de gradle.
protobuf {
// Configure l'exécutable protoc
protoc {
artifact = 'com.google.protobuf:protoc:3.0.0'
generatedFilesBaseDir= "$projectDir/src"
}
}
Cette configuration ajoutera les classes java au répertoire src/java de votre projet.
Note : Pour les utilisateurs d'IntelliJ, vous pouvez obtenir des erreurs dans les fichiers .proto qui traitent des fichiers importés. Pour y remédier, vous devrez modifier l'endroit où l'IDE recherche ces fichiers.
Sur un Mac
Allez dans IntelliJ IDEA > Preferences > Language & Frameworks > Protocol Buffers
Décocher la case Configurer automatiquement
Ajouter ce cheminfile:///votre chemin vers le projet/spring-kafka-protobuf/src/main/proto
Cliquez sur Ok
Vous pouvez stocker vos fichiers .proto n'importe où, mais le compilateur de protocoles s'attend à ce qu'ils se trouvent sous src/main/proto.
WebsiteMessageProducer.java
Il s'agit du producteur de messages Kafka v1. Il importe les classes Java générées par Protobuf
import com.improving.springprotobuf.generated.v1.model.Comment.CommentMessage ;
import com.improving.springprotobuf.generated.v1.model.Author.AuthorMessage; ;
import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage ;
Les classes générées permettent de créer des objets en utilisant le modèle de construction. Ici, le code crée un Author et un WebsiteMessage en le publiant avec un modèle kafka.
// Normalement, on rechercherait l'auteur par son identifiant, mais il s'agit ici d'un exemple d'application.
AuthorMessagemessageAuthor =AuthorMessage
.newBuilder()
.setFirstName(message.getAuthor().getFirstName())
.setLastName(message.getAuthor().getLastName())
.setId(UUID.randomUUID().toString())
.build() ;
AuthorMessagecommentAuthor =AuthorMessage
.newBuilder()
.setFirstName("Dark")
.setLastName("Maul")
.setId(UUID.randomUUID().toString())
.build() ;
CommentMessage comment =
MessageCommentaire
.newBuilder()
.setContent("Excellent article")
.setAuthor(commentAuthor)
.build() ;
// Montre l'ajout d'un commentaire
WebsiteMessagewMsg = WebsiteMessage
.newBuilder()
.setId(UUID.randomUUID().toString())
.setContent(message.getContent())
.setAuthor(messageAuthor)
.setTopic("test-topic")
.addComments(commentaire)
.build() ;
WebsiteMessageListener.java
Le consommateur utilise les mêmes classes Java générées pour désérialiser le message Kafka. Le type du message est com.google.protobuf.DynamicMessage. Les classes générées par Protobuf vous permettent d'instancier une instance à partir d'un tableau d'octets, comme illustré ci-dessous.
import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage ;
import com.google.protobuf.DynamicMessage ;
import com.google.protobuf.InvalidProtocolBufferException ;
...
@Service
public class WebsiteMessageListener implements MessageConsumer {
@KafkaListener(topics = "website-messages", groupId= "group_id_v1")
public void processEvent(ConsumerRecord<String, DynamicMessage> record) {
WebsiteMessagemessage=null ;
try {
message =WebsiteMessage
.newBuilder()
.build()
.getParserForType()
.parseFrom(record.value().toByteArray()) ;
} catch (InvalidProtocolBufferException e) {
e.printStackTrace() ;
return ;
}
logger.info(String.format("#### -> Consumingmessage -> %s", message.toString())) ;
logger.info(String.format("#### -> Topic -> %s", message.getTopic())) ;
}
}
Évolution du schéma
Pour démontrer l'évolution du schéma, j'ai un peu triché. J'ai modifié le nom d'importation dans les fichiers .proto pour générer deux paquets distincts, v1 et v2. Cela simule deux applications différentes utilisant deux versions différentes des classes générées.
Modifications
J'ai supprimé le champ topic de WebsiteMessage. Pour s'assurer que personne n'utilise le numéro/nom du champ, vous devez les ajouter au bloc réservé.
J'ai également ajouté un champ Enum nommé Version. Les Enums doivent toujours avoir une valeur nulle, c'est la valeur par défaut si aucune n'est fournie. Ici, je l'ai nommé UNKNOWN, mais il pourrait être nommé n'importe comment.
syntaxe = "proto3" ;
option java_package= "com.improving.springprotobuf.generated.v2.model" ;
option java_outer_classname= "WebsiteMessagePayload" ;
import "google/protobuf/timestamp.proto" ;
import "comment.proto" ;
import "author.proto" ;
message WebsiteMessage {
réservé 5 ;
réservé "topic" ;
string id = 1 ;
string content = 2 ;
AuthorMessage auteur = 3 ;
répété CommentMessage comments = 4 ;
Version version= 6 ;
}
enum Version {
UNKNOWN = 0 ;
V1 = 1 ;
V2 = 2 ;
}
Compatibilité ascendante et descendante
Les modifications que j'ai apportées et le code Java qui en découle démontrent la compatibilité ascendante et descendante.
On parle de compatibilité ascendante lorsqu'un producteur crée un message avec une version plus ancienne du schéma et qu'un consommateur utilisant une version plus récente du schéma peut désérialiser le message en toute sécurité. Voir WebsiteMessageProducer.java produisant un message v1 et WebsiteMessageListenerV2.java consommant ce message avec un schéma v2.
On parle de compatibilité ascendante lorsqu'un producteur crée un message avec une version plus récente du schéma et qu'un consommateur utilisant une version plus ancienne du schéma peut désérialiser le message en toute sécurité. Voir WebsiteMessageProducerV2.java produisant un message v2 et WebsiteMessageListener.java consommant ce message avec un schéma v1.
WebsiteMessageProducerV2.java
Ce producteur démontrera l'utilisation d'une version plus récente du schéma. Dans cette classe, nous ajoutons le champ version du schéma v2
import com.improving.springprotobuf.generated.v2.model.Author ;
import com.improving.springprotobuf.generated.v2.model.Comment.CommentMessage; ;
import
com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload ;
import
com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload.Version ;
...
@Service
public class WebsiteMessageProducerV2 implements MessageProducer {
...
// Normalement, l'auteur serait recherché par son identifiant, mais il s'agit ici d'un exemple d'application.
AuthorMessagemessageAuthor =AuthorMessage
.newBuilder()
.setFirstName(message.getAuthor().getFirstName())
.setLastName(message.getAuthor().getLastName())
.setId(UUID.randomUUID().toString())
.build() ;
Author.AuthorMessagecommentAuthor =Author.AuthorMessage
.newBuilder()
.setFirstName("Cela")
.setLastName("UnCommentateur")
.setId(UUID.randomUUID().toString())
.build() ;
CommentMessage comment =CommentMessage.newBuilder().setContent("Premier !").setAuthor(commentAuthor).build() ;
CommentMessage comment2 =CommentMessage.newBuilder().setContent("Oops I guess I wasn't first").setAuthor(commentAuthor).build() ;
// Montre l'ajout de plusieurs commentaires en un seul appel de méthode
WebsiteMessagewMsg=WebsiteMessage
.newBuilder()
.setId(UUID.randomUUID().toString())
.setContent(message.getContent())
.setAuthor(messageAuthor)
.setVersion(Version.V2)
.addAllComments(Arrays.asList(comment, comment2))
.build() ;
this.kafkaTemplate.send(TOPIC, wMsg.getId(), wMsg) ;
}
}
Exécution locale
Il y a un docker-compose.yaml qui va démarrer zookeeper, kafka, schema registry et l'application dans leurs conteneurs respectifs. Vous devrez d'abord compiler le jar. Lancez donc
./gradlew clean build
docker-compose build
docker-compose up -d
Logging
docker logs -f spring-kafka-protobuf_app_1
Poster du Json pour créer des messages Kafka
Il y a deux contrôleurs qui vont gérer les messages v1 et v2.
Message V1
curl -X POST http://localhost:8080/v2/message/publish \N- 'Content-Type : application' -H 'Content-Type : application'.
-H 'Content-Type : application/json' \N -d '{"content" : application/json")
-d '{"content" : "Regardez, c'est un message v2",
"author" : {
"firstName" : "Scott", "lastName" : "Strzynski"
}
}'
Voyons ce que chaque consommateur enregistre. Puisque ce message a un schéma v1, nous nous attendons à ce que le sujet apparaisse, et pas de Version parce que c'est un concept v2.
Pour montrer la compatibilité ascendante/descendante, les deux consommateurs écoutent le même sujet avec des identifiants de groupe de consommateurs différents.
Ici, le consommateur v1 imprime le message Protobuf désérialisé et inclut le champ v1 topic :
WebsiteMessageListener :#### Consuming message
id : "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"
content : "hey this is a v1 message"
auteur {
id: "07312922-5398-43e1-93b5-9fa12a24502c"
firstName : "Scott"
lastName : "Strzynski"
}
commentaires {
content : "Excellent article"
author {
id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"
firstName : "Darth"
lastName : "Maul"
}
}
topic : "test-topic"
Ici, l'auditeur V2 affiche la compatibilité ascendante, puisque le sujet du schéma v2 a été supprimé, il ne l'affiche pas. De même, comme Version a été ajouté dans la version 2, mais n'a pas été défini dans le producteur de la version 1, il affiche la valeur zéro par défaut.
WebsiteMessageListenerV2 :#### Message consommé
id : "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"
content : "hey this is a v1 message"
auteur {
id: "07312922-5398-43e1-93b5-9fa12a24502c"
firstName : "Scott"
lastName : "Strzynski"
}
commentaires {
content : "Excellent article"
author {
id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"
firstName : "Darth"
lastName : "Maul"
}
}
WebsiteMessageListenerV2 :#### Version du message : INCONNUE
Message V2
Dans ce message, le contrôleur v2 va traiter la requête post et construire un message V2 avec Version, et sans sujet.
curl -X POST http://localhost:8080/v2/message/publish \N- -H 'Content-Type : application, application et sujet'.
-H 'Content-Type : application/json' \N- -d '{"content" : ")
-d '{"content" : "Regardez, c'est un message v2",
"author" : {
"firstName" : "Scott", "lastName" : "Strzynski"
}
}'
Vous pouvez voir dans les journaux que la version correcte est maintenant affichée et qu'il n'y a pas de champ de sujet.
WebsiteMessageListenerV2 :#### Message consommé
id: "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"
content : "Regardez, c'est un message v2"
auteur {
id: "85a8a789-c30d-495e-a187-d702610a8150"
firstName : "Scott"
lastName : "Strzynski"
}
commentaires {
content : "First !"
author {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName : "Ça"
lastName : "UnCommentateur"
}
}
commentaires {
content : "Oops I guess I wasn\'t first" (Oups, je crois que je n'étais pas le premier)
author {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName : "That"
lastName : "UnCommentateur"
}
}
version : V2
L'écouteur v1 affichant la compatibilité ascendante n'affiche pas cette version et une chaîne vide est la valeur du sujet.
WebsiteMessageListener :#### Message de consommation
id : "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"
content : "Regardez, c'est un message v2"
auteur {
id: "85a8a789-c30d-495e-a187-d702610a8150"
firstName : "Scott"
lastName : "Strzynski"
}
commentaires {
content : "First !"
author {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName : "Ça"
lastName : "UnCommentateur"
}
}
commentaires {
content : "Oops I guess I wasn\'t first" (Oups, je crois que je n'étais pas le premier)
author {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName : "That"
lastName : "UnCommentateur"
}
}
WebsiteMessageListener :#### Sujet :
Registre des schémas
Le schéma Protobuf est ajouté au registre des schémas comme Avro.
http://localhost:8081/subjects
[
"author.proto",
"commentaire.proto",
"google/protobuf/timestamp.proto",
"website-messages-value"
]
http://localhost:8081/subjects/website-messages-value/versions
[
1,
2
]
http://localhost:8081/subjects/website-messages-value/versions/2
{
"subject" : "website-messages-value",
"version" : 2,
"id" : 4,
"schemaType" : "PROTOBUF",
"references" : [
{
"name" : "google/protobuf/timestamp.proto",
"subject" : "google/protobuf/timestamp.proto",
"version" : 1
},
{
"name" : "commentaire.proto",
"subject" : "commentaire.proto",
"version" : 1
},
{
"name" : "auteur.proto",
"subject" : "auteur.proto",
"version" : 1
}
],
"schema" : "syntax = \N "proto3\N";\Nnimport \N "google/protobuf/timestamp.proto\N";\Nimport \N "comment.proto\N";\Nimport \N "author.proto\N";\Noptionjava_package = \N "com.improving.springprotobuf.generated.v2.model";\Noptionjava_outer_classname = \N "WebsiteMessagePayload";\NMessageWebsiteMessage {\n réservé 5 à 6;\Nréservé \N "topic";\Nchaîne id = 1;\Nchaîne content = 2;\N.../.../.../.../.../.../.../.../.../.../.../.../.../.../.../.../...AuthorMessage author = 3;\n répété .CommentMessage comments = 4;\n .Version version = 6;\n}\nenum Version {\n UNKNOWN = 0;\n V1 = 1;\n V2 = 2;\n}\n"
}
Conclusion
L'utilisation de Protobuf permet au développeur de construire un schéma évolutif comme Avro. A la différence d'Avro, il utilise un IDL au lieu d'un JSON. Spring Boot et Gradle fournissent un bon cadre pour permettre au développeur d'écrire des applications rapidement et de ne pas s'inquiéter des problèmes liés à un schéma changeant.