Technipelago AB

Technipelago Blog Stuff that we learned...


Micronaut in a Spring Kafka environment

Published on 25 Sep 2018

Spring Kafka encodes message headers in a way that makes it incompatible with Micronaut (1.0.0.M4). This article provides one solution to the problem.

When Spring's Kafka implementation sends a message to Kafka, it encodes header values as JSON. So a simple string hello is written to the header as "hello". When the Micronaut Kafka implementation receives the message, it reads the header value including the double quotes. So your header value is now ""hello"". This is probably not what you expected.
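To make the mismatch concrete, here is a minimal sketch in plain Java with no Spring or Micronaut dependency. The class name HeaderQuotingDemo and the helpers jsonEncode and readRaw are hypothetical; jsonEncode only approximates what a JSON serializer does to a simple string, and readRaw stands in for a consumer that treats the header as raw text.

```java
import java.nio.charset.StandardCharsets;

public class HeaderQuotingDemo {

    // Hypothetical stand-in for JSON string serialization: wraps the value
    // in double quotes and escapes backslashes and embedded quotes.
    static byte[] jsonEncode(String value) {
        String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
        return ("\"" + escaped + "\"").getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical consumer that reads the header bytes as raw UTF-8 text
    // without any JSON decoding.
    static String readRaw(byte[] headerValue) {
        return new String(headerValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = jsonEncode("hello");
        String received = readRaw(onTheWire);

        System.out.println(received);                 // "hello" (quotes included)
        System.out.println(received.length());        // 7
        System.out.println(received.equals("hello")); // false
    }
}
```

The two extra characters are the JSON quotes, which is why an equality check against the original value fails on the receiving side.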

The other way around, when Micronaut sends a message to Kafka, it just writes string headers as a sequence of bytes. When Spring Kafka decodes the header, it gets a byte array instead of a String. If the implementation expects a String and calls messageHeaders.get("headername", String.class), an exception is thrown.
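The failure in this direction can be sketched without Spring as well. In the example below, typedGet is a hypothetical helper that mimics a typed header lookup which rejects values of the wrong type, and asString is a hypothetical defensive alternative; neither is Spring's actual implementation, and the exception type used here is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderTypeDemo {

    // Hypothetical typed lookup: returns the value only if it already has
    // the requested type, otherwise throws.
    static <T> T typedGet(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException("Header '" + key + "' is "
                    + value.getClass().getSimpleName() + ", not " + type.getSimpleName());
        }
        return type.cast(value);
    }

    // Hypothetical defensive read: decodes byte arrays as UTF-8 text and
    // falls back to toString() for everything else.
    static String asString(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        // Simulates a header that arrived as plain bytes.
        Map<String, Object> headers = new HashMap<>();
        headers.put("headername", "hello".getBytes(StandardCharsets.UTF_8));

        try {
            typedGet(headers, "headername", String.class);
        } catch (IllegalArgumentException e) {
            System.out.println("Typed lookup failed: " + e.getMessage());
        }

        System.out.println(asString(headers.get("headername"))); // hello
    }
}
```

Decoding defensively in every consumer works, but it has to be repeated wherever a header is read, which is why fixing the mapping in one place is more attractive.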

If you only allow string values in your Kafka headers, there is an easy solution on the Spring side to make it compatible with Micronaut (and other non-Spring services). But this solution alters the message format for all messages in your cluster, so it may not be an option in your environment.

The proposed solution is to add a custom KafkaHeaderMapper in each Spring service that reads or writes messages to Kafka.

 

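    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.support.KafkaHeaderMapper;
    import org.springframework.messaging.MessageHeaders;

    // Declare this method inside a @Configuration class.
    @Bean
    public KafkaHeaderMapper myKafkaHeaderMapper() {
        return new KafkaHeaderMapper() {
            // Outbound: write each header as the plain UTF-8 bytes of its string form.
            @Override
            public void fromHeaders(MessageHeaders headers, Headers target) {
                for (Map.Entry<String, Object> entry : headers.entrySet()) {
                    if (entry.getValue() != null) {
                        target.add(entry.getKey(),
                                entry.getValue().toString().getBytes(StandardCharsets.UTF_8));
                    }
                }
            }

            // Inbound: decode each header value from UTF-8 bytes to a String.
            @Override
            public void toHeaders(Headers source, Map<String, Object> target) {
                for (Header header : source) {
                    if (header.value() != null) {
                        target.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
            }
        };
    }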

 

Creating the bean is not enough; you must also reference the bean name in application.yml.

spring:
  cloud:
    stream:
      kafka:
        binder:
          headerMapperBeanName: myKafkaHeaderMapper

 

Now the Spring service expects header values to be plain byte arrays and maps them to and from String accordingly.
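The round trip can be checked in isolation with a small sketch that applies the same conversion to plain maps. The class PlainHeaderRoundTrip and its methods toWire and fromWire are hypothetical stand-ins for the mapper's two directions, using a Map<String, byte[]> in place of Kafka's Headers. It also shows why the approach assumes string-only headers: any other value comes back as its string form.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class PlainHeaderRoundTrip {

    // Outbound direction: every non-null header value becomes the UTF-8
    // bytes of its toString() form.
    static Map<String, byte[]> toWire(Map<String, Object> headers) {
        Map<String, byte[]> wire = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            if (e.getValue() != null) {
                wire.put(e.getKey(), e.getValue().toString().getBytes(StandardCharsets.UTF_8));
            }
        }
        return wire;
    }

    // Inbound direction: every header value is decoded as a UTF-8 String.
    static Map<String, Object> fromWire(Map<String, byte[]> wire) {
        Map<String, Object> headers = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : wire.entrySet()) {
            headers.put(e.getKey(), new String(e.getValue(), StandardCharsets.UTF_8));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> outgoing = new LinkedHashMap<>();
        outgoing.put("greeting", "hello");
        outgoing.put("retries", 3); // not a String on the sending side

        Map<String, Object> incoming = fromWire(toWire(outgoing));

        System.out.println(incoming.get("greeting"));                  // hello
        System.out.println(incoming.get("retries") instanceof String); // true
    }
}
```

String headers survive unchanged and without added quotes, while the integer arrives as the String "3", so consumers that need typed values must parse them explicitly.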

 

Original issue: https://github.com/micronaut-projects/micronaut-core/issues/605

Tags: spring micronaut kafka

