January 20, 2025

Biotechnologie News

Classe Mondiale Technologie

Comment configurer le test d’intégration de Kafka – Grape Up

Comment configurer le test d’intégration de Kafka – Grape Up

Considérez-vous que les tests unitaires ne sont pas une solution suffisante pour maintenir la fiabilité et la stabilité de l’application ? Avez-vous peur que d’une manière ou d’une autre, un bogue potentiel se cache dans l’hypothèse selon laquelle les tests unitaires devraient couvrir tous les cas ? Et se moquer de Kafka n’est-il pas suffisant pour les besoins du projet ? Si même une seule réponse est “oui”, alors bienvenue dans un guide simple et agréable sur la configuration des tests d’intégration pour Kafka à l’aide de TestContainers et Embedded Kafka pour Spring !

Qu’est-ce que TestContainers ?

TestContainers est une bibliothèque Java open source spécialisée dans la fourniture de toutes les solutions nécessaires à l’intégration et au test de sources externes. Cela signifie que nous sommes capables d’imiter une base de données réelle, un serveur Web ou même un environnement de bus d’événements et de le traiter comme un endroit fiable pour tester les fonctionnalités de l’application. Toutes ces fonctionnalités sophistiquées sont accrochées à des images docker, définies comme des conteneurs. Avons-nous besoin de tester la couche de base de données avec MongoDB réel ? Pas de soucis, nous avons un conteneur de test pour cela. Nous ne pouvons pas non plus oublier les tests d’interface utilisateur – Selenium Container fera tout ce dont nous avons réellement besoin.
Dans notre cas, nous nous concentrerons sur Kafka Testcontainer.

Qu’est-ce que Kafka intégré ?

Comme son nom l’indique, nous allons traiter une instance Kafka en mémoire, prête à être utilisée comme un courtier normal avec toutes les fonctionnalités. Cela nous permet de travailler avec les producteurs et les consommateurs, comme d’habitude, en allégeant nos tests d’intégration.

Avant de commencer

Le concept de notre test est simple – je voudrais tester le consommateur et le producteur de Kafka en utilisant deux approches différentes et vérifier comment nous pouvons les utiliser dans des cas réels.

Les messages Kafka sont sérialisés à l’aide de schémas Avro.

Kafka intégré – Test du producteur

Le concept est simple – créons un projet simple avec le contrôleur, qui appelle une méthode de service pour envoyer un message sérialisé Kafka Avro.

Dépendances :

dependencies 
implementation "org.apache.avro:avro:1.10.1"
implementation("io.confluent:kafka-avro-serializer:6.1.0")
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'
implementation('org.springframework.cloud:spring-cloud-stream:3.1.1')
implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.1')

implementation('org.springframework.boot:spring-boot-starter-web:2.4.3')
implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation('org.springframework.cloud:spring-cloud-stream-test-support:3.1.1')
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Il convient également de mentionner le plugin fantastique pour Avro. Ici la section plugins :

plugins 
	id 'org.springframework.boot' version '2.6.8'
	id 'io.spring.dependency-management' version '1.0.11.RELEASE'
	id 'java'
	id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"

Avro Plugin prend en charge la génération automatique de schéma. C’est quelque chose qu’on se doit d’avoir.

Lien vers le plug-in : https://github.com/davidmc24/gradle-avro-plugin

Définissons maintenant le schéma Avro :


  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "avro.java.string": "String"
    

  ]

Notre ProducerService se concentrera uniquement sur l’envoi de messages à Kafka à l’aide d’un modèle, rien d’excitant à propos de cette partie. La fonctionnalité principale peut être effectuée simplement en utilisant cette ligne :

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);

Nous ne pouvons pas oublier les propriétés de test :

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true

Comme nous le voyons dans les propriétés de test mentionnées, nous déclarons un désérialiseur/sérialiseur personnalisé pour KafkaMessages. Il est fortement recommandé d’utiliser Kafka avec Avro – ne laissez pas les JSON maintenir la structure de l’objet, utilisons un mappeur civilisé et une définition d’objet comme Avro.

Sérialiseur :

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    public CustomKafkaAvroSerializer() 
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) 
        super(new MockSchemaRegistryClient());
    

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) 
        super(new MockSchemaRegistryClient(), props);
    

Désérialiseur :

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    public CustomKafkaAvroSerializer() 
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) 
        super(new MockSchemaRegistryClient());
    

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) 
        super(new MockSchemaRegistryClient(), props);
    

Et nous avons tout pour commencer à écrire notre test.

@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

Tout ce que nous avons à faire est d’ajouter l’annotation @EmbeddedKafka avec les rubriques et les partitions répertoriées. Application Context démarrera Kafka Broker avec la configuration fournie comme ça. Gardez à l’esprit que @TestInstance doit être utilisé avec une attention particulière. Lifecycle.PER_CLASS évitera de créer les mêmes objets/contexte pour chaque méthode de test. Cela vaut la peine de vérifier si les tests prennent trop de temps.

Consumer<String, RegisterRequest> consumerServiceTest;
@BeforeEach
void setUp() 
DefaultKafkaConsumerFactory<String, RegisterRequest> consumer = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties();

consumerServiceTest = consumer.createConsumer();
consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));

Ici, nous pouvons déclarer le consommateur de test, basé sur le type de retour du schéma Avro. Toutes les propriétés Kafka sont déjà fournies dans le fichier .yml. Ce consommateur sera utilisé pour vérifier si le producteur a effectivement envoyé un message.

Voici la méthode de test réelle :

@Test
void whenValidInput_therReturns200() throws Exception 
        RegisterRequestDto request = RegisterRequestDto.builder()
                .id(12)
                .address("tempAddress")
                .build();

        mockMvc.perform(
                post("/register-request")
                      .contentType("application/json")
                      .content(objectMapper.writeValueAsBytes(request)))
                .andExpect(status().isOk());

      ConsumerRecord<String, RegisterRequest> consumedRegisterRequest =  KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

        RegisterRequest valueReceived = consumedRegisterRequest.value();

        assertEquals(12, valueReceived.getId());
        assertEquals("tempAddress", valueReceived.getAddress());
    

Tout d’abord, nous utilisons MockMvc pour effectuer une action sur notre endpoint. Ce point de terminaison utilise ProducerService pour envoyer des messages à Kafka. KafkaConsumer est utilisé pour vérifier si le producteur a fonctionné comme prévu. Et c’est tout – nous avons un test entièrement fonctionnel avec Kafka intégré.

Conteneurs de test – Test consommateur

Les TestContainers ne sont rien d’autre que des images docker indépendantes prêtes à être dockerisées. Le scénario de test suivant sera amélioré par une image MongoDB. Pourquoi ne pas conserver nos données dans la base de données juste après que quelque chose se soit passé dans le flux Kafka ?

Les dépendances ne sont pas très différentes de celles de l’exemple précédent. Les étapes suivantes sont nécessaires pour les conteneurs de test :

testImplementation 'org.testcontainers:junit-jupiter'
	testImplementation 'org.testcontainers:kafka'
	testImplementation 'org.testcontainers:mongodb'

ext 
	set('testcontainersVersion', "1.17.1")


dependencyManagement 
	imports 
		mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"
	

Concentrons-nous maintenant sur la partie Consommateur. Le cas de test sera simple – un service client sera responsable de la réception du message Kafka et du stockage de la charge utile analysée dans la collection MongoDB. Tout ce que nous devons savoir sur KafkaListeners, pour l’instant, c’est cette annotation :

@KafkaListener(topics = "register-request")

Par la fonctionnalité du processeur d’annotation, KafkaListenerContainerFactory se chargera de créer un écouteur sur notre méthode. À partir de ce moment, notre méthode réagira à tout message Kafka à venir avec le sujet mentionné.

Les configurations de sérialiseur et de désérialiseur Avro sont les mêmes que dans le test précédent.

Concernant TestContainer, nous devrions commencer par les annotations suivantes :

@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public class AbstractIntegrationTest {

Au démarrage, tous les modules TestContainers configurés seront activés. Cela signifie que nous aurons accès à l’environnement d’exploitation complet de la source sélectionnée. A titre d’exemple :

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

À la suite du démarrage du test, nous pouvons nous attendre à ce que deux conteneurs Docker démarrent avec la configuration fournie.

Comment configurer le test d’intégration de Kafka – Grape Up

Ce qui est vraiment important pour le conteneur mongo – il nous donne un accès complet à la base de données en utilisant juste une simple connexion uri. Avec une telle fonctionnalité, nous sommes en mesure de jeter un œil à l’état actuel de nos collections, même en mode débogage et aux points d’arrêt préparés.
Jetez également un coup d’œil au conteneur Ryuk – il fonctionne comme overwatch et vérifie si nos conteneurs ont démarré correctement.

Et voici la dernière partie de la configuration :

@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) 
   registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);


static 
   kafkaContainer.start();
   mongoDBContainer.start();

   mongoDBContainer.waitingFor(Wait.forListeningPort()
           .withStartupTimeout(Duration.ofSeconds(180L)));


@BeforeTestClass
public void beforeTest() 

   kafkaListenerEndpointRegistry.getListenerContainers().forEach(
           messageListenerContainer -> 
               ContainerTestUtils
                       .waitForAssignment(messageListenerContainer, 1);

           
   );


@AfterAll
static void tearDown() 
   kafkaContainer.stop();
   mongoDBContainer.stop();

DynamicPropertySource nous donne la possibilité de définir toutes les variables d’environnement nécessaires pendant le cycle de vie du test. Fortement nécessaire à des fins de configuration pour TestContainers. De plus, beforeTestClass kafkaListenerEndpointRegistry attend que chaque écouteur obtienne les partitions attendues lors du démarrage du conteneur.

Et la dernière partie du parcours des conteneurs de test Kafka – le corps principal du test :

@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception 
   writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

   //Wait for KafkaListener
   TimeUnit.SECONDS.sleep(5);
   Assertions.assertEquals(1, taxiRepository.findAll().size());



private KafkaProducer<String, RegisterRequest> createProducer() 
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties());


private void writeToTopic(String topicName, RegisterRequest... registerRequests) 

   try (KafkaProducer<String, RegisterRequest> producer = createProducer()) 
       Arrays.stream(registerRequests)
               .forEach(registerRequest -> 
                           ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                           producer.send(record);
                       
               );
   

Le producteur personnalisé est chargé d’écrire notre message à KafkaBroker. De plus, il est recommandé de laisser un peu de temps aux consommateurs pour traiter correctement les messages. Comme nous le voyons, le message n’était pas seulement consommé par l’auditeur, mais également stocké dans la collection MongoDB.

conclusion

Comme nous pouvons le voir, les solutions actuelles pour les tests d’intégration sont assez faciles à mettre en œuvre et à maintenir dans les projets. Il ne sert à rien de garder uniquement les tests unitaires et de compter sur toutes les lignes couvertes comme signe de qualité du code/de la logique. Maintenant la question est, devrions-nous utiliser une solution Embedded ou des TestContainers ? Je suggère tout d’abord de se concentrer sur le mot “Embedded”. En tant que test d’intégration parfait, nous voulons obtenir une copie presque idéale de l’environnement de production avec toutes les propriétés/fonctionnalités incluses. Les solutions en mémoire sont bonnes, mais la plupart du temps, pas suffisantes pour les grands projets d’entreprise. Sans aucun doute, l’avantage des services intégrés est le moyen facile d’implémenter de tels tests et de maintenir la configuration, juste au moment où quelque chose se passe en mémoire.
Les TestContainers à première vue peuvent sembler exagérés, mais ils nous donnent la fonctionnalité la plus importante, qui est un environnement séparé. Nous n’avons même pas besoin de nous fier aux images docker existantes – si nous le voulons, nous pouvons en utiliser des personnalisées. Il s’agit d’une amélioration considérable pour les scénarios de test potentiels.
Et Jenkins ? Il n’y a aucune raison d’avoir peur d’utiliser également TestContainers dans Jenkins. Je recommande fermement de consulter la documentation de TestContainers sur la facilité avec laquelle nous pouvons configurer la configuration des agents Jenkins.
Pour résumer – s’il n’y a pas de bloqueur ou de condition indésirable pour l’utilisation de TestContainers, alors n’hésitez pas. Il est toujours bon de garder tous les services gérés et sécurisés avec des contrats de test d’intégration.