Exploring Kùzu Graph Database Management System code

Introduction

Kùzu is a Graph Database Management System born from several years of research at the University of Waterloo. It is highly optimized for complex, join-heavy analytical workloads on very large databases, playing a role for graph data similar to the one DuckDB plays for SQL. It is especially useful when you need to model data from different sources as a graph and store it in one place for fast extraction in analytics. Kùzu also integrates with PyTorch Geometric, making it easy to extract graph data and feed it into your PyG models to perform a GNN task. This article contains my annotations from when I started exploring how the Kùzu database works. I took a ‘depth-limited search’ approach to the code: I started at the CLI, ran a simple query, and used LLDB to step through it and learn more about the overall design of the database.

Starting from the embedded shell

Starting from the CLI tool, the goal is to trace what happens internally, from initialization through to a MATCH query.

Kùzu uses the args library (#include "args.hxx") to parse command-line arguments. For instance, the database path (the -i parameter) and the buffer pool size can be retrieved with:

auto databasePath = args::get(inputDirFlag);
uint64_t bpSizeInMB = args::get(bpSizeInMBFlag);

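The default buffer pool size is initialized to -1u, which appears to act as a "not set" sentinel rather than a bit mask: uint64_t bpSizeInBytes = -1u;. Because -1u is an unsigned int literal, the stored value is the maximum unsigned int (4294967295 with a 32-bit unsigned int), not the maximum uint64_t.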
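To make the value of that initializer concrete, here is a minimal sketch. The constant name kUnsetBufferPoolSize and the helper resolveBufferPoolSize are hypothetical and only illustrate how such a sentinel can be told apart from a user-supplied size; Kùzu's shell may handle this differently. The static_assert holds on any platform, while the literal 4294967295 assumes a 32-bit unsigned int.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <optional>

// -1u is an unsigned int literal; widening it to uint64_t keeps that value.
constexpr std::uint64_t kUnsetBufferPoolSize = -1u;

static_assert(kUnsetBufferPoolSize == std::numeric_limits<unsigned int>::max(),
    "the sentinel equals UINT_MAX, not UINT64_MAX");

// Hypothetical helper: convert an optional size in MB to bytes, falling back
// to the sentinel when no size was given on the command line.
inline std::uint64_t resolveBufferPoolSize(std::optional<std::uint64_t> sizeInMB) {
    if (!sizeInMB.has_value()) {
        return kUnsetBufferPoolSize;
    }
    constexpr std::uint64_t bytesPerMB = 1024ull * 1024ull;
    // Guard against overflow before multiplying.
    assert(*sizeInMB <= std::numeric_limits<std::uint64_t>::max() / bytesPerMB);
    return *sizeInMB * bytesPerMB;
}
```

With this sketch, a missing -b style argument yields the sentinel, and any explicit size is converted to bytes.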

SystemConfig

shell_runner.cpp: SystemConfig systemConfig(bpSizeInBytes);

SystemConfig will initialize 4 variables:

database.cpp:
   24           auto systemMemSize =
-> 25               (std::uint64_t)sysconf(_SC_PHYS_PAGES) * (std::uint64_t)sysconf(_SC_PAGESIZE);

(lldb) p systemMemSize
(unsigned long long) $9 = 34359738368
_SC_PHYS_PAGES : the number of pages of physical memory
_SC_PAGESIZE : size of a page in bytes
-> 26           bufferPoolSize = (uint64_t)(StorageConfig::DEFAULT_BUFFER_POOL_RATIO *
   27                                       (double_t)std::min(systemMemSize, (std::uint64_t)UINTPTR_MAX));
   29       defaultPageBufferPoolSize =
-> 30           (uint64_t)((double_t)bufferPoolSize * StorageConfig::DEFAULT_PAGES_BUFFER_RATIO);
   31       largePageBufferPoolSize =
   32           (uint64_t)((double_t)bufferPoolSize * StorageConfig::LARGE_PAGES_BUFFER_RATIO);
include/common/configs.h:
struct StorageConfig {
    // The default ratio of system memory allocated to buffer pools (including default and large).
    static constexpr double DEFAULT_BUFFER_POOL_RATIO = 0.8;
    // The default ratio of buffer allocated to default and large pages.
    static constexpr double DEFAULT_PAGES_BUFFER_RATIO = 0.75;
    static constexpr double LARGE_PAGES_BUFFER_RATIO = 1.0 - DEFAULT_PAGES_BUFFER_RATIO;
    ... (omitted)
};

(lldb) p largePageBufferPoolSize/(1024*1024*1024)
(unsigned long long) $28 = 6
(lldb) p defaultPageBufferPoolSize/(1024*1024*1024)
(unsigned long long) $29 = 19
(lldb) p maxNumThreads
(uint64_t) $30 = 12
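The arithmetic above can be replayed outside the debugger. The sketch below copies the ratios from the StorageConfig excerpt and applies them to the 34359738368 bytes reported by LLDB; the struct PoolSizes and the function computePoolSizes are hypothetical names introduced for illustration, not part of Kùzu.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Ratios copied from the StorageConfig excerpt.
constexpr double DEFAULT_BUFFER_POOL_RATIO = 0.8;
constexpr double DEFAULT_PAGES_BUFFER_RATIO = 0.75;
constexpr double LARGE_PAGES_BUFFER_RATIO = 1.0 - DEFAULT_PAGES_BUFFER_RATIO;

// Hypothetical container for the three derived sizes, all in bytes.
struct PoolSizes {
    std::uint64_t total;
    std::uint64_t defaultPages;
    std::uint64_t largePages;
};

// Hypothetical helper mirroring the calculation shown in database.cpp.
inline PoolSizes computePoolSizes(std::uint64_t systemMemSize) {
    assert(systemMemSize > 0);
    // Clamp to the addressable range, as the original code does.
    const auto usable = std::min(systemMemSize, static_cast<std::uint64_t>(UINTPTR_MAX));
    PoolSizes sizes{};
    sizes.total =
        static_cast<std::uint64_t>(DEFAULT_BUFFER_POOL_RATIO * static_cast<double>(usable));
    sizes.defaultPages =
        static_cast<std::uint64_t>(static_cast<double>(sizes.total) * DEFAULT_PAGES_BUFFER_RATIO);
    sizes.largePages =
        static_cast<std::uint64_t>(static_cast<double>(sizes.total) * LARGE_PAGES_BUFFER_RATIO);
    return sizes;
}
```

Calling computePoolSizes(34359738368) on a 64-bit machine gives 20615843020 bytes for default pages and 6871947673 bytes for large pages, which integer-divide to the 19 and 6 GiB printed by LLDB.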

Embedded Shell

Initialize an instance of EmbeddedShell (tools/shell/embedded_shell.cpp):

tools/shell/shell_runner.cpp:
-> 33           auto shell = EmbeddedShell(databasePath, systemConfig);

tools/shell/embedded_shell.cpp:

   201  EmbeddedShell::EmbeddedShell(const std::string& databasePath, const SystemConfig& systemConfig) {
-> 202      linenoiseHistoryLoad(HISTORY_PATH);
   203      linenoiseSetCompletionCallback(completion);
   204      linenoiseSetHighlightCallback(highlight);
   205      database = std::make_unique<Database>(databasePath, systemConfig);
   206      conn = std::make_unique<Connection>(database.get());
   207      updateTableNames();
   208  }

The embedded shell is constructed from the databasePath argument and the systemConfig defined earlier:

(lldb) p systemConfig
(const kuzu::main::SystemConfig) $31 = {
  defaultPageBufferPoolSize = 20615843020
  largePageBufferPoolSize = 6871947673
  maxNumThreads = 12
}

linenoise is a lightweight line-editing library. It provides single-line and multi-line editing modes, history handling, completion, and hints as you type, among other features. It is used in Redis, MongoDB and Android. The library is embedded in the codebase (tools/shell/linenoise.cpp). I won’t get into the details of the linenoise configuration.

-> 205      database = std::make_unique<Database>(databasePath, systemConfig);
-> 206      conn = std::make_unique<Connection>(database.get());

database and conn are both declared as private members in embedded_shell.h:

private:
    std::unique_ptr<Database> database;
    std::unique_ptr<Connection> conn;
};
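The ownership relationship between these two members can be sketched in isolation. The types below live in a hypothetical sketch namespace and are stand-ins, not Kùzu's classes; they only illustrate that the shell owns both objects while the connection holds a non-owning pointer obtained through get().

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

namespace sketch {

// Stand-in for the real Database: only remembers its path.
struct Database {
    explicit Database(std::string path) : path{std::move(path)} {}
    std::string path;
};

// Stand-in for the real Connection: a non-owning view of a Database.
class Connection {
public:
    explicit Connection(Database* db) : database{db} { assert(db != nullptr); }
    const std::string& databasePath() const { return database->path; }

private:
    Database* database; // not owned; must outlive this Connection
};

class Shell {
public:
    explicit Shell(const std::string& path)
        : database{std::make_unique<Database>(path)},
          conn{std::make_unique<Connection>(database.get())} {}

    const Connection& connection() const { return *conn; }

private:
    // Members are destroyed in reverse declaration order, so conn is
    // released before the Database it points to.
    std::unique_ptr<Database> database;
    std::unique_ptr<Connection> conn;
};

} // namespace sketch
```

Declaring database before conn matters in this sketch: swapping the two would leave the connection holding a dangling pointer during destruction.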

Lines 205 and 206 create the Database instance and open a Connection to it, respectively. Before getting into Connection in the next section, I’ll take a look at updateTableNames(), since this is where the catalog is first used to read the database schema.

updateTableNames()

There are two types of tables: node tables and relationship (rel) tables. updateTableNames stores the table names for both by fetching them from database->catalog. In my database, I have “person” and “animal” node tables and “hasOwner” and “knows” rel tables:

tools/shell/embedded_shell.cpp:

   67   void EmbeddedShell::updateTableNames() {
   68       nodeTableNames.clear();
   69       relTableNames.clear();
-> 70       for (auto& tableSchema : database->catalog->getReadOnlyVersion()->getNodeTableSchemas()) {
   71           nodeTableNames.push_back(tableSchema.second->tableName);
   72       }
   73       for (auto& tableSchema : database->catalog->getReadOnlyVersion()->getRelTableSchemas()) {
   74           relTableNames.push_back(tableSchema.second->tableName);
   75       }
   76   }

lldb output:

(lldb) p nodeTableNames
(std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) $41 = size=2 {
  [0] = "person"
  [1] = "animal"
}
(lldb) p relTableNames
(std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) $42 = size=2 {
  [0] = "hasOwner"
  [1] = "knows"
}
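The loop in updateTableNames reads tableSchema.second->tableName, which suggests the catalog exposes a map from some key to a schema pointer. A minimal sketch of that pattern follows; TableSchema, SchemaMap and collectTableNames are hypothetical stand-ins, and the choice of std::map with an integer key is an assumption made only to get a deterministic iteration order.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>

namespace sketch {

// Stand-in for a catalog entry; the real schema carries much more.
struct TableSchema {
    std::string tableName;
};

// Assumed shape of the catalog's table map: id -> owned schema.
using SchemaMap = std::map<std::uint64_t, std::unique_ptr<TableSchema>>;

// Collect the table names in key order, as the shell does for completion.
inline std::vector<std::string> collectTableNames(const SchemaMap& schemas) {
    std::vector<std::string> names;
    names.reserve(schemas.size());
    for (const auto& entry : schemas) {
        assert(entry.second != nullptr);
        names.push_back(entry.second->tableName);
    }
    return names;
}

} // namespace sketch
```

The shell runs the same loop twice, once over the node table schemas and once over the rel table schemas, clearing both name vectors first so that repeated calls do not accumulate duplicates.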

Connection (src/main/connection.cpp)

Connection is used to interact with a Database instance, and each Connection is thread-safe. Multiple connections can connect to the same Database instance in a multi-threaded environment. The description of the API below was extracted from src/include/main/connection.h:

Creates a connection to the database.

KUZU_API explicit Connection(Database* database);

Destructor

KUZU_API ~Connection();

Manually starts a new read-only transaction in the current connection.

KUZU_API void beginReadOnlyTransaction();

Manually starts a new write transaction in the current connection.

KUZU_API void beginWriteTransaction();

Manually commits the current transaction.

KUZU_API void commit();

Manually rollbacks the current transaction.

KUZU_API void rollback();

Sets the maximum number of threads to use for execution in the current connection.

KUZU_API void setMaxNumThreadForExec(uint64_t numThreads);

Returns the maximum number of threads to use for execution in the current connection.

KUZU_API uint64_t getMaxNumThreadForExec();

Executes the given query and returns the result.

KUZU_API std::unique_ptr<QueryResult> query(const std::string& query);

Prepares the given query and returns the prepared statement.

KUZU_API std::unique_ptr<PreparedStatement> prepare(const std::string& query);

Executes the given prepared statement with args and returns the result.

KUZU_API template<typename... Args>
inline std::unique_ptr<QueryResult> execute(
    PreparedStatement* preparedStatement, std::pair<std::string, Args>... args) {
    std::unordered_map<std::string, std::shared_ptr<common::Value>> inputParameters;
    return executeWithParams(preparedStatement, inputParameters, args...);
}

Executes the given prepared statement with inputParams and returns the result.

KUZU_API std::unique_ptr<QueryResult> executeWithParams(PreparedStatement* preparedStatement,
    std::unordered_map<std::string, std::shared_ptr<common::Value>>& inputParams);

Return all node table names in string format.

KUZU_API std::string getNodeTableNames();

Return all rel table names in string format.

KUZU_API std::string getRelTableNames();

Return the node property names.

KUZU_API std::string getNodePropertyNames(const std::string& tableName);

Return the relation property names.

KUZU_API std::string getRelPropertyNames(const std::string& relTableName);
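The variadic execute overload listed above accepts any number of (name, value) pairs and turns them into a parameter map. The sketch below shows one way to do that with a C++17 fold expression. It is not Kùzu's implementation: Value is a hypothetical std::variant standing in for common::Value, and packParams is an illustrative name.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>

namespace sketch {

// Hypothetical stand-in for common::Value.
using Value = std::variant<bool, std::int64_t, double, std::string>;
using ParamMap = std::unordered_map<std::string, std::shared_ptr<Value>>;

// Pack (name, value) pairs into a parameter map. Each pair may carry a
// different value type, which is what the Args parameter pack allows.
template<typename... Args>
ParamMap packParams(std::pair<std::string, Args>... args) {
    ParamMap params;
    // Fold over the comma operator: one insertion per pair.
    (params.insert_or_assign(args.first, std::make_shared<Value>(args.second)), ...);
    // Duplicate names overwrite earlier ones, so size can only shrink.
    assert(params.size() <= sizeof...(Args));
    return params;
}

} // namespace sketch
```

Note that template deduction requires the first element of each pair to already be a std::string, so callers write std::pair<std::string, std::int64_t>{"age", 30} rather than passing a string literal through std::make_pair.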

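If you are wondering what is behind KUZU_API: it is not a data type but a macro that marks a declaration as part of Kùzu's exported public API. It is applied to other declarations as well, such as the DataTypeID enum defined in src/include/common/types/types.h: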

KUZU_API enum DataTypeID : uint8_t {
    ANY = 0,
    NODE = 10,
    REL = 11,

    // physical types

    // fixed size types
    BOOL = 22,
    INT64 = 23,
    DOUBLE = 24,
    DATE = 25,
    TIMESTAMP = 26,
    INTERVAL = 27,

    INTERNAL_ID = 40,

    // variable size types
    STRING = 50,
    LIST = 52,
};

Starting from C++ API

I will now explore the COPY command through the C++ API, using the existing example in examples/cpp/main.cpp. To compile it, add add_subdirectory(examples/cpp) to CMakeLists.txt and run make test or make debug. The example is built into build/debug/examples/cpp or build/release/examples/cpp, depending on the make target used.

main.cpp

#include <iostream>

#include "main/kuzu.h"
using namespace kuzu::main;

int main() {
    auto database = std::make_unique<Database>("/tmp/db");
    auto connection = std::make_unique<Connection>(database.get());

    connection->query("CREATE NODE TABLE tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));");
    connection->query("COPY tableOfTypes FROM \"/Users/rfdavid/Devel/waterloo/kuzu/dataset/copy-test/node/csv/types_50k.csv\" (HEADER=true);");
}

This example creates a node table named tableOfTypes (from the copy-test schema) and uses the COPY command to import 50k rows from the types_50k.csv file.

I will start debugging by setting a breakpoint on the line that runs the COPY command:

(lldb) b main.cpp:11
Breakpoint 1: where = example-cpp`main + 136 at main.cpp:11:5, address = 0x0000000100003c80
(lldb) r
Process 59055 launched: '/Users/rfdavid/Devel/waterloo/kuzu/build/debug/examples/cpp/example-cpp' (arm64)
Process 59055 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 1.1
    frame #0: 0x0000000100003c80 example-cpp`main at main.cpp:11:5
   1   	#include <iostream>
   2
   3   	#include "main/kuzu.h"
   4   	using namespace kuzu::main;
   5
   6   	int main() {
   7   	    auto database = std::make_unique<Database>("/tmp/db");
   8   	    auto connection = std::make_unique<Connection>(database.get());
   9
   10  	    connection->query("CREATE NODE TABLE tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));");
-> 11  	    connection->query("COPY tableOfTypes FROM \"/Users/rfdavid/Devel/waterloo/kuzu/dataset/copy-test/node/csv/types_50k.csv\" (HEADER=true);");
   12  	}
Target 0: (example-cpp) stopped.

Inside Connection::query, a mutex lock is acquired, then a preparedStatement is created and executed through executeAndAutoCommitIfNecessaryNoLock.

   76  	std::unique_ptr<QueryResult> Connection::query(const std::string& query) {
   77  	    lock_t lck{mtx};
-> 78  	    auto preparedStatement = prepareNoLock(query);
   79  	    return executeAndAutoCommitIfNecessaryNoLock(preparedStatement.get());
   80  	}
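The NoLock suffix hints at a common locking convention: the public method takes the mutex once, and the private helpers assume it is already held. A minimal sketch of that convention, using hypothetical stand-in types and std::lock_guard (the actual definition of lock_t is not shown in this excerpt), could look like this:

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>

namespace sketch {

class Connection {
public:
    // Public entry point: acquires the lock once, then delegates.
    std::string query(const std::string& text) {
        std::lock_guard<std::mutex> lck{mtx};
        assert(!text.empty());
        const auto prepared = prepareNoLock(text);
        return executeNoLock(prepared);
    }

    std::size_t numQueriesRun() {
        std::lock_guard<std::mutex> lck{mtx};
        return executed;
    }

private:
    // The *NoLock helpers must only be called while mtx is held. Because
    // they never lock themselves, they can call each other freely without
    // deadlocking on the non-recursive mutex.
    std::string prepareNoLock(const std::string& text) { return "prepared:" + text; }

    std::string executeNoLock(const std::string& prepared) {
        ++executed;
        return "result of " + prepared;
    }

    std::mutex mtx;
    std::size_t executed = 0;
};

} // namespace sketch
```

Holding the lock for the whole of query is what lets several threads share one connection safely in this sketch, at the cost of serializing their queries.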

A prepared statement is a parameterized query that is processed once and can then be executed many times, which avoids repeating the preparation work for the same query. prepareNoLock goes through the following steps: parsing, binding, planning and optimizing. It then returns a PreparedStatement object to Connection::query.
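To illustrate why preparing once pays off, here is a toy sketch in which the four preparation stages are recorded by name and then reused across executions. Engine, its counters, and the string-based "execution" are all hypothetical; the real stages build parse trees and query plans rather than strings.

```cpp
#include <cassert>
#include <string>
#include <vector>

namespace sketch {

// Stand-in for the real PreparedStatement: keeps the query text and the
// names of the stages that produced it.
struct PreparedStatement {
    std::string query;
    std::vector<std::string> stages;
};

class Engine {
public:
    // Runs the (simulated) preparation stages exactly once per call.
    PreparedStatement prepare(const std::string& query) {
        assert(!query.empty());
        ++numPrepares;
        return PreparedStatement{query, {"parse", "bind", "plan", "optimize"}};
    }

    // Executes an already prepared statement with a parameter; no stage
    // of the preparation is repeated here.
    std::string execute(const PreparedStatement& statement, const std::string& param) {
        ++numExecutions;
        return statement.query + " [" + param + "]";
    }

    int numPrepares = 0;
    int numExecutions = 0;
};

} // namespace sketch
```

Preparing a statement once and executing it with two different parameters leaves numPrepares at 1 and numExecutions at 2, which is the saving a prepared statement is meant to provide.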