Using s-Server for Machine Learning with SystemML

s-Server integrates Apache SystemML to provide real-time predictions based on continuous machine learning. Using data from s-Server, you can create continuously updating prediction models that train without interrupting the data pipeline.

Two s-Server components support this:

  1. A training component, which supports 19 different machine learning algorithms. As part of an s-Server pipeline, data is continuously streamed to a model. Each time the model is retrained, it outputs a version number, which is used to dynamically update predictions without stopping the pipeline. You can determine how often the model is updated by setting a policy.
  2. A scoring function that uses output from the training phase to make predictions. This function outputs both a prediction field and a model number, so that you can see which version of the model each prediction is based on.

Your scoring function is expressed using SystemML’s DML (Declarative Machine Learning) language. For more information on DML, see the Apache SystemML documentation.

How to Build Models and Recreate Them by Using s-Server’s File Writer

To begin making predictions with s-Server, first identify data that you can use to “train” a SystemML model. This data should include one column that holds the outcome you want to predict. A list of products, for example, might include a column indicating whether a given product is a good buy. The other columns might include the price on this site, prices from competitor sites, customer ratings, and so on. This should be the same data that you will pass to the SystemML function in s-Server.

The following code creates a foreign stream to be used as a file-writing sink. The SystemML script is invoked through an option called “POSTPROCESS_CMD”.

CREATE OR REPLACE FOREIGN STREAM "StreamLab_Output_srutinizertest"."guide_1_out_sink" (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "active" BOOLEAN)
SERVER "ECDAWriterServer_1"
OPTIONS (
    "POSTPROCESS_CMD" 'scripts/ scripts/algorithms/l2-svm-predict.dml -nvargs X=<input> Y=data/haberman.test.labels.csv model=data/l2-svm-model.csv fmt="csv" confusion=data/l2-svm-confusion.csv',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false',
    "DIRECTORY" '/home/sqlstream/work/systemml/scripts/functions/jmlc/',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss');

The POSTPROCESS_CMD option lets you run a script after each file is written. In this case, the script generates a “trained” data model that you can use later to calculate scores or predictions.

The code above generates a file called crash_by_lat_long_model.mtx in the directory /home/sqlstream/work/systemml/scripts/functions/jmlc/. We will use this file below to generate a prediction.

Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, Kinesis stream, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
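
As a minimal sketch, a pump that feeds the file-writing sink defined earlier might look like the following. The pump name "guide_1_out_pump" is hypothetical, and the example assumes that the source stream "transit"."buses" has the same columns, in the same order, as the sink; adjust both to match your own pipeline.

```sql
-- Create the pump in a stopped state so that it can be started explicitly.
-- "guide_1_out_pump" is a hypothetical pump name.
-- Assumes "transit"."buses" has the same columns, in the same order, as the sink.
CREATE OR REPLACE PUMP "StreamLab_Output_srutinizertest"."guide_1_out_pump" STOPPED AS
INSERT INTO "StreamLab_Output_srutinizertest"."guide_1_out_sink"
SELECT STREAM * FROM "transit"."buses";

-- Start the pump; rows now flow continuously from the source stream to the sink.
ALTER PUMP "StreamLab_Output_srutinizertest"."guide_1_out_pump" START;
```

Creating the pump as STOPPED and starting it in a separate statement lets you define every object in the pipeline before any data begins to move.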

Using JMLC for Scoring on a Row-by-Row Basis

SystemML is installed on s-Server as a function called sys_boot.mgmt.predict. You can use this function to make predictions based on an existing DML file and the model data file that you generated above. The predict function requires the following parameters:

Parameter        Description
inputRows        Current cursor.
model            Model data file.
columnIndexes    Order number of the columns where we should look for values to use in prediction. For example, if you are using the 2nd, 3rd, and 5th columns, you would enter '2,3,5' here.


The following code sets up a stream that loads bus data from a file. Each row is then scored to predict whether an event is about to happen, based on the values in columns 2, 5, and 6 (shift_no, latitude, longitude).

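ALTER PUMP "transit".* STOP;

CREATE OR REPLACE FOREIGN DATA WRAPPER ECDA LIBRARY 'class com.sqlstream.aspen.namespace.common.CommonDataWrapper' LANGUAGE JAVA;

-- "FileReaderServer" is an assumed placeholder name; substitute the name of your own file-reading server.
CREATE OR REPLACE SERVER "FileReaderServer"
FOREIGN DATA WRAPPER ECDA
OPTIONS (classname 'com.sqlstream.aspen.namespace.common.FileSetColumnSet');

CREATE OR REPLACE FOREIGN STREAM "transit"."buses" (
-- "id" is assumed as the first column, matching the sink definition and column positions 2, 5, and 6.
"id" INTEGER,
"shift_no" DOUBLE,
"reported_at" TIMESTAMP,
"trip_no" VARCHAR(10),
"latitude" DOUBLE,
"longitude" DOUBLE,
"speed" INTEGER,
"bearing" INTEGER,
"active" BOOLEAN)
SERVER "FileReaderServer"
OPTIONS (
"ROW_SEPARATOR" u&'\000A',
"SKIP_HEADER" 'false',
"DIRECTORY" '/home/sqlstream',
"FILENAME_PATTERN" 'buses\.txt');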

To call the function, use code along the following lines:

SELECT STREAM * FROM STREAM (sys_boot.mgmt.predict(cursor(SELECT STREAM * from "transit"."buses"),
'/home/sqlstream/work/systemml/scripts/functions/jmlc/crash_by_lat_long_model.mtx', 100, '2,5,6', 'prediction' ) );

This code uses a DML script called m-svm-score.dml and a model data file called crash_by_lat_long_model.mtx, both located in /home/sqlstream/work/systemml/scripts/functions/jmlc/.