Descripción
Esta aplicación es un simple ejemplo de publicación de mensajes en un sitio web y permitir comentarios en esos mensajes. Es una aplicación incompleta en el sentido de que una vez que se consume un mensaje, no pasa nada. Una aplicación real podría persistir en una base de datos, o escribir en una KTable y más tarde unir el mensaje a los comentarios publicando un gran modelo canónico a un tema diferente. Ambos están fuera del alcance de este blog.
Lo que voy a demostrar es c ómo hacer evolucionar su modelo de dominio utilizando Protobuf. Voy a empezar con una v1 del esquema, a continuación, añadir y eliminar campos, publicar ambas versiones del mensaje a un tema, y tienen dos consumidores diferentes manejar ambos mensajes de forma segura.
Nota: He incluido los archivos Java generados por el compilador Protobuf, pero normalmente tendrías un repositorio y un proceso de compilación separados para eso.
El esquema
WebsiteMessage
Autor
Comentario
El código
En esta aplicación, hay dos Rest Controllers que manejan solicitudes Post para el esquema v1 o v2. Dependiendo del endpoint, se llama a WebsiteMessageProducer.java o WebsiteMessageProducerV2.java para producir y publicar un mensaje Protobuf en Kafka.
Ambos consumidores, WebsiteMessageListener.java, y WebsiteMessageListenerV2.java escuchan el mismo tema con diferentes ids de grupo.
Definiciones de mensajes Protobuf
Usando un IDL (Interface definition language), Protobuf te permite crear esquemas de mensajes simples o complejos. En esta aplicación de ejemplo modelaremos un sitio web que permite a las personas (author.proto) publicar mensajes (websiteMessage.proto) y a otras personas comentar (comment.proto) ese mensaje. Se trata de un esquema sencillo, pero muestra algunas características importantes de Protobuf.
websiteMessage.proto
sintaxis = "proto3";
option java_package = "com.Improving.springprotobuf.generated.v1.model";
option java_outer_classname = "WebsiteMessagePayload";
import "google/protobuf/timestamp.proto";
import "comment.proto
import "author.proto";
mensaje WebsiteMessage {
string id = 1
string contenido = 2
AuthorMessage autor = 3;
repetido CommentMessage comentarios = 4
string tema = 5;
}
Esta definición tiene un par de campos obligatorios, sintaxis y mensaje.
El campo de sintaxis indica qué versión de Protobuf está utilizando; en este caso proto3.
El campo mensaje es la definición del esquema de tu mensaje. Puede contener tipos de variables escalares, mensajes personalizados, tipos de mensajes Protobuf, variables repetidas (Listas), o variables enum. Véase la definición completa: https://developers.google.com/protocol-buffers/docs/proto3
Los mensajes personalizados pueden incluirse en el mismo fichero o importarse de otros ficheros. Para sus mensajes en archivos externos, la importación utiliza una ruta relativa al archivo actual.
Cada var declara un tipo, un nombre y un índice de uso interno dentro de ese mensaje. Este índice no se puede cambiar una vez que se empiezan a producir mensajes con él. Es el valor que se serializa en el mensaje en lugar del nombre del campo. Hablaré sobre la modificación de su esquema en una sección posterior.
El tipo repetido sólo indica que el campo puede contener de 0 a n de los tipos especificados en una estructura similar a una lista.
Opciones
Las opciones le permiten ayudar a definir la estructura de las clases Java generadas.
option java_package = "com.improving.springprotobuf.generated.v1.model";
Indica al protoc a qué paquete pertenecen las clases generadas.
option java_outer_classname = "WebsiteMessagePayload";
El mensaje de nivel superior está contenido en una Outerclass, protoc pondrá por defecto el nombre de esta clase externa asuNombreMensajeOuterClass.java. Utilice esta opción para cambiarlo por el que desee, ya que necesitará referenciarlo en su código.
Clases Java Generadas por Protobuf
Añadir el bloque protoc a nuestro build.gradle permite al compilador protobuf construir clases java que representan el esquema de los archivos .proto durante la tarea de construcción gradle.
protobuf {
// Configurar el ejecutable protoc
protoc {
artifact = 'com.google.protobuf:protoc:3.0.0'
generatedFilesBaseDir= "$projectDir/src"
}
}
Esta configuración añadirá las clases java al directorio src/java de tu proyecto.
Nota: Para los usuarios de IntelliJ, es posible que se produzcan errores en los archivos .proto relacionados con los archivos importados. Para solucionar esto tendrá que modificar donde el IDE busca estos archivos.
En Mac
Vaya a IntelliJ IDEA > Preferencias > Lenguaje y marcos > Búferes de protocolo
Desmarque Configurar automáticamente
Añade esta rutafile:///tu ruta al proyecto/spring-kafka-protobuf/src/main/proto
Haga clic en Ok
Puede almacenar sus archivos .proto en cualquier lugar, pero el compilador protoc espera que estén bajo src/main/proto
WebsiteMessageProducer.java
Este es el productor de mensajes v1 Kafka. Importa las clases Java generadas por Protobuf
import com.improving.springprotobuf.generated.v1.model.Comment.CommentMessage;
import com.improving.springprotobuf.generated.v1.model.Author.AuthorMessage;
import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage;
Las clases generadas permiten crear objetos utilizando el patrón constructor. Aquí el código crea un Author y un WebsiteMessage publicándolo con una plantilla kafka.
// Normalmente se buscaría el autor por id, pero esto es sólo una aplicación de ejemplo
AutorMensajeMensajeAutor =AutorMensaje
.newBuilder()
.setFirstName(message.getAuthor().getFirstName())
.setLastName(message.getAuthor().getLastName())
.setId(UUID.randomUUID().toString())
.build();
AuthorMessagecommentAuthor =AutorMensaje
.newBuilder()
.setNombre("Darth")
.setApellido("Maul")
.setId(UUID.randomUUID().toString())
.build();
ComentarioMensaje comment =
MensajeComentario
.newBuilder()
.setContent("Gran entrada")
.setAuthor(commentAuthor)
.build();
// Muestra la adición de un comentario
WebsiteMessagewMsg = WebsiteMensaje
.newBuilder()
.setId(UUID.randomUUID().toString())
.setContent(mensaje.getContent())
.setAuthor(autordelmensaje)
.setTopic("test-topic")
.addComments(comentario)
.build();
WebsiteMessageListener.java
El consumidor utiliza las mismas clases Java generadas para deserializar el mensaje Kafka. El tipo del mensaje es com.google.protobuf.DynamicMessage. Las clases generadas por Protobuf permiten instanciar una instancia a partir de una matriz de bytes, como se muestra a continuación.
import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
...
@Servicio
public class WebsiteMessageListener implements MessageConsumer {
@KafkaListener(topics = "mensajes-sitio", groupId= "group_id_v1")
public void processEvent(ConsumerRecord<String, DynamicMessage> record) {
WebsiteMessagemessage=null;
try {
mensaje =MensajeWeb
.newBuilder()
.build()
.getParserParaTipo()
.parseFrom(record.value().toByteArray());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
return;
}
logger.info(String.format("#### -> Consumingmessage -> %s", message.toString());
logger.info(String.format("#### -> Tema -> %s", message.getTopic()));
}
}
Evolución del esquema
Para demostrar la evolución del esquema, he hecho un poco de trampa. Modifiqué el nombre de importación en los archivos .proto para generar dos paquetes separados a v1 y v2. Esto simula dos aplicaciones diferentes usando dos versiones diferentes de las clases generadas.
Cambios
He eliminado el campo topic de WebsiteMessage.Para asegurarte de que nadie utiliza el número/nombre del campo debes añadirlos al bloque reservado.
También añadí un campo Enum llamado Version. Los Enums siempre deben tener un valor cero, es el valor por defecto si no se proporciona ninguno. Aquí lo llamé UNKNOWN, pero podría llamarse cualquier cosa.
sintaxis = "proto3";
option java_package= "com.improving.springprotobuf.generated.v2.model";
option java_outer_classname= "WebsiteMessagePayload";
import "google/protobuf/timestamp.proto";
import "comment.proto
import "author.proto";
mensaje WebsiteMessage {
reservado 5;
reservado "topic";
cadena id = 1
string contenido = 2
AuthorMessage autor = 3
repetido CommentMessage comentarios = 4
Versión version= 6;
}
enum Versión {
UNKNOWN = 0
V1 = 1;
V2 = 2;
}
Compatibilidad hacia delante y hacia atrás
Los cambios que realicé y el código Java subsiguiente demuestran la compatibilidad hacia adelante y hacia atrás.
La compatibilidad hacia adelante es cuando un productor crea un mensaje con una versión anterior del esquema y un consumidor que utiliza una versión más reciente del esquema puede deserializar el mensaje con seguridad. Vea WebsiteMessageProducer.java produciendo un mensaje v1 y WebsiteMessageListenerV2.java consumiendo ese mensaje con un esquema v2.
La compatibilidad con versiones anteriores se da cuando un productor crea un mensaje con una versión más reciente del esquema y un consumidor que utiliza una versión más antigua del esquema puede deserializar el mensaje de forma segura. Véase WebsiteMessageProducerV2.java produciendo un mensaje v2 y WebsiteMessageListener.java consumiendo ese mensaje con un esquema v1.
WebsiteMessageProducerV2.java
Este productor demostrará el uso de una versión más reciente del esquema. En esta clase, añadimos el campo de versión del esquema v2
import com.improving.springprotobuf.generated.v2.model.Author;
import com.improving.springprotobuf.generated.v2.model.Comment.CommentMessage;
importar
com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload;
importar
com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload.Version;
...
@Servicio
public class WebsiteMessageProducerV2 implements MessageProducer {
...
// Normalmente buscaría el autor por id, pero esto es sólo una aplicación de ejemplo
AutorMensajeAutor =AutorMensaje
.newBuilder()
.setFirstName(message.getAuthor().getFirstName())
.setLastName(message.getAuthor().getLastName())
.setId(UUID.randomUUID().toString())
.build();
Author.AuthorMessagecommentAuthor =Autor.AuthorMessage
.newBuilder()
.setFirstName("Eso")
.setLastName("UnComentador")
.setId(UUID.randomUUID().toString())
.build();
CommentMessage comment =CommentMessage.newBuilder().setContent("¡Primero!").setAuthor(commentAuthor).build();
CommentMessage comment2 =CommentMessage.newBuilder().setContent("Vaya, supongo que no fui el primero").setAuthor(commentAuthor).build();
// Muestra la adición de múltiples comentarios en una llamada al método
WebsiteMessagewMsg=MensajeDelSitioWeb
.newBuilder()
.setId(UUID.randomUUID().toString())
.setContent(mensaje.getContent())
.setAuthor(autorMensaje)
.setVersion(Versión.V2)
.addAllComments(Arrays.asList(comment, comment2))
.build();
this.kafkaTemplate.send(TOPIC, wMsg.getId(), wMsg);
}
}
Ejecutando localmente
Hay un docker-compose.yaml que iniciará zookeeper, kafka, schema registry y la aplicación en sus respectivos contenedores. Necesitarás construir el jar primero. Así que ejecuta
./gradlew clean build
docker-compose build
docker-compose up -d
Registro
docker logs -f spring-kafka-protobuf_app_1
Publicar Json para crear mensajes Kafka
Hay dos controladores que manejarán los mensajes v1 y v2.
V1 Post
curl -X POST http://localhost:8080/v2/message/publish \
-H 'Content-Type: application/json' \
-d '{"content": "Mira esto es un mensaje v2",
"author": {
"firstName": "Scott", "lastName": "Strzynski"
}
}'
Veamos lo que registra cada consumidor. Como este mensaje tiene un esquema v1, esperamos que aparezca el tema, y no la versión porque es un concepto v2.
Para mostrar la compatibilidad hacia atrás/hacia adelante, ambos consumidores escuchan el mismo tema con diferentes ids de grupo de consumidores.
Aquí el consumidor v1 imprime el mensaje Protobuf deserializado e incluye el campo topic v1:
WebsiteMessageListener :#### Mensaje de consumo
id: "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"
contenido: "hey este es un mensaje v1"
autor {
id: "07312922-5398-43e1-93b5-9fa12a24502c"
firstName: "Scott"
lastName: "Strzynski"
}
comentarios {
content: "Gran post"
autor {
id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"
firstName: "Darth"
lastName: "Maul"
}
}
topic: "test-topic"
Aquí, el oyente V2 muestra la compatibilidad hacia adelante, ya que el tema del esquema v2 ha sido eliminado, no lo muestra. Además, como en la v2 se añadió Version, pero no se estableció en el productor de la v1, muestra el valor cero por defecto.
WebsiteMessageListenerV2 :#### Mensaje de consumo
id: "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"
contenido: "hey este es un mensaje v1"
autor {
id: "07312922-5398-43e1-93b5-9fa12a24502c"
firstName: "Scott"
lastName: "Strzynski"
}
comentarios {
content: "Gran post"
autor {
id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"
firstName: "Darth"
lastName: "Maul"
}
}
WebsiteMessageListenerV2 :#### Versión del mensaje: UNKNOWN
Mensaje V2
En este post, el controlador v2 manejará la petición post y construirá un mensaje V2 con Version, y sin topic.
curl -X POST http://localhost:8080/v2/message/publish \
-H 'Content-Type: application/json' \
-d '{"content": "Mira esto es un mensaje v2",
"author": {
"firstName": "Scott", "lastName": "Strzynski"
}
}'
Puede ver en los registros que ahora se muestra la versión correcta y no hay ningún campo de tema.
WebsiteMessageListenerV2 :#### Consumiendo mensaje
id: "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"
contenido: "Mira esto es un mensaje v2"
autor {
id: "85a8a789-c30d-495e-a187-d702610a8150"
firstName: "Scott"
lastName: "Strzynski"
}
comentarios {
content: "¡Primero!"
autor {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName: "Eso"
lastName: "OneCommenter"
}
}
comentarios {
contenido: "Oops supongo que no fui el primero"
autor {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName: "Eso"
lastName: "OneCommenter"
}
}
version: V2
El listener v1 que muestra compatibilidad con versiones anteriores no muestra esta versión y una cadena vacía es el valor para el tema.
WebsiteMessageListener :#### Mensaje de consumo
id: "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"
contenido: "Mira esto es un mensaje v2"
autor {
id: "85a8a789-c30d-495e-a187-d702610a8150"
firstName: "Scott"
lastName: "Strzynski"
}
comentarios {
content: "¡Primero!"
autor {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName: "Eso"
lastName: "OneCommenter"
}
}
comentarios {
contenido: "Oops supongo que no fui el primero"
autor {
id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"
firstName: "Eso"
lastName: "OneCommenter"
}
}
WebsiteMessageListener :#### Tema:
Registro de esquemas
El esquema Protobuf se añade al registro de esquemas como Avro.
http://localhost:8081/subjects
[
"autor.proto",
"comment.proto",
"google/protobuf/timestamp.proto",
"website-messages-value"
]
http://localhost:8081/subjects/website-messages-value/versions
[
1,
2
]
http://localhost:8081/subjects/website-messages-value/versions/2
{
"asunto": "website-mensajes-valor",
"version": 2,
"id": 4,
"schemaType": "PROTOBUF",
"references": [
{
"name": "google/protobuf/timestamp.proto",
"subject": "google/protobuf/timestamp.proto",
"version": 1
},
{
"name": "comentario.proto",
"subject": "comentario.proto",
"version": 1
},
{
"nombre": "autor.proto",
"subject": "autor.proto",
"version": 1
}
],
"schema": "syntax = "proto3";nimport "google/protobuf/timestamp.proto";nimport "comment.proto";nimport "author.proto";noptionjava_package = "com.improving.springprotobuf.generated.v2.model\";\noptionjava_outer_classname = \"WebsiteMessagePayload\";\nmessageWebsiteMessage {\n reservado 5 a 6;\n reservado \"topic\";\n string id = 1;\n string content = 2;\n .AuthorMessage autor = 3;\n repetido .CommentMessage comentarios = 4;\n .Version version = 6;\n} {\nenum Version {\n UNKNOWN = 0;\n V1 = 1;\n V2 = 2;\n}"
}
Conclusión
El uso de Protobuf proporciona al desarrollador la capacidad de construir un esquema evolucionable como Avro. A diferencia de Avro, utiliza un IDL en lugar de un JSON. Spring Boot y Gradle proporcionan un buen marco para permitir al desarrollador escribir aplicaciones rápidamente y no preocuparse por los problemas de un esquema cambiante.