s-Server integrates Apache SystemML to provide real-time predictions based on continuous machine learning. Using data from s-Server, you can create continuously-updating prediction models, so that models train without interrupting the data pipeline.
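Two s-Server components support this: a file-writing sink whose POSTPROCESS_CMD option runs a SystemML script that produces a trained model from the data the sink writes, and the sys_boot.mgmt.predict function, which scores incoming rows against that model.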
Your scoring function is expressed using SystemML’s DML (Declarative Machine Learning) language. For more information on DML, see https://systemml.apache.org/docs/0.12.0/dml-language-reference.
To begin making predictions with s-Server, first identify data that you can use to “train” a SystemML model. This data should include one column that holds the value you want to predict. A list of products, for example, might include a column indicating whether a given product is a good buy, while the other columns hold information such as the price on this site, prices on competitor sites, and customer ratings. This should be the same kind of data that you will later pass to the SystemML function in s-Server.
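As an illustration of the product example, training data might be declared as an s-Server stream in which one column holds the value to predict and the remaining columns hold the inputs. This is a minimal sketch only; the schema, stream, and column names are hypothetical and not part of s-Server.

```sql
-- Hypothetical schema and stream for the product example;
-- all names here are illustrative only.
CREATE OR REPLACE SCHEMA "products";

CREATE OR REPLACE STREAM "products"."training_data" (
    "price" DOUBLE,             -- price on this site
    "competitor_price" DOUBLE,  -- price on a competitor site
    "customer_rating" DOUBLE,   -- average customer rating
    "good_buy" INTEGER          -- label: the value the model learns to predict
);
```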
The following code creates a foreign stream that serves as a file-writing sink. Through its POSTPROCESS_CMD option, the sink invokes the runStandaloneSystemML.sh script.
CREATE OR REPLACE FOREIGN STREAM "StreamLab_Output_srutinizertest"."guide_1_out_sink" (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "active" BOOLEAN
)
SERVER "ECDAWriterServer_1"
OPTIONS (
    -- Script run after a file is written; <input> stands for the written file
    "POSTPROCESS_CMD" 'scripts/runStandaloneSystemML.sh scripts/algorithms/l2-svm-predict.dml -nvargs X=<input> Y=data/haberman.test.labels.csv model=data/l2-svm-model.csv fmt="csv" confusion=data/l2-svm-confusion.csv',
    -- Write rows as headerless CSV files
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false',
    -- Output location and file naming
    "DIRECTORY" '/home/sqlstream/work/systemml/scripts/functions/jmlc/',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
    "FILE_ROTATION_SIZE" '20K',
    "FORMATTER_INCLUDE_ROWTIME" 'true'
);
The POSTPROCESS_CMD option lets you run a script after a file is written. In this case, the runStandaloneSystemML.sh script produces a “trained” data model that you can later use to calculate scores or predictions.
The code above generates a file called crash_by_lat_long_model.mtx in the directory /home/sqlstream/work/systemml/scripts/functions/jmlc/. We will use this file below to generate a prediction.
Because of the nature of streaming data, you need to set up a pump to move rows continually from an s-Server stream to another stream, file, Kafka topic, Kinesis stream, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. For details, see the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide.
SystemML is installed on s-Server as a function called sys_boot.mgmt.predict. You can use this function to make predictions based on an existing DML file and the model data file that you generated above. The predict function requires the following parameters:
| Parameter | Description |
|---|---|
| model | Model data file. |
| columnIndexes | Positions, counted from 1, of the input columns whose values are used in the prediction. For example, to use the 2nd, 3rd, and 5th columns, enter '2,3,5'. |
The following code loads bus data from a file. The prediction query that follows then predicts, for each row, whether something is about to happen, based on the values in columns 2, 5, and 6 (shift_no, latitude, longitude).
CREATE OR REPLACE SCHEMA "transit";
-- Stop any pumps running in the schema before redefining its objects
ALTER PUMP "transit".* STOP;
CREATE OR REPLACE SCHEMA "transit";

-- Foreign data wrapper and server used to read files
CREATE OR REPLACE FOREIGN DATA WRAPPER ECDA
    LIBRARY 'class com.sqlstream.aspen.namespace.common.CommonDataWrapper'
    LANGUAGE JAVA;

CREATE OR REPLACE SERVER "ECDA_FILEREADER"
    FOREIGN DATA WRAPPER ECDA
    OPTIONS (classname 'com.sqlstream.aspen.namespace.common.FileSetColumnSet');

-- Foreign stream that parses /home/sqlstream/buses.txt as CSV
CREATE OR REPLACE FOREIGN STREAM "transit"."buses" (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "active" BOOLEAN
)
SERVER "ECDA_FILEREADER"
OPTIONS (
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '"',
    "SEPARATOR" ',',
    "ROW_SEPARATOR" u&'\000A',
    "SKIP_HEADER" 'false',
    "DIRECTORY" '/home/sqlstream',
    "FILENAME_PATTERN" 'buses\.txt'
);
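A pump is what keeps rows flowing from the bus stream into the file-writing sink, so that the sink keeps producing files for its POSTPROCESS_CMD. The following is a minimal sketch, assuming the sink "StreamLab_Output_srutinizertest"."guide_1_out_sink" and the "transit"."buses" stream defined in this section both exist; the pump name "buses_to_sink_pump" is hypothetical. Because the INSERT omits a target column list, columns are matched by position, so the SELECT list follows the sink's column order.

```sql
-- Hypothetical pump: copies every bus row into the file-writing sink.
-- Created STOPPED so no data flows until the pump is started explicitly.
CREATE OR REPLACE PUMP "transit"."buses_to_sink_pump" STOPPED AS
INSERT INTO "StreamLab_Output_srutinizertest"."guide_1_out_sink"
SELECT STREAM
    "id", "shift_no", "reported_at", "trip_no",
    "latitude", "longitude", "speed", "bearing", "active"
FROM "transit"."buses";

-- Start all pumps in the "transit" schema.
ALTER PUMP "transit".* START;
```

Placing the pump in the "transit" schema means the ALTER PUMP "transit".* STOP statement at the top of the script also stops it whenever the script is rerun.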
To call the function, use code along the following lines:
SELECT STREAM *
FROM STREAM (
    sys_boot.mgmt.predict(
        -- input rows to score
        CURSOR(SELECT STREAM * FROM "transit"."buses"),
        -- DML scoring script
        '/home/sqlstream/work/systemml/scripts/functions/jmlc/m-svm-score.dml',
        -- trained model data file
        '/home/sqlstream/work/systemml/scripts/functions/jmlc/crash_by_lat_long_model.mtx',
        100,
        -- columns used as inputs (shift_no, latitude, longitude)
        '2,5,6',
        'prediction'
    )
);
This query uses the DML script m-svm-score.dml and the model data file crash_by_lat_long_model.mtx, both located in /home/sqlstream/work/systemml/scripts/functions/jmlc/.
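To reuse the scored stream, for example as the source of a pump or of other queries, you can wrap the prediction query in a view. This is a sketch under stated assumptions: the view name "bus_predictions" is hypothetical, and the final query assumes that the last argument ('prediction') names an output column that the function adds to each row, which the parameter table above does not confirm.

```sql
-- Hypothetical view wrapping the prediction query.
CREATE OR REPLACE VIEW "transit"."bus_predictions" AS
SELECT STREAM *
FROM STREAM (
    sys_boot.mgmt.predict(
        CURSOR(SELECT STREAM * FROM "transit"."buses"),
        '/home/sqlstream/work/systemml/scripts/functions/jmlc/m-svm-score.dml',
        '/home/sqlstream/work/systemml/scripts/functions/jmlc/crash_by_lat_long_model.mtx',
        100,
        '2,5,6',
        'prediction'
    )
);

-- ASSUMPTION: the function appends a column named "prediction";
-- adjust the column name if its actual output differs.
SELECT STREAM "id", "trip_no", "prediction"
FROM "transit"."bus_predictions";
```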