When analyzing streaming data, you will often use windowed aggregation. Windowed aggregation applies an analytic function across a window specified either by time, such as “the one hour preceding”, or by rows, such as “the last six rows”.
A time-based window is defined by a rowtime interval. The window’s defining criteria specify a limited set of rows, using a rowtime-based specification. At any given wall-clock time, the number of rows in that window can vary, depending on how many rows have arrived whose rowtimes fall within the window’s defined period. (The SQL standard calls this a “logical” window, and calls a row-based window a “physical” window.)
For example, RANGE INTERVAL '1' HOUR PRECEDING
specifies that the window contains all rows whose ROWTIMEs are within the hour preceding the stream’s current time, including endpoints. (That time is usually the rowtime of the most recent row received.)
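As a rough illustration of that membership rule, the following Python sketch selects the rows whose rowtimes fall within the hour preceding the current stream time, endpoints included. The function name `rows_in_window` and the sample rows are hypothetical and are not part of the product; the sketch only mirrors the semantics described above.

```python
from datetime import datetime, timedelta

def rows_in_window(rows, current_time, width=timedelta(hours=1)):
    """Return rows whose rowtime lies in [current_time - width, current_time]."""
    start = current_time - width
    return [row for row in rows if start <= row[0] <= current_time]

# Hypothetical sample rows: (rowtime, value)
rows = [
    (datetime(2018, 11, 1, 9, 0), "a"),
    (datetime(2018, 11, 1, 9, 30), "b"),
    (datetime(2018, 11, 1, 10, 0), "c"),
]

# The stream's current time is taken to be the rowtime of the latest row.
now = rows[-1][0]
window = rows_in_window(rows, now)
print([value for _, value in window])
```

Because both endpoints are included, the row stamped exactly one hour before the current time is still part of the window.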
For time-based windows on streams, analytic functions are normally only able to identify the complete set of rows (and hence calculate a result) once a row arrives that falls outside the later bound of the current window. For example, in the case of a window that is RANGE INTERVAL '1' MINUTE FOLLOWING, the arrival of a row with a rowtime more than one minute later than that of the row for which the window is being evaluated would indicate that there will be no more rows in the window, and the query can return a result.
Alternatively, a rowtime bound can be used to indicate that no more rows will arrive within a given window, enabling the query to return a result.
A row-based window is specified by a fixed number of rows. For example, “ROWS 10 PRECEDING” specifies that only the latest 10 rows be included in the window. (The SQL standard calls this a “physical” window, and calls a time-based window a “logical” window.)
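The release rule for a FOLLOWING window can be sketched in Python as below. This is a simplified simulation under stated assumptions, not the engine's implementation: events are hypothetical tuples of the form `("row", rowtime, value)` or `("bound", rowtime)`, they arrive in rowtime order, and a window is treated as complete once any event carries a rowtime strictly later than the window's upper end.

```python
from datetime import datetime, timedelta

def closed_windows(events, width=timedelta(minutes=1)):
    """Yield (rowtime, values) for each row once its FOLLOWING window is complete."""
    rows = []     # every row seen so far, as (rowtime, value)
    pending = []  # rowtimes of rows whose window may still receive more rows
    for event in events:
        kind, t = event[0], event[1]
        if kind == "row":
            rows.append((t, event[2]))
            pending.append(t)
        # A row or a rowtime bound at time t shows that nothing earlier can arrive.
        still_open = []
        for start in pending:
            if t > start + width:
                yield start, [v for (ts, v) in rows if start <= ts <= start + width]
            else:
                still_open.append(start)
        pending = still_open

t0 = datetime(2018, 11, 1, 10, 0, 0)
events = [
    ("row", t0, 5),
    ("row", t0 + timedelta(seconds=30), 7),
    ("bound", t0 + timedelta(seconds=61)),   # closes the window of the first row
    ("row", t0 + timedelta(seconds=120), 9),  # closes the window of the second row
]
results = list(closed_windows(events))
```

In this run the first row's result is released by the bound alone, before any later row arrives, which is the situation the text describes.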
For row-based windows on streams, such as “ROWS 3 PRECEDING”, rowtime bounds have no effect on windows (and hence, on the evaluation of analytic functions). See below for examples of rowtime bounds in windowed aggregation.
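A row-based window can be approximated in Python with a fixed-length buffer, as in the sketch below. It assumes the reading given in the text, that the window holds only the latest N rows; the function name `rolling_max` and the sample values are illustrative. Rowtimes play no part in it, which is why rowtime bounds cannot change the result.

```python
from collections import deque

def rolling_max(values, size=3):
    """Return the maximum over the latest `size` values after each arrival."""
    window = deque(maxlen=size)  # the oldest value is dropped automatically
    out = []
    for value in values:
        window.append(value)
        out.append(max(window))
    return out

maxima = rolling_max([29, 2, 65, 32, 9, 50])
print(maxima)
```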
Assume the following data is flowing through the stream WEATHERSTREAM:
ROWTIME | CITY | TEMP |
---|---|---|
2018-11-01 01:00:00.0 | Denver | 29 |
2018-11-01 01:00:00.0 | Anchorage | 2 |
2018-11-01 06:00:00.0 | Miami | 65 |
2018-11-01 07:00:00.0 | Denver | 32 |
2018-11-01 09:00:00.0 | Anchorage | 9 |
2018-11-01 13:00:00.0 | Denver | 50 |
2018-11-01 17:00:00.0 | Anchorage | 10 |
2018-11-01 18:00:00.0 | Miami | 71 |
2018-11-01 19:00:00.0 | Denver | 43 |
2018-11-02 01:00:00.0 | Anchorage | 4 |
2018-11-02 01:00:00.0 | Denver | 39 |
2018-11-02 07:00:00.0 | Denver | 46 |
2018-11-02 09:00:00.0 | Anchorage | 3 |
2018-11-02 13:00:00.0 | Denver | 56 |
2018-11-02 17:00:00.0 | Anchorage | 2 |
2018-11-02 19:00:00.0 | Denver | 50 |
2018-11-03 01:00:00.0 | Denver | 36 |
2018-11-03 01:00:00.0 | Anchorage | 1 |
Let’s say we want to find the minimum and maximum temperature recorded in the 24-hour period prior to any given reading, globally, regardless of city. To do this, we define a window of RANGE INTERVAL '1' DAY PRECEDING, and use it in the OVER clause for the MIN and MAX analytic functions:
SELECT STREAM
ROWTIME,
MIN(TEMP) OVER W1 AS WMIN_TEMP,
MAX(TEMP) OVER W1 AS WMAX_TEMP
FROM WEATHERSTREAM
WINDOW W1 AS (
RANGE INTERVAL '1' DAY PRECEDING
);
Results:
ROWTIME | WMIN_TEMP | WMAX_TEMP |
---|---|---|
2018-11-01 01:00:00.0 | 29 | 29 |
2018-11-01 01:00:00.0 | 2 | 29 |
2018-11-01 06:00:00.0 | 2 | 65 |
2018-11-01 07:00:00.0 | 2 | 65 |
2018-11-01 09:00:00.0 | 2 | 65 |
2018-11-01 13:00:00.0 | 2 | 65 |
2018-11-01 17:00:00.0 | 2 | 65 |
2018-11-01 18:00:00.0 | 2 | 71 |
2018-11-01 19:00:00.0 | 2 | 71 |
2018-11-02 01:00:00.0 | 2 | 71 |
2018-11-02 01:00:00.0 | 2 | 71 |
2018-11-02 07:00:00.0 | 4 | 71 |
2018-11-02 09:00:00.0 | 3 | 71 |
2018-11-02 13:00:00.0 | 3 | 71 |
2018-11-02 17:00:00.0 | 2 | 71 |
2018-11-02 19:00:00.0 | 2 | 56 |
2018-11-03 01:00:00.0 | 2 | 56 |
2018-11-03 01:00:00.0 | 1 | 56 |
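To check how these results follow from the window definition, the sketch below recomputes them in Python. It is a naive simulation, not how the query is executed: each row is evaluated against all rows that arrived up to and including it and whose rowtime is no more than one day earlier. The helper names `parse` and `min_max_preceding` are hypothetical.

```python
from datetime import datetime, timedelta

# Rows copied from the WEATHERSTREAM table: rowtime, city, temp
RAW = """\
2018-11-01 01:00 Denver 29
2018-11-01 01:00 Anchorage 2
2018-11-01 06:00 Miami 65
2018-11-01 07:00 Denver 32
2018-11-01 09:00 Anchorage 9
2018-11-01 13:00 Denver 50
2018-11-01 17:00 Anchorage 10
2018-11-01 18:00 Miami 71
2018-11-01 19:00 Denver 43
2018-11-02 01:00 Anchorage 4
2018-11-02 01:00 Denver 39
2018-11-02 07:00 Denver 46
2018-11-02 09:00 Anchorage 3
2018-11-02 13:00 Denver 56
2018-11-02 17:00 Anchorage 2
2018-11-02 19:00 Denver 50
2018-11-03 01:00 Denver 36
2018-11-03 01:00 Anchorage 1
"""

def parse(raw):
    """Turn the text table into (rowtime, city, temp) tuples in arrival order."""
    rows = []
    for line in raw.strip().splitlines():
        day, clock, city, temp = line.split()
        rowtime = datetime.strptime(day + " " + clock, "%Y-%m-%d %H:%M")
        rows.append((rowtime, city, int(temp)))
    return rows

def min_max_preceding(rows, width=timedelta(days=1)):
    """For each row, return (min, max) over rows in the preceding window."""
    out = []
    for i, (rowtime, _city, _temp) in enumerate(rows):
        # Only rows that have already arrived, including the current one, count.
        temps = [t for (rt, _c, t) in rows[: i + 1] if rt >= rowtime - width]
        out.append((min(temps), max(temps)))
    return out

weather = parse(RAW)
results = min_max_preceding(weather)
for (rowtime, _c, _t), (low, high) in zip(weather, results):
    print(rowtime, low, high)
```

Note that the first Denver reading reports 29 for both columns even though an Anchorage row shares its rowtime: that row had not yet arrived when the Denver row was evaluated.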
Now, let’s assume we want to find the minimum, maximum, and average temperature recorded in the 24-hour period prior to any given reading, broken down by city. To do this, we add a PARTITION BY clause on CITY to the window specification, and add the AVG analytic function over the same window to the selection list:
SELECT STREAM
ROWTIME,
CITY,
MIN(TEMP) OVER W1 AS WMIN_TEMP,
MAX(TEMP) OVER W1 AS WMAX_TEMP,
AVG(TEMP) OVER W1 AS WAVG_TEMP
FROM AGGTEST.WEATHERSTREAM
WINDOW W1 AS (
PARTITION BY CITY
RANGE INTERVAL '1' DAY PRECEDING
);
Results:
ROWTIME | CITY | WMIN_TEMP | WMAX_TEMP | WAVG_TEMP |
---|---|---|---|---|
2018-11-01 01:00:00.0 | Denver | 29 | 29 | 29 |
2018-11-01 01:00:00.0 | Anchorage | 2 | 2 | 2 |
2018-11-01 06:00:00.0 | Miami | 65 | 65 | 65 |
2018-11-01 07:00:00.0 | Denver | 29 | 32 | 30 |
2018-11-01 09:00:00.0 | Anchorage | 2 | 9 | 5 |
2018-11-01 13:00:00.0 | Denver | 29 | 50 | 37 |
2018-11-01 17:00:00.0 | Anchorage | 2 | 10 | 7 |
2018-11-01 18:00:00.0 | Miami | 65 | 71 | 68 |
2018-11-01 19:00:00.0 | Denver | 29 | 50 | 38 |
2018-11-02 01:00:00.0 | Anchorage | 2 | 10 | 6 |
2018-11-02 01:00:00.0 | Denver | 29 | 50 | 38 |
2018-11-02 07:00:00.0 | Denver | 32 | 50 | 42 |
2018-11-02 09:00:00.0 | Anchorage | 3 | 10 | 6 |
2018-11-02 13:00:00.0 | Denver | 39 | 56 | 46 |
2018-11-02 17:00:00.0 | Anchorage | 2 | 10 | 4 |
2018-11-02 19:00:00.0 | Denver | 39 | 56 | 46 |
2018-11-03 01:00:00.0 | Denver | 36 | 56 | 45 |
2018-11-03 01:00:00.0 | Anchorage | 1 | 4 | 2 |
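The effect of PARTITION BY can be sketched in Python by keeping a separate list of rows per city and applying the one-day window within each list. This is an illustrative simulation with hypothetical helper names. The averages in the table appear to be truncated to whole numbers (30 rather than 30.5, for example), so the sketch uses integer division to match; that is an observation from the table, not a documented rule.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Rows copied from the WEATHERSTREAM table: rowtime, city, temp
RAW = """\
2018-11-01 01:00 Denver 29
2018-11-01 01:00 Anchorage 2
2018-11-01 06:00 Miami 65
2018-11-01 07:00 Denver 32
2018-11-01 09:00 Anchorage 9
2018-11-01 13:00 Denver 50
2018-11-01 17:00 Anchorage 10
2018-11-01 18:00 Miami 71
2018-11-01 19:00 Denver 43
2018-11-02 01:00 Anchorage 4
2018-11-02 01:00 Denver 39
2018-11-02 07:00 Denver 46
2018-11-02 09:00 Anchorage 3
2018-11-02 13:00 Denver 56
2018-11-02 17:00 Anchorage 2
2018-11-02 19:00 Denver 50
2018-11-03 01:00 Denver 36
2018-11-03 01:00 Anchorage 1
"""

def parse(raw):
    """Turn the text table into (rowtime, city, temp) tuples in arrival order."""
    rows = []
    for line in raw.strip().splitlines():
        day, clock, city, temp = line.split()
        rowtime = datetime.strptime(day + " " + clock, "%Y-%m-%d %H:%M")
        rows.append((rowtime, city, int(temp)))
    return rows

def partitioned_stats(rows, width=timedelta(days=1)):
    """For each row, return (city, min, max, avg) over that city's window."""
    by_city = defaultdict(list)  # city -> [(rowtime, temp), ...] seen so far
    out = []
    for rowtime, city, temp in rows:
        by_city[city].append((rowtime, temp))
        temps = [t for (rt, t) in by_city[city] if rt >= rowtime - width]
        # Integer division mimics the truncated averages shown in the table.
        out.append((city, min(temps), max(temps), sum(temps) // len(temps)))
    return out

stats = partitioned_stats(parse(RAW))
for row in stats:
    print(*row)
```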
This is an example of a windowed aggregate query:
SELECT STREAM ROWTIME, ticker, amount, SUM(amount)
OVER (
PARTITION BY ticker
RANGE INTERVAL '1' HOUR PRECEDING)
AS hourlyVolume
FROM Trades
Because this is a query on a stream, rows pop out of this query as soon as they go in. For example, given the inputs:
Trades: IBM 10 10:00:00
Trades: ORCL 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 10:25:00
Trades: IBM 30 11:05:00
Trades.bound: 11:10:00
the output will be:
Trades: IBM 10 10 10:00:00
Trades: ORCL 20 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 35 10:25:00
Trades: IBM 30 30 11:05:00
Trades.bound: 11:10:00
The rows still hang around behind the scenes for an hour, and thus the second ORCL row output has a total of 35; but the original IBM trade falls outside the “hour preceding” window, and so is excluded from the IBM sum.
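The retention behaviour described here can be sketched in Python with one queue and one running total per ticker: each arriving row is added to the total, and rows older than an hour are removed from the queue and subtracted. This is a minimal sketch with hypothetical names (`hourly_volume`, `at`), and it leaves out the rowtime bounds because they do not change the values of a PRECEDING window in this example.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

def hourly_volume(trades, width=timedelta(hours=1)):
    """Return (ticker, amount, volume over the preceding hour) for each trade."""
    windows = defaultdict(deque)  # ticker -> rows still inside the window
    totals = defaultdict(int)     # ticker -> running sum of those rows
    out = []
    for ticker, amount, rowtime in trades:
        window = windows[ticker]
        window.append((rowtime, amount))
        totals[ticker] += amount
        # Evict rows that are more than `width` older than the current row.
        while window[0][0] < rowtime - width:
            _, expired = window.popleft()
            totals[ticker] -= expired
        out.append((ticker, amount, totals[ticker]))
    return out

def at(hhmm):
    """Parse an HH:MM time of day (the date part is irrelevant here)."""
    return datetime.strptime(hhmm, "%H:%M")

trades = [
    ("IBM", 10, at("10:00")),
    ("ORCL", 20, at("10:10")),
    ("ORCL", 15, at("10:25")),
    ("IBM", 30, at("11:05")),
]
volumes = hourly_volume(trades)
for row in volumes:
    print(*row)
```

Keeping a running total avoids re-summing the whole window on every arrival, at the cost of holding each row until it expires.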
Some business problems seem to need totals over the whole history of a stream, but this is usually not practical to compute. However, such business problems are often solvable by looking at the last day, the last hour, or the last N records. Aggregates computed over such sets of records are called windowed aggregates. They are easy to compute in a stream database, and can be expressed in standard SQL as follows:
SELECT STREAM ticker,
avg(price) OVER lastHour AS avgPrice,
max(price) OVER lastHour AS maxPrice
FROM Bids
WINDOW lastHour AS (
PARTITION BY ticker
RANGE INTERVAL '1' HOUR PRECEDING)
Note: The Interval_clause must be of an appropriate type: