Advanced Stream Resequencing with SQLstream

Promoting Event Time as ROWTIME

Guavus SQLstream provides the full power of SQL to process and analyze time-series event data. One of the powerful mechanisms it supports is ROWTIME promotion. A record (tuple) may carry any number of embedded or external timestamps, and any of these fields can be explicitly promoted to become the record's ROWTIME. This allows records to be reordered arbitrarily by time. Take care, however: certain WINDOW operations in SQL require monotonically increasing timestamps, and any travel backwards in time (or an unusually large jump forwards) may cause records to be ejected into the ERRORSTREAM, where they can be reprocessed with custom exception-handling analytic logic.
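The Python model below (not SQLstream code) sketches what promotion means: each record's embedded timestamp replaces its arrival ROWTIME, and any record that would move ROWTIME backwards, or forwards by more than an assumed `max_jump`, is set aside the way the server diverts rows to the ERRORSTREAM. The names `Record`, `promote`, `event_time` and `max_jump` are illustrative assumptions; which rows the server actually rejects depends on the window operations in use.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


@dataclass
class Record:
    rowtime: datetime  # default ROWTIME: when the server received the record
    fields: dict       # payload, which may embed other timestamps


def promote(records: List[Record], ts_field: str,
            max_jump: Optional[timedelta] = None) -> Tuple[List[Record], List[Record]]:
    """Promote fields[ts_field] to ROWTIME, diverting rows that break monotonicity."""
    accepted: List[Record] = []
    errors: List[Record] = []  # stands in for the ERRORSTREAM
    last: Optional[datetime] = None
    for r in records:
        new_time = r.fields[ts_field]
        backwards = last is not None and new_time < last
        too_far = (last is not None and max_jump is not None
                   and new_time - last > max_jump)
        if backwards or too_far:
            errors.append(r)
            continue
        # Equal timestamps are allowed: ROWTIME only has to be non-decreasing.
        accepted.append(Record(rowtime=new_time, fields=r.fields))
        last = new_time
    return accepted, errors
```

Rejected rows keep their original ROWTIME, so a separate exception pipeline can still process them.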

Reordering Events by a Timestamp Within a Time Window

Guavus SQLstream provides an efficient declarative stream operator (TSORT) that continually reorders records within a given time window, so that analytics can be performed even when the data arrive out of time order.

Example:

SELECT STREAM event_time AS ROWTIME, *
FROM (SELECT STREAM *
      , MAX(event_time) OVER forEver AS max_event_time
      FROM event_stream
      WINDOW forEver AS (RANGE UNBOUNDED PRECEDING)
     )
WHERE (max_event_time - event_time) <= INTERVAL '1' MINUTE
ORDER BY event_time WITHIN INTERVAL '1' MINUTE;

In the example above, a t-sort sorts the partially ordered event_stream by the event_time column, producing a stream that is completely sorted by event_time. The subselect tracks the maximum event_time seen so far, and the outer WHERE clause accepts only “t-sortable” events from event_stream, that is, events that lag that maximum by no more than the WITHIN interval.
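The Python model below sketches how a t-sort with this filter can work; it illustrates the semantics and is not SQLstream's implementation. It keeps a min-heap of accepted event times, tracks the running maximum (the counterpart of max_event_time, which includes the current row), rejects rows that lag it by more than `within`, and releases any buffered row whose event_time is at or below that maximum minus `within`, since no row accepted later can sort ahead of it. Event times are bare `datetime` values and `t_sort` is a hypothetical name.

```python
import heapq
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple


def t_sort(events: Iterable[datetime],
           within: timedelta) -> Tuple[List[datetime], List[datetime]]:
    """Return (sorted output, late rows) for a stream of event times."""
    heap: List[datetime] = []
    out: List[datetime] = []
    late: List[datetime] = []
    max_seen: Optional[datetime] = None
    for e in events:
        max_seen = e if max_seen is None else max(max_seen, e)
        if max_seen - e > within:  # not t-sortable: route to the late pipeline
            late.append(e)
            continue
        heapq.heappush(heap, e)
        # Any row accepted later has event_time >= max_seen - within,
        # so everything at or below that watermark can be emitted now.
        while heap and heap[0] <= max_seen - within:
            out.append(heapq.heappop(heap))
    while heap:  # end of input: flush the remaining buffer in order
        out.append(heapq.heappop(heap))
    return out, late
```

The buffer never holds more than the rows in one WITHIN interval, which is what keeps this kind of sort cheap on an unbounded stream.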

For more information on t-sorting stream input, see the subtopic T-sorting Stream Input in the topic ORDER BY clause.

A separate pipeline can process “late” rows that can be identified with a simple query as follows:

SELECT STREAM *  -- keep the arrival ROWTIME: late event_time values are out of order
FROM (SELECT STREAM *
      , MAX(event_time) OVER forEver AS max_event_time
      FROM event_stream
      WINDOW forEver AS (RANGE UNBOUNDED PRECEDING)
     )
WHERE (max_event_time - event_time) > INTERVAL '1' MINUTE;

Because the WITHIN interval in the t-sort is inclusive, the late-row query looks for rows whose lag is strictly greater than the WITHIN interval (hence > rather than >=).
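The inclusive boundary can be checked with a small Python model (the name `split_by_lag` is hypothetical): a row whose lag equals the WITHIN interval still goes to the t-sort, and only a strictly larger lag sends it to the late pipeline, so the two predicates partition the stream with no row lost or duplicated.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


def split_by_lag(events: List[datetime],
                 within: timedelta) -> Tuple[List[datetime], List[datetime]]:
    """Partition events into (t-sortable, late) using the running maximum."""
    on_time: List[datetime] = []
    late: List[datetime] = []
    max_seen: Optional[datetime] = None
    for e in events:
        max_seen = e if max_seen is None else max(max_seen, e)
        lag = max_seen - e
        # "<= within" and "> within" are complementary: exactly one list gets e.
        (late if lag > within else on_time).append(e)
    return on_time, late
```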

Guavus SQLstream also provides a very efficient ORDER BY clause that can be used within windows to time-order the results emitted by any analytic pipeline over a specified window. Windows can be based on time or on a number of rows.

Reordering Events by Arbitrary Keys

Guavus SQLstream provides the full power of SQL, allowing the concepts described above for time to be extended to any key (any combination of field values). If you wish to use time ordering, you need to convert the key to a timestamp-like value and promote it to be the new ROWTIME. Otherwise, you can use ORDER BY, and the results and processing performed within the window will be continually sorted by the key values encountered.

Example:


SELECT STREAM *
FROM event_stream AS e
ORDER BY FLOOR(e.ROWTIME TO MINUTE), state, city;

SELECT STREAM *
FROM event_stream AS e
ORDER BY FLOOR(e.ROWTIME TO MINUTE), city_sales, state, city;
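A rough Python model of what the first example computes, assuming rows arrive in ROWTIME order: because the leading sort key is FLOOR(ROWTIME TO MINUTE), each minute's rows form one contiguous bucket that is sorted by the remaining keys and emitted in minute order. `order_within_minute` and the dictionary row layout are hypothetical; passing `["city_sales", "state", "city"]` as the key list models the second example.

```python
from datetime import datetime
from itertools import groupby
from typing import Dict, Iterable, Iterator, List


def floor_to_minute(ts: datetime) -> datetime:
    """Python counterpart of FLOOR(ts TO MINUTE)."""
    return ts.replace(second=0, microsecond=0)


def order_within_minute(rows: Iterable[Dict], keys: List[str]) -> Iterator[Dict]:
    """Yield rows ordered by (minute, *keys); input must be in ROWTIME order."""
    # groupby only merges consecutive rows, which is correct here because
    # ROWTIME order makes each minute's rows contiguous.
    for _, bucket in groupby(rows, key=lambda r: floor_to_minute(r["rowtime"])):
        yield from sorted(bucket, key=lambda r: tuple(r[k] for k in keys))
```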

Taking Action on Missing or Delayed Events

Guavus SQLstream also provides mechanisms for taking action on missing or delayed records or events, in addition to reconstituting or imputing the missing values and records as described below. The mechanism that allows for this is called ROWTIME BOUNDS (sometimes also known as time punctuation).

Within Guavus SQLstream, a stream can contain not only data records (each of which has a ROWTIME, defaulting to the time at which the server originally received the record) but also timestamps without data records. Such punctuation records (ROWTIME BOUNDS) can be viewed as a kind of heartbeat or clock pulse: they allow computation to proceed even when records are missing or delayed.

Adapters and UDXs can emit such records, often at regular time intervals. SQL OUTER JOINs can then be applied, and NULL values in the join result show that a punctuation record arrived without any corresponding data values. This allows arbitrary action to be taken in real time when a record is missing or delayed. A common use case is detecting transmission delays, SLA violations, and services, servers or devices that have failed and stopped transmitting, and then taking the appropriate action.
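A minimal Python model of the idea, with hypothetical names: data records carry a key, ROWTIME bounds carry only a timestamp (key `None`), and each bound triggers an outer join of the expected keys against the keys that arrived since the previous bound. Keys without a match, which would appear as NULL-extended rows in SQL, raise an alert at the bound's timestamp, even though no data record arrived to trigger the computation.

```python
from typing import Iterable, List, Optional, Set, Tuple

Item = Tuple[int, Optional[str]]  # (rowtime in seconds, key); key None = ROWTIME bound


def detect_missing(items: Iterable[Item], expected_keys: Set[str]) -> List[Tuple[int, str]]:
    """Report (bound_time, key) for each expected key with no data since the last bound."""
    seen: Set[str] = set()
    alerts: List[Tuple[int, str]] = []
    for ts, key in items:
        if key is not None:  # ordinary data record
            seen.add(key)
            continue
        # Punctuation: "outer join" the expected keys against what arrived;
        # a key with no match is the analogue of a NULL-extended join row.
        alerts.extend((ts, k) for k in sorted(expected_keys - seen))
        seen.clear()
    return alerts
```

Without the bounds at times 20 and 25 in the usage below, nothing would fire until the next data record arrived, which is exactly the case punctuation is meant to cover.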

Snapping Data to an Emerging Model or Surface (Kalman Filters)

Guavus SQLstream provides a UDX implementing Kalman filters. The UDX examines a stream of records with n-dimensional numeric fields, infers a surface or model (N-dimensional numeric data) from their values, and “snaps” the records to the model or surface it constructs. This is useful for cleaning up fuzzy or imprecise data, such as location or sensor data, by removing the “noise”. You can view it as a continuous real-time filter that amplifies and extracts the “signal” from the “noise”. It can also be used to handle missing events and to correct faulty ones.
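As a minimal sketch of the underlying idea (not the UDX's interface, and one-dimensional rather than N-dimensional), here is a scalar Kalman filter in Python, assuming a random-walk state model with process variance `q` and measurement variance `r`. The function name and parameter values are hypothetical. A missing reading (`None`) gets only the predict step, which is one way a Kalman filter can carry an estimate across missing events.

```python
from typing import List, Optional


def kalman_1d(measurements: List[Optional[float]], q: float, r: float,
              x0: float, p0: float) -> List[float]:
    """Return the filtered estimate after each measurement (None = missing)."""
    x, p = x0, p0  # state estimate and its variance
    estimates: List[float] = []
    for z in measurements:
        p = p + q  # predict: random-walk model, so only uncertainty grows
        if z is not None:  # update only when a measurement arrived
            k = p / (p + r)  # Kalman gain: how much to trust the new reading
            x = x + k * (z - x)  # "snap" toward the measurement by factor k
            p = (1 - k) * p
        estimates.append(x)
    return estimates
```

A small `q` relative to `r` tells the filter that the true value changes slowly, so individual noisy readings move the estimate only a little.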

See Using the Kalman Filter UDX in the Integrating with Other Systems guide for more details.

Identifying Missing Records from Streams

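SQLstream supports a self RIGHT OUTER JOIN on a stream. The following example shows how to use one to identify missing records in a stream: keys supplied by the history side that have no matching row in the current minute come back with NULL col1 and col2, and the WHERE clause keeps exactly those rows.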

SELECT STREAM lastKey AS partitionKey, col1, col2
FROM s OVER (ORDER BY FLOOR(s.ROWTIME TO MINUTE)
             RANGE INTERVAL '0' MINUTE PRECEDING) AS sCurrent
RIGHT OUTER JOIN -- against the distinct keys over the last minute
                 -- missing keys are assigned NULL values
    ( SELECT STREAM DISTINCT
          STEP(s.ROWTIME BY INTERVAL '1' MINUTE),
          s.partitionKey AS lastKey
      FROM s )
    OVER (RANGE INTERVAL '0' MINUTE PRECEDING) AS sHistory
ON (sCurrent.partitionKey = sHistory.lastKey)
WHERE col1 IS NULL AND col2 IS NULL;
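The Python model below sketches the join's effect under a stated assumption: the history side supplies the distinct keys seen during the previous minute, and the current side supplies this minute's rows. A key with no current row is NULL-extended by the RIGHT OUTER JOIN, and the WHERE clause keeps exactly those rows. `missing_keys` and the tuple row layout are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Row = Tuple[int, str, Optional[float], Optional[float]]  # (minute, partitionKey, col1, col2)


def missing_keys(rows: List[Row]) -> List[Tuple[int, str]]:
    """Return (minute, key) for keys seen in the previous minute but not this one."""
    by_minute: Dict[int, Dict[str, Tuple[Optional[float], Optional[float]]]] = defaultdict(dict)
    for minute, key, c1, c2 in rows:
        by_minute[minute][key] = (c1, c2)
    if not by_minute:
        return []
    missing: List[Tuple[int, str]] = []
    for minute in range(min(by_minute) + 1, max(by_minute) + 1):
        history = by_minute.get(minute - 1, {})  # RIGHT side: keys seen last minute
        current = by_minute.get(minute, {})      # LEFT side: this minute's rows
        for key in sorted(history):
            c1, c2 = current.get(key, (None, None))  # unmatched key -> NULL columns
            if c1 is None and c2 is None:            # WHERE col1 IS NULL AND col2 IS NULL
                missing.append((minute, key))
    return missing
```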

Reconstituting/Imputing Missing Records from Streams

Using the Interpolation UDX to Reconstitute or Impute Missing Records

Guavus SQLstream provides an Interpolation UDX that detects missing records in a stream of data records occurring at regular time intervals. The UDX can infer the time interval from the timestamps of the data transmitted, or it can be told the interval so that a record is emitted for every interval.

For example, a sensor might emit a reading every 10 s. We see successive readings arrive after intervals of 9.9 s, 10.1 s, 10.0 s, 10.1 s and 19.9 s. Given the intervals observed, we conclude that we have readings for t1, t2, t3, t4 and t6, respectively. The UDX then detects the missing event/record for t5, interpolates it, and emits it immediately before emitting the record for t6. So for every input record, the UDX outputs one or more records. The interpolation is specified by the number of values to be used for “curve fitting”, given as a positive integer parameter: two values imply linear interpolation, three imply quadratic, and so on.
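The following Python sketch illustrates the behavior described above; it is not the Interpolation UDX's API. It assumes the nominal period is the median observed interval, treats a gap of roughly k periods as k - 1 missing readings, and fills each one by fitting a Lagrange polynomial through `n_points` readings (the last `n_points - 1` received readings plus the current one), so `n_points=2` gives linear and `n_points=3` quadratic interpolation. Imputed rows are emitted immediately before the reading that revealed the gap. `fill_gaps` and `lagrange` are hypothetical names.

```python
from statistics import median
from typing import List, Tuple

Point = Tuple[float, float]  # (time in seconds, reading)


def lagrange(points: List[Point], t: float) -> float:
    """Evaluate the polynomial through `points` at time t (Lagrange form)."""
    total = 0.0
    for i, (ti, yi) in enumerate(points):
        term = yi
        for j, (tj, _) in enumerate(points):
            if j != i:
                term *= (t - tj) / (ti - tj)
        total += term
    return total


def fill_gaps(readings: List[Point], n_points: int = 2) -> List[Tuple[float, float, bool]]:
    """Return (time, value, imputed) rows; n_points=2 is linear, 3 quadratic, ..."""
    if len(readings) < 2:
        return [(t, v, False) for t, v in readings]
    period = median([b[0] - a[0] for a, b in zip(readings, readings[1:])])
    out = [(readings[0][0], readings[0][1], False)]
    for i in range(1, len(readings)):
        prev_t = readings[i - 1][0]
        t, v = readings[i]
        missing = round((t - prev_t) / period) - 1  # e.g. a ~2-period gap -> 1 missing
        if missing > 0:
            # Fit through the last n_points - 1 received readings plus the current one.
            basis = readings[max(0, i - (n_points - 1)):i + 1]
            for m in range(1, missing + 1):
                tm = prev_t + m * (t - prev_t) / (missing + 1)  # evenly spaced in the gap
                out.append((tm, lagrange(basis, tm), True))
        out.append((t, v, False))  # the reading that revealed the gap comes last
    return out
```

With the intervals from the example (readings at 0, 9.9, 20.0, 30.0, 40.1 and 60.0 s), the inferred period is about 10.1 s, and only the 19.9 s gap yields an imputed reading, at roughly 50.05 s.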

The UDX emits one record for each missing record. When gaps are caused by UDP/IP transmission losses, the most common use case we encounter, it is very unlikely that more than three consecutive records will be missing. UDP, unlike TCP, is a “fire and forget” protocol: it is low-cost and real-time, but does not provide reliable delivery. It relies on the underlying network to deliver each packet and does not acknowledge or retransmit lost ones. This makes it popular for sensors emitting metrics whose delivery is not mission-critical or life-threatening.

See Using the Interpolation UDX in the Integrating with Other Systems guide.

Using SQL to Reconstitute or Impute Missing Records

Finally, SQL is a powerful enough analytic language that advanced SQL constructs can implement arbitrary algorithms and windows for imputing missing values. Below is a generic SQL template into which you can insert any SQL UDA (AVG is shown) to be applied to the values received in an arbitrary pipeline. The technique exploits SQL's ability to join a stream with itself, and in particular uses a RIGHT OUTER JOIN to detect missing values for given keys (shown as partitionKey below).

A self join on a stream helps impute missing values in that stream. The example below illustrates how a self stream-stream right outer join generates a report for each partitionKey every minute, even when no report arrived for that key. The missing column values are imputed simply as the average of the last report from each of the past 10 minutes.

CREATE OR REPLACE VIEW JoinView AS
SELECT STREAM
    lastKey AS partitionKey, col1, col2, lastCol1, lastCol2
FROM
    s OVER (ORDER BY FLOOR(s.ROWTIME TO MINUTE)
            RANGE INTERVAL '0' MINUTE PRECEDING) AS sCurrent
RIGHT OUTER JOIN -- impute a missing report for partitionKey
    (SELECT STREAM
         s.partitionKey AS lastKey,
         LAST_VALUE(col1) AS lastCol1,
         LAST_VALUE(col2) AS lastCol2
     FROM s
     GROUP BY FLOOR(s.ROWTIME TO MINUTE), partitionKey
    ) OVER (RANGE INTERVAL '10' MINUTE PRECEDING) AS sHistory
ON (sCurrent.partitionKey = sHistory.lastKey);

SELECT STREAM
partitionKey,

CASE WHEN AVG(col1) IS NULL
THEN AVG(lastCol1)
ELSE AVG(col1) END AS col1,

CASE WHEN AVG(col2) IS NULL
THEN AVG(lastCol2)
ELSE AVG(col2) END AS col2

FROM JoinView AS j
GROUP BY j.ROWTIME, partitionKey;
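To make the template's semantics concrete, the Python model below mirrors it for a single value column standing in for col1 and col2. History supplies the last report per key for each of the previous `lookback` minutes (LAST_VALUE), only keys present in history produce output (the RIGHT OUTER JOIN), and the CASE expression becomes "average this minute's reports if any arrived, otherwise average the historical last values". This is a simplified, assumed model: window edges are idealized (history excludes the current minute), and `impute` and the tuple layout are hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple

Report = Tuple[int, str, float]  # (minute, partitionKey, value); one column for brevity


def impute(reports: List[Report], minute: int, lookback: int = 10) -> Dict[str, float]:
    """Value per key for `minute`: current average if reported, else historical average."""
    current: Dict[str, List[float]] = defaultdict(list)
    history: Dict[str, Dict[int, float]] = defaultdict(dict)
    for m, key, value in reports:  # assumed to be in arrival (ROWTIME) order
        if m == minute:
            current[key].append(value)
        elif minute - lookback <= m < minute:
            history[key][m] = value  # later reports overwrite: LAST_VALUE per minute
    result: Dict[str, float] = {}
    for key, last_values in history.items():  # RIGHT OUTER JOIN: history keys drive output
        if current.get(key):
            result[key] = mean(current[key])  # ELSE AVG(col1)
        else:
            result[key] = mean(last_values.values())  # WHEN AVG(col1) IS NULL THEN AVG(lastCol1)
    return result
```

Swapping `mean` for another aggregate corresponds to inserting a different UDA into the template.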