Linear Interpolation UDX

The use case handled by this UDX is as follows: Imagine that you have a stream of sensor readings being produced by a large number of devices in the wild. Each device has many sensors. Each device produces a row of sensor data at some regular cadence, say, one row per minute. Each device is identified by (possibly several) key column(s) in the row. Due to flaky networks, from time to time there will be missing rows. We want to reconstruct that missing data. We use a simple linear interpolation algorithm for this purpose.

Here is the signature of the linear_interpolator UDX:

sys_boot.mgmt.linear_interpolator
(
  inputStream cursor,
  keyColumnNames cursor,
  sequenceNumberColumnName varchar(128),
  valueColumnNames cursor
)
returns table
(
  inputStream.*
)

The arguments have the following meaning:

  • inputStream - This is an incoming stream of data generated by devices in the wild.
  • keyColumnNames - This is a VALUES clause. It lists the names of the key columns in the inputStream. The key columns form a unique identifier for the device.
  • sequenceNumberColumnName - This is the name of a BIGINT column in the inputStream. This column increases monotonically for each device. That is, this column increments at the cadence that the rows are being generated.
  • valueColumnNames - This is another VALUES clause. It lists the names of the columns which hold the sensor readings.

The UDX produces an output stream which has the same shape as the input stream. When the UDX notices that there is a gap in the sequence numbers of a device’s rows, the UDX reconstructs the missing rows. For each missing sequence number, the UDX manufactures a row, using linear interpolation to fill in the sensor readings.

We understand that devices may not generate sequence numbers. Here is a technique for manufacturing sequence numbers: We use a pump to convert the timestamp on the incoming row into a BIGINT which increases monotonically at the expected cadence. So, for instance, if the expected cadence is 1 row per device per minute, then we turn the ROWTIME into a BIGINT (using the UNIX_TIMESTAMP function), then divide the result by the number of milliseconds in a minute (60000).

Here is an example of the UDX in action, along with a pump which manufactures sequence numbers:

-- this is the original stream of data coming from the devices
create stream rawClimateReadings
(
  buildingNumber bigint,
  roomNumber bigint,
  temperature double,
  humidity double
);

-- this is the stream after adding sequence numbers to it
create stream climateReadings
(
  buildingNumber bigint,
  roomNumber bigint,
  sequenceNumber bigint,
  temperature double,
  humidity double
);

-- this is the pump which adds sequence numbers to the raw data
CREATE PUMP climateReadingsPump stopped as
 insert into climateReadings
 SELECT STREAM
  s.buildingNumber, s.roomNumber,
  unix_timestamp(s.rowtime) / 60000,
  s.temperature, s.humidity
 from rawClimateReadings s;

alter pump climateReadingsPump start;

-- this is a query which uses the linear_interpolator to
-- re-construct missing rows
SELECT STREAM *
from stream
(
  sys_boot.mgmt.linear_interpolator
  (
    cursor(SELECT STREAM * from climateReadings),
    cursor(values ('BUILDINGNUMBER'), ('ROOMNUMBER')),
    'SEQUENCENUMBER',
    cursor(values ('TEMPERATURE'), ('HUMIDITY'))
  )
);

Note: When a table function returns an infinite result, it needs to be wrapped with a STREAM(…) marker when invoked. s-Server will return an error otherwise

The query produces the following output:

'BUILDINGNUMBER','ROOMNUMBER','SEQUENCENUMBER','TEMPERATURE','HUMIDITY'

------

'100','32','24300481', '70.0','0.33'
'200','4', '24300483', '60.0','0.1'
'100','31','24300483', '65.0','0.33'
'100','32','24300482', '71.0','0.3433333333333333'
'100','32','24300483', '72.0','0.3566666666666667'
'100','32','24300484', '73.0','0.37'
'200','4', '24300484', '60.5','0.095'
'200','4', '24300485', '61.0','0.09'

when the following device-generated data is inserted into the raw data stream…

insert into test.rawClimateReadings
(rowtime, buildingNumber, roomNumber, temperature, humidity)
values
(timestamp '2016-03-15 08:01:10', 100, 32, 70.0, 0.33)
,(timestamp '2016-03-15 08:03:05', 200, 4, 60.0, 0.1)
,(timestamp '2016-03-15 08:03:20', 100, 31, 65.0, 0.33)
,(timestamp '2016-03-15 08:04:11', 100, 32, 73.0, 0.37)
,(timestamp '2016-03-15 08:05:07', 200, 4, 61.0, 0.09)
;

Note that the raw data is missing some rows for the device identified by (buildingNumber 100, roomNumber 32). The rows are for minutes 08:02 and 08:03. However, the linear_interpolator re-constructs that missing data.