Background Image
TECNOLOGÍA

Evolución del esquema Kafka con Java Spring Boot y Protobuf

Scott Strzynski

Consultor principal

February 7, 2023 | 10 Minuto(s) de lectura

En este blog, voy a demostrar la evolución del esquema de Kafka con Java, Spring Boot y Protobuf. Esta aplicación es para fines de tutorial, por lo que habrá casos en los que podría ocurrir una refactorización. Traté de mantenerlo simple y comprensible. El código consiste en dos productores Kafka y dos consumidores. En una situación de la vida real, su consumidor sería posiblemente en otra aplicación.

Para ejecutar esta aplicación localmente necesitarás tener un registro local de Kafka/Zookeeper/Schema funcionando. El código está disponible enhttp://github.com/hal90210/spring-kafka-protobuf 

Descripción

Esta aplicación es un simple ejemplo de publicación de mensajes en un sitio web y permitir comentarios en esos mensajes. Es una aplicación incompleta en el sentido de que una vez que se consume un mensaje, no pasa nada. Una aplicación real podría persistir en una base de datos, o escribir en una KTable y más tarde unir el mensaje a los comentarios publicando un gran modelo canónico a un tema diferente. Ambos están fuera del alcance de este blog.

Lo que voy a demostrar es cómo hacer evolucionar su modelo de dominio utilizando Protobuf. Voy a empezar con una v1 del esquema, a continuación, añadir y eliminar campos, publicar ambas versiones del mensaje a un tema, y tienen dos consumidores diferentes manejar ambos mensajes de forma segura.

Nota: He incluido los archivos Java generados por el compilador Protobuf, pero normalmente tendrías un repositorio y un proceso de compilación separados para eso.

El esquema

  • WebsiteMessage

  • Autor

  • Comentario

El código

En esta aplicación, hay dos Rest Controllers que manejan solicitudes Post para el esquema v1 o v2. Dependiendo del endpoint, se llama a WebsiteMessageProducer.java o WebsiteMessageProducerV2.java para producir y publicar un mensaje Protobuf en Kafka.

Ambos consumidores, WebsiteMessageListener.java, y WebsiteMessageListenerV2.java escuchan el mismo tema con diferentes ids de grupo.

Definiciones de mensajes Protobuf

Usando un IDL (Interface definition language), Protobuf te permite crear esquemas de mensajes simples o complejos. En esta aplicación de ejemplo modelaremos un sitio web que permite a las personas (author.proto) publicar mensajes (websiteMessage.proto) y a otras personas comentar (comment.proto) ese mensaje. Se trata de un esquema sencillo, pero muestra algunas características importantes de Protobuf.

websiteMessage.proto 

sintaxis = "proto3";

option java_package = "com.Improving.springprotobuf.generated.v1.model";

option java_outer_classname = "WebsiteMessagePayload";

import "google/protobuf/timestamp.proto";

import "comment.proto

import "author.proto";

mensaje WebsiteMessage {

string id = 1

string contenido = 2

AuthorMessage autor = 3;

repetido CommentMessage comentarios = 4

string tema = 5;

}

Esta definición tiene un par de campos obligatorios, sintaxis y mensaje.

El campo de sintaxis indica qué versión de Protobuf está utilizando; en este caso proto3.

El campo mensaje es la definición del esquema de tu mensaje. Puede contener tipos de variables escalares, mensajes personalizados, tipos de mensajes Protobuf, variables repetidas (Listas), o variables enum. Véase la definición completa: https://developers.google.com/protocol-buffers/docs/proto3 

Los mensajes personalizados pueden incluirse en el mismo fichero o importarse de otros ficheros. Para sus mensajes en archivos externos, la importación utiliza una ruta relativa al archivo actual.

Cada var declara un tipo, un nombre y un índice de uso interno dentro de ese mensaje. Este índice no se puede cambiar una vez que se empiezan a producir mensajes con él. Es el valor que se serializa en el mensaje en lugar del nombre del campo. Hablaré sobre la modificación de su esquema en una sección posterior.

El tipo repetido sólo indica que el campo puede contener de 0 a n de los tipos especificados en una estructura similar a una lista.

Opciones

Las opciones le permiten ayudar a definir la estructura de las clases Java generadas.

option java_package = "com.improving.springprotobuf.generated.v1.model"; 

Indica al protoc a qué paquete pertenecen las clases generadas.

option java_outer_classname = "WebsiteMessagePayload"; 

El mensaje de nivel superior está contenido en una Outerclass, protoc pondrá por defecto el nombre de esta clase externa asuNombreMensajeOuterClass.java. Utilice esta opción para cambiarlo por el que desee, ya que necesitará referenciarlo en su código.

Clases Java Generadas por Protobuf

Añadir el bloque protoc a nuestro build.gradle permite al compilador protobuf construir clases java que representan el esquema de los archivos .proto durante la tarea de construcción gradle.

 protobuf {

// Configurar el ejecutable protoc

protoc {

artifact = 'com.google.protobuf:protoc:3.0.0'

generatedFilesBaseDir= "$projectDir/src"

}

}

Esta configuración añadirá las clases java al directorio src/java de tu proyecto.

Nota: Para los usuarios de IntelliJ, es posible que se produzcan errores en los archivos .proto relacionados con los archivos importados. Para solucionar esto tendrá que modificar donde el IDE busca estos archivos.

En Mac
  • Vaya a IntelliJ IDEA > Preferencias > Lenguaje y marcos > Búferes de protocolo

  • Desmarque Configurar automáticamente

  • Añade esta rutafile:///tu ruta al proyecto/spring-kafka-protobuf/src/main/proto

  • Haga clic en Ok

Puede almacenar sus archivos .proto en cualquier lugar, pero el compilador protoc espera que estén bajo src/main/proto

WebsiteMessageProducer.java

Este es el productor de mensajes v1 Kafka. Importa las clases Java generadas por Protobuf

import com.improving.springprotobuf.generated.v1.model.Comment.CommentMessage; 

import com.improving.springprotobuf.generated.v1.model.Author.AuthorMessage; 

import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage; 

Las clases generadas permiten crear objetos utilizando el patrón constructor. Aquí el código crea un Author y un WebsiteMessage publicándolo con una plantilla kafka.

// Normalmente se buscaría el autor por id, pero esto es sólo una aplicación de ejemplo

AutorMensajeMensajeAutor =AutorMensaje

.newBuilder()

.setFirstName(message.getAuthor().getFirstName())

.setLastName(message.getAuthor().getLastName())

.setId(UUID.randomUUID().toString())

.build();

AuthorMessagecommentAuthor =AutorMensaje

.newBuilder()

.setNombre("Darth")

.setApellido("Maul")

.setId(UUID.randomUUID().toString())

.build();

ComentarioMensaje comment =

MensajeComentario

.newBuilder()

.setContent("Gran entrada")

.setAuthor(commentAuthor)

.build();

// Muestra la adición de un comentario

WebsiteMessagewMsg = WebsiteMensaje

.newBuilder()

.setId(UUID.randomUUID().toString())

.setContent(mensaje.getContent())

.setAuthor(autordelmensaje)

.setTopic("test-topic")

.addComments(comentario)

.build();

WebsiteMessageListener.java 

El consumidor utiliza las mismas clases Java generadas para deserializar el mensaje Kafka. El tipo del mensaje es com.google.protobuf.DynamicMessage. Las clases generadas por Protobuf permiten instanciar una instancia a partir de una matriz de bytes, como se muestra a continuación.

import com.improving.springprotobuf.generated.v1.model.WebsiteMessagePayload.WebsiteMessage;

import com.google.protobuf.DynamicMessage;

import com.google.protobuf.InvalidProtocolBufferException;

...

 @Servicio

public class WebsiteMessageListener implements MessageConsumer {

@KafkaListener(topics = "mensajes-sitio", groupId= "group_id_v1")

public void processEvent(ConsumerRecord<String, DynamicMessage> record) {

WebsiteMessagemessage=null;

try {

mensaje =MensajeWeb

.newBuilder()

.build()

.getParserParaTipo()

.parseFrom(record.value().toByteArray());

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

return;

}

 

logger.info(String.format("#### -> Consumingmessage -> %s", message.toString());

logger.info(String.format("#### -> Tema -> %s", message.getTopic()));

}

}

Evolución del esquema

Para demostrar la evolución del esquema, he hecho un poco de trampa. Modifiqué el nombre de importación en los archivos .proto para generar dos paquetes separados a v1 y v2. Esto simula dos aplicaciones diferentes usando dos versiones diferentes de las clases generadas.

Cambios 

He eliminado el campo topic de WebsiteMessage.Para asegurarte de que nadie utiliza el número/nombre del campo debes añadirlos al bloque reservado.

También añadí un campo Enum llamado Version. Los Enums siempre deben tener un valor cero, es el valor por defecto si no se proporciona ninguno. Aquí lo llamé UNKNOWN, pero podría llamarse cualquier cosa.

sintaxis = "proto3";

option java_package= "com.improving.springprotobuf.generated.v2.model";

option java_outer_classname= "WebsiteMessagePayload";

import "google/protobuf/timestamp.proto";

import "comment.proto

import "author.proto";

mensaje WebsiteMessage {

reservado 5;

reservado "topic";

cadena id = 1

string contenido = 2

AuthorMessage autor = 3

repetido CommentMessage comentarios = 4

Versión version= 6;

}

enum Versión {

UNKNOWN = 0

V1 = 1;

V2 = 2;

}

Compatibilidad hacia delante y hacia atrás

Los cambios que realicé y el código Java subsiguiente demuestran la compatibilidad hacia adelante y hacia atrás.

La compatibilidad hacia adelante es cuando un productor crea un mensaje con una versión anterior del esquema y un consumidor que utiliza una versión más reciente del esquema puede deserializar el mensaje con seguridad. Vea WebsiteMessageProducer.java produciendo un mensaje v1 y WebsiteMessageListenerV2.java consumiendo ese mensaje con un esquema v2.

La compatibilidad con versiones anteriores se da cuando un productor crea un mensaje con una versión más reciente del esquema y un consumidor que utiliza una versión más antigua del esquema puede deserializar el mensaje de forma segura. Véase WebsiteMessageProducerV2.java produciendo un mensaje v2 y WebsiteMessageListener.java consumiendo ese mensaje con un esquema v1.

WebsiteMessageProducerV2.java 

Este productor demostrará el uso de una versión más reciente del esquema. En esta clase, añadimos el campo de versión del esquema v2

import com.improving.springprotobuf.generated.v2.model.Author;

import com.improving.springprotobuf.generated.v2.model.Comment.CommentMessage;

importar

com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload;

importar

com.improving.springprotobuf.generated.v2.model.WebsiteMessagePayload.Version;

...

@Servicio

public class WebsiteMessageProducerV2 implements MessageProducer {

...

// Normalmente buscaría el autor por id, pero esto es sólo una aplicación de ejemplo

        AutorMensajeAutor =AutorMensaje

.newBuilder()

.setFirstName(message.getAuthor().getFirstName())

.setLastName(message.getAuthor().getLastName())

.setId(UUID.randomUUID().toString())

.build();

Author.AuthorMessagecommentAuthor =Autor.AuthorMessage

.newBuilder()

.setFirstName("Eso")

.setLastName("UnComentador")

.setId(UUID.randomUUID().toString())

.build();

CommentMessage comment =CommentMessage.newBuilder().setContent("¡Primero!").setAuthor(commentAuthor).build();

CommentMessage comment2 =CommentMessage.newBuilder().setContent("Vaya, supongo que no fui el primero").setAuthor(commentAuthor).build();

// Muestra la adición de múltiples comentarios en una llamada al método

        WebsiteMessagewMsg=MensajeDelSitioWeb

.newBuilder()

.setId(UUID.randomUUID().toString())

.setContent(mensaje.getContent())

.setAuthor(autorMensaje)

.setVersion(Versión.V2)

.addAllComments(Arrays.asList(comment, comment2))

.build();

 

this.kafkaTemplate.send(TOPIC, wMsg.getId(), wMsg);

}

}

Ejecutando localmente

Hay un docker-compose.yaml que iniciará zookeeper, kafka, schema registry y la aplicación en sus respectivos contenedores. Necesitarás construir el jar primero. Así que ejecuta

  • ./gradlew clean build

  • docker-compose build

  • docker-compose up -d

Registro

docker logs -f spring-kafka-protobuf_app_1

Publicar Json para crear mensajes Kafka

Hay dos controladores que manejarán los mensajes v1 y v2.

V1 Post

curl -X POST http://localhost:8080/v2/message/publish \

-H 'Content-Type: application/json' \

-d '{"content": "Mira esto es un mensaje v2",

"author": {

"firstName": "Scott", "lastName": "Strzynski"

}

}'

Veamos lo que registra cada consumidor. Como este mensaje tiene un esquema v1, esperamos que aparezca el tema, y no la versión porque es un concepto v2.

Para mostrar la compatibilidad hacia atrás/hacia adelante, ambos consumidores escuchan el mismo tema con diferentes ids de grupo de consumidores.

Aquí el consumidor v1 imprime el mensaje Protobuf deserializado e incluye el campo topic v1:

WebsiteMessageListener :#### Mensaje de consumo

id: "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"

contenido: "hey este es un mensaje v1"

autor {

id: "07312922-5398-43e1-93b5-9fa12a24502c"

firstName: "Scott"

lastName: "Strzynski"

}

comentarios {

content: "Gran post"

autor {

id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"

firstName: "Darth"

lastName: "Maul"

}

}

topic: "test-topic"

Aquí, el oyente V2 muestra la compatibilidad hacia adelante, ya que el tema del esquema v2 ha sido eliminado, no lo muestra. Además, como en la v2 se añadió Version, pero no se estableció en el productor de la v1, muestra el valor cero por defecto.

 WebsiteMessageListenerV2 :#### Mensaje de consumo

id: "176bfa8d-474e-49c4-8c24-ad54c8b03b1e"

contenido: "hey este es un mensaje v1"

autor {

id: "07312922-5398-43e1-93b5-9fa12a24502c"

firstName: "Scott"

lastName: "Strzynski"

}

comentarios {

content: "Gran post"

autor {

id: "60ca4a5f-2058-4652-a584-2d28a1e2c822"

firstName: "Darth"

lastName: "Maul"

}

}

WebsiteMessageListenerV2 :#### Versión del mensaje: UNKNOWN

Mensaje V2

En este post, el controlador v2 manejará la petición post y construirá un mensaje V2 con Version, y sin topic.

curl -X POST http://localhost:8080/v2/message/publish \

-H 'Content-Type: application/json' \

-d '{"content": "Mira esto es un mensaje v2",

"author": {

"firstName": "Scott", "lastName": "Strzynski"

}

}'

Puede ver en los registros que ahora se muestra la versión correcta y no hay ningún campo de tema.

WebsiteMessageListenerV2 :#### Consumiendo mensaje

id: "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"

contenido: "Mira esto es un mensaje v2"

autor {

id: "85a8a789-c30d-495e-a187-d702610a8150"

firstName: "Scott"

lastName: "Strzynski"

}

comentarios {

content: "¡Primero!"

autor {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName: "Eso"

lastName: "OneCommenter"

}

}

comentarios {

contenido: "Oops supongo que no fui el primero"

autor {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName: "Eso"

lastName: "OneCommenter"

}

}

version: V2

El listener v1 que muestra compatibilidad con versiones anteriores no muestra esta versión y una cadena vacía es el valor para el tema.

WebsiteMessageListener :#### Mensaje de consumo

id: "33518693-f76a-4bf7-8bf7-0b9b4de6c56a"

contenido: "Mira esto es un mensaje v2"

autor {

id: "85a8a789-c30d-495e-a187-d702610a8150"

firstName: "Scott"

lastName: "Strzynski"

}

comentarios {

content: "¡Primero!"

autor {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName: "Eso"

lastName: "OneCommenter"

}

}

comentarios {

contenido: "Oops supongo que no fui el primero"

autor {

id: "71b0534e-eb0d-478c-b5f3-46984ae1715d"

firstName: "Eso"

lastName: "OneCommenter"

}

}

WebsiteMessageListener :#### Tema:

Registro de esquemas

El esquema Protobuf se añade al registro de esquemas como Avro.

http://localhost:8081/subjects

[

"autor.proto",

"comment.proto",

"google/protobuf/timestamp.proto",

"website-messages-value"

]

http://localhost:8081/subjects/website-messages-value/versions

[

1,

2

]

http://localhost:8081/subjects/website-messages-value/versions/2

{

"asunto": "website-mensajes-valor",

"version": 2,

"id": 4,

"schemaType": "PROTOBUF",

"references": [

{

"name": "google/protobuf/timestamp.proto",

"subject": "google/protobuf/timestamp.proto",

"version": 1

},

{

"name": "comentario.proto",

"subject": "comentario.proto",

"version": 1

},

{

"nombre": "autor.proto",

"subject": "autor.proto",

"version": 1

}

],

"schema": "syntax = "proto3";nimport "google/protobuf/timestamp.proto";nimport "comment.proto";nimport "author.proto";noptionjava_package = "com.improving.springprotobuf.generated.v2.model\";\noptionjava_outer_classname = \"WebsiteMessagePayload\";\nmessageWebsiteMessage {\n reservado 5 a 6;\n reservado \"topic\";\n string id = 1;\n string content = 2;\n .AuthorMessage autor = 3;\n repetido .CommentMessage comentarios = 4;\n .Version version = 6;\n} {\nenum Version {\n UNKNOWN = 0;\n V1 = 1;\n V2 = 2;\n}"

}

Conclusión

El uso de Protobuf proporciona al desarrollador la capacidad de construir un esquema evolucionable como Avro. A diferencia de Avro, utiliza un IDL en lugar de un JSON. Spring Boot y Gradle proporcionan un buen marco para permitir al desarrollador escribir aplicaciones rápidamente y no preocuparse por los problemas de un esquema cambiante.

Tecnología
Datos

Reflexiones más recientes

Explore las entradas de nuestro blog e inspírese con los líderes de opinión de todas nuestras empresas.
Blog Image - Unveiling the Future of AI at Google Cloud Next 24 -1
IA/ML

Unveiling the Future of AI at Google Cloud Next ‘24

Get firsthand insights from Improving into the innovation brewing around artificial intelligence and cloud computing at Google Cloud Next '24.