Building my own analytical DBMS

After a long break from doing any side projects, I decided to do some programming in my free time. Those who know me from my Meta days or from TopCoder competitions know that I like to write code which runs fast and solves hard problems. I’ve also always been curious about analytical databases, but never directly worked on one.

I found a perfect project for myself which lets me do all of those things: write an analytical database which attempts to be faster than all the other databases listed in https://benchmark.clickhouse.com/. As an added challenge (like I needed one) I decided to learn Rust while doing this.

I have made some pretty good progress in the last few months, but I’m the first one to admit there’s tons of work before this even resembles an analytical database. For now it’s mostly an experiment in writing a fast query engine targeting a specific benchmark.

While the code doesn’t support SQL yet, it’s generic enough to handle different types of queries with different data types. Adding some kind of limited SQL support should not affect performance much at all at this point.

I’m not quite ready to share the code yet since it’s a work in progress, but here’s a small glimpse of how a larger vector is split into smaller vectors and merged: https://pastebin.com/QxqWprs3 (Some of the code uses “1d” to indicate that the vectors grow in a single dimension. At one point I had vectors growing into two dimensions, “2d”, but that solution was extremely complicated with little or no performance benefit, so I scrapped it.)

The main thing for me is that the solution is fast. It beats all the other databases in about 80% of the benchmark queries despite running on a 16GB 10-core Macbook M1 Pro vs. some extremely beefy server in AWS. While a Macbook has a lot fewer cores at its disposal, the memory access is really fast, making it perform similarly to a 16-core (32 vCPU) x86 machine in AWS. Below I give an overview of the technical details which help explain the fast performance. There are some benchmark numbers at the bottom comparing this solution to the fastest solutions listed in the benchmark.

Architecture

The solution is serverless: all the data is stored on disk and clients query the data directly from disk. A client can stay alive between queries, but it does not intentionally store anything in memory between queries. The disk cache and other OS-level caches can make subsequent queries faster, but as far as the client is concerned, it always reads data from disk. This approach keeps the solution simple and, given the speed of NVMe disks, the performance is quite good even when the data is read from disk.

The data is stored in a columnar format where every 2^19 (524,288) rows of each column are stored together. 2^19 was chosen since it gave the best overall performance across multiple queries.

Sorted Vectors

This is the main data structure used everywhere. Unlike something like Clickhouse, which uses different types of hashmaps to do GROUP BY queries, sorted vectors are used for all GROUP BY and COUNT DISTINCT queries. There are many nice features of sorted vectors: they do not require any extra memory, they can be split into cache-friendly sizes, they can be split across multiple threads, and they can be merged in linear time.

Obviously the data needs to be sorted to take advantage of sorted vectors. Since this is an analytical DBMS, the data can be presorted. The data is stored in columnar format with 2^19 rows per block, sorted and with all duplicates removed. This means that a single-column GROUP BY or COUNT DISTINCT query does not need to do much preprocessing before the merging can begin.

When multiple columns are present in a GROUP BY query, there’s a bit of massaging and sorting involved to get a single sorted vector. To avoid sorting very large vectors, the first column is chosen to be the one with the largest number of unique values. Since the first column can be used in its original sorted format, sorting is limited to the rows within each unique value of the first column.
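A minimal sketch of that per-group sorting, assuming the rows are held as (first column, second column) pairs already ordered by the first column (illustrative, not the actual code):

    /// Rows are (first column, second column) pairs, already ordered by the
    /// first column. Only the span of rows sharing a first-column value is
    /// sorted, which keeps each individual sort small.
    fn sort_within_groups(rows: &mut [(u64, u64)]) {
        let mut start = 0;
        while start < rows.len() {
            let key = rows[start].0;
            let mut end = start + 1;
            while end < rows.len() && rows[end].0 == key {
                end += 1;
            }
            rows[start..end].sort_unstable_by_key(|r| r.1);
            start = end;
        }
    }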

Vector Size

Sorted vectors can grow quite large and become slow to allocate and cache-unfriendly. For these reasons, each sorted vector used during a GROUP BY is split into smaller vectors. Empirically, 8192 has proven to be a good size for these vectors. Since the vectors are split into smaller vectors, during a merge of two larger vectors the original 8192-sized vectors can be reused as result vectors. The vectors are constantly recycled to minimize new allocations.
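A minimal sketch of the recycling idea (the names are illustrative and not from the actual code): exhausted 8192-element chunks go back into a pool and come out again as result buffers for later merges.

    const CHUNK: usize = 8192;

    /// Pool of fixed-capacity chunks; vectors keep their capacity when
    /// recycled, so steady-state merging allocates almost nothing new.
    struct ChunkPool {
        free: Vec<Vec<u64>>,
    }

    impl ChunkPool {
        fn get(&mut self) -> Vec<u64> {
            self.free
                .pop()
                .unwrap_or_else(|| Vec::with_capacity(CHUNK))
        }

        fn put(&mut self, mut chunk: Vec<u64>) {
            chunk.clear(); // keep the 8192-element capacity, drop the contents
            self.free.push(chunk);
        }
    }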

Memory Allocator

The memory allocator plays a huge role in the performance since a lot of memory is allocated during the queries. Currently Jemalloc is being used with the default settings. By default Jemalloc does not immediately give the memory back to the OS after it’s deallocated, which means that purging of unused memory happens after the query is done. This makes Jemalloc a faster allocator than the default Rust allocator.
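In Rust this is typically wired up through a global allocator. A minimal sketch assuming the tikv-jemallocator crate (the post only says jemalloc with default settings, so the exact crate is an assumption):

    // Cargo.toml (assumed): tikv-jemallocator = "0.5"
    use tikv_jemallocator::Jemalloc;

    // Route every allocation in the process through jemalloc.
    #[global_allocator]
    static GLOBAL: Jemalloc = Jemalloc;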

Splitting vectors amongst multiple threads

To efficiently use multiple cores, merging must happen in multiple threads. At the beginning of each merge this is achieved simply by having multiple vectors (191 to be exact in the Clickbench case) of up to 2^19 rows that need to be merged. The goal is to combine these 191 sorted vectors (each split into vectors of 8192 elements) into a single sorted vector (also split into vectors of 8192 elements). Once the number of vectors starts approaching 1, they are split into smaller vectors so that they can be operated on by multiple threads in parallel. Splitting each vector into 16 vectors of 8192 elements and letting each thread merge two of these 16×8192 vectors proved to be a reasonable approach.

This strategy keeps all the threads almost 100% busy until the merge is done when there are 16 or fewer cores. Once the number of cores gets much larger, there’s quite a lot of waiting happening at the end of the merge because threads cannot start a new merge before the previous one is done. To mitigate this, the merge itself can be split into 8 or 16 separate merges before the merging even begins, which keeps all the threads busy until the end. I’ve only experimented with this approach and got slightly better results in a 64+ core environment.
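The exact splitting scheme isn't shown here, but the general idea can be sketched with a pivot value and partition_point: split both sorted inputs at the same value, and the lower and upper halves can then be merged by separate threads and simply concatenated.

    /// Split two sorted slices at a shared pivot so that the lower halves and
    /// the upper halves can be merged independently by different threads.
    /// Illustrative sketch; assumes `left` is non-empty.
    fn split_at_pivot<'a>(
        left: &'a [u64],
        right: &'a [u64],
    ) -> ((&'a [u64], &'a [u64]), (&'a [u64], &'a [u64])) {
        let pivot = left[left.len() / 2];
        // First index in each slice whose value is >= pivot.
        let l = left.partition_point(|&v| v < pivot);
        let r = right.partition_point(|&v| v < pivot);
        ((&left[..l], &right[..r]), (&left[l..], &right[r..]))
    }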

Memory-mapped file

Using memory-mapped files instead of reading full files or partially pre-reading files speeds things up in almost all cases. Especially when only parts of the files are needed (see strings below), the improvement is quite significant.
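A minimal sketch of the mapping step, assuming the memmap2 crate (the post doesn't name the crate):

    use std::fs::File;

    use memmap2::Mmap;

    fn map_column(path: &str) -> std::io::Result<Mmap> {
        let file = File::open(path)?;
        // Safety: the mapped file must not be truncated or modified while in use.
        let mmap = unsafe { Mmap::map(&file)? };
        // Only the byte ranges a query actually touches get paged in from disk.
        Ok(mmap)
    }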

Rust

The solution is written in Rust and while some of the crates it depends on use unsafe Rust (mostly for converting byte slices into integer slices), the solution itself only uses unsafe Rust for memory-mapped files since that cannot be done with safe Rust. Everything else is implemented using safe Rust only. This means that the performance could possibly be improved with some unsafe code for the vector operations or by handwriting the merge code in assembly. However, the goal was to make the solution fast enough using safe Rust so that those optimizations would not be necessary. The added safety of using Rust in a multithreaded environment where data is constantly being moved around between vectors has been useful: there has been no need to debug any weird memory corruption issues which might have happened had the solution been written in C++.

SIMD

The solution does not use any SIMD instructions at the moment. The main reason is that my Macbook Pro M1 only supports NEON instructions, which are limited to 128-bit registers, so I haven’t been able to develop in an environment which supports AVX-512 registers. Another reason is that the solution is mostly memory bound: most of the computations are quite simple (compare, add, min, max, etc.) and loading the data into memory takes the majority of the time. A basic merge can be optimized into very efficient branchless code on both ARM64 (CSEL, CSET) and x86 (CMOV*), making it quite fast. Here’s the current basic merge (no aggregates) method from my solution which generates pretty fast code, although handwriting it in assembly might provide an even faster solution.

    
    pub fn merge(
        left: &mut [u64],
        right: &mut [u64],
        result: &mut Vec<u64>,
    ) -> (usize, usize) {
        let mut left_pos = 0;
        let mut right_pos = 0;
        assert!(!left.is_empty());
        assert!(!right.is_empty());
        let capacity = result.capacity();
        while left_pos < left.len()
            && right_pos < right.len()
            && result.len() < capacity
        {
            let v1 = left[left_pos];
            let v2 = right[right_pos];
            let comp = v1.cmp(&v2);
            let le = comp.is_le();
            let ge = comp.is_ge();
            right_pos += if ge { 1 } else { 0 };
            left_pos += if le { 1 } else { 0 };
            let value = if le { v1 } else { v2 };
            result.push(value);
        }
        (left_pos, right_pos)
    }
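For context, here's a hypothetical caller (not from the actual code) showing how the returned positions are meant to be used: once the fixed-capacity result chunk fills up or one input runs out, the unconsumed tails feed the next merge.

    // Hypothetical usage: merge two sorted chunks into a recycled result
    // vector with a fixed capacity of 8192 elements.
    let mut left_chunk: Vec<u64> = vec![1, 3, 5, 7];
    let mut right_chunk: Vec<u64> = vec![2, 3, 6, 8];
    let mut result: Vec<u64> = Vec::with_capacity(8192);
    let (l_used, r_used) = merge(&mut left_chunk, &mut right_chunk, &mut result);
    // `l_used` / `r_used` elements were consumed from each side; the remaining
    // tails (left_chunk[l_used..], right_chunk[r_used..]) go into the next merge.
    // Equal values advance both sides but are pushed only once, which is
    // exactly the deduplication COUNT DISTINCT needs.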

Data format

The total data size for the Clickbench dataset is 13.8GiB, which is very close to Clickhouse and smaller than many of the other databases. This means that there isn't much precomputed data which would give a large advantage in queries. Then again, a lot of data means a lot of reading, so a smaller size has its own advantages.

Obviously a lot more than just the values of the sorted vectors needs to be stored. The number of times each unique value is present and the rows (0-524287) where the value occurs are also stored. The rows are stored in a custom delta compression format: the more rows there are, the smaller the deltas between them, and these can be compressed quite efficiently. If the same value repeats on multiple consecutive rows, the number of rows it repeats on can be stored directly, making the compression even more effective.
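The actual on-disk format is custom and not described in detail here, but a rough sketch of the delta plus run-length idea for sorted, distinct row numbers looks like this:

    /// Rows are sorted, distinct indices in 0..2^19. Each entry stores the
    /// gap to the previously stored row plus the length of the run of
    /// consecutive rows, so long runs collapse into a single pair.
    fn delta_encode(rows: &[u32]) -> Vec<(u32, u32)> {
        let mut out = Vec::new();
        let mut prev = 0u32;
        let mut i = 0usize;
        while i < rows.len() {
            // Extend the run while the next row is exactly one greater.
            let mut run = 1u32;
            while i + run as usize < rows.len()
                && rows[i + run as usize] == rows[i] + run
            {
                run += 1;
            }
            out.push((rows[i] - prev, run));
            prev = rows[i] + run - 1;
            i += run as usize;
        }
        out
    }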

Not much else is needed for integers. For strings there's plenty of additional information stored.

Strings

Strings are very different from integers since they don't have a fixed length and need to be compressed to avoid bloating the dataset. The goal is still to be able to GROUP BY strings as fast as possible. To achieve this, a 128-bit cryptographic hash (BLAKE3) is precomputed for each string. The sorted values used in GROUP BY and COUNT DISTINCT queries are these 128-bit hashes. While this sounds like cheating (and it is), the goal was to make the solution as fast as possible, and using hashes instead of strings is one of the big reasons why it is so fast. Collisions can obviously happen when hashes are being used, but the likelihood of a collision is so tiny even with large datasets, and the consequences so meaningless, that it doesn't matter. For example, on a dataset of 26B rows the likelihood of a collision (assuming a perfect hash, quite a big assumption...) is 10^-18 (https://en.wikipedia.org/wiki/Birthday_attack). If the rare collision occurred in that dataset where all the values are unique, the result of a COUNT DISTINCT query would be 26B - 1 instead of 26B. It is billions of times (at least) more likely that the solution itself contains bugs which produce incorrect results than hitting a collision with 128-bit hashes.
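A sketch of the hashing step using the blake3 crate; BLAKE3 outputs 256 bits by default, so truncating to the first 16 bytes is an assumption here, not necessarily how the actual code derives its 128-bit key.

    fn string_key(s: &str) -> u128 {
        // BLAKE3 yields 32 bytes; keep the first 16 as the 128-bit sort key.
        let digest = blake3::hash(s.as_bytes());
        let bytes: [u8; 16] = digest.as_bytes()[..16].try_into().unwrap();
        u128::from_le_bytes(bytes)
    }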

The strings themselves are stored in alphabetical order in blocks of 128 strings. Each block uses common prefix compression (the number of bytes shared with the previous string plus the unique bytes of the string itself) followed by LZ4 compression. The goal of using blocks is to avoid decompressing all strings if only some of the blocks are needed. Using blocks is necessary because LZ4 compression is not effective on very small strings.
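A rough sketch of the per-block encoding under two assumptions that are mine, not the post's: lz4_flex as the LZ4 implementation, and prefix/suffix lengths small enough to fit in one byte each.

    fn encode_block(strings: &[String]) -> Vec<u8> {
        let mut raw = Vec::new();
        let mut prev = "";
        for s in strings {
            // Number of leading bytes shared with the previous string.
            let shared = prev
                .bytes()
                .zip(s.bytes())
                .take_while(|(a, b)| a == b)
                .count();
            raw.push(shared as u8);
            raw.push((s.len() - shared) as u8);
            raw.extend_from_slice(&s.as_bytes()[shared..]);
            prev = s.as_str();
        }
        // Compress the whole 128-string block in one go.
        lz4_flex::compress_prepend_size(&raw)
    }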

To speed up substring matches, every three-byte ASCII sequence (trigram) of each string is indexed. The trigrams are limited to ASCII characters since supporting non-ASCII would require storing a massive amount of data. Each trigram points to the blocks where it is present. For example, given a query LIKE '%google%', only blocks which have the trigrams 'goo', 'oog' and 'gle' are loaded, decompressed and searched for 'google'.
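An illustrative sketch of the trigram lookup (the index layout here is hypothetical): extract the pattern's ASCII trigrams and intersect their block lists to get the candidate blocks worth decompressing.

    use std::collections::{HashMap, HashSet};

    fn candidate_blocks(
        pattern: &[u8],
        index: &HashMap<[u8; 3], HashSet<u32>>,
    ) -> HashSet<u32> {
        let mut result: Option<HashSet<u32>> = None;
        for w in pattern.windows(3) {
            // Only ASCII trigrams are indexed.
            if !w.iter().all(u8::is_ascii) {
                continue;
            }
            let tri = [w[0], w[1], w[2]];
            let blocks = index.get(&tri).cloned().unwrap_or_default();
            result = Some(match result {
                None => blocks,
                Some(acc) => acc.intersection(&blocks).copied().collect(),
            });
        }
        // A pattern with no usable trigrams would need a full scan (not handled).
        result.unwrap_or_default()
    }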

Since the sorted 128-bit hashes and the strings themselves are stored in different orders, a mapping between the two orders is stored as well. Despite all the extra data stored for strings, the size of the dataset is not any larger than most of the other datasets in the benchmark.

Benchmark numbers

Clickbench numbers which highlight how fast the solution runs compared to other databases despite only having access to 10 cores. This is only a small set of queries selected to highlight where the performance gains come from.

Query 4: SELECT COUNT(DISTINCT UserID) FROM hits
46ms vs 88ms
(StarRocks c6a.metal)
The performance comes from the fact that the user ids are already in sorted order and there's no preprocessing involved other than dividing the user ids into vectors of 8192 elements before the merging begins.

Query 5: SELECT COUNT(DISTINCT SearchPhrase) FROM hits
28ms vs 128ms (Clickhouse tuned c6a.metal)
Same reasoning as in query 4, except that using 128-bit hashes and never even loading and decompressing the strings makes this even faster compared to the benchmark.

Query 8: SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10
73ms vs 120ms (StarRocks c6a.metal)
Region ids are in sorted order and matching user ids can be split amongst the region ids in sorted order as well. The count distinct is postponed until the initial group by is done and the limit can be used to limit the number of count distinct operations.

Query 9: SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10
108ms vs 270ms
(Clickhouse tuned c6a.metal)
Similar approach as in query 8 with the added aggregations, but since the limit is not based on the count distinct column, only 10 count distinct operations need to be done after the initial group by.

Query 12: SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10
37ms vs 140ms (Clickhouse tuned c6a.metal)
Grouping 128-bit hashes and simply skipping the hash produced by the empty string.

Query 16: SELECT UserID, SearchPhrase, COUNT() FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT() DESC LIMIT 10
210ms vs 300ms (StarRocks c6a.metal)
Two group by columns are still fast despite some preprocessing needed to incorporate the second column in sorted order. 128-bit hashes for search phrase help the performance.

Query 17: SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10
29ms vs 100ms (Clickhouse tuned c6a.metal)
Almost identical to query 16 except there's no ORDER BY column, which means that it's enough to group any 10 user id and search phrase pairs as long as they are the same for all the 191 blocks. The fact that user ids are already in sorted order makes it fast to find a common set of 10 across all the 191 blocks.

Query 20: SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%'
54ms vs 80ms
(Databend c6a.metal)
Using trigrams reduces the amount of data which needs to be decompressed and searched for the match.

Query 26: SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10
11ms vs 20ms
(StarRocks c6a.metal)
The rows can be loaded in the order of already sorted event times and the loading can be stopped once there are 10 rows which do not have an empty search phrase. This makes the query very fast since only a very small amount of data needs to be loaded.

Clickbench numbers for the queries where 10 cores are not quite enough to beat the fastest database (but running the solution on c6a.metal would still beat the fastest).

Query 18: SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT() FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT() DESC LIMIT 10
460ms vs 440ms
(StarRocks c6a.metal)
A three-column group by takes a bit more time in preprocessing.

Query 30: SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
132ms vs 90ms (Clickhouse tuned c6a.metal)
Two group by columns and two aggregate columns require plenty of preprocessing before the merging begins. Having another column used as a filter makes things faster vs. not having that filter, but given the data format a lot of the data must be loaded even if it's filtered out later.

Query 31: SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10
211ms vs 160ms (Clickhouse tuned c6a.metal)
Similar to query 30 except there's a lot more data since WatchIDs are practically all unique.

Query 32: SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10
730ms vs 500ms
(Snowflake (64×3XL))
This is the worst case scenario for a group by query since all but four watch ids are unique. The number of unique results before applying the limit is nearly 100 million. Despite this, a sorted vector solution performs well. The query consumes about 4 GiB of memory. As a comparison, running this query on a local Clickhouse instance on my Macbook Pro takes about 6 seconds and uses more than 11 GiB of memory (I need to close Google Chrome to be able to run this query using Clickhouse, otherwise the dreaded kernel_task process kicks in to move memory around and makes the query even slower).


Comments

  1. Aapo Kyrölä

    Nice project and write-up! Reminds me of my analytical computation targeted graph database project (part of my phd) https://github.com/GraphChi/graphchiDB-scala

    Tricks like delta coding (I think I used gamma coding) for indices were hugely important for performance. Also 10 years ago had less ram to use…

    1. Tuomas Pelkonen

      Thanks Aapo! I need to check that project out. I’ve been working on even more perf improvements recently (instead of adding any useful features…). I’ll do another post soon about the current status of my project.
