Recherche de site Web

Rationalisez la sérialisation et la gestion des versions des données avec Confluent Schema Registry sur Kafka


Introduction

À mesure que les entreprises se développent, le maintien de la compatibilité entre les formats de données est d'une importance primordiale en cas de streaming d'événements. Les données stockées sur les sujets Apache Kafka sont immuables et ne peuvent pas être modifiées rétroactivement pour répondre aux demandes actuelles. Contourner ce problème peut s'avérer être un défi avec un grand nombre de modifications de schéma.

Apache Avro est une bibliothèque de sérialisation de données, conçue pour les pipelines de streaming d'événements. Il vous permet de définir des structures riches (appelées schémas) pour vos données avec des capacités de sérialisation pour un transport et un stockage efficaces. Pour suivre les schémas et leurs versions, Confluent Schema Registry a été créé. Il agit comme un référentiel centralisé pour vos schémas, gère leur stockage et assure l'intercompatibilité. Cela vous permet de vous concentrer sur les données au lieu de trouver des moyens de convertir manuellement une version de schéma en une autre.

Dans ce didacticiel, vous allez déployer Confluent Schema Registry à l'aide de Docker et étendre le producteur et le consommateur Kafka que vous avez créés dans les didacticiels précédents de la série. Vous les retravaillerez pour créer et consommer des objets conformes à un schéma que vous définirez. Vous modifierez également le schéma et apprendrez à le faire évoluer sans casser les données conformes aux versions antérieures.

Conditions préalables

Pour suivre ce tutoriel, vous aurez besoin de :

  • Docker installé sur votre machine. Pour Ubuntu, consultez Comment installer et utiliser Docker sur Ubuntu. Il vous suffit de suivre l'étape 1 et l'étape 2 de ce didacticiel.
  • Docker Compose installé sur votre machine. Pour Ubuntu, suivez l'étape 1 et l'étape 2 de Comment installer et utiliser Docker Compose sur Ubuntu.
  • Un projet Java avec un producteur Kafka configuré conformément au guide Comment configurer un producteur Kafka pour obtenir des données via CLI.
  • Un projet Java avec un consommateur Kafka configuré conformément au guide Comment configurer un consommateur Kafka pour recevoir des données via CLI.
  • jq installé sur votre machine. Pour un aperçu, consultez l'article Comment transformer des données JSON avec jq.
  • Une compréhension de Kafka, y compris les sujets, les producteurs et les consommateurs. Pour plus d’informations, veuillez consulter Introduction à Kafka.

Étape 1 - Exécution du registre de schémas à l'aide de Docker Compose

Dans cette section, vous apprendrez comment exécuter le registre de schémas Confluent à l'aide de Docker Compose. Contrairement à Kafka, qui peut s'exécuter de manière autonome à l'aide de KRaft, Schema Registry nécessite une instance ZooKeeper pour fonctionner.

Dans le cadre des prérequis, vous avez déployé Kafka sur votre machine locale en tant que service systemd. Dans cette étape, vous allez déployer un nœud Kafka via Docker Compose. Tout d'abord, vous devez arrêter le service en exécutant :

sudo systemctl stop kafka

Vous allez maintenant définir la configuration de Docker Compose dans un fichier nommé schema-registry-compose.yaml. Créez-le et ouvrez-le pour le modifier en exécutant :

nano schema-registry-compose.yaml

Ajoutez les lignes suivantes :

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Ici, vous définissez trois services, zookeeper, kafka et kafka-schema-registry. Les trois services utilisent les dernières images Docker fournies par Confluent. Le service ZooKeeper sera exposé au port 2181 et kafka sera au port 9092. Pour kafka, sous environnement vous configurez l'adresse ZooKeeper et spécifiez un écouteur supplémentaire à 29092, que le registre de schémas utilisera pour se connecter directement à Kafka. . Vous spécifiez que le service kafka doit attendre que zookeeper démarre en premier.

Ensuite, vous exposez kafka-schema-registry sur le port 8081 et transmettez l'adresse de connexion au service kafka sous environnement. Vous spécifiez également que le registre de schémas ne doit être démarré que lorsque ZooKeeper et Kafka ont terminé leur initialisation.

Enregistrez et fermez le fichier, puis exécutez la commande suivante pour afficher les services en arrière-plan :

docker-compose -f schema-registry-compose.yaml up -d

La fin du résultat ressemblera aux lignes suivantes :

...
Creating root_zookeeper_1 ... done
Creating root_kafka_1     ... done
Creating kafka-schema-registry ... done

Vous pouvez lister les conteneurs en cours d'exécution avec :

docker ps

Le résultat ressemblera à ce qui suit :

CONTAINER ID   IMAGE                              COMMAND                  CREATED          STATUS          PORTS                                       NAMES
6783568a74c8   confluentinc/cp-schema-registry    "/etc/confluent/dock…"   19 seconds ago   Up 17 seconds   0.0.0.0:8081->8081/tcp, :::8081->8081/tcp   kafka-schema-registry
6367df4b55f7   confluentinc/cp-kafka:latest       "/etc/confluent/dock…"   19 seconds ago   Up 18 seconds   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   root_kafka_1
a5f5b09984e0   confluentinc/cp-zookeeper:latest   "/etc/confluent/dock…"   19 seconds ago   Up 19 seconds   2181/tcp, 2888/tcp, 3888/tcp                root_zookeeper_1

Au cours de cette étape, vous avez déployé une instance de Schema Registry, ainsi que ZooKeeper et Kafka à l'aide de Docker Compose. Vous allez maintenant apprendre à créer et utiliser des schémas Avro dans votre projet Java.

Étape 2 - Utilisation des schémas Avro

Dans cette section, vous ajouterez Avro à votre projet, ainsi que les dépendances associées. Vous apprendrez à définir des schémas Avro et à générer automatiquement des classes Java pour les types définis. Ensuite, vous ajouterez votre schéma Avro au registre de schémas.

Ajouter Avro à votre projet

Tout d’abord, vous ajouterez la dépendance org.apache.avro. Accédez à la page du référentiel Maven pour le client Java dans votre navigateur et sélectionnez la dernière version disponible, puis copiez l'extrait XML fourni pour Maven. Au moment de la rédaction de cet article, la dernière version de la bibliothèque client Java était 1.11.3.

Les dépendances sont ajoutées à pom.xml à la racine de votre projet. Ouvrez-le pour le modifier en exécutant :

nano pom.xml

Recherchez la section <dependencies> et ajoutez la définition de dépendance :

...
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.11.3</version>
</dependency>
...

Cela rendra Avro disponible pour votre projet. Cependant, pour pouvoir générer des classes Java à partir de schémas Avro, vous devrez également ajouter le avro-maven-plugin depuis le référentiel Maven de la même manière.

Une fois que vous avez défini les dépendances, vous devrez vous assurer que le plugin génère des sources. Recherchez la section <build> de pom.xml et ajoutez les lignes en surbrillance :

...
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/java/com/dokafka/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
...

Ici, vous configurez le avro-maven-plugin pour générer des sources Java basées sur des schémas sous /src/main/java/com/dokafka et les placez sous / src/main/java. Notez que vous ne devez pas modifier la section <plugins> existante sous .

Lorsque vous avez terminé, enregistrez et fermez le fichier.

Construisez le projet pour vérifier que tout est configuré correctement :

mvn package

La fin du résultat ressemblera à ceci :

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  5.759 s
[INFO] Finished at: 2024-04-01T13:38:31Z
[INFO] ------------------------------------------------------------------------

Définir des schémas Avro

Vous allez maintenant créer un schéma appelé TempMeasurement qui décrit une mesure de température à un moment donné. Vous le stockerez à côté des classes ProducerDemo et ConsumerDemo que vous avez créées dans le cadre des prérequis. Créez-le et ouvrez-le pour le modifier en exécutant :

nano src/main/java/com/dokafka/TempMeasurement.avsc

Ajoutez les lignes suivantes :

{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
      {
          "name": "measuredValue",
          "type": "double"
      },
      {
          "name": "measurerName",
          "type": "string"
      }
  ]
}

Les fichiers de schéma Avro sont écrits en JSON et leur extension de fichier est .avsc. Tout d'abord, vous spécifiez le namespace du schéma, qui sera également l'espace de noms des classes Java générées automatiquement. Vous définissez son name sur TempMeasurement et spécifiez record comme type de schéma, ce qui signifie qu'il s'agit d'un objet Avro.

Ensuite, vous spécifiez les champs de votre schéma, que vous appelez measuredValue et measurerName de types double et string, respectivement. Avro prend également en charge d'autres types tels que int, long, float, boolean et bytes.

Enregistrez et fermez le fichier, puis construisez le projet :

mvn package

Maintenant, listez les fichiers sous src/main/java/com/dokafka :

ls src/main/java/com/dokafka

Vous verrez qu'une classe TempMeasurement a été créée :

ConsumerDemo.java  ProducerDemo.java  TempMeasurement.avsc  TempMeasurement.java

Cette classe contient le code pour instancier, sérialiser et désérialiser les objets TempMeasurement.

Stockage du schéma dans le registre de schémas

Maintenant que vous avez défini votre schéma, vous pouvez l'ajouter au Schema Registry en exécutant :

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
    http://localhost:8081/subjects/TempMeasurement/versions

Le registre de schémas est exposé sur http://localhost:8081 et accessible via HTTP. Dans cette commande, vous définissez d'abord la méthode HTTP sur POST avec le Content-Type approprié que le registre acceptera.

Vous transmettez le corps de la requête à -d et utilisez jq pour envelopper le contenu du schéma dans un champ appelé schema puisque c'est le format du registre de schémas. accepte. Enfin, vous pointez la requête vers subjects/TempMeasurement/versions, dans lequel vous spécifiez comment le nouveau schéma doit être appelé.

Le résultat sera :

{"id":1}

Schema Registry a accepté la demande et lui a attribué l'ID 1.

Pour répertorier tous les schémas disponibles, exécutez :

curl -X GET http://localhost:8081/subjects

Vous n’en verrez qu’un seul disponible :

["TempMeasurement"]

Au cours de cette étape, vous avez ajouté les dépendances nécessaires à votre projet Maven et configuré la génération automatique de code pour les schémas Avro que vous définissez. Vous allez maintenant vous plonger dans la production et la consommation de données basées sur des schémas.

Étape 3 - Produire et consommer selon des schémas

Confluent fournit une bibliothèque de sérialisation Avro pour Kafka appelée kafka-avro-serializer. Dans cette étape, vous allez l'ajouter à votre projet et configurer votre producteur et votre consommateur pour transmettre les objets TempMeasurement.

Ajout des dépendances nécessaires

Pour ajouter la bibliothèque à votre projet, accédez au référentiel Maven et copiez la définition de dépendance XML pour la dernière version disponible, qui était 7.6.0 au moment de la rédaction. Ensuite, ouvrez pom.xml pour le modifier :

nano pom.xml

Étant donné que le package kafka-avro-serializer est hébergé sur le référentiel Maven de Confluent, vous devrez le définir en ajoutant les lignes en surbrillance :

...
<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>
...

De même, ajoutez la définition de la bibliothèque à la section <dependencies> :

...
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
...

Pour permettre à votre client Kafka de communiquer avec le registre de schémas, vous devrez également ajouter le kafka-schema-registry-client en tant que dépendance, alors accédez au dépôt Maven et insérez la définition du dernière version disponible vers pom.xml.

Enregistrez et fermez le fichier lorsque vous avez terminé. Vous pourrez désormais utiliser les classes KafkaAvroSerializer et KafkaAvroDeserializer dans votre projet Maven.

Mise à jour de ProducerDemo et ConsumerDemo

Ensuite, vous allez retravailler la classe ProducerDemo pour vous connecter à Schema Registry et produire des objets de type TempMeasurement sur un sujet. Ouvrez-le pour le modifier en exécutant :

nano src/main/java/com/dokafka/ProducerDemo.java

Importez KafkaAvroSerializer et KafkaAvroSerializerConfig pour pouvoir les utiliser :

...
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
...

Ensuite, modifiez la première partie de la méthode main pour ressembler à ceci :

...
String topicName = "java_demo_avro";

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

KafkaProducer<String, TempMeasurement> producer = new KafkaProducer<>(properties);

ProducerRecord<String, TempMeasurement> producerRecord =
  new ProducerRecord<>(topicName, new TempMeasurement(Double.parseDouble(args[0]), args[1]));
...

Tout d’abord, vous définissez le nom du sujet sur java_demo_avro. Kafka créera le sujet s'il n'existe pas déjà. Ensuite, vous changez la classe de sérialiseur de valeurs de StringSerializer à KafkaAvroSerializer et définissez le paramètre SCHEMA_REGISTRY_URL_CONFIG, qui spécifie l'adresse du registre de schéma.

Vous remplacez également la définition de valeur String précédente par TempMeasurement. Pour le producterRecord, vous transmettez une nouvelle instance de TempMeasurement avec les deux paramètres (measurementValue et measurerName) provenant à partir des deux premiers arguments de ligne de commande passés dans la méthode main.

Lorsque vous avez terminé, enregistrez et fermez le fichier. Ensuite, pour pouvoir passer deux arguments au script run-producer.sh, vous devez l'ouvrir pour l'éditer :

nano run-producer.sh

Ajoutez le paramètre en surbrillance à la commande :

...
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1 $2

Enregistrez et fermez le fichier, puis produisez un TempMeasurement en exécutant :

./run-producer.sh 100 sammy

Dans cette commande, vous transmettez 100 comme measurementValue et sammy comme measurerName.

Pour pouvoir recevoir l'objet Avro que vous venez de produire, vous devrez modifier ConsumerDemo de la même manière. Ouvrez le fichier pour le modifier :

nano src/main/java/com/dokafka/ConsumerDemo.java

Importez KafkaAvroDeserializer et KafkaAvroDeserializerConfig pour pouvoir les référencer :

...
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
...

Ensuite, modifiez la méthode main pour qu'elle ressemble à ceci :

...
String topic = "java_demo_avro";
String groupId = "group1";

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

final KafkaConsumer<String, TempMeasurement> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));

try {
  while (true) {
    ConsumerRecords<String, TempMeasurement> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, TempMeasurement> record: records) {
      log.info(String.format("measuredValue: %s, measurerName: %s\n",
        record.value().getMeasuredValue(),
        record.value().getMeasurerName()));
    }
  }
} catch (Exception e) {
  log.error("An error occurred", e);
} finally {
  consumer.close();
}
...

Comme avec ConsumerDemo, vous mettez à jour le topic et remplissez le paramètre SCHEMA_REGISTRY_URL_CONFIG avec l'adresse du registre de schéma. Vous mettez également à jour le désérialiseur vers KafkaAvroDeserializer. En définissant le paramètre SPECIFIC_AVRO_READER_CONFIG sur true, vous indiquez au désérialiseur de renvoyer de vrais objets TempMeasurement. Sinon, il renverrait un Avro GenericRecord, qui contiendrait toujours tous les champs mais ne serait pas fortement typé.

Ensuite, vous propagez TempMeasurement comme type de valeur dans le reste du code. Dans la boucle for, vous modifiez l'appel de la méthode de journalisation pour afficher measurementTemp et measurementValue.

Grâce à l'intégration de Schema Registry, le producteur ne regroupe pas le schéma d'objet avec l'objet lors de son envoi vers un sujet. Au lieu de cela, il envoie l'objet avec un identifiant du schéma. Le consommateur récupérera ce schéma dans son intégralité à partir du registre de schémas, puis le désérialisera.

Enregistrez et fermez le fichier lorsque vous avez terminé, puis exécutez le consommateur :

./run-consumer.sh

La fin du résultat ressemblera à ceci :

...
[main] INFO com.dokafka.ConsumerDemo - measuredValue: 100.0, measurerName: sammy

Votre consommateur Kafka a réussi à désérialiser le message Avro, comme en témoigne le message du journal.

Au cours de cette étape, vous avez mis à jour vos classes de producteur et de consommateur pour utiliser les objets Avro. À l'étape suivante, vous apprendrez comment mettre à jour les schémas et suivre leur compatibilité avec Schema Registry.

Étape 4 - Evolution et compatibilité du schéma

Au cours de cette étape, vous apprendrez comment mettre à jour les schémas existants et comment ces modifications impactent la compatibilité avec les versions et les clients existants.

En plus de stocker les schémas et de les versionner au fil du temps, Schema Registry est crucial pour permettre l'évolution des schémas. Les schémas peuvent être modifiés tout au long de la durée de vie d'un projet, contrairement aux données déjà produites.

Schema Registry prend en charge la compatibilité entre les versions de schéma et permet au consommateur d'analyser autant de données que possible en suivant sa version interne du schéma. Cela permet aux producteurs et aux consommateurs d'être désynchronisés concernant leurs versions exactes de schéma puisqu'ils peuvent résider dans des bases de code différentes.

Les principales stratégies de compatibilité proposées par Schema Registry sont :

  • BACKWARD, qui garantit que les consommateurs utilisant le nouveau schéma peuvent lire les données basées sur la version précédente
  • FORWARD, ce qui signifie que les consommateurs utilisant le nouveau schéma peuvent lire les données basées sur le nouveau schéma (sans aucune garantie pour les précédents)
  • FULL, qui combine les deux stratégies précédentes
  • NONE, ce qui signifie que les contrôles de compatibilité sont désactivés

Les trois premières stratégies ont également des contreparties transitives (telles que BACKWARD_TRANSITIVE), qui exigent que le nouveau schéma soit compatible avec toutes les versions précédentes du schéma et pas seulement avec son prédécesseur immédiat. La stratégie par défaut est BACKWARD.

Vous allez maintenant modifier le schéma TempMeasurement et ajouter un champ pour stocker la date de la mesure. Ouvrez le schéma pour le modifier en exécutant :

nano src/main/java/com/dokafka/TempMeasurement.avsc

Modifiez-le pour qu'il ressemble à ceci :

{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
      {
          "name": "measuredValue",
          "type": "double"
      },
      {
          "name": "measurerName",
          "type": "string"
      },
      {
          "name": "measurementDate",
          "type": "string"
      }
  ]
}

Vous avez défini un nouveau champ appelé measurementDate, qui stockera la date de la mesure dans un format textuel. Enregistrez et fermez le fichier lorsque vous avez terminé.

Exécutez la commande suivante pour créer une nouvelle version du schéma dans Schema Registry :

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "$(jq -R -s '{"schema": .}' < src/main/java/com/dokafka/TempMeasurement.avsc)" \
    http://localhost:8081/subjects/TempMeasurement/versions

Vous obtiendrez le résultat suivant, détaillant une erreur :

{
  "error_code": 409,
  "message": "Schema being registered is incompatible with an earlier schema for subject \"TempMeasurement\", details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'measurementDate' at path '/fields/2' in the new schema has no default value and is missing in the old schema', additionalInfo:'measurementDate'}, {oldSchemaVersion: 1}, {oldSchema: '{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]"
}

L'erreur indique que le nouveau schéma n'est pas rétrocompatible avec le précédent car le champ measurementDate est nouveau et n'a pas de valeur par défaut. Ce comportement garantit que les consommateurs dotés de ce nouveau schéma pourront lire les données créées à l'aide de l'ancien schéma. Dans ce cas, la désérialisation échouerait sans valeur par défaut pour le champ inexistant.

Ouvrez le fichier et ajoutez la ligne en surbrillance :

{
  "namespace": "com.dokafka",
  "name": "TempMeasurement",
  "type": "record",
  "fields": [
      {
          "name": "measuredValue",
          "type": "double"
      },
      {
          "name": "measurerName",
          "type": "string"
      },
      {
          "name": "measurementDate",
          "type": "string",
          "default": ""
      }
  ]
}

measurementDate aura désormais une valeur par défaut d'une chaîne vide. Enregistrez et fermez le fichier, puis essayez à nouveau de soumettre le schéma. Le résultat sera :

{"id":2}

Schema Registry a accepté la deuxième version du schéma car elle est rétrocompatible avec la première et lui a attribué 2 comme ID. Vous pouvez récupérer la première version en exécutant :

curl -X GET http://localhost:8081/subjects/TempMeasurement/versions/1

Le résultat sera :

{
  "subject": "TempMeasurement",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"TempMeasurement\",\"namespace\":\"com.dokafka\",\"fields\":[{\"name\":\"measuredValue\",\"type\":\"double\"},{\"name\":\"measurerName\",\"type\":\"string\"}]}"
}

Vous pouvez également répertorier toutes les versions d'un schéma en omettant un ID spécifique :

curl -X GET http://localhost:8081/subjects/TempMeasurement/versions

Vous verrez qu’il y en a deux :

[1, 2]

Pour modifier la stratégie de compatibilité, vous pouvez exécuter la commande suivante :

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
       --data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
       http://localhost:8081/config/TempMeasurements

Cette commande définit la stratégie pour TempMeasurements sur BACKWARD_TRANSITIVE. Notez que le point de terminaison est config et non subjects. Le résultat sera :

{"compatibility":"BACKWARD_TRANSITIVE"}

Pour détruire les ressources Docker Compose que vous avez démarrées, exécutez la commande suivante :

docker-compose -f schema-registry-compose.yaml down

Au cours de cette étape, vous avez modifié le schéma TempMeasurement conformément à la stratégie de compatibilité BACKWARD et l'avez publié dans le registre de schémas.

Conclusion

Dans cet article, vous avez étendu vos classes ProducerDemo et ConsumerDemo pour produire et consommer des objets TempMeasurement, sérialisés par Apache Avro. Vous avez appris à utiliser le registre de schémas pour le stockage et l'évolution des schémas et à y connecter vos clients Kafka.

L'auteur a sélectionné Apache Software Foundation pour recevoir un don dans le cadre du programme Write for DOnations.

Articles connexes: