on
Exploring Kùzu Graph Database Management System code
Introduction
Kùzu is a Graph Database Manaagement System born after extensive research conducted over several years at University of Waterloo. Kùzu is highly optimized to handle complex join-heavy analytical workloads on very large databases. It is similar to what DuckDB is doing for SQL. It is extremely useful when you need to model your data as a graph from different sources and store it in one place for fast extraction in analytics. Kùzu has integration with Pytorch Geometric, making it easy to extract graph data and feed it into your PyG models to perform a GNN task. This article contains my annotations from when I started exploring how Kùzu database works. I took a ‘depth limited search’ approach exploring the code by first going to the CLI and running a simple query. I used LLDB to debug and learn more about the overall design of the database.
Starting from the embedded shell
Starting from the CLI tool, the purpose is to track what is happening internally from the initialization to a match query.
Kùzu uses args library to parse the arguments.
#include "args.hxx"
. For instance, database path (-i parameter) can be
retrieved by:
auto databasePath = args::get(inputDirFlag);
uint64_t bpSizeInMB = args::get(bpSizeInMBFlag);
Initialize default bufferPoolSize as -1u bit mask:
uint64_t bpSizeInBytes = -1u;
.
SystemConfig
shell_runner.cpp: SystemConfig systemConfig(bpSizeInBytes);
SystemConfig will initialize 4 variables:
- systemMemSize: total memory in the system. This is accomplished by mutiplying the number of pages of physical memory by the size of a page in bytes. Both values are retrieved using sysconf from unistd.h library.
database.cpp:
24 auto systemMemSize =
-> 25 (std::uint64_t)sysconf(_SC_PHYS_PAGES) * (std::uint64_t)sysconf(_SC_PAGESIZE);
(lldb) p systemMemSize
(unsigned long long) $9 = 34359738368
_SC_PHYS_PAGES : the number of pages of physical memory
_SC_PAGESIZE : size of a page in bytes
- bufferPoolSize: defined by the system memory or UINTPTR_MAX x default pages buffer ratio. UINTPTR_MAX is the larges value uintptr_t can hold. StorageConfig is located at include/common/configs.h and contains the struct with many default values used by the application.
-> 26 bufferPoolSize = (uint64_t)(StorageConfig::DEFAULT_BUFFER_POOL_RATIO *
27 (double_t)std::min(systemMemSize, (std::uint64_t)UINTPTR_MAX));
- defaultPageBufferPoolSize and largePageBufferPoolSize: the bufferPoolSize multiplied by the ratio defined for default pages and large pages.
29 defaultPageBufferPoolSize =
-> 30 (uint64_t)((double_t)bufferPoolSize * StorageConfig::DEFAULT_PAGES_BUFFER_RATIO);
31 largePageBufferPoolSize =
32 (uint64_t)((double_t)bufferPoolSize * StorageConfig::LARGE_PAGES_BUFFER_RATIO);
include/common/configs.h:
struct StorageConfig {
// The default ratio of system memory allocated to buffer pools (including default and large).
static constexpr double DEFAULT_BUFFER_POOL_RATIO = 0.8;
// The default ratio of buffer allocated to default and large pages.
static constexpr double DEFAULT_PAGES_BUFFER_RATIO = 0.75;
static constexpr double LARGE_PAGES_BUFFER_RATIO = 1.0 - DEFAULT_PAGES_BUFFER_RATIO;
... (omitted)
};
(lldb) p largePageBufferPoolSize/(1024*1024*1024)
(unsigned long long) $28 = 6
(lldb) p defaultPageBufferPoolSize/(1024*1024*1024)
(unsigned long long) $29 = 19
- maxNumThreads: the number of concurrent threads supported by the available hardware. This number is only a hint and might not be accurate.
(lldb) p maxNumThreads
(uint64_t) $30 = 12
Embedded Shell
Initialize an instance of EmbddedShell (tools/shell/embedded_shell.cpp):
tools/shell/shell_runner.cpp:
-> 33 auto shell = EmbeddedShell(databasePath, systemConfig);
tools/shell/embedded_shell.cpp:
201 EmbeddedShell::EmbeddedShell(const std::string& databasePath, const SystemConfig& systemConfig) {
-> 202 linenoiseHistoryLoad(HISTORY_PATH);
203 linenoiseSetCompletionCallback(completion);
204 linenoiseSetHighlightCallback(highlight);
205 database = std::make_unique<Database>(databasePath, systemConfig);
206 conn = std::make_unique<Connection>(database.get());
207 updateTableNames();
208 }
Initialize the embedded shell using the databasePath from the parameter and also the systemConfig previously defined:
(lldb) p systemConfig
(const kuzu::main::SystemConfig) $31 = {
defaultPageBufferPoolSize = 20615843020
largePageBufferPoolSize = 6871947673
maxNumThreads = 12
}
linenoise is a lightweight library for editing line, providing useful functionalities such as single and multi line editing mode, history handling, completion, hints as you type, among others. It is used in Redis, MongoDB and Android. The library is embedded in the codebase (tools/shell/linenoise.cpp). I won’t get into the details of linenoise configuration.
-> 205 database = std::make_unique<Database>(databasePath, systemConfig);
-> 206 conn = std::make_unique<Connection>(database.get());
database and conn are both defined in embedded_shell.h
:
private:
std::unique_ptr<Database> database;
std::unique_ptr<Connection> conn;
};
Line 205 and 206 define the database and get the current connection,
respectively. Before getting into connection in the next section, I’ll take a
look at the updateTableNames()
, since now we are dealing with catalogue to read
the database schema.
updateTableNames()
There are two type of tables: node and relations. updateTableNames will store the table names for both by fetching from database->catalog. In my database, I have “person” and “animal” node tables and “hasOwner” and “knows” relations tables:
tools/shell/embedded_shell.cpp:
67 void EmbeddedShell::updateTableNames() {
68 nodeTableNames.clear();
69 relTableNames.clear();
-> 70 for (auto& tableSchema : database->catalog->getReadOnlyVersion()->getNodeTableSchemas()) {
71 nodeTableNames.push_back(tableSchema.second->tableName);
72 }
73 for (auto& tableSchema : database->catalog->getReadOnlyVersion()->getRelTableSchemas()) {
74 relTableNames.push_back(tableSchema.second->tableName);
75 }
76 }
lldb output:
(lldb) p nodeTableNames
(std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) $41 = size=2 {
[0] = "person"
[1] = "animal"
}
(lldb) p relTableNames
(std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) $42 = size=2 {
[0] = "hasOwner"
[1] = "knows"
}
Connection (src/main/connection.cpp)
Connection is used to interact with a Database instance, and each Connection is thread-safe.
Multiple connections can connect to the same Database instance in a multi-threaded environment.
The description of the API below was extracted from src/include/main/connection.h
:
Creates a connection to the database.
KUZU_API explicit Connection(Database* database);
Destructor
KUZU_API ~Connection();
Manually starts a new read-only transaction in the current connection.
KUZU_API void beginReadOnlyTransaction();
Manually starts a new write transaction in the current connection.
KUZU_API void beginWriteTransaction();
Manually commits the current transaction.
KUZU_API void commit();
Manually rollbacks the current transaction.
KUZU_API void rollback();
Sets the maximum number of threads to use for execution in the current connection.
KUZU_API void setMaxNumThreadForExec(uint64_t numThreads);
Returns the maximum number of threads to use for execution in the current connection.
KUZU_API uint64_t getMaxNumThreadForExec();
Executes the given query and returns the result.
KUZU_API std::unique_ptr<QueryResult> query(const std::string& query);
Prepares the given query and returns the prepared statement.
KUZU_API std::unique_ptr<PreparedStatement> prepare(const std::string& query);
Executes the given prepared statement with args and returns the result.
KUZU_API template<typename... Args>
inline std::unique_ptr<QueryResult> execute(
PreparedStatement* preparedStatement, std::pair<std::string, Args>... args) {
std::unordered_map<std::string, std::shared_ptr<common::Value>> inputParameters;
return executeWithParams(preparedStatement, inputParameters, args...);
}
Executes the given prepared statement with inputParams and returns the result.
KUZU_API std::unique_ptr<QueryResult> executeWithParams(PreparedStatement* preparedStatement,
std::unordered_map<std::string, std::shared_ptr<common::Value>>& inputParams);
Return all node table names in string format.
KUZU_API std::string getNodeTableNames();
Return all rel table names in string format.
KUZU_API std::string getRelTableNames();
Return the node property names.
KUZU_API std::string getNodePropertyNames(const std::string& tableName);
Return the relation property names.
KUZU_API std::string getRelPropertyNames(const std::string& relTableName);
If you wondering what is behind KUZU_API, the datatype is defined in src/include/common/types/types.h:
KUZU_API enum DataTypeID : uint8_t {
ANY = 0,
NODE = 10,
REL = 11,
// physical types
// fixed size types
BOOL = 22,
INT64 = 23,
DOUBLE = 24,
DATE = 25,
TIMESTAMP = 26,
INTERVAL = 27,
INTERNAL_ID = 40,
// variable size types
STRING = 50,
LIST = 52,
};
Starting from C++ API
I will now explore COPY
command from the C++ API by using the existing
example from examples/cpp/main.cpp
. To compile, you just have to add
add_subdirectory(examples/cpp)
inside CMakeLists.txt
and run make test
or make debug
. The example will be compiled and available at
build/debug/examples/cpp
or build/release/examples/cpp
depending on the
make parameter used to compile.
main.cpp
#include <iostream>
#include "main/kuzu.h"
using namespace kuzu::main;
int main() {
auto database = std::make_unique<Database>("/tmp/db");
auto connection = std::make_unique<Connection>(database.get());
connection->query("CREATE NODE TABLE tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));");
connection->query("COPY tableOfTypes FROM \"/Users/rfdavid/Devel/waterloo/kuzu/dataset/copy-test/node/csv/types_50k.csv\" (HEADER=true);");
}
This example created a node table named tableOfTypes
(from copy-test schema)
and use the command COPY to import 50k rows from types_50k.csv
file.
I will start debugging by adding a breakpoint before the COPY command:
(lldb) b main.cpp:11
Breakpoint 1: where = example-cpp`main + 136 at main.cpp:11:5, address = 0x0000000100003c80
(lldb) r
Process 59055 launched: '/Users/rfdavid/Devel/waterloo/kuzu/build/debug/examples/cpp/example-cpp' (arm64)
Process 59055 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 1.1
frame #0: 0x0000000100003c80 example-cpp`main at main.cpp:11:5
1 #include <iostream>
2
3 #include "main/kuzu.h"
4 using namespace kuzu::main;
5
6 int main() {
7 auto database = std::make_unique<Database>("/tmp/db");
8 auto connection = std::make_unique<Connection>(database.get());
9
10 connection->query("CREATE NODE TABLE tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));");
-> 11 connection->query("COPY tableOfTypes FROM \"/Users/rfdavid/Devel/waterloo/kuzu/dataset/copy-test/node/csv/types_50k.csv\" (HEADER=true);");
12 }
Target 0: (example-cpp) stopped.
Inside Connection::query
, a mutex lock is set, a preparedStatement
will be
created and executed through executeAndAutoCommitIfNecessaryNoLock
.
76 std::unique_ptr<QueryResult> Connection::query(const std::string& query) {
77 lock_t lck{mtx};
-> 78 auto preparedStatement = prepareNoLock(query);
79 return executeAndAutoCommitIfNecessaryNoLock(preparedStatement.get());
80 }
A prepared statement is a parameterized query used to avoid repeated execution
of the same query. prepareNoLock
will go through the following steps:
parsing, binding, planning and optmizing and then return a PreparedStatement
object to Connection::query
.