JOIN clause

You can use the JOIN clause of the SELECT statement to write a query that returns rows from multiple streams or tables at once, allowing you to enrich one stream or table with another stream or table’s data. To associate streams or tables, you use a common column or columns.

s-Server joins work similarly to joins in an RDBMS context, in that you write a query that contains the names of two streams/tables and that selects columns from both. In an RDBMS context, when you join two tables, you know all the rows for both tables. An RDBMS, then, can identify all matches at once and return rows accordingly. With streaming SQL, you run a query that continuously emits rows. As rows enter each stream, joined rows are emitted. Since it would be impossible to join all rows in two infinite streams, you will often want to limit the two streams by time or row count so that s-Server can identify matches. You do so by using a WINDOW on each stream. The default window for each stream is RANGE CURRENT ROW.

Syntax

{ [INNER] | { LEFT | RIGHT | FULL } [OUTER] | CROSS } JOIN  <StreamOrTable> ON boolean_expression |
{ [INNER] | { LEFT | RIGHT | FULL } [OUTER] } JOIN  <StreamOrTable> |
NATURAL { [INNER] | { LEFT | RIGHT | FULL } [OUTER] } JOIN  <StreamOrTable>
<StreamOrTable> := { <stream> [ OVER <join_window> ] | <table> }

<join_window> := WINDOW <window name> | '(' ( <range_window_spec> | <row_window_spec> ) ')'
<range_window_spec> := RANGE [ BETWEEN INTERVAL '<timeliteral>' <timeunit> AND ] ( INTERVAL '<timevalue>' <timeunit> PRECEDING | CURRENT ROW )
<row_window_spec> := ROWS [ BETWEEN <number> PRECEDING AND ] [ <number> PRECEDING | CURRENT ROW ]
<timeunit> := <basetimeunit> [ TO <basetimeunit> ] [ '(' <precision> ')' ]
<basetimeunit> := ( SECOND | MINUTE | HOUR | DAY | MONTH | YEAR )

Join Conditions

You can query both streams and tables with joins. To simplify, we’ll refer to streams in the following discussion. When you join two streams or tables, we refer to the first stream in the query as the left stream and the second stream in the query as the right stream.

In the query below, for example, stocks.order_quantity is the left stream and stocks.stock_price is the right stream.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Join conditions specify a way to connect the left and right streams. All types of joins except CROSS joins accept a join condition. (CROSS joins simply pair every row from the left stream with every row from the right stream).

There are three ways to specify a join condition:

  • You can use the ON condition to explicitly specify a column from each stream. These columns do not have to have the same names, but need to have matching data. When these two columns match, the join is successful. Using the ON condition is the most general and powerful way to specify a join condition.
  • You can use the USING condition to list columns with matching names in the left and right stream. For example, if you know that both streams have columns called “ticker” and “price,” you can write r1 JOIN r2 USING (ticker, price). That syntax is equivalent to r1 JOIN r2 ON r1.ticker = r2.ticker AND r1.price = r2.price.
  • You can use the NATURAL keyword to automatically match columns from the left and right streams that have the same name.
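
The relationship between the three forms can be sketched in a few lines of Python. This is an illustration of the semantics only, not s-Server code: the function names join_on, join_using, and natural_join and the sample rows r1 and r2 are assumptions made for this sketch, and windows and rowtimes are ignored.

```python
def join_on(left, right, condition):
    """ON: pair every left row with every right row for which condition holds."""
    return [(l, r) for l in left for r in right if condition(l, r)]

def join_using(left, right, columns):
    """USING (c1, c2, ...): equality on each listed column name."""
    return join_on(left, right, lambda l, r: all(l[c] == r[c] for c in columns))

def natural_join(left, right):
    """NATURAL: USING every column name that both inputs share."""
    if not left or not right:
        return []
    common = sorted(set(left[0]) & set(right[0]))
    return join_using(left, right, common)

# Hypothetical sample rows; "quantity" and "exchange" exist on one side only.
r1 = [{"ticker": "IBM", "price": 75, "quantity": 750},
      {"ticker": "IBM", "price": 100, "quantity": 1000}]
r2 = [{"ticker": "IBM", "price": 75, "exchange": "NYSE"},
      {"ticker": "MSFT", "price": 100, "exchange": "NASDAQ"}]

# The explicit ON form that USING (ticker, price) stands for.
explicit = join_on(
    r1, r2,
    lambda l, r: l["ticker"] == r["ticker"] and l["price"] == r["price"],
)

print(join_using(r1, r2, ["ticker", "price"]) == explicit)
print(natural_join(r1, r2) == explicit)
```

In this sketch all three forms select the same pairs because ticker and price are the only column names the two inputs have in common.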

The following inner join examples are equivalent:

Example With ON

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Example With USING

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    USING (ticker);

Example With NATURAL

Here, ticker is the only column in common between stocks.order_quantity and stocks.stock_price.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    NATURAL JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp;

Join Types

There are five types of joins:

Join Type | Description
INNER JOIN (or JOIN) | Returns joined rows from the left and right streams where data from the columns specified in the join condition match. Rows that have no match are not emitted by the query.
LEFT JOIN (or LEFT OUTER JOIN) | Returns joined rows from the left and right streams where data from the columns specified in the join condition match. Rows from the left stream that have no match are kept, and null values are assigned to columns selected from the right stream.
RIGHT JOIN (or RIGHT OUTER JOIN) | Returns joined rows from the left and right streams where data from the columns specified in the join condition match. Rows from the right stream that have no match are kept, and null values are assigned to columns selected from the left stream.
FULL JOIN (or FULL OUTER JOIN) | Returns joined rows from the left and right streams where data from the columns specified in the join condition match. Rows from either stream that have no match are kept, and null values are assigned to any columns from the other stream.
CROSS JOIN | Pairs every row from the left stream with every row from the right stream. A cross join never has an ON or USING condition.
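
The differences between the five join types can be seen on a single snapshot of two small inputs. The Python below is a minimal sketch, not s-Server code: the function join, its how parameter, and the sample rows are assumptions made for illustration, None stands in for SQL NULL, and windows and rowtimes are ignored.

```python
def join(left, right, on=None, how="inner"):
    """Return (left_row, right_row) pairs; None marks the null-padded side."""
    out = []
    matched_right = set()
    for l in left:
        hit = False
        for j, r in enumerate(right):
            # A cross join has no condition: every pair qualifies.
            if how == "cross" or on(l, r):
                out.append((l, r))
                matched_right.add(j)
                hit = True
        # LEFT and FULL keep left rows that matched nothing.
        if not hit and how in ("left", "full"):
            out.append((l, None))
    # RIGHT and FULL keep right rows that matched nothing.
    if how in ("right", "full"):
        out.extend((None, r) for j, r in enumerate(right)
                   if j not in matched_right)
    return out

# Hypothetical snapshot: GOOGL has an order but no price, MSFT the reverse.
orders = [{"ticker": "IBM", "quantity": 750},
          {"ticker": "GOOGL", "quantity": 1000}]
prices = [{"ticker": "IBM", "ending_price": 75},
          {"ticker": "MSFT", "ending_price": 100}]
same_ticker = lambda o, p: o["ticker"] == p["ticker"]

for how in ("inner", "left", "right", "full"):
    print(how, len(join(orders, prices, same_ticker, how)))
print("cross", len(join(orders, prices, how="cross")))
```

Unlike a streaming join, this sketch appends unmatched right rows at the end rather than in rowtime order; it is only meant to show which rows each join type keeps.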

Working With Streaming Joins

Stream-to-stream joins join two different relations that are changing over time. During each millisecond some rows will get added to the left and some to the right. For each millisecond in the joined stream’s rowtime, s-Server outputs whatever is in the join of those two relations that wasn’t in the join the last millisecond.

As noted above, when joining on streaming data, you will often want to define a window for one or both of the streams that you are joining. Windows specify a subset of rows, either by time or number of rows. Windows include endpoints; that is, if you specify a 1 minute window starting at 1:00:00, rows with rowtimes of 1:00:00 and 1:01:00 are both included in the join. For more information on windows in streaming SQL, see the WINDOW clause topic in this guide.
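
The inclusive endpoints can be expressed as a simple predicate. This Python sketch is illustrative only; the function name in_range_window is an assumption made here, and it models a RANGE window that looks back a fixed interval from the current rowtime.

```python
from datetime import datetime, timedelta

def in_range_window(row_time, current_time, interval):
    """True if row_time lies in [current_time - interval, current_time].

    Both comparisons are inclusive, mirroring the statement that windows
    include their endpoints.
    """
    return current_time - interval <= row_time <= current_time

now = datetime(2019, 3, 30, 1, 1, 0)
one_minute = timedelta(minutes=1)

# A row exactly one minute old is still in the window ...
print(in_range_window(datetime(2019, 3, 30, 1, 0, 0), now, one_minute))
# ... but a row one millisecond older is not.
print(in_range_window(datetime(2019, 3, 30, 0, 59, 59, 999000), now, one_minute))
```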

If you do not specify an OVER clause for a stream, the join evaluates for the current row only. It may be useful in some cases to omit an OVER clause for one of the streams, but you are unlikely to receive useful results if you omit the OVER clause for both streams unless you know the streams will emit data at the same rowtimes. For example, if both streams represent periodic aggregates (such as hourly summaries), then they will both emit data in the same millisecond. In this case, there is no need for a window because the prior aggregation will sync the joined streams to the same rowtime.

The window specification may also be the name of a window defined in the WINDOW clause.

Which rows are joined, then, depends on how the windows for the join are defined. In the diagram below, rows from 05:02:15 to 05:03:15 are part of the initial window for the join. As the stream clock moves to 05:03:30, rows from 05:02:15 to 05:02:29.999 move out of the window.

Output Rowtimes

The rowtime of a given output row is the rowtime at the point it was possible to calculate the output row. The stream clock for the output stream remains at the lower of the rowtimes for the two streams. For an inner join, the rowtime of an output row is the later of the rowtimes of the two input rows. This is also true for an outer join in which matching input rows are found.

Examples

The examples below use the following input:

stocks.order_quantity

ROWTIME QUANTITY TICKER
‘2019-03-30 05:02:10.000’ 750 IBM
‘2019-03-30 05:03:10.000’ 1000 IBM
‘2019-03-30 05:03:15.000’ 1000 GOOGL
‘2019-03-30 05:03:20.000’ 2000 GOOGL
‘2019-03-30 05:03:20.000’ 2000 IBM
‘2019-03-30 05:03:28.000’ 1000 MSFT
‘2019-03-30 05:03:30.000’ 2000 MSFT

stocks.stock_price

ROWTIME ENDING_PRICE TICKER
‘2019-03-30 05:02:10.000’ 75 IBM
‘2019-03-30 05:03:10.000’ 100 IBM
‘2019-03-30 05:03:15.000’ 100 MSFT
‘2019-03-30 05:03:20.000’ 100 GOOGL
‘2019-03-30 05:03:25.000’ 200 MSFT

The following query uses a window of 1 minute for both left and right streams and an INNER join:

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

'ROWTIME','TICKER','QUANTITY','ENDING_PRICE'
'2019-03-30 05:02:10.0','IBM','750','75'
--The next two rows join with the rows from 5:02:10, because they are still part of the window.
--Note that endpoints are included in the window.
'2019-03-30 05:03:10.0','IBM','1000','75'
'2019-03-30 05:03:10.0','IBM','750','100'
'2019-03-30 05:03:10.0','IBM','1000','100'
'2019-03-30 05:03:20.0','GOOGL','1000','100'
'2019-03-30 05:03:20.0','GOOGL','2000','100'
--note that in next row, rows with IBM are not joined with rows from 5:02:10, because they have fallen out of the window.
'2019-03-30 05:03:20.0','IBM','2000','100'

Note that:

  • The joined stream clock stays at ‘2019-03-30 05:03:20.000’, even though rows have arrived in stocks.order_quantity after this time.

  • No match appears for MSFT, so data from the rows at ‘2019-03-30 05:03:15.000’ and ‘2019-03-30 05:03:25.000’ from stocks.stock_price do not appear in the join.
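
The seven result rows above can be reproduced with a small Python simulation over the sample input. This is a sketch of the rules described in this topic (inclusive one-minute windows, output rowtime equal to the later input rowtime, output held back until both inputs have reached that rowtime), not a description of how s-Server is implemented. The function windowed_inner_join and the helper t are names invented for this sketch.

```python
from datetime import datetime, timedelta

def t(hms):
    """Build a timestamp on 2019-03-30 from an HH:MM:SS string."""
    return datetime.strptime("2019-03-30 " + hms, "%Y-%m-%d %H:%M:%S")

# Sample input from this topic: (rowtime, ticker, value).
orders = [(t("05:02:10"), "IBM", 750), (t("05:03:10"), "IBM", 1000),
          (t("05:03:15"), "GOOGL", 1000), (t("05:03:20"), "GOOGL", 2000),
          (t("05:03:20"), "IBM", 2000), (t("05:03:28"), "MSFT", 1000),
          (t("05:03:30"), "MSFT", 2000)]
prices = [(t("05:02:10"), "IBM", 75), (t("05:03:10"), "IBM", 100),
          (t("05:03:15"), "MSFT", 100), (t("05:03:20"), "GOOGL", 100),
          (t("05:03:25"), "MSFT", 200)]

def windowed_inner_join(left, right, left_window, right_window):
    """Inner join on ticker with a RANGE ... PRECEDING window on each side.

    Assumes both inputs are non-empty.
    """
    # Output can only advance as far as the slower of the two inputs.
    clock = min(max(row[0] for row in left), max(row[0] for row in right))
    out = []
    for lt, lticker, quantity in left:
        for rt, rticker, price in right:
            if lticker != rticker:
                continue
            # The earlier row must still be inside its own window
            # (endpoints included) when the later row arrives.
            if lt >= rt:
                in_window = lt - rt <= right_window
            else:
                in_window = rt - lt <= left_window
            rowtime = max(lt, rt)  # the later of the two input rowtimes
            if in_window and rowtime <= clock:
                out.append((rowtime, lticker, quantity, price))
    return sorted(out)

one_minute = timedelta(minutes=1)
result = windowed_inner_join(orders, prices, one_minute, one_minute)
for rowtime, ticker, quantity, price in result:
    print(rowtime.time(), ticker, quantity, price)
```

In this model the MSFT pairs exist but are held back, because their rowtimes are later than the latest rowtime seen so far on the price side.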

Once a row arrives in stocks.stock_price at 05:03:30 or later, the two rows that had arrived in stocks.order_quantity after 05:03:20 become available for the join:

INSERT INTO stocks.stock_price (rowtime, ticker, ending_price) VALUES (CAST('2019-03-30 05:03:30.000' AS TIMESTAMP), 'IBM', 200);

This insert results in six new rows being emitted. These rows emit only once the stream clock moves to 05:03:30 (when the row from stocks.stock_price arrives at 05:03:30).

Nothing emits as long as the stream clock remains at 05:03:20. That would also be true for a right or full join, because the stream clock still needs to catch up.

This is also the first time that anything has matched MSFT, which had appeared in stocks.stock_price, but not, up until now, in stocks.order_quantity.

In a right or full join, we would have seen MSFT earlier, but without any columns joined from stocks.order_quantity.

'2019-03-30 05:03:25.0','MSFT','1000','100'
'2019-03-30 05:03:25.0','MSFT','1000','200'
'2019-03-30 05:03:30.0','MSFT','2000','100'
'2019-03-30 05:03:30.0','MSFT','2000','200'
'2019-03-30 05:03:30.0','IBM','1000','200'
'2019-03-30 05:03:30.0','IBM','2000','200'

The four rows with “MSFT” in column ticker result from the two stocks.order_quantity rows arriving at 05:03:28 and 05:03:30 matching the two earlier unmatched rows from stocks.stock_price. They can emit because the output stream clock has moved to 05:03:30.

The new rows are all matches that have come in up until 5:03:30, including those rows that had arrived in stocks.stock_price after 05:03:20 but before 05:03:30. Note that this means that four matches came in for MSFT, even though the row inserted into stocks.stock_price at 05:03:30 had “IBM” in column ticker.

Example with ROWS

The following query uses a window of 5 rows preceding the current for both left and right streams and an INNER join:

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS oq
  JOIN stock_price OVER (ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS sp
  ON (oq.ticker = sp.ticker);

Left Join

Left joins include all rows from the left stream, regardless of whether the join condition matches rows from the right stream. The following is the same query as in the first example above, but with a LEFT JOIN instead of a JOIN.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) as oq
    LEFT JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.0’ IBM 750 75
‘2019-03-30 05:03:10.0’ IBM 1000 75
‘2019-03-30 05:03:10.0’ IBM 750 100
‘2019-03-30 05:03:10.0’ IBM 1000 100
‘2019-03-30 05:03:15.0’ NULL 1000 NULL
‘2019-03-30 05:03:20.0’ GOOGL 1000 100
‘2019-03-30 05:03:20.0’ GOOGL 2000 100
‘2019-03-30 05:03:20.0’ IBM 2000 100

Because the stocks.order_quantity row at 2019-03-30 05:03:15.000 (GOOGL) has no match in stocks.stock_price when it arrives, the row still emits, but returns null for the columns ticker and ending_price. (The first part of the query specifies that ticker is selected from stock_price.)

Right Join

Right joins include all rows from the right stream, regardless of whether the join condition matches rows from the left stream. The following is the same query in the above example, but with a RIGHT JOIN instead of a JOIN.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    RIGHT JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.000’ IBM 750 75
‘2019-03-30 05:03:10.000’ IBM 1000 75
‘2019-03-30 05:03:10.000’ IBM 750 100
‘2019-03-30 05:03:10.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ MSFT NULL 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100

Because there is no match for MSFT in stocks.order_quantity, the row at 2019-03-30 05:03:15.000 still emits, but returns null for the column quantity.

Full Join

Full joins include all rows from both streams, regardless of whether the join condition matches rows. The following is the same query in the above example, but with a FULL JOIN instead of a JOIN.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) as oq
    FULL JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.000’ IBM 750 75
‘2019-03-30 05:03:10.000’ IBM 1000 75
‘2019-03-30 05:03:10.000’ IBM 750 100
‘2019-03-30 05:03:10.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ NULL 1000 NULL
‘2019-03-30 05:03:15.000’ MSFT NULL 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100

Both rows at 2019-03-30 05:03:15.000 still emit even though neither has a match. The first, from stocks.order_quantity (GOOGL), has no match in stocks.stock_price and returns null for the columns ticker and ending_price. The second, from stocks.stock_price (MSFT), has no match in stocks.order_quantity and returns null for the column quantity. Note that a full join looks exactly as if you combined the results of a right and left join.

Cross Join

A CROSS join returns the Cartesian Product of all rows in the two streams’ windows that have arrived up until the joined stream’s stream clock. CROSS JOIN does not have any matching condition in the join clause.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) as oq
    CROSS JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp;

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.000’ IBM 750 75
‘2019-03-30 05:03:10.000’ IBM 1000 75
‘2019-03-30 05:03:10.000’ IBM 750 100
‘2019-03-30 05:03:10.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ MSFT 1000 100
‘2019-03-30 05:03:15.000’ MSFT 1000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100
‘2019-03-30 05:03:20.000’ MSFT 2000 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100
‘2019-03-30 05:03:20.000’ MSFT 2000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ MSFT 1000 200
‘2019-03-30 05:03:20.000’ MSFT 1000 200
‘2019-03-30 05:03:20.000’ MSFT 2000 200
‘2019-03-30 05:03:20.000’ MSFT 2000 200
‘2019-03-30 05:03:25.000’ IBM 1000 100
‘2019-03-30 05:03:25.000’ MSFT 1000 100
‘2019-03-30 05:03:25.000’ GOOGL 1000 100
‘2019-03-30 05:03:25.000’ MSFT 1000 200

As always for a CROSS JOIN, the size of the result is the product of the number of rows: seven rows from stocks.order_quantity and five rows from stocks.stock_price. Note that even though stocks.stock_price has six input rows, CROSS JOIN still respects the stream clock, so rows that have arrived after ‘2019-03-30 05:03:25.000’ are not included in the cross join.

Using the WHERE Clause With Joins

You can use the WHERE clause to achieve similar effects to join conditions. With a WHERE clause, rows are filtered after they have been emitted from the join. For an inner join, WHERE is equivalent to ON, but for an outer join, the partially NULL rows are only generated correctly if the condition is evaluated for each pair of candidate rows, and a WHERE clause cannot do that. For more details, see the topic WHERE clause in this guide.

Joining with a WHERE clause works similarly to the older join syntax for Oracle (a non-ANSI join).
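
The difference between filtering after the join and evaluating the condition per candidate pair can be sketched in Python. The functions cross_join, inner_join, and left_join and the sample rows are assumptions made for this illustration, not s-Server code; None stands in for SQL NULL.

```python
def cross_join(left, right):
    """Every left row paired with every right row."""
    return [(l, r) for l in left for r in right]

def inner_join(left, right, on):
    """Keep only the pairs that satisfy the ON condition."""
    return [(l, r) for l, r in cross_join(left, right) if on(l, r)]

def left_join(left, right, on):
    """Evaluate ON per candidate pair; pad unmatched left rows with None."""
    out = []
    for l in left:
        matches = [(l, r) for r in right if on(l, r)]
        out.extend(matches or [(l, None)])
    return out

# Hypothetical rows: (ticker, value). GOOGL has no price.
orders = [("IBM", 750), ("GOOGL", 1000)]
prices = [("IBM", 75), ("MSFT", 100)]
same_ticker = lambda o, p: o[0] == p[0]

# WHERE applied to the output of a cross join ...
where_filtered = [pair for pair in cross_join(orders, prices)
                  if same_ticker(*pair)]

# ... gives the same rows as an inner join with ON.
print(where_filtered == inner_join(orders, prices, same_ticker))

# A left join also emits the null-padded GOOGL row, which no filter over
# the cross join output can produce, because that row never exists there.
print(left_join(orders, prices, same_ticker))
```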

Using CROSS JOIN and WHERE

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    CROSS JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
  WHERE sp.ticker = oq.ticker;

Using a FROM clause with streams separated by commas, and a WHERE condition

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq,
    stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
  WHERE sp.ticker = oq.ticker;

Stream-Table Joins

You can use the JOIN clause to enrich the content of a stream by treating one of the stream's columns as a foreign key to a table, and finding table data that matches each row from the stream. In effect, this means computing the relational join of the stream with the table.

There are currently two ways to perform a stream-to-table join:

  • Using a stream-table join in SQL, described here.

  • Using the TableLookup UDX, as described in the topic Table Lookup UDX in the Integration Guide. We recommend only using this option if the stream-table join described here is insufficient. Most notably, the Table Lookup UDX lets you control how table data is fetched and cached. You can also subclass the UDX for special semantics. While performance is good, you need to call this as a UDX. Also, the s-Server query engine is unable to push projections and filters into the UDX.

Stream-Table Join

With stream-table joins, s-Server allows the ON condition and the USING condition as described above, and allows both INNER JOIN and OUTER JOIN. Either input can be a foreign table, a native stream, a foreign stream, a view, or a subquery.

The query engine detects when an input is like a stream (such as a stream or streaming view) and when it is like a table (such as a native table or table view). The join of two table-like inputs is a normal join. This works as a hash-join or an index-scan. The join of two stream-like inputs is described above.

When one input is stream-like and the other is table-like, the engine implements the query as a static stream-table join.

With a stream-table join, s-Server computes the joined stream by capturing the table or table view at the start of the query and loading its data into a lookup structure. Streaming input is read row by row; for each row, s-Server identifies matches in the table data using the condition you have defined for the join, and emits the matching rows. For an outer join, if there are no matches, s-Server emits nulls as the match.
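
The load-once, probe-per-row pattern described above can be sketched in Python as follows. This is an illustration under assumptions, not s-Server's implementation: the function stream_table_join, the choice of a dictionary as the lookup structure, and the sample rows are all invented for this sketch.

```python
from collections import defaultdict

def stream_table_join(stream, table, key, outer=False):
    """Join each stream row to a snapshot of the table on an equality key.

    Yields (stream_row, table_row) pairs. With outer=True, a stream row
    that matches nothing is emitted once with None as the table row.
    """
    # Capture the table once, at query start, into an in-memory lookup.
    lookup = defaultdict(list)
    for row in table:
        lookup[row[key]].append(row)

    # Read the stream row by row and probe the lookup for each row.
    for event in stream:
        matches = lookup.get(event[key], [])
        if matches:
            for match in matches:
                yield event, match
        elif outer:
            yield event, None

# Hypothetical rows shaped like a stream of events and a lookup table.
animal_data = [{"animal_id": 1, "name": "elk", "edible": True},
               {"animal_id": 2, "name": "toad", "edible": False}]
beast_events = [{"row_id": 1, "animal_id": 1, "weight": 300},
                {"row_id": 2, "animal_id": 99, "weight": 5}]

for event, animal in stream_table_join(beast_events, animal_data,
                                       "animal_id", outer=True):
    print(event["row_id"], animal["name"] if animal else None)
```

Because the lookup is keyed on equality and built once, the sketch also shows why such a join is limited to equi-joins on a table snapshot that fits in memory.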

Limitations to Stream-Table Join

The table or table view input must fit into memory. However, since the query planner can push filters and projections through the join operator, this input can be much smaller than the whole table.

You cannot refresh the loaded table data, except by executing the query again.

An outer join must be one-sided. It cannot detect table rows that never matched a stream row.

The join must be an equi-join (a join using an equality operator), not a general theta join.

Note: Joins using a tailing table stream have been defined but not implemented. Tailing foreign tables as a stream is defined in the topic Tailing Foreign Tables with SQL/MED in the Integration Guide.

Examples of Static Stream/Table Join

The following example uses a stream and a table that match on a key value called animal_id:

The stream is a simple input stream of observed beast events:

CREATE STREAM beast_events (
    row_id integer not null,
    sensor integer,
    zone integer,
    animal_id int,
    weight integer
);

The table lists animal attributes:

CREATE TABLE animal_data (
    animal_id int not null primary key,
    edible boolean,
    poisonous boolean,
    name varchar(32) not null,
    species varchar(32) not null,
    genus varchar(32) not null
);

Here are examples of SQL to join this stream to this table, both valid and invalid.

Simple join:

SELECT STREAM * from beast_events e join animal_data a using (animal_id);

Valid left outer join:

SELECT STREAM * from beast_events e left join animal_data a using (animal_id);

Invalid theta join:

SELECT STREAM * from beast_events e left join animal_data a on (e.animal_id > a.animal_id);

Valid equi-join with extra filters:

SELECT STREAM * from beast_events e join animal_data a
  on (e.animal_id = a.animal_id and a.edible and not a.poisonous);

Using JOINs to Enrich Stream Data

You can use JOINs to enrich stream data. For example, RIGHT OUTER JOIN can be a powerful tool to generate results for all rows in a defined data set. Let’s say we have a stream and a table, each of which has information on stores in 200 different cities.

  • The stream, city_sales, contains sales data by city, with each city containing one store.
  • The table, city_info, contains information on the store in each city.

You can use a RIGHT OUTER JOIN to produce hourly sales reports, and return data even in cases where city_sales has no rows. This way, if a batch of hourly data has data aggregated for 125 cities, a RIGHT OUTER JOIN can ensure that exactly 200 rows are generated every hour even if no sales were reported in some cities.

You would do so with code along the following lines:

SELECT STREAM cs.*, ci.*
FROM city_sales OVER (ORDER BY FLOOR(ROWTIME TO HOUR)
                      RANGE CURRENT ROW) AS cs
RIGHT OUTER JOIN city_info AS ci
USING(city);

The column city joins the stream and the table, with USING as the join condition.

Because it uses ORDER BY FLOOR(ROWTIME TO HOUR), the window on the city_sales stream is a hopping window of zero size. That means that rows accumulate in the window until ROWTIME moves to a new hour. At that point, all rows from the previous hour are dropped from the window, because ORDER BY FLOOR(ROWTIME TO HOUR) will evaluate to the new hour. The “zero size” part refers to RANGE CURRENT ROW.

When the window frame shrinks to remove all rows for the past hour, RIGHT OUTER JOIN emits all rows from city_info that did not match any of the last batch of hourly rows from the stream.
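
The hourly behavior can be approximated with a short Python sketch. It is a simplified model under assumptions, not s-Server code: the function hourly_right_outer_join is invented here, every output row is labeled with its hour rather than with a rowtime, the city names are made up, and the unmatched cities for an hour are emitted only when a row for a later hour arrives.

```python
from datetime import datetime

def hourly_right_outer_join(sales, cities):
    """Yield (hour, city, amount); amount is None for cities with no sales.

    sales is an iterable of (rowtime, city, amount) in rowtime order.
    """
    current_hour = None
    seen = set()
    for rowtime, city, amount in sales:
        hour = rowtime.replace(minute=0, second=0, microsecond=0)
        if current_hour is not None and hour != current_hour:
            # The window hops: emit every city that never matched in the
            # hour that just closed, padded with None.
            for unmatched in sorted(set(cities) - seen):
                yield (current_hour, unmatched, None)
            seen = set()
        current_hour = hour
        if city in cities:
            seen.add(city)
            yield (hour, city, amount)

# Hypothetical data: three known cities, two of which report in hour 10.
cities = ["Austin", "Boston", "Chicago"]
sales = [(datetime(2019, 3, 30, 10, 5), "Austin", 5),
         (datetime(2019, 3, 30, 10, 40), "Boston", 7),
         (datetime(2019, 3, 30, 11, 2), "Austin", 1)]

rows = list(hourly_right_outer_join(sales, cities))
for row in rows:
    print(row)
```

With one aggregated row per reporting city, each closed hour in this sketch produces exactly one row per city in the table.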

Multi-way JOINs

To do a three-way join, you use a joined table-reference as the table-reference in a JOIN clause. Here, stream/table 1 (b1) relates to stream/table 2 (asks) and stream/table 2 relates to stream/table 3 (b2), on the column “ticker.”

SELECT STREAM * from bids over (range interval '1' hour preceding) as b1
join asks over (range interval '2' second preceding)
on b1."ticker" = asks."ticker"
join bids over (range interval '3' minute preceding) as b2
on b2."ticker" = asks."ticker";