Dernière modification : Dec 08 , 2024

Objectifs

Ce tutoriel vous guidera à travers les étapes pour configurer Kafka en tant que conteneur Docker et l'intégrer avec une application Quarkus afin d'envoyer et recevoir des événements de type "Nouvelle adoption disponible" et "Adoption finalisée".

Qu'est-ce que Kafka ?

Kafka est un système de messagerie distribué open-source développé par Apache. Il est utilisé pour la diffusion de flux de données en temps réel, la gestion de files d'attente de messages et le traitement de flux de données.

Voici quelques caractéristiques clés de Kafka :

  • Haute performance: Kafka est capable de gérer des millions de messages par seconde avec une faible latence.
  • Scalabilité: Kafka peut être facilement mis à l'échelle horizontalement en ajoutant des serveurs supplémentaires au cluster.
  • Durabilité: Kafka stocke les messages de manière persistante sur le disque, ce qui garantit qu'ils ne seront pas perdus en cas de panne.
  • Fiabilité: Kafka garantit la livraison des messages au moins une fois, ce qui le rend adapté aux applications critiques.
  • Flexibilité: Kafka peut être utilisé pour une variété de cas d'utilisation, notamment la diffusion de données en temps réel, la gestion de files d'attente de messages et le traitement de flux de données.

Voici quelques exemples d'utilisation de Kafka :

  • Diffusion de données en temps réel: Kafka peut être utilisé pour diffuser des données en temps réel à partir de sources telles que des capteurs, des journaux d'événements et des médias sociaux.
  • Gestion de files d'attente de messages: Kafka peut être utilisé pour gérer des files d'attente de messages pour les applications telles que les systèmes de commande et les systèmes de paiement.
  • Traitement de flux de données: Kafka peut être utilisé pour traiter des flux de données en temps réel, tels que les données de clics sur les sites Web et les données de

Prérequis

  • Docker installé sur votre machine
  • Java JDK 11 ou supérieur
  • Maven
  • Une installation fonctionnelle de Quarkus CLI (optionnel)

Étape 1 : Lancer Kafka avec Docker

  1. Créez un fichier docker-compose.yml pour configurer et démarrer Kafka :
version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-net
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "adoption-events:1:1"
    networks:
      - kafka-net
networks:
  kafka-net:
    driver: bridge

Notre Topic aura 1 partition et une seule réplication..

  1. Lancez Kafka avec Docker Compose :
docker compose up -d

Kafka sera maintenant accessible sur localhost:5672 et l'interface de gestion sur localhost:15672.

Quarkus et Kafka

Ajouter l'extension Kafka

Ajoutez les dépendances nécessaires au backend et au frontend :

quarkus ext add io.quarkus:quarkus-kafka-client
quarkus extension add messaging-kafka

Etape 3 : configurer Quarkus pour Kafka

Les applications envoient et reçoivent des messages. Un message enveloppe une charge utile et peut être étendu avec des métadonnées. Avec le connecteur Kafka, un message correspond à un enregistrement Kafka.

Les messages transitent par des canaux. Les composants de l'application se connectent à des canaux pour publier et consommer des messages. Le connecteur Kafka associe les canaux aux topics Kafka.

Les canaux sont connectés aux backends de messagerie à l'aide de connecteurs. Les connecteurs sont configurés pour mapper les messages entrants à un canal spécifique (consommé par l'application) et collecter les messages sortants envoyés à un canal spécifique. Chaque connecteur est dédié à une technologie de messagerie spécifique. Par exemple, le connecteur traitant de Kafka est nommé smallrye-kafka.

Ajoutez la configuration suivante dans src/main/resources/application.properties :

# Kafka
%prod.mp.messaging.connector.smallrye-kafka.bootstrap.servers=172.23.240.1:9092
%prod.kafka.bootstrap.servers=172.23.240.1:9092
%prod.mp.messaging.outgoing.adoption-events.bootstrap.servers=localhost:9092mp.messaging.outgoing.

quarkus.kafka.devservices.enabled=true
adoption-events.connector=smallrye-kafka

Do not forget to create a user in the Kafka admin console.

Etape 4 : Création des producteurs et consommateurs d'événements

Déclencher des événements Kafka quand une nouvelle demande d'adoption est effectuée et quand un monstre est adopté

Nous allons créer la classe qui représente les événements d'adoption. Nous allons le créer dans le package com.byoskill.domain.adoption.events et le nom de la classe sera AdoptionEvent.

Organisation du projet

Créez une classe pour les événements AdoptionEvent dans le frontend et le backend.

package com.example.adoption;

public class AdoptionEvent {
    public String type;
    public String message;

    public AdoptionEvent() {
    }

    public AdoptionEvent(String type, String message) {
        this.type = type;
        this.message = message;
    }
}

Créez une classe pour produire les événements AdoptionEventProducer dans le backend par exemple dans le package com.byoskill.domain.adapters.adoptions.Kafka.

package com.byoskill.adapters.adoptions.kafka;


import com.byoskill.domain.adoption.events.AdoptionEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class AdoptionEventProducer {

    @Channel("adoption-events")
    Emitter<AdoptionEvent> emitter;

    public void sendAdoptionAvailableEvent(String message) {
        emitter.send(new AdoptionEvent("Nouvelle adoption disponible", message));
    }

    public void sendAdoptionFinalizedEvent(String message) {
        emitter.send(new AdoptionEvent("Adoption finalisée", message));
    }
}

Quarkus prend en charge nativement la sérialisation et la désérialisation JSON basées sur Jackson. Il générera également le sérialiseur et le désérialiseur pour vous, de sorte que vous n'avez rien à configurer. Lorsque la génération est désactivée, vous pouvez utiliser l'ObjectMapperSerializer et l'ObjectMapperDeserializer fournis, comme expliqué ci-dessous.

Il existe un ObjectMapperSerializer que vous pouvez utiliser pour sérialiser tous les objets de données via Jackson. Vous pouvez créer une sous-classe vide si vous souhaitez utiliser l'autodétection du sérialiseur/désérialiseur.

Nous allons modifier AdoptionRepository pour ajouter la méthode adoptMonster.

Voici un exemple de code :

    /**
     * This method is used to notify that a monster have been adopted.
     * @param monsterId the monster UUID
     */
    void adoptMonster(String monsterId);

Voici un exemple d'implémentation pour le repository qui utilise une base mémoire :

public Uni<Monster> adoptMonster(String monsterId) {
        Uni<Monster> monsterByUuid = getMonsterByUuid(monsterId);
        return monsterByUuid.flatMap(monster -> {
            deleteMonsterByUuid(monsterId);
            return Uni.createFrom().item(monster);
        });
    }

Ajouter un point rest pour tester les adoptions

Modifier le contrôleur des adoptions pour ajouter un point rest pour tester les adoptions.

Voici un exemple de code à insérer :

@Path("/apply/{id}")
@ResponseStatus(204)
@POST
public void adoptMonster(@PathParam("id") final String id) {
    adoptionRepository.adoptMonster(id);
}

Nous allons désormais connecter les actions de notre Repository à des événements Kafka.

Pour simplifier, nous utiliserons un décorateur afin de ne pas avoir à dupliquer les codes.

package com.byoskill.domain.common;

import com.byoskill.adapters.adoptions.Kafka.AdoptionEventProducer;
import com.byoskill.domain.adoption.model.Monster;
import com.byoskill.domain.adoption.repository.AdoptionRepository;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.Priority;
import jakarta.decorator.Decorator;
import jakarta.decorator.Delegate;
import jakarta.inject.Inject;

import java.util.Optional;
import java.util.function.Function;

@Priority(10)
@Decorator
public class AdoptionRepositoryDecorator implements AdoptionRepository {

    @Inject
    @Delegate
    AdoptionRepository delegate;

    @Inject
    AdoptionEventProducer eventProducer;

    @Override
    public Multi<Monster> getAllMonsters() {
        return delegate.getAllMonsters();
    }

    @Override
    public Uni<Monster> addMonsterToAdopt(Monster monster) {

        Function<? super Monster, Uni<? extends Monster>> monsterUniFunction = monster1 ->{
            if (monster1 != null) {
                eventProducer.sendAdoptionAvailableEvent("Monster " + monster1.getName() + " is available for adoption");
            }
            return Uni.createFrom().item(monster1);
        };
        return delegate.addMonsterToAdopt(monster).flatMap(monsterUniFunction);
    }

    @Override
    public Uni<Monster> getMonsterByUuid(String id) {
        return getMonsterByUuid(id);
    }

    @Override
    public Multi<Monster> searchMonstersByName(String pattern, Optional<Integer> size) {
        return delegate.searchMonstersByName(pattern, size);
    }

    @Override
    public Multi<Monster> searchMonstersByDescription(String pattern, Optional<Integer> size) {
        return null;
    }

    @Override
    public void deleteMonsterByUuid(String id) {
        delegate.deleteMonsterByUuid(id);
    }

    @Override
    public Uni<Monster> updateMonsterByUUID(String id, Monster monster) {
        return delegate.updateMonsterByUUID(id, monster);
    }

    @Override
    public Multi<Monster> searchMonstersByAge(Integer age) {
        return delegate.searchMonstersByAge(age);
    }

    @Override
    public Uni<Monster> adoptMonster(String monsterId) {

        Function<? super Monster, Uni<? extends Monster>> onAdoption = monster1 ->{
            if (monster1 != null) {
                eventProducer.sendAdoptionAvailableEvent("Monster " + monster1.getName() + " has been adopted");
            }
            return Uni.createFrom().item(monster1);
        };
        return delegate.adoptMonster(monsterId)
                .flatMap(onAdoption);
    }

    @Override
    public Uni<Monster> changeName(Monster entityToBeUpdated, String newName) {
        return delegate.changeName(entityToBeUpdated, newName);
    }
}

Tester l'envoi d'événements

Nous allons tester l'envoi d'événements en faisant des appels CURL aux deux endpoints d'adoption et de demande d'adoption.

Ajout d'une adoption avec :

curl -X POST \
  http://localhost:8090/adoptions \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "Fluffy",
    "description": "A cute and cuddly monster",
    "price": 100,
    "age": 2,
    "location": "New York"
  }'

Pour réaliser une adoption ,remplacez monster-id par le UUID de l'adoption.

curl -X POST \
  http://localhost:8090/adoptions/apply/monster-id \
  -H 'Content-Type: application/json'

Dans Kafka, un exchange devrait être créé pour les événements Kafka.

Rabbit MQ Exchange creation

Capturer les événements Kafka et les afficher

Nous allons créer la même classe AdoptionEventdans le frontend.

Créez une classe pour les événements AdoptionEvent dans le frontend.

package com.example.adoption;

public class AdoptionEvent {
    public String type;
    public String message;

    public AdoptionEvent() {
    }

    public AdoptionEvent(String type, String message) {
        this.type = type;
        this.message = message;
    }
}

Créez une classe pour consommer les événements AdoptionEventConsumer dans le frontend.

package com.byoskill.adoption.events;

import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;


@ApplicationScoped
public class AdoptionEventConsumer {

    @Inject
    AdoptionService adoptionService;

    @Incoming("adoption-events")
    @Blocking
    public void process(JsonObject messagePayload) {
        AdoptionEvent event = messagePayload.mapTo(AdoptionEvent.class);
        if ("Nouvelle adoption disponible".equals(event.type)) {
            adoptionService.handleNewAdoption(event.message);
        } else if ("Adoption finalisée".equals(event.type)) {
            adoptionService.handleFinalizedAdoption(event.message);
        }
    }
}

L'objet Java a été sérialisé automatiquement par le connecteur Kafka dans le format JSON. Quand nous recevons le message, nous recevons un objet de type JsonObject qui doit être déserialisé en AdoptionEvent.

Implémenter le service Kafka

Implémentez le service AdoptionService :

package com.example.adoption;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AdoptionService {

    public void handleNewAdoption(String message) {
        // Logique pour gérer une nouvelle adoption
        System.out.println("Nouvelle adoption disponible : " + message);
    }

    public void handleFinalizedAdoption(String message) {
        // Logique pour gérer une adoption finalisée
        System.out.println("Adoption finalisée : " + message);
    }
}

N'oubliez pas d'ajouter la configuration Kafka dans le fichier application.properties

# Kafka
quarkus.kafka.devservices.enabled=true
%prod.mp.messaging.connector.smallrye-kafka.bootstrap.servers=172.23.240.1:9092

%prod.kafka.bootstrap.servers=172.23.240.1:9092
mp.messaging.incoming.adoption-events.connector=smallrye-kafka
mp.messaging.incoming.adoption-events.serializer=io.quarkus.kafka.serde.serialization.Json
%pr

Les propriétés de configuration fournies définissent les détails de connexion pour les canaux de messagerie sortants et entrants utilisant Kafka dans une application Quarkus. Décomposons chaque propriété :

_Canal sortant :_

```properties
mp.messaging.outgoing.adoption-events.connector=smallrye-kafka: Spécifie l'implémentation du connecteur à utiliser pour les messages sortants. Dans ce cas, il s'agit de smallrye-kafka, qui indique le connecteur Kafka de SmallRye Reactive Messaging.
%prod.mp.messaging.outgoing.adoption-events.bootstrap.servers=localhost:9092: Définit le nom d'hôte et le port du serveur Kafka. Ici, il est défini sur localhost:9092, en supposant que le serveur Kafka est en cours d'exécution sur la même machine que l'application Quarkus.
mp.messaging.outgoing.adoption-events.serializer=io.quarkus.kafka.serde.serialization.Json: Définit le serializer utilisé pour convertir les messages sortants au format JSON avant de les envoyer à Kafka.

Canal entrant :

mp.messaging.incoming.adoption-events.connector=smallrye-kafka: Similaire au canal sortant, cette propriété définit le connecteur pour les messages entrants.
%prod.mp.messaging.incoming.adoption-events.bootstrap.servers=localhost:9092: Identique au canal sortant, cela définit le nom d'hôte et le port du serveur Kafka.
mp.messaging.incoming.adoption-events.serializer=io.quarkus.kafka.serde.serialization.Json: Définit le serializer utilisé pour convertir les messages entrants du format JSON aux objets Java.od.mp.messaging.outgoing.adoption-events.bootstrap.servers=localhost:9092

Tester l'envoi d'événements et la réception par le frontend.

Démarrez à la fois votre backend et votre frontend.

Utilisez les commandes CURL précédents et vérifiez que les événements sont bien envoyés et reçus dans le Frontend.

Conclusion

Dans cette leçon, nous avons abordé comment intégrer et configurer le connecteur de messaging Kafka pour Quarkus afin de lancer des évènements Kafka et les afficher dans le Frontend.

Kafka Integration

Eventuellement, configurez l'application pour ne démarrer Kafka qu'avec le profile Kafka activé!

Exemple :

@IfBuildProfile(anyOf = "Kafka")
@ApplicationScoped
public class AdoptionEventConsumer {

    @Inject
    AdoptionService adoptionService;

    @Incoming("adoption-events")
    @Blocking
    public void process(JsonObject messagePayload) {
        AdoptionEvent event = messagePayload.mapTo(AdoptionEvent.class);
        if ("Nouvelle adoption disponible".equals(event.type)) {
            adoptionService.handleNewAdoption(event.message);
        } else if ("Adoption finalisée".equals(event.type)) {
            adoptionService.handleFinalizedAdoption(event.message);
        }
    }
}

References