How To Write a Clickhouse Aggregate Function (without being a C++ expert)
- Authors: Oden Engineering Team (@odentech)
Introduction
At Oden, we use Clickhouse, an open-source Online Analytical Processing (OLAP) database, to store our customers' timeseries data. Clickhouse allows us to easily store trillions of points, arriving from our customers at tens of thousands of points per second, and to query them flexibly with SQL.
Clickhouse provides aggregate functions, which produce a result based on a set of input values. These functions allow us to summarize large sets of data into a much smaller set of values; for example, we can take the average, max, min, or sum of chunks of timeseries data, grouped by a time bucket such as five minutes or by some other common property.
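As a rough illustration of what such a query computes, here is a Python sketch that groups (timestamp, value) points into five-minute buckets and averages each bucket. The `bucket_averages` helper and the sample points are hypothetical; in Clickhouse this would be a GROUP BY over a time-bucket expression with an aggregate such as avg.

```python
from collections import defaultdict

BUCKET_SECONDS = 5 * 60  # five-minute buckets


def bucket_averages(points):
    """Average (timestamp_in_seconds, value) pairs per five-minute bucket."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for ts, value in points:
        bucket = ts - ts % BUCKET_SECONDS  # start of the bucket containing ts
        sums[bucket] += value
        counts[bucket] += 1
    # One output value per bucket, like one row per GROUP BY key.
    return {bucket: sums[bucket] / counts[bucket] for bucket in sorted(sums)}


points = [(0, 1.0), (60, 3.0), (299, 5.0), (300, 10.0), (420, 20.0)]
print(bucket_averages(points))  # {0: 3.0, 300: 15.0}
```

Five input points collapse into two output values, which is exactly the kind of summarization aggregate functions are for.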
In this post, you'll learn how to write your own aggregate function. Since Clickhouse is open source, anyone can contribute a function, and the list of functions continues to grow over time.
Contributing an aggregate function to Clickhouse does require writing some C++. However, a minimal understanding of C++ and a mindset of persistence are sufficient to write one. This is especially true if the input to your function is simply one or more numbers, and it doesn't need to store a complex data structure as its state. Mostly, you'll be able to look at the code for other aggregate functions and copy what they're doing.
Why write an aggregate function?
Much can be accomplished using the aggregate functions provided by Clickhouse. It supports a SQL dialect with many extensions. However, there are some cases where it is difficult to express what we want in a declarative manner. This is where it becomes useful to be able to write an aggregate function.
Oden is a manufacturing analytics company. We help factories waste less material and produce more efficiently by providing insights into their data. As such, the data that we deal with is a little different from what most analytics companies look at.
For example, some of our customers have a counter metric that counts the amount of some product that has been produced. Imagine a rotary encoder, a device that measures the angular position of a shaft or axle, counting the number of times a spool of wire has turned as that wire is being produced. If we sample that position sensor once a second, we'll end up with a stream of numbers that count upwards and sometimes reset back to zero.
If we want to determine the amount of product produced in some interval, we need to sum the differences between consecutive points, but only those differences that are positive. One might think we could simply subtract the first number seen in the interval from the last one, but what if the spool was reset in the middle of that interval?
Oden contributed the deltaSum aggregate function to Clickhouse in order to compute these values.
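The difference between the naive approach and deltaSum is easy to see in a few lines of Python. This is a sketch of the behaviour described above (sum only the positive differences between consecutive samples), not the Clickhouse implementation; the helper names and the sample readings are made up for illustration.

```python
def naive_produced(samples):
    """Last reading minus first reading: wrong if the counter reset."""
    return samples[-1] - samples[0]


def delta_sum(samples):
    """Sum the positive differences between consecutive readings.

    A negative difference means the counter reset, so it is ignored.
    """
    total = 0
    for previous, current in zip(samples, samples[1:]):
        if current > previous:
            total += current - previous
    return total


# A counter that climbs to 15, resets to 0, then climbs again.
readings = [10, 12, 15, 0, 4, 6]
print(naive_produced(readings))  # -4
print(delta_sum(readings))       # 11
```

The naive subtraction even produces a negative amount of product, while summing only the positive steps (2 + 3 + 4 + 2) gives a sensible total.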
Understanding Aggregate Functions
We'll develop an understanding of aggregate functions in Clickhouse by way of example. The simplest example is to consider the code required to implement the arithmetic mean:
def mean(values):
    accumulator = 0
    count = 0
    for value in values:
        accumulator += value
        count += 1
    return accumulator / count
We can conceptually separate this function into three distinct pieces:
- The intermediate state. We set up the variables accumulator and count, which store the state of the aggregation.
- The operation we perform on each value. Inside the for loop, we add the value to the accumulator and increment the count.
- Finalizing the aggregation. To finalize this aggregation, we simply divide the accumulator by the count.
So with this breakdown, we can rewrite our function with each of the pieces broken out individually:
class Mean:
    def __init__(self):
        self.accumulator = 0
        self.count = 0
    def add_value(self, value):
        self.accumulator += value
        self.count += 1
    def finalize(self):
        return self.accumulator / self.count
So now we have a Mean class, which allows our "function" to store some intermediate state. When we add a value to an instance of the Mean class, we manipulate that internal state. When we finalize the aggregation, we use the internal state to produce the final value.
Clickhouse aggregate functions are like the Mean class above. They store some intermediate state, can add values to it, and are finalized once all the values have been added. However, we're missing three more operations that extend the functionality of aggregate functions enormously: merge, serialize, and deserialize.
Suppose we had two instances of this Mean class. Can we formulate an operation that combines those two instances into a third instance, which we can then finalize? If so, this could be a way to parallelize the computation and to store partial aggregations on disk.
This turns out to be trivial as well: all we need to do is add the two accumulators and the two counts.
Here is merge, along with the functions to serialize and deserialize:
    # Further methods of the Mean class. `buffer` stands for any binary
    # writer/reader; its write_*/read_* method names are illustrative.
    def merge(self, other):
        new_mean = Mean()
        new_mean.count = self.count + other.count
        new_mean.accumulator = self.accumulator + other.accumulator
        return new_mean
    def serialize(self, buffer):
        buffer.write_float(self.accumulator)  # the sum may be fractional
        buffer.write_int(self.count)
    def deserialize(self, buffer):
        self.accumulator = buffer.read_float()
        self.count = buffer.read_int()
So now we've got a class that can average numbers and can also be serialized to disk if we're not yet ready to finalize the aggregation. We can also parallelize a large averaging operation: if we have millions of values to average, we can spin up several threads, add a million values to an intermediate state in each thread, and then merge those states. This concept of aggregate functions is integral to understanding how Clickhouse manages to be so fast.
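To make the parallel case concrete, here is a self-contained Python sketch: split the input into chunks, build one partial state per worker thread, merge the partial states, and finalize once. The `MeanState` class, the helpers, and the chunk size are illustrative assumptions. It also shows why the state must carry the count: averaging the per-chunk means gives the wrong answer when chunks have different sizes.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class MeanState:
    accumulator: float = 0.0
    count: int = 0

    def add(self, value):
        self.accumulator += value
        self.count += 1

    def merge(self, other):
        # Two partial states combine by adding sums and adding counts.
        return MeanState(self.accumulator + other.accumulator,
                         self.count + other.count)

    def finalize(self):
        return self.accumulator / self.count


def partial_state(chunk):
    """Build one intermediate state from a chunk of values."""
    state = MeanState()
    for value in chunk:
        state.add(value)
    return state


def parallel_mean(values, chunk_size):
    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]
    # One partial state per chunk, computed on worker threads. (Python's GIL
    # means this shows the structure rather than a real speed-up.)
    with ThreadPoolExecutor() as pool:
        states = list(pool.map(partial_state, chunks))
    merged = MeanState()
    for state in states:
        merged = merged.merge(state)
    return merged.finalize()


values = [1, 2, 3, 4, 5, 6, 7, 8, 9]
print(parallel_mean(values, chunk_size=4))  # 5.0

# Averaging the per-chunk means is wrong when chunks differ in size:
chunk_means = [partial_state(values[i:i + 4]).finalize() for i in range(0, len(values), 4)]
print(sum(chunk_means) / len(chunk_means))  # 6.0
```

Because merge only adds sums and counts, the order in which partial states are combined doesn't matter, which is what makes splitting the work across threads safe.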
Next, we'll take a look at how they're implemented.
Implementation details
First, you'll need a working Clickhouse development environment. The instructions in the Clickhouse Documentation are quite good, so they should be enough. I'd recommend doing a debug build, as it compiles much faster. Clickhouse has a lot of source code, so compiling the whole thing can take around an hour. Follow the instructions to get a compiled clickhouse-server binary.
The source code for aggregate functions lives in the src/AggregateFunctions directory. It would be a good idea at this point to simply read a couple of the functions. You'll notice that most of the implementation is in headers; the .cpp file for a given aggregate function contains little more than a function that constructs it. This is because a large amount of the code is templated. Keeping it in headers also allows more inlining, giving the compiler more opportunities for optimization. And templates mean we don't have to write an implementation of our function for every numeric type there is.
A good place to start is to copy one of the other aggregate function implementations into a new file, then rename all of the references. The aggregate function Oden contributed, deltaSum, could be a good choice. The pull request that added it also shows all of the steps necessary to get an aggregate function working, so it's a good reference.
Try to find an aggregate function that needs to store similar state to the one you want to write. That way, you'll have a good reference for how to initialize, serialize, and deserialize that state.
The example below would be implemented in a file called src/AggregateFunctions/AggregateFunctionMyMean.h.
Design your intermediate state
The first step is to design the intermediate state that you'll need for your computation. If your function simply takes one or more numeric types, this is relatively easy; you'll just need a templated struct that stores the values you'll need. For example:
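template <typename T>
struct AggregationFunctionMyMeanData
{
    // Float64/UInt64 rather than T: a UInt8 accumulator would overflow quickly,
    // and dividing two integers would truncate the mean.
    Float64 accumulator = 0;
    UInt64 count = 0;
};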
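One thing to keep in mind when picking member types: C++ unsigned integer arithmetic wraps around, so an accumulator of the same type as a narrow input column (say UInt8) can overflow after only a few rows, and integer division truncates. The Python sketch below simulates an 8-bit accumulator and count with `% 256` to show the effect; both helper names are hypothetical.

```python
def mean_with_uint8_state(values):
    """Simulate UInt8 accumulator and count in C++: both wrap modulo 256."""
    accumulator = 0
    count = 0
    for value in values:
        accumulator = (accumulator + value) % 256
        count = (count + 1) % 256
    return accumulator // count  # integer division truncates, as in C++


def mean_with_wide_state(values):
    """Accumulate in a wide type and divide in floating point."""
    accumulator = 0.0
    count = 0
    for value in values:
        accumulator += value
        count += 1
    return accumulator / count


print(mean_with_uint8_state([100, 200, 3]))  # 15  (303 wrapped to 47, then 47 // 3)
print(mean_with_wide_state([100, 200, 3]))   # 101.0
print(mean_with_uint8_state([0, 1, 2, 3]))   # 1
print(mean_with_wide_state([0, 1, 2, 3]))    # 1.5
```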
For more complex types of intermediate state, refer to other functions for inspiration. For example, if you need a hash table, check out QuantileExactWeighted.h to see how the map for that calculation is used. If you need a set of unique values, have a look at AggregateFunctionTopK.h.
Once you've figured out what data to store, you can move on to implementing the five operations described above: add, merge, finalize, serialize, and deserialize.
Writing Add, Merge, and Finalize
The next step is to write the add and merge operations. You'll need some boilerplate code, which you can grab from another file:
template <typename T>
class AggregationFunctionMyMean final
    : public IAggregateFunctionDataHelper<AggregationFunctionMyMeanData<T>, AggregationFunctionMyMean<T>>
{
public:
    AggregationFunctionMyMean(const DataTypes & arguments, const Array & params)
        : IAggregateFunctionDataHelper<AggregationFunctionMyMeanData<T>, AggregationFunctionMyMean<T>>{arguments, params, createResultType()}
    {}
    AggregationFunctionMyMean()
        : IAggregateFunctionDataHelper<AggregationFunctionMyMeanData<T>, AggregationFunctionMyMean<T>>{}
    {}
    String getName() const override { return "myMean"; }
    // The mean of integers can be fractional, so the result is always Float64.
    static DataTypePtr createResultType() { return std::make_shared<DataTypeNumber<Float64>>(); }
    bool allocatesMemoryInArena() const override { return false; }
};
This class template is where we'll actually implement the operations on our aggregate state. Take note of the createResultType function. Here it returns Float64, because the mean of integers such as 0, 1, 2, 3 can be fractional (1.5). If your function doesn't return a numeric value, you'll have to include the correct data type header, such as src/DataTypes/DataTypeArray.h, and construct the object that corresponds to the intended return type of your aggregate function.
Next, we can go ahead and write our add function:
    void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        // Read the value in row `row_num` of the first argument column.
        auto value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
        auto data = &this->data(place);
        data->count += 1;
        data->accumulator += value;
    }
this->data(place) essentially returns a reference to the correct instance of your data struct. That function is provided by IAggregateFunctionDataHelper, which we inherited from when creating our class template.
The merge function looks similar:
    void NO_SANITIZE_UNDEFINED ALWAYS_INLINE merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        // Fold the right-hand state into the left-hand one.
        auto lhs_data = &this->data(place);
        auto rhs_data = &this->data(rhs);
        lhs_data->count += rhs_data->count;
        lhs_data->accumulator += rhs_data->accumulator;
    }
The code looks very similar to our pseudocode example above. There's some C++ mumbo jumbo, but none of it is unique to the function you're adding.
Lastly, the finalize function needs to write the final value to the result column:
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
    {
        auto data = &this->data(place);
        // Divide in floating point; an empty state yields NaN instead of an integer divide-by-zero.
        Float64 result = static_cast<Float64>(data->accumulator) / data->count;
        assert_cast<ColumnVector<Float64> &>(to).getData().push_back(result);
    }
Writing serialize and deserialize
These operations are also quite simple if your intermediate state is simple. If it's not simple and you're using a collection from the Clickhouse codebase, it should already have a function that serializes it. Common/HashTable/HashMap.h, for example, has a write function and a Map::Reader class.
For simple data types, the helpers defined in src/IO/WriteHelpers.h and src/IO/ReadHelpers.h let you serialize your variables in their native binary representation. writePODBinary and readPODBinary should take care of any simple value:
    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
    {
        writePODBinary(this->data(place).count, buf);
        writePODBinary(this->data(place).accumulator, buf);
    }
    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
    {
        // Read the fields back in exactly the order serialize() wrote them.
        readPODBinary(this->data(place).count, buf);
        readPODBinary(this->data(place).accumulator, buf);
    }
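Because writePODBinary writes each value's raw in-memory bytes, the contract between serialize and deserialize is simply that fields are read back in the same order, and with the same types, as they were written. The Python sketch below mimics that with the standard struct module, assuming a UInt64 count followed by a Float64 accumulator in little-endian byte order; this layout is an illustrative assumption, not a documented Clickhouse format.

```python
import struct

# Assumed layout: UInt64 count, then Float64 accumulator, little-endian, no padding.
STATE_FORMAT = "<Qd"


def serialize(count, accumulator):
    return struct.pack(STATE_FORMAT, count, accumulator)


def deserialize(buffer):
    count, accumulator = struct.unpack(STATE_FORMAT, buffer)
    return count, accumulator


blob = serialize(4, 6.0)
print(len(blob))          # 16 (8 bytes for the count, 8 for the accumulator)
print(deserialize(blob))  # (4, 6.0)

# Reading the fields back in the wrong order silently produces garbage.
wrong_accumulator, wrong_count = struct.unpack("<dQ", blob)
print(wrong_count == 4)   # False
```

Nothing in the bytes records which field is which, so a mismatch between the two functions is not detected; it just yields wrong values.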
Writing the constructor and registering the function
You'll need to write a constructor for your aggregate function in a .cpp file, named createAggregateFunction$NAME. Strictly speaking it is a factory function: it returns an AggregateFunctionPtr, a pointer to IAggregateFunction (the interface your class implements), chosen based on the data types of the arguments. In other words, it instantiates your class template for the types that were actually passed to the function in the SQL query. If the query passed the function ints, it creates a version of your class that operates on ints.
The easiest way to write the constructor is to copy a similar one. Here's an example of a simple one, which would be defined in src/AggregateFunctions/AggregateFunctionMyMean.cpp:
AggregateFunctionPtr createAggregateFunctionMyMean(
    const String & name,
    const DataTypes & arguments,
    const Array & params,
    const Settings *)
{
    // myMean takes no parameters and exactly one argument.
    assertNoParameters(name, params);
    if (arguments.size() != 1)
        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
            "Incorrect number of arguments for aggregate function {}", name);
    const DataTypePtr & data_type = arguments[0];
    // Instantiate AggregationFunctionMyMean<T> for the concrete numeric type T.
    if (isInteger(data_type) || isFloat(data_type))
        return AggregateFunctionPtr(createWithNumericType<AggregationFunctionMyMean>(
            *data_type, arguments, params));
    else
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",
            arguments[0]->getName(), name);
}
Note the createWithNumericType templated function. It dispatches on the type passed to it and instantiates the correct version of your function. It's defined in src/AggregateFunctions/Helpers.h, which defines numerous other helpers that may be useful to you if your function does not simply operate on a single numeric input.
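Conceptually, createWithNumericType turns a run-time argument type into a compile-time template instantiation. A Python analogue is a lookup from the type name to a concrete implementation, with an error for anything unsupported. This is purely illustrative: the type table covers only a few types, and `MyMeanImpl` and `create_aggregate_function_my_mean` are hypothetical names that mirror the checks in the C++ constructor.

```python
class MyMeanImpl:
    """Stand-in for one instantiation of the C++ class template."""

    def __init__(self, value_type):
        self.value_type = value_type  # e.g. int or float

    def name(self):
        return f"myMean<{self.value_type.__name__}>"


# Hypothetical subset of Clickhouse numeric type names.
NUMERIC_TYPES = {"UInt8": int, "Int64": int, "Float64": float}


def create_aggregate_function_my_mean(argument_types):
    if len(argument_types) != 1:
        raise TypeError("Incorrect number of arguments for aggregate function myMean")
    type_name = argument_types[0]
    if type_name not in NUMERIC_TYPES:
        raise TypeError(f"Illegal type {type_name} of argument for aggregate function myMean")
    return MyMeanImpl(NUMERIC_TYPES[type_name])


print(create_aggregate_function_my_mean(["Float64"]).name())  # myMean<float>
```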
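To make sure your function is compiled and registered, add it to src/AggregateFunctions/ya.make and src/AggregateFunctions/registerAggregateFunctions.cpp. As with the existing functions, your .cpp file also defines a register function that adds your function's name and constructor to the AggregateFunctionFactory, and registerAggregateFunctions.cpp declares and calls it.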
Adding tests
You'll need to write tests as well. In the tests/queries/0_stateless directory you'll see thousands of tests; most consist of a .sql file and a matching .reference file. The queries in the .sql file are run in order and their output is compared with the .reference file, so when each select query returns a single value, the nth select query's output is on the nth line of the .reference file.
Tests often use the arrayJoin function which, as the Clickhouse documentation puts it, is a very unusual function. It takes an array and essentially expands it into individual rows. This makes it very easy to write tests for aggregate functions, as we can simply use arrayJoin to generate the input data.
For example, we could create the following reference file:
1.5
101
And the following test file:
select myMean(arrayJoin([0, 1, 2, 3]));
select myMean(arrayJoin([100, 200, 3]));
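Before running the test, it can help to compute the expected values independently. This Python sketch models arrayJoin as turning each array element into a row and recomputes the two reference values above. `array_join`, `my_mean`, and `reference_line` are hypothetical helpers; `reference_line` prints whole numbers without a trailing `.0`, matching how the reference file above shows 101.

```python
def array_join(array):
    """Model arrayJoin: each element of the array becomes one row."""
    for element in array:
        yield element


def my_mean(rows):
    total = 0.0
    count = 0
    for value in rows:
        total += value
        count += 1
    return total / count


def reference_line(value):
    # Print 101.0 as "101" to match the reference file above.
    return str(int(value)) if value.is_integer() else repr(value)


for array in ([0, 1, 2, 3], [100, 200, 3]):
    print(reference_line(my_mean(array_join(array))))
# 1.5
# 101
```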
Many of the aggregate function tests are written this way. It is also possible to create a test table of values and run your aggregate function on that table; the test for windowFunnel at tests/queries/0_stateless/00632_aggregation_window_funnel.sql works this way.
Debugging
The easiest way to debug an aggregate function is to add a bunch of logging. Clickhouse is able to send the logs for a running query to the client during execution. To add logs, you'll need to #include <common/logger_useful.h>; then you can use the LOG_INFO macro:
LOG_INFO(&Poco::Logger::get("Aggregator"), "Current accumulator value: {}", toString(this->data(place).accumulator));
Then, run the Clickhouse client, connect to your server, and run:
SET send_logs_level = 'trace'
Now, when testing your function, you'll get the debug logs back.
-State, -Merge, -If, etc. combinators
If you've worked with Clickhouse aggregate functions before, you might be wondering why you don't have to implement the "state" version of the function. It is provided automatically as part of the framework for writing aggregate functions: the -State, -Merge, -If, and several other combinators come for free.
If you're unfamiliar with these combinators, they do what it says on the tin: the -State version of an aggregate function returns the intermediate state, the -Merge version merges those states and produces the final result, and the -If version only aggregates rows for which a condition is true. For example, we get myMeanState() and myMeanMerge() for free from the framework. This allows us to use our new aggregate function in materialized views, for example, without having to implement those methods manually.
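One way to picture why combinators come for free: each one only needs the generic operations (create a state, add, merge, finalize) that every aggregate function already provides. The Python sketch below wraps a mean implementation in hypothetical `state`, `merge_states`, and `aggregate_if` helpers; it models the idea, not Clickhouse's actual combinator code.

```python
from dataclasses import dataclass


@dataclass
class MeanState:
    accumulator: float = 0.0
    count: int = 0


class MeanFunction:
    """The generic operations every aggregate function provides."""

    def create(self):
        return MeanState()

    def add(self, state, value):
        state.accumulator += value
        state.count += 1

    def merge(self, state, other):
        state.accumulator += other.accumulator
        state.count += other.count

    def finalize(self, state):
        return state.accumulator / state.count


def state(func, rows):
    """Like -State: aggregate the rows but return the intermediate state."""
    s = func.create()
    for value in rows:
        func.add(s, value)
    return s


def merge_states(func, states):
    """Like -Merge: combine intermediate states, then finalize."""
    merged = func.create()
    for s in states:
        func.merge(merged, s)
    return func.finalize(merged)


def aggregate_if(func, rows, conditions):
    """Like -If: only add the rows whose condition is true."""
    s = func.create()
    for value, keep in zip(rows, conditions):
        if keep:
            func.add(s, value)
    return func.finalize(s)


mean = MeanFunction()
partials = [state(mean, [1, 2, 3]), state(mean, [4, 5])]  # e.g. stored in a materialized view
print(merge_states(mean, partials))                                 # 3.0
print(aggregate_if(mean, [1, 2, 3, 4], [True, False, True, False]))  # 2.0
```

None of the three wrappers knows anything about means, which is the point: once your function implements the core operations, the framework can build these variants on top.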
Conclusion
Writing Clickhouse aggregate functions is not as hard as it seems. The code is relatively clean, and there are plenty of examples to base your work on. The main barrier to entry is fear of C++. But with some perseverance, it's certainly possible to write a Clickhouse aggregate function without being an expert C++ developer.