Asynchroner, gepufferter Nachrichten-Austausch mit RabbitMQ und Spring

RabbitMQ ist ein Nachrichten-Broker mit integrierter Queue zur asynchronen Kommunikation zwischen Services. Der Produzenten-Service übergibt Nachrichten an RabbitMQ ohne auf den Konsumenten-Service zu warten. Der Konsument holt sich Nachrichten aus der RabbitMQ Queue nach Bedarf und kann so nicht vom Produzent mit Nachrichten überflutet werden. Hier zeige ich die Spring AMQP Implementierung zur RabbitMQ Integration.

Vorteile des asynchronen, gepufferten Nachrichten-Austauschs

  • Nachrichten Produzent und Konsument müssen nicht aufeinander warten. Wenn der Produzent z. B. für den Benutzer einen Auftrag erstellt, muss der Benutzer nicht warten bis der Konsument den Auftrag erfolgreich verarbeitet hat. Der Produzent übergibt die Nachricht an die RabbitMQ Queue und kann dem Benutzer danach eine Erfolgsmeldung anzeigen. Das ist insbesondere bei lang andauernden Auftragsverarbeitungen im Backend (Konsument) angenehm für den Benutzer, da er nicht warten muss. Der Konsument könnte den Benutzer später per Email über die erfolgreiche Verarbeitung des Auftrags informieren.
  • Die RabbitMQ Queue dient auch als Puffer zwischen Produzent und Konsument. Wenn innerhalb kurzer Zeit viele Nachrichten produziert werden, können diese in der Queue gesammelt werden. Der Konsument holt sich dann die Nachrichten in seinem Tempo aus der Queue und bricht so auf keinen Fall unter Überlast zusammen.
  • Wenn Produzent und Konsument als Dienste in der Cloud betrieben werden, könnte die Skalierung dieser Dienste anhand des Füllstandes in der RabbitMQ Queue gemanagt werden. Bei einer vollen Queue könnten Konsumenten Dienste hoch skaliert werden, während sie bei einer sich leerenden Queue herunter skaliert werden. Wir können also Ressourcen effizient nutzen. 

RabbitMQ im Docker Container starten

Zum schnellen Ausprobieren bietet sich RabbitMQ im Docker Container an. Wir starten RabbitMQ mit Management UI, damit wir die Queues im Browser prüfen können:

    docker run -d -p 15672:15672 -p 5672:5672 rabbitmq:3-management
  • 15672 ist der Management UI Port, den wir mittels Port Weiterleitung erreichbar machen. Mit dieser URL können wir das Management UI im Browser öffenen:
    http://localhost:15672/
    Benutzername und Password zum Login ist: guest / guest
  • 5672 ist der RabbitMQ Port, den unsere Spring Anwendung verwenden wird.
  • Weitere Details zu Docker findet ihr hier.
RabbitMQ Management UI

Nachrichten-Produzenten Implementierung

Maven & Spring Property Konfiguration

Bevor die Implementierung des Produzenten startet, benötigen wir die Spring Starter Dependency von Spring AMQP. Hier der entsprechende Ausschnitt aus der Maven Konfiguration:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

Außerdem konfigurieren wir in der Spring Properties-Datei application.yml die Verbindung zum RabbitMQ Server:

    spring.rabbitmq:
      host: localhost
      port: 5672
      password: guest
      username: guest

Produzenten Implementierung

Die Implementierung des Produzenten ist eine Spring Bean, welche Comment Objekte an RabbitMQ schickt:

@Service
public class CommentProducerService {

    @Autowired private RabbitTemplate rabbitTemplate;
    public void createComment() {
        var comment = Comment.builder()
                .message("New comment message created")
.author("Elmar")
.createdAt(LocalDateTime.now())
.build();
rabbitTemplate.convertAndSend(
                "comments", 
                comment);
    }
    ...
  • RabbitTemplate wird automatisch durch die Spring Boot Starter Dependency erzeugt.
  • convertAndSend schickt die Instanz der Klasse Comment zum RabbitMQ Server in die Queue mit dem Namen "comments". Diese Queue erzeugt ihr vorher im Management UI oder verwendet den Administrationscode weiter unten.
  • Die Klasse Comment ist ein Java POJO mit 2 String und einem LocalDateTime Attribute. Für die Instanziierung verwende ich einen Builder anstelle von Konstruktor und Setter-Methoden.

Nachrichten Serialisierung

Wenn wir die Nachrichten im gängigen JSON Format zur RabbitMQ Queue schicken wollen, müssen wir einen entsprechenden Konverter konfigurieren. Dazu erzeugen wir eine Spring Bean vom Typ Jackson2JsonMessageConverter:

@Configuration
public class RabbitmqConfig {
    @Bean
    SmartMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter(objectMapper());
    }
    @Bean
    ObjectMapper objectMapper() {
        var mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        return mapper;
    }
}
  • Die Jackson2JsonMessageConverter Bean benötigt eine ObjectMapper Instanz zum Schreiben von Nachrichten-Objekten als AMQP JSON Nachrichten.
  • Beim ObjectMapper musste das JavaTimeModule registriert werden, weil unsere Nachrichten Klasse Comment den Java 8 Datumstyp LocalDateTime verwendet.

Konsument an Queue registrieren

Der Konsument benötigt zur Deserialisierung der Comment Nachrichten die gleichen Konverter Beans wie der Produzent. Das Konsumieren von RabbitMQ Nachrichten aus einer Queue ist mit noch weniger Code als das Produzieren zu implementieren:

@RabbitListener(queues = "comments")
@Component
@Slf4j
public class CommentConsumerService {
    @RabbitHandler
    public void readComments(Comment comment) {
log.info(comment.toString());
    }
    ...
  • Die Annotation @RabbitListener definiert die Bean CommentConsumerService als Konsument der Queue mit dem Namen "comments".
  • Die Annotation @RabbitHandler legt fest, dass die Methode readComments Nachrichten des Typs Comment verarbeitet. Würde die Queue "comment" zusätzlich einfache String Objekte als Nachrichten enthalten, könnten diese von einer 2. Methode mit @RabbitHandler Annotation und String Parameter verarbeitet werden.
  • Die readComments Methode loggt hier lediglich die konsumierte Comment Nachricht.

RabbitMQ administrieren mit Spring

Spring bietet uns die Möglichkeit den RabbitMQ Nachrichten-Broker per Code zu administrieren. Hier zeige ich euch, wie ihr ohne Management UI eine neue Nachrichten Queue erstellt:

@Configuration
public class RabbitmqConfig {
    @Bean
    RabbitAdmin createRabbitAdminAndInitQueue(
            RabbitTemplate rabbitTemplate) {
        var admin = new RabbitAdmin(rabbitTemplate);
        if (admin.getQueueInfo("comments") != null)
            admin.purgeQueue("comments");
else
            admin.declareQueue(
                    QueueBuilder.durable("comments").build());
        return admin;
    }
    ...
  • RabbitAdmin ist die Bean zur Administration von RabbitMQ. Die Verbindung wird durch den Parameter RabbitTemplate festgelegt - wie auch schon zuvor beim Nachrichten Produzent.
  •  getQueueInfo verwende ich hier nur zum Prüfen, ob die Queue "comments" schon existiert.
  • purgeQueue löscht alle Nachrichten in einer Queue.
  • declareQueue erzeugt eine neue Queue basierend auf dem übergebenen Parameter. Die neue Queue wurde hier als dauerhafte Queue mit dem Namen "comments" definiert. Der QueueBuilder bietet natürlich noch weitere Optionen an, schaut euch dazu die public Methoden an.

Fazit

Die Vorteile von RabbitMQ bzw. eines Nachrichten-Brokers mit Queue habe ich am Anfang des Artikels bereits aufgelistet. In den Code Beispielen habe ich gezeigt, wie viel Programmierarbeit uns Spring abnimmt, da wir vieles per Konfiguration und Annotation einrichten können. 

Weitere Details zu Spring Integration findet ihr hier:
https://spring.io/projects/spring-integration

Den kompletten Code zum Artikel findet ihr in GitHub:
https://github.com/elmar-brauch/rabbitmq

Kommentare

Beliebte Posts aus diesem Blog

CronJobs mit Spring

OpenID Connect mit Spring Boot 3

Kernkonzepte von Spring: Beans und Dependency Injection