Tutorial 2: Introduction to Coding with Streaming SQL

In the last tutorial, we used StreamLab to connect with a data source, perform a simple analytic on the data, and visualize the data. As it worked, StreamLab was generating SQL. SQL is a query language that has been used for decades to query databases. SQLstream uses a version of SQL extended for streaming data.

Common tasks in SQL include creating streams and tables, inputting data into streams and tables, querying these streams and tables, moving data between objects, performing analysis on this data, and outputting data into a sink, or destination outside of s-Server.

This tutorial instructs you how to complete these tasks in SQL, first by using SQLstream's Integrated Development Environment, s-Studio, and second by writing SQL by hand and executing it using SQLline, a console utility for executing SQL.

After you complete these steps, you can use s-Dashboard to show streaming data in a dashboard.

Tutorial 2A: Creating a Stream and View in SQLstream s-Studio

SQLstream s-Studio is SQLstream's Integrated Development Environment. It lets you develop, test, run, and administer streaming SQL applications.

To launch s-Studio, open a terminal and enter the following text:

/opt/sqlstream/<VERSION>/s-Studio/s-Studio

The s-Studio workspace opens.

Before you can begin working with data in s-Studio, you first need to connect to s-Server.

If you have installed s-Server and s-Studio as part of the same package, a connection to s-Server has been already set up.

To connect to this instance, right-click on First SQLstream Server and choose Connect.

Resetting the s-Studio Workspace

If s-Studio becomes unresponsive, you may need to reset the s-Studio workspace. You can do so by opening a terminal and issuing the following command:

/opt/sqlstream/<VERSION>/s-Studio/resetStudio

You will not lose any of your work, because all of your changes should be saved in s-Server itself.

Starting the Demo Stream

In the following exercise, we will capture data from a streaming log file and use this data to identify buses that exceed the speed limit in the Sydney area. Because this data is streaming, speeders will be identified in real time. We will derive this data by connecting to a simulated streaming log file located on your local machine. In this case, the log file records messages sent out by buses in Sydney, Australia.

To begin, you need to start StreamLab and WebAgent. If you are running Guavus SQLstream in a Docker container or other virtual machine, you can do so from the Guavus SQLstream Cover Page.

Starting the Buses Sample Streaming Data Source

For appliances and Docker installations, on the cover page, scroll down to Sydney Buses is Running and click the On/Off switch to start streaming data. Data streams to /tmp/buses.log.

If you are running s-Server on a local machine, you can start the script by opening a terminal and entering the following:

$SQLSTREAM_HOME/demo/data/buses/start.sh

To stop the script, enter

$SQLSTREAM_HOME/demo/data/buses/stop.sh

Data streams to /tmp/buses.log.

This file features data in the following categories:

Column Type Definition
id DOUBLE Identification number for the bus.
reported_at TIMESTAMP Time location was reported.
shift_no DOUBLE Shift number for the bus's driver.
driver_no DOUBLE Driver identification number.
prescribed VARCHAR(4096) The direction on the motorway (into Sydney or out of Sydney).
highway DOUBLE Highway number if available.
gps VARCHAR GPS information with latitude, longitude, and bearing in JSON format.

Create a Foreign Stream to Connect with a Log File

In s-Server, you define foreign streams to connect with external data sources or sinks. A foreign stream serves as an interface between s-Server and external data. For each foreign stream, you define both connection information for the data source or sink, as well as (usually) columns to be read from the source or written to the sink. Each foreign stream requires a server. s-Server includes predefined servers for all data sources/sinks:

Prebuilt Servers in s-Server (Click to Expand)

To create the foreign stream in s-Studio:

  1. Open s-Studio by entering

    /opt/sqlstream/<VERSION>/s-Studio/s-Studio

    s-Studio should open.

  2. Right-click First SQLstream Server and select Connect.

All foreign streams need to be defined in schemas. Schemas are "containers" for foreign streams, internal streams, tables, pumps, and so on. Here, we're going to create a new schema as a container for our foreign stream.

  1. In the Catalog tree, right-click Schemas and select New.
  2. In the dialog box that opens, enter Buses_Schema for the data source's name.

  3. Click on Buses_Schema to expand it.

  4. Right-click Foreign Streams and select New.

  5. In the dialog box that opens, enter Buses for the data source's name.

    A tab opens called Buses (Foreign Stream).

  6. Click the Definition tab.

  7. To the right of Server, select FILE_SERVER from the dropdown menu.

  8. Under Columns, click Add and add the following columns:

    Column Type
    id BIGINT
    reported_at VARCHAR(32)
    speed INTEGER
    driver_no BIGINT
    prescribed BOOLEAN
    gps VARCHAR(128)
    highway VARCHAR(8)

Adding Options to the Foreign Stream

Next, we need to set options for the foreign stream. Options tell s-Server where and how to read the external data source. In this case, we will be drawing on two sets of options:

Reading Options for File System (Click to Expand)
Parsing Options for XML (Click to Expand)
  1. Under OPTIONS, click Add and add the following options:

    name Value Explanation
    CHARACTER_ENCODING UTF-8 Character set for data
    DIRECTORY /tmp Tells s-Server where the file resides.
    FILENAME_PATTERN buses\.log Tells s-Server what to look for in the directory in which the file resides.
    PARSER XML Tells s-Server "parse this data as XML"

    We need to add two additional options to tell s-Server how to read the XML data in the streaming source. This data looks as follows:

    <Table1><id>50116195532</id><reported_at>2014-07-23T20:51:58.547</reported_at><speed>0</speed><driver_no>160019</driver_no><prescribed>false</prescribed><highway/><gps>{ "lat": -33.530784606933594, "lon": 150.87783813476562, "bearing": 0}</gps></Table1>

    The top-level XML tag that we want to parse is called Table1. You indicate the top-level XML tag with an option called PARSER_XML_ROW_TAGS.

  2. Enter the following additional options:

    name Value Explanation
    PARSER_XML_ROW_TAGS /Table1 Tells s-Server "parse XML tags under /Table1"
    PARSER_XML_USE_ATTRIBUTES false Tells s-Server "ignore XML tag attributes"
  3. Note that the Buses (Foreign Stream) tab has an asterisk next to it. This indicates that the data source has not been saved. Choose File > Save (or Ctrl-S) to save the foreign stream. This step applies the SQL in s-Server.

  4. To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.

Viewing SQL Generated by s-Studio

You can view the SQL you have created by clicking the SQL tab.

You should see SQL along the following lines:

CREATE OR REPLACE FOREIGN STREAM "buses"
(
   "id" BIGINT,
   "reported_at" VARCHAR(32),
   "speed" INTEGER,
   "driver_no" BIGINT,
   "prescribed" BOOLEAN,
   "gps" VARCHAR(128),
   "highway" VARCHAR(8)
)
SERVER FILE_SERVER
OPTIONS (
  CHARACTER_ENCODING 'UTF-8',
  DIRECTORY '/tmp',
  FILENAME_PATTERN 'buses\.log',
  PARSER 'XML',
  PARSER_XML_ROW_TAGS '/Table1',
  PARSER_XML_USE_ATTRIBUTES 'false'
)
DESCRIPTION 'reads streaming data from buses.log'

Testing the Foreign Stream Using a Query

Now that you have configured the foreign stream in s-Server, let's test the foreign stream to make sure data is streaming.

To do so:

  1. Right-click the Buses foreign stream under Foreign Streams and select New Query. The Query Editor opens.
  2. Enter SELECT STREAM * FROM "buses" in the Query Editor.
  3. Click the Execute button at the top of the Query Editor.

Data should flow in the lower part of the Query Editor.

Testing the Foreign Stream Using Inspect

Instead of writing a query, you can also use s-Studio's Inspect feature to see data flowing.

To do so, right-click the Buses foreign stream and choose Inspect. A tab opens called Inspect buses with data flowing:

Creating a View on the Foreign Stream

Now that we've created a foreign stream, we want to perform a basic calculation on this stream. To do so, we'll create a new object called a view.

Views are reusable definitions of queries. Every view contains a SELECT statement. Views, then, work as proxies for streams. Views provide a flexible, easy-to-modify way to work with streaming data. You can query views themselves, and a query on a view automatically retrieves data from the stream or streams queried by the view. As an s-Server developer, you will do much of the work of cleaning, analyzing, and routing data through views.

Because you can create views linked to one another with SELECT statements, you can use views to create pipelines, linked SQL objects that perform some analytic task. If you have used StreamLab, and viewed the SQL generated by pipeline guides, you may have noticed that pipeline guides consist mostly of interlinked views. It's also the case that multiple views can query the same stream or view. This allows you to fork pipelines.

While views contain queries, data will not flow in s-Server until you run an active SELECT on either the view itself or on a view that SELECTs this view downstream in the pipeline.

Views take the general form CREATE VIEW <NAME> AS <SELECT STATEMENT>.

The SQL statement below, for example, does a simple filtering of the "buses" stream that we created above, selecting only the columns "id" and "speed".

CREATE OR REPLACE VIEW "Buses_Schema"."View" AS
--the SELECT statement below defines the view.
    SELECT STREAM "id","speed"
    FROM "Buses_Schema"."buses" AS "input";

When you use views, you can take advantage of all the power of a SELECT statement. So, for example, if you want to select only rows that have speeds greater than 0, you can use the WHERE clause.

The WHERE clause extracts records that meet a specified condition. The condition can be a numeric or string comparison, as in the following examples:

SELECT STREAM * FROM Sales WHERE Employee='Bob';
SELECT STREAM * FROM Sales WHERE Customer_Type=1;
SELECT STREAM * FROM Sales WHERE Customer_Type<1;
Operators for WHERE Clause (Click to Expand)

Here, we'll apply the WHERE clause to the "speed" column, in order to limit results to those buses going faster than 0 km/hour.

  1. Click on Buses_Schema to expand it.
  2. Right-click Views and select New.
  3. In the dialog box that opens, enter moving_buses for the name of the view.
  4. Click OK. A tab opens reading moving_buses (View). If it is not already open, click the SQL tab at the bottom of the moving_buses (View) tab.
  5. In the SQL window, complete the statement by entering the following text:

    SELECT STREAM * FROM "Buses_Schema"."buses" where "speed" > 0

    This tells s-Server, "get me all of the columns in a stream called buses in a schema called Buses_Schema where the column speed is greater than 0".

  6. Click the Definition tab. Note that all columns for the Buses stream appear here, and that buses appears as a dependency for the view. Dependencies refer to objects referenced by the view.

  7. Note that the moving_buses(View) tab has an asterisk next to it. This indicates that the view has not been saved. Choose File > Save (or Ctrl-S) to save the view. This step applies the SQL in s-Server.

  8. To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.

The new View appears under Views.
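Saving the view applies SQL in s-Server along the following lines (this is the same statement you would enter by hand in Tutorial 2B):

```sql
CREATE OR REPLACE VIEW "Buses_Schema"."moving_buses" AS
    SELECT STREAM * FROM "Buses_Schema"."buses"
    WHERE "speed" > 0;
```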

Write a SELECT on the VIEW and See Data Flowing

You can query any objects that contain data directly from s-Studio. Here, we will run a query on the view we just created.

  1. Right-click the moving_buses view and choose New Query.
  2. In the window that opens, enter the following text: SELECT STREAM * from "moving_buses"
  3. Click the green arrow at the top of the tab to execute the query. Observe that data starts flowing in the bottom of the tab. If you scroll over to the speed column, you will see that only rows with speeds above 0 appear.

Using Inspect to See Data Flowing

Instead of writing a query, you can also use s-Studio's Inspect feature to see data flowing.

To do so, right-click the moving_buses view and choose Inspect. A tab opens called Inspect moving_buses with data flowing:

Promoting reported_at to ROWTIME

s-Server depends on a special column called rowtime to deliver accurately timed data. Often, as a developer, you will want to take a column from the existing data with a timestamp and "promote" it to rowtime.

Here, you want to get the timestamp from the reported_at field by creating a view that selects reported_at as ROWTIME.

To do so:

  1. In s-Studio, right-click on the Views folder under Buses_Schema and select New View.
  2. In the dialog box that opens, enter buses_rowtime_promoted for the name of the view.
  3. Click OK. A tab opens reading buses_rowtime_promoted(View). If it is not already open, click the SQL tab at the bottom of the buses_rowtime_promoted(View) tab.
  4. In the SQL window, complete the statement by entering the following text:

    SELECT STREAM
    --casts "reported_at" as TIMESTAMP, promotes "reported_at" as ROWTIME
    CAST("reported_at" AS TIMESTAMP) AS ROWTIME,
    --other columns for view
    "id","speed","driver_no","prescribed","gps","highway"
    --this is the view we created in the previous step
    FROM "Buses_Schema"."moving_buses"
  5. Note that the buses_rowtime_promoted(View) tab has an asterisk next to it. This indicates that the view has not been saved. Choose File > Save (or Ctrl-S) to save the view. This step applies the SQL in s-Server.

  6. To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.

The new View appears under Views.
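Saving the view applies SQL in s-Server along the following lines:

```sql
CREATE OR REPLACE VIEW "Buses_Schema"."buses_rowtime_promoted" AS
    SELECT STREAM
    --casts "reported_at" as TIMESTAMP and promotes it to ROWTIME
    CAST("reported_at" AS TIMESTAMP) AS ROWTIME,
    --other columns for the view
    "id","speed","driver_no","prescribed","gps","highway"
    FROM "Buses_Schema"."moving_buses";
```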

Testing ROWTIME

You can test that reported_at has been promoted to ROWTIME by running a query on the new view. To do so:

  1. Right-click the Buses foreign stream under Foreign Streams and select New Query. The Query Editor opens.
  2. Enter the following in the Query Editor: SELECT STREAM ROWTIME, * from "buses_rowtime_promoted"
  3. Click the Execute button at the top of the Query Editor.

You should see data flowing in the lower window, with ROWTIMEs that begin with 2014 on the left-hand side.

Creating a View with an Average

You can also use views with windows. By including a WINDOW clause in a SELECT query, you can specify that the query applies to rows in a stream bounded by a time-range interval or a number of rows. This allows you to perform calculations for each output row, such as AVG or MAX. Calculations are performed over the window you specify, such as "1 MINUTE" or "20 ROWS".

In a conventional database, you would not need a window to perform such calculations, because the query would know what rows are available in advance. But because of the nature of streaming data, you have to identify a subset of rows in the stream on which to perform calculations.

Here, we'll create a view that averages bus speed over 1 minute.

  1. In s-Studio, right-click on the Views folder under Buses_Schema and select New View.
  2. In the dialog box that opens, enter avg_buses_speed for the name of the view.
  3. Click OK. A tab opens reading avg_buses_speed(View). If it is not already open, click the SQL tab at the bottom of the avg_buses_speed(View) tab.
  4. In the SQL window, complete the statement by entering the following text:

    SELECT STREAM AVG("speed") OVER "aWindow" AS "avg_speed",*
    --note that we use the view that we defined above
    FROM "Buses_Schema"."buses_rowtime_promoted"
    --uses a WHERE clause to filter out buses that are not moving
    WHERE "speed">0
    WINDOW "aWindow" AS (RANGE INTERVAL '1' MINUTE PRECEDING)
  5. Note that the avg_buses_speed(View) tab has an asterisk next to it. This indicates that the view has not been saved. Choose File > Save (or Ctrl-S) to save the view. This step applies the SQL in s-Server.

  6. To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.

The new View appears under Views. When you query this view:

select stream * from "avg_buses_speed"

you should see a column called avg_speed that contains the average speed of buses over the past minute.
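Assembled as a single statement, the view you just created looks along these lines:

```sql
CREATE OR REPLACE VIEW "Buses_Schema"."avg_buses_speed" AS
    --AVG is computed over the named window for each output row
    SELECT STREAM AVG("speed") OVER "aWindow" AS "avg_speed",*
    FROM "Buses_Schema"."buses_rowtime_promoted"
    --filters out buses that are not moving
    WHERE "speed" > 0
    --defines the window as the preceding minute of rows
    WINDOW "aWindow" AS (RANGE INTERVAL '1' MINUTE PRECEDING);
```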

Using the Parser UDX to Parse JSON out of a Column

When you viewed data flowing, you probably noticed that the column gps contains unparsed JSON. While you can often parse data as it enters s-Server, there are times when you may need to parse data within a stream. This is known as mid-stream parsing.

In order to proceed, we need to parse this data. We can do so using the Parser UDX. The Parser UDX lets you call one of s-Server's data parsers.

Create a Function to Parse JSON

In order to parse JSON, we need to create a function that calls one of s-Server's predefined user-defined transforms or UDXs. A user-defined transform returns a set of rows or a stream of rows. Its input arguments can be scalars or cursors. A cursor is an object representing a subquery, which the user-defined transform can use to read the subquery results.

Here, we'll use the Parser UDX. The Parser UDX lets you apply one of s-Server's predefined parsers to a column of data (in this case, the gps column in buses_rowtime_promoted).

To create a function that calls the Parser UDX:

  1. In s-Studio, right-click on the User Defined Functions folder under Buses_Schema and select New.
  2. In the dialog box that opens, select FUNCTION(JAVA).
  3. In the dialog box that opens, enter json_parser for the name of the function.
  4. Click OK. A tab opens reading json_parser (User Defined Function). If it is not already open, click the SQL tab at the bottom of the json_parser(User Defined Function) tab.
  5. In the SQL window, delete the existing text and complete the statement by entering the following text:
(
  INPUT CURSOR,
  COLUMNNAME VARCHAR(256),
  PARSERCLASSNAME VARCHAR(256),
  OPTIONS CURSOR)
RETURNS TABLE (
   --INPUT is the original stream, entered as a cursor for the function
   INPUT.*,
   --these are the columns that you expect to see in the JSON
   "lat" DOUBLE,
   "lon" DOUBLE,
   "bearing" INTEGER
)
SPECIFIC "json_parser"
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
NOT DETERMINISTIC
--this calls the Parser UDX
EXTERNAL NAME 'class com.sqlstream.aspen.namespace.common.ParserUdx.parseColumn'
  6. Note that the json_parser(User Defined Function) tab has an asterisk next to it. This indicates that the function has not been saved. Choose File > Save (or Ctrl-S) to save the User Defined Function. This step applies the SQL in s-Server.
  7. To see the new function, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.
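Assembled as a single statement (including the CREATE FUNCTION header, which s-Studio generates for you), the function definition looks along these lines:

```sql
CREATE OR REPLACE FUNCTION "json_parser"
(
  INPUT CURSOR,
  COLUMNNAME VARCHAR(256),
  PARSERCLASSNAME VARCHAR(256),
  OPTIONS CURSOR
)
RETURNS TABLE (
   --INPUT is the original stream, entered as a cursor for the function
   INPUT.*,
   --these are the columns that you expect to see in the JSON
   "lat" DOUBLE,
   "lon" DOUBLE,
   "bearing" INTEGER
)
SPECIFIC "json_parser"
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
NOT DETERMINISTIC
--this calls the Parser UDX
EXTERNAL NAME 'class com.sqlstream.aspen.namespace.common.ParserUdx.parseColumn';
```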

Write a View to Read the Parsed JSON

Next, we write a view that calls the json_parser function that we just defined.

  1. In s-Studio, right-click on the Views folder under Buses_Schema and select New View.
  2. In the dialog box that opens, enter buses_json_parsed for the name of the view.
  3. Click OK. A tab opens reading buses_json_parsed(View). If it is not already open, click the SQL tab at the bottom of the buses_json_parsed(View) tab.
  4. In the SQL window, complete the statement by entering the following text:
SELECT STREAM "id","speed","driver_no","prescribed",
--these columns use the alias "JSON" to indicate that they are columns produced after the SELECT statement runs
JSON."lat" AS "lat", JSON."lon" AS "lon", JSON."bearing" AS "bearing","highway"
    FROM STREAM (
    --this is the name of the function that we defined above
    "json_parser"(
    --this is the input for the function (known as INPUT in function definition).
    CURSOR
    --cursor selects the view that we created above
    (SELECT STREAM * FROM "buses_rowtime_promoted"),
    --column from "buses_rowtime_promoted" to be parsed (known as COLUMNNAME in function definition).
    'gps',
    --type for parser (known as PARSERCLASSNAME in function definition).
    'JSON',
    --these are options for the JSON parser (in function definition).
    CURSOR(SELECT * FROM (values ('$')) AS OPTIONS(ROW_PATH))
        ))
        AS JSON
  5. Note that the buses_json_parsed(View) tab has an asterisk next to it. This indicates that the view has not been saved. Choose File > Save (or Ctrl-S) to save the view. This step applies the SQL in s-Server.
  6. To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.

Now, let's test our code by querying the buses_json_parsed view. (You can also use the Inspect feature to test your code.)

To do so:

  1. Right-click the Buses foreign stream under Foreign Streams and select New Query. The Query Editor opens.
  2. Enter the following in the Query Editor: SELECT STREAM * from "buses_json_parsed"
  3. Click the Execute button at the top of the Query Editor.

You should see data flowing in the lower window, with new columns called lat, lon, and bearing.
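If you prefer to work in SQLline, the same view can be created with a single statement along these lines:

```sql
CREATE OR REPLACE VIEW "Buses_Schema"."buses_json_parsed" AS
SELECT STREAM "id","speed","driver_no","prescribed",
--columns produced by the parser, read through the alias JSON
JSON."lat" AS "lat", JSON."lon" AS "lon", JSON."bearing" AS "bearing","highway"
FROM STREAM (
  "json_parser"(
    --cursor over the view created earlier (INPUT in the function definition)
    CURSOR(SELECT STREAM * FROM "buses_rowtime_promoted"),
    --column to be parsed (COLUMNNAME in the function definition)
    'gps',
    --parser type (PARSERCLASSNAME in the function definition)
    'JSON',
    --options for the JSON parser
    CURSOR(SELECT * FROM (VALUES ('$')) AS OPTIONS(ROW_PATH))
  )
) AS JSON;
```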

Filter Out Zero Values for Lat and Lon

Next, we want to be sure that our values for lat and lon do not contain any values of 0. To do so, we can create another view, along the following lines. Note that you can use the AND keyword with the WHERE clause.

Create a view called buses_filtered_lat_lon and enter the following text into the SQL window:

SELECT STREAM * FROM "Buses_Schema"."buses_json_parsed" where "lat" <> 0 AND "lon" <> 0

Categorizing speed into Speeding and Normal

Next, we want to set up the column "speed" so that instead of listing numerical speeds, it categorizes the column into "speeders" and "normal".

Create a view called speed_categorized and enter the following text into the SQL window:

SELECT STREAM "id",(CASE WHEN "speed" >= 45 THEN 'speeding' ELSE 'normal' END) AS "speed","driver_no","prescribed","lat","lon","bearing"
FROM "Buses_Schema"."buses_filtered_lat_lon"

You can test this view by entering the following query:

select stream * from "speed_categorized"

Skip down to learn how to use s-Dashboard to show this data in a dashboard.

Tutorial 2B: Coding Streaming SQL in SQLstream SQLline

SQLstream s-Server lets you query, analyze, and route streaming data by using SQL (Structured Query Language), the same language that developers have been using to query traditional databases for decades.

This tutorial describes the steps for setting up a log file source and analyzing it, by coding blocks of SQL. You can execute SQL directly through a command-line program called SQLline. SQLline executes SQL code against SQLstream s-Server. A version of SQLline ships with s-Server, and comes pre-configured to connect with s-Server.

The tutorial includes the following steps:

Overview of Streaming SQL

The type of SQL that runs in s-Server is called streaming SQL. This SQL is based on the SQL standard, with some modifications. s-Server's streaming SQL is described in the SQLstream Streaming SQL Reference Guide.

SQLstream’s main enhancement to the SQL standard concerns the STREAM object. The process of creating streams in streaming SQL is similar to the process of creating tables in a database system like PostgreSQL or Oracle. Like database tables, streams have columns with column types. Once you create a stream, you can query it with a SELECT statement, or insert into it using an INSERT statement.
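For example, a minimal native stream might be created, queried, and written to along these lines (the "orders" stream and its columns are illustrative only, not part of the buses example):

```sql
CREATE OR REPLACE SCHEMA "Demo_Schema";
SET SCHEMA '"Demo_Schema"';

--create a native stream with typed columns, much like a table
CREATE OR REPLACE STREAM "orders" (
   "order_id" BIGINT,
   "amount" DOUBLE
);

--in one SQLline session, query the stream (runs until closed):
--SELECT STREAM * FROM "orders";

--in another session, insert rows into the stream:
--INSERT INTO "orders" VALUES (1, 19.99);
```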

Connecting to a Data Source

As well as native streams, SQLstream supports foreign streams which allow you to read data from a remote source, or write data to a remote sink.

A block of SQL that creates a foreign stream looks something like the following, which creates a source foreign stream:

CREATE OR REPLACE SCHEMA "Buses_Schema";
SET SCHEMA '"Buses_Schema"';

CREATE OR REPLACE FOREIGN STREAM "buses"
(
   "id" BIGINT,
   "reported_at" VARCHAR(32),
   "speed" INTEGER,
   "driver_no" BIGINT,
   "prescribed" BOOLEAN,
   "gps" VARCHAR(128),
   "highway" VARCHAR(8)
)
SERVER FILE_SERVER
OPTIONS (
  CHARACTER_ENCODING 'UTF-8',
  DIRECTORY '/tmp',
  FILENAME_PATTERN 'buses\.log',
  PARSER 'XML',
  PARSER_XML_ROW_TAGS '/Table1',
  PARSER_XML_USE_ATTRIBUTES 'false'
)
DESCRIPTION 'reads streaming data from buses.log'
;

Note that you declare types for each column. Data types determine what kind of data a column can contain. For example, a VARCHAR() column contains a character string of variable length, and a DOUBLE contains 64-bit floating point numbers.

For more information see CREATE FOREIGN STREAM.

Opening SQLline

To begin, you'll need to open SQLline.

To start SQLline:

  1. Open the desktop folder that installs with s-Server.
  2. Double-click the SQLline icon.

A terminal window opens. You can enter SQL at the command prompt.

There are two main ways to run SQL in SQLline:

  • By creating a SQL file and using the !run command. This command lets you access SQL scripts on your computer.
  • By directly typing SQL into the SQLline console.

In the following tutorial, you will type SQL directly into the SQLline console.
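For example, if you had saved SQL to a script file (the path here is hypothetical), you could execute it from the SQLline prompt with:

```
!run /home/sqlstream/buses_tutorial.sql
```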

Starting the Buses Demonstration Script

In the following exercise, we will capture data from a streaming log file and use this data to identify buses that exceed the speed limit. Because this data is streaming, speeders will be identified in real time. We will derive this data from a simulated streaming log file located on your local machine. In this case, the log file records messages sent out by buses in Sydney, Australia.

To start the sample streaming data source from an installation running the Guavus SQLstream Cover Page, such as a Docker container, scroll down on the cover page to Sydney Buses is Running and click the On/Off switch to start streaming data. Data streams to /tmp/buses.log in XML format.

This file features data in the following categories:

Column Type Definition
id DOUBLE Identification number for the bus.
reported_at TIMESTAMP Time location was reported.
shift_no DOUBLE Shift number for the bus's driver.
driver_no DOUBLE Driver identification number.
prescribed VARCHAR(4096) The direction on the motorway (into Sydney or out of Sydney).
highway DOUBLE Highway number if available.
gps VARCHAR GPS information with latitude, longitude, and bearing in JSON format.

If you are running s-Server on a local machine, you can start the script by opening a terminal and entering the following:

/opt/sqlstream/<VERSION>/s-Server/demo/data/buses/StreamXmlBusData.sh

Creating a Foreign Stream to Connect with a Log File in SQLline

The most important concept in streaming SQL is the stream. A stream is a continually updating data object: like a table with no end, it begins when the stream is established. As s-Server takes in data, new rows are continually added to the stream.

When you run a SELECT query on a conventional database table, the query iterates through the result set until there are no more rows to return. But when you run a SELECT query on an s-Server stream, there's no end of rows. Instead, the "get next row" call continues to run in s-Server until the statement is closed by the client application. In complex systems, this open-ended SELECT needs to be managed in order to maximize performance. Here, though, we can just let it run.

Streams can be written to by multiple writers and read from by multiple readers.

The following code block creates a stream that references a prebuilt server object, FILE_SERVER. This object contains the basic information that s-Server needs to connect to a data source, which in this case is a log file on a local machine. The stream is a virtual object with the columns listed below. When you query the stream, s-Server parses each record of the log file into these columns.

Enter the following code into the SQLline prompt. This code first creates and sets a schema for the foreign stream, sets up columns for the stream, and defines options for connecting to the file.

All streams must be created within schemas. A schema lets you logically group s-Server objects, such as streams, tables, views, and pumps.

CREATE OR REPLACE SCHEMA "Buses_Schema";
SET SCHEMA '"Buses_Schema"';

CREATE OR REPLACE FOREIGN STREAM "buses"
(
   "id" BIGINT, --Identification number for the bus.
   "reported_at" VARCHAR(32), --Time location was reported.
   "speed" INTEGER, --Reported speed of bus.
   "driver_no" BIGINT, --Driver identification for reported bus
   "prescribed" BOOLEAN, --The direction on the motorway,                           
                         --(into Sydney or out of Sydney).
   "gps" VARCHAR(128), --gps information in JSON: lat, lon, bearing
   "highway" VARCHAR(8) --Highway number, if available.
)
SERVER FILE_SERVER
OPTIONS (
  CHARACTER_ENCODING 'UTF-8',
  DIRECTORY '/tmp',
  FILENAME_PATTERN 'buses\.log',
  PARSER 'XML',
  PARSER_XML_ROW_TAGS '/Table1',
  PARSER_XML_USE_ATTRIBUTES 'false'
)
DESCRIPTION 'reads streaming data from buses.log'
;

Let's look at the options defined above. Some of these options tell s-Server how to connect to the log file, including the directory, pattern for the file, and data type for the file (indicated by parser, which is XML in this case).

name Value Explanation
CHARACTER_ENCODING UTF-8 Character set for data
DIRECTORY /tmp Tells s-Server where the file resides.
FILENAME_PATTERN buses\.log Tells s-Server what to look for in the directory in which the file resides.
PARSER XML Tells s-Server "parse this data as XML"

You can see a full list of options for reading from the file system below.

Reading Options for File System (Click to Expand)

We use two additional options to tell s-Server how to read the XML data in the streaming source. This data looks as follows:

<Table1><id>50116195532</id><reported_at>2014-07-23T20:51:58.547</reported_at><speed>0</speed><driver_no>160019</driver_no><prescribed>false</prescribed><highway/><gps>{ "lat": -33.530784606933594, "lon": 150.87783813476562, "bearing": 0}</gps></Table1>

The top-level XML tag that we want to parse is called Table1. You indicate the top-level XML tag with an option called PARSER_XML_ROW_TAGS.

To do so, we use two additional options:

name Value Explanation
PARSER_XML_ROW_TAGS /Table1 Tells s-Server "parse XML tags under /Table1"
PARSER_XML_USE_ATTRIBUTES false Tells s-Server "ignore XML tag attributes"

You can see a full list of options for parsing XML below.

Parsing Options for XML (Click to Expand)

Once the stream is created, you can query it as you would a table.

To do so, open another SQLline terminal and enter the following code:

SELECT STREAM * from "buses";

Note that the query uses the STREAM keyword. You need to use this keyword whenever you query a stream. We will use the STREAM keyword throughout the rest of this tutorial.

Creating a View on the Foreign Stream in SQLline

Now that we've created a foreign stream, we want to perform a basic calculation on this stream. To do so, we'll create a new object called a view.

Views are reusable definitions of queries. Every view contains a SELECT statement that reads from one or more streams or tables. Views, then, work as proxies for streams. They provide a flexible, easy-to-modify way to work with streaming data. You can query views themselves, and a query on a view automatically retrieves data from the stream or streams queried by the view. As an s-Server developer, you will do much of the work of cleaning, analyzing, and routing data through views.

Because you can create views linked to one another with SELECT statements, you can use views to create pipelines, linked SQL objects that perform some analytic task. If you have used StreamLab, and viewed the SQL generated by pipeline guides, you may have noticed that pipeline guides consist mostly of interlinked views. It's also the case that multiple views can query the same stream or view. This allows you to fork pipelines.

While views contain queries, data will not flow in s-Server until you run an active SELECT on either the view itself or on a view that SELECTs this view downstream in the pipeline.

Views take the general form CREATE VIEW <NAME> AS <SELECT STATEMENT>.

The SQL statement below, for example, does a simple filtering of the "buses" stream that we created above, selecting only the columns id and speed.

CREATE OR REPLACE VIEW "Buses_Schema"."View" AS
--the SELECT statement below defines the view.
    SELECT STREAM "id","speed"
    FROM "Buses_Schema"."buses" AS "input";

When you use views, you can take advantage of all the power of a SELECT statement. So, for example, if you want to select only rows that have speeds greater than 0, you can use the WHERE clause.

The WHERE clause extracts records that meet a specified condition. The condition can be a numeric or string comparison, as in the following examples:

SELECT STREAM * FROM Sales WHERE Employee='Bob';
SELECT STREAM * FROM Sales WHERE Customer_Type=1;
SELECT STREAM * FROM Sales WHERE Customer_Type<1;

Here, we'll apply the WHERE clause to the "speed" column, in order to limit results to those buses going faster than 0 km/hour. That way, we filter out buses that are stopped, so that these do not skew calculations such as average speed.

Enter the following in SQLline:

CREATE VIEW "moving_buses" AS
SELECT STREAM * FROM "Buses_Schema"."buses" WHERE "speed" > 0;

This tells s-Server, "get me all of the columns in a stream called buses in a schema called Buses_Schema where the column speed is greater than 0".

Promoting a Column to Rowtime in SQLline

In working with streaming data, you need to be aware of what time data arrives. Because streams continually update, and may update from multiple sources, time is an important concept in Streaming SQL. Time in streams is monotonic, meaning it always goes forward.

Working with ROWTIME

This monotonically increasing time is tracked as a column value called ROWTIME. By default, ROWTIME is the time a row enters the stream, though you can also configure the system to assign this value to a time generated by the data source.

This will help you produce meaningful analysis of this data. Every streaming row carries a time value called a rowtime, implemented as a column in every row. The rowtime of a newly arriving row cannot be less than the rowtime of previously received rows (though it can be equal to it).

Rowtimes can be implicit or explicit.

Implicit rowtimes are established by the "arrival time" of the row: the time that s-Server receives the row. Even though there is no explicit mention of ROWTIME, it is nevertheless part of that row.

Explicit rowtimes are provided by a timestamp from the data itself.
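Although ROWTIME does not appear in the stream's column list, you can select it like any other column. For example, the following query displays the arrival time alongside the bus data:

```sql
--ROWTIME is implicit, but can be selected explicitly
SELECT STREAM ROWTIME, "id", "speed" FROM "buses";
```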

Note: When setting an explicit ROWTIME, the timestamp must not be less than the previous row's timestamp. If your rowtimes are out of order, you can sort the data using a time sorting execution object. For more information on t-sorting stream input, see the subtopic T-sorting Stream Input in the topic ORDER BY clause.
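As a sketch of what a t-sort looks like, the query below reorders slightly out-of-order rows before they are promoted to ROWTIME, using a ten-second reordering window. The window length here is illustrative; choose one that covers the maximum expected disorder in your data.

```sql
--Sketch: t-sort rows on "reported_at", tolerating up to 10 seconds of disorder
SELECT STREAM "id", "speed", "reported_at"
FROM "Buses_Schema"."buses"
ORDER BY CAST("reported_at" AS TIMESTAMP) WITHIN INTERVAL '10' SECOND;
```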

In either case, the ROWTIME of the arriving row establishes the current time of the stream, known as the stream clock.

Here, we can create another VIEW that promotes reported_at to ROWTIME. First, we cast reported_at as a TIMESTAMP. The CAST function converts one value expression or data type to another value expression or data type.

Enter the following in SQLline:

CREATE VIEW "buses_rowtime_promoted" AS
SELECT STREAM
--casts "reported_at" as TIMESTAMP, promotes "reported_at" as ROWTIME
CAST("reported_at" AS TIMESTAMP) AS ROWTIME,
--other columns for view
"id","speed","driver_no","prescribed","gps","highway"
--this is the view we created in the previous step
FROM "Buses_Schema"."moving_buses";

Creating a View with an Average in SQLline

You can also use views with windows. By including a WINDOW clause in a SELECT query, you can specify that the query applies to rows in a stream grouped by a time range or by a number of rows. This allows you to perform calculations for each output row, such as AVG or MAX. Calculations are performed over the window you specify, such as "1 MINUTE" or "20 ROWS".

In a conventional database, you would not need a window to perform such calculations, because the query would know what rows are available in advance. But because of the nature of streaming data, you have to identify a subset of rows in the stream on which to perform calculations.

Here, we'll create a view that averages bus speed over 1 minute.

CREATE OR REPLACE VIEW "avg_buses" AS
SELECT STREAM AVG("speed") OVER "aWindow" AS "avg_speed",*
    --note that we use the view that we defined above
    FROM "Buses_Schema"."buses_rowtime_promoted"
    --defines a one-minute sliding window over which the average is computed
    WINDOW "aWindow" AS (RANGE INTERVAL '1' MINUTE PRECEDING);
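A window can also be defined over a fixed number of rows rather than a time range. The variant below (the view name avg_buses_rows is our own, for illustration) averages speed over the 20 most recent rows instead of the past minute:

```sql
CREATE OR REPLACE VIEW "avg_buses_rows" AS
SELECT STREAM AVG("speed") OVER "lastTwenty" AS "avg_speed",*
    FROM "Buses_Schema"."buses_rowtime_promoted"
    --a row-based window: the 20 rows preceding the current row
    WINDOW "lastTwenty" AS (ROWS 20 PRECEDING);
```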

You can see the new column by running the following query:

SELECT STREAM "avg_speed" FROM "avg_buses";

You should see a column called avg_speed that contains the average speed of buses over the past minute.

Using the Parser UDX to Parse JSON out of a Column in SQLline

As you watched data flowing, you probably noticed that the column gps contains unparsed JSON. While you can often parse data as it enters s-Server, there are times when you may need to parse data within a stream. This is known as mid-stream parsing.

In order to proceed, we need to parse this data. We can do so using the Parser UDX. The Parser UDX lets you call one of s-Server's data parsers.

Create a Function to Parse JSON

In order to parse JSON, we need to create a function that calls one of s-Server's predefined user-defined transforms or UDXs. A user-defined transform returns a set of rows or a stream of rows. Its input arguments can be scalars or cursors. A cursor is an object representing a subquery, which the user-defined transform can use to read the subquery results.

Here, we'll use the Parser UDX. The Parser UDX lets you apply one of s-Server's predefined parsers to a column of data (in this case, the gps column in buses_rowtime_promoted).

Enter the following in SQLline:

CREATE OR REPLACE FUNCTION "json_parser"
(
  INPUT CURSOR,
  COLUMNNAME VARCHAR(256),
  PARSERCLASSNAME VARCHAR(256),
  OPTIONS CURSOR)
RETURNS TABLE (
   --INPUT is the original stream, entered as a cursor for the function
   INPUT.*,
   --these are the columns that you expect to see in the JSON
   "lat" DOUBLE,
   "lon" DOUBLE,
   "bearing" INTEGER
)
SPECIFIC "json_parser"
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
NOT DETERMINISTIC
--this is the name of the class for the UDX
EXTERNAL NAME 'class com.sqlstream.aspen.namespace.common.ParserUdx.parseColumn';

Write a View to Read the Parsed JSON

Next, we write a view that references the json_parser function that we just defined.

The view below contains a SELECT statement. Some of the columns in the SELECT statement come directly from the stream buses_rowtime_promoted. Other columns--those prefixed with JSON--are the result of the parser UDX running. The prefix "adorns" these columns, indicating columns that result from changes made by the SELECT statement.

The SELECT statement passes input to the function that we defined above: a CURSOR, the column to be parsed, the type of parser, and options for the parser.

Enter the following in SQLline:

CREATE OR REPLACE VIEW "buses_json_parsed" AS
SELECT STREAM "id","speed","driver_no","prescribed",
--these columns use the alias "JSON" to indicate that they are produced by the parser UDX
JSON."lat" AS "lat", JSON."lon" AS "lon", JSON."bearing" AS "bearing","highway"
    FROM STREAM (
    --this is the name of the function that we defined above
    "json_parser"(
    --this is the input for the function (known as INPUT in function definition).
    CURSOR
    (SELECT STREAM * FROM "buses_rowtime_promoted"),
    --column from "buses_rowtime_promoted" to be parsed (known as COLUMNNAME in function definition).
    'gps',
    --type for parser (known as PARSERCLASSNAME in function definition).
    'JSON',
    --these are options for the JSON parser (in function definition).
    CURSOR(SELECT * FROM (values ('$')) AS OPTIONS(ROW_PATH))
        ))
        AS JSON;
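You can confirm that the new lat, lon, and bearing columns are populated by querying the view:

```sql
SELECT STREAM "id", "lat", "lon", "bearing" FROM "buses_json_parsed";
```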

Filter Out Zero Values for Lat and Lon in SQLline

Next, we want to be sure that our values for lat and lon do not contain any values of 0, a common placeholder for missing GPS readings. To do so, we can create another view along the following lines. Note that because latitudes in the Sydney area are negative, we test that the values are nonzero rather than positive. You can combine conditions in a WHERE clause with the AND keyword.

CREATE OR REPLACE VIEW "buses_filtered_lat_lon" AS
SELECT STREAM * FROM "Buses_Schema"."buses_json_parsed" WHERE "lat" <> 0 AND "lon" <> 0;

Categorizing speed into Speeding and Normal in SQLline

Next, we want to set up the column "speed" so that instead of listing numerical speeds, it categorizes buses as "speeding" or "normal".

Enter the following in SQLline:

CREATE OR REPLACE VIEW "speed_categorized" AS
    SELECT STREAM "id",(CASE WHEN "speed" >= 45 THEN 'speeding' ELSE 'normal' END) AS "speed","driver_no","prescribed","lat","lon","bearing"
    FROM "Buses_Schema"."buses_filtered_lat_lon";

You can test this view by entering the following query:

SELECT STREAM * FROM "speed_categorized";

Now that we have promoted a timestamp from the data to ROWTIME, cleaned and prepared the data, and categorized the speed column into speeding and normal, we are ready to display data on the speeding buses in a dashboard.

Using s-Dashboard to Show Streaming Data in a Dashboard

Now that you've created a view with speeding buses and their latitude/longitude, you can create a dashboard to display these buses on a map.

Dashboards are web pages that contain multiple panels, each of which can connect to a different stream, view, or table. Each panel contains a visualization. These are flexible modes of viewing your data, including simple tables, points on a map, line plots, bar graphs, area maps, and so on.

These all use column-row combinations to plot data. Panels can be changed in terms of both layout and data input. Dashboards will be most useful for streaming data, as you will be able to see data changing in real time.

The dashboard below shows the bus data in tabular form and bus locations on a map of Sydney. This is the dashboard that we will create in the steps below.

By using adjustable panels, s-Dashboard lets you view multiple such objects at once. Each dashboard can be laid out with multiple panels, in combinations that you can change by adjusting panel layout.

Creating the Pan and Zoom Map Dashboard

In the following exercise, we'll create a map dashboard. This type of dashboard takes values for latitude (lat) and longitude (lon) as input, and uses these values to display locations on a map.

To do so:

  1. Open s-Dashboard by navigating to http://localhost:5595/dashboards in a browser.
  2. Click the New Dashboard button.
  3. Enter a name for the new dashboard and click Create.
  4. Click the start button to open the new dashboard.
  5. Click the Configure Input Preferences button (the gear to the right of Buses_Schema.speed_categorized)
  6. In the window that opens, you should see "Buses_Schema" available as a schema.
  7. Click "Buses_Schema" to expand it. You should see foreign streams and views listed.
  8. Click Views to expand it.
  9. Select speed_categorized.
  10. Click Select. You should see data flowing in a table.
  11. Click the SQLstream icon in the upper right corner and select Enclose in Layout Frame
  12. In the Buses_Schema.speed_categorized frame, click the SQLstream icon in the upper right corner and select Clone Frame, Place Copy After. You should see a second, identical frame open to the right of the first Buses_Schema.speed_categorized frame.
  13. In the new panel, click the Panel Preferences icon (the gear to the right of View Stream as a Table:Table).
  14. In the window that opens, click Geographical and 3-D Streams to expand it.
  15. Click Pan & Zoom Map to select it and click Select to close the window. A map opens. You should see activity in Australia.
  16. Zoom in on Australia by either clicking the + icon in the upper left hand corner of the map or using your mouse wheel.
  17. Click the SQLstream logo in the upper right corner of the map panel and choose Input Preferences. In the window that opens, enter the following in the SELECT template field.

    <%= select %> "speed" AS "key","id","lat","lon" FROM <%= from %>

    Click Update.

  18. Click the gear icon to the right of Geographical and 3-D Streams:Pan & Zoom Map to open panel preferences.

  19. Make the following changes:

    • Change diameter formula to 25.
    • Change Event Label to <%= id %>. This refers to the id column in the speed_categorized view.
    • Add a row to Key Color Map by clicking +. In the first row, enter speeding for key and change marker to red, leaving circle and trail as none. In the second row, enter normal for key and change marker to green, leaving circle and trail as none.
  20. Click Update. You should see a map with red and green markers. Click one of the markers to see the id for the reported bus.