Extensions to the SQL standard

SQLstream lets analysts use Standard SQL to ingest and transform fast-moving data. Via Standard SQL, analysts can visualize that data, discover its time-critical insights, and continuously summarize results. A small number of extensions to the SQL Standard make this possible:

  • Streams - A stream is an infinite sequence of rows, ordered by a leading timestamp column called ROWTIME. From one row to the next, the ROWTIME never decreases. Streams behave just like tables in queries. Streams come in two flavors: native (internal queues) and foreign (attachment points to external data sources and sinks).
  • Infinite queries - It’s easy to spot the infinite queries, which produce infinite results: they always start with the phrase “SELECT STREAM”. When an infinite query joins two streams, window specifications (OVER clauses) in the FROM list declare how the moving frames of rows correspond and zipper together.
  • Pumps - A schema object called a pump lets users start, pause, resume, and stop infinite DML operations (INSERT/UPDATE/DELETE/MERGE).

Armed with these three simple extensions, analysts can run rich, expressive Standard SQL on infinite data.

This topic lists and explains the extensions to the SQL standard that are required for SQLstream s-Server.

These extensions fall into the following categories:

  • Streams
  • DML differences
  • DDL extensions
  • Extensions to JDBC API

Streams

In practice, streams function like RDBMS tables, but they are not persistent stores of data. Instead, a stream represents a flow of data. Once a stream is defined, data flows through it indefinitely, provided its data source keeps producing data. A data source might be a continually updated log file, a network feed, a Kafka topic, an Amazon Kinesis stream, or an external database table.

Multiple writers can insert into the stream, and multiple readers can select from the stream. In this sense, the stream implements a publish/subscribe messaging paradigm, in which the data row, including time of creation and time of receipt, can be processed, interpreted, and forwarded by a cascade of streaming SQL statements, without having to be stored in a traditional RDBMS.

As with a traditional RDBMS, you can:

  • Define (streaming) SQL queries against streams.
  • Join streams to other streams and tables.
  • Aggregate queries over a time range or a series of rows.
  • Register specific SQL statements as (stream) views.

Streams do not store data persistently. However, streaming queries that aggregate or correlate data over windows (either time ranges or row bounds) retain the sliding window of rows needed to compute their results. For example, if a query generates a one-hour weighted average, SQLstream s-Server retains one hour’s worth of rows.
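
As a rough illustration of why retention stays bounded, the following Java sketch models a row-bounded window in the style of ROWS n PRECEDING. It is a simplified model, not s-Server's implementation, and the class and method names are hypothetical. A time-range window works the same way, except that rows are released by ROWTIME rather than by count.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical sketch of a row-bounded window such as ROWS n PRECEDING:
 * the frame holds the current row plus at most n preceding rows, so the
 * memory needed stays bounded however long the stream runs.
 */
final class RowBoundedWindow<T> {
    private final int preceding;                 // n in "ROWS n PRECEDING"
    private final Deque<T> rows = new ArrayDeque<>();

    RowBoundedWindow(int preceding) {
        if (preceding < 0) {
            throw new IllegalArgumentException("preceding must be >= 0");
        }
        this.preceding = preceding;
    }

    /** Adds a newly arrived row and returns a snapshot of the current frame, oldest first. */
    List<T> onRow(T row) {
        rows.addLast(row);
        // Release the oldest rows once the frame exceeds the current row + n preceding rows.
        while (rows.size() > preceding + 1) {
            rows.removeFirst();
        }
        return new ArrayList<>(rows);
    }

    /** Number of rows currently retained in memory. */
    int retainedRows() {
        return rows.size();
    }
}
```

With n = 2, feeding rows 1 through 5 leaves only rows 3, 4 and 5 in memory.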

The STREAM keyword

To run a query on a stream, or any query that includes a stream, you must place the STREAM keyword immediately after SELECT. For example:

SELECT STREAM ticker, price, qty
FROM TradeStream;

Windowed Queries

Because streams are time-based, you will generally want to query them with what we call a windowed query.

A windowed query requests a set of results over a given amount of time or rows, as in the following example:

SELECT STREAM
ticker,
SUM(price * volume) OVER (RANGE INTERVAL '1' HOUR PRECEDING)
/ SUM(volume) OVER (RANGE INTERVAL '1' HOUR PRECEDING) AS weighted_avg
FROM TradeStream;
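
The value this query produces for each row can be sketched in Java as SUM(price * volume) / SUM(volume) over the rows whose ROWTIME falls within the preceding hour, lower edge included. This is a batch recomputation for illustration only; the Trade class and method names are hypothetical, and rowtimes are expressed in minutes since midnight to keep the numbers readable.

```java
import java.util.List;

/** Hypothetical sketch of the per-row value of the one-hour weighted average. */
final class WeightedAverage {
    /** One trade; rowtime is in minutes since midnight (9:00 = 540). */
    static final class Trade {
        final long rowtimeMinutes;
        final double price;
        final long volume;

        Trade(long rowtimeMinutes, double price, long volume) {
            this.rowtimeMinutes = rowtimeMinutes;
            this.price = price;
            this.volume = volume;
        }
    }

    /**
     * Weighted average for the row at nowMinutes, using a frame like
     * RANGE INTERVAL 'windowMinutes' MINUTE PRECEDING: rowtimes in [now - window, now].
     */
    static double over(List<Trade> trades, long nowMinutes, long windowMinutes) {
        double priceTimesVolume = 0;
        long totalVolume = 0;
        for (Trade t : trades) {
            boolean inFrame = t.rowtimeMinutes >= nowMinutes - windowMinutes
                    && t.rowtimeMinutes <= nowMinutes;
            if (inFrame) {
                priceTimesVolume += t.price * t.volume;
                totalVolume += t.volume;
            }
        }
        // An empty frame has no defined average.
        return totalVolume == 0 ? Double.NaN : priceTimesVolume / totalVolume;
    }
}
```

Using the sample rows from the Rowtime Bounds table below, the frame at 10:00 contains all four rows, giving 437,500 / 4,000 = 109.375.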

In a windowed join, the OVER clause may be applied directly to the stream:

SELECT STREAM
t.price as trade_price,
s.price as ask_price
FROM TradeStream OVER (RANGE CURRENT ROW) AS t
JOIN AskStream OVER (ROWS CURRENT ROW) as s;
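
One simplified, one-directional reading of this join can be sketched in Java: each trade (the current row of TradeStream) is paired with the most recent ask (the single row in AskStream's ROWS CURRENT ROW window) that arrived at or before it. This is an assumption-laden model for illustration only; the real join also reacts to rows arriving on AskStream, and the Row class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, one-directional sketch of the trade/ask windowed join.
 * Both inputs are assumed to be ordered by rowtime already.
 */
final class TradeAskJoin {
    static final class Row {
        final long rowtime;
        final double price;

        Row(long rowtime, double price) {
            this.rowtime = rowtime;
            this.price = price;
        }
    }

    /** Returns {tradePrice, askPrice} pairs; trades that precede every ask are dropped. */
    static List<double[]> join(List<Row> trades, List<Row> asks) {
        List<double[]> out = new ArrayList<>();
        int next = 0;
        Row latestAsk = null;           // the one row in the ROWS CURRENT ROW window
        for (Row trade : trades) {
            // Advance through asks that arrived at or before this trade.
            while (next < asks.size() && asks.get(next).rowtime <= trade.rowtime) {
                latestAsk = asks.get(next++);
            }
            if (latestAsk != null) {    // inner join: no ask yet means no output row
                out.add(new double[] {trade.price, latestAsk.price});
            }
        }
        return out;
    }
}
```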

A more complex example shows a GROUP BY query that combines a tumbling window (rows grouped by the hour of their ROWTIME) with a join window covering the following hour. This example also shows all the necessary uses of quote marks for lowercase identifiers:

SELECT STREAM "o"."orderid", sum("t"."amount")
FROM "OrderStream" OVER (RANGE CURRENT ROW) AS "o"
LEFT JOIN "TradeStream"
OVER (RANGE INTERVAL '1' HOUR FOLLOWING) AS "t"
ON "o"."orderid" = "t"."tradeid"
GROUP BY FLOOR("o".ROWTIME TO HOUR), "o"."orderid", "o"."amount"
HAVING "o"."amount" <> SUM("t"."amount");

The query above focuses on trades that execute within the hour following the placement of an order. Each trade belongs to a single order, and an order may be filled by multiple trades. For each order that has one or more trades executed in that following hour, the query compares the total amount of those trades against the original order amount. When the two differ, the order id and the sum of its executed trades become part of the output stream. Each output row therefore represents an order that was not completely filled during the hour following its placement, and gives the id of that order and the total amount from its trades that did execute within that hour.
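
The same logic can be sketched as a batch computation in Java to make the matching rules explicit. The sketch assumes each trade carries the key that the query compares with "o"."orderid", treats "the following hour" as the order's rowtime up to one hour later inclusive, and leaves out orders with no matching trades, as the HAVING comparison does when SUM returns NULL. All class and field names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical batch sketch of the under-filled order report. */
final class UnderfilledOrders {
    static final long FOLLOWING_MINUTES = 60;   // RANGE INTERVAL '1' HOUR FOLLOWING

    static final class Order {
        final String orderId;
        final long rowtimeMinutes;
        final long amount;

        Order(String orderId, long rowtimeMinutes, long amount) {
            this.orderId = orderId;
            this.rowtimeMinutes = rowtimeMinutes;
            this.amount = amount;
        }
    }

    static final class Trade {
        final String orderId;        // assumed: the key compared with "o"."orderid"
        final long rowtimeMinutes;
        final long amount;

        Trade(String orderId, long rowtimeMinutes, long amount) {
            this.orderId = orderId;
            this.rowtimeMinutes = rowtimeMinutes;
            this.amount = amount;
        }
    }

    /** Returns order id -> executed amount for orders not completely filled within the hour. */
    static Map<String, Long> find(List<Order> orders, List<Trade> trades) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Order o : orders) {
            long executed = 0;
            boolean anyTrade = false;
            for (Trade t : trades) {
                boolean inFollowingHour = t.rowtimeMinutes >= o.rowtimeMinutes
                        && t.rowtimeMinutes <= o.rowtimeMinutes + FOLLOWING_MINUTES;
                if (t.orderId.equals(o.orderId) && inFollowingHour) {
                    executed += t.amount;
                    anyTrade = true;
                }
            }
            // No matching trades: SUM would be NULL and the HAVING test would not pass.
            if (anyTrade && executed != o.amount) {
                result.put(o.orderId, executed);
            }
        }
        return result;
    }
}
```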

WINDOWs and the OVER clause

The OVER clause specifies a time period or a number of rows to be included in the window for each row, as in the weighted-average query:

SELECT STREAM
ticker,
SUM(price * volume) OVER (RANGE INTERVAL '1' HOUR PRECEDING)
/ SUM(volume) OVER (RANGE INTERVAL '1' HOUR PRECEDING) AS weighted_avg
FROM TradeStream;

The WINDOW clause provides a way of defining a window specification once and using it many times, in relation to several streams in the query. It can also be used simply for clarity, as in this example, which emits a row for a ticker whenever its 10-minute moving average crosses its 50-minute moving average:

SELECT STREAM
  ticker, price, a10, a50
FROM (
  SELECT STREAM
    ticker,
    price,
    avg(price) OVER last10Minutes AS a10,
    avg(price) OVER last50Minutes AS a50
  FROM Trades
  WINDOW
    last10Minutes AS (
      PARTITION BY ticker
      RANGE INTERVAL '10' MINUTE PRECEDING),
    last50Minutes AS (
      PARTITION BY ticker
      RANGE INTERVAL '50' MINUTE PRECEDING)
) AS averages
WHERE first_value(
    CASE
      WHEN a10 < a50 THEN -1
      WHEN a10 = a50 THEN 0
      ELSE 1
    END) OVER previousRow
  <> CASE
      WHEN a10 < a50 THEN -1
      WHEN a10 = a50 THEN 0
      ELSE 1
    END
WINDOW
  previousRow AS (
    PARTITION BY ticker
    ROWS 1 PRECEDING);
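
The crossover logic can be sketched procedurally in Java: per-ticker 10-minute and 50-minute moving averages, the same -1/0/1 encoding as the CASE expression, and a check against the previous row for that ticker. The sketch assumes trades arrive in ROWTIME order and uses minutes as the time unit; its class and method names are hypothetical. As with first_value over a ROWS 1 PRECEDING window, the first row for a ticker has nothing earlier to differ from, so it is never reported.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the moving-average crossover query, one trade at a time. */
final class CrossoverDetector {
    /** Moving average over a RANGE window of windowMinutes PRECEDING, for one ticker. */
    private static final class MovingAverage {
        private final long windowMinutes;
        private final Deque<double[]> points = new ArrayDeque<>(); // {rowtime, price}

        MovingAverage(long windowMinutes) {
            this.windowMinutes = windowMinutes;
        }

        double add(long rowtime, double price) {
            points.addLast(new double[] {rowtime, price});
            // Drop rows older than the window; a row exactly windowMinutes old stays in.
            while (points.peekFirst()[0] < rowtime - windowMinutes) {
                points.removeFirst();
            }
            double sum = 0;
            for (double[] p : points) {
                sum += p[1];
            }
            return sum / points.size();
        }
    }

    private final Map<String, MovingAverage> avg10 = new HashMap<>();
    private final Map<String, MovingAverage> avg50 = new HashMap<>();
    private final Map<String, Integer> previousSign = new HashMap<>();

    /** Feeds one trade (rowtime in minutes) and returns true if the query would emit it. */
    boolean onTrade(String ticker, long rowtime, double price) {
        double a10 = avg10.computeIfAbsent(ticker, k -> new MovingAverage(10)).add(rowtime, price);
        double a50 = avg50.computeIfAbsent(ticker, k -> new MovingAverage(50)).add(rowtime, price);
        // Same encoding as the CASE expression: -1 below, 0 equal, 1 above.
        int sign = a10 < a50 ? -1 : (a10 == a50 ? 0 : 1);
        Integer previous = previousSign.put(ticker, sign);
        // The first row for a ticker has no earlier row to differ from.
        return previous != null && previous != sign;
    }
}
```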

Handling CURRENT ROW

In an ordered window, two rows are said to be peers if they have the same sort key under the ordering. The SQL Standard states that the frame should include all peers of the row under consideration.

By default, SQLstream window processing behaves slightly differently:

In an ordered window, the CURRENT ROW upper frame bound includes only rows up until the row currently being considered. The frame contains peers of that row which arrive earlier than the row itself. However, the frame does not contain peers that arrive later.

If you want the behavior defined by the SQL standard (that is, all peers included in the frame), declare your window as follows:

range between interval '$m' $timeUnit preceding and interval '0' $timeUnit preceding

where:

  • $m is a number
  • $timeUnit is one of the interval time units: year, month, day, hour, minute, second

For example:

range between interval '1' second preceding and interval '0' second preceding
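
The difference between the two behaviors can be sketched in Java for a RANGE frame of m time units ending at the current row. Rows are identified by their arrival position and rowtimes are assumed non-decreasing; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch contrasting the default and standard CURRENT ROW frames. */
final class PeerFrames {
    /** SQLstream default: peers that arrive after row i are not in its frame. */
    static List<Integer> defaultFrame(long[] rowtimes, int i, long m) {
        List<Integer> frame = new ArrayList<>();
        for (int j = 0; j <= i; j++) {                 // only rows up to row i itself
            if (rowtimes[j] >= rowtimes[i] - m) {
                frame.add(j);
            }
        }
        return frame;
    }

    /** SQL standard, or an upper bound of INTERVAL '0' ... PRECEDING: all peers are in the frame. */
    static List<Integer> standardFrame(long[] rowtimes, int i, long m) {
        List<Integer> frame = new ArrayList<>();
        for (int j = 0; j < rowtimes.length; j++) {    // later-arriving peers included
            if (rowtimes[j] >= rowtimes[i] - m && rowtimes[j] <= rowtimes[i]) {
                frame.add(j);
            }
        }
        return frame;
    }
}
```

For rowtimes 1, 2, 2, 2, 3 and m = 1, the frame of the second row with rowtime 2 (position 2) is positions 0 to 2 by default, but positions 0 to 3 under the standard interpretation, because the later peer at position 3 is included.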

The ROWTIME Column and Alias

Rows within a stream are ordered by a system column called ROWTIME, whose value never decreases from one row to the next.

Every row implicitly includes a ROWTIME column of type TIMESTAMP. This column is analogous to the ROWID pseudocolumn in an Oracle database.

On insert, if no value is given for ROWTIME, it is based on the system clock time at the SQLstream s-Server as each row arrives.

Every streaming query is itself a stream and has an associated ROWTIME. By default, that ROWTIME is based on the system clock time at which the row is emitted, but it can be overridden by selecting a timestamp value AS ROWTIME.
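
A minimal Java sketch of the insert-time rule described above: use the ROWTIME supplied with the row if there is one, otherwise stamp the row with the server clock. Rejecting a row whose ROWTIME would go backwards is an assumption made here to enforce the never-decreasing ordering; this topic does not say how s-Server handles such rows. The class name and the injectable clock are hypothetical.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of ROWTIME assignment on insert. Rejecting a decreasing
 * ROWTIME is an assumption of this sketch, not documented s-Server behavior.
 */
final class RowtimeStamper {
    private final LongSupplier serverClockMillis;   // injectable so the sketch is testable
    private long lastRowtime = Long.MIN_VALUE;

    RowtimeStamper(LongSupplier serverClockMillis) {
        this.serverClockMillis = serverClockMillis;
    }

    /** Returns the ROWTIME assigned to a new row; suppliedRowtime is null when none was given. */
    long insert(Long suppliedRowtime) {
        long rowtime = suppliedRowtime != null
                ? suppliedRowtime
                : serverClockMillis.getAsLong();    // no value given: use the server clock
        if (rowtime < lastRowtime) {
            throw new IllegalArgumentException(
                    "ROWTIME " + rowtime + " is earlier than previous ROWTIME " + lastRowtime);
        }
        lastRowtime = rowtime;
        return rowtime;
    }
}
```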

Note: The transactional statements COMMIT, ROLLBACK and SAVEPOINT are not supported by SQLstream s-Server.

Rowtime Bounds

Rowtime Bounds are events that flow through a stream in addition to the data in the stream. They help drive the output from streaming queries.

For example, in the one-hour weighted average above, each incoming row is combined with all rows from the preceding hour to calculate the average. As time goes by, rows more than an hour old are no longer needed and can be released from the cache.

If the rowtime is being supplied by the source system, we cannot use the system clock to decide when the hour has passed. We can use the rowtime associated with each incoming row:

Row  Rowtime  Price  Volume  Price*Volume  Weighted average  Notes
1    9:00     100    1000    100,000       100               since start of stream at 9:00
2    9:10     115    2000    230,000       110
3    9:59     105    500     52,500        109.3
4    10:00    110    500     55,000        109.4             a full hour’s data now available

Now, at 10:05 we could emit a new average based on rows 2, 3 and 4, because row 1 is more than an hour old; however, no new row has arrived, so there is no event to trigger the emission of a new result.

By sending a rowtime bound, the source application can notify the stream that the stream’s ROWTIME clock can be moved forward to a new time. This is done using the JDBC extension method pstmt.setRowtimeBound(), provided as part of the StreamingPreparedStatement interface.
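
The effect of a rowtime bound can be sketched in Java with the sample data above: rows drive emission as they arrive, and a bound advances the stream's clock without a row, so expired rows leave the window and a fresh average can be emitted. This is a simplified model, not s-Server's implementation; times are minutes since midnight, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch: a one-hour weighted average that emits a result when a
 * row arrives or when a rowtime bound moves the stream's clock forward.
 * Times are minutes since midnight; the frame includes its lower edge.
 */
final class BoundDrivenAverage {
    private static final long WINDOW_MINUTES = 60;

    private static final class Row {
        final long rowtime;
        final double price;
        final long volume;

        Row(long rowtime, double price, long volume) {
            this.rowtime = rowtime;
            this.price = price;
            this.volume = volume;
        }
    }

    private final Deque<Row> window = new ArrayDeque<>();
    private long clock = Long.MIN_VALUE;   // the stream's current ROWTIME

    /** A data row arrives: advance the clock to its rowtime and emit the new average. */
    double onRow(long rowtime, double price, long volume) {
        advanceTo(rowtime);
        window.addLast(new Row(rowtime, price, volume));
        return average();
    }

    /** A rowtime bound arrives: no data, but the clock advances and expired rows leave. */
    double onRowtimeBound(long bound) {
        advanceTo(bound);
        return average();
    }

    private void advanceTo(long time) {
        if (time < clock) {
            throw new IllegalArgumentException("ROWTIME cannot decrease");
        }
        clock = time;
        while (!window.isEmpty() && window.peekFirst().rowtime < clock - WINDOW_MINUTES) {
            window.removeFirst();
        }
    }

    private double average() {
        double priceTimesVolume = 0;
        long volume = 0;
        for (Row r : window) {
            priceTimesVolume += r.price * r.volume;
            volume += r.volume;
        }
        return volume == 0 ? Double.NaN : priceTimesVolume / volume;
    }
}
```

Replaying rows 1 to 4 yields 109.375 at 10:00; a bound at 10:05 alone then evicts the 9:00 row and yields 337,500 / 3,000 = 112.5.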

DML differences

Update and Delete do not apply to Streams

You cannot directly update or delete rows in a stream the way you can update or delete rows in a table.

Once data has been inserted into a stream, it remains there (only) until it has been consumed by all dependent queries - that is, for the period necessary to satisfy the query with the longest timeliness constraint. The row is then discarded.
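
A simplified Java model of this retention rule: each dependent query is a reader with its own position, and a row is discarded once every reader has moved past it. It is not how s-Server is implemented, and the class and method names are hypothetical; in this sketch, a reader registered later sees only rows inserted after it joins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model: a row stays in the stream only until every reader
 * (dependent query) has consumed it. Positions are absolute row numbers.
 */
final class SharedStreamBuffer<T> {
    private final List<T> rows = new ArrayList<>();
    private long firstPosition = 0;                       // absolute position of rows.get(0)
    private final Map<String, Long> nextPosition = new HashMap<>();

    /** Registers a reader; in this sketch it sees only rows inserted from now on. */
    void addReader(String reader) {
        nextPosition.put(reader, firstPosition + rows.size());
    }

    void insert(T row) {
        rows.add(row);
        discardConsumed();
    }

    /** Returns the reader's next unread row, or null if it has caught up. */
    T read(String reader) {
        long position = nextPosition.get(reader);
        if (position >= firstPosition + rows.size()) {
            return null;
        }
        T row = rows.get((int) (position - firstPosition));
        nextPosition.put(reader, position + 1);
        discardConsumed();
        return row;
    }

    /** Number of rows still held in memory. */
    int retained() {
        return rows.size();
    }

    private void discardConsumed() {
        long end = firstPosition + rows.size();
        long slowest = nextPosition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(end);                              // no readers: nothing to keep
        while (firstPosition < slowest) {
            rows.remove(0);
            firstPosition++;
        }
    }
}
```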

INSERT EXPEDITED

Applications can insert into a stream using the SQLstream JDBC driver in exactly the same way as inserting into a relational table.

The most efficient way of inserting is to use the EXPEDITED option for the INSERT statement:

java.sql.PreparedStatement pstmt = connection.prepareStatement(
    "INSERT EXPEDITED INTO Logger(timestamp, level, message) "
    + "VALUES (?, ?, ?)");

A regular insert uses RMI for each of query preparation, bind-variable setting and statement execution. INSERT EXPEDITED uses RMI to prepare the statement, and then switches to the faster SDP protocol to transmit the bind variable values.
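
A minimal Java sketch of the usual pattern: prepare the INSERT EXPEDITED statement once, then set the bind variables and execute it for each row, so that preparation over RMI happens only once. It uses only standard java.sql calls; the Logger stream and its columns come from the example above, while the LogEntry holder and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;

/** Hypothetical helper that writes log entries through one INSERT EXPEDITED statement. */
final class ExpeditedLogger {
    static final String INSERT_SQL =
            "INSERT EXPEDITED INTO Logger(timestamp, level, message) VALUES (?, ?, ?)";

    /** Hypothetical holder for one row of the Logger stream. */
    static final class LogEntry {
        final Timestamp timestamp;
        final String level;
        final String message;

        LogEntry(Timestamp timestamp, String level, String message) {
            this.timestamp = timestamp;
            this.level = level;
            this.message = message;
        }
    }

    static void insertAll(Connection connection, List<LogEntry> entries) throws SQLException {
        // Prepare once; each row then only sets bind variables and executes.
        try (PreparedStatement pstmt = connection.prepareStatement(INSERT_SQL)) {
            for (LogEntry entry : entries) {
                pstmt.setTimestamp(1, entry.timestamp);
                pstmt.setString(2, entry.level);
                pstmt.setString(3, entry.message);
                pstmt.executeUpdate();
            }
        }
    }
}
```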

DDL extensions

SQLstream defines some object types to support event stream processing. These are created and managed using the standard CREATE, ALTER and DROP commands.

STREAM

CREATE STREAM streamname (column1,… columnn);

FOREIGN STREAM

CREATE FOREIGN STREAM streamname (column1,… columnn);

DROP FOREIGN STREAM streamname;

PUMP

A Pump is a first-class object which provides INSERT INTO stream2 SELECT… FROM stream1 functionality. It is used to switch from “pulling” data from a source to “pushing” data to a passive target application.

If a target actively SELECTs from a stream, a pump may be unnecessary; however, many targets (for example, those fed via foreign streams) passively wait for data to be pushed to them.
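
The push model can be illustrated with a toy Java sketch that pulls rows from a source and pushes them to a passive target, with the start, pause, resume and stop controls mentioned at the top of this topic. It only models the idea; real pumps are server-side objects managed with DDL, and ToyPump and its methods are hypothetical.

```java
import java.util.Iterator;
import java.util.function.Consumer;

/**
 * Toy, hypothetical model of a pump: pulls rows from a source that must be
 * read actively and pushes them to a passive target while running.
 */
final class ToyPump<T> {
    enum State { STOPPED, RUNNING, PAUSED }

    private final Iterator<T> source;
    private final Consumer<T> target;
    private State state = State.STOPPED;

    ToyPump(Iterator<T> source, Consumer<T> target) {
        this.source = source;
        this.target = target;
    }

    void start()  { state = State.RUNNING; }
    void pause()  { if (state == State.RUNNING) state = State.PAUSED; }
    void resume() { if (state == State.PAUSED) state = State.RUNNING; }
    void stop()   { state = State.STOPPED; }
    State state() { return state; }

    /** Moves up to maxRows rows from source to target while running; returns rows moved. */
    int pumpOnce(int maxRows) {
        int moved = 0;
        while (state == State.RUNNING && moved < maxRows && source.hasNext()) {
            target.accept(source.next());   // push: the target never asks for data
            moved++;
        }
        return moved;
    }
}
```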

DDL statements are:

Extensions to JDBC API

The standard JDBC API is described at:

  • Sun’s JDBC Guide.
  • Sun’s java.sql package and javax.sql package javadocs.
  • Sun’s JDBC spec.

The SQLstream driver extends the standard JDBC API to allow rowtime bounds to be sent and received, millisecond-granularity timeouts, and access to server-side statement IDs for correlating with error stream entries.

package com.sqlstream.jdbc;

class Driver

class Driver implements java.sql.Driver { Driver(); }

interface StreamingStatement

interface StreamingStatement extends java.sql.Statement {
long getStatementId();
long getQueryTimeoutMillis();
void setQueryTimeoutMillis(long millis);
long getRowtimeBound();
}

Every Statement and PreparedStatement created by the SQLstream JDBC driver implements the StreamingStatement interface.

interface StreamingPreparedStatement

interface StreamingPreparedStatement extends java.sql.PreparedStatement, StreamingStatement {
void setRowtimeBound(Timestamp bound);
}

Every PreparedStatement created by the SQLstream JDBC driver implements the StreamingPreparedStatement interface. However, the setRowtimeBound method is meaningful only for INSERT EXPEDITED prepared statements; a SQLException is thrown if setRowtimeBound is called on any other prepared statement.

Example usages:

// Tell the stream its ROWTIME clock can advance to 5 seconds from now
long nextRowDelay = 5000L;
pstmt.setRowtimeBound(new Timestamp(System.currentTimeMillis() + nextRowDelay));

or

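long nextRowDelay = 5000L;
java.sql.Timestamp ts = lastRowtime; // <some retrieved value>; lastRowtime is a hypothetical variable
// Advance the ROWTIME clock to 5 seconds after the retrieved timestamp
pstmt.setRowtimeBound(new Timestamp(ts.getTime() + nextRowDelay));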

class TimeoutException

class TimeoutException extends SQLException {
TimeoutException();
}