Spring Kafka filtering messages by header on a listener level - TagMerge


Asked 1 year ago
3 answers

Creating custom annotations is a pretty advanced topic; you would need to subclass the annotation bean post processor and come up with some mechanism to customize the endpoint by adding the filter strategy bean.

Feel free to open a new feature request on GitHub https://github.com/spring-projects/spring-kafka/issues

We could add a new property on @KafkaListener to pass the bean name of a RecordFilterStrategy bean.

EDIT

I see you opened an issue; thanks.
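For readers on newer versions: that issue was since implemented, and recent spring-kafka releases let @KafkaListener reference a RecordFilterStrategy bean by name via a filter attribute (check the javadoc for your version). A sketch, reusing the xxx filter bean defined in the workaround below:

```java
// Assumes a spring-kafka version where @KafkaListener supports the "filter"
// attribute (added after this answer was first written); "xxx" is the bean
// name of a RecordFilterStrategy<String, String>.
@KafkaListener(id = "xxx", topics = "so71237300", filter = "xxx")
void listen1(String in) {
    System.out.println("1:" + in);
}
```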

Here is a workaround that adds the filters later:

import java.util.Arrays;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

@SpringBootApplication
public class So71237300Application {

    public static void main(String[] args) {
        SpringApplication.run(So71237300Application.class, args);
    }

    @KafkaListener(id = "xxx", topics = "so71237300", autoStartup = "false")
    void listen1(String in) {
        System.out.println("1:" + in);
    }

    @KafkaListener(id = "yyy", topics = "so71237300", autoStartup = "false")
    void listen2(String in) {
        System.out.println("2:" + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71237300").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> xxx() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "xxx".getBytes());
        };
    }

    @Bean
    RecordFilterStrategy<String, String> yyy() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "yyy".getBytes());
        };
    }

    @Bean
    ApplicationRunner runner(RecordFilterStrategy<String, String> xxx, RecordFilterStrategy<String, String> yyy,
            KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {

        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
            record.headers().add("which", "xxx".getBytes());
            template.send(record);
            record = new ProducerRecord<>("so71237300", "test.to.yyy");
            record.headers().add("which", "yyy".getBytes());
            template.send(record);

            updateListener("xxx", xxx, registry);
            updateListener("yyy", yyy, registry);
            registry.start();
        };
    }

    private void updateListener(String id, RecordFilterStrategy<String, String> filter,
            KafkaListenerEndpointRegistry registry) {

        MessageListener listener = (MessageListener) registry.getListenerContainer(id).getContainerProperties()
                .getMessageListener();
        registry.getListenerContainer(id).getContainerProperties()
                .setMessageListener(new FilteringMessageListenerAdapter<>(listener, filter));
    }

}
1:test.to.xxx
2:test.to.yyy
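Note that RecordFilterStrategy inverts the usual predicate sense: returning true means the record is discarded. The header check in the xxx/yyy beans above can be illustrated in plain Java (no Kafka types; shouldDiscard is a hypothetical helper mirroring the lambdas):

```java
import java.util.Arrays;

public class FilterSemantics {

    // Mirrors the lambdas in the xxx()/yyy() beans above: a record is
    // DISCARDED (true) when the header is missing or its bytes don't match.
    static boolean shouldDiscard(byte[] headerValue, String expected) {
        return headerValue == null || !Arrays.equals(headerValue, expected.getBytes());
    }

    public static void main(String[] args) {
        System.out.println(shouldDiscard("xxx".getBytes(), "xxx")); // false -> kept
        System.out.println(shouldDiscard("yyy".getBytes(), "xxx")); // true -> discarded
        System.out.println(shouldDiscard(null, "xxx"));             // true -> discarded
    }
}
```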

Source: link


Maven
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.8.3</version>
</dependency>
Gradle
implementation 'org.springframework.kafka:spring-kafka:2.8.3'
If Spring Boot's dependency management already supplies the version, it can be omitted:
Maven
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
Gradle
implementation 'org.springframework.kafka:spring-kafka'
Java
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

Source: link


Let’s start by adding the spring-kafka dependency to our pom.xml:
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.5.2.RELEASE</version>
</dependency>
Let’s have a look at the producer configuration first:
@Configuration
class KafkaProducerConfig {

  @Value("${io.reflectoring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      StringSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
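To exercise the template above, a component such as the following could send a message that carries a header (the class name, topic, and "which" header are illustrative, echoing the filtering question; they are not from the original article):

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical sender bean; it relies on the KafkaTemplate<String, String>
// bean defined in KafkaProducerConfig above.
@Component
class HeaderAwareSender {

  private final KafkaTemplate<String, String> kafkaTemplate;

  HeaderAwareSender(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  void send(String topic, String payload, String which) {
    // Attach a "which" header so a RecordFilterStrategy on the consumer
    // side can decide whether to keep or discard the record.
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, payload);
    record.headers().add("which", which.getBytes());
    kafkaTemplate.send(record);
  }
}
```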
Now that our producer config is ready, let’s create a configuration for the consumer:
@Configuration
class KafkaConsumerConfig {

  @Value("${io.reflectoring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }
}
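A container factory like the one above can also apply header-based filtering directly via setRecordFilterStrategy, which is how the question's requirement is usually met at the factory level. A sketch (the "which" header value and bean name are illustrative; the strategy returns true for records to discard):

```java
import java.util.Arrays;

import org.apache.kafka.common.header.Header;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Sketch: a factory whose containers discard any record lacking a "which"
// header equal to "xxx" before the listener method is invoked.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filteringFactory(
    ConsumerFactory<String, String> consumerFactory) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  factory.setRecordFilterStrategy(rec -> {
    Header which = rec.headers().lastHeader("which");
    return which == null || !Arrays.equals(which.value(), "xxx".getBytes());
  });
  return factory;
}
```

A listener opts in by naming the factory, e.g. @KafkaListener(topics = "reflectoring-1", containerFactory = "filteringFactory").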
Spring Boot does most of the configuration automatically, so we can focus on building the listeners and producing the messages. It also provides the option to override the default configuration through application.properties. The Kafka configuration is controlled by the configuration properties with the prefix spring.kafka.*:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
A topic must exist before messages can be sent to it. Let's now have a look at how we can create Kafka topics:
@Configuration
class KafkaTopicConfig {

  @Bean
  public NewTopic topic1() {
    return TopicBuilder.name("reflectoring-1").build();
  }

  @Bean
  public NewTopic topic2() {
    return TopicBuilder.name("reflectoring-2").build();
  }
  ...
}

Source: link
