Writing a UDX

User Defined Transforms (UDX) are simply User Defined Functions (UDFs) that return a table or streaminstead of a scalar value. SQLstream supports UDXs written in Java. Developing custom UDXs requires knowledge of Java programming.

This topic contains information on the following:

Writing a UDX Overview

Once you write and compile the UDX, you implement it in s-Server using the SQL statement CREATE FUNCTION. The EXTERNAL NAME clause of a CREATE FUNCTION statement associates the UDX with a public Java class. For more information, see Declaring a UDX below. That class must contain a public execute() method.

Note: This execute() method may raise a SQLException.

The execute() method must have one more argument than the UDX has parameters. A UDX typically accepts one or more cursors (drawn from java.sql.ResultSet) along with scalar values (optional) as input parameters. These cursors can also employ “streaming” result sets, using an extension of java.sql.ResultSet called com.sqlstream.jdbc.StreamingResultSet. The final argument of the execute() method must have type java.sql.PreparedStatement. For more information, see the topic Streaming ResultSet in this guide.

The leading arguments correspond, in order, to the UDX parameters. Such leading arguments can have the following types:

  • A scalar UDX parameter (in the CREATE FUNCTION topic of the SQLstream Streaming SQL Reference Guide).
  • A non-streaming UDX cursor parameter corresponds to a Java argument of type java.sql.ResultSet.
  • A streaming UDX cursor parameter corresponds to a Java argument of type com.sqlstream.jdbc.StreamingResultSet.

An abstract class, com.sqlstream.plugin.impl.AbstractBaseUdx, is included in the file $SQLSTREAM_HOME/lib/aspen-core.jar. This file is installed with s-Server. For best efficiency, we recommend building custom UDXs by extending this class. We provide an example of doing so below.

Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/<VERSION>/s-Server. . The Javadoc for AbstractBaseUdx is located at $SQLSTREAM_HOME/doc/SQLstream-JDBC-API.zip.

Setting Up the UDX

The first part of developing a UDX involves setting up a Maven project for your UDX. For more information on Maven, see https://maven.apache.org/

Using Maven, create your project.

As a parent pom, use $SQLSTREAM_HOME/examples/sdk/pom.xml.

Using this pom gives you compile time access to all of the API that’s available within the s-Server plugin environment, whether for a UDX, a UDF, or an adapter.

Invoke $SQLSTREAM_HOME/examples/sdk/install.sh to install artifacts.

cd $SQLSTREAM_HOME/examples/sdk
  ./install.sh
  cd $SQLSTREAM_HOME/examples/sdk/udxSample
  mvn package

(The above will only work if you have $SQLSTREAM_HOME defined.)

A sample pom file for a UDX might look like the following:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.sqlstream.udr</groupId>
    <artifactId>udr</artifactId>
    <version>6.1.0-SNAPSHOT</version>
    <relativePath>..</relativePath>
  </parent>
  <artifactId>AesEncryptDecrypt</artifactId>
  <packaging>jar</packaging>
  <name>Aspen AES Encrypt/Decrypt UDX</name>
</project>

Simple UDX implementation using java.sql.ResultSet and java.sql.PreparedStatement

Here is a simple example of a non-streaming UDX implementation that transfers column values from the input cursor (java.sql.PreparedStatement) to the output cursor (java.sql.ResultSet). This code will function as a UDX, but is not as efficient as extending AbstractBaseUDX, described below.

public static void execute(
    //copy input data to output prepared statement
    java.sql.ResultSet inputRows, int input1, String input2, double input3,
    java.sql.PreparedStatement  results)
     throws SQLException
    {
    while(inputRows.next()) {
         //read row from input
        try {
            int numColumns = inputRows.getMetaData().getColumnCount();
           for(int i = 1; i <= numColumns; i++) {results.setObject(i, inputRows.getObject(i));}
           // …
           // custom code that transforms data according to your specifications
           // …
        } catch (SQLException se) {
            tracer.warning(
                "Failed to execute function or set parameters for output");
        }
        results.executeUpdate();     // push the row out.
    }
}

Methods in AbstractBaseUDX

AbstractBaseUDX contains the following methods that you may find useful:

  • createColumnTransferMap. Creates a column transfer map which identifies like-named columns in the specified input ResultSet and the specified output column map whose data is to be copied from input to output.
  • createPassMap. Creates pass-through column map. Pass-through columns are like-named columns in UDX input ResultSet and output PreparedStatement whose data is to be copied from input to output.
  • passThruColumns. Calls transfer columns. Does not require parameters, uses createPassMap.
  • setParam. Sets value for parameter colNum in output (“results”) row given the column name, number and type.
  • transferColumns. Fast copies pass-through columns from input to output. If inputRows is not null it is used as the data source, otherwise row is used as the data source. If both inputRows and row are null the corresponding output columns are assigned SQL null. If both inputRows and row are not null, then inputRows is used as the data source and row is updated with column values from inputRows.

The member colMap is a map from column names to an int[2] containing column number and column type. To determine which field number to set for column “myColumn”, for example, you can use colMap.get(“myColumn”)[0];

Using a JAVA User Defined Transform (UDX) to Create Custom Windowed Aggregation Functions

The topic below describes how a JAVA User Defined Transform (UDX) can be used to create custom windowed aggregation functions that aggregate values on a “tumbling” window of rows over time.

SQL Standard Aggregation Syntax

SELECT STREAM
    FLOOR(s.ROWTIME TO MINUTE), "partCol1", "partCol2",
    "CustomAgg1"("aggCol") AS "aggResult1",
    "CustomAgg2"("aggCol2") AS "aggResult2"
FROM "aStream" AS s
GROUP BY FLOOR(s.ROWTIME TO MINUTE), "partCol1", "partCol2";

In the SQL example above, “CustomAgg1” & “CustomAgg2” are custom aggregation functions that a user needs to implement in JAVA. Subsequent paragraphs describe how you can use a JAVA UDX can be used to implement such custom aggregation.

First, we declare (create) the UDX that can be used to perform Windowed Aggregation:

CREATE OR REPLACE FUNCTION "GroupingAggUDX"(
    "inputRows" CURSOR
    "aggColName1" VARCHAR(64), "partitionColName1" VARCHAR(64),
    "aggColName2" VARCHAR(64), "partitionColName2" VARCHAR(64),
    "rowtimeColName" VARCHAR(64), "windowSize" INTEGER)
    RETURNS TABLE (
        ROWTIME TIMESTAMP,
        "partCol1" VARCHAR(64),
        "partCol2" INTEGER,
        "aggResult1" DOUBLE,
        "aggResult2" DOUBLE
    )
    LANGUAGE JAVA
    PARAMETER STYLE SYSTEM DEFINED JAVA
    NO SQL
    EXTERNAL NAME '"SampleJar":com.sqlstream.sample.functions.Sample.SimpleUDX';

At this point, the semantics expressed in windowed aggregation syntax above can be implemented by invoking the sample UDX as described below with the UDX definition we created earlier.

Note: When a table function returns an infinite result, it needs to be wrapped with a STREAM(…) marker when invoked. s-Server will return an error otherwise.

SELECT STREAM ("GroupingAggUDX"(
    CURSOR(SELECT STREAM * FROM "aStream"),
    'aggCol1', 'partCol1', 'aggCol2', 'partCol2', 'ROWTIME', 60000));

The JAVA code below shows the basic structure of how windowed aggregation semantics can be implemented using the function “context” to maintain “windows” of rows and aggregated results that are partitioned by partition column.

import com.sqlstream.plugin.impl.AbstractBaseUdx;
import java.sql.*;
import com.sqlstream.jdbc.*;

public class Sample extends AbstractBaseUdx {
       FunctionContext context;

private Sample(
    java.sql.ResultSet inputRows,
    java.sql.PreparedStatement resultInserter)
    throws SQLException
{
    super(newTracer(), inputRows, resultInserter);
    context = newFunctionContext();
}

public static double simpleUDX(
    java.sql.ResultSet inputRows,
    String aggColName1, String aggColName2,
    String partColName1, String partColName2, String rowtimeColName,
    int windowSize, java.sql.PreparedStatement resultInserter)
    throws SQLException
{
    Sample instance = new Sample(inputRows, resultInserter);

    // Following determines columnIndexes in inputRows for columns
    // needed to compute aggregate
    context.updateContext(
        aggColName1, aggColName2,
        partColName1, partColName2, rowtimeColName,
        windowSize);

    instance.run();
}

public void run() throws SQLException {
    while (inputRows.next()) {
        ...
        String partCol1 = inputRows.getString(functionContext.partColIdx1);
        int partCol2 = inputRows.getInt(functionContext.partColIdx2);
        long rowtimeCol =
           inputRows.getTimestamp(functionContext.rowtimeIdx).getTime();
        while (functionContext.flushGroupedResults(rowtimeCol)) {

            AggBucket bucket = functionContext.getNextBucket();
            functionContext.passGroupingColumns(bucket, resultInserter);

            // set the parameter for aggResult column defined in
            // RETURNS clause of the UDX
            resultInserter.setDouble(
                functionContext.aggResultIdx1, bucket.getResult1());
            resultInserter.setDouble(
                functionContext.aggResultIdx2, bucket.getResult2());

            // insert the result row to the output table of the UDX.
            resultInserter.executeUpdate();
        }

        AggBucket agg = functionContext.getAggBucket(    
            partCol1, partCol2, rowtimeCol, windowSize);
        if (agg == null) {
            functionConext.addAggBucket(newAggBucket(
                partCol1, partCol2, rowtimeCol, windowSize));
        }

        double aggCol1 = inputRows.getDouble(functionContext.aggColIdx1);
        double aggCol2 = inputRows.getDouble(functionContext.aggColIdx2);
        agg.updateBucket(aggCol2, aggCol2);
        ...

    }
}
}

Rowtime Bounds and UDXes

The code samples in earlier sections work using simple JDBC APIs. As with all s-Server streaming SQL, you need to use the SQLstream extensions to these APIs to handle rowtime bounds (punctuations). You need to use the com.sqlstream.jdbc.StreamingResultSet and com.sqlstream.jdbc.StreamingPreparedStatement interfaces for handling rowtime bounds.

The following code block provides an example of implementing rowtime bounds using these two classes:

import java.sql.*;
import com.sqlstream.jdbc.*;
import com.sqlstream.StreamingResultSet.RowEvent;
public class SimpleUdx extends AbstractBaseUdx {

    private int input1;
    private String input2;
    private double input3;

    private SimpleUdx(
    java.sql.ResultSet inputRows, int input1, String input2,
    double input3, java.sql.PreparedStatement  results)
{
    super(tracer, inputRows, results);
    createPassMap();
    this.input1 = input1;
    this.input2 = input2;
    this.input3 = input3;
}

public static void execute(
    java.sql.ResultSet inputRows, int input1, String input2,
    double input3, java.sql.PreparedStatement  results)
    throws SQLException
{
    SimpleUdx instance =
        new SimpleUdx(inputRows, input1, input2, input3, results);
    instance.run();
}
public void run() throws SQLException
{
    StreamingResultSet in = (StreamingResultSet)inputRows;
    StreamingPreparedStatement out = (StreamingPreparedStatement)results;
                 while (true) {
                 try {
                         RowEvent e = in.nextRowOrRowtime(maxIdle);
                         switch (e) {
                         case EndOfStream:
                              // < custom code to handle rowtime bound
				   //end of stream >
                              return;  // end of input
                         case Timeout:
                             // <no data after waiting for queryTimeoutMillis>
                             continue;  
				    // <custom code for handling what happens when server times out>
                         case NewRow:
                             transferColumns(in, null, out, passList);
                             // <custom code to process new row>
                             break;
                         case NewRowtimeBound:
                         Timestamp newBound = in.getRowtimeBound();
                         // <custom code>
                         // pass the rowtime bound to out
                         out.setRowtimeBound(newBound);
                         continue;
                         }
                     } catch (SQLException se) {
                         // custom code. Typically swallow the exception here.
                     }
                     out.executeUpdate();    // throw SQLException
                 }
}
}

Installing the UDX

We recommend building the UDX with mvn package or mvn install, then adding code along the following lines to your SQL script to install the JAR:

CREATE JAR "SimpleUDX"
    LIBRARY 'file:/home/aspen/simpleUDX.jar'
    OPTIONS(0);
--- This code loads the installed Java JAR file

Alternately, you can use a prepackaged script called makeTarball.sh. This script runs mvn package and adds a CREATE OR REPLACE JAR statement to install.sql, which you can then run to install the UDF. makeTarball.sh is located here:

adapters/makeTarball.sh <plugin dir name>

Unpack the tarball into the directory $SQLSTREAM_HOME/plugins.

In the created directory, you should find install.sql. To install your plugin, invoke sqllineClient (available in $SQLSTREAM_HOME/bin) with this script.

Declaring UDXs in s-Server

In order for UDXs to be available in s-Server, you need to declare them in SQL. See the topic CREATE FUNCTION statement in the Streaming SQL Reference Guide for more details. The following code declares a UDX for the JAR defined above.

CREATE OR REPLACE FUNCTION simple_udx(
    inputRows CURSOR, input1 INTEGER,
    input2 VARCHAR(20), input3 DOUBLE)
RETURNS TABLE (inputRows.*, output1 DOUBLE, output2 INTEGER, output3 VARCHAR(40))
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'simple_jar:com.sqlstream.plugin.simple.SimpleUdx.execute';

For the JAR created above, the function definition above searches for the static method com.sqlstream.plugin.simple.SimpleUdx.execute() in the jar file /home/aspen/simpleUDX.jar.

It looks for the method with the signature

public static void execute(java.sql.ResultSet inputRows, int input1, String input2, double input3, java.sql.PreparedStatement  results) throws SQLException

Note that the java method signature has one extra parameter, results, compared to the signature in a typical SQL function definition. The parameter results represents the RETURNS clause of the function definition. It is the java.sql.PreparedStatement for the insert operation to the temporary table/stream being returned by the function. When the UDX is called, s-Server will automatically pass this PreparedStatement as the last parameter of the java method call.

The input cursors can be accessed using the following:

Rowtime Bounds and UDXs

In writing UDXs for s-Server, you need to be aware of rowtime bounds and how these affect application performance. A rowtime bound is an assertion about the future contents of a stream. It states that the next row in the stream will have a ROWTIME no earlier than the value of the bound.

Rowtime bounds are necessary elements of streaming SQL, because they prevent unnecessary lag time in waiting for a row, especially in situations where there will be known lag between rows. For example, if your streams track stock or other market activity, and the market tracked closes at 5 pm and will not open until 9 am the following day, rowtime bounds tell the system to not expect a new row until 9 am.

It is a fundamental characteristic of SQLstream s-Server that there may be arbitrarily long intervals between client thread calls. For example, a data-producing thread might insert a row of data, then block, waiting for another part of the application to produce more data. A data-consuming thread might block waiting for stream data, thus preventing that thread from doing other useful work. In either case, the thread may fail while executing in application code and never return to the UDX for data or an orderly cleanup.

A UDX written for s-Server must be aware of these system characteristics.

Rowtime Bounds: Forcing Timely Output

A stream is a sequence of timestamped messages (or rows): each row has a rowtime, and the sequence is ordered by rowtime. The current time of a stream is the rowtime of the latest row. When a relational operator executes, rows are passing downstream through that operator. The current time of its output stream(s) cannot be later than the latest of the rowtimes in all of its inputs.

For efficiency, current time is implicit in the data. (The stream is implemented as an “asynchronous” series of messages). But this means the current time of a stream advances only when the next message arrives. This can be a problem for certain operations, which pause waiting for one more input message.

Some examples are merging and rolling windows. Each potentially causes performance problems if you do not implement mechanisms to handle rowtimes gracefully in cases of noticeable real time gaps between input rows. (In many implementations, where messages are continuous, these issues do not come into play, though it is best practice to account for them anyway. To solve the problems of merging, you need a mechanism to advance the current time of a stream explicitly and immediately, without waiting for an additional message.

Merging

Merging occurs when several clients insert into the same named stream or when the UNION ALL operator executes. In either case, the output row is the earliest input row, taken from the input stream with the earliest current time. But to know the current time of all input streams, the system must wait for a row to arrive on each. This can cause the operator to wait and introduce a delay in downstream processing. Note that this happens only when there is a noticeable real time gap between input rows. Otherwise, waiting is negligible or a next row is already buffered up.

Rolling Windows

A window of a stream is a sequential block of messages. Rolling windows (also called sliding windows) mean a partitioning of a stream into a series of windows: for instance, a series that groups together all messages with a rowtime in the same hour (called rolling one hour windows).

An example rolling-window query is to find totals and other statistics for the last hour of a stream of trades. This, too, has an issue with timeliness. When does a window end? Only when the next window begins, when a row arrives that belongs to the next window. Here again the system delays, waiting for a row to arrive.

Setting and Getting Rowtime Bounds

For rolling hourly windows, the client inserting the data should set a rowtime bound (constraint) on the hour, in order to close off an hourly window. The only information this constraint carries is a rowtime. s-Server extends the PreparedStatement interface to handle such constraints.

By setting a rowtime bound with a value of noon, a data source is asserting a constraint on the future of the stream: noon is a lower bound to all forthcoming rows. In other words, the source is announcing that it has nothing more to say until noon.

The rowtime bound applies to the inserted data, not to the rolling averages query that makes use of it, nor to any client reading from that query.

Calculating Memory for Custom UDXs

Depending on whether your UDX maintains memory-intensive structures, you may need to increase the Java heap allocation for s-Server when you implement your custom UDX. If your UDX requires anything more than 128M, you will need to adjust the heap size for the Java implementation of s-Server.

Note: Basic operations will not require significant memory, nor will any of the built-in plugins for s-Server.

If your Java-based-UDX holds columns in memory, you will need to increase the heap allocation by the amount needed to hold these columns:

*Maximum number of rows saved in memory * (total size of row + size of any char rows) the minimum window size*

For example, if you save 100,000 rows in memory, and the total row size is 50 bytes, and the maximum window you maintain is an hour, you will need to increase the heap allocation by 10.7G

32 bytes 100000 rows 3600 = 18000000000 bytes = 10.7 G

To do so:

  1. Edit the file defineAspenRuntime.sh to alter heap allocation. This file is where heap parameter is set.
  2. Edit the line -Xmx1536m. For example, to change the heap parameter to 4G, you would change this number to 4096m.
  3. Restart s-Server by typing !quit into the SQLstream s-Server terminal window and starting the server again.

Using Streaming ResultSet

The Streaming ResultSet interface provides extends the standard ResultSet interface to include information on the next incoming row or rowtime bound. (See http://docs.oracle.com/javase/7/docs/api/java/sql/ResultSet.html) Knowledge of the next incoming row is crucial for streaming SQL because this lets you tell the system to “wait” for the next row. You should CAST ResultSet to Streaming ResultSet in order to use the functions below.

Nested Class Summary

Modifier and Type Interface and Description
static class StreamingResultSet.RowEvent

Method Summary

Modifier and Type Method and Description
Timestamp getRowtimeBound()Returns the latest rowtime bound from the target stream.
StreamingResultSet.RowEvent nextRowOrRowtime()Extension to ResultSet.next that returns when the next row or new rowtime bound arrives. Returns row, rowtime bound, or timeout.
StreamingResultSet.RowEvent nextRowOrRowtime(long timeout)Extension to ResultSet.next that returns when the next row or new rowtime bound arrives.

Method Detail

Method Detail
nextRowOrRowtime StreamingResultSet.RowEvent nextRowOrRowtime() throws SQLExceptionExtension to ResultSet.next that returns when the next row or new rowtime bound arrives.Returns:RowEvent indicating whether row or rowtime bound has arrived. If NewRow is returned, then JDBC fields will be set as in ResultSet.next.Throws:SQLException
getRowtimeBound Timestamp getRowtimeBound() throws SQLExceptionReturns the latest rowtime bound from the target stream. This is a lower bound on the rowtime of the next row to arrive on the stream.Returns:rowtime bound (UTC)Throws:SQLException

StreamingResultSet.RowEvent

Method Summary

Methods

Modifier and Type Method and Description
static StreamingResultSet.RowEvent valueOf(String name)Returns the enum constant of this type with the specified name.
staticStreamingResultSet.RowEvent[] values()Returns an array containing the constants of this enum type, in the order they are declared.

Methods inherited from class java.lang.Enum

clone, compareTo, equals, finalize, getDeclaringClass, hashCode, name, ordinal, toString, valueOf

Methods inherited from class java.lang.Object

getClass, notify, notifyAll, wait, wait, wait

Enum Constant Detail

Constant Detail
EndOfStream public static final StreamingResultSet.RowEvent EndOfStream
NewRow public static final StreamingResultSet.RowEvent NewRow
NewRowtimeBound public static final StreamingResultSet.RowEvent NewRowtimeBound
Timeout public static final StreamingResultSet.RowEvent Timeout

Method Detail

Method Detail
values public static StreamingResultSet.RowEvent[] values()Returns an array containing the constants of this enum type, in the order they are declared. This method may be used to iterate over the constants as follows:for (StreamingResultSet.RowEvent c : StreamingResultSet.RowEvent.values()) System.out.println©;Returns:an array containing the constants of this enum type, in the order they are declared
valueOf valueOfpublic static StreamingResultSet.RowEvent valueOf(String name)Returns the enum constant of this type with the specified name. The string must match exactly an identifier used to declare an enum constant in this type. (Extraneous whitespace characters are not permitted.)Parameters:name - the name of the enum constant to be returned.Returns:the enum constant with the specified nameThrows:IllegalArgumentException - if this enum type has no constant with the specified nameNullPointerException - if the argument is null