This UDX handles the following use case. Imagine a stream of sensor readings produced by a large number of devices in the wild. Each device has many sensors and produces one row of sensor data at a regular cadence, say, one row per minute. Each device is identified by one or more key columns in the row. Because networks are flaky, rows occasionally go missing. We want to reconstruct the missing data, and we use a simple linear interpolation algorithm to do so.
Here is the signature of the linear_interpolator UDX:
sys_boot.mgmt.linear_interpolator (
    inputStream cursor,
    keyColumnNames cursor,
    sequenceNumberColumnName varchar(128),
    valueColumnNames cursor
)
returns table ( inputStream.* )
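The arguments have the following meaning:
- inputStream: a cursor over the stream of device readings to process.
- keyColumnNames: a cursor over the names of the key columns that together identify a device (BUILDINGNUMBER and ROOMNUMBER in the example below).
- sequenceNumberColumnName: the name of the column that carries each row's sequence number (SEQUENCENUMBER in the example below).
- valueColumnNames: a cursor over the names of the sensor columns whose missing values are filled in by linear interpolation (TEMPERATURE and HUMIDITY in the example below).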
The UDX produces an output stream with the same shape (the same columns) as the input stream. When the UDX detects a gap in the sequence numbers of a device's rows, it reconstructs the missing rows: for each missing sequence number it manufactures a row, filling in the sensor readings by linear interpolation between the device's readings on either side of the gap.
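The gap-filling behavior described above can be modeled outside s-Server. The following Python sketch is not the UDX's implementation; it is a minimal model that assumes each device's rows arrive in increasing sequence-number order, and that any columns other than the key, sequence-number, and value columns are copied from the row after the gap (the text does not say how such columns are filled). The names linear_interpolate, KEYS, VALUES, and SAMPLE_READINGS are hypothetical; the sample data are the five readings from the example below, with sequence numbers already added.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]


def linear_interpolate(
    rows: Iterable[Row],
    key_columns: List[str],
    seq_column: str,
    value_columns: List[str],
) -> Iterator[Row]:
    """Hypothetical model of the gap filling: yield every input row, preceded
    by manufactured rows for any gap in that device's sequence numbers."""
    # Last row seen for each device, keyed by the tuple of its key-column values.
    last_seen: Dict[Tuple[Any, ...], Row] = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        prev = last_seen.get(key)
        if prev is not None:
            start, end = prev[seq_column], row[seq_column]
            # Assumption: sequence numbers increase per device; if they do not,
            # this range is empty and nothing is manufactured.
            for seq in range(start + 1, end):
                fraction = (seq - start) / (end - start)
                # Assumption: other columns are copied from the row after the gap.
                filled = dict(row)
                filled[seq_column] = seq
                for c in value_columns:
                    filled[c] = prev[c] + (row[c] - prev[c]) * fraction
                yield filled
        last_seen[key] = row
        yield row


# The five readings from the example below, with sequence numbers already added.
KEYS = ["BUILDINGNUMBER", "ROOMNUMBER"]
VALUES = ["TEMPERATURE", "HUMIDITY"]
SAMPLE_READINGS = [
    dict(zip(KEYS + ["SEQUENCENUMBER"] + VALUES, r))
    for r in [
        (100, 32, 24300481, 70.0, 0.33),
        (200, 4, 24300483, 60.0, 0.1),
        (100, 31, 24300483, 65.0, 0.33),
        (100, 32, 24300484, 73.0, 0.37),
        (200, 4, 24300485, 61.0, 0.09),
    ]
]

if __name__ == "__main__":
    for row in linear_interpolate(SAMPLE_READINGS, KEYS, "SEQUENCENUMBER", VALUES):
        print(row)
```

Keeping only the most recent reading per device means the model's memory grows with the number of devices, not with the length of the stream.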
Devices may not generate sequence numbers themselves. Here is one technique for manufacturing them: use a pump to convert the timestamp on each incoming row into a BIGINT that increases by one per expected interval. For instance, if the expected cadence is one row per device per minute, turn the ROWTIME into a BIGINT number of milliseconds using the UNIX_TIMESTAMP function, then divide the result by the number of milliseconds in a minute (60000).
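The arithmetic performed by the pump can be checked with a short Python sketch. It assumes, as the text implies, that UNIX_TIMESTAMP yields milliseconds since the Unix epoch, and it treats timestamps without a time zone as UTC (an assumption). The helper names unix_millis and sequence_number are hypothetical. Python's // floors, which agrees with truncating integer division for the positive values involved here.

```python
from datetime import datetime, timedelta, timezone

# Number of milliseconds in one expected reporting interval (one minute).
MILLIS_PER_MINUTE = 60_000

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def unix_millis(ts: datetime) -> int:
    """Milliseconds since the Unix epoch, the unit the pump divides by 60000."""
    if ts.tzinfo is None:
        # Assumption: timestamps without a time zone are interpreted as UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return (ts - EPOCH) // timedelta(milliseconds=1)


def sequence_number(ts: datetime, cadence_millis: int = MILLIS_PER_MINUTE) -> int:
    """Hypothetical helper: one sequence step per expected interval."""
    # Floor division maps every reading within the same minute to the same number.
    return unix_millis(ts) // cadence_millis


if __name__ == "__main__":
    for text in ["2016-03-15 08:01:10", "2016-03-15 08:04:11"]:
        print(text, sequence_number(datetime.fromisoformat(text)))
```

For the first reading in the example below, 2016-03-15 08:01:10, this gives 24300481, the SEQUENCENUMBER shown for that row in the output.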
Here is an example of the UDX in action, along with a pump which manufactures sequence numbers:
-- this is the original stream of data coming from the devices
create stream rawClimateReadings (
    buildingNumber bigint,
    roomNumber bigint,
    temperature double,
    humidity double
);

-- this is the stream after adding sequence numbers to it
create stream climateReadings (
    buildingNumber bigint,
    roomNumber bigint,
    sequenceNumber bigint,
    temperature double,
    humidity double
);

-- this is the pump which adds sequence numbers to the raw data
CREATE PUMP climateReadingsPump stopped as
insert into climateReadings
SELECT STREAM s.buildingNumber, s.roomNumber,
       unix_timestamp(s.rowtime) / 60000,
       s.temperature, s.humidity
from rawClimateReadings s;

alter pump climateReadingsPump start;

-- this is a query which uses the linear_interpolator to
-- re-construct missing rows
SELECT STREAM * from stream (
    sys_boot.mgmt.linear_interpolator (
        cursor(SELECT STREAM * from climateReadings),
        cursor(values ('BUILDINGNUMBER'), ('ROOMNUMBER')),
        'SEQUENCENUMBER',
        cursor(values ('TEMPERATURE'), ('HUMIDITY'))
    )
);
Note: When a table function returns an infinite result, its invocation must be wrapped in a STREAM(…) marker; otherwise s-Server returns an error.
The query produces the following output:
'BUILDINGNUMBER','ROOMNUMBER','SEQUENCENUMBER','TEMPERATURE','HUMIDITY'
------
'100','32','24300481','70.0','0.33'
'200','4','24300483','60.0','0.1'
'100','31','24300483','65.0','0.33'
'100','32','24300482','71.0','0.3433333333333333'
'100','32','24300483','72.0','0.3566666666666667'
'100','32','24300484','73.0','0.37'
'200','4','24300484','60.5','0.095'
'200','4','24300485','61.0','0.09'
That output results when the following device-generated data is inserted into the raw data stream:
insert into test.rawClimateReadings
    (rowtime, buildingNumber, roomNumber, temperature, humidity)
values (timestamp '2016-03-15 08:01:10', 100, 32, 70.0, 0.33)
      ,(timestamp '2016-03-15 08:03:05', 200, 4, 60.0, 0.1)
      ,(timestamp '2016-03-15 08:03:20', 100, 31, 65.0, 0.33)
      ,(timestamp '2016-03-15 08:04:11', 100, 32, 73.0, 0.37)
      ,(timestamp '2016-03-15 08:05:07', 200, 4, 61.0, 0.09)
;
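Note that the raw data is missing rows for the device identified by (buildingNumber 100, roomNumber 32): the rows for minutes 08:02 and 08:03 never arrive. The linear_interpolator nevertheless reconstructs them, as sequence numbers 24300482 and 24300483 in the output. Likewise, the 08:04 row for the device (buildingNumber 200, roomNumber 4) is missing and is reconstructed as sequence number 24300484. Because a gap can only be interpolated once the reading after it has arrived, the reconstructed rows appear in the output just before that later reading.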
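The reconstructed values can be checked by hand. Writing a and b for the sequence numbers of the readings on either side of a gap, and v_a, v_b for a sensor value at those points, linear interpolation gives the value at a missing sequence number n as below. The worked numbers use the (100, 32) readings and are consistent with the output shown above.

```latex
\begin{aligned}
v(n) &= v_a + (v_b - v_a)\,\frac{n - a}{b - a}, \qquad a = 24300481,\; b = 24300484 \\
\text{TEMPERATURE}(24300482) &= 70.0 + (73.0 - 70.0)\cdot\tfrac{1}{3} = 71.0 \\
\text{TEMPERATURE}(24300483) &= 70.0 + (73.0 - 70.0)\cdot\tfrac{2}{3} = 72.0 \\
\text{HUMIDITY}(24300482) &= 0.33 + (0.37 - 0.33)\cdot\tfrac{1}{3} \approx 0.343333 \\
\text{HUMIDITY}(24300483) &= 0.33 + (0.37 - 0.33)\cdot\tfrac{2}{3} \approx 0.356667
\end{aligned}
```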