фильтрация сообщений в топике делается оч просто с помощью стримов
KStream<String, String> source =
builder.stream("book-created");
KStream<String, String> filtered = source.filter((key, val) -> key.equals(correlationId));
filtered.foreach((key,value)->{
System.out.println(value);
});