Transforming Data

Overview

The main way to transform data in s-Server is through SQL. See the SQL Reference Guide for more detail. For more sophisticated transformations, you can define custom User Defined Transformations (UDXes) and User Defined Functions (UDFs). This section describes how to create them.

UDXes operate in the middle of processing a stream. They use the JDBC API to exchange data with SQLstream s-Server while stream processing is underway. You call a UDX with one or more streaming arguments, and the UDX returns a streaming result. To use that resulting stream, downstream operations or listeners must SELECT from it. (Code sections appear in the topic Writing an ECD Plugin. For a full UDX code example, see the topic Table Lookup UDX in this guide.)
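
For example, a JDBC client consumes a UDX's streaming result by selecting from it. The following is a minimal sketch; the UDX name "MyUDX", the stream name "MyStream", and the connection URL and credentials are hypothetical placeholders (see the topic JDBC Driver in this guide for the values appropriate to your installation).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class UdxReader {
    public static void main(String[] args) throws Exception {
        // Connection URL and credentials are placeholders.
        Connection conn = DriverManager.getConnection(
            "jdbc:sqlstream:sdp://localhost:5570", "sa", "");
        Statement stmt = conn.createStatement();
        // Invoke the (hypothetical) UDX and SELECT from its streaming result.
        ResultSet rs = stmt.executeQuery(
            "SELECT STREAM * FROM STREAM(\"MyUDX\"("
            + "CURSOR(SELECT STREAM * FROM \"MyStream\")))");
        while (rs.next()) {         // blocks until the next row arrives
            System.out.println(rs.getString(1));
        }
    }
}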

UDXes are similar to SQL functions (normal UDFs). The difference is that UDXes return streaming results, whereas UDFs return scalar results.

See also the UDF/UDX descriptions, discussions, and examples in the topic SQLstream Software Development Kit (SDK) in this guide.

User-Defined Routines and Prepared Statements

When a query using one or more user-defined routines (UDFs, UDXes, etc.) will be executed many times, it should be executed from a PreparedStatement. This advice also applies to any metadata query and to non-expedited INSERTs. Multiple executions without a PreparedStatement raise the possibility of leaking a class in PermGen space. Using a PreparedStatement for such a query thus minimizes the need to enlarge PermGen space, which is set by default to 64MB but can be changed using the

-XX:MaxPermSize=<size>

flag on the start-server command, where <size> can be 64m, 128m, 256m, etc. The “m” is required. See also the topic JDBC Driver in this guide for discussions and links pertaining to SQLstream’s StreamingPreparedStatement and StreamingStatement extensions to the standard PreparedStatement interface.
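
For example, the sketch below prepares a query that invokes a UDF once and then executes it repeatedly; the function name "MyUDF", the query form, and the connection details are hypothetical placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PreparedUdfQuery {
    public static void main(String[] args) throws Exception {
        // Connection URL and credentials are placeholders.
        Connection conn = DriverManager.getConnection(
            "jdbc:sqlstream:sdp://localhost:5570", "sa", "");
        // Prepare once: the class implementing the UDF is loaded a single
        // time, rather than once per execution.
        PreparedStatement ps = conn.prepareStatement(
            "SELECT \"MyUDF\"(?) FROM (VALUES(0))");
        for (int i = 0; i < 1000; i++) {
            ps.setInt(1, i);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getDouble(1));
                }
            }
        }
    }
}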

Control Streams

The TableLookup UDX implements a stream-table join by looking up entries in an external table that match one or more key columns in the input stream. TableLookup supports a control stream with commands that let you suspend the stream-table join, flush the cache, and then resume the join. After resuming, the cache is repopulated with fresh values from the lookup table, some of which may have changed since the cache was last populated.
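
The exact control stream schema and command vocabulary are described in the topic Table Lookup UDX; the sketch below is illustrative only, assuming a control stream named "LookupControl" with a single "command" column and single-word commands.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class LookupControlClient {
    public static void main(String[] args) throws Exception {
        // Connection URL and credentials are placeholders.
        Connection conn = DriverManager.getConnection(
            "jdbc:sqlstream:sdp://localhost:5570", "sa", "");
        // Stream name, column name, and command words below are
        // illustrative placeholders; see the Table Lookup UDX topic
        // for the actual vocabulary.
        PreparedStatement ps = conn.prepareStatement(
            "INSERT INTO \"LookupControl\" (\"command\") VALUES (?)");
        for (String command : new String[] {"SUSPEND", "FLUSH", "RESUME"}) {
            ps.setString(1, command);
            ps.executeUpdate();
        }
    }
}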

Blending SQL and Custom Operators

Once you create UDFs and UDXes, you can use them to build custom windowed aggregation functions that aggregate values on a sliding window of rows over time.

Aggregating Values on Sliding Windows with UDFs

You can use a Java implementation of a “stateful” User Defined Function (UDF) to create custom windowed aggregation functions that aggregate values on a “sliding” window of rows over time.

A User Defined Transform (UDX) written in Java is a more powerful alternative. A UDX is essentially a table function implemented in Java. See Aggregating Values on Sliding Windows with UDXes below.

The following example uses standard s-Server Windowed Aggregation syntax. See the topic Window Clause of the SELECT Statement in the SQLstream Streaming SQL Reference Guide for more details.

SELECT STREAM "CustomAgg"("aggCol") OVER (PARTITION BY "partCol" RANGE INTERVAL '1' MINUTE PRECEDING), *
FROM "aStream";

Here, “CustomAgg” is a custom aggregation function that you implement in Java. The following describes how to implement such a function and use it to perform custom aggregation.

First, you declare (create) the UDF that performs the windowed aggregation. See the topic CREATE FUNCTION in the SQLstream Streaming SQL Reference Guide for more details.

CREATE OR REPLACE FUNCTION "WindowedAggUDF"(
   "aggCol" DOUBLE, "partitionCol" VARCHAR(128),
   "rowtimeCol" TIMESTAMP, "windowSize" INTEGER)
   RETURNS DOUBLE
   LANGUAGE JAVA
   PARAMETER STYLE SYSTEM DEFINED JAVA
   NO SQL
   EXTERNAL NAME '"SampleJar":com.sqlstream.sample.functions.Sample.simpleUDF';

You need to create the JAR ("SampleJar") containing the UDF implementation before executing this statement. See the topic CREATE JAR in the SQLstream Streaming SQL Reference Guide for more details. The sections below provide sample Java code that you can use to implement this JAR.

EXTERNAL NAME defines where s-Server should look for the Java class implementing the function. Java-defined external functions must be located either in a class file on SQLstream s-Server’s Java classpath, or in an external Java archive (JAR) file loaded into the system using CREATE JAR. In the latter case, the qualified jar name is the name given to the jar as part of CREATE JAR. If the jar was not defined in the current schema, then the fully qualified <schema_name>.<jar_name> format must be used.
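
As a sketch, the JAR can be loaded from JDBC before declaring the function; the LIBRARY URL below is a placeholder for wherever your JAR actually resides, and the statement should be checked against the topic CREATE JAR in the SQLstream Streaming SQL Reference Guide.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class RegisterSampleJar {
    public static void main(String[] args) throws Exception {
        // Connection URL and credentials are placeholders.
        Connection conn = DriverManager.getConnection(
            "jdbc:sqlstream:sdp://localhost:5570", "sa", "");
        Statement stmt = conn.createStatement();
        // The file URL is a placeholder for the JAR's real location.
        stmt.executeUpdate(
            "CREATE JAR \"SampleJar\""
            + " LIBRARY 'file:/path/to/sample.jar'"
            + " OPTIONS(0)");
    }
}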

At this point, you can implement the semantics expressed in the windowed aggregation syntax above by invoking the UDF you declared earlier.

SELECT STREAM "WindowedAggUDF"("aggCol", "partCol", s.ROWTIME, 60000), *
FROM "aStream" AS s;

The Java code below defines a function called simpleUDF. The code shows the basic structure of how windowed aggregation semantics can be implemented, using a function context to maintain “windows” of rows and aggregated results that are partitioned by the partition column.

public static double simpleUDF(
   double aggCol, String partCol, long rowtimeCol, int windowSize)
{
   ...
   FunctionContext functionContext = (FunctionContext) PluginUtil.get().getContext();
   if (functionContext == null) {
       functionContext = getNewContext();
       PluginUtil.get().setContext(functionContext);
   }
   ...
   WindowPartition p = functionContext.getPartition(partCol);
   if (p == null) {
       functionContext.addPartition(newPartition(partCol));
       functionContext.addAggBucket(newAggBucket(partCol));
   }
   AggBucket agg = functionContext.getAggBucket(partCol);
   Row newRow = new Row(aggCol, rowtimeCol);
   p.addRow(newRow);
   while (p.oldestRow().needsPurge(rowtimeCol, windowSize)) {
       Row oldestRow = p.purgeOldestRow();
       agg.updateBucket(DROP_ROW, oldestRow);
        // if the partition now has 0 rows, delete the partition
        // as well as the corresponding aggBucket from the context.
   }
   agg.updateBucket(ADD_ROW, newRow);
   ...
}
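
The supporting classes referenced above (Row, WindowPartition, AggBucket) and the context helpers are not part of the s-Server API; you implement them yourself. The following is a minimal sketch of what they might look like for a SUM-style aggregate; the names and semantics here are assumptions for illustration.

import java.util.ArrayDeque;
import java.util.Deque;

class Row {
    final double aggCol;
    final long rowtime;
    Row(double aggCol, long rowtime) {
        this.aggCol = aggCol;
        this.rowtime = rowtime;
    }
    // A row falls out of the window once it is more than windowSize
    // milliseconds older than the current row's rowtime.
    boolean needsPurge(long currentRowtime, int windowSize) {
        return rowtime <= currentRowtime - windowSize;
    }
}

class WindowPartition {
    private final Deque<Row> rows = new ArrayDeque<Row>();
    void addRow(Row r) { rows.addLast(r); }
    Row oldestRow() { return rows.peekFirst(); }
    Row purgeOldestRow() { return rows.removeFirst(); }
    boolean isEmpty() { return rows.isEmpty(); }
}

class AggBucket {
    static final int ADD_ROW = 0;
    static final int DROP_ROW = 1;
    private double sum;
    void updateBucket(int op, Row r) {
        sum += (op == ADD_ROW) ? r.aggCol : -r.aggCol;
    }
    double getResult() { return sum; }
}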

Aggregating Values on Sliding Windows with UDXes

You can use a Java User Defined Transform (UDX) to create custom windowed aggregation functions that aggregate values on a “sliding” window of rows over time. To do so, follow the steps below to create and implement a UDX.

SQL Standard Windowed Aggregation Syntax

SELECT STREAM "CustomAgg"("aggCol") OVER (PARTITION BY "partCol" RANGE INTERVAL '1' MINUTE PRECEDING), *
FROM "aStream";

In the SQL example above, “CustomAgg” is a custom aggregation function that you implement in Java. The following paragraphs describe how to use a Java UDX to implement such custom aggregation.

First, you declare (create) the UDX that can be used to perform Windowed Aggregation. See the topic CREATE FUNCTION in the SQLstream Streaming SQL Reference Guide for more details.

CREATE OR REPLACE FUNCTION "WindowedAggUDX"(
   "inputRows" CURSOR
   "aggColName" VARCHAR(64), "partitionColName" VARCHAR(64),
   "rowtimeColName" VARCHAR(64), "windowSize" INTEGER)
   RETURNS TABLE (
       "inputRows".*, "aggResult" DOUBLE
   )
   LANGUAGE JAVA
   PARAMETER STYLE SYSTEM DEFINED JAVA
   NO SQL
   EXTERNAL NAME '"SampleJar":com.sqlstream.sample.functions.Sample.simpleUDX';

You need to create the JAR ("SampleJar") containing the UDX implementation before executing this statement. See the topic CREATE JAR in the SQLstream Streaming SQL Reference Guide for more details. The sections below provide sample Java code that you can use to implement this JAR.

EXTERNAL NAME defines where s-Server should look for the Java class implementing the function. Java-defined external functions must be located either in a class file on SQLstream s-Server’s Java classpath, or in an external Java archive (JAR) file loaded into the system using CREATE JAR. In the latter case, the qualified jar name is the name given to the jar as part of CREATE JAR. If the jar was not defined in the current schema, then the fully qualified <schema_name>.<jar_name> format must be used.

At this point, you can implement the semantics expressed in the windowed aggregation syntax above by invoking the sample UDX as described below.

SELECT STREAM * FROM STREAM("WindowedAggUDX"(
   CURSOR(SELECT STREAM * FROM "aStream"),
   'aggCol', 'partCol', 'ROWTIME', 60000));

The Java code below shows the basic structure of how windowed aggregation semantics can be implemented, using a function context to maintain “windows” of rows and aggregated results that are partitioned by the partition column.

import com.sqlstream.plugin.impl.AbstractBaseUdx;
import java.sql.*;
import com.sqlstream.jdbc.*;

public class Sample extends AbstractBaseUdx {
   FunctionContext context;

private Sample(
   java.sql.ResultSet inputRows,
   java.sql.PreparedStatement resultInserter)
   throws SQLException
{
   super(newTracer(), inputRows, resultInserter);
   context = newFunctionContext();
}

public static void simpleUDX(
   java.sql.ResultSet inputRows,
   String aggColName, String partColName, String rowtimeColName,
   int windowSize, java.sql.PreparedStatement resultInserter)
   throws SQLException
{
   Sample instance = new Sample(inputRows, resultInserter);
   
   // The following determines column indexes in inputRows for the
   // columns needed to compute the aggregate.
   instance.context.updateContext(
       aggColName, partColName, rowtimeColName, windowSize);

   instance.run();
}

public void run() throws SQLException {
   while (inputRows.next()) {
       ...
       // API to fast-copy pass-through columns from input
       // to output. Defined in AbstractBaseUdx.
       transferColumns(inputRows, resultInserter, passList);
       ...

        WindowPartition p = context.getPartition(partCol);
        if (p == null) {
            context.addPartition(newPartition(partCol));
            context.addAggBucket(newAggBucket(partCol));
        }
        AggBucket agg = context.getAggBucket(partCol);
       Row newRow = new Row(aggCol, rowtimeCol);
       p.addRow(newRow);
       while (p.oldestRow().needsPurge(rowtimeCol, windowSize)) {
           Row oldestRow = p.purgeOldestRow();
           agg.updateBucket(DROP_ROW, oldestRow);
            // if the partition now has 0 rows, delete the partition
            // as well as the corresponding aggBucket from the context.
       }
       agg.updateBucket(ADD_ROW, newRow);
       ...
        // set the parameter for the aggResult column defined in the
        // RETURNS clause of the UDX.
       resultInserter.setDouble(aggResultIdx, agg.getResult());

        // insert the result row into the output table of the UDX.
       resultInserter.executeUpdate();
   }
}
}