Chapitre Messagerie avec Quarkus
Quarkus et Kafka
Nous avons utiliser Kafka comme bus de message pour signaler les nouvelles adoptions et quand une adoption a été réalisée.
Nous allons visiter des fonctionnalités telles que :
- Utilisation de l'extension Quarkus pour Kafka
- exécuter un code au démarrage de Quarkus
- ajouter de la configuration pour l'application
- configurer les traces et les logs
Objectifs
Ce tutoriel vous guidera à travers les étapes pour configurer Kafka en tant que conteneur Docker et l'intégrer avec une application Quarkus afin d'envoyer et recevoir des événements de type "Nouvelle adoption disponible" et "Adoption finalisée".
Qu'est-ce que Kafka ?
Kafka est un système de messagerie distribué open-source développé par Apache. Il est utilisé pour la diffusion de flux de données en temps réel, la gestion de files d'attente de messages et le traitement de flux de données.
Voici quelques caractéristiques clés de Kafka :
- Haute performance: Kafka est capable de gérer des millions de messages par seconde avec une faible latence.
- Scalabilité: Kafka peut être facilement mis à l'échelle horizontalement en ajoutant des serveurs supplémentaires au cluster.
- Durabilité: Kafka stocke les messages de manière persistante sur le disque, ce qui garantit qu'ils ne seront pas perdus en cas de panne.
- Fiabilité: Kafka garantit la livraison des messages au moins une fois, ce qui le rend adapté aux applications critiques.
- Flexibilité: Kafka peut être utilisé pour une variété de cas d'utilisation, notamment la diffusion de données en temps réel, la gestion de files d'attente de messages et le traitement de flux de données.
Voici quelques exemples d'utilisation de Kafka :
- Diffusion de données en temps réel: Kafka peut être utilisé pour diffuser des données en temps réel à partir de sources telles que des capteurs, des journaux d'événements et des médias sociaux.
- Gestion de files d'attente de messages: Kafka peut être utilisé pour gérer des files d'attente de messages pour les applications telles que les systèmes de commande et les systèmes de paiement.
- Traitement de flux de données: Kafka peut être utilisé pour traiter des flux de données en temps réel, tels que les données de clics sur les sites Web et les données de
Prérequis
- Docker installé sur votre machine
- Java JDK 11 ou supérieur
- Maven
- Une installation fonctionnelle de Quarkus CLI (optionnel)
Étape 1 : Lancer Kafka avec Docker
- Créez un fichier
docker-compose.yml
pour configurer et démarrer Kafka :
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka-net
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "adoption-events:1:1"
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
Notre Topic aura 1 partition et une seule réplication..
- Lancez Kafka avec Docker Compose :
docker compose up -d
Kafka sera maintenant accessible sur localhost:5672 et l'interface de gestion sur localhost:15672.
Ajouter l'extension Kafka
Ajoutez les dépendances nécessaires au backend et au frontend :
quarkus ext add io.quarkus:quarkus-kafka-client
quarkus extension add messaging-kafka
Etape 3 : configurer Quarkus pour Kafka
Les applications envoient et reçoivent des messages. Un message enveloppe une charge utile et peut être étendu avec des métadonnées. Avec le connecteur Kafka, un message correspond à un enregistrement Kafka.
Les messages transitent par des canaux. Les composants de l'application se connectent à des canaux pour publier et consommer des messages. Le connecteur Kafka associe les canaux aux topics Kafka.
Les canaux sont connectés aux backends de messagerie à l'aide de connecteurs. Les connecteurs sont configurés pour mapper les messages entrants à un canal spécifique (consommé par l'application) et collecter les messages sortants envoyés à un canal spécifique. Chaque connecteur est dédié à une technologie de messagerie spécifique. Par exemple, le connecteur traitant de Kafka est nommé smallrye-kafka.
Ajoutez la configuration suivante dans src/main/resources/application.properties
:
# Kafka
%prod.mp.messaging.connector.smallrye-kafka.bootstrap.servers=172.23.240.1:9092
%prod.kafka.bootstrap.servers=172.23.240.1:9092
%prod.mp.messaging.outgoing.adoption-events.bootstrap.servers=localhost:9092mp.messaging.outgoing.
quarkus.kafka.devservices.enabled=true
adoption-events.connector=smallrye-kafka
Do not forget to create a user in the Kafka admin console.
Etape 4 : Création des producteurs et consommateurs d'événements
Déclencher des événements Kafka quand une nouvelle demande d'adoption est effectuée et quand un monstre est adopté
Nous allons créer la classe qui représente les événements d'adoption. Nous allons le créer dans le package com.byoskill.domain.adoption.events
et le nom de la classe sera AdoptionEvent
.
Créez une classe pour les événements AdoptionEvent dans le frontend et le backend.
package com.example.adoption;
public class AdoptionEvent {
public String type;
public String message;
public AdoptionEvent() {
}
public AdoptionEvent(String type, String message) {
this.type = type;
this.message = message;
}
}
Créez une classe pour produire les événements AdoptionEventProducer
dans le backend par exemple dans le package com.byoskill.domain.adapters.adoptions.Kafka
.
package com.byoskill.adapters.adoptions.kafka;
import com.byoskill.domain.adoption.events.AdoptionEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@ApplicationScoped
public class AdoptionEventProducer {
@Channel("adoption-events")
Emitter<AdoptionEvent> emitter;
public void sendAdoptionAvailableEvent(String message) {
emitter.send(new AdoptionEvent("Nouvelle adoption disponible", message));
}
public void sendAdoptionFinalizedEvent(String message) {
emitter.send(new AdoptionEvent("Adoption finalisée", message));
}
}
Quarkus prend en charge nativement la sérialisation et la désérialisation JSON basées sur Jackson. Il générera également le sérialiseur et le désérialiseur pour vous, de sorte que vous n'avez rien à configurer. Lorsque la génération est désactivée, vous pouvez utiliser l'ObjectMapperSerializer et l'ObjectMapperDeserializer fournis, comme expliqué ci-dessous.
Il existe un ObjectMapperSerializer que vous pouvez utiliser pour sérialiser tous les objets de données via Jackson. Vous pouvez créer une sous-classe vide si vous souhaitez utiliser l'autodétection du sérialiseur/désérialiseur.
Nous allons modifier AdoptionRepository
pour ajouter la méthode adoptMonster
.
Voici un exemple de code :
/**
* This method is used to notify that a monster have been adopted.
* @param monsterId the monster UUID
*/
void adoptMonster(String monsterId);
Voici un exemple d'implémentation pour le repository qui utilise une base mémoire :
public Uni<Monster> adoptMonster(String monsterId) {
Uni<Monster> monsterByUuid = getMonsterByUuid(monsterId);
return monsterByUuid.flatMap(monster -> {
deleteMonsterByUuid(monsterId);
return Uni.createFrom().item(monster);
});
}
Ajouter un point rest pour tester les adoptions
Modifier le contrôleur des adoptions pour ajouter un point rest pour tester les adoptions.
Voici un exemple de code à insérer :
@Path("/apply/{id}")
@ResponseStatus(204)
@POST
public void adoptMonster(@PathParam("id") final String id) {
adoptionRepository.adoptMonster(id);
}
Nous allons désormais connecter les actions de notre Repository à des événements Kafka.
Pour simplifier, nous utiliserons un décorateur afin de ne pas avoir à dupliquer les codes.
package com.byoskill.domain.common;
import com.byoskill.adapters.adoptions.Kafka.AdoptionEventProducer;
import com.byoskill.domain.adoption.model.Monster;
import com.byoskill.domain.adoption.repository.AdoptionRepository;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
import jakarta.inject.Inject;
import java.util.Optional;
import java.util.function.Function;
@Priority(10)
@Decorator
public class AdoptionRepositoryDecorator implements AdoptionRepository {
@Inject
@Delegate
AdoptionRepository delegate;
@Inject
AdoptionEventProducer eventProducer;
@Override
public Multi<Monster> getAllMonsters() {
return delegate.getAllMonsters();
}
@Override
public Uni<Monster> addMonsterToAdopt(Monster monster) {
Function<? super Monster, Uni<? extends Monster>> monsterUniFunction = monster1 ->{
if (monster1 != null) {
eventProducer.sendAdoptionAvailableEvent("Monster " + monster1.getName() + " is available for adoption");
}
return Uni.createFrom().item(monster1);
};
return delegate.addMonsterToAdopt(monster).flatMap(monsterUniFunction);
}
@Override
public Uni<Monster> getMonsterByUuid(String id) {
return getMonsterByUuid(id);
}
@Override
public Multi<Monster> searchMonstersByName(String pattern, Optional<Integer> size) {
return delegate.searchMonstersByName(pattern, size);
}
@Override
public Multi<Monster> searchMonstersByDescription(String pattern, Optional<Integer> size) {
return null;
}
@Override
public void deleteMonsterByUuid(String id) {
delegate.deleteMonsterByUuid(id);
}
@Override
public Uni<Monster> updateMonsterByUUID(String id, Monster monster) {
return delegate.updateMonsterByUUID(id, monster);
}
@Override
public Multi<Monster> searchMonstersByAge(Integer age) {
return delegate.searchMonstersByAge(age);
}
@Override
public Uni<Monster> adoptMonster(String monsterId) {
Function<? super Monster, Uni<? extends Monster>> onAdoption = monster1 ->{
if (monster1 != null) {
eventProducer.sendAdoptionAvailableEvent("Monster " + monster1.getName() + " has been adopted");
}
return Uni.createFrom().item(monster1);
};
return delegate.adoptMonster(monsterId)
.flatMap(onAdoption);
}
@Override
public Uni<Monster> changeName(Monster entityToBeUpdated, String newName) {
return delegate.changeName(entityToBeUpdated, newName);
}
}
Tester l'envoi d'événements
Nous allons tester l'envoi d'événements en faisant des appels CURL aux deux endpoints d'adoption et de demande d'adoption.
Ajout d'une adoption avec :
curl -X POST \
http://localhost:8090/adoptions \
-H 'Content-Type: application/json' \
-d '{
"name": "Fluffy",
"description": "A cute and cuddly monster",
"price": 100,
"age": 2,
"location": "New York"
}'
Pour réaliser une adoption ,remplacez monster-id par le UUID de l'adoption.
curl -X POST \
http://localhost:8090/adoptions/apply/monster-id \
-H 'Content-Type: application/json'
Dans Kafka, un exchange devrait être créé pour les événements Kafka.
Capturer les événements Kafka et les afficher
Nous allons créer la même classe AdoptionEvent
dans le frontend.
Créez une classe pour les événements AdoptionEvent dans le frontend.
package com.example.adoption;
public class AdoptionEvent {
public String type;
public String message;
public AdoptionEvent() {
}
public AdoptionEvent(String type, String message) {
this.type = type;
this.message = message;
}
}
Créez une classe pour consommer les événements AdoptionEventConsumer dans le frontend.
package com.byoskill.adoption.events;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
@ApplicationScoped
public class AdoptionEventConsumer {
@Inject
AdoptionService adoptionService;
@Incoming("adoption-events")
@Blocking
public void process(JsonObject messagePayload) {
AdoptionEvent event = messagePayload.mapTo(AdoptionEvent.class);
if ("Nouvelle adoption disponible".equals(event.type)) {
adoptionService.handleNewAdoption(event.message);
} else if ("Adoption finalisée".equals(event.type)) {
adoptionService.handleFinalizedAdoption(event.message);
}
}
}
L'objet Java a été sérialisé automatiquement par le connecteur Kafka dans le format JSON. Quand nous recevons le message, nous recevons un objet de type JsonObject qui doit être déserialisé en AdoptionEvent
.
Implémenter le service Kafka
Implémentez le service AdoptionService :
package com.example.adoption;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class AdoptionService {
public void handleNewAdoption(String message) {
// Logique pour gérer une nouvelle adoption
System.out.println("Nouvelle adoption disponible : " + message);
}
public void handleFinalizedAdoption(String message) {
// Logique pour gérer une adoption finalisée
System.out.println("Adoption finalisée : " + message);
}
}
N'oubliez pas d'ajouter la configuration Kafka dans le fichier application.properties
# Kafka
quarkus.kafka.devservices.enabled=true
%prod.mp.messaging.connector.smallrye-kafka.bootstrap.servers=172.23.240.1:9092
%prod.kafka.bootstrap.servers=172.23.240.1:9092
mp.messaging.incoming.adoption-events.connector=smallrye-kafka
mp.messaging.incoming.adoption-events.serializer=io.quarkus.kafka.serde.serialization.Json
%pr
Les propriétés de configuration fournies définissent les détails de connexion pour les canaux de messagerie sortants et entrants utilisant Kafka dans une application Quarkus. Décomposons chaque propriété :
_Canal sortant :_
```properties
mp.messaging.outgoing.adoption-events.connector=smallrye-kafka: Spécifie l'implémentation du connecteur à utiliser pour les messages sortants. Dans ce cas, il s'agit de smallrye-kafka, qui indique le connecteur Kafka de SmallRye Reactive Messaging.
%prod.mp.messaging.outgoing.adoption-events.bootstrap.servers=localhost:9092: Définit le nom d'hôte et le port du serveur Kafka. Ici, il est défini sur localhost:9092, en supposant que le serveur Kafka est en cours d'exécution sur la même machine que l'application Quarkus.
mp.messaging.outgoing.adoption-events.serializer=io.quarkus.kafka.serde.serialization.Json: Définit le serializer utilisé pour convertir les messages sortants au format JSON avant de les envoyer à Kafka.
Canal entrant :
mp.messaging.incoming.adoption-events.connector=smallrye-kafka: Similaire au canal sortant, cette propriété définit le connecteur pour les messages entrants.
%prod.mp.messaging.incoming.adoption-events.bootstrap.servers=localhost:9092: Identique au canal sortant, cela définit le nom d'hôte et le port du serveur Kafka.
mp.messaging.incoming.adoption-events.serializer=io.quarkus.kafka.serde.serialization.Json: Définit le serializer utilisé pour convertir les messages entrants du format JSON aux objets Java.od.mp.messaging.outgoing.adoption-events.bootstrap.servers=localhost:9092
Tester l'envoi d'événements et la réception par le frontend.
Démarrez à la fois votre backend et votre frontend.
Utilisez les commandes CURL précédents et vérifiez que les événements sont bien envoyés et reçus dans le Frontend.
Conclusion
Dans cette leçon, nous avons abordé comment intégrer et configurer le connecteur de messaging Kafka pour Quarkus afin de lancer des évènements Kafka et les afficher dans le Frontend.
Eventuellement, configurez l'application pour ne démarrer Kafka qu'avec le profile Kafka activé!
Exemple :
@IfBuildProfile(anyOf = "Kafka")
@ApplicationScoped
public class AdoptionEventConsumer {
@Inject
AdoptionService adoptionService;
@Incoming("adoption-events")
@Blocking
public void process(JsonObject messagePayload) {
AdoptionEvent event = messagePayload.mapTo(AdoptionEvent.class);
if ("Nouvelle adoption disponible".equals(event.type)) {
adoptionService.handleNewAdoption(event.message);
} else if ("Adoption finalisée".equals(event.type)) {
adoptionService.handleFinalizedAdoption(event.message);
}
}
}