Writing a C++ UDA

Introduction

Using C++, you can define your own aggregate / analytic functions. A user-defined function of either type (aggregate or analytic) is referred to as a “UDA”. The following categories of C++ UDAs are supported:

  • “Flat” UDAs - In this case, the accumulators are of fixed size and can be mapped directly to byte arrays. In C++11 parlance, they should be “trivially copyable”.
  • “Complex” UDAs - In this case, the accumulators involve data structures that are more complicated or arbitrary.
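The distinction between the two categories can be checked at compile time with the standard std::is_trivially_copyable trait. The following is only a sketch: FlatAcc, ComplexAcc, and roundTrip are hypothetical names used for illustration and are not part of the SQLstream SDK.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical accumulator layouts, for illustration only (not SDK types).
struct FlatAcc {      // fixed size, no pointers: safe to copy as raw bytes
    int64_t sum;
    int64_t count;
};

struct ComplexAcc {   // owns heap memory, so a raw byte copy is not enough
    std::string text;
};

static_assert(std::is_trivially_copyable<FlatAcc>::value, "flat accumulator");
static_assert(!std::is_trivially_copyable<ComplexAcc>::value, "complex accumulator");

// Copies a flat accumulator through a byte array and back.
template <typename T>
T roundTrip(const T &in) {
    static_assert(std::is_trivially_copyable<T>::value,
                  "only trivially copyable types may be copied byte-wise");
    unsigned char bytes[sizeof(T)];
    std::memcpy(bytes, &in, sizeof(T));
    T out;
    std::memcpy(&out, bytes, sizeof(T));
    return out;
}
```

Calling roundTrip with a ComplexAcc fails to compile, which is the property that separates “Flat” from “Complex” accumulators.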

First of all, you should understand what is meant by aggregate functions and analytic functions. Essentially, these functions process a subset of 1 or more rows and emit a summary value.

Some common examples are MIN(), MAX(), COUNT(), SUM(), and AVG(). Less common examples, which are nevertheless offered in SQLstream, include statistical functions for variance and standard deviation. All of these (and others) exist in both forms: aggregate and analytic. Meanwhile, SQLstream includes some functions that are currently implemented only as aggregate functions (like LIST_AGG) or only as analytic functions (like FIRST_VALUE and LAST_VALUE).

What is the difference between aggregate and analytic functions? Aggregate functions are used with tumbling windows, whereas analytic functions are used with sliding windows. Aggregate functions output a single row that encapsulates all input rows, whereas analytic functions produce 1 output for every input row.
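The difference in output shape can be illustrated outside of SQL with a small sketch. It simplifies matters by assuming windows are defined by a row count rather than by time, and the function names tumblingSum and slidingSum are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Aggregate style: one output per tumbling window of `width` rows.
std::vector<int64_t> tumblingSum(const std::vector<int64_t> &rows, std::size_t width) {
    assert(width > 0);
    std::vector<int64_t> out;
    for (std::size_t start = 0; start < rows.size(); start += width) {
        int64_t acc = 0;
        for (std::size_t i = start; i < rows.size() && i < start + width; ++i) {
            acc += rows[i];
        }
        out.push_back(acc);  // a single row summarizes the whole window
    }
    return out;
}

// Analytic style: one output per input row, over the last `width` rows.
std::vector<int64_t> slidingSum(const std::vector<int64_t> &rows, std::size_t width) {
    assert(width > 0);
    std::vector<int64_t> out;
    int64_t acc = 0;
    for (std::size_t i = 0; i < rows.size(); ++i) {
        acc += rows[i];                          // row enters the window
        if (i >= width) acc -= rows[i - width];  // oldest row leaves the window
        out.push_back(acc);                      // one output for every input row
    }
    return out;
}
```

For the input 1, 2, 3, 4, 5, 6 with a width of 3, tumblingSum yields two values (6 and 15), while slidingSum yields six (1, 3, 6, 9, 12, 15).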

Frequently, aggregate functions use GROUP BY, which is familiar from an RDBMS context. In contrast, analytic functions must use a WINDOW clause, which is unique to streaming data. They will often also use PARTITION BY, which subdivides rows for separate output calculations much as GROUP BY does. For more details about the difference between aggregate and analytic functions, see the discussion in the WINDOW overview.

Examples

Before going through the process of creating, compiling, installing, and calling your own UDA, you should examine the following UDA examples, ensuring that you understand the code fully. These functions can serve as templates when writing your first UDA.

LongAdder

This simple UDA implements addition (SUM) as an aggregate function with no overflow checking. It is a “Flat” UDA, as defined above, because the variable acc (short for “accumulator”) has a fixed size: int64_t, a signed 64-bit integer.

class LongAdder : public Count0IsNullUda
{
    int64_t acc;
public:
    void initAdd(CalculatorContext ctx, int64_t value) {
        acc = value;
    }
    void inline add(CalculatorContext ctx, int64_t value) {
        acc += value;
    }
};
INSTALL_UDA(longSum, LongAdder, int64_t)

LongAdderChecked

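This next UDA performs the same addition as the previous example but adds overflow checking: if the sum does not fit in an int64_t, it throws the predefined SQLSTATE 22003 exception (numeric value out of range).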

class LongAdderChecked : public Count0IsNullUda
{
    int64_t acc;
public:
    // you can either provide an init method or an initAdd method
    void init() {
        acc = 0;
    }
    void inline add(CalculatorContext ctx, int64_t value) {
        if (__builtin_add_overflow(acc, value, &acc)) {
            ctx.throwException(fennel::SqlState::instance().code22003());
        }
    }
};
INSTALL_UDA(longSumChecked, LongAdderChecked, int64_t)
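The overflow check relies on __builtin_add_overflow, a GCC and Clang builtin that returns true when the exact sum does not fit in the result type. The standalone sketch below shows that behavior without the SDK. The helper checkedAdd is hypothetical; unlike the UDA above, which passes &acc directly, it leaves the accumulator unchanged when overflow is detected.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper (not an SDK function).
// Returns true and updates acc on success; returns false on overflow
// and leaves acc untouched.
bool checkedAdd(int64_t &acc, int64_t value) {
    int64_t result;
    if (__builtin_add_overflow(acc, value, &result)) {
        return false;  // the UDA above throws SQLSTATE 22003 at this point
    }
    acc = result;
    return true;
}

// Largest value an int64_t accumulator can hold.
constexpr int64_t kMax = std::numeric_limits<int64_t>::max();
```

Starting from kMax - 1, adding 1 succeeds and adding 1 again is rejected.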

StringConcatter

The UDA defined below is an aggregate function similar to LIST_AGG() but simpler, as it lacks some behavior found in LIST_AGG (such as separator support). In the technical sense defined in the Introduction above, this is a “Complex” UDA because the amount of memory allocated for the accumulator, acc, is variable rather than fixed. As strings are appended, more memory is allocated.

class StringConcatter : public ComplexUda {
public:
    std::string acc;

    inline void init() {
        acc.clear();
    }

    inline void add(CalculatorContext ctx, varchar_t value) {
        acc.append(value.data, value.size);
    }

    inline void getResult(CalculatorContext ctx, const ResultRegister<varchar_t> &result) const {
        // this would work, but allocate extra memory and do a copy.
        // result = acc;
        // instead return reference
        result.reference(acc.data(), acc.size());
    }
};
INSTALL_BASE_UDA(concatterAgg, StringConcatter)
INSTALL_UDA_RESULT_FUNCTION(strlistAgg, concatterAgg, StringConcatter, getResult)

LongAdderChecked (Analytic Version)

So far, all UDA examples have been aggregate functions. The code below, on the other hand, defines an analytic function. It implements addition (SUM) with overflow checking, making it similar to the aggregate version above.

template<typename TYPE>
class AdderAnalytic : public Count0IsInitialUda
{
    TYPE acc;
    bool doAdd(TYPE x, TYPE y, TYPE &result) {
        if constexpr(std::is_integral<TYPE>::value) {
            return __builtin_add_overflow(x, y, &result);
        } else {
            result = x + y;
            return std::isnan(result);
        }
    }
public:
    void initAdd(CalculatorContext ctx, TYPE value) {
        //fprintf(stderr, "initAdd %d\n", value);
        acc = value;
    }
    void inline add(CalculatorContext ctx, TYPE value) {
        //fprintf(stderr, "add %d, %d\n", acc, value);
        if (doAdd(acc, value, acc)) {
            ctx.throwException(fennel::SqlState::instance().code22003());
        }
    }
    void inline drop(CalculatorContext ctx, TYPE value) {
        //fprintf(stderr, "drop %d, %d\n", acc, value);
        acc -= value;
    }
    void inline addAccumulator(CalculatorContext ctx, AdderAnalytic<TYPE> const &other) {
        //fprintf(stderr, "addAll %d\n", acc);
        add(ctx, other.acc);
    }
    void inline dropAccumulator(CalculatorContext ctx, AdderAnalytic<TYPE> const &other) {
        //fprintf(stderr, "dropAll %d\n", acc);
        drop(ctx, other.acc);
    }
};
INSTALL_ANALYTIC(myAddAnalytic, AdderAnalytic<int32_t>, int32_t)
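The relationship between add, drop, addAccumulator, and dropAccumulator can be tried out without the SDK. The sketch below uses a hypothetical stand-in, SumAcc, which mirrors the method names above but omits the CalculatorContext parameter and the overflow check. This guide does not describe when the engine calls each method, so the sketch only shows the algebra they are expected to satisfy: dropping what was added restores the previous state.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in accumulator for illustration; NOT an SDK class.
struct SumAcc {
    int32_t acc = 0;

    void add(int32_t value) { acc += value; }    // a row enters the window
    void drop(int32_t value) { acc -= value; }   // a row leaves the window

    // Merge or remove a whole accumulator, expressed via add and drop,
    // as in the AdderAnalytic example above.
    void addAccumulator(const SumAcc &other) { add(other.acc); }
    void dropAccumulator(const SumAcc &other) { drop(other.acc); }
};
```

For example, an accumulator holding 3 becomes 13 after addAccumulator with an accumulator holding 10, and returns to 3 after the matching dropAccumulator.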

Guidelines

Before discussing how to compile, install, and call your UDA, here is some general guidance for how to properly create that UDA. Here you can learn about class inheritance, C++ data types, and class methods that you will be using in the UDA definition.

UDA Class

As illustrated by the examples above, UDAs should inherit from one of the following:

  • Count0IsNullUda
  • Count0IsInitialUda
  • ComplexUda
  • ComplexAnalytic

For “Flat” UDAs, defined above, use Count0IsNullUda or Count0IsInitialUda. If you want your UDA to return NULL whenever no rows with non-NULL values match the WINDOW criteria, then use Count0IsNullUda. Meanwhile, if you want to return 0 (or whatever the getter returns on a newly initialized accumulator), then use Count0IsInitialUda.

For “Complex” UDAs, use ComplexUda or ComplexAnalytic.
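The two behaviors described for flat UDAs can be modeled in plain C++17 with std::optional standing in for SQL NULL. This is a sketch of the semantics only, not of the SDK base classes: sumOrNull and sumOrInitial are hypothetical names, and the sketch assumes that NULL input rows are skipped.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// A nullable input value; std::nullopt models SQL NULL.
using Row = std::optional<int64_t>;

// Models a SUM built on Count0IsNullUda:
// with no non-NULL rows in the window, the result is NULL.
std::optional<int64_t> sumOrNull(const std::vector<Row> &window) {
    std::optional<int64_t> acc;
    for (const Row &r : window) {
        if (!r) continue;            // assumption: NULL rows are ignored
        acc = acc.value_or(0) + *r;
    }
    return acc;
}

// Models a SUM built on Count0IsInitialUda:
// with no non-NULL rows, the result is the initial accumulator value (0).
int64_t sumOrInitial(const std::vector<Row> &window) {
    return sumOrNull(window).value_or(0);
}
```

On an empty window, sumOrNull returns an empty optional while sumOrInitial returns 0. On any window containing at least one non-NULL row, the two agree.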

C++ Data Types

When declaring variables in C++ for your UDA definition, you will need to allocate the appropriate amount of memory in order to achieve the desired data type and range in SQL. The following mapping should be used:

SQL Data Type                 C++ Data Type
TINYINT                       int8_t
SMALLINT                      int16_t
INT                           int32_t
BIGINT, TIMESTAMP, DECIMAL    int64_t
DOUBLE, FLOAT                 double
BOOLEAN                       bool
CHAR                          char_t (contains size_t and char *)
BINARY                        binary_t (contains size_t and uint8_t *)
VARCHAR                       varchar_t (contains size_t and char *)
VARBINARY                     varbinary_t (contains size_t and uint8_t *)
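The string and binary types are described as a length plus a pointer. The sketch below shows how such a value might be copied into a std::string. VarcharLike is a hypothetical stand-in: the real varchar_t comes from the SDK headers, and only the member names data and size are taken from the StringConcatter example. The member order is a guess, and the sketch assumes the bytes are not necessarily NUL-terminated.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for varchar_t, for illustration only.
struct VarcharLike {
    std::size_t size;  // number of bytes in the value
    char *data;        // pointer to the first byte
};

// Copies the bytes out using the explicit length, so the result is
// correct even when no terminating NUL follows the data.
std::string toString(const VarcharLike &v) {
    return std::string(v.data, v.size);
}
```

Because the length is explicit, a value can refer to a prefix of a larger buffer without any copying until toString is called.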

C++ Class Methods

The following methods are needed, depending on the type of UDA. Each signature is followed by its description:

void init(CalculatorContext ctx)
    Needed for complex UDAs. Initializes or re-initializes the accumulator.

void add(CalculatorContext ctx, <Parameter Type> parameter…)
    Needed for all UDAs. Adds UDA parameters to the accumulator as rows enter the window.

void initAdd(CalculatorContext ctx, <Parameter Type> parameter…)
    Optional for all UDAs. Adds UDA parameters to the accumulator as the first row enters the window. If not supplied, add() is used instead.

void drop(CalculatorContext ctx, <Parameter Type> parameter…)
    Needed for Analytic functions. Subtracts UDA parameters from the accumulator as rows leave the window.

int serialize(uint8_t *buf, uint &len, uint maxSerializedSize, int index)
    Needed for complex UDAs. Serializes the accumulator.
      • uint8_t *buf – pointer to the area to serialize into
      • uint &len – size of the serialized result
      • uint maxSerializedSize – maximum size of the serialized result
      • int index – index into the UDA when more than one call is needed to serialize. In the first call, index will be 0. The method should return -1 if serialization is complete; otherwise it should return the index to be passed in the next call.

void addAccumulator(CalculatorContext ctx, <UDA Type> const &other)
    Needed for flat Analytic functions. Adds another accumulator to this accumulator.

void addAccumulator(CalculatorContext ctx, varbinary_t &serializedOther)
    Needed for complex Analytic functions. Adds another accumulator to this accumulator. Will need to de-serialize serializedOther. If the other accumulator needed multiple calls to be serialized, then this method will likewise be called multiple times.

void dropAccumulator(CalculatorContext ctx, <UDA Type> const &parameter)
    Needed for flat Analytic functions. Subtracts another accumulator from this accumulator.

void dropAccumulator(CalculatorContext ctx, varbinary_t &serializedOther)
    Needed for complex Analytic functions. Subtracts another accumulator from this accumulator. Will need to de-serialize serializedOther. If the other accumulator needed multiple calls to be serialized, then this method will likewise be called multiple times.
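The multi-call serialize() protocol is the least obvious part of the list above, so here is a standalone sketch of one way it could work for a string accumulator. Several things are assumptions: the index is interpreted as a byte offset, unsigned int is used in place of uint, and ConcatAcc, addSerialized (a stand-in for the varbinary_t form of addAccumulator), and the transfer driver are hypothetical. The real engine's calling sequence may differ.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Stand-in complex accumulator for illustration; NOT an SDK class.
struct ConcatAcc {
    std::string acc;

    // Writes at most maxSerializedSize bytes, starting at byte offset `index`.
    // Returns -1 when done, otherwise the index to pass in the next call.
    int serialize(uint8_t *buf, unsigned int &len,
                  unsigned int maxSerializedSize, int index) const {
        assert(maxSerializedSize > 0);
        const std::size_t offset = static_cast<std::size_t>(index);
        assert(offset <= acc.size());
        const std::size_t remaining = acc.size() - offset;
        len = static_cast<unsigned int>(
            std::min<std::size_t>(remaining, maxSerializedSize));
        std::memcpy(buf, acc.data() + offset, len);
        const std::size_t next = offset + len;
        return next == acc.size() ? -1 : static_cast<int>(next);
    }

    // Called once per serialized chunk, mirroring the repeated
    // addAccumulator calls described above.
    void addSerialized(const uint8_t *data, std::size_t size) {
        acc.append(reinterpret_cast<const char *>(data), size);
    }
};

// Hypothetical driver playing the role of the engine.
void transfer(const ConcatAcc &from, ConcatAcc &to, unsigned int maxChunk) {
    std::vector<uint8_t> buf(maxChunk);
    int index = 0;  // the first call always uses index 0
    while (index != -1) {
        unsigned int len = 0;
        index = from.serialize(buf.data(), len, maxChunk, index);
        to.addSerialized(buf.data(), len);
    }
}
```

With an 11-byte accumulator and a 4-byte limit, serialize is called three times (4, 4, and 3 bytes), and the receiving accumulator ends up with the same contents.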

Making a UDA

You can create your own UDA, using the examples above as templates and following the guidelines provided. In what follows, we describe how to compile, install, and call your UDA. Note: As a prerequisite, ensure that the SQLstream C++ SDK has been installed.

Building the C++ Library

Perform the following steps to build the C++ UDA examples, which are described above:

  1. Navigate to the directory $SQLSTREAM_HOME/examples/c++sdk/examples.
  2. Build a shared library using ./build.sh.

This will create a C++ library: c++sdk/examples/build/plugin/libsampleUdfs.so.

Using the C++ Library

Perform the following steps to use the C++ library examples created above:

  1. Deploy the shared library by copying the newly created libsampleUdfs.so to the s-Server plugin directory, using the following command:

    docker cp libsampleUdfs.so <container-name>:<plugin-directory>
    

    You may choose any convenient location, either using an absolute path or relative to s-Server’s working directory (usually $SQLSTREAM_HOME). For example: $SQLSTREAM_HOME/s-Server/plugin.

  2. Finally, invoke sqlline --run=install.sql. The install.sql script is found in the SDK tarball.

Note: This example assumes a Docker container.

Writing Your C++ UDA

In addition to defining your UDA in C++, as illustrated by the examples above, you will need to modify CMakeLists.txt, including any new source files you want to compile and listing the libraries you want to build.

At the top of each C++ UDF module, you will want to include the following file:

#include "sqlstream/Udf.h"

Likewise, if you want to throw predefined SQLSTATE exceptions, you should also include the following:

#include "fennel/calculator/SqlState.h"

Most likely, you will want to specify the SQLstream namespace:

using namespace sqlstream;

Mapping C++ to SQL

Once you have created the library plugin, you will need to create a corresponding SQL function. In the following code, a SQL aggregate function called mySum() is created based on the longSum plugin, which was defined by INSTALL_UDA() in the LongAdder example above.

CREATE OR REPLACE AGGREGATE FUNCTION "mySum"(I BIGINT)
RETURNS BIGINT
LANGUAGE C 
PARAMETER STYLE GENERAL 
NO SQL 
EXTERNAL NAME 'plugin/libsampleUdfs.so:longSum';

Note: When creating an analytic UDA, as opposed to an aggregate UDA, the CREATE statement should say ANALYTIC instead of AGGREGATE. Consider the following example. It references a different plugin, myAddAnalytic, which was defined by INSTALL_ANALYTIC() in the AdderAnalytic example above:

CREATE OR REPLACE ANALYTIC FUNCTION "mySum"(I INT) 
RETURNS INT 
LANGUAGE C 
PARAMETER STYLE GENERAL 
NO SQL 
EXTERNAL NAME 'plugin/libsampleUdfs.so:myAddAnalytic';

Calling Your UDA

Now that you have created your UDA in C++ and mapped it to a SQL function, you can use it. Earlier, we mapped the C++ LongAdder class to a SQL mySum() function. Since this is an aggregate UDA, as opposed to analytic, it can be used with GROUP BY in a tumbling window:

SELECT STREAM FLOOR(s.ROWTIME TO HOUR), A, "mySum"(B) AS "HourlyTotal"
FROM "myStream" s 
GROUP BY FLOOR(s.ROWTIME TO HOUR), A
;

Alternatively, if the SQL mySum() function were derived from the C++ AdderAnalytic class instead, it would behave as an analytic UDA. As such, it could be used with a sliding window, here defined implicitly using OVER:

SELECT STREAM A, B, "mySum"(B)
OVER (PARTITION BY A ROWS UNBOUNDED PRECEDING) as "CumulativeTotal" 
FROM "myStream" s
;

Note: Enclosing mySum in double quotes enforces case sensitivity. Without them, it would be equivalent to MYSUM. Use of double quotes in such cases is optional. However, since double quotes were used in the CREATE statement, the SELECT statement follows suit.