Recherche de site Web

Comment configurer un consommateur Kafka pour recevoir des données via CLI


Introduction

Apache Kafka fournit des scripts shell pour produire et consommer des messages textuels de base vers et depuis un cluster Kafka. Bien que ceux-ci soient utiles pour explorer et expérimenter, les applications du monde réel accèdent à Kafka par programmation. À cette fin, Kafka propose de nombreuses bibliothèques clientes pour les langages et environnements de programmation largement utilisés.

Dans ce didacticiel, vous allez créer un programme Java qui consomme les données d'un sujet Kafka. Vous réutiliserez le projet Java du didacticiel précédent sur Comment configurer un producteur Kafka pour obtenir des données via CLI.

Vous allez implémenter une classe qui exploite le client Kafka en consommant les messages d'un sujet. Ensuite, vous découvrirez comment Kafka gère plusieurs consommateurs lisant le même sujet en même temps et comment il suit leurs progrès. Vous apprendrez également comment signaler manuellement la progression du consommateur au cluster.

Comment configurer un consommateur Kafka pour recevoir des données via CLI

  1. Créer un consommateur Kafka
  2. Implémentation d'un arrêt progressif
  3. Équilibrage de charge avec des groupes de consommateurs
  4. Validation manuelle des compensations

Conditions préalables

Pour terminer ce tutoriel, vous aurez besoin de :

  • Une machine avec au moins 4 Go de RAM et 2 processeurs. Dans le cas d'un serveur Ubuntu, suivez la configuration initiale du serveur Ubuntu pour les instructions de configuration.
  • Kit de développement Java (JDK) 8 ou supérieur installé sur votre Droplet ou votre ordinateur local. Pour obtenir des instructions sur l'installation de Java sur Ubuntu, consultez le didacticiel Comment installer Java avec Apt sur Ubuntu.
  • Apache Kafka installé et configuré sur votre Droplet ou votre machine locale. Vous pouvez suivre le didacticiel Introduction à Kafka pour obtenir des instructions de configuration.
  • Familiarité avec la disposition des répertoires standard des projets Java. Pour plus d'informations, consultez la rubrique Introduction à la disposition de répertoire standard dans la documentation officielle de Maven.
  • Un projet Java avec un producteur Kafka configuré conformément au didacticiel Comment configurer un producteur Kafka pour obtenir des données via CLI.

Étape 1 - Création d'un consommateur Kafka

Dans le cadre des prérequis, vous avez créé un projet Java avec les dépendances nécessaires pour accéder par programmation à Kafka et produire des messages dans la rubrique java_demo. Au cours de cette étape, vous allez créer une classe qui consommera les messages de ce sujet.

Accédez au répertoire où le projet dokafka est stocké. Conformément à la structure du projet, le code source est stocké sous src/main/java/com/dokafka.

Vous stockerez la classe dans un fichier nommé ConsumerDemo.java. Créez-le et ouvrez-le pour le modifier en exécutant :

nano src/main/java/com/dokafka/ConsumerDemo.java

Ajoutez les lignes suivantes :

package com.dokafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.time.*;

public class ConsumerDemo {
  private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);

  public static void main(String[] args) {
    String bootstrapServers = "localhost:9092";
    String topicName = "java_demo";
    String groupId = "group1";

    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topicName));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record: records) {
          log.info(String.format("topic = %s, partition = %d, offset = %d, value = %s\n",
            record.topic(),
            record.partition(),
            record.offset(),
            record.value()));
        }
      }
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      consumer.close();
    }
  }
}

Tout d’abord, vous définissez la classe ConsumerDemo et importez les classes que vous utiliserez. Vous instanciez également un Logger en tant que membre de la classe. De la même manière que ProducerDemo, dans la méthode main(), vous déclarez d'abord l'adresse du cluster Kafka (bootstrapServers) et le nom du sujet à partir duquel vous consommerez des messages.

Chaque consommateur Kafka appartient à un groupe de consommateurs, identifié par une chaîne unique appelée ID de groupe. Vous le définissez comme group1 et le stockez dans groupId.

Ensuite, vous instanciez un objet Properties, qui contient des paires de clés et de valeurs représentant la configuration d'exploitation de votre consommateur Kafka. Vous définissez la propriété BOOTSTRAP_SERVERS_CONFIG sur l'adresse du cluster Kafka. Ici, vous définissez les entrées de clé et de valeur deserializer sur StringDeserializer.class.getName().

À l'inverse des sérialiseurs (qui sont utilisés dans la classe ProducerDemo), les désérialiseurs sont des classes qui acceptent une entrée en octets et reconstruisent l'objet d'origine. Le consommateur les utilise pour reconvertir l'état acceptable par le réseau de la clé et de la valeur dans leurs formes originales comprises par le code. La clé et la valeur seront désérialisées en chaînes à l'aide du StringDeserializer intégré.

Vous définissez ensuite l'ID de groupe du consommateur (GROUP_ID_CONFIG). Vous définissez également le paramètre AUTO_OFFSET_RESET_CONFIG, qui définit à partir de quel endroit du sujet le consommateur doit commencer la lecture s'il n'a pas de position préalablement enregistrée. Le définir sur le plus tôt lui demande de commencer depuis le début (décalage 0).

Après cela, vous déclarez et instanciez un KafkaConsumer :

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

Le consommateur analysera les clés et les valeurs de type String avec les propriétés qui les accompagnent pour la configuration.

Pour exprimer votre intérêt à recevoir des messages du sujet, vous y subscribe() :

consumer.subscribe(Collections.singletonList(topicName));

Un consommateur peut s'abonner à plusieurs sujets à la fois en transmettant l'ID de chacun sous la forme d'une Liste. Étant donné que vous ne vous abonnerez qu'à un seul sujet, vous utilisez la méthode d'assistance Collections.singletonList() pour créer une Liste avec topic comme seul élément.

Ensuite, vous commencez à recevoir les enregistrements du sujet en interrogeant dans une boucle infinie :

...
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record: records) {
          log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.value()));
        }
      }
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      consumer.close();
    }
...

La méthode poll() de KafkaConsumer accepte une Durée, indiquant combien de temps le consommateur doit attendre que de nouveaux enregistrements y soient diffusés avant de revenir. Vous enregistrez ensuite les métadonnées et la valeur de chaque enregistrement reçu. Notez que la boucle infinie est située dans un bloc try. Sans cela, la compilation échouerait car consumer.close() serait inaccessible.

Lorsque vous avez terminé, enregistrez et fermez le fichier.

Ensuite, vous allez créer un script qui gérera la compilation et l'exécution de ConsumerDemo. Vous le stockerez dans un fichier appelé run-consumer.sh. Créez-le et ouvrez-le pour le modifier :

nano run-consumer.sh

Ajoutez les lignes suivantes :

#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ConsumerDemo

Enregistrez et fermez le fichier, puis marquez-le comme exécutable :

chmod +x run-consumer.sh

Enfin, essayez de l'exécuter :

./run-consumer.sh

Le résultat sera long et sa fin devrait ressembler à ceci :

...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Found no committed offset for partition java_demo-0
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting offset for partition java_demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo, partition = 0, offset = 0, key = null, value = Hello World!
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo, partition = 0, offset = 1, key = null, value = Hello World!

KafkaConsumer a enregistré qu'il a été affecté à la partition 0 du sujet java_demo et qu'il n'a pas trouvé de offset validé pour cette partition. Pour cette raison, vous avez précédemment défini AUTO_OFFSET_RESET_CONFIG sur le plus tôt et le décalage de la partition est réinitialisé à 0 comme l'indique le message de journal suivant. Les consommateurs suivent où ils ont arrêté la lecture en validant périodiquement (ou manuellement) le décalage dans le cluster. Vous en apprendrez davantage sur les compensations et leur relation avec les partitions et les groupes de consommateurs dans la section suivante.

Depuis qu'il a commencé la lecture depuis le début, le consommateur a reçu les messages Hello World! que vous avez produits dans le cadre des prérequis.

Appuyez sur CTRL+C pour l'arrêter, puis exécutez-le à nouveau :

...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
...

Le consommateur va maintenant commencer la lecture au décalage 2, là où il s'est arrêté la dernière fois. Cela signifie que le client Kafka a automatiquement enregistré son décalage précédent dans le cluster.

Au cours de cette étape, vous avez créé un consommateur Kafka en Java qui diffuse les enregistrements de la rubrique java_demo. Vous allez maintenant l'étendre afin qu'il élimine correctement le KafkaConsumer lors de l'arrêt.

Étape 2 - Mise en œuvre d'un arrêt progressif

La ligne consumer.close() dans le bloc finally s'exécutera une fois que le flux d'exécution aura quitté le bloc try. Puisqu'il contient une boucle infinie, cela ne se produira que si une exception se produit. Vous allez maintenant étendre ConsumerDemo pour fermer également le consommateur lorsqu'il est arrêté ou supprimé.

Java vous permet d'enregistrer des fonctions qui seront exécutées à la fermeture du programme, appelées shutdown hooks. Ouvrez ConsumerDemo pour le modifier :

nano src/main/java/com/dokafka/ConsumerDemo.java

Ajoutez les lignes en surbrillance :

...
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));

    Thread currentThread = Thread.currentThread();
    Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
        consumer.wakeup();

        try {
          currentThread.join();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    });
...

Ici, vous récupérez d’abord une référence au thread en cours d’exécution. Ensuite, vous ajoutez un hook d'arrêt en passant un nouveau Thread avec votre code. Vous remplacez la méthode run() de ce thread, dans laquelle vous ordonnez au consommateur de wakeup(). Cela arrêtera le consommateur et déclenchera une WakeupException qui doit être interceptée. Après, le bloc finally s'exécutera et close() le consommateur. Pour que cela se produise, ce thread va fusionner avec le thread principal et lui redonner le flux d'exécution.

Vous devrez attraper l'WakeupException en ajoutant les lignes suivantes après le bloc try :

...
    } catch (WakeupException e) {
      // Ignore
    } catch (Exception e) {
      log.error("An error occurred", e);
    } finally {
      consumer.close();
    }
...

La WakeupException est désormais interceptée et ignorée, puisque le consommateur sera fermé dans finally.

Enregistrez et fermez le fichier lorsque vous avez terminé, puis exécutez le consommateur :

./run-consumer.sh

Attendez qu'il se charge, puis appuyez sur CTRL+C. Vous remarquerez que l'arrêt n'est plus instantané car le consommateur communique avec le cluster et annonce son départ du groupe de consommateurs. La fin du résultat indiquant cela ressemblera à ceci :

...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions java_demo_partitions-0, java_demo_partitions-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-7bddcea2-ee1a-4a15-9797-669c0302d19f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer is being closed
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Resetting generation and member id due to: consumer pro-actively leaving the group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Request joining group due to: consumer pro-actively leaving the group
...

Dans cette étape, vous avez implémenté un hook d'arrêt dans ConsumerDemo qui garantit que le consommateur se fermera correctement lorsque le programme est arrêté. Vous allez maintenant en apprendre davantage sur l’architecture des groupes de consommateurs et des compensations.

Étape 3 : Équilibrage de charge avec les groupes de consommateurs

Le consommateur que vous avez créé à l'étape précédente a son paramètre GROUP_ID_CONFIG défini sur group1, ce qui indique qu'il appartient à un groupe de consommateurs avec cela IDENTIFIANT. Les consommateurs d'un groupe sont considérés comme homogènes, ce qui permet à Kafka d'équilibrer la charge des événements entre eux.

Partitions de sujets

Chaque sujet Kafka est composé de partitions, qui sont réparties entre les nœuds du cluster pour la tolérance aux pannes. Chaque partition possède son propre ensemble d'enregistrements, ainsi que son propre décalage. Kafka garantit l'ordre des enregistrements au sein d'une partition, mais pas entre eux au sein d'un sujet. Par défaut, Kafka utilise un algorithme circulaire pour décider à quelle partition un enregistrement entrant doit être ajouté. Bien qu'utile dans le cas général, cela n'est pas acceptable lorsque vous avez besoin de stocker les enregistrements associés dans un ordre strict, car la relecture d'une partition ne permettrait pas de récupérer toute la chaîne logique des événements.

Pour maintenir un ordre strict, Kafka vous permet de transmettre une clé pour le message. Avec une clé, Kafka ajoutera toujours l'enregistrement à la même partition. Comme Kafka garantit le classement dans une partition, vous pouvez être sûr que tous les messages avec la même clé seront diffusés par ordre d'insertion.

Les partitions peuvent être répliquées sur plusieurs courtiers pour garantir la redondance. La partition principale avec laquelle les consommateurs interagissent est appelée chef de partition, tandis que les partitions secondaires sont considérées comme des répliques. Les répliques sont stockées sur des nœuds autres que celui où réside le chef de partition.

Mappage des groupes de consommateurs et des partitions

Comme chaque partition est un flux d'événements distinct avec ses propres décalages, chaque partition sera attribuée à un consommateur du groupe. S'il y a plus de partitions que de consommateurs, certains consommateurs liront à partir de plusieurs partitions. Notez que s’il y a plus de consommateurs que de partitions, certains consommateurs ne se verront attribuer rien et resteront inactifs. Ceci est fait exprès, car la lecture parallèle de plusieurs consommateurs sur la même partition entraînerait le traitement des mêmes événements plusieurs fois. Pour cette raison, il est recommandé de faire correspondre le nombre de partitions et de consommateurs dans un groupe.

Les consommateurs peuvent entrer ou quitter un groupe de consommateurs à tout moment. Lorsque cela se produit, Kafka rééquilibrera les partitions entre le nouvel ensemble de consommateurs. Chaque consommateur récupérera ensuite le dernier décalage validé pour chaque partition à laquelle il est affecté et poursuivra le traitement à partir de là. Il n'y a aucune garantie quant à savoir si un consommateur conservera l'ensemble de partitions sur lesquelles il travaillait auparavant. Il est donc très important de ne restituer les compensations à Kafka que lorsque le travail est réellement terminé.

Définition des clés pour les enregistrements

Vous allez maintenant créer un nouveau sujet avec deux partitions. Ensuite, vous modifierez ProducerDemo pour définir une clé pour chaque message qu'il envoie et exécuterez plusieurs consommateurs dans un groupe pour voir comment Kafka organise les messages entrants.

Depuis le répertoire de votre installation Kafka, exécutez la commande suivante pour créer un nouveau sujet :

bin/kafka-topics.sh --create --topic java_demo_partitions --bootstrap-server localhost:9092 --partitions 2

Le sujet s'appelle java_demo_partitions et contient deux partitions. Le résultat sera :

...
Created topic java_demo_partitions.

Puisque vous allez modifier ProducerDemo, ouvrez-le pour le modifier :

nano src/main/java/com/dokafka/ProducerDemo.java

Tout d'abord, définissez topicName sur java_demo_producer :

...
    String topicName = "java_demo_partitions";
...

Ensuite, transmettez le premier argument CLI que votre programme reçoit comme clé pour le ProducerRecord :

...
    ProducerRecord<String, String> producerRecord =
      new ProducerRecord<>(topicName, args[0], "Hello World!");    
...

Le tableau de chaînes args est transmis à la méthode main() et contient les arguments qui sont transmis lors de l'exécution du programme. Lorsque vous avez terminé, enregistrez et fermez le fichier.

Ensuite, vous mettrez à jour ConsumerDemo pour récupérer les données du nouveau sujet. Ouvrez-le pour le modifier :

nano src/main/java/com/dokafka/ConsumerDemo.java

De même, définissez topicName sur le nouveau sujet :

...
    String topicName = "java_demo_partitions";
...

Enregistrez et fermez le fichier.

Avant d'exécuter le producteur, vous devrez mettre à jour le script run-producer.sh pour transmettre un argument donné à ProducerDemo. Ouvrez-le pour le modifier :

nano run-producer.sh

Passez le premier argument à la commande java comme indiqué :

#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1

Lorsque vous avez terminé, enregistrez et fermez le fichier.

Ensuite, ouvrez une session de terminal distincte et exécutez le consommateur :

./run-consumer.sh

Ensuite, ouvrez une troisième session de terminal et exécutez-y le deuxième consommateur :

./run-consumer.sh

La fin de la sortie de l’un des consommateurs sera similaire à ceci :

...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo_partitions-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo_partitions-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}

L'autre consommateur se verra attribuer la partition java_demo_partitions-1.

Dans la session principale, produisez un message avec la clé key1 en exécutant :

./run-producer.sh key1

Notez qu'un seul des deux consommateurs recevra le message :

[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 0, offset = 0, key = key1, value = Hello World!

Essayez de produire un message avec une clé différente :

./run-producer.sh key2

Cette fois, l'autre consommateur recevra ce message car Kafka l'a acheminé vers la partition restante (à partir de laquelle ce consommateur diffuse les enregistrements) :

[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 1, offset = 0, key = key2, value = Hello World!

Appuyez sur CTRL+C lors de la troisième session pour mettre fin au deuxième consommateur. Vous verrez les deux partitions être rééquilibrées pour le consommateur restant :

...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group1-1, groupId=group1] Notifying assignor about the new Assignment(partitions=[java_demo_partitions-0, java_demo_partitions-1])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker - [Consumer clientId=consumer-group1-1, groupId=group1] Adding newly assigned partitions: java_demo_partitions-0, java_demo_partitions-1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-1 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerUtils - Setting offset for partition java_demo_partitions-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}

Ensuite, essayez à nouveau de produire des messages :

./run-producer.sh key1
./run-producer.sh key2

Vous verrez que le consommateur restant a reçu les deux :

...
[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 0, offset = 1, key = key1, value = Hello World!

[main] INFO com.dokafka.ConsumerDemo - topic = java_demo_partitions, partition = 1, offset = 1, key = key2, value = Hello World!

Au cours de cette étape, vous avez appris à gérer plusieurs consommateurs dans un groupe et comment Kafka rééquilibre le trafic entre eux en réaffectant les partitions en cas de modifications. Vous allez maintenant apprendre à valider manuellement les compensations dans le cluster.

Étape 4 - Validation manuelle des compensations

Le comportement par défaut de la bibliothèque client Kafka est de valider automatiquement le dernier décalage renvoyé par poll() toutes les 5 secondes. Ceci n'est pas sûr si le consommateur traite les événements à un rythme plus lent que celui qu'il engage automatiquement et peut conduire à ce que les enregistrements ne soient pas traités. Vous allez maintenant apprendre à valider manuellement les compensations pour éviter ce problème.

Par exemple, si poll() renvoyait des messages allant des décalages 0 à 10, le consommateur renverrait ce 10 est la dernière compensation traitée, que cela se soit réellement produit ou non. Si l'application plante avant de traiter réellement le message au décalage 10, la prochaine fois qu'elle s'exécutera, elle démarrera à partir de là, laissant effectivement certains enregistrements non traités.

Vous allez modifier ConsumerDemo, alors ouvrez-le pour le modifier :

nano src/main/java/com/dokafka/ConsumerDemo.java

Tout d'abord, désactivez les validations automatiques en définissant ENABLE_AUTO_COMMIT_CONFIG sur false :

...
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
...

Ensuite, ajoutez la ligne en surbrillance à la boucle qui parcourt chaque enregistrement reçu :

...
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record: records) {
          log.info(String.format("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.value()));
          consumer.commitSync();
        }
      }
    }
...

La méthode commitSync() de KafkaConsumer validera le décalage actuel de manière bloquante, ce qui signifie que le traitement ultérieur des enregistrements n'aura pas lieu jusqu'à ce que le processus de validation soit terminé ou qu'une exception se produise. Est lancé.

KafkaConsumer fournit également la méthode commitAsync(), qui tentera de valider en arrière-plan tout en renvoyant le flux d'exécution à l'appelant (dans ce cas, la boucle continuera en cours d'exécution). L'inconvénient est que l'état peut toujours être incohérent - Kafka peut renvoyer une erreur et votre code serait déjà passé à l'enregistrement suivant.

Lorsque vous avez terminé, enregistrez et fermez le fichier. Vous pouvez essayer d'exécuter le consommateur dans une session distincte, puis de produire quelques messages. Le flux global sera le même, mais le consommateur n’engagera désormais plus les compensations qu’il n’a pas encore traitées.

Dans cette section, vous avez appris à valider manuellement les compensations dans Kafka lorsque vous traitez les enregistrements reçus.

Conclusion

Dans ce didacticiel, vous avez étendu le producteur Kafka ProducerDemo et créé ConsumerDemo, un consommateur Kafka écrit en Java. Vous avez découvert les groupes de consommateurs et la manière dont Kafka attribue des partitions aux consommateurs en groupes.

Vous vous êtes assuré que l'instance KafkaConsumer est finalement correctement fermée même lorsque le processus est supprimé. Vous avez également appris à valider manuellement les compensations dans le cluster tout en traitant les enregistrements.

Pour plus d'informations sur KafkaConsumer et ses propriétés, visitez la documentation officielle de Kafka.

L'auteur a sélectionné Open Source Initiative pour recevoir un don dans le cadre du programme Write for DOnations.

Articles connexes: