Parallele Datenverarbeitung mit Threads / CompletableFuture in der Praxis
Lang laufende Rechenoperationen oder Datentransformationen können durch parallele Ausführung erheblich beschleunigt werden. In diesem Artikel zeige ich, wie ich den Code für eine Datenbank Schemata Migration mit Hilfe von CompletableFuture und Paging von circa 15 Stunden Laufzeit auf 45 Minuten beschleunigt habe.
15 Stunden Laufzeit - warum nicht?
Wenn es keinen stört, sind 15 Stunden Laufzeit für eine Datentransformation kein Problem.
Unsere Datenbank Schemata Migration wurde aber bei einen produktiven System benötigt, das von Telekom Kunden genutzt wird. Leider mussten wir das produktive System während der Migration anhalten, so dass wir vor Kunde eine Downtime hatten - in diesem Fall wären 15 Stunden verständlicherweise inakzeptabel gewesen.
Ich gebe zu, dass wir im Vorfeld schon Design-Fehler gemacht. Diese Fehler haben uns in die Situation gebracht, dass eine Datenbank Schemata Migration überhaupt notwendig war - das künftig zu verhindern soll hier aber nicht mein Thema sein.
Den Code für die Migration haben wir in Java geschrieben. Für Lese- und Schreib-Operationen auf der Datenbank haben wir Spring Data verwendet. Spring Data habe ich in diesem Artikel vorgestellt: mongodb-mit-Spring-Data.html (Statt MongoDB wurde hier eine MariaDB verwendet).
Vermutlich hätte man die Migration auch mit ziemlich komplizierten SQL Statements machen können. Das wäre wahrscheinlich deutlich schneller gewesen. Es gibt aber immer viele Wege und ich möchte hier unseren Weg zeigen, bei dem die Tuning Möglichkeiten durch parallele Verarbeitung schön zur Geltung kommen.
Vermutlich hätte man die Migration auch mit ziemlich komplizierten SQL Statements machen können. Das wäre wahrscheinlich deutlich schneller gewesen. Es gibt aber immer viele Wege und ich möchte hier unseren Weg zeigen, bei dem die Tuning Möglichkeiten durch parallele Verarbeitung schön zur Geltung kommen.
Die langsame Lösung ohne Threads
Die Ausgangssituation war der folgende Code, der komplett im Haupt-Thread (main) ausgeführt wird und auf weitere Threads verzichtet:
@Component
@Slf4j
public class OneThreadMigrator {
@Autowired private ShoppingCartRepository dataRepo;
public void migrateCarts() {
log.info("Read all carts with single request...");
long start = System.currentTimeMillis();
Iterable<ShoppingCart> carts = dataRepo.findAll();
for (ShoppingCart cart : carts) {
migrateSingleCart(cart);
}
log.info("Migration completed in {} seconds!",
((System.currentTimeMillis() - start)/1000));
((System.currentTimeMillis() - start)/1000));
}
private void migrateSingleCart(ShoppingCart cart) {
log.info("Cart with id {} is in progress...", cart.getId());
// Doing complicated and time consuming migration logic.
// Simulated by save method.
dataRepo.save(cart);
}
}
- Die Klasse OneThreadMigrator ist eine Spring Bean. Spring ist hier aber nicht das Thema und müsste auch nicht verwendet werden. Weitere Infos zu @Service und @Autowired findet ihr in diesem Artikel kernkonzepte-von-spring.html
- @Slf4j ist eine Lombok Annotation, die einen Logger in der Variablen log zur Verfügung stellt.
- Hinter der Variablen dataRepo verbirgt sich eine Spring Data CrudRepository Bean, die verwendet wird, um lesend oder schreibend auf die Datenbank zuzugreifen.
Die Methode findAll liest alle Datensätze, die Spring Data in dieses Repository gemappt hat - standardmäßig der Inhalt einer Tabelle. - Mit save wird eine Instanz (hier von der Klasse ShoppingCart), in die Datenbank geschrieben - es kann eine Neuanlage oder eine Aktualisierung der Instanz sein.
Vorteile und Nachteile der Lösung ohne Threads
Die zuvor gezeigte Lösung ohne Threads hat den Vorteil, dass sie leicht verständlich ist: Loggen, Zeitmessung, Iteration über alle ShoppingCart Datensätze und diese dabei anpassen und abspeichern, fertig.
Der Nachteil ist die lange Laufzeit. Die Abfrage aller ShoppingCarts (findAll) kann bei großen Tabellen sehr lange dauern. Während dieser Abfrage ist der main-Thread blockiert und das Migrations-Tool macht nichts außer warten. Die Methode migrateSingleCart hat ebenfalls Datenbank-Interaktion, die den main-Thread blockiert, so dass auch hier Zeit verschenkt wird. So leicht verständlich das oben gezeigte Programm auch ist, die meiste Zeit wartet es auf Antworten der Datenbank. 😴
Die schnelle Lösung mit Threads
Mit Threads und Pagination können wir beide Nachteile der langsamen Lösung adressieren und somit die Laufzeit des Migrationsprogramms erheblich reduzieren.
Pagination
Mit Pagination können große Datenbanktabellen in Teilen bzw. Seitenweise abgefragt werden. Hat eine Datenbanktabelle 1000 Zeilen, dann kann ich sie mit 10 Seiten zu je 100 Zeilen abfragen. Das hat den Vorteil, dass ich für 100 Zeilen deutlich schneller eine Antwort bekommen als für 1000. Der Nachteil ist, dass ich statt einer Abfrage 10 Abfragen benötige.
Insbesondere im Zusammenspiel mit Threads zeigen sich die Vorteile von Pagination. Ohne Pagination würden alle Threads lange warten bis alle Datensätze gelesen wurden und verarbeitet werden können. Mit Pagination werden kleine Mengen von Datensätzen (Page bzw. Seite) schnell und nacheinander ausgelesen. Jede Seite von Datensätzen kann dann an einen Thread übergeben werden, so dass dieser bereits Daten verarbeitet, während die folgenden Seiten noch ausgelesen werden.
CompletableFuture
Asynchrone Berechnungen können seit Java 8 mit der CompletableFuture Klasse gemacht werden. Ein Vorteil im Vergleich zu klassischen Threads ist, dass man CompletableFutures einfacher miteinander kombinieren kann. Im folgenden Code verwende ich CompletableFutures, um die vielen Datenbank-Interaktionen in unterschiedlichen Threads zu machen. Durch die Verteilung auf verschiedene Threads ist nicht der gesamte Algorithmus blockiert, während ein Thread auf die Antwort der Datenbank wartet.
@Component
@Slf4j
public class MultiThreadMigrator {
@Autowired private ShoppingCartPageableRepository dataRepo;
private final Executor threadPool =
Executors.newFixedThreadPool(25);
Executors.newFixedThreadPool(25);
private static final int PAGE_SIZE = 10;
public void migrateCarts() {
log.info("Read all carts with pagination...");
long start = System.currentTimeMillis();
List<CompletableFuture<Void>> threadList = new ArrayList<>();
Pageable pageable = PageRequest.of(0, PAGE_SIZE);
do {
log.info("Request page {}.", pageable.getPageNumber());
var page = dataRepo.findAll(pageable);
threadList.add(CompletableFuture.runAsync(
() -> migrateCartPage(page), threadPool));
pageable = getNextPageable(page);
} while(pageable != null);
log.info("All pages passed to threads.");
CompletableFuture.allOf(threadList.toArray(
CompletableFuture[]::new)).join();
log.info("All Migration threads completed in {} seconds!",
((System.currentTimeMillis() - start)/1000));
((System.currentTimeMillis() - start)/1000));
}
private Pageable getNextPageable(Page<ShoppingCart> page) {
if (page.hasNext())
return page.getPageable().next();
return null;
}
private void migrateCartPage(Page<ShoppingCart> page) {
try {
for (ShoppingCart shoppingCart : page) {
log.info("Cart with id {} is in progress...",
shoppingCart.getId());
shoppingCart.getId());
// Doing complicated migration logic.
// Simulated by save method.
dataRepo.save(shoppingCart);
}
log.info("Successful migration of CART-page {} of {}",
page.getNumber(), page.getTotalPages());
page.getNumber(), page.getTotalPages());
} catch (Exception ex) {
log.error("Error during migration of CART-page "
+ page.getNumber(), ex);
}
}
}
Bevor wir in die Details des MultiThreadMigrator gehen, eine kurze Beschreibung des Algorithmus:
- Lese seitenweise alle ShoppingCarts aus der Datenbank
- Für jede Seite mit ShoppingCarts starte einen Thread, der die ShoppingCarts verarbeitet
- Warte bis alle Threads fertig sind und logge die Migrationsdauer in Sekunden
Das folgende Sequenz-Diagramm zeigt den Ablauf: Wann neue Threads erstellt werden und wann diese Threads mit der Datenbank interagieren.
MultiThreadMigrator mit 3 Seiten zu je 2 ShoppingCarts |
Pagination im MultiThreadMigrator Code
- Um seitenweises Auslesen von ShoppingCart Datensätzen aus der Datenbank zu ermöglichen, implementiert ShoppingCartPageableRepository das von Spring Data bereitgestellte Interface PagingAndSortingRepository. PagingAndSortingRepository bietet die Methode findAll mit einem Parameter vom Type Pageable an, welche die Pagination-Funktionalität für uns zur Verfügung stellt.
- Pageable definiert die genaue Seite mit den Datensätzen, die wir aus einer Tabelle lesen wollen. PageRequest.of(0, 10) erzeugt eine das Pageable Interface implementierende Instanz von PageRequest, welche die erste Seite (mit 0 indexiert - analog zu Arrays) mit 10 Datensätzen liefert.
- Page ist der Rückgabetyp von findAll mit dem PageRequest als Parameter. Die Page Instanz enthält hier 10 Datensätze, die wir dann verarbeiten können. Bevor ich die nächsten 10 Datensätze lese, prüfe ich zuerst mit hasNext auf dem Page Objekt, ob es eine nächste Seite gibt. Den PageRequest für die nächste Seite bekomme ich, indem ich auf dem Page Objekt getPageable().next() aufrufe. Die nächste Seite mit den nächsten 10 Datensätze lese ich wieder mit findAll am PagingAndSortingRepository.
CompletableFuture im MultiThreadMigrator Code
- CompletableFuture.runAsync starten einen asynchronen Thread als CompletableFuture. Die asynchron auszuführende Methode migrateCartPage habe ich als Lambda Ausdruck an den CompletableFuture übergeben. Der Rückgabewert von der statischen Methode runAsync ist die CompletableFuture Instanz, die den Thread ausführt. CompletableFuture bietet einige Methoden an mit denen wir uns über den Zustand des Threads informieren können, z.B. isDone um Anzuzeigen, ob der Thread mit oder ohne Fehler beendet oder abgebrochen wurde.
- Neben dem Lambda Ausdruck übergebe ich runAsync noch einen optionalen Parameter Executor. Der Executor führt die übergebenen Lambda Ausdrücke (Runnable) aus und fungiert hier als Thread-Pool. Solange freie Threads im Pool verfügbar sind, werden sie ausgeführt. Ansonsten werden die Runnables gesammelt und erst dann ausgeführt, wenn Threads nach ihrer Beendigung wieder im Pool verfügbar sind. Der Thread-Pool hier im Code hat 25 Threads und wurde mit Executors.newFixedThreadPool(25) erstellt. Wird kein Executor an runAsync übergeben, wird standardmäßig der gemeinsame Thread-Pool verwendet.
- Ich sammle alle erstellten CompletableFuture Instanzen in einer Liste (threadList), um später zu prüfen, ob alle abgeschlossen sind. Die Liste der CompleteableFuture Instanzen (threadList) wird in einen Array umgewandelt, um dann mittels der Methode allOf in ein neues CompleteableFuture Objekt überführt zu werden. Der neue CompleteableFuture nimmt den Zustand abgeschlossen (completed) an, wenn alle CompleteableFuture Instanzen in dem übergebenen Array im Zustand abgeschlossen sind.
- Mittels join warte ich bis alle CompleteableFuture Instanzen bzw. die mit allOf erzeugte Instanz abgeschlossen ist. Hätten die CompleteableFuture Instanzen ein Ergebnis, würde join es zurückgeben - das nutze ich hier aber nicht.
Vorteile und Nachteile der Lösung mit Threads
Die Vor- und Nachteile sind im Prinzip die Gegenteile der Vor- und Nachteile der Lösung ohne Threads. Der Code wird durch Threads komplexer und ist auch schwerer zu Debuggen. Je mehr Erfahrung man als Software-Entwickler sammelt, desto leichter wird aber der Umgang mit Threads.
Die Vorteile sind ganz klar die deutlich schnellere Laufzeit durch die parallele Ausführung. In GitHub findet ihr neben dem hier gezeigten Code auch noch Repository-Klassen, die eine simulierte Datenbank verwenden. Die simulierte Datenbank benötigt 500 Millisekunden bzw. eine halbe Sekunde, um einen Datensatz in die Datenbank zu schreiben. Zum Lesen eines Datensatzes benötigt sie 200 Millisekunden.
Wenn ihr nun die Spring Boot Anwendung ausführt, läuft zuerst die synchrone Migration und dann die asynchrone. Bei mir gab es folgende Laufzeiten:
- 100 Datensätze am Stück lesen und dann synchron der Reihe nach Schreiben: 70 Sekunden
- 100 Datensätze seitenweise mit 10 Datensätzen pro Seite auslesen und dann die Datensätze jeder Seite in einem Thread pro Seite in die Datenbank schreiben: 25 Sekunden
Tuning mit Pagination und Thread-Pool
Im einfachen Beispiel mit 100 Datensätzen hatten wir durch die Verwendung von Threads einen Unterschied von ungefähr Faktor 3. Die Datenbank-Migration in Produktion konnte ich mit den hier gezeigten Prinzipien von 15 Stunden auf 45 Minuten beschleunigen - also Faktor 20!
Je nach Situation können hier aber auch ganz andere Unterschiede rauskommen.
Den hier gezeigten Algorithmus kann man an 2 Stellen tunen:
- Die Anpassung der Seitengröße hat eine direkte Auswirkung auf die Laufzeit. Die optimale Größe der Seiten ist aber fast immer anders. Je größer die Seite, desto länger dauert es bis die Threads starten. Je kleiner die Seite, desto mehr Performance-Verlust hat man durch den Thread-Overhead.
- Die Größe des Thread-Pools legt fest wie viele Threads das Programm benutzt. Die optimale Größe hängt hier mit der Anzahl eurer Kerne im Prozessor zusammen.
Fazit
In diesem Artikel haben wir gesehen, wie mit Threads und paralleler Ausführung Algorithmen deutlich schneller werden. Im Fall einer Datenbank-Migration habe ich zusätzlich Pagination verwendet, um die Threads besser füttern zu können. Dadurch konnte der Migration-Code um den Faktor 20 beschleunigt werden.
Sicherlich gibt es noch andere Möglichkeiten eine Datenbank-Migration schneller zu machen, z.B. indem man auf der Datenbank SQL-Migrationsskripte ausführt. Auch die reaktive Programmierung ermöglicht noch größere Tunings, schaut euch dazu meine anderen Blog-Beiträge an:
Hier ging es mir aber in erster Linie darum zu zeigen, wie man Threads bzw. CompletableFutures verwenden kann. Damit ihr künftig in ähnlichen Situationen wisst, wie ihr diese Technik einsetzen könnt.
Den kompletten Code zum Artikel findet ihr hier:
Buch zur Programmierung mit Threads in Java |
Kommentare