Most s-Server applications are built as pipelines. Pipelines move data from sources (such as a Kafka topic, or log file, or AMQP message) to sinks (such as a Kafka topic, an RDBMS system, or a StreamLab application). Their exact composition will depend on your business requirements, but they are generally a series of internal s-Server streams, pumps, sources (foreign streams), and sinks (foreign streams).
This topic features the following subtopics:
Pipelines consist of the following elements:
|Source Streams||These are streaming read access to third-party platforms, such as the file system, databases, sockets, Kafka, AMQP, or Amazon Kinesis. You set these up by defining a server object and a foreign stream object. Once these are established, you can use pumps to move data from them into named streams.|
|Named Streams||These are intermediate tanks in the pipeline that you “fill” by starting a pump. You can get data from these by running a SELECT query. The key thing about intermediate streams (tanks) is that the data in them evaporates if there are no readers hooked up to them. You will often use these to move subsets of data around, so that such data is available for analytic views.|
|Views||Views are reusable queries. These are often used to cleanse or analyze data.|
|Pumps||Pumps (INSERT macros) are used in s-Server to pass data from one point in a streaming pipeline to the other. You can think of a pipeline as similar to a plumbing network. In this model, pumps are like faucets installed on top of a tank (a named stream). These take water from water processing equipment and push it into the tank. The processing equipment is assembled on demand based on the description of the processing (view definition).|
|Sink Streams or Tables||These are “tanks” that feed other systems, such as visualization tools or databases.|
It is generally best practice to create all of these objects within the same schema. This will let you do things like drop all streams at once, or start all pumps at once. If you are using SQLline to communicate with s-Server, you could create all of these items in a single SQL script and then use the !run command to run the script.
In order for data to move through a pipeline, stream columns must always be compatible. Otherwise, errors might result.
A source stream needs information to connect to its source. The code below sets up a server object for a file accessible through the file system.
CREATE OR REPLACE SERVER "BusFileReaderServer" FOREIGN DATA WRAPPER ECDA OPTIONS (classname 'com.sqlstream.aspen.namespace.common.FileSetColumnSet', parser 'CSV', character_encoding 'UTF-8', separator ',', skip_header 'false', directory '/tmp', filename_pattern 'buses\.log' );
It then creates a foreign stream that uses this server object to pull data out of the file. The result is a stream with columns that you can query or pump to a named stream.
CREATE OR REPLACE SCHEMA "buses"; SET SCHEMA '"buses"'; CREATE OR REPLACE FOREIGN STREAM "buses_stream" ( "id" DOUBLE, --Identification number for the bus. "reported_at" TIMESTAMP, --Time location was reported. "shift_no" DOUBLE, --Shift number for the bus's driver. "trip_no" VARCHAR(4096), --Trip number for the bus. "route_variant_id" VARCHAR(4096), --ID number for bus route. "waypoint_id" VARCHAR(4096), --ID number for bus waypoint. "lat" VARCHAR(4096), --Latitude of location. "lon" VARCHAR(4096), --Longitude of location. "speed" DOUBLE, --Reported speed of bus. "bearing" VARCHAR(4096), --Navigational bearing for bus. "driver_no" DOUBLE, --Driver identification for number. ) SERVER "BusFileReaderServer" --Server created in the previous step. --Provides connection information for the log file. ;
To enrich data means to take streaming data and use historical data to add information to the stream. The streaming data from our buses demonstration includes information on buses longitude and latitude. Using information from a mapping database, we can identify the road segment for a particular latitude/longitude.
CREATE OR REPLACE VIEW "ConditionedPositionsWithRoadInfo" DESCRIPTION 'Vehicle positions and road information' AS SELECT STREAM FROM STREAM("roadInfo"(CURSOR(SELECT STREAM VID, "DateTime", CAST(NULL AS SMALLINT) AS "segmentId", "vLat", "vLon", "Bearing" AS "vBearing", "Speed" AS "vSpeed" FROM "Stage1" WHERE MISSING = 0 AND MISMATCH = 0 AND BOUNDS = 0), CURSOR(SELECT STREAM FROM "RoadInfoControl"), 'road_segments', 'vLat', 'vLon', 10000, -- cache false, -- no preload false -- no fuzzy ));
Here, you use SQL or a UDX to perform some calculation on your data. This might be calculating a running average, sorting data into groups, calculating frequency distribution, calculating deviation, and so on. This is the middle of the pipeline, after data has been pulled into s-Server and before it is visualized or written. Often, you will use views to perform such analysis. Views are reusable “macros” that
For example, the SQL below creates a view that identifies speeding buses by testing if speed is over 75 mph.
CREATE or REPLACE VIEW "buses"."speeders" AS SELECT STREAM FROM "buses"."buses_with_rowtime" WHERE "speed" > 75;
Here, we are creating a server object that connects with an external database.
CREATE OR REPLACE SERVER "PostgreSQL_DB_1" FOREIGN DATA WRAPPER "SYS_JDBC" OPTIONS ( "URL" 'jdbc:postgresql://localhost/demo', "USER_NAME" 'demo', "PASSWORD" 'demo', "SCHEMA_NAME" 'public', "DIALECT" 'PostgreSQL', "pollingInterval" '1000', "txInterval" '1000', "DRIVER_CLASS" 'org.postgresql.Driver' );
We then create a foreign table that uses this server object. As you move data into this table, it automatically moves to the defined table in the foreign database.
SET SCHEMA '"buses"'; CREATE FOREIGN TABLE "postgres_archive" --these are column names for the foreign table as it exists in s-Server ("id" double, "reported_at" TIMESTAMP, "shift_no" DOUBLE, "trip_no" DOUBLE) SERVER "PostgreSQL_DB_1" OPTIONS ( "SCHEMA_NAME" 'public', --this is the table name in the postgres database. "TABLE_NAME" 'buses_archive', --Amount of data to wait for before committing. "TRANSACTION_ROW_LIMIT" '0', --Amount of time to wait between commits. "TRANSACTION_ROWTIME_LIMIT" '1000' );
For all of the above stages, we need to create pumps to move data from stage to stage. Like streams, views, and tables, you create pumps in schemas. By default, pumps are created as stopped.
CREATE PUMP "buses"."postgres-pump" AS INSERT INTO "buses"."postgres_archive" ("id", "reported_at", "shift_no", "trip_no") SELECT STREAM "id", "reported_at", "shift_no", "trip_no" FROM "buses"."buses";
You can start all pumps in a single schema by using ALTER PUMP yourschema.* START. This ensures that all pumps are started in what we call topological order. Starting pumps this way ensures that data will be available for all streams.
ALTER PUMP "buses".* START;
This section describes tips for building and troubleshooting application pipelines.
This topic describes how to organize SQL files using directory structures and naming conventions that best suit stream computing. Stream computing has particular structuring requirements when it comes to segmenting and organizing SQL, in that data arrives from sources in various locations in various format, and the same set of data may be leveraged for multiple business use cases. We recommend building a componentized stream computing system as described below, in order to provide yourself the most flexibility in data parsing, enriching, and analyzing.
Some of the common tasks involved with stream computing include:
We use the term “processor” to refer to groups of SQL and plugins devoted to a particular task or group of tasks. Processors correspond to groups of SQL files that reference both each other and the plugins required to interface with other systems. Processors and links are the basic building blocks of a stream computing application. For example, you might have a source processor with the following files:
By building your streaming computing system in this modular way, you can mix and match your source, feature, and mode processors to acquire, condition, enrich, and archive data where you need it. This modular approach also lets you swap sources and features in and out and add data enrichment or archiving.
Each processor node runs its own schema, created by a create.sql file. You might run multiple processors in a single instance of SQLstream s-Server. Between each processor link, - pumps will likely move data from one processor to another. A key facet of stream computing design is to keep row types of input/output streams consistent. These are equivalent to interface definitions in object-oriented methodology.
Processors generally fall into three categories: source, feature, and mode. Source processors* often write into streams, using an INSERT statement. This processor receives data from external sources like log files, databases, TCP sockets, AMQP feeds, and so on, performs operations like parsing and conditioning (validations, cleansing) and produces a relational streaming output.
Feature processors primarily use SELECT statements, in that they usually read from streams instead of writing into them. This processor corresponds to an analytics pipeline to solve a business problem. Feature processors often aggregate data over time windows (sliding or otherwise) to produce results that can be delivered to target systems such as databases or real-time dashboards. You also might use these results to create a feedback loop into the real-time application. For example, in a transport system, you might have the following feature processors:
Mode processors might have enrichment, persistence, and action elements. These combine source processors and feature processors with links among pairs of processors. Mode processors combine streaming data from multiple processors to address a specific need in the system. For example, in a transportation system, you might have a mode processor to address the separate cases of roads, rail network, maritime transportation, and so on, whereas feature processors tend to work over the entire system. As the name suggests, multi-mode processors combine data from multiple mode processors to address other needs.
As in object-oriented computing, naming conventions are important in SCA, because proper naming conventions will allow scripts to find the right SQL files, coders to find the right schemas, and pumps to start in the proper order.
Each processor (source or feature) will create a schema for all SQLstream objects in it. We recommend naming the schema using the following format, which uses a source processor as an example.
"<ProcessorType>_<SourceType>_<SourceFormat>_<version>". e.g., "sp_gps_MinorPlanet_1" For other Processors, "<ProcessorType>_<ProcessorName>_<version>" e.g., "fp_TravelTime_1", "mp_Roads_2"
All SQLstream object names should use quoted CamelCase, as in “listPumps.sql”. Each processor can have one or more input/output streams. Input streams should use an “_InputStream” suffix. Likewise, Output streams should use an “_OutputStream” suffix. All intermediate objects including streams and views which are not intended to be used by other processors should use a “_private” suffix.
Pumps will use numeric prefixes based on the topological order of pumps in the entire pipeline. The following table describes the numeric prefixes to be used for pumps in various processors.
These are done in numerical order so that they can start in order
It’s important to organize processors in the file system so that they can be easily found and easily connected to each other. We recommend the following best practices in file system organization.
A sample file tree for a Stream Computing Architecture (SCA) might look like the following:
├── server │├── features ││├── featureOne ││├── Enrich │││├── analyze.sql │││├── deliver.sql │││├── package.markdown │││└── setup.sql ││└── Rate ││├── analyze.sql ││├── deliver.sql ││├── package.markdown ││└── setup.sql ││ │├── modes ││├── package.markdown ││└── modeOne ││├── catalog.sql ││├── interface.sql ││├── jndi │││└── streamingsolution.mode_1.archive.properties ││├── package.markdown ││├── startPumps.sql ││├── stopPumps.sql ││└── mode_1.conf │├── sources ││└── sourceOne ││├── acquire.sql ││├── condition.sql ││├── deliver.sql ││├── package.markdown ││└── setup.sql │└── utils │├── deploy.sh │├── functions.sql │├── generateCatalog.sh │├── listPumps.sql │├── package.markdown │├── sqldoc.sql │└── wrappers.sql └── streaming_solution_src.tgz
-* utils/wrappers.sql* This file will contain definitions for all data wrappers for external sources such as logfiles and databases.
The directory for a processor will have a subset of the following files.
The mode processor or a multi-mode processor is primarily a collection of source processors and feature processors, with optional mode- or multimode-specific features. An SQLstream s-Server instance could host a mode processor or a multi-mode processor. With distributed capability, each processor may extend itself to run on multiple instances of SQLstream s-Server. The following files will be present in a mode processor directory:
In order to generate automatic documentation, we recommend using markDown for inline comments. You can then generate HTML documentation (“SQLDoc”, in the same sense as “JavaDoc”) for each object in the tree, which sits in the same directory as the file tree, as in the following, which corresponds to the featureOne segment of the file tree above.
├── docs ├── css │├── reset.css │└── sqldoc.css ├── index.html └── server ├── features └── featureOne ├── Enrich ├── analyze.html ├── deliver.html ├── index.html └── setup.html
See ://en.wikipedia.org/wiki/Markdown for details.
Streaming SQL is inherently time-based, and several factors determine when results are emitted. This section describes time-related issues; methods to determine whether results can be delivered earlier; and corresponding system and query changes.
Let’s suppose that you have defined some streams, written some streaming queries, and started to write records into those streams. But no records are coming out yet. You’re probably wondering: Is the system slow? Did I write the query wrong?
Several factors determine when rows are emitted from a query. SQLstream s-Server runs in a correctness mode, which means that it will wait until all of the data necessary to give a correct answer are available. This form of delay is called inherent delay because it is caused directly by the SQL semantics or by a business rule requirement. An example is discussed in the section Inherent delay due to SQL query semantics ; additional factors related to correctness mode are discussed in the Delay due to failures and slow writers section.
Other factors affecting when rows are emitted include the following:
It should be remembered that, with just one exception, SQLstream’s queries have nothing to do with system time, sometimes called wallclock time because it will be the time shown on the clock on the wall of the data center. The semantics of each query are driven by data time, that is, the ROWTIME* values and rowtime bounds in the streams on which that query depends. The difference between data time and wallclock time is called wallclock delay, and time zone differences also relate to wallclock delay, as discussed later at the indicated links.
There are also various kinds of system delay, such as the system running slowly because the CPU is overloaded or because the network is slow.
SQLstream’s extended SQL contains constructs that allow you to represent delays and time windows. Those constructs tend to match the business problem being solved, so it is usually obvious that a query cannot be answered without an inherent delay.
For example, one can write a query that finds all orders that have not shipped within an hour of being placed. That query cannot output an order placed at 10:00 until 11:00 has arrived without that order having shipped. The query to find such orders will use a streaming join between the Orders and Shipments streams and, not coincidentally, the SQL semantics by which a windowed join generates row timestamps match the business rule.
We would say that this query has an inherent delay of 1 hour, because the SQL semantics are implementing a business rule whereby it is impossible to output a row saying that a 10:00 order has not been shipped in time until the Shipments stream has reached 11:00.
In examples, all the times are data time, rowtimes, which may be different from wallclock time. According to data time, it is 10:00 when the Orders stream sends a row (or a rowtime bound) timestamped 10:00 or later. Typically this will happen very soon after 10:00, of course. Using NTP (Network Time Protocol) on all computers can ensure that system clocks are closely synchronized. However, the system will still operate correctly if there is an offset between clocks.
In general, application data arrives with rowtimes that may bear no resemblance to wallclock time, although under many real world circumstances, data time lags only slightly behind wallclock time. SQLstream’s query processing only refers to the system time (wallclock time) in two places:
SQLstream s-Server’s correctness mode of operation makes it easy to write applications that always produce the right results, even in a complex distributed system with many clients and servers. But if a producer crashes or goes offline while it is feeding rows into a query, or if it is just running slowly, then the query will wait for it.
For some applications, correctness mode is exactly what is needed. The query will wait until the producer catches up. Some applications would rather produce a result that is possibly incorrect than wait for one or two producers that are running slow.
If the producer is producing data infrequently, it can aid throughput speed and efficiency by periodically sending a current rowtime bound during a gap in its output rows. Receiving such a bound can enable waiting processes, happy to know there will be no further data from that stream earlier than that bound, to release results that have no further dependency on data with rowtimes up to that bound.
Another solution is for the producer to close its prepared INSERT statement. This tells SQLstream not to wait for the producer to send rows or rowtime bounds for the query to make progress. The implication of this strategy is that if the producer wants to rejoin the query, it will need to send rowtimes at or greater than the high water mark that the query has reached. Anything less will be rejected as an out-of-order row.
Suppose a stream S1 is producing 1 million rows per hour, and there are two processes reading from it, one of which is only reading 100,000 rows per hour. As time goes by, that one process falls further and further behind in reading the results sent by the producer.
The SQLstream system can not throw away an output row until it has been read by both consumers. It can either produce the data, and buffer it until the slower reader is ready for it, or delay input stream processing until output streams have caught up.
Delaying input stream processing lets the data back up through the system, like traffic backing up on a freeway. Feeder streams will eventually be prevented from sending records because their consumer, S1, has allowed its input queues to fill up. A further side-effect is that as those feeder streams are prevented from writing, any other queries that depend on those writers will also be forced to wait. S1’s delay eventually starves the faster reader, forcing it to wait because new records are not being generated.
Currently in such a scenario, SQLstream tends to let data back up, because this is more efficient, at least for small amounts of back up. It saves writing data to disk, and it tends to smooth out stream processing. This is particularly true for streams that produce rows at irregular rates, enabling the system work on larger numbers of records at a time, which tends to be more efficient.
For example, the following diagram shows the gridlock that ensues when there is a slow reader. Reader 1 is running slowly, and its buffer is full of unread rows (black). Reader 2 is reading results from same query, but is keeping pace with the query. The join operator cannot produce more data until reader 1 has read the existing data, so it suspends operation (blocks).
There are several effects. First, Reader 2 is starved of more data. Second, the backlog spreads to the join operator’s ancestors: Stream B, Writer 2 and Writer 3 are particularly affected, and block when their output buffers fill up, Stream A and Writer 1 less so. Third, because Stream B is unable to make progress, its other descendant, Reader 3, is starved. It is initially surprising that Reader 3 is affected, since it is neither an ancestor nor a descendant of Reader 1, the root cause of the backlog. Just like gridlock spreading through a congested highway network, flow problems can have far-reaching effects. To avoid this, Reader 1 needs to be re-engineered to run faster - perhaps by partitioning its work across a number of processes.
When there is a delay getting results from a system, the first assumption is that this is because SQLstream is running slowly. Actually, this is rarely the case, because SQLstream generally processes data very efficiently (much faster than a database, for example).
Depending on hardware specification of the system, a single node SQLstream system can generally handle tens of thousands of records per second, and/or hundreds of active queries. Under moderate loads, the system will shift its workload to work more efficiently at the expense of a slightly increased delay.
System delay can also arise from network traffic. Network latency can occur at various parts of the system. SQLstream uses TCP/IP for communications with Java clients and within the system. TCP/IP can have a significant delay, particularly over the Internet or other wide area networks (WANs), if the network is busy, and over wireless networks where there is radio interference. SQLstream’s Streaming Data Protocol (SDP) stack organizes rows into batches for efficient transmission. If there are several open streams, it will combine records from multiple streams into a single network packet. This batching increases efficiency, but may add a small delay.
SQLstream’s time semantics, and in particular the TIMESTAMP values held in the ROWTIME column and passed as rowtime bounds, are consistent with the SQL standard. A timestamp value has no time zone associated with it. For example, the value TIMESTAMP ‘2001-01-01 00:00:00’ represents the start of the millennium, but interpretation is left to the system architect.
Time data accessed via JDBC are accessed as Java timestamp values and follow Java data semantics. A Java java.sql.Timestamp object represents a particular moment in time. It contains a long (64 bit signed integer) field that represents the number of milliseconds since January 1st, 1970 UTC. (For example, on any particular day, 6:00AM PST and 9:00AM EST are the same moment, and both correspond to the same java.sql.Timestamp.)
There is a tension between these notions of time, which system architects generally resolve by standardizing on UTC as the timezone for the SQLstream system. JDBC clients can remain in their local timezone.
If you read/write data using the JDBC setTimestamp(Timestamp) and getTimestamp() methods, timestamp values will automatically be converted (by the java runtime library) to UTC timestamp values. An alternative design is for the JDBC client to locate itself in the UTC zone; in this case the java library does no conversion.