Creating Streaming JOINs

A Streaming JOIN refers to a JOIN in a streaming query where at least one of the relations being joined is a stream.

Streaming joins work just like regular table joins, but subject to the considerations implicit in dealing with streams, that is, rolling windows and rowtimes:

  • Sliding Windows. A window defined on a stream is a sliding window. As the current time progresses, the window excludes some rows while adding others. As a result, rows output by the join are generated incrementally. Note: With rolling windows, an output row will be produced only once by a match on a given pair of tuples from the left and right input streams. In other words, an output row already produced by a prior match will not be produced again when a subsequent identical match is identified.
  • Output Rowtimes. All output rows are produced in order of non-descending rowtime. (It is valid to have multiple output rows with the same rowtime.) The rowtime of a given output row is the rowtime at the point it was possible to calculate the output row:
    • For an inner join, the rowtime of an output row is the later of the rowtimes of the two input rows. This is also true for an outer join in which matching input rows are found.
    • For outer joins in which a match is not found, the rowtime of an output row is the later of the following two times: -the rowtime of the input row for which a match was not found, or -the later bound of the window of the other input stream at the point any possible match passed out of the window. All streaming joins are implicitly windowed joins between the affected streams. If no explicit window is specified, the window specification CURRENT ROW is used.

Stream-Window Joins Overview

Streaming joins take place over a time-based subset of records. By default, these subsets are ordered by row timestamps: rows from each stream whose ROWTIME values match are joined according to the JOIN condition.

A join with an explicit window is called a windowed join. This example matches bids and asks by ticker, shares, and price within a ten-second window. It is written with an inline window specification.

CREATE OR REPLACE VIEW "MatchBidsAndAsks"
DESCRIPTION 'match bids and asks for each ticker over a 10-second time window' AS
SELECT STREAM
 B.ROWTIME AS "bidTime",
 A.ROWTIME AS "askTime",
 B."ticker",
 B."shares" AS "bidShares",
 B."price" AS "bidPrice",
 A."shares" AS "askShares",
 A."price" AS "askPrice"
FROM SALES.BIDS OVER (RANGE INTERVAL '10' SECOND PRECEDING) AS B
JOIN SALES.ASKS OVER (ROWS CURRENT ROW) AS A ON A."ticker" = B."ticker"
 AND A."shares" = B."shares"
 AND A."price" = B."price";

For readability, or for reuse of a window specification within a query, you can use a WINDOW alias to define a named window. Here is the above example rewritten to use a WINDOW alias:

CREATE OR REPLACE VIEW "MatchBidsAndAsks"
DESCRIPTION 'match bids and asks for each ticker over a 10-second time window' AS
SELECT STREAM
 B.ROWTIME AS "bidTime",
 A.ROWTIME AS "askTime",
 B."ticker",
 B."shares" AS "bidShares",
 B."price" AS "bidPrice",
 A."shares" AS "askShares",
 A."price" AS "askPrice"
FROM SALES.BIDS OVER "lastTenSeconds" AS B
JOIN SALES.ASKS OVER (ROWS CURRENT ROW)AS A ON A."ticker" = B."ticker"
 AND A."shares" = B."shares"
 AND A."price" = B."price"
WINDOW "lastTenSeconds" AS (RANGE INTERVAL '10' SECOND PRECEDING);

Using Windows to Correlate and Join Streams

If you do not specify an OVER clause for a stream, the join evaluates for the current row only. It may be useful in some cases to omit an OVER clause for one of the streams, but you may receive useful results if you omit the OVER clause for both streams unless you know the streams will emit data at the same rowtimes. For example, both streams represent periodic aggregates (such as hourly summaries) then they will both be emitting data in the same millisecond. In this case, there is no need for a window because the prior aggregation will sync the joined streams to the same rowtime.

SELECT STREAM
​      LF."accountNumber",
​      "loginFailureCount",
​      "transactionType",
​       "amount"
 FROM "SuspectLoginFailures" AS LF
 JOIN "Transactions" AS TX
​                   OVER (RANGE INTERVAL '1' MINUTE PRECEDING)
​                   ON LF."accountNumber" = TX."accountNumber"
WHERE ("transactionType" = 'isDebit');

Using a WINDOW alias

You can factor out the RANGE INTERVAL window by giving it an alias, as in this equivalent SQL definition:

SELECT STREAM
​      "SuspectLoginFailures"."accountNumber",
​      "loginFailureCount",
​      "transactionType",
​      "amount"
 FROM "SuspectLoginFailures"  OVER "lastFew"
 JOIN "Transactions" OVER "lastFew"
​        ON "SuspectLoginFailures"."accountNumber" = "Transactions"."accountNumber"
WHERE ("transactionType" = 'isDebit')
WINDOW "lastFew" AS (RANGE INTERVAL '1' MINUTE PRECEDING);

Using a separate WINDOW definition is particularly useful when joining a stream to two or more other streams as it is easier to maintain (for example, if you want to increase the gap from one minute to two.

Row Based Windows

As well as using time-based windows, you can also use a defined number of rows (row-based windows).

SELECT STREAM
ticker,
MIN("price") OVER w AS PriceMin,
MAX("price") OVER w AS PriceMax,
FROM sales.bids
WINDOW w AS (ROWS BETWEEN 3 PRECEDING AND 0 FOLLOWING);

(See Windowed Aggregation on Streams.) Note: When several streams are joined, the “secondary” streams don’t all have to use the same WINDOW. They can also mix ROWS and INTERVAL windows.