Asynchroner, gepufferter Nachrichten-Austausch mit RabbitMQ und Spring
RabbitMQ ist ein Nachrichten-Broker mit integrierter Queue zur asynchronen Kommunikation zwischen Services. Der Produzenten-Service übergibt Nachrichten an RabbitMQ ohne auf den Konsumenten-Service zu warten. Der Konsument holt sich Nachrichten aus der RabbitMQ Queue nach Bedarf und kann so nicht vom Produzent mit Nachrichten überflutet werden. Hier zeige ich die Spring AMQP Implementierung zur RabbitMQ Integration.
Vorteile des asynchronen, gepufferten Nachrichten-Austauschs
- Nachrichten Produzent und Konsument müssen nicht aufeinander warten. Wenn der Produzent z. B. für den Benutzer einen Auftrag erstellt, muss der Benutzer nicht warten bis der Konsument den Auftrag erfolgreich verarbeitet hat. Der Produzent übergibt die Nachricht an die RabbitMQ Queue und kann dem Benutzer danach eine Erfolgsmeldung anzeigen. Das ist insbesondere bei lang andauernden Auftragsverarbeitungen im Backend (Konsument) angenehm für den Benutzer, da er nicht warten muss. Der Konsument könnte den Benutzer später per Email über die erfolgreiche Verarbeitung des Auftrags informieren.
- Die RabbitMQ Queue dient auch als Puffer zwischen Produzent und Konsument. Wenn innerhalb kurzer Zeit viele Nachrichten produziert werden, können diese in der Queue gesammelt werden. Der Konsument holt sich dann die Nachrichten in seinem Tempo aus der Queue und bricht so auf keinen Fall unter Überlast zusammen.
- Wenn Produzent und Konsument als Dienste in der Cloud betrieben werden, könnte die Skalierung dieser Dienste anhand des Füllstandes in der RabbitMQ Queue gemanagt werden. Bei einer vollen Queue könnten Konsumenten Dienste hoch skaliert werden, während sie bei einer sich leerenden Queue herunter skaliert werden. Wir können also Ressourcen effizient nutzen.
RabbitMQ im Docker Container starten
- 15672 ist der Management UI Port, den wir mittels Port Weiterleitung erreichbar machen. Mit dieser URL können wir das Management UI im Browser öffenen:
http://localhost:15672/
Benutzername und Password zum Login ist: guest / guest - 5672 ist der RabbitMQ Port, den unsere Spring Anwendung verwenden wird.
- Weitere Details zu Docker findet ihr hier.
Nachrichten-Produzenten Implementierung
Maven & Spring Property Konfiguration
Produzenten Implementierung
- RabbitTemplate wird automatisch durch die Spring Boot Starter Dependency erzeugt.
- convertAndSend schickt die Instanz der Klasse Comment zum RabbitMQ Server in die Queue mit dem Namen "comments". Diese Queue erzeugt ihr vorher im Management UI oder verwendet den Administrationscode weiter unten.
- Die Klasse Comment ist ein Java POJO mit 2 String und einem LocalDateTime Attribute. Für die Instanziierung verwende ich einen Builder anstelle von Konstruktor und Setter-Methoden.
Nachrichten Serialisierung
- Die Jackson2JsonMessageConverter Bean benötigt eine ObjectMapper Instanz zum Schreiben von Nachrichten-Objekten als AMQP JSON Nachrichten.
- Beim ObjectMapper musste das JavaTimeModule registriert werden, weil unsere Nachrichten Klasse Comment den Java 8 Datumstyp LocalDateTime verwendet.
Konsument an Queue registrieren
- Die Annotation @RabbitListener definiert die Bean CommentConsumerService als Konsument der Queue mit dem Namen "comments".
- Die Annotation @RabbitHandler legt fest, dass die Methode readComments Nachrichten des Typs Comment verarbeitet. Würde die Queue "comment" zusätzlich einfache String Objekte als Nachrichten enthalten, könnten diese von einer 2. Methode mit @RabbitHandler Annotation und String Parameter verarbeitet werden.
- Die readComments Methode loggt hier lediglich die konsumierte Comment Nachricht.
RabbitMQ administrieren mit Spring
- RabbitAdmin ist die Bean zur Administration von RabbitMQ. Die Verbindung wird durch den Parameter RabbitTemplate festgelegt - wie auch schon zuvor beim Nachrichten Produzent.
- getQueueInfo verwende ich hier nur zum Prüfen, ob die Queue "comments" schon existiert.
- purgeQueue löscht alle Nachrichten in einer Queue.
- declareQueue erzeugt eine neue Queue basierend auf dem übergebenen Parameter. Die neue Queue wurde hier als dauerhafte Queue mit dem Namen "comments" definiert. Der QueueBuilder bietet natürlich noch weitere Optionen an, schaut euch dazu die public Methoden an.
Fazit
https://spring.io/projects/spring-integration
https://github.com/elmar-brauch/rabbitmq
Kommentare