Published on

How To Write a Clickhouse Aggregate Function (without being a C++ expert)

Authors

Introduction

At Oden, we use an open-source Online Analytical Processing (OLAP) database called Clickhouse to store our customer's timeseries data. Clickhouse allows us to easily store trillions of points, coming from our customers at tens of thousands of points per second, and flexibly query them with SQL.

Clickhouse provides Aggregate Functions that produce a result based on a set of input values. These functions allow us to summarize large sets of data into a much smaller set of values; for example, we can take the average, max, min, or sum of chunks of timeseries data, grouped by some time bucket like five minutes or by some other common property.

In this post, you'll learn how to write your own aggregate function. Since Clickhouse is open source, anyone can contribute a function, and the list of functions continues to grow over time.

Contributing an aggregate function to Clickhouse does require writing some C++. However, a minimal understanding C++ and a mindset of persistence is sufficient to write one. This is especially true if the input to your function is simply one or more numbers, and it doesn't need to store a complex data structure as its state. Mostly, you'll be able to just look at the code for other aggregate functions, and copy what they're doing.

Why write an aggregate function?

Much can be accomplished using the aggregate functions provided by Clickhouse. It supports full SQL, plus many extensions. However, there are some cases where it is difficult to express what we want in a delarative manner. This is where it becomes useful to be able to write an aggregate function.

Oden is a manufacturing analytics company. We help factories waste less material and produce more product more efficiently by providing insights into their data. As such, the data that we deal with is a little different from what most analytics companies are looking at.

For example, some of our customers have a counter metric that counts the amount of some product that has been produced. Imagine a rotary encoder, a device that measures angular position of a shaft or axle, counting the number of times a spool of wire has been turned as that wire is being produced. If we sample that position sensor once a second, we'll end up with a stream of numbers that count upwards, and sometimes reset back to zero.

If we want to determine the amount of product produced in some interval, we need to sum the differences between consecutive points, but only if those sums are positive. One might think you can simply subtract the first number you've seen in some interval from the last number, but what if the spool was reset in the middle of that interval?

Oden contributed the deltaSum aggregate function to Clickhouse in order to compute these values.

Understanding Aggregate Functions

We'll develop an understanding of aggregate functions in Clickhouse by way of example. The simplest example is to consider the code required to implement the arithmetic mean:

def mean(values):
    accumulator = 0
    count = 0
    for value in values:
        accumulator += value
        count += 1
    return accumulator / count

We can conceptually separate this function into three distinct pieces:

  1. The intermediate state. We set up variables, accumulator & count, which store the state of the aggregation.
  2. The operation we perform on each value. We add the value to the accumulator and increment the count inside the foor loop.
  3. Finalizing the aggregation. In order to finalize this aggregation, we can simply divide the accumulator by the count.

So with this breakdown, we can rewrite our function with each of the pieces broken out individually:

class Mean:
    def __init__(self, name):
        self.accumulator = 0
        self.count = 0

    def add_value(self, value):
        self.accumulator += value
        self.count += 1

    def finalize(self):
        return self.accumulator / self.count

So now, we have a Mean class, which allows our "function" to store some intermediate state. When we add a value to an instance of the Mean class, we manipulate that internal state. When we finalize the aggregation, we use the internal state to produce the final value.

Clickhouse aggregate functions are like the Mean class above. They store some intermediate state and can add a value to it, then be finalized when we've added all the values. However, we're missing three more operations of aggregate functions that extend the functionality enormously: merge, serialize, and deserialize.

Suppose we had two instances of this Mean class. Can we formulate an operation in which we can combine those two instances, and then finalize that third instance? If so, this could potentially be a way to parallelize, and to be able to store partial aggregations on disk.

Of course, this is trivial as well: all we need to do is add the accumulator and the count. Then, we just need functions to serialize and deserialize:

    def merge(self, other):
        new_mean = Mean()
        new_mean.count = self.count + other.count
        new_mean.accumulator = self.accumulator + other.accumulator
        return new_mean

    def serialize(self, buffer):
        buffer.write_int(self.accumulator)
        buffer.write_int(self.count)

    def deserialize(self, buffer):
        self.accumulator = buffer.read_int()
        self.count = buffer.read_int()

So now, we've got a class that can average numbers, but can also be serialized to disk if we're not quite 'ready' yet to finalize that aggregation. We can also parallelize a large averaging operation. If we have millions of values to average, we can spin up several threads and add a million values to an intermediate state in each thread, then merge those states. This concept of aggregate functions is integral to understanding how Clickhouse manages to be so fast.

Next, we'll take a look at how they're implemented.

Implementation details

First, you'll have to get a development environment for Clickhouse going. The instructions in the Clickhouse Documentation are quite good, so that should be enough. I'd recommend doing a debug build, as it'll compile much faster. Clickhouse has a lot of source code, so it can take around an hour to compile the whole thing. Follow the instructions to get a compiled clickhouse-server binary.

The source code for aggregations lives in the src/AggregateFunctions directory. It would be a good idea at this point to simply read a couple of the functions. You'll notice the majority of the implementation is in headers; the .cpp file for a given aggregate function is simply a type of constructor. This is because a large amount of the code is templated. It also allows for more inlining, giving the compiler more opportunities for optimization. And, it means we don't have to write an implementation of our function for every numeric type there is.

A good place to start is to simply copy one of the other aggregate function implementations into a new file, then rename all of the references. The aggregate function Oden contributed, deltaSum, could be a good choice. The PR also shows all of the steps necessary to get an aggregate function to work, so it's a good reference. Try to find an aggregate function that needs to store similar state to the one you want to write. That way, you'll have a good reference for how to initialize, serialize, and deserialize that state.

The example below would be implemented in a file called src/AggregateFunctions/AggregateFunctionMyMean.h.

Design your intermediate state

The first step is to design the intermediate state that you'll need for your computation. If your function simply takes one or more numeric types, this is relatively easy; you'll just need a templated struct that stores the values you'll need. For example:

template <typename T>
struct AggregateFunctionMyMeanData
{
    T accumulator = 0;
    T count = 0;
};

For more complex types of intermediate state, refer to other functions for inspiration. For example, if you need a hash table, check out QuantileExactWeighted.h to see how the map for that calculation was used. If you need a set of unique values, have a look at AggregateFunctionTopK.h.

Once you've figured out what data to store, you can move on to implementing the five operations listed out above.

Writing Add, Merge, and Finalize

The next step is to write the add and merge operations. You'll need some boilerplate code, which you can grab from another file:

template <typename T>
class AggregationFunctionMyMean final
    : public IAggregateFunctionDataHelper<AggregationFunctionMyMeanData<T>, AggregationFunctionMyMean<T>>
{
public:
    AggregationFunctionMyMean(const DataTypes & arguments, const Array & params)
        : IAggregateFunctionDataHelper<AggregationFunctionMyMeanData<T>, AggregationFunctionMyMean<T>>{arguments, params, createResultType()}
    {}

    AggregationFunctionMyMean()
        : IAggregateFunctionDataHelper<AggregationFunctionMyMeanData<T>, AggregationFunctionMyMean<T>>{}
    {}

    String getName() const override { return "myMean"; }

    static DataTypePtr createResultType() { return std::make_shared<DataTypeNumber<T>>(); }

    bool allocatesMemoryInArena() const override { return false; }
}

This class template is where we'll actually be implementing the operations on our aggregate state. Take note of the createResultType function. If your function doesn't return a numeric value, you'll have to include the correct data type header, such as src/DataTypes/DataTypeArray.h, and construct the object that corresponds with the intended return type of your aggregate function.

Next, we can go ahead and write our add function:

    void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        auto value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
        auto data = &this->data(place);

        data->count += 1;
        data->accumulator += value;
    }

this->data(place) essentially returns a reference to the correct instance of your data struct. That function is provided by the IAggregateFunctionDataHelper that we inherited from when creating our class template.

The merge function looks somewhat similar:

    void NO_SANITIZE_UNDEFINED ALWAYS_INLINE merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        auto lhs_data = &this->data(place);
        auto rhs_data = &this->data(rhs);

        lhs_data->count += rhs_data->count;
        lhs_data->accumulator += rhs_data->accumulator;
    }

The code looks very similar to our pesudocode example above. There's some C++ mumbo jumbo, but none of it is going to be unique to the function you're adding.

Lastly, the finalize function needs to write the final value to the result column:

    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
    {
        auto data = &this->data(place);
        auto result = data->accumulator / data->count;
        assert_cast<ColumnVector<T> &>(to).getData().push_back(result);
    }

Writing serialize and deserialize

These operations are also quite simple if your intermediate state is simple. If it's not simple and you're using a collection from the Clickhouse codebase, it should have a function that serializes it already. Common/HashTable/HashMap.h, for example, has a write function and a Map::Reader class.

For simple data types, the helpers defined in src/IO/WriteHelpers.h will allow you to simply serialize out your variables in their native representation. readPODBinary and writePODBinary should take care of any simple value:

    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t>) const override
    {
        writePODBinary(this->data(place).count, buf);
        writePODBinary(this->data(place).accumulator, buf);
    }

    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
    {
        readPODBinary(this->data(place).count, buf);
        readPODBinary(this->data(place).accumulator, buf);
    }

Writing the constructor and registering the function

You'll need to write a constructor for your aggregate function in a .cpp file, createAggregateFunction$NAME. This constructor will return an AggregateFunctionPtr, a pointer to IAggregateFunction, which your class will implement, based on the data types provided. In other words, it generates a concrete implementation for the types that were actually handed to the function in the SQL query. If the query handed the function ints, it generates an IAggregateFunction class that operates on ints.

The easiest way to write the constructor is to simply copy a similar one. Here's an example of what a simple one looks like, which would be defined in src/AggregateFunctions/AggregateFunctionMyAvg.cpp:

AggregateFunctionPtr createAggregateFunctionMyAvg(
    const String & name,
    const DataTypes & arguments,
    const Array & params,
    const Settings *)
{
    assertNoParameters(name, params);

    if (arguments.size() != 1)
        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
            "Incorrect number of arguments for aggregate function {}", name);

    const DataTypePtr & data_type = arguments[0];

    if (isInteger(data_type) || isFloat(data_type))
        return AggregateFunctionPtr(createWithNumericType<AggregationFunctionMyAvg>(
            *data_type, arguments, params));
    else
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",
            arguments[0]->getName(), name);
}

Note the createWithNumericType templated function. It will dispatch on the type passed to it and instantiate the correct version of your function. It's defined in src/AggregateFunctions/Helpers.h, which defines numerous other helpers that may be useful to you if your function does not simply operate on a single numeric input.

To make sure your function is compiled and registered, add it to src/AggregateFunctions/ya.make and src/AggregateFunctions/registerAggregateFunctions.cpp.

Adding tests

You'll need to write tests as well. In the tests/queries/0_stateless directory you'll see thousands of tests; each has a .reference file and a .sql file. They match in number of select queries, and it's expected that the nth select query in the .sql file generates the output on the corresponding line in the .reference file.

Tests often use the arrayJoin function. This is, as the Clickhouse documentation states, a very unusual function. It takes an array and essentially expands it out into individual rows. This makes it very easy to write tests for aggregate functions, as we can simply use arrayJoin to generate the input data.

For example, we could create the following reference file:

1.5
101

And the following test file:

select myAvg(arrayJoin([0, 1, 2, 3]))
select myAvg(arrayJoin([100, 200, 3]))

Many of the aggregate function tests are written this way. However, it is possible to create a test table of values and run your aggregate function on that test table. The test for windowFunnel at tests/queries/0_stateless/00632_aggregation_window_funnel.sql works this way.

Debugging

The easiest way to debug an aggregate function is to add in a bunch of logging. Clickhouse is able to send logs for a running query to the client during execution. To add logs, you'll need to #include <common/logger_useful.h>, then you can use the LOG_INFO macro:

LOG_INFO(&Poco::Logger::get("Aggregator"), "Current accumulator value: {}", toString(this->data(place).accumulator));

Then, you can run the Clickhouse client and connect to your server and run:

SET send_logs_level = 'trace'

Now, when testing your function, you'll get the debug logs back.

-State, -Merge, -If, etc combinators

If you've worked with Clickhouse aggregate functions before, you might be wondering, why don't I have to implement the "state" version of the function? This function is automatically provided as part of the framework for writing aggregate functions. The -State, -Merge, -If, and several other combinators come for free.

If you're unfamiliar with these functions, they do what's on the tin: the -State version of an aggregate function will return the intermediate state; the -Merge version of the function will merge those states. For example, we get myMeanState() and myMeanMerge() for free from the framework. This allows us to use our new aggregate function in materialized views, for example, without having to manually implement those methods.

Conclusion

Writing Clickhouse aggregate functions is not as hard as it seems. The code is relatively clean, and there's plenty of examples to base your work on. The main barrier to entry is fear of C++. But with some perseverence, it's certainly possible to write a Clickhouse aggregate function without being an expert C++ developer.