Integrating MQTT

You can read from and write to MQTT (Message Queuing Telemetry Transport) from s-Server. s-Server implements an MQTT client to connect to an MQTT server. s-Server supports MQTT version 3.1 or 3.1.1. In reading from or writing to MQTT, s-Server starts with a clean session and uses in memory persistence only.

This topic contains the following subtopics:

Reading from MQTT

To read from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See Reading from MQTT Using SQL below. To write from remote locations, you configure such options using a properties file and launch the agent at the command line. See Reading from MQTT Using the ECD Agent below.

Reading from MQTT Using SQL

To read from MQTT, you need to create a foreign stream in s-Server that references a prebuilt server object called MQTT_SERVER. The foreign stream's definition contains as options connection information for the MQTT server. s-Server uses this information to implement an MQTT client that reads data into the foreign stream. Minimum options required are TOPIC and CONNECTION_URL.

s-Server receives the payload for the MQTT message and parses it using the specified parser. Specifying "parser" as a foreign stream option tells s-Server that this foreign stream writes data. See Parser Types for Reading in this guide for more details.

See the topic CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide for more information on creating foreign streams.

The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named MQTTReaderStream.

CREATE OR REPLACE SCHEMA MQTTSchema;
SET SCHEMA 'MQTTSchema';

CREATE OR REPLACE FOREIGN STREAM MQTTReaderStream
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER MQTT
OPTIONS (TOPIC 'test',
CONNECTION_URL 'tcp://127.0.0.1:1883',
PARSER 'JSON'
);

Foreign Stream Options for Reading from MQTT

Option Description
CONNECTION_URL 'tcp://127.0.0.1:1883',
TOPIC MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client.
QOS Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
CLIENT_ID s-Server implements an MQTT client to connect to the MQTT server. This setting provides a MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to randomly generated.
USERNAME Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text.
PASSWORD Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text.
KEEP_ALIVE_INTERVAL Optional. Interval in seconds sent to MQTT broker when s-Server establishes a connection. Specifies the longest time period of time that broker and client persist without sending a message. Defaults to 60.
CONNECTION_TIMEOUT Optional. Defaults to 30.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set options using select USERNAME, PASSWORD from TEST.mqtt_options. For more details see the topic Using the Options Query Property.

Provenance Columns for MQTT

When reading from MQTT, you can declare provenance columns. These return metadata for the MQTT topic from which you are reading.

Column Explanation
SQLSTREAM_PROV_MQTT_MESSAGE_ID MQTT message ID.
SQLSTREAM_PROV_MQTT_TOPIC MQTT topic.

If you include either of those two columns in a foreign stream definition for a foreign stream that reads from MQTT, they will be populated on each row read with the topic and message_id from the broker.

Reading from MQTT using the ECD Agent

ou can use the ECD agent to read files from remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options as the ones you format in SQL, but these options need to be formatted in a properties file along the lines of the following.

TOPIC=test
CONNECTION_URL=tcp://127.0.0.1:1883
PARSER=JSON 
SCHEMA_NAME=MQTTSchema
STREAM_NAME=MQTTNativeStream
ROWTYPE=RecordType(INTEGER recNo, TIMESTAMP ts, INTEGER accountNumber,BOOLEAN loginSuccessful,VARCHAR(32) sourceIP,VARCHAR(32) destIP,INTEGER customerId)

Writing to MQTT

To write data to MQTT, use the Extensible Common Data Framework. See the topic CREATE FOREIGN SERVER in the s-Server Streaming SQL Reference Guide for more details.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line.

Note On Writing Pumps

Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Writing to MQTT Using SQL

To write to MQTT, you need to create a foreign stream in SQL that references a prebuilt server object called MQTT_SERVER. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

The foreign stream's definition contains as options connection information for the MQTT server. s-Server uses this information to implement an MQTT client that writes data from the foreign stream into MQTT.

Each row for the stream is sent as the payload for the MQTT message, formatted using the specified formatter. Specifying "formatter" as a foreign stream option tells s-Server that this foreign stream writes data. See Output Formats for Writing in this guide for more details.

The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named MQTTWriterStream.

CREATE OR REPLACE SCHEMA MQTTSchema;
SET SCHEMA 'MQTTSchema';

CREATE OR REPLACE FOREIGN STREAM MQTTWriterStream
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER MQTT
OPTIONS (TOPIC 'test',
CONNECTION_URL 'tcp://127.0.0.1:1883',
FORMATTER 'JSON');

Again, to get data moving, you need to create and start a pump. You do so with code along the following lines:

CREATE OR REPLACE SCHEMA "Pumps";
SET SCHEMA '"Pumps"';

CREATE OR REPLACE PUMP "writerPump" STOPPED AS
--We recommend creating pumps as stopped
--then using ALTER PUMP "Pumps"."writerPump" START to start it
INSERT INTO "MQTTWriterSchema"."MQTTWriterStream"
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

Foreign Stream Options for Reading from MQTT

Option Description
CONNECTION_URL 'tcp://127.0.0.1:1883',
TOPIC MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client.
CLIENT_ID s-Server implements an MQTT client to connect to the MQTT server. This setting provides a MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to randomly generated.
QOS Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
USERNAME Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text.
PASSWORD Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text.
KEEP_ALIVE_INTERVAL Optional. Interval in seconds sent to MQTT broker when s-Server establishes a connection. Specifies the longest time period of time that broker and client persist without sending a message. Defaults to 60.
CONNECTION_TIMEOUT Optional. Connection timeout in seconds. Defines the maximum time interval the client will wait for the network

connection to the MQTT server to be established. If you set this value to 0, s-Server disables timeout processing, and the client will wait until the network connection is made successfully or fails. Defaults to 30. | | RETAINED | Output only. True or False. If set to true, tells broker to store the last retained message and the QOS for this topic. Defaults to false. | | MAX_IN_FLIGHT | Output only. When QOS is set to 1 or 2, this is the Maximum number of outgoing messages that can be in the process of being transmitted at the same time. This number includes messages currently going in handshakes and messages being retried. Defaults to 10. |

Writing to MQTT Using the ECD Agent

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

SCHEMA=MQTTWRITER
TABLE_NAME=MQTTWriterStream
DESTINATION=amq.topic
CONNECTION_URL=tcp://127.0.0.1:1883
TOPIC=test
FORMATTER=CSV
SCHEMA_NAME=MQTTSchema
STREAM_NAME=MQTTNativeWriterStream
ROWTYPE=RecordType(INTEGER recNo, TIMESTAMP ts, INTEGER accountNumber,BOOLEAN loginSuccessful,VARCHAR(32) sourceIP,VARCHAR(32) destIP,INTEGER customerId)