Особливості сортування великих файлів на Java
Всім привіт. Я Сергій Моренець, розробник, викладач, спікер та технічний письменник, хочу поділитися з вами своїм досвідом роботи з такою цікавою темою, як сортування даних. Сортування — одне з найпопулярніших алгоритмів, які зустрічаються як у роботі, так і на технічних співбесідах. У той самий час впевнений, для більшостi прикладних завдань досить знати метод Collections.sort() в Java чи вираз " ORDER BY " в SQL.
Але мені на співбесідах вже кілька разів зустрічалося те саме питання — як відсортувати великий (або дуже великий) файл з даними (зазвичай рядками) в умовах обмеженого обсягу пам’яті. Раз він зустрічався мені, значить він міг зустрітися і будь-кому, тому я хотів би поділитися своїми напрацюваннями і досвідом у вирішенні цього завдання, тим більше, що ми торкаємося цієї теми на деяких наших тренінгах.
Відомо, що будь-яке рішення у світі ПЗ може вичавити по максимуму два з трьох показників (час розробки, якість та вартість), тому в цій статті я хотів би розглянути не тільки класичний варіант із зовнішнім сортуванням, але й економічніший у плані часу розробки — використання бази даних. Ті benchmarks, які я підготував, дадуть відповідь на запитання — наскільки доречно використовувати те чи інше рішення. Зрозуміло, зараз є велика кількість технологій, що працюють у світі BigData і спеціалізуються на подібних завданнях, але мені хотілося розглянути класичні алгоритми та структури даних (тобто, що і потрібно на співбесіді).
Зовнішнє сортування
Впевнений, кожен розробник знає хоча б один алгоритм сортування. Сортування бульбашкою, вставками, злиттям, швидке сортування — у всіх цих алгоритмів є свої плюси і мінуси. Але їх не можна використовувати безпосередньо, якщо всі дані не можна помістити в оперативну пам’ять (вони знаходяться на зовнішніх носіях). І тоді нам допоможе зовнішнє сортування злиттям:
Ідея дуже проста:
- Ми розбиваємо файл на так звані chunks, кожен із яких можна завантажити в пам’ять.
- Сортуємо кожен з chunks окремо і записуємо у файл вже відсортований список.
- Далі використовуємо сортування злиттям для того, що зберегти рядки у відсортованому вигляді у вихідний файл. Оскільки кожен chunk-файл можна логічно подати як список (або ітератор), то це завдання еквівалентне злиттю безлічі зв’язаних списків в один (інша назва цього алгоритму k-way merge sort).
Нам потрібно буде реалізувати наступний інтерфейс:
@FunctionalInterface
public interface FileSorter {
void sort(String inputPath, String outputPath);
}
Розглянемо його реалізацію:
@RequiredArgsConstructor
public class MergeFileSorter implements FileSorter {
private static final int LINE_LENGTH = 30;
private final int chunkCount;
@Override
public void sort(String inputPath, String outputPath) {
try {
long fileSize = Files.size(Path.of(inputPath));
int batchSize = (int) fileSize / (chunkCount * LINE_LENGTH);
writeChunks(inputPath, batchSize);
mergeAndSort(inputPath, outputPath, batchSize);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
private void mergeAndSort(String inputPath, String outputPath, int batchSize) throws IOException {
Path chunkFolder = getChunkFolder(inputPath);
List<Path> chunks = Files.walk(chunkFolder).filter(path -> !Files.isDirectory(path)).toList();
Iterator<String> mergedIterator = new MergedIterator(chunks);
Path path = Path.of(outputPath);
Files.writeString(path, "");
List<String> lines = new ArrayList<>(batchSize);
while (mergedIterator.hasNext()) {
String line = mergedIterator.next();
lines.add(line);
if (lines.size() >= batchSize) {
Files.write(path, lines, StandardOpenOption.APPEND);
lines.clear();
}
}
if (!lines.isEmpty()) {
Files.write(path, lines, StandardOpenOption.APPEND);
}
}
private void writeChunks(String inputPath, int batchSize) throws IOException {
try (BufferedReader reader = new BufferedReader(new FileReader(inputPath))) {
String line = reader.readLine();
List<String> lines = new ArrayList<>(batchSize);
int index = 0;
while (line != null) {
lines.add(line);
if (lines.size() >= batchSize) {
Collections.sort(lines);
Files.write(getChunkFileName(inputPath, index), lines);
lines.clear();
index++;
}
line = reader.readLine();
}
if (!lines.isEmpty()) {
Collections.sort(lines);
Files.write(getChunkFileName(inputPath, index), lines);
}
}
}
private Path getChunkFolder(String inputPath) {
Path parent = Path.of(inputPath).getParent();
return parent.resolve("chunks");
}
private Path getChunkFileName(String inputPath, int index) {
return getChunkFolder(inputPath).resolve(index + ".txt");
}
class MergedIterator implements Iterator<String> {
private final List<Iterator<String>> iterators;
private final String[] currentData;
public MergedIterator(List<Path> paths) {
iterators = paths.stream().map(this::streamLines).map(Stream::iterator).toList();
currentData = iterators.stream().map(Iterator::next).toArray(String[]::new);
}
private Stream<String> streamLines(Path path) {
try {
return Files.lines(path);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
public boolean hasNext() {
return iterators.stream().anyMatch(Iterator::hasNext);
}
@Override
public String next() {
String minLine = null;
int minIndex = -1;
for (int i = 0; i < currentData.length; i++) {
if (currentData[i] != null && (minLine == null || currentData[i].compareTo(minLine) == -1)) {
minLine = currentData[i];
minIndex = i;
}
}
Iterator<String> minIterator = iterators.get(minIndex);
currentData[minIndex] = minIterator.hasNext() ? minIterator.next() : null;
return minLine;
}
}
Головна частина цього рішення — клас MergedIterator, який грає роль composite та об’єднує усі ітератори разом. Ідея наступна. Ми отримуємо ітератор, завантажуючи кожен chunk файл як Stream за допомогою методу Files.lines. А далі ми зберігаємо для кожного ітератора поточне значення і на кожному кроці глобальної ітерації шукаємо найменше з них, записуємо у вихідний файл і зчитуємо наступний рядок з цього ж ітератора. Ітерація закінчується, коли ми дійдемо до кінця кожного chunk-файлу.
Це дуже проста реалізація і можна помітити, що хоча на кожному кроці ми видаляємо лише одне значення зі списку currentData, нам доводиться наступного кроку знову шукати мінімальне значення з цього списку. Інтуїція підказує, що краще зберігати currentData не як список, а як відсортований набір (SortedSet), тоді першим елементом у ньому буде потрібне нам мінімальне значення (рядок). Але проблема в тому, що потрібно буде якось дізнатися, якому ітератору відповідає цей рядок, щоб просунути його вперед.
Тому зручніше зберігати в SortedSet не рядок, а об’єкт IteratorInfo з двома полями: рядок та ітератор, але порівнювати ці об’єкти саме по рядку:
class TreeSetMergedIterator implements Iterator<String> {
private final Set<IteratorInfo> iterators;
public TreeSetMergedIterator(List<Path> paths) {
iterators = paths.stream().map(this::streamLines).map(Stream::iterator).map(IteratorInfo::new)
.collect(Collectors.toCollection(TreeSet::new));
}
private Stream<String> streamLines(Path path) {
try {
return Files.lines(path);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
public boolean hasNext() {
return iterators.stream().map(IteratorInfo::getIterator).anyMatch(Iterator::hasNext);
}
@Override
public String next() {
IteratorInfo info = iterators.iterator().next();
String line = info.getLine();
iterators.remove(info);
if (info.iterator.hasNext()) {
iterators.add(info.next());
}
return line;
}
@Getter
@Setter
static class IteratorInfo implements Comparable<IteratorInfo>{
private final Iterator<String> iterator;
private String line;
public IteratorInfo(Iterator<String> iterator) {
this.iterator = iterator;
line = iterator.next();
}
public IteratorInfo next() {
line = iterator.next();
return this;
}
@Override
public int compareTo(IteratorInfo info) {
return line.compareTo(info.line);
}
}
}
Єдиний підступ у такому підході — ми порушуємо одне з головних правил колекцій, заснованих на унікальності елементів. Ми змінюємо ключ (тобто рядок) після того, як вона вже знаходиться в TreeSet. А ключ повинен бути immutable. Тому нам доводиться спочатку видалити елемент, а потім його знову додати:
iterators.remove(info);
if (info.iterator.hasNext()) {
iterators.add(info.next());
}
Використання БД
Реляційні БД хороші тим, що в них досить легко записати структуровані дані, а потім отримати відсортований варіант за допомогою конструкції SELECT...FROM...ORDER BY.
Для тестування я взяв дві найпопулярніis embedded бази даних — H2 та Sqlite. Вони не вимагають окремого сервера, достатньо лише додати одну Maven бібліотеку до свого про’кту. H2 компактніша (2 Мб jar-файл проти 12 у Sqlite). Обидві технології включають перспективний і в теорії швидший режим in-memory, коли всі дані зберігаються в пам’яті. Це дозволяє забути про такі інфраструктурні завдання, як очищення бази під час тестування.
При цьому не варто впадати в ейфорію. Хоча теоретично H2 підтримує до 2^64 записів у таблиці, проте обмежений обсягом поточної пам’яті на сервері і не підтримує масштабування. Тому цікаво було протестувати бази даних NoSQL (ті ж MongoDB, Redis), які дозволяють рівномірно розподіляти дані по partitions. Можливо, це буде у наступній статті.
Реалізація однакова для обох БД, єдине поле, що змінюється — JDBC URL:
@RequiredArgsConstructor
public class DatabaseSorter implements FileSorter {
private static final String TABLE_SQL= "CREATE TABLE Lines (line VARCHAR(30))";
private static final String INDEX_SQL = "CREATE INDEX idxLines ON Lines(line)";
private static final String INSERTION_SQL = "INSERT INTO Lines VALUES(?)";
private static final String DROP_SQL = "DROP TABLE Lines";
private static final String QUERY_SQL = "SELECT line FROM Lines ORDER BY line";
private final int batchSize;
private final String jdbcUrl;
@Override
public void sort(String inputPath, String outputPath) {
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
insertDataToDB(conn, inputPath);
writeToFile(conn, Path.of(outputPath));
dropTable(conn);
} catch (SQLException | IOException ex) {
throw new RuntimeException(ex);
}
}
private void writeToFile(Connection conn, Path outputPath) throws SQLException, IOException {
List<String> lines = new ArrayList<>(batchSize);
Files.writeString(outputPath, "");
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(QUERY_SQL);
while (rs.next()) {
String line = rs.getString("line");
lines.add(line);
if (lines.size() > batchSize) {
Files.write(outputPath, lines, StandardOpenOption.APPEND);
lines.clear();
}
}
if (!lines.isEmpty()) {
Files.write(outputPath, lines, StandardOpenOption.APPEND);
}
}
}
private void dropTable(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(DROP_SQL);
}
}
private void createTable(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(TABLE_SQL);
}
}
private void insertDataToDB(Connection conn, String inputPath) throws SQLException, IOException {
conn.setAutoCommit(false);
createTable(conn);
try (PreparedStatement insertStmt = conn.prepareStatement(INSERTION_SQL)) {
int count = 0;
try (BufferedReader reader = new BufferedReader(new FileReader(inputPath))) {
String line = reader.readLine();
while (line != null) {
insertStmt.setString(1, line);
insertStmt.addBatch();
count++;
if (count >= batchSize) {
insertStmt.executeBatch();
conn.commit();
count = 0;
}
line = reader.readLine();
}
if (count > 0) {
insertStmt.executeBatch();
conn.commit();
}
}
createIndex(conn);
}
}
private void createIndex(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(INDEX_SQL);
}
}
Декілька слів про це рішення:
- Для максимальної швидкодії був використаний JDBC.
- Таблиця складається з одного стовпця line. Оскільки рядки в принципі можуть дублюватися, його не можна зробити первинним ключем, а тільки звичайним вторинним індексом.
- Для того, щоб індекс не перебудовувався під час вставки даних, спочатку рядки вставляються в таблицю, а потім вже створюється індекс.
- Рядки вставляються в таблицю батчами зазначеного розміру.
Внутрішнє сортування
Для порівняння цікаво взяти дуже просту реалізацію, яка використовує звичний метод Collections.sort:
public class TimSortFileSorter implements FileSorter {
@Override
public void sort(String inputPath, String outputPath) {
try {
List<String> lines = Files.readAllLines(Path.of(inputPath));
Collections.sort(lines);
Files.write(Path.of(outputPath), lines);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
Який тип сортування буде використаний тут? Це хороше питання для співбесіди, тому що правила змінювалися неодноразово у версіях Java 6/7/8. Якщо раніше використовувалося швидке сортування, то зараз для сортування об’єктів (наших рядків) буде використовуватися Tim сортування, комбінація сортувань злиттям і вставками.
Benchmarks
При тестуванні використовувалися наступне залізо та ПЗ:
- JDK 19.0.2;
- JMH 1.36;
- Intel Core i9, 8 ядер, 16 потоків;
- 8 Гб пам’яті для JVM (крім тих випадків, де вказувалося 512Mб).
Було використано два вхідних текстових файли (30 і 1000 Мб), де зберігалися згенеровані випадковими чином
Ось так виглядав основний код тесту:
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
public class FileSorterBenchmark {
private Map<String, FileSorter> sorters;
@Setup
public void setup() {
sorters = new HashMap<>();
sorters.put("h2_memory_100", new DatabaseSorter(100, "jdbc:h2:mem:"));
sorters.put("h2_ssd_100", new DatabaseSorter(100, "jdbc:h2:file:./data.db"));
sorters.put("sqlite_ssd_100", new DatabaseSorter(100, "jdbc:sqlite:./sample.db"));
sorters.put("sqlite_memory_100", new DatabaseSorter(100, "jdbc:sqlite::memory:"));
sorters.put("merge_100", new MergeFileSorter(100, false));
sorters.put("merge_treeset_100", new MergeFileSorter(100, true));
sorters.put("timSort", new TimSortFileSorter());
}
@Param({ "h2_memory_100", "h2_ssd_100", "sqlite_memory_100", "sqlite_ssd_100", "merge_100", "merge_treeset_100", "timSort" })
private String sortMethod;
@Param({ "./input-billion.txt", "./input-30million.txt" })
private String fileName;
@Benchmark
public void replace(Blackhole blackhole) {
FileSorter sorter = sorters.get(sortMethod);
sorter.sort(fileName, "output.txt");
blackhole.consume(sorter);
}
В результаті було отримано цікаві результати:
Метод сортування/Розмір файлу (Мб) | 30 | 1000 |
H2 (memory) | 6.3 | error |
H2 (SSD) | 6.9 | 226 |
H2 (SSD), 512M |
| 239 |
Sqlite | 4.5 | 143 |
Sqlite, 512M |
| 140 |
Зовнішня | 2.3 | 75 |
Зовнішня (TreeSet) | 0.7 | 26 |
Зовнішня (TreeSet), 512M | 0.75 | 28 |
TimSort | 0.7 | 30 |
База даних H2 (в режимі in-memory) виявилася беззахисною перед OutOfMemoryError. Якщо їй виділити 512 Мб для гігабайтного файлу, вона практично відразу падала з цією помилкою. Якщо їй виділялося 8 Гб, вона падала з помилкою «Caused by: java.lang.OutOfMemoryError: Java heap space» під час створення індексу, хоча пам’яті було предостатньо. Проте в SSD варіанті при виділених 512М вона зуміла впоратися із завданням.
Висновки
У цій статті було розглянуто кілька способів сортування файлів, як простіших і зрозуміліших, так і інтелектуальніших, які потребують більше витрат часу на розробку та тестування. Benchmarks показали, що використання зовнішнього сортування виправдане з точки зору ресурсів (час роботи в
Чи можна ще більше оптимізувати цей алгоритм? Зрозуміло, за допомогою розпаралелювання завдань. Зараз і робота з диском (читання/запис), і обчислення (початкове сортування та злиття) виконуються в одному потоці. Можна, наприклад, рознести по різних потоках читання та запис різних файлів, а також сортування chunk-файлів. Тобто в кожному окремому потоці буде сортування та запис окремого chunk-файлу. Я не став наводити деталі в цій статті, щоб не обтяжувати її кодом, можливо викладу в окремій статті.
Ще один напрямок для оптимізації — це масштабування. Зараз наш алгоритм працює для файлів практично будь-якого розміру та навіть невеликих обсягів оперативної пам’яті. Але якщо вхідний файл буде досить великий (розміром з терабайт), то процес сортування буде дуже довгим, тому що ми впираємося в кількість ядер та обчислювальну потужність ядра на нашому сервері. Тому для таких файлів доцільно використовувати кластер із серверів, кожен з яких зберігає лише частину даних (рядків) та сортує їх на своєму сервері. Таку функціональність як partitioning підтримують багато NoSQL баз даних, такі як Redis і MongoDB, можливо, я торкнуся цієї теми в окремій статті.
Серед баз даних, Sqlite показала найкращі результати. Вона не тільки швидше відпрацювала, ніж H2, але і змогла коректно виконати сортування навіть якщо їй виділялося менше пам’яті, ніж обсяг оброблюваних даних. Більше того, у SSD варіанті вона компактніше зберігала дані. Для
5 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів