Intégration de Kafka dans une application Spring Boot - Allons plus loins

Dans l’article précédent, nous avons vu comment intégré facilement Kafka dans une application spring boot.

Et à la fin je vous avais donné des pistes pour aller plus loins :

Ce sont ces dernières que nous allons explorer au cours de cet article.

Dead Letter Queue & Retry

Il peut arriver que des erreurs surviennent lors du traitement des messages : données invalides, service tiers indisponible, problème réseau, etc.

Dans ces cas, deux stratégies sont couramment utilisées :

  1. Retry : réessayer le traitement du message un certain nombre de fois avant de le considérer comme irrécupérable.
  2. Dead Letter Queue (DLQ) : envoyer les messages en échec dans un topic dédié pour analyse ou re-traitement ultérieur.

Pourquoi utiliser une DLQ ?

Une Dead Letter Queue permet de ne pas bloquer le flux global.
Sans DLQ, un message problématique peut provoquer un blocage complet du consommateur s’il revient sans cesse dans le flux.
Avec une DLQ :

Modification de la configuration

Pour mettre en place la DLQ, nous allons reprendre le code du consumer et de la configuration de l’article précédent et les completer.

@Configuration
public class KafkaConsumerConfig {

    @Value("${app.kafka.dlt-topic}")
    private String dltTopic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> kafkaTemplate) { // Inject KafkaTemplate

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory);
        
        // Active le mode Ack manuel
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

        // Configure le DefaultErrorHandler avec DeadLetterPublishingRecoverer
        // FixedBackOff(0L, 0) signifie pas de re-tentative avant d'envoyer au DLT
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate, (consumerRecord, e) -> {
                // Définir le topic DLQ et la partition (null pour laisser Kafka choisir)
                return new TopicPartition(dltTopic, -1);
            }),
            new FixedBackOff(0L, 0)
        );
        factory.setCommonErrorHandler(errorHandler);

        
        return factory;
    }
}

nouvelle version de la configuration

📌 Points clés

Modification du consumer principal

@KafkaListener(
        topics = "${app.kafka.topic}",
        groupId = "${spring.kafka.consumer.group-id}",
        containerFactory = "kafkaListenerContainerFactory"
)
public void listen(String message, Acknowledgment acknowledgment) {
    LOG.info("Message reçu : {}", message);

    if (message.contains("erreur")) {
        LOG.error("Erreur simulée pour le message : {}", message);
        throw new RuntimeException("Erreur de traitement simulée");
    }

    LOG.info("Message traité avec succès : {}", message);
    acknowledgment.acknowledge();
}

📌 Ici, nous simulons une erreur pour forcer le passage en DLQ.
L’exception déclenchée est interceptée par notre DefaultErrorHandler, qui publie le message dans le topic DLQ.

Listener pour la DLQ

@KafkaListener(
        topics = "${app.kafka.dlt-topic}",
        groupId = "${spring.kafka.consumer.group-id}",
        containerFactory = "kafkaListenerContainerFactory"
)
public void listenDlt(String message, Acknowledgment acknowledgment) {
    LOG.warn("Message reçu de la DLQ : {}", message);

    try {
        LOG.info("Tentative de re-traitement du message DLQ : {}", message);

        if (message.contains("re-erreur")) {
            throw new RuntimeException("Re-échec simulé");
        }

        LOG.info("Message de la DLQ re-traité avec succès");
        acknowledgment.acknowledge();
    } catch (Exception e) {
        LOG.error("Échec du re-traitement DLQ : {}", message, e);
        acknowledgment.acknowledge();
    }
}

📌 Ce listener est dédié au topic DLQ.
Il permet de tenter un retry manuel et d’appliquer une logique différente de celle du flux principal.

Avec cette configuration, tout message en erreur est automatiquement envoyé dans un topic DLQ, où il peut être surveillé, corrigé ou retraité, tout en laissant le reste du flux Kafka tourner normalement.

Idempotence & transactions

L’idempotence permet de garantir qu’un message produit ne sera pas envoyé plusieurs fois, même en cas de retry côté producer.
Les transactions Kafka vont plus loin : elles permettent de grouper plusieurs opérations (lecture, traitement, écriture) dans une seule unité atomique.

Concepts Clés

  1. Idempotence du Producteur (Producer Idempotence) :
    • Garantit que les messages envoyés par un producteur à une seule partition sont livrés exactement une fois, même en cas de retries du producteur.
    • C’est une fonctionnalité intégrée à Kafka, activée par une simple configuration.
  2. Transactions Kafka (Kafka Transactions) :
    • Permettent d’envoyer des messages à plusieurs topics/partitions de manière atomique, ou d’envoyer des messages et de committer les offsets de consommation de manière
      atomique avec d’autres opérations (par exemple, des écritures en base de données).
    • C’est la clé pour les garanties “exactement une fois” de bout en bout.
  3. Idempotence au niveau de l’Application (Consumer-Side Idempotence) :
    • Même avec les transactions Kafka, un consommateur peut toujours recevoir le même message plusieurs fois (par exemple, après un crash et un redémarrage).
    • L’application consommatrice doit être conçue pour détecter et ignorer les messages déjà traités. Cela implique généralement de stocker un identifiant unique du message dans
      une base de données et de vérifier sa présence avant tout traitement.

Idempotence & Transactions côté Producer

Lorsque l’on produit des messages vers Kafka, un retry côté producer (par exemple en cas de perte réseau) peut générer des doublons dans le topic.
Pour éviter ce problème, Kafka propose un mode idempotent, qui garantit qu’un message sera écrit une seule fois même si l’envoi est retenté.

En complément, l’utilisation de transactions Kafka permet de grouper plusieurs envois dans une seule unité atomique :

Configuration

Dans application.properties, on active l’idempotence et on définit un préfixe pour l’ID transactionnel :

spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.transaction-id-prefix=tx-sender-
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

j’ai également ajouté un ùJsonSerializer` car pour cet exemple j’ai fait évoluer le producer pour qu’il envoie un objet au lieu d’un simple String.

Implémentation du Producer transactionnel

public void sendMessage(Message message) {
    LOG.info("Envoi du message : {}", message);

    kafkaTemplate.executeInTransaction(kafkaOperations -> {
        kafkaOperations.send(topic, message)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        LOG.error("Erreur lors de l'envoi du message : {}", message, ex);
                    } else {
                        LOG.info("Message envoyé : topic={}, partition={}, offset={}",
                                result.getRecordMetadata().topic(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    }
                });
        return true;
    });
}

Idempotence & Transactions côté Consumer

L’idempotence côté producer évite les doublons à l’envoi, mais ce n’est pas suffisant :

Pour éviter de traiter deux fois le même message, il faut mettre en place un contrôle d’unicité côté consumer.

Stratégie

Nous allons stocker l’ID de chaque message traité dans une base (ici H2 en mémoire pour l’exemple).
Avant chaque traitement, le consumer vérifie si l’ID est déjà présent :

Cela permet de garantir l’idempotence même après un redémarrage ou en cas de relecture depuis la DLQ.

Configuration

La configuration n’a pas évoluer depuis la partie précédente, si ce n’est le passage à notre nouvel objet Message.

@Configuration
public class KafkaConsumerConfig {

    @Value("${app.kafka.dlt-topic}")
    private String dltTopic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory( ConsumerFactory<String, Message> consumerFactory,
                                                                                                   KafkaTemplate<String, Message> kafkaTemplate) { // Changé à Message

        ConcurrentKafkaListenerContainerFactory<String, Message> factory = // Changé à Message
                new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory);
        
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate, (consumerRecord, e) -> new TopicPartition(dltTopic, -1)),
            new FixedBackOff(0L, 0)
        );
        factory.setCommonErrorHandler(errorHandler);
        
        return factory;
    }
}

Implémentation du Consumer idempotent

@Transactional
@KafkaListener(
        topics = "${app.kafka.topic}",
        groupId = "${spring.kafka.consumer.group-id}",
        containerFactory = "kafkaListenerContainerFactory"
)
public void listen(Message message, Acknowledgment acknowledgment) {
    if (message == null || message.id() == null) {
        LOG.warn("Received message or message ID is null, cannot process for idempotence.");
        acknowledgment.acknowledge();
        return;
    }

    if (processedMessageRepository.existsById(message.id())) {
        LOG.info("Message with ID {} has already been processed, skipping.", message.id());
        acknowledgment.acknowledge();
        return;
    }

    LOG.info("Message reçu : {}", message);

    // Simuler une erreur pour les messages contenant "erreur"
    if (message.message() != null && message.message().contains("erreur")) {
        LOG.error("Simulating processing error for message: {}", message);
        throw new RuntimeException("Erreur de traitement simulée pour le message: " + message);
    }

    // Traitement du message (si pas d'erreur)
    LOG.info("Message traité avec succès : {}", message);
    processedMessageRepository.save(new ProcessedMessage(message.id()));
    acknowledgment.acknowledge();
}

Conclusion

La mise en place combinée de mécanismes de reprise (Retry et Dead Letter Queue) et de garanties d’idempotence avec transactions Kafka permet d’atteindre un haut niveau de fiabilité dans le traitement des messages.

En combinant ces approches :

  1. On isole les messages en échec pour éviter de bloquer le flux global.
  2. On évite la duplication qui pourrait corrompre les données ou provoquer des effets indésirables.
  3. On garantit la cohérence des traitements, même en cas de panne ou de scénarios imprévus.

Dans un système distribué, où la défaillance est une possibilité permanente, cette combinaison d’outils est une assurance :
le système devient résilient, prévisible et capable de se remettre des aléas sans perte ni incohérence.