Introduction to Pipelines

Most s-Server applications are built as pipelines. Pipelines move data from sources (such as a Kafka topic, a log file, or an AMQP message feed) to sinks (such as a Kafka topic, an RDBMS, or a StreamLab application). Their exact composition will depend on your business requirements, but they are generally a series of internal s-Server streams, pumps, sources (foreign streams), and sinks (foreign streams).

This topic features the following subtopics:

  • Pipelines Overview
  • Building and Troubleshooting Pipelines
  • Troubleshooting Streaming Data

Pipelines Overview

Pipelines consist of the following elements:

  • Source Streams: These provide streaming read access to third-party platforms, such as the file system, databases, sockets, Kafka, AMQP, or Amazon Kinesis. You set these up by defining a server object and a foreign stream object. Once these are established, you can use pumps to move data from them into named streams.
  • Named Streams: These are intermediate “tanks” in the pipeline that you fill by starting a pump. You can read data from them by running a SELECT query. The key thing about these intermediate streams is that the data in them evaporates if no readers are hooked up to them. You will often use named streams to move subsets of data around, so that such data is available for analytic views.
  • Views: Views are reusable queries. These are often used to cleanse or analyze data.
  • Pumps: Pumps (INSERT macros) are used in s-Server to pass data from one point in a streaming pipeline to another. You can think of a pipeline as similar to a plumbing network. In this model, pumps are like faucets installed on top of a tank (a named stream): they take water from water-processing equipment and push it into the tank. The processing equipment is assembled on demand based on the description of the processing (the view definition).
  • Sink Streams or Tables: These are “tanks” that feed other systems, such as visualization tools or databases.

It is generally best practice to create all of these objects within the same schema. This will let you do things like drop all streams at once, or start all pumps at once. If you are using SQLline to communicate with s-Server, you could create all of these items in a single SQL script and then use the !run command to run the script.
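
For example, a minimal setup script along these lines (the schema, stream, and file names here are illustrative, not part of the buses demonstration) creates a schema and a named stream inside it, and can be run from SQLline with !run:

-- pipeline_setup.sql (hypothetical script name)
CREATE OR REPLACE SCHEMA "demo";
SET SCHEMA '"demo"';

-- A named stream ("tank") that pumps created later can fill.
CREATE OR REPLACE STREAM "demo"."events"
(
    "id" INTEGER,
    "reported_at" TIMESTAMP
);

From SQLline, !run pipeline_setup.sql executes the whole script, creating every pipeline object in the "demo" schema in one step.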

Keeping Pipeline Objects Compatible

In order for data to move through a pipeline, the columns at each stage must stay compatible with the next stage: the columns selected by a pump or view need to match, in type and order (or by an explicit column list), the columns of the stream or table that the data is inserted into. Otherwise, errors result.
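
One common way to keep stages compatible is to CAST columns as they move between streams. The sketch below (reusing the bus columns defined later in this topic purely as an illustration) casts latitude and longitude from VARCHAR to DOUBLE so they can feed a downstream stream that declares those columns as DOUBLE:

SELECT STREAM "id",
       CAST("lat" AS DOUBLE) AS "lat",
       CAST("lon" AS DOUBLE) AS "lon"
FROM "buses"."buses_stream";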

SQL for Source Streams

A source stream needs information to connect to its source. The code below sets up a server object for a file accessible through the file system.

CREATE OR REPLACE SERVER "BusFileReaderServer"
FOREIGN DATA WRAPPER ECDA
OPTIONS (classname 'com.sqlstream.aspen.namespace.common.FileSetColumnSet',
       parser 'CSV',
       character_encoding 'UTF-8',
       separator ',',
       skip_header 'false',
       directory '/tmp',
       filename_pattern 'buses\.log'
);

The next statement creates a foreign stream that uses this server object to pull data out of the file. The result is a stream with columns that you can query or pump to a named stream.

CREATE OR REPLACE SCHEMA "buses";
SET SCHEMA '"buses"';
CREATE OR REPLACE FOREIGN STREAM "buses_stream"
(
"id" DOUBLE, --Identification number for the bus.
"reported_at" TIMESTAMP, --Time location was reported.
"shift_no" DOUBLE, --Shift number for the bus's driver.
"trip_no" VARCHAR(4096), --Trip number for the bus.
"route_variant_id" VARCHAR(4096), --ID number for bus route.
"waypoint_id" VARCHAR(4096), --ID number for bus waypoint.
"lat" VARCHAR(4096), --Latitude of location.
"lon" VARCHAR(4096), --Longitude of location.
"speed" DOUBLE, --Reported speed of bus.
"bearing" VARCHAR(4096), --Navigational bearing for bus.
"driver_no" DOUBLE, --Driver identification for number.
)
SERVER "BusFileReaderServer"
--Server created in the previous step.
--Provides connection information for the log file.
;

SQL for Enriching Data

To enrich data means to take streaming data and use historical data to add information to the stream. The streaming data from our buses demonstration includes each bus's longitude and latitude. Using information from a mapping database, we can identify the road segment for a particular latitude/longitude.

CREATE OR REPLACE VIEW "ConditionedPositionsWithRoadInfo"
DESCRIPTION 'Vehicle positions and road information' AS
   SELECT STREAM *
       FROM STREAM("roadInfo"(CURSOR(SELECT STREAM VID, "DateTime",
                                                   CAST(NULL AS SMALLINT) AS "segmentId",
                                                   "vLat", "vLon",
                                                   "Bearing" AS "vBearing",
                                                   "Speed" AS "vSpeed"
                                         FROM "Stage1" WHERE MISSING = 0 AND
                                                             MISMATCH = 0 AND
                                                             BOUNDS = 0),
                               CURSOR(SELECT STREAM * FROM "RoadInfoControl"),
                              'road_segments', 'vLat', 'vLon',
                              10000, -- cache
                              false, -- no preload
                              false  -- no fuzzy
                   ));

Analytic View on Data

Here, you use SQL or a UDX to perform some calculation on your data. This might be calculating a running average, sorting data into groups, calculating frequency distribution, calculating deviation, and so on. This is the middle of the pipeline, after data has been pulled into s-Server and before it is visualized or written out. Often, you will use views to perform such analysis. Views are reusable “macros” that let you apply the same query logic at multiple points in a pipeline.

For example, the SQL below creates a view that identifies speeding buses by testing if speed is over 75 mph.

CREATE OR REPLACE VIEW "buses"."speeders" AS SELECT STREAM * FROM "buses"."buses_with_rowtime" WHERE "speed" > 75;
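
A view can also apply a windowed analytic, such as a running average. The following is a minimal sketch (the view name and the one-minute window are illustrative) that computes each bus's average speed over the preceding minute of data time:

CREATE OR REPLACE VIEW "buses"."avg_speed" AS
SELECT STREAM "id",
       --average speed per bus over the last minute of data time
       AVG("speed") OVER (PARTITION BY "id"
                          RANGE INTERVAL '1' MINUTE PRECEDING) AS "avg_speed_1min"
FROM "buses"."buses_with_rowtime";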

SQL for External Sink

Here, we are creating a server object that connects with an external database.

CREATE OR REPLACE SERVER "PostgreSQL_DB_1"
   FOREIGN DATA WRAPPER "SYS_JDBC"
   OPTIONS (
       "URL" 'jdbc:postgresql://localhost/demo',
       "USER_NAME" 'demo',
       "PASSWORD" 'demo',
       "SCHEMA_NAME" 'public',
       "DIALECT" 'PostgreSQL',
       "pollingInterval" '1000',
       "txInterval" '1000',
       "DRIVER_CLASS" 'org.postgresql.Driver'
   );

We then create a foreign table that uses this server object. As you insert data into this foreign table, s-Server automatically writes it to the corresponding table in the external database.

SET SCHEMA '"buses"';
CREATE FOREIGN TABLE "postgres_archive"
--these are column names for the foreign table as it exists in s-Server
   ("id" double,
    "reported_at" TIMESTAMP,
    "shift_no" DOUBLE,
    "trip_no" DOUBLE)
   SERVER "PostgreSQL_DB_1"
   OPTIONS (
   "SCHEMA_NAME" 'public',
   --this is the table name in the postgres database.
   "TABLE_NAME" 'buses_archive',
    --Number of rows to write before committing ('0' means no row-count limit).
   "TRANSACTION_ROW_LIMIT" '0',
    --Amount of time (in milliseconds) to wait between commits.
   "TRANSACTION_ROWTIME_LIMIT" '1000'
);

Pumps

For all of the above stages, we need to create pumps to move data from stage to stage. Like streams, views, and tables, you create pumps in schemas. By default, pumps are created as stopped.

CREATE PUMP "buses"."postgres-pump" AS
INSERT INTO "buses"."postgres_archive"
   ("id", "reported_at", "shift_no", "trip_no")
SELECT STREAM "id", "reported_at", "shift_no", "trip_no"
   FROM "buses"."buses";

Starting Pumps

You can start all pumps in a single schema by using ALTER PUMP yourschema.* START. This ensures that all pumps are started in what we call topological order. Starting pumps this way ensures that data will be available for all streams.

ALTER PUMP "buses".* START;
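
Conversely, before modifying or dropping objects in a pipeline, you can stop all pumps in a schema with a single statement:

ALTER PUMP "buses".* STOP;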

Building and Troubleshooting Pipelines

This section describes tips for building and troubleshooting application pipelines.

Stream Computing Architecture (SCA) Approach to Organizing SQL

This topic describes how to organize SQL files using directory structures and naming conventions that best suit stream computing. Stream computing has particular structuring requirements when it comes to segmenting and organizing SQL, in that data arrives from sources in various locations and in various formats, and the same set of data may be leveraged for multiple business use cases. We recommend building a componentized stream computing system as described below, in order to give yourself the most flexibility in parsing, enriching, and analyzing data.

Some of the common tasks involved with stream computing include:

  • Importing data from a range of sources. These can arrive from log files, a network feed, an XML file output by another application, external databases, and so on. Usually, each data source consists of one or more SQL files and an s-Server plugin.
  • Enriching data. This refers to creating a logical join with other data, often legacy data in a database. This could be accomplished through a stream-table join or a UDX. For example, you might have a phone number tracked from a log file that you join to existing customer accounts, or to a list of known problematic numbers.
  • Performing business logic on data. In many environments, you will apply different sets of business logic to the same source data. For this reason, it’s advantageous to separate data acquisition from business logic.
  • Archiving data. Often, you will want to copy data out of the pipeline into an RDBMS or other long-term storage system.

Stream Computing Processors

We use the term “processor” to refer to groups of SQL and plugins devoted to a particular task or group of tasks. Processors correspond to groups of SQL files that reference both each other and the plugins required to interface with other systems. Processors and links are the basic building blocks of a stream computing application. For example, you might have a source processor with the following files:

  • setup.sql. This file would create the schema or schemas for the processor.
  • acquire.sql. This file would load and reference adapters and UDXs, such as the Extendable Common Data Adapter.
  • condition.sql. This file might perform a number of functions to condition data, such as applying a UDX to apply second-level filtering to acquired data.
  • deliver.sql. This file would CREATE VIEWs and pumps to make the acquired data available for other processors.

By building your stream computing system in this modular way, you can mix and match your source, feature, and mode processors to acquire, condition, enrich, and archive data where you need it. This modular approach also lets you swap sources and features in and out and add data enrichment or archiving.

Design

Each processor runs in its own schema, created by its setup.sql file. You might run multiple processors in a single instance of SQLstream s-Server. Across each link between processors, pumps move data from one processor to another. A key facet of stream computing design is to keep the row types of input/output streams consistent; these row types are equivalent to interface definitions in object-oriented methodology.

Types of Processors

Processors generally fall into three categories: source, feature, and mode. Source processors often write into streams, using an INSERT statement. A source processor receives data from external sources like log files, databases, TCP sockets, AMQP feeds, and so on, performs operations like parsing and conditioning (validation, cleansing), and produces a relational streaming output.

Feature processors primarily use SELECT statements, in that they usually read from streams instead of writing into them. A feature processor corresponds to an analytics pipeline that solves a business problem. Feature processors often aggregate data over time windows (sliding or otherwise) to produce results that can be delivered to target systems such as databases or real-time dashboards. You also might use these results to create a feedback loop into the real-time application. For example, in a transport system, you might have the following feature processors:

  • Travel Time, which calculates the travel time between two points in a transportation network.
  • Rapid Deceleration of Traffic, which detects rapid deceleration events in a transportation network by monitoring average speeds over different time windows.

Mode processors might have enrichment, persistence, and action elements. These combine source processors and feature processors with links among pairs of processors. Mode processors combine streaming data from multiple processors to address a specific need in the system. For example, in a transportation system, you might have a mode processor to address the separate cases of roads, rail network, maritime transportation, and so on, whereas feature processors tend to work over the entire system. As the name suggests, multi-mode processors combine data from multiple mode processors to address other needs.

Schema, File, and Processor Naming Conventions

As in object-oriented computing, naming conventions are important in SCA, because proper naming conventions will allow scripts to find the right SQL files, coders to find the right schemas, and pumps to start in the proper order.

Schema Names

Each processor (source or feature) will create a schema for all SQLstream objects in it. We recommend naming the schema using the following format, which uses a source processor as an example.

"<ProcessorType>_<SourceType>_<SourceFormat>_<version>".
e.g., "sp_gps_MinorPlanet_1"
For other Processors,
"<ProcessorType>_<ProcessorName>_<version>"
e.g., "fp_TravelTime_1", "mp_Roads_2"

SQLstream Object Naming Conventions

All SQLstream object names should use quoted CamelCase, as in “listPumps.sql”. Each processor can have one or more input/output streams. Input streams should use an “_InputStream” suffix; likewise, output streams should use an “_OutputStream” suffix. All intermediate objects, including streams and views, that are not intended to be used by other processors should use a “_private” suffix.
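
For example, a feature processor's public streams might be declared along these lines (a minimal sketch; the schema, stream, and column names are illustrative):

CREATE OR REPLACE STREAM "fp_TravelTime_1"."Positions_InputStream"
(
    "vehicleId" INTEGER,
    "segmentId" INTEGER,
    "speed" DOUBLE
);

CREATE OR REPLACE STREAM "fp_TravelTime_1"."TravelTime_OutputStream"
(
    "segmentId" INTEGER,
    "travelTimeSeconds" DOUBLE
);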

Pump Names

Pumps will use numeric prefixes based on the topological order of pumps in the entire pipeline. The following table describes the numeric prefixes to be used for pumps in various processors.

Processor Type    Prefix
Source            800-
Mode              400-

Creating an Order for Pumps

Pumps are given these numeric prefixes so that, when started in name order, they start in a predictable sequence that matches the intended topological order of the pipeline.
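
As an illustration (the pump, schema, and stream names here are hypothetical), a mode processor's pump might carry its numeric prefix like this:

CREATE PUMP "mp_Roads_2"."400-ArchivePump" AS
INSERT INTO "mp_Roads_2"."Archive_OutputStream"
--assumes both streams share the same column layout
SELECT STREAM * FROM "mp_Roads_2"."ConditionedPositions_private";

--Starts all pumps in the schema; prefixed names keep the start order predictable.
ALTER PUMP "mp_Roads_2".* START;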

Organizing Processors in the File System

It’s important to organize processors in the file system so that they can be easily found and easily connected to each other. We recommend the following best practices in file system organization.

Sample File Tree

A sample file tree for a Stream Computing Architecture (SCA) might look like the following:

├── server
│   ├── features
│   │   └── featureOne
│   │       ├── Enrich
│   │       │   ├── analyze.sql
│   │       │   ├── deliver.sql
│   │       │   ├── package.markdown
│   │       │   └── setup.sql
│   │       └── Rate
│   │           ├── analyze.sql
│   │           ├── deliver.sql
│   │           ├── package.markdown
│   │           └── setup.sql
│   ├── modes
│   │   ├── package.markdown
│   │   └── modeOne
│   │       ├── catalog.sql
│   │       ├── interface.sql
│   │       ├── jndi
│   │       │   └── streamingsolution.mode_1.archive.properties
│   │       ├── package.markdown
│   │       ├── startPumps.sql
│   │       ├── stopPumps.sql
│   │       └── mode_1.conf
│   ├── sources
│   │   └── sourceOne
│   │       ├── acquire.sql
│   │       ├── condition.sql
│   │       ├── deliver.sql
│   │       ├── package.markdown
│   │       └── setup.sql
│   └── utils
│       ├── deploy.sh
│       ├── functions.sql
│       ├── generateCatalog.sh
│       ├── listPumps.sql
│       ├── package.markdown
│       ├── sqldoc.sql
│       └── wrappers.sql
└── streaming_solution_src.tgz

Directory Tree Explanation

  • utils/wrappers.sql. This file will contain definitions for all data wrappers for external sources such as log files and databases.
  • utils/functions.sql. This file will contain utility functions and UDXs used by various processor components.
  • utils/src/. This directory tree will hold Java source code for UDXs developed by SQLstream.
  • sources/…/. This directory will contain all the scripts and other files for a source processor. For example, the processor for a GPS data feed might sit in a directory called server/sources/gps.
  • features/…/. This directory will contain all the scripts and other files for a feature processor, such as server/features/TravelTime/ or server/features/FlowFactors/.
  • modes/…/. This directory will contain definition files and scripts to generate a consolidated catalog of SQL scripts that need to be sourced to deploy the mode processor.

Contents of a Source/Feature Processor Directory

The directory for a processor will have a subset of the following files.

  • setup.sql. This file creates the schema for the processor.
  • acquire.sql. This file creates the input stream for the processor. For source processors, this file may create a foreign stream to source data from external sources. It may also parse comma-separated records into individual column values using built-in functions such as VARIABLE_COLUMN_LOG_PARSE.
  • condition.sql. This file contains the validations, filters, and transformations necessary to condition data from the input stream.
  • analyze.sql. This file contains the analytic functions and aggregations that apply business logic. This file may not be present for source processors.
  • deliver.sql. Using views and pumps, this file delivers the results of its processing to other processors through nexuses. As implemented by s-Server native streams, these nexus points act as defined interfaces for each mode/multi-mode processor.

Contents of a Mode/Multi-mode Processor directory

The mode processor or a multi-mode processor is primarily a collection of source processors and feature processors, with optional mode- or multimode-specific features. An SQLstream s-Server instance could host a mode processor or a multi-mode processor. With distributed capability, each processor may extend itself to run on multiple instances of SQLstream s-Server. The following files will be present in a mode processor directory:

  • interface.sql. This file defines all public streams (nexuses) for passing streaming results among individual processors. Each processor may then define itself as either a “publisher” or a “subscriber” for a subset of these streams.
  • The mode's .conf file (mode_1.conf in the sample file tree above). This file will contain all the source/feature processors, as well as the links, in this mode processor.

Documentation & Comments

In order to generate automatic documentation, we recommend using Markdown for inline comments. You can then generate HTML documentation (“SQLDoc”, in the same sense as “JavaDoc”) for each object in the tree. The generated documentation sits in a directory alongside the file tree, as in the following example, which corresponds to the featureOne segment of the file tree above.

├── docs
│   ├── css
│   │   ├── reset.css
│   │   └── sqldoc.css
│   ├── index.html
│   └── server
│       └── features
│           └── featureOne
│               └── Enrich
│                   ├── analyze.html
│                   ├── deliver.html
│                   ├── index.html
│                   └── setup.html

See https://en.wikipedia.org/wiki/Markdown for details.
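
For example, a processor's setup.sql might carry its documentation as Markdown-formatted comments (a sketch; the processor name is illustrative):

-- # fp_TravelTime_1
-- Creates the schema for the *Travel Time* feature processor.
-- See `acquire.sql`, `analyze.sql`, and `deliver.sql` for the processor's
-- streams, views, and pumps.
CREATE OR REPLACE SCHEMA "fp_TravelTime_1";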

Troubleshooting Streaming Data

Streaming SQL is inherently time-based, and several factors determine when results are emitted. This section describes time-related issues; methods to determine whether results can be delivered earlier; and corresponding system and query changes.

Did I write my query wrong?

Let’s suppose that you have defined some streams, written some streaming queries, and started to write records into those streams. But no records are coming out yet. You’re probably wondering: Is the system slow? Did I write the query wrong?

Factors causing delay

Several factors determine when rows are emitted from a query. SQLstream s-Server runs in a correctness mode, which means that it will wait until all of the data necessary to give a correct answer are available. This form of delay is called inherent delay because it is caused directly by the SQL semantics or by a business rule requirement. An example is discussed in the section Inherent delay due to SQL query semantics; additional factors related to correctness mode are discussed in the Delay due to failures and slow writers section.

Other factors affecting when rows are emitted include the following:

  • Data time and Wallclock time
  • Delay due to failures and slow writers
  • Delays caused by other readers and writers
  • System delay
  • Time zones

It should be remembered that, with only limited exceptions, SQLstream’s queries have nothing to do with system time, sometimes called wallclock time because it is the time shown on the clock on the wall of the data center. The semantics of each query are driven by data time, that is, the ROWTIME values and rowtime bounds in the streams on which that query depends. The difference between data time and wallclock time is called wallclock delay; time zone differences also relate to wallclock delay, as discussed in the sections below.

There are also various kinds of system delay, such as the system running slowly because the CPU is overloaded or because the network is slow.

Inherent delay due to SQL query semantics

SQLstream’s extended SQL contains constructs that allow you to represent delays and time windows. Those constructs tend to match the business problem being solved, so it is usually obvious that a query cannot be answered without an inherent delay.

For example, one can write a query that finds all orders that have not shipped within an hour of being placed. That query cannot output an order placed at 10:00 until 11:00 has arrived without that order having shipped. The query to find such orders will use a streaming join between the Orders and Shipments streams and, not coincidentally, the SQL semantics by which a windowed join generates row timestamps match the business rule.

We would say that this query has an inherent delay of 1 hour, because the SQL semantics are implementing a business rule whereby it is impossible to output a row saying that a 10:00 order has not been shipped in time until the Shipments stream has reached 11:00.
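
As a rough sketch of such a query (the stream and column names are illustrative, and the exact window-join syntax should be checked against your s-Server version), a one-hour windowed outer join can emit the orders for which no shipment arrives within the hour:

SELECT STREAM o.ROWTIME, o."orderId"
FROM "Orders" AS o
LEFT JOIN "Shipments" OVER (RANGE INTERVAL '1' HOUR FOLLOWING) AS s
    ON o."orderId" = s."orderId"
--an order row can only be emitted after Shipments data time passes the order's time plus one hour
WHERE s."orderId" IS NULL;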

Data time and Wallclock time

In these examples, all times are data times (rowtimes), which may differ from wallclock time. According to data time, it is 10:00 when the Orders stream sends a row (or a rowtime bound) timestamped 10:00 or later. Typically this will happen very soon after 10:00, of course. Using NTP (Network Time Protocol) on all computers can ensure that system clocks are closely synchronized. However, the system will still operate correctly if there is an offset between clocks.

In general, application data arrives with rowtimes that may bear no resemblance to wallclock time, although under many real-world circumstances, data time lags only slightly behind wallclock time. SQLstream’s query processing refers to the system time (wallclock time) in only two places:

  • If a record is inserted into a stream without an explicit rowtime, SQLstream’s driver generates a timestamp value using the system clock.
  • When a user calls any of the following time functions, it requests the wallclock time from the SQLstream system: CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP, LOCALTIME, and LOCALTIMESTAMP (see the sketch below).
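
For instance, you can compare data time and wallclock time for a stream directly by selecting both in one query (a sketch that reuses the bus stream defined earlier in this topic):

SELECT STREAM ROWTIME AS "data_time",
       --wallclock time at the moment each row is processed
       CURRENT_ROW_TIMESTAMP AS "wallclock_time"
FROM "buses"."buses_stream";

The gap between the two columns is the wallclock delay for that stream.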

Delay due to failures and slow writers

SQLstream s-Server’s correctness mode of operation makes it easy to write applications that always produce the right results, even in a complex distributed system with many clients and servers. But if a producer crashes or goes offline while it is feeding rows into a query, or if it is just running slowly, then the query will wait for it.

For some applications, correctness mode is exactly what is needed. The query will wait until the producer catches up. Some applications would rather produce a result that is possibly incorrect than wait for one or two producers that are running slow.

If the producer is producing data infrequently, it can aid throughput speed and efficiency by periodically sending a current rowtime bound during a gap in its output rows. Receiving such a bound can enable waiting processes, happy to know there will be no further data from that stream earlier than that bound, to release results that have no further dependency on data with rowtimes up to that bound.

Another solution is for the producer to close its prepared INSERT statement. This tells SQLstream not to wait for the producer to send rows or rowtime bounds for the query to make progress. The implication of this strategy is that if the producer wants to rejoin the query, it will need to send rowtimes at or greater than the high water mark that the query has reached. Anything less will be rejected as an out-of-order row.

Delays caused by other readers and writers

Suppose a stream S1 is producing 1 million rows per hour, and there are two processes reading from it, one of which is only reading 100,000 rows per hour. As time goes by, that one process falls further and further behind in reading the results sent by the producer.

The SQLstream system cannot throw away an output row until it has been read by both consumers. It can either produce the data and buffer it until the slower reader is ready for it, or delay input stream processing until output streams have caught up.

Delaying input stream processing lets the data back up through the system, like traffic backing up on a freeway. Feeder streams will eventually be prevented from sending records because their consumer, S1, has allowed its input queues to fill up. A further side-effect is that as those feeder streams are prevented from writing, any other queries that depend on those writers will also be forced to wait. S1’s delay eventually starves the faster reader, forcing it to wait because new records are not being generated.

Currently in such a scenario, SQLstream tends to let data back up, because this is more efficient, at least for small amounts of backup. It saves writing data to disk, and it tends to smooth out stream processing. This is particularly true for streams that produce rows at irregular rates, enabling the system to work on larger numbers of records at a time, which tends to be more efficient.

For example, the following diagram shows the gridlock that ensues when there is a slow reader. Reader 1 is running slowly, and its buffer is full of unread rows (black). Reader 2 is reading results from the same query, but is keeping pace with the query. The join operator cannot produce more data until Reader 1 has read the existing data, so it suspends operation (blocks).

There are several effects. First, Reader 2 is starved of more data. Second, the backlog spreads to the join operator’s ancestors: Stream B, Writer 2, and Writer 3 are particularly affected, and block when their output buffers fill up; Stream A and Writer 1 are affected less. Third, because Stream B is unable to make progress, its other descendant, Reader 3, is starved. It is initially surprising that Reader 3 is affected, since it is neither an ancestor nor a descendant of Reader 1, the root cause of the backlog. Just like gridlock spreading through a congested highway network, flow problems can have far-reaching effects. To avoid this, Reader 1 needs to be re-engineered to run faster - perhaps by partitioning its work across a number of processes.

System delay

When there is a delay getting results from a system, the first assumption is that this is because SQLstream is running slowly. Actually, this is rarely the case, because SQLstream generally processes data very efficiently (much faster than a database, for example).

Depending on hardware specification of the system, a single node SQLstream system can generally handle tens of thousands of records per second, and/or hundreds of active queries. Under moderate loads, the system will shift its workload to work more efficiently at the expense of a slightly increased delay.

System delay can also arise from network traffic. Network latency can occur at various parts of the system. SQLstream uses TCP/IP for communications with Java clients and within the system. TCP/IP can have a significant delay, particularly over the Internet or other wide area networks (WANs), if the network is busy, and over wireless networks where there is radio interference. SQLstream’s Streaming Data Protocol (SDP) stack organizes rows into batches for efficient transmission. If there are several open streams, it will combine records from multiple streams into a single network packet. This batching increases efficiency, but may add a small delay.

Time zones

SQLstream’s time semantics, and in particular the TIMESTAMP values held in the ROWTIME column and passed as rowtime bounds, are consistent with the SQL standard. A timestamp value has no time zone associated with it. For example, the value TIMESTAMP ‘2001-01-01 00:00:00’ represents the start of the millennium, but interpretation is left to the system architect.

Time data accessed via JDBC are accessed as Java timestamp values and follow Java data semantics. A Java java.sql.Timestamp object represents a particular moment in time. It contains a long (64 bit signed integer) field that represents the number of milliseconds since January 1st, 1970 UTC. (For example, on any particular day, 6:00AM PST and 9:00AM EST are the same moment, and both correspond to the same java.sql.Timestamp.)

There is a tension between these notions of time, which system architects generally resolve by standardizing on UTC as the timezone for the SQLstream system. JDBC clients can remain in their local timezone.

If you read/write data using the JDBC setTimestamp(Timestamp) and getTimestamp() methods, timestamp values will automatically be converted (by the Java runtime library) to UTC timestamp values. An alternative design is for the JDBC client to locate itself in the UTC zone; in this case the Java library does no conversion.