Table Lookup UDX

The TableLookup UDX is a User Defined Transform that looks up external database entries matching 1, 2, or 3 stream columns (stream-table JOIN).

There are currently two ways to perform a stream-to-table join:

  • Using the TableLookup UDX, as described below. The Table Lookup UDX lets you control how table data is fetched and cached. You can also subclass the UDX for special semantics. While performance is good, you need to call this as a UDX. Also, the s-Server query engine is unable to push projections and filters into the UDX.
  • Using a stream-table join in SQL, as described in the SELECT JOIN clause topic in the SQLstream Streaming SQL Reference Guide. With stream-table joins, s-Server allows the ON condition and the USING condition, and allows both INNER JOIN and OUTER JOIN. Either input can be a native table, a foreign table, a native stream, a foreign stream, a view, or a subquery.
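
For comparison, the second approach can be sketched as a plain SQL join. This sketch assumes a stream named "DailyAgencyActivity" and a table named "dealer_description", both with a "dealer" column and both visible in the current schema. The names echo the sample code in this topic and are used here only for illustration.

```sql
-- Sketch only: stream and table names are assumptions for illustration.
-- The stream is the left input; the table is the right input.
SELECT STREAM s."dealer", s."tstamp", t."dealer_daily_avg"
FROM "DailyAgencyActivity" AS s
LEFT JOIN "dealer_description" AS t
  ON s."dealer" = t."dealer";
```

Unlike the UDX, this form leaves fetching and caching decisions to s-Server.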

The TableLookup UDX implements a left-join in which the left arm of the join is supplied as the function’s CURSOR argument and the right arm of the join is the configured database table. Columns in the output rowtype are filled as follows:

  • Pass-through input columns are copied to output columns with matching names (case-sensitive).
  • Input columns without matching output columns are discarded.
  • Join columns from the database table are copied to output columns with matching names (case-sensitive).
  • Join columns without matching output columns are discarded.
  • Data from a join column overwrite data from a pass-through column, unless the join column contains NULLs, in which case the pass-through column data are used.
  • Primary key columns are protected from being overwritten.

The TableLookup UDX is implemented externally in a Java JAR file, located in the SQLSTREAM_HOME/plugin directory.

Code to Load the TableLookup Adapter

CREATE JAR "TableLookup"
   LIBRARY 'file:plugin/TableLookup.jar'
   OPTIONS(0);
-- This code loads the installed Java JAR file,
-- located in the SQLSTREAM_HOME/plugin directory.

Use CREATE FUNCTION to Access the Java Extension

See CREATE FUNCTION in the s-Server Streaming SQL Reference Guide for more detail. You declare exactly the columns that you expect to get back from the join: some will be columns supplied by the input, and some will come from the static table. You can define exactly what you want to bring back; the declaration does not need to include everything from the stream or table.

The function determines which columns are passed through from the input stream and which are provided by the join result. The output can contain many rows per input row, with nulls filled in for any output column that lacks a matching column.

While matching is case-sensitive, column by column, validation on this RETURNS TABLE clause does not fail if its columns find no exact match. If a name does not match the input stream or join table, perhaps due to a misspelled name in the RETURNS table, nulls are returned.

CREATE OR REPLACE FUNCTION "getDealerData"(
      inputRows CURSOR,
    controlCmds CURSOR,
         dsName VARCHAR(64),
      tableName VARCHAR(128),
        colName VARCHAR(128),
      cacheSize INTEGER,
   prefetchRows BOOLEAN,
   fuzzyLookups BOOLEAN)
   RETURNS TABLE(
               "dealer" VARCHAR(64),
                 "tstamp" TIMESTAMP,
               "full_day" BOOLEAN,
                "new_day" BOOLEAN,
  "dealer_events_today" INTEGER,
     "dealer_daily_avg" FLOAT,
  "dealer_window_total" INTEGER,
       "all_window_total" INTEGER,
            "num_updates" INTEGER,
   "dealer_sum_updates" BIGINT,
   "dealer_sum_squares" BIGINT,
        "all_sum_updates" BIGINT,
        "all_sum_squares" BIGINT)
   LANGUAGE JAVA
   PARAMETER STYLE SYSTEM DEFINED JAVA
   NO SQL
   EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

Controlling a Table-lookup Cache with a Control Stream

The TableLookup UDX enables you to cache lookup values, which can speed production throughput for commonly needed values. For example, the sample view that calls the TableLookup function sets the cache size to 5. With prefetching enabled, the UDX would select and preload the first 5 rows of the target table. This makes sense if you know you have a fixed-size target table and you want all of its values loaded at once. Preloading works well when a table holds a small, finite number of entries.

With caching turned on, the TableLookup UDX builds up a cache of results from the table and uses those to satisfy stream-table joins. However, if a value was looked up and cached, and then gets changed in your static table, the UDX will not notice that the underlying table has changed, because it is relying on the cache.

If caching is disabled for the source table from which you are taking look-up results, then the UDX goes to the table for every lookup (join).

To control caching, you can define a cache control stream. Commands you send in that stream enable you to pause use of the cache, to flush the cache if needed, and to resume use of the cache:

  • Sending a pause or suspend command on the control stream tells the UDX to stop satisfying joins for the moment, that is, to stop table lookups.
  • Sending a flush command (clearCache in the sample code) on the control stream flushes the cache.
  • Sending a resume command on the control stream tells the UDX to restart satisfying joins, that is, to resume looking up values in the table.

When it resumes, the UDX will fail to get a hit from the flushed (empty) cache, forcing it to go out to the real table and repopulate the cache with current values from the table.

Note that the control stream can be used to suspend and resume the stream-table join operations even if the cache is not enabled or, for that matter, even if it is enabled but you do not plan to flush the cache.

See the code example of defining a control stream.
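
The pause, flush, and resume cycle described above might look like the following. This is a sketch that assumes the "DealerDataControl" stream defined in the sample code in this topic and the command strings listed in its column comment ('pause', 'clearCache', 'resume'). The reason text is illustrative only.

```sql
-- Sketch only: assumes the "DealerDataControl" control stream exists
-- and is wired into the UDX call as its second CURSOR argument.

-- 1. Stop satisfying joins while the table is being changed.
INSERT INTO "DealerDataControl" ("SQLS_cmd", "SQLS_reason")
VALUES ('pause', 'dealer_description is being reloaded');

-- 2. Discard cached entries so that stale values are not reused.
INSERT INTO "DealerDataControl" ("SQLS_cmd", "SQLS_reason")
VALUES ('clearCache', 'dealer_description reloaded');

-- 3. Resume lookups; the first misses repopulate the cache from the table.
INSERT INTO "DealerDataControl" ("SQLS_cmd", "SQLS_reason")
VALUES ('resume', 'reload complete');
```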

Streaming Output

Use CREATE OR REPLACE VIEW to output streaming data. Choose the SELECT STREAM columns from the RETURNS TABLE clause, which can have originated from the join table, the static table, or the stream. (See the topic CREATE VIEW in the s-Server Streaming SQL Reference Guide for more detail.)

The FROM TABLE syntax in the example below includes two input selects or cursors. The first one is a view from your application. The second one is the optional control stream. The control stream can be passed the following directives, in the indicated order:

  • pause or suspend
  • clearCache
  • resume

After the cursors, you specify the following parameters:

  • dsName. The name of the file (much like a properties file) that contains the credentials for connecting to your external database. Put it in the SQLSTREAM_HOME/plugin/jndi directory. In the example code, dsName is set to “descriptions_prod”. The syntax for the dsName-file is as follows, where the last 4 lines are just like a properties file:

                     type=javax.sql.DataSource
                     driver=org.postgresql.Driver
                     url=jdbc:postgresql://localhost/mynethome
                     user=SQLstream
                     password=
    
  • tableName. The name of the external static table.

  • colName. The name of the primary lookup column for the join. The primary column (in this example, “dealer”) must exist in both the input stream and the RDBMS table. You can also specify an optional second and third join column after colName. The parameter list then becomes dsName, tableName, colName, colName2, colName3, cacheSize, prefetchRows, fuzzyLookups.

  • cacheSize. The number of entries to cache from the static table. Set cacheSize to match the working set, that is, the number of rows you commonly look up and for which you do not want repeated database hits. If the UDX needs more than that number of rows, it automatically does a LIFO cache turn-over. Set cacheSize to 0 if you want the UDX always to look in the database. Performance may suffer, but this can be useful for testing, since it gives you an uncached stream-table join. With cacheSize set to 0, you can either leave the control stream in place and not use it, or omit the control stream.

  • prefetchRows. Set prefetchRows to false if you do not want a preload. If you set prefetchRows to true and cacheSize is non-zero, the TableLookup UDX runs a query to simulate a join and pulls in as many rows as it can from the table. For example, if the view that calls the TableLookup function set prefetchRows to true with cacheSize set to 5, the TableLookup UDX would select and preload the first 5 rows of the target table.

  • fuzzyLookups. When fuzzyLookups is true, numbers that are close to each other match. Set fuzzyLookups to true only if the column you join on is a floating-point number for which a close match is sufficient. Latitudes and longitudes are a good use case, since joining on them will not necessarily produce an exact equality match.
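
As a rough illustration of fuzzyLookups, the sketch below joins on a latitude/longitude pair. Everything named here is hypothetical and not part of the product: the function "getSiteData", the stream "VehiclePositions", the control stream "SiteDataControl", the data source 'sites_prod', the table 'site_locations', and all column names. It assumes the TableLookup JAR is already loaded and that a two-column lookup takes colName2 immediately after colName, as described in the parameter list above.

```sql
-- Hypothetical two-column lookup on floating-point coordinates.
CREATE OR REPLACE FUNCTION "getSiteData"(
   inputRows CURSOR,
   controlCmds CURSOR,
   dsName VARCHAR(64),
   tableName VARCHAR(128),
   colName VARCHAR(128),
   colName2 VARCHAR(128),
   cacheSize INTEGER,
   prefetchRows BOOLEAN,
   fuzzyLookups BOOLEAN)
   RETURNS TABLE(
      "lat" DOUBLE,
      "lon" DOUBLE,
      "site_name" VARCHAR(64))
   LANGUAGE JAVA
   PARAMETER STYLE SYSTEM DEFINED JAVA
   NO SQL
   EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

SELECT STREAM *
  FROM TABLE("getSiteData"(
     CURSOR(SELECT STREAM * FROM "VehiclePositions"),  -- input stream (assumed)
     CURSOR(SELECT STREAM * FROM "SiteDataControl"),   -- control stream (assumed)
     'sites_prod',      -- data source file (assumed)
     'site_locations',  -- lookup table (assumed)
     'lat',             -- first join column
     'lon',             -- second join column
     100,               -- cache size
     false,             -- no preload
     true               -- fuzzy matching on
  ));
```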

Sample Code

Adapter and UDX code share a common pattern:

  • An “installing” section
  • A “defining” section
  • A “calling” section

The following code has these sections and exemplifies the descriptions of the TableLookup UDX given above.

Code to “install” (load) the TableLookup adapter, as described in the section named Code to Load the TableLookup Adapter.

This code loads the installed Java JAR file, located in the SQLSTREAM_HOME/plugin directory.

CREATE JAR "TableLookup"
   LIBRARY 'file:plugin/TableLookup.jar'
   OPTIONS(0);

Code to create the function, as described in the section named Use CREATE FUNCTION to Access the Java Extension.

CREATE OR REPLACE FUNCTION "getDealerData"(
      inputRows CURSOR,
    controlCmds CURSOR,
         dsName VARCHAR(64),
      tableName VARCHAR(128),
        colName VARCHAR(128),
      cacheSize INTEGER,
   prefetchRows BOOLEAN,
   fuzzyLookups BOOLEAN)
   RETURNS TABLE(
               "dealer" VARCHAR(64),
                 "tstamp" TIMESTAMP,
               "full_day" BOOLEAN,
                "new_day" BOOLEAN,
  "dealer_events_today" INTEGER,
     "dealer_daily_avg" FLOAT,
  "dealer_window_total" INTEGER,
       "all_window_total" INTEGER,
            "num_updates" INTEGER,
   "dealer_sum_updates" BIGINT,
   "dealer_sum_squares" BIGINT,
        "all_sum_updates" BIGINT,
        "all_sum_squares" BIGINT)
   LANGUAGE JAVA
   PARAMETER STYLE SYSTEM DEFINED JAVA
   NO SQL
   EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

Code to define a control stream, as described in the section named Controlling a Table-lookup Cache with a Control Stream.

CREATE OR REPLACE STREAM "DealerDataControl" (
   "SQLS_cmd" VARCHAR(12) NOT NULL, -- e.g. 'pause'/'suspend', 'resume', 'clearCache'
   "SQLS_reason" VARCHAR(100)
) DESCRIPTION 'TableLookup UDX control stream for getDealerData() function';

Code to create a view that calls the TableLookup function, as described in the section named Streaming Output.

CREATE OR REPLACE VIEW "DailyAgencyDescription"
DESCRIPTION 'aggregated per-agency activity with historical description for that agency' AS
 SELECT STREAM
   "dealer",
   "tstamp",
   "full_day",
   "new_day",
   "dealer_events_today",   -- per-dealer count since midnight
   "dealer_daily_avg",      -- per-dealer historical daily average
   "dealer_window_total",   -- per-dealer count over current period
   "all_window_total",      -- all-dealers count over current period
   "num_updates",           -- number of times historical description has been updated [J]
   "dealer_sum_updates",    -- per-dealer sum of all historical updates [J]
   "dealer_sum_squares",    -- per-dealer "sum of the squares" of
                            -- all historical updates [J]
   "all_sum_updates",       -- (repeated) grand total of
                            -- all dealer dealer_sum_updates [J]
   "all_sum_squares"        -- (repeated) grand total of
                            -- all dealer dealer_sum_squares [J]
 FROM TABLE("MyTable"."getDealerData"(
         CURSOR(SELECT STREAM * FROM "DailyAgencyActivity"),
         CURSOR(SELECT STREAM * FROM "DealerDataControl"),
         'descriptions_prod',     -- prod credentials
         'dealer_description',    -- table
         'dealer',                -- PK
         5,                       -- cache
         false,                   -- no preload
         false                    -- no fuzzy
     ));

Troubleshooting

Setting trace levels in properties.trace: see Tracers.

Additional Information and Examples

The following tables describe the options passed as arguments to the SQL function invocation and the tracing levels.

Configuration

Options passed as arguments to the SQL function invocation.

Option         Description
dsName         Name of the external data source, a properties file located in $SQLSTREAM_HOME/plugin/jndi. Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX.
tableName      Name of the table to join against, schema-qualified if necessary.
colName        Name of the lookup column for the join (not necessarily the primary key of the table).
colName2       (Optional) Name of a second lookup column for the join.
colName3       (Optional) Name of a third lookup column for the join.
cacheSize      Number of entries to be cached in memory. Caching is enabled only if cacheSize > 0. Each entry corresponds to one lookup for given “primary” column values and could have multiple rows cached for it.
prefetchRows   If true, prefetch rows into the cache. Used only if cacheSize > 0.
fuzzyLookup    If true, use the scale of the input columns for lookups.

Tracing

Tracer: com.sqlstream.plugin.tablelookup.TableLookup

.level=    Notes
WARNING    Unrecognized/unimplemented control stream commands, JDBC errors
INFO       Control command receipt
FINE       Control command execution, generated lookup query (UDX init)
FINER      State changes (caused by control commands), column mapping details (UDX init)
FINEST     Data transfer details (row by row)
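
For instance, to log control command execution and the generated lookup query, the tracer could be set to FINE. This fragment assumes the trace properties file takes the logger name followed by .level=, as the tracer name and level column above suggest.

```
# Assumed form: <logger name>.level=<LEVEL>
com.sqlstream.plugin.tablelookup.TableLookup.level=FINE
```

FINEST logs every row transferred, so it is probably best reserved for short debugging sessions.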

Additional examples

The following examples further illustrate how to use the Table Lookup UDX.

CREATE JAR "TableLookup"
   LIBRARY 'file:plugin/TableLookup.jar'
   OPTIONS(0);

CREATE FUNCTION foo(
   c CURSOR, dsName VARCHAR(20), tableName VARCHAR(20), key VARCHAR(10),
   cacheSize INTEGER, prefetchRows BOOLEAN, fuzzyLookup BOOLEAN)
 RETURNS TABLE(A INTEGER, B VARCHAR(10), C INTEGER)
   LANGUAGE JAVA
   PARAMETER STYLE SYSTEM DEFINED JAVA
   NO SQL
   EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';
 SELECT STREAM *
   FROM TABLE(foo(
     CURSOR(SELECT STREAM * FROM SALES.BIDS), -- input stream
     'dsCredentials', -- refers to $SQLSTREAM_HOME/plugin/jndi/dsCredentials.properties
     'some.refer',      -- "refer" table in "some" schema
     'A',             -- use input column "A" as lookup column (key)
     0,               -- UDX cache disabled
     false,           -- no pre-fetch
     false            -- no fuzzy matching
 ));

CREATE FUNCTION bar(
   inputRows CURSOR, controlCmds CURSOR, dsName VARCHAR(20), tableName VARCHAR(20),
   key1 VARCHAR(10), key2 VARCHAR(10), key3 VARCHAR(10),
   cacheSize INTEGER, prefetchRows BOOLEAN, fuzzyLookup BOOLEAN)
 RETURNS TABLE(A INTEGER, B VARCHAR(10), C INTEGER)
    LANGUAGE JAVA
    PARAMETER STYLE SYSTEM DEFINED JAVA
    NO SQL
    EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';
 CREATE STREAM "TableLookupControl" (
    "SQLS_cmd" VARCHAR(12) NOT NULL, -- e.g. 'pause'/'suspend', 'resume', 'clearCache'
    "SQLS_reason" VARCHAR(100)
 ) DESCRIPTION 'TableLookup UDX control stream for bar() function';
 SELECT STREAM *
   FROM TABLE(bar(
     CURSOR(SELECT STREAM * FROM SALES.BIDS),           -- input stream
     CURSOR(SELECT STREAM * FROM "TableLookupControl"), -- control stream
     'dsCredentials', -- refers to $SQLSTREAM_HOME/plugin/jndi/dsCredentials.properties
     'some.refer',      -- "refer" table in "some" schema
     'A',             -- use input column "A" as lookup column (key) part 1
     'B',             -- use input column "B" as lookup column (key) part 2
     'C',             -- use input column "C" as lookup column (key) part 3
     1000,            -- 1000 unique keys in UDX cache
     false,           -- no pre-fetch
     false            -- no fuzzy matching
  ));