Machine Learning with Apache SystemML

s-Server integrates Apache SystemML (now known as Apache SystemDS) to provide real-time predictions based on continuous machine learning. Using data from s-Server, you can create continuously-updating prediction models, so that models train without interrupting the data pipeline.

s-Server supports both:

  1. A training component, which supports 19 different machine learning algorithms. As part of an s-Server pipeline, data is continuously streamed to a model. As the model is trained, it outputs a version number, which is used to dynamically update predictions without stopping the pipeline. You can control how often the model is updated by setting a policy.
  2. A scoring / prediction function that uses output from the training phase to score incoming rows. This function outputs both a prediction field and a model number, so that you can see which model version each prediction is based on.

NOTE: The SystemML plugin is an optional part of the s-Server installation; it is not included in the standard sqlstream/complete, sqlstream/development or sqlstream/minimal Docker images. To include it in an installation, add the command line switch --enable-components systemml when installing s-Server from the command line.

Learning about SystemML

SQLstream currently uses SystemML 1.2.0, documentation for which is at https://systemds.apache.org/docs/1.2.0/index. You can learn about later and earlier versions of Apache SystemML / SystemDS at https://systemds.apache.org/documentation.

Supported algorithms

See the SystemML Algorithms Reference.

The scoring function is expressed using SystemML’s DML (Declarative Machine Learning) language. For more information on DML, see the DML Language Reference.

Installing SystemML

The source code for SystemML 1.2.0 is at https://github.com/apache/systemds/tree/branch-1.2.0. Downloads of SystemML 1.2.0 are available here: http://archive.apache.org/dist/systemds/1.2.0/. You can download the binary tarball or the equivalent zip file and extract it under /home/sqlstream.

Assuming you unpack this under /home/sqlstream, you may want to set a variable in your .profile or .bashrc:

  export SYSTEMML_HOME=/home/sqlstream/systemml-1.2.0-bin

You may also want to create a link from the $SQLSTREAM_HOME directory:

  cd $SQLSTREAM_HOME
  ln -s $SYSTEMML_HOME systemml

In the following examples, you will need to ensure that all file paths are either:

  • relative to s-Server’s current working directory (usually that is $SQLSTREAM_HOME or /opt/sqlstream/<version>/s-Server) OR
  • an absolute path

In the SQL, do not use environment variables like $SYSTEMML_HOME in the various file name parameters as the variable will not be expanded. If you have created the systemml symbolic link from $SQLSTREAM_HOME you can use shorter relative paths starting with systemml as shown below.

Training a model

How to Train and Re-train Models Using s-Server’s File Writer

To begin making predictions with s-Server, you should first identify data that you can use to “train” a SystemML model. This data should include one column that holds the outcome you want to predict. A list of products, for example, might include a column indicating whether a given product is a good buy. The other columns might include information on price on this site, prices from competitor sites, customer ratings, and so on. This should be the same kind of data that you will pass to the SystemML function in s-Server.

We use a file sink for training so that the SystemML scripts have easy access to each batch of training data.

The following code creates a foreign stream to be used as a file-writing sink - see Writing to the File System. The runStandaloneSystemML.sh script is invoked through the “POSTPROCESS_COMMAND” option.

CREATE OR REPLACE FOREIGN STREAM "StreamLab_Output_srutinizertest"."guide_1_out_sink" (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "active" BOOLEAN
    )
SERVER "FILE_SERVER"
OPTIONS (
    "POSTPROCESS_COMMAND" 'systemml/runStandaloneSystemML.sh systemml/scripts/algorithms/l2-svm-predict.dml -nvargs X=<input> Y=data/haberman.test.labels.csv model=data/l2-svm-model.csv fmt="csv" confusion=data/l2-svm-confusion.csv',
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false',
    "DIRECTORY" 'systemml/scripts/functions/jmlc/',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
    "FILE_ROTATION_SIZE" '20K',
    "FORMATTER_INCLUDE_ROWTIME" 'true'
);

The POSTPROCESS_COMMAND option lets you run a script after each file is written.

In this case, the runStandaloneSystemML.sh script uses the Binary Class Support Vector Machines algorithm in systemml/scripts/algorithms/l2-svm-predict.dml (see Algorithm Classification Examples) to generate a “trained” data model. The script generates a file called crash_by_lat_long_model.mtx in the $SYSTEMML_HOME/scripts/functions/jmlc/ directory, which you can later use to calculate scores or predictions.

Because streaming data arrives continuously, you need to set up a pump to move rows continually from an s-Server stream to another stream, file, Kafka topic, Kinesis stream, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
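
A pump that feeds the file sink defined above might look like the following minimal sketch. The pump name "guide_1_out_pump" and the source stream "transit"."bus_training" are hypothetical names used only for illustration; the sketch assumes the source stream has the same columns as the sink, in the same order and with compatible types.

```sql
-- Hypothetical pump: the pump name and source stream are assumptions, not part of the product.
-- The pump is created in the STOPPED state so that it can be started explicitly.
CREATE OR REPLACE PUMP "StreamLab_Output_srutinizertest"."guide_1_out_pump" STOPPED AS
INSERT INTO "StreamLab_Output_srutinizertest"."guide_1_out_sink"
SELECT STREAM *
FROM "transit"."bus_training";   -- assumed source stream with matching columns

-- Start the pump; rows now flow continuously into the file sink.
ALTER PUMP "StreamLab_Output_srutinizertest"."guide_1_out_pump" START;
```

Creating the pump as STOPPED and starting it separately lets you define the whole pipeline first and begin writing training files only when everything is in place.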

Scoring

Using JMLC for Scoring on a Row-by-Row Basis

SystemML is installed on s-Server as a function called sys_boot.mgmt.predict. You can use this function to make predictions based on an existing DML file and the model data file that you generated above. The predict function requires the following parameters:

Parameter       Description
inputRows       Current cursor.
model           Model data file.
columnIndexes   Order number of the columns where we should look for values to use in prediction. For example, if you are using the 2nd, 3rd, and 5th columns, you would enter '2,3,5' here. Column numbering starts at 1 for ROWTIME.

Example

The following code reads bus data from a file and, for each row, predicts whether an event is about to happen, based on the values taken from columns 2, 5 and 6 (shift_no, latitude, longitude).

CREATE OR REPLACE SCHEMA "transit";
ALTER PUMP "transit".* STOP;

CREATE OR REPLACE FOREIGN STREAM "transit"."buses" 
( "id" INTEGER
, "shift_no" DOUBLE
, "reported_at" TIMESTAMP
, "trip_no" VARCHAR(10)
, "latitude" DOUBLE
, "longitude" DOUBLE
, "speed" INTEGER
, "bearing" INTEGER
, "active" BOOLEAN
)
SERVER "FILE_SERVER"
OPTIONS 
( "PARSER" 'CSV'
, "CHARACTER_ENCODING" 'UTF-8'
, "QUOTE_CHARACTER" '"'
, "SEPARATOR" ','
, "ROW_SEPARATOR" u&'\000A'
, "SKIP_HEADER" 'false'
, "DIRECTORY" '/home/sqlstream'
, "FILENAME_PATTERN" 'buses\.txt'
);

To call the function, use code along the following lines:

SELECT STREAM * FROM STREAM (
    sys_boot.mgmt.predict
      (cursor(SELECT STREAM * from "transit"."buses")                 -- stream of input data
      ,'systemml/scripts/functions/jmlc/m-svm-score.dml'              -- path of dml script
      ,'systemml/scripts/functions/jmlc/crash_by_lat_long_model.mtx'  -- model file from training
      , 100                                                           -- number of rows used for training
      ,'2,5,6'                                                        -- column numbers used in the model
      ,'prediction'                                                   -- name of result column
      )
);

This code uses:

  • a DML script in $SYSTEMML_HOME/scripts/functions/jmlc/ called m-svm-score.dml and
  • the model data file in $SYSTEMML_HOME/scripts/functions/jmlc/ called crash_by_lat_long_model.mtx generated by the training step.
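
Because environment variables are not expanded in SQL, the same call can also be written with absolute paths instead of relying on the systemml symbolic link. The sketch below assumes SystemML was unpacked to /home/sqlstream/systemml-1.2.0-bin, as in the installation step; adjust the paths if your installation differs.

```sql
-- Same call as above, using absolute paths.
-- Assumption: SystemML is installed at /home/sqlstream/systemml-1.2.0-bin.
SELECT STREAM * FROM STREAM (
    sys_boot.mgmt.predict
      (cursor(SELECT STREAM * from "transit"."buses")                                                    -- stream of input data
      ,'/home/sqlstream/systemml-1.2.0-bin/scripts/functions/jmlc/m-svm-score.dml'                       -- path of dml script
      ,'/home/sqlstream/systemml-1.2.0-bin/scripts/functions/jmlc/crash_by_lat_long_model.mtx'           -- model file from training
      , 100                                                                                              -- number of rows used for training
      ,'2,5,6'                                                                                           -- column numbers used in the model
      ,'prediction'                                                                                      -- name of result column
      )
);
```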