In a conventional relational database application, queries perform aggregations and joins over finite sets of rows. Aggregate functions like SUM, COUNT, or MAX, and operations like JOIN, GROUP BY, or PARTITION, must know all the data to produce correct results.
In the SQLstream context of streaming data and continuous processing, such functions and operations are generally applied over a sliding time-based subset of records called a WINDOW. Each window contains, at any given time, a subset of the rows streaming by. Defining such windows enables a query to identify the finite set of rows (from an otherwise infinite stream) over which to perform the desired aggregations and joins. This chapter shows using these windows for particular kinds of aggregations.
For the complete syntax, syntax chart, and more examples with more detailed explanations, see the WINDOW clause topic in the s-Server Streaming SQL Reference Guide, which also contains detailed examples showing a sample data set and the contents of various windows at various times. For more detail, see the topics Analytic and Aggregate functions in the s-Server Streaming SQL Reference Guide.
Aggregation can be done as windowed aggregation over rolling windows, such as generating a moving average, or as streaming aggregation over periodic windows, such as generating an hourly report or daily roll-up.
The Inspect Trades window shows a sample input stream of stock trades (taken from an Inspect window in SQLstream s-Studio, used for inspecting stream or table inputs and outputs):
This same input stream can be processed by either form of aggregation. The input stream is created by the following code:
CREATE SCHEMA "Trading" DESCRIPTION 'Contains the equity trades application objects'; SET SCHEMA '"Trading"'; CREATE OR REPLACE FOREIGN TABLE "Tickers" ( "Ticker" VARCHAR(5) NOT NULL, description VARCHAR(100), PRIMARY KEY ("Ticker") ) SERVER FILE_SERVER OPTIONS ( "DIRECTORY" '/data/' , "FILENAME_PATTERN" 'tickers.csv' , "PARSER" 'CSV' , "SKIP_HEADER" 'true' ); CREATE STREAM "TRADES" ( "Ticker" VARCHAR(5) NOT NULL, "Shares" INTEGER , "Price" DECIMAL(6,2) );
The Set Schema command enables referencing objects in the schema without fully qualifying the object name with the schema name (see also the SET SCHEMA command in the s-Server Streaming SQL Reference Guide). Since no quotes surround the column names given in lower case, they are saved in upper case. If lowercase distinctions are preferred for the column names, those names would need quotes around them, as has been done for the schema, table, and stream names.
As illustrated in the graphic below (Examples of Streaming and Windowed Aggregation), the input stream can be read and processed by SQLstream statements performing windowed aggregation, shown on the right, or by SQLstream statements performing streaming aggregation, shown on the left. (media/input_stream04_split80x80.png)
Windowed Aggregation and Streaming Aggregation have important differences that are described below.
Windowed aggregation uses a rolling window with a specified set of rows. A windowed aggregation query can define a rolling window as having a fixed number of rows (a row-based window) or a varying (though finite) number of rows.
With a varying number of rows, the number is not fixed in advance but rather is determined by some logical specification (time-based window). The windowed aggregation window contains any newly qualifying row and all older qualifying rows up to the specified number of rows or, for a time-based specification, all rows meeting that specification.
Example of several windowed aggregations over a 1-hour rolling window
CREATE OR REPLACE VIEW "Orders & Shares Trends" DESCRIPTION 'rolling-period orders and volume trends for TRADES' AS SELECT STREAM T.ROWTIME AS "orderTime", "Ticker", "Shares", "Price", COUNT (*) OVER "last_Hour" AS "ordersPerHour", COUNT (*) OVER "last_ten_min" AS "ordersLastTen", SUM("Shares") OVER "last_Hour" AS "sharesPerHour", SUM("Price" * "Shares") OVER "last_Hour" AS "amtPerHour" FROM TRADES AS T WINDOW "last_Hour" AS (RANGE INTERVAL '1' HOUR PRECEDING), "last_ten_min" AS (RANGE INTERVAL '10' MINUTE PRECEDING);
Note: RANGE and ROWS operate differently, as follows: RANGE is only for time-based windows, and requires a valid INTERVAL specification, which defines a time duration.
*INTERVAL ‘1’ HOUR is legal, but *ROWS 10 is not.
For row-based windows, the proper syntax example is *ROWS 10 PRECEDING.
See - WINDOW clause in the Streaming SQL Reference guide for more details. Output Each row arriving on TRADES changes the set of rows defined within the window and produces an output row with the new aggregated values.
Streaming aggregation uses time-based windows with many-rows-in, one-row-out behavior, applying the SQL constructs SELECT STREAM DISTINCT … or, more commonly, SELECT STREAM … GROUP BY. In this type of aggregation, incoming rows that belong to the group are added to the running aggregations. An incoming row that belongs to the next group ends the current group, causes that current group’s aggregations to be emitted as a single output row, and begins the next group. For streaming aggregation using GROUP BY in a streaming SELECT statement, the first group-by term must be time-based. For example GROUP BY FLOOR(S.ROWTIME) TO HOUR will yield one output row per hour for the previous hour’s input rows. The GROUP BY can specify additional partitioning terms. For example, GROUP BY FLOOR(S.ROWTIME) TO HOUR, USERID will yield one output row per hour per USERID value. streamingaggnExample of streaming aggregations to perform periodic reporting over a sliding 1-hour window
CREATE OR REPLACE VIEW "Orders & Shares Groups" DESCRIPTION 'rolling-period order groups for TRADES' SELECT STREAM FLOOR(T.ROWTIME TO HOUR) AS "hour", "Ticker", COUNT(*) AS "ordersPerHour" FROM TRADES AS T GROUP BY FLOOR(T.ROWTIME TO HOUR), "Ticker"
Each row arriving on TRADES adds to the count of orders for that ticker, i.e., the trade represented by that row, within the hour in which it arrived. Each ticker named in trades arriving during that hour provides a single output row in the streaming aggregation output, showing how many trades for that ticker occurred in that hour.
By default, each stream query in SQLstream is run independently from all other stream queries. However, it is often desirable for either performance or consistency reasons to share the output of one query as input to one or more other queries. A potential complexity arises from three facts:
In SQLstream this goal is accomplished by defining a stream that all such queries will listen for, and then creating a pump to feed that stream. The pump is based on the source views or queries. Using the pump compensates for the variations in the timing of the data sources; using the stream that the pump feeds ensures that every query listening for that stream sees the same set of results. This procedure enables “processing pipelines”, that is, modular sequences of processing steps, where each step performs filtering, aggregation, and transformation, providing its results to downstream consumers. Each such step thus also provides a public junction where its results may be
The simple example that follows illustrates the basic view/stream/pump mechanisms.
This example uses the BIDS and ASKS streams already defined in the SALES schema included in the First SQLstream s-Server distributed with the SQLstream product. The view matches bids and asks by ticker, shares, and price within a sliding ten-second window by using a windowed join. Such a join is a streaming join over a time-based subset of records ordered by row timestamps.
CREATE OR REPLACE VIEW "MatchBidsAndAsksView" DESCRIPTION 'match bids and asks for each ticker over a 10-second time window' AS SELECT STREAM B.ROWTIME AS "bidTime", A.ROWTIME AS "askTime", B."ticker", B."shares" AS "bidShares", B."price" AS "bidPrice", A."shares" AS "askShares", A."price" AS "askPrice" FROM SALES.BIDS OVER (RANGE INTERVAL '10' SECOND PRECEDING) AS B JOIN SALES.ASKS AS A ON A."ticker" = B."ticker" AND A."shares" = B."shares" AND A."price" = B."price";
After defining the view, you create a related stream to receive the view results and a pump to insert those results into that stream, as follows:
CREATE STREAM "MatchBidsAndAsks" ( "bidTime" TIMESTAMP, "askTime" TIMESTAMP, "ticker" varchar(5), "bidShares" INTEGER, "bidPrice" REAL, "askShares" INTEGER, "askPrice" REAL ); CREATE PUMP "MatchBidsAndAsksPump" STARTED as INSERT INTO "MatchBidsAndAsks" SELECT * FROM "MatchBidsAndAsksView";
At this point, any queries created using “MatchBidsAndAsks” will all see the same data stream.
A conventional SQL application prepares and executes a statement with a SELECT… query and iterates through the returned result set until end of fetch is detected, when there are no more rows to return. The application then returns to doing something else. In the SQLstream context of streaming data and queries that run forever, there is no obvious “end of fetch”. Instead, the “get next row” call (i.e., ResultSet.next() for JDBC) blocks within the SQLstream client driver until one of the following two possibilities occurs:
In the latter case, the application is in effect “un-subscribing” from the stream. For more detailed discussion of preventing blocking, see the topic JDBC driver in the SQLstream s-Server Integration Guide. SQLstream extends the JDBC API to offer millisecond-granularity timeout intervals to make ResultSet polling more practical for an application that wants to remain responsive to other events.