Tutorial 3: Using a Pump

In the last two tutorials, we imported data into s-Server and applied simple analytics to this data. Next, we’re going to learn to move this data around–from stream to stream, from table to stream, from stream to table, and so on. Moving data from place to place involves pumps.

Because of the nature of streaming data–it has to be moving in order for you to work with it–pumps are a crucial part of programming with s-Server. They let you perform different analytics on the same data set and “enrich” data using historical data. Above all, they let you query data at multiple stages in your application.

Eventually, you will develop pipelines with pumps, whereby a series of pumps moves data from stream to stream. For now, we’re going to work on making a simple pump to move data from one stream to another. Then, we’ll apply the same skill to moving data from an s-Server stream to an external database.

In technical terms, a pump is a SQLstream schema object that provides the functionality of a continuously running INSERT INTO stream SELECT … FROM query.

Basically, you set up a pump so that it selects data from one source (a stream or table) and inserts it into a sink (another stream or table). It does so until you stop the pump. It’s a good idea to create pumps stopped (they’re created as stopped by default), and then use a statement called ALTER to start the pump. (When you have multiple pumps, you should start them all using the same ALTER statement. When you do so, s-Server automatically ensures that they start in the right order for your application.)

As a source, we’re going to use the stream that we set up in the previous tutorial. That stream was called “buses”.“buses_stream”.
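As a rough sketch of starting several pumps with one ALTER statement, the lines below show two forms. Both are assumptions about the ALTER PUMP syntax in your s-Server version (a comma-separated list of pumps, and a schema wildcard), so check the ALTER PUMP reference before relying on them. The pump names are the ones created later in this tutorial.

```sql
-- Assumed form 1: list the pumps to start in a single statement.
ALTER PUMP "buses"."my-Pump", "buses"."postgres-pump" START;

-- Assumed form 2: start every pump in the "buses" schema at once.
ALTER PUMP "buses".* START;

-- The same forms with STOP would halt the pumps again.
ALTER PUMP "buses".* STOP;
```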

This stream should still exist in s-Server. To get data flowing into the stream, we also need to make sure that the Buses demo data generator is running.

Starting the Buses Demo

Open the SQLstream desktop folder and double-click the Stream CSV Bus Data icon. (You can also use the s-Server Guavus SQLstream cover page, also accessible from this folder.)

Once you click the application’s icon, an information window opens.

Once you start this data generator, the stream “buses”.“buses_stream” should have data flowing. To test this, open up sqllineClient by double-clicking the sqlLine icon in the SQLstream desktop folder.

In the terminal window that opens, enter the following line:

SELECT STREAM * FROM "buses"."buses_stream";

This tells s-Server “get me every column in the stream buses_stream, which is in the schema buses”.

Note that we have to use the STREAM keyword here because we’re selecting data from a stream. Every time you run a SELECT on a stream in s-Server (or a streaming VIEW), you need to use this keyword.

The keyword tells s-Server that you are querying a stream whose results go on forever.
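The STREAM keyword is used the same way when you want only some columns. The sketch below assumes that “buses”.“buses_stream” has columns named “id” and “reported_at”, which are the column names used later in this tutorial.

```sql
-- Select two named columns from the stream instead of every column.
-- STREAM is still required because the source is a stream.
SELECT STREAM "id", "reported_at"
FROM "buses"."buses_stream";
```

As with the query above, this runs until you cancel it or close the terminal.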

You should see data flowing.

After you’ve confirmed that data is flowing, you can close the terminal window.

Creating a Native Stream to Serve as a Destination

The next thing that we’re going to do is create a simple native stream–a stream that handles data within s-Server–that will serve as a sink, or destination, for the pump.

Double-click the SQLline icon again to open a new terminal window.

We can create a native stream pretty simply. We just do a CREATE OR REPLACE STREAM statement, give the stream a name and schema (we can use the same “buses” schema that we used in the previous tutorial), and designate columns for the stream. Here, we’ll just add two columns, “bus” and “time_reported”.

Enter the following code into SQLline. You can also open a text editor, enter these lines of code, and save the file with a .sql file extension. Then you can use the !run command in SQLline to run the code. This can be easier to debug than entering code directly into SQLline.

CREATE OR REPLACE STREAM "buses"."pump_stream"
(
"bus" VARCHAR(4096),
"time_reported" TIMESTAMP
);

To confirm that this stream has been created, enter the statement

SELECT STREAM * FROM "buses"."pump_stream";

If you don’t get an error, the stream has been created. (You won’t see any data in the stream, because none is there yet.)

Leave this window open with the SELECT statement running. Once we start the pump, we will see data flowing in this terminal.

Matching Data

When you pump data from a source stream/table to a target stream/table, you need to make sure that column data types are compatible.

For example:

  • Columns of type VARCHAR() or CHAR() are compatible with columns with character types, as well as date and time types.
  • Columns of type BIGINT, DOUBLE, or INTEGER are compatible with columns with numeric types.
  • Columns of type BOOLEAN are compatible with other BOOLEAN columns only.

If you’re creating a new stream, this is often not a problem, as you can simply make sure the target stream’s columns match those of the source stream. If you’re pumping data to an external database table, you will need to make sure the columns in the target database table also match.

If you try to pump data of the wrong type into a column, data transfer will often fail. For example, if you try to pump data with nonnumeric characters into a column of type DOUBLE, the transfer will fail.
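When source and target types differ, one option is to convert explicitly in the SELECT with the standard SQL CAST function. The sketch below is illustrative only: the pump name “cast-pump” is hypothetical, and it assumes that “id” in “buses”.“buses_stream” is a numeric column that you want to store in the VARCHAR column “bus”.

```sql
-- Hypothetical pump that converts a numeric "id" to text explicitly
-- before inserting it into the VARCHAR column "bus".
CREATE OR REPLACE PUMP "buses"."cast-pump" AS
INSERT INTO "buses"."pump_stream"
("bus", "time_reported")
SELECT STREAM
    CAST("id" AS VARCHAR(4096)),
    "reported_at"
FROM "buses"."buses_stream";
```

Making the conversion explicit keeps the type handling visible in the pump definition rather than relying on implicit conversion.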

Creating the Pump

You create pumps like any other SQL object, using a CREATE statement. Like streams, pumps need to be created within schemas. The pump below is going to copy data from “buses”.“buses_stream” to “buses”.“pump_stream”. We pick the columns into which we want to copy data–in this case, “bus” and “time_reported”, the two columns of the stream we just created. Then, we select the columns from which we want to copy–in this case, “id” and “reported_at”. Open a new SQLline window and enter the following code.

CREATE OR REPLACE PUMP "buses"."my-Pump" AS
INSERT INTO "buses"."pump_stream"
("bus", "time_reported")
SELECT STREAM
"id",
"reported_at"
FROM "buses"."buses_stream";

By default, pumps are created as stopped. That means when you initially create them, they don’t move any data.

To start the pump, enter the following line:

ALTER PUMP "buses"."my-Pump" START;

Return to the terminal with the SELECT statement running. You should start to see data flowing in this window.

Now, return to the previous terminal and enter

ALTER PUMP "buses"."my-Pump" STOP;

Observe that data stops flowing in the terminal with the SELECT statement running. That’s because we just stopped the pump.

We’ve just used a pump to copy data from one stream to another. As long as the pump is running and data continues to flow into the first stream, data will also flow in the second stream. Using pumps is a key part of moving data around in s-Server. They are important because you can only get query results while data is flowing in a stream.
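The same pattern extends to a pipeline, where one pump's sink becomes the next pump's source. The following is a minimal sketch, not part of the Buses demo: the stream “pipeline_stream” and the pump “stage2-Pump” are hypothetical names introduced only for illustration.

```sql
-- Hypothetical second-stage stream with the same columns as "pump_stream".
CREATE OR REPLACE STREAM "buses"."pipeline_stream"
(
    "bus" VARCHAR(4096),
    "time_reported" TIMESTAMP
);

-- Hypothetical second pump: reads from the first pump's sink
-- and writes to the second-stage stream.
CREATE OR REPLACE PUMP "buses"."stage2-Pump" AS
INSERT INTO "buses"."pipeline_stream"
("bus", "time_reported")
SELECT STREAM "bus", "time_reported"
FROM "buses"."pump_stream";

-- Pumps are created stopped, so start this one as well.
ALTER PUMP "buses"."stage2-Pump" START;
```

With both pumps running, rows should flow from “buses_stream” through “pump_stream” into “pipeline_stream”, and you can query either intermediate stream with SELECT STREAM.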

Another common use for pumps is to move data from a stream into an external database, in order to archive data. We will cover how to do so in the next section of this tutorial.

Using a Pump to Archive Data

Now that you’ve learned the basics of using a pump, we’re going to use a pump to move data from s-Server to an external database.

Because s-Server does not store data persistently–that is, after it has stopped flowing–you may want to use an external database to archive data.

Using an interface called SQL/MED, you can access tables in foreign databases as if they were present in s-Server. These are called foreign tables, and you create them just like any other stream or table. To tell s-Server how to connect to the external database, you use a server object. In Tutorial 2, we created one of these to pull data into s-Server from a file. This time, we create a server object with the URL, user name/password, and other information needed to connect to a Microsoft SQL Server, PostgreSQL, MySQL, Oracle, Teradata, or other database. The following steps describe how to set up a connection to a database, and then create a pump to move data from s-Server into the database.

Here, we create a server object with credentials for the external database: url, user name, password, driver name, and schema. s-Server can only communicate with one database schema for each server object.

Again, in order to move data into an external database, we need to set up two objects in s-Server:

  • a foreign table that maps to a table in the database.
  • a server object with connection information for the database.

Setting Up a Server Object

First, create a server object that contains connection information for the database. The particular credentials here will depend on your database. See your database’s administrator if you do not know these credentials.

The code below creates a server object for a PostgreSQL database running on a local machine.

CREATE OR REPLACE SERVER "PostgreSQL_DB_1"
FOREIGN DATA WRAPPER "SYS_JDBC"
OPTIONS (
"URL" 'jdbc:postgresql://localhost/demo',
"USER_NAME" 'demo',
"PASSWORD" 'demo',
"SCHEMA_NAME" 'public',
"DIALECT" 'PostgreSQL',
"pollingInterval" '1000',
"txInterval" '1000',
"DRIVER_CLASS" 'org.postgresql.Driver'
);

Now we’re going to set up a foreign table in s-Server, which references this server object. The foreign table matches a table in the database whose connection we just defined. A foreign table essentially provides an “interface” to a table in an external database, allowing you to move data in and out of this table from within s-Server.

To set up a foreign table in s-Server, use code along the following lines. As with streams and pumps, we create foreign tables within schemas. The code below first sets the same schema, “buses”, that we’ve been using all along.

If we declare columns for the foreign table, we need to make sure that these columns have compatible types. For example, if you have a column defined as type DOUBLE in s-Server, it needs to correspond to a column in the foreign database table with type DOUBLE. s-Server also gives you options to manage how frequently you update the database:

  • The first option, TRANSACTION_ROWTIME_LIMIT, lets you limit update frequency in terms of time, telling s-Server “wait this number of milliseconds before updating”. Here, we’ve set this value to 1000, or one second. This means that the database table will be updated every second.
  • The second option, TRANSACTION_ROW_LIMIT, lets you limit update frequency in terms of the amount of data to be updated. The default value is 65536/maxSize, where maxSize is the maximum row size of the input stream. Here, we’ve set this value to 0. This means that the database will be updated each second (per TRANSACTION_ROWTIME_LIMIT) without regard to the amount of data being updated.

SET SCHEMA '"buses"';
CREATE FOREIGN TABLE "postgres_archive"
--these are column names for the foreign table as it exists in s-Server
("id" DOUBLE,
"reported_at" TIMESTAMP,
"shift_no" DOUBLE,
"trip_no" DOUBLE)
SERVER "PostgreSQL_DB_1"
OPTIONS (
"SCHEMA_NAME" 'public',
--this is the table name in the postgres database.
"TABLE_NAME" 'buses_archive',
--Amount of data to wait for before committing.
"TRANSACTION_ROW_LIMIT" '0',
--Amount of time to wait between commits.
"TRANSACTION_ROWTIME_LIMIT" '1000'
);
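The foreign table above assumes that a matching table already exists in the PostgreSQL database. If it does not, a table along the following lines could be created first. This is a sketch under stated assumptions: it is run in PostgreSQL itself (for example, from psql), not in s-Server, and it maps s-Server's DOUBLE to PostgreSQL's DOUBLE PRECISION type.

```sql
-- Run this in PostgreSQL, not in s-Server.
-- Schema and table names match SCHEMA_NAME and TABLE_NAME in the
-- foreign table options above; the column types are assumptions.
CREATE TABLE public.buses_archive (
    id          DOUBLE PRECISION,
    reported_at TIMESTAMP,
    shift_no    DOUBLE PRECISION,
    trip_no     DOUBLE PRECISION
);
```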

Now that we’ve created the foreign table in s-Server, we can easily move data into the external database, simply by pumping it into this foreign table.

Here, we’re going to create a pump to move data from s-Server into the external database. This pump will SELECT on the “buses”.“buses_stream” stream with which we’ve been working and INSERT into the foreign table. The effect will be to copy data from the s-Server stream into the external database table.

CREATE PUMP "buses"."postgres-pump" AS
INSERT INTO "buses"."postgres_archive"
("id", "reported_at", "shift_no", "trip_no")
SELECT STREAM "id", "reported_at", "shift_no", "trip_no"
FROM "buses"."buses_stream";

Once again, pumps are created as stopped by default. To get data moving, we just need to execute an ALTER PUMP statement.

ALTER PUMP "buses"."postgres-pump" START;

At this point, your external database table should have data added to the “id”, “reported_at”, “shift_no”, and “trip_no” columns.
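One way to check the archive is to query the table directly in PostgreSQL and then stop the pump from SQLline when you are done. This is a sketch only; the table and column names follow the foreign table options and definitions above, and the LIMIT of 10 is an arbitrary choice.

```sql
-- Run in PostgreSQL (for example, from psql): show the ten most
-- recently reported rows that the pump has archived.
SELECT id, reported_at, shift_no, trip_no
FROM public.buses_archive
ORDER BY reported_at DESC
LIMIT 10;

-- Run in SQLline (s-Server): stop archiving when you are finished.
ALTER PUMP "buses"."postgres-pump" STOP;
```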

Once you get data into s-Server, pumps are a great way to move data around. In the next section, we’ll learn about the importance of VIEWS in s-Server.