Integrating IBM MQ

Using s-Server, you can read from and write to IBM MQ. To read from or write to MQ, you configure and launch the adapter in SQL, using foreign stream options.

Using IBM MQ, you can achieve exactly-once semantics. For more detail, see the topic Exactly once in IBM MQ in this guide.

The IBM MQ adapter is based on the IBM MQ allclient 9.2.4.0 library and uses the JMS 2.0.1 API.

This topic contains the following subtopics:

  • Reading from IBM MQ
  • Writing to IBM MQ
  • Security in IBM MQ
  • Exactly once in IBM MQ

Reading from IBM MQ

s-Server's IBM MQ source plugin instantiates a JMS consumer, which consumes messages either from a queue (point-to-point) or from a topic using a durable subscriber (pub/sub). You configure the plugin using SQL options. The adapter reads from a queue or topic that belongs to a particular queue manager running on the given host. Data can be ingested using any supported parser, such as CSV, Avro, XML, or JSON; you specify the parser as part of the configuration options. See Reading from IBM MQ Using SQL below.

Design of IBM MQ Source

How the IBM MQ Source Works

The IBM MQ source plugin supports exactly-once semantics using a persistent volume.

  • Once a message is read from IBM MQ, it is deleted from IBM MQ.
  • If the SQLstream pipeline were to stop or crash for any reason before writing the message to a sink, any messages "in flight" would be lost.

To prevent message loss, the plugin uses a persistent volume. Messages read from an MQ queue/topic are written to a file in the following format:

<JMS_Message_ID_Of_first_message>,<count_of_messages_in_file>

Only when the file has been created and saved to the persistent volume are the messages acknowledged and removed from IBM MQ's queue or durable topic subscription. A file thread then reads the files that have been created and sends their contents to the SQLstream pipeline for processing.

  • If you provide a watermark query from the sink, the pipeline works with exactly-once semantics, and each file can be deleted once its content has been processed and persisted to the sink.
  • If not, you can configure a retention period for these files, after which they are deleted.

Reading from IBM MQ Using SQL

To read from IBM MQ, you need to create a foreign stream in SQL that references a prebuilt server object called IBMMQ_SERVER. The foreign stream's definition contains connection information for the IBM MQ server. You can also define provenance columns that pass metadata about the files created by the adapter into s-Server. See Using Provenance Columns below.

You will also need to specify a parser for the foreign stream, such as 'CSV'. Specifying "PARSER" as a foreign stream option tells s-Server that this foreign stream reads data.

CREATE OR REPLACE FOREIGN STREAM "DI"."data_edrflow_fs"
(
    "test" VARCHAR(32),
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
    "SQLSTREAM_PROV_ROW_PAYLOAD" VARCHAR(4096) NOT NULL,
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
)
SERVER "IBMMQ_SERVER"
OPTIONS (
    "QUEUE_MANAGER" 'QM1',
    "SERVER_HOST" 'localhost',
    "SERVER_PORT" '1414',
    "APPLICATION_USER" 'app',
    "APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
    "QUEUE" 'DEV.QUEUE.1',
    "SERVER_CHANNEL" 'DEV.APP.SVRCONN',
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "TRANSACTION_ROW_LIMIT" '10000',
    "TRANSACTION_ROWTIME_LIMIT" '20000',
    "INGRESS_TEMP_FILE_RETENTION_IN_MIN" '1',
    "INGRESS_TEMP_FILE_PATH" '/tmp',
    "CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
    "CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256',
    "WATERMARK_QUERY" 'SELECT * FROM "DI".START_POSITION_QUERY'
);

Using Provenance Columns

When reading from IBM MQ, you can declare provenance columns. These return metadata for the file created by the MQ foreign stream from which you are reading.

They are as follows:

| Option | Data type | Description |
| --- | --- | --- |
| SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | BIGINT | Adds the current line number to the output columns. Line numbering starts from 0. |
| SQLSTREAM_PROV_FILE_MODIFIED_TIME | BIGINT | Adds the modified time of the file being read to the output columns. |
| SQLSTREAM_PROV_FILE_SOURCE_FILE | VARCHAR | Adds the name of the file being read to the output columns. |
| SQLSTREAM_PROV_ROW_PAYLOAD | VARCHAR | Adds the complete line present in the file to the output columns. |
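
For example, a query over the source stream defined above can select each row's payload together with its provenance (a sketch; the stream name follows the earlier example):

```sql
-- Select the payload plus the temp file it came from and
-- the line number within that file.
SELECT STREAM
    "SQLSTREAM_PROV_FILE_SOURCE_FILE",
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER",
    "SQLSTREAM_PROV_ROW_PAYLOAD"
FROM "DI"."data_edrflow_fs";
```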

Foreign Stream Options for Reading from IBM MQ

| Option | Description | Required | Default |
| --- | --- | --- | --- |
| QUEUE_MANAGER | Queue manager name of the IBM MQ server. | True | |
| SERVER_HOST | Hostname of the IBM MQ server. | True | |
| SERVER_PORT | Listener port of the IBM MQ server. | True | |
| SERVER_CHANNEL | Channel name used by the queue with data. | True | |
| APPLICATION_USER | User name that the application uses to connect to MQ. | False | |
| APPLICATION_PASSWORD | Password of the application user, encoded in base64. | False | |
| QUEUE | Name of the queue from which you are receiving data. Specify either QUEUE or TOPIC, but not both. | False | |
| TOPIC | Name of the IBM MQ topic from which you are receiving data. Specify either QUEUE or TOPIC, but not both. | False | |
| TRANSACTION_ROW_LIMIT | Commit batch size for the files created from the messages. | False | 2000 |
| TRANSACTION_ROWTIME_LIMIT | Commit time interval in milliseconds, after which a batch is acknowledged and a file created even if the message count has not reached the batch size. | False | 20000 |
| WATERMARK_QUERY | Query that returns, from the sink, a watermark string in the MQ watermark format. | False | |
| CHARACTER_ENCODING | Character encoding of the data. | False | UTF-8 |
| INGRESS_TEMP_FILE_PATH | Persistent volume path where the files are created. | False | /tmp |
| INGRESS_TEMP_FILE_RETENTION_IN_MIN | Retention (in minutes) of files in the persistent path when no watermark query is provided. | False | 1 |
| INGRESS_TEMP_FILE_LIMIT | Maximum number of temp files in the persistent path. | False | 100 |
| CIPHER_SUITE | MQ-supported CipherSpec corresponding to the channel used. | False | TLS_RSA_WITH_AES_128_CBC_SHA256 |
| CONFIG_FILE_PATH | Path to a properties file supplying Java system properties for the keystore/truststore paths and passwords. | False | |
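
To consume from a topic rather than a queue, replace the QUEUE option with TOPIC; the other options are unchanged. A minimal sketch (the stream and topic names here are illustrative):

```sql
CREATE OR REPLACE FOREIGN STREAM "DI"."topic_input_fs"
(
    "test" VARCHAR(32)
)
SERVER "IBMMQ_SERVER"
OPTIONS (
    "QUEUE_MANAGER" 'QM1',
    "SERVER_HOST" 'localhost',
    "SERVER_PORT" '1414',
    "SERVER_CHANNEL" 'DEV.APP.SVRCONN',
    -- pub/sub: the source creates a durable subscription on this topic
    "TOPIC" 'DEV.BASE.TOPIC',
    "PARSER" 'CSV'
);
```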

Writing to IBM MQ

The IBM MQ adapter writes batches of data to an IBM MQ queue, or publishes batches of data to an MQ topic. To write to an MQ queue or topic, you must first define a server object for the IBM MQ server with information on its host, port, queue or topic name, channel, and queue manager. s-Server can format the data it writes to IBM MQ using any of the supported formatters, such as CSV, JSON, XML, or Avro.

s-Server's IBM MQ sink plugin instantiates a JMS producer, which sends messages to a queue (point-to-point) or publishes to a topic (pub/sub) in batches. You configure and launch the adapter in SQL using a foreign stream.

The sink uses JMS session transactions, which make it possible to achieve exactly-once semantics between s-Server and IBM MQ.

Design of IBM MQ Sink

How the IBM MQ Sink Works

For writing data to IBM MQ, you use one topic or queue for the data and, if you need exactly-once semantics, one queue for storing the watermark. If no watermark queue is provided, the sink works with at-most-once semantics. A batch of messages is written when either the size or the time condition is reached, whichever comes first. The adapter uses JMS session transactions to produce a batch of messages, delete the old watermark from the watermark queue, and add the new watermark, as a single atomic operation.

SQLSTREAM_POSITION_KEY or ROWTIME is used as the watermark column. If SQLSTREAM_POSITION_KEY is present in the sink foreign stream, it is used as the watermark column; if not, ROWTIME is used as the default watermark column from which the watermark is fetched.
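
For example, a view feeding the sink can construct this column from the source's provenance columns (a sketch; the source stream follows the reading example above, and the view name is illustrative):

```sql
CREATE OR REPLACE VIEW "DI"."keyed_view" AS
    SELECT STREAM
        "test",
        -- Concatenate file modified time, file name, and line number
        -- into a single position key for the sink.
        CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(150)) || '-' ||
        CAST("SQLSTREAM_PROV_FILE_SOURCE_FILE" AS VARCHAR(256)) || '-' ||
        CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(150))
            AS "SQLSTREAM_POSITION_KEY"
    FROM "DI"."data_edrflow_fs";
```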

Writing to IBM MQ Using SQL

Like all streams, foreign streams must be defined within schemas. The following code creates a foreign stream called ibm_output_fs in the (pre-existing) schema DI, using the prebuilt server IBMMQ_SERVER. To transfer data into IBM MQ through this stream, you will need to INSERT into it using a pump.

CREATE OR REPLACE FOREIGN STREAM "DI"."ibm_output_fs"
(
    "test" VARCHAR(128),
    "SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "IBMMQ_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "SERVER_HOST" 'localhost',
    "SERVER_PORT" '1414',
    "SERVER_CHANNEL" 'DEV.APP.SVRCONN',
    "QUEUE_MANAGER" 'QM1',
    "APPLICATION_USER" 'app',
    "APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
    "QUEUE" 'DEV.QUEUE.1',
    "WATERMARK_QUEUE_NAME" 'DEV.QUEUE.1',
    "TRANSACTION_ROW_LIMIT" '5000',
    "TRANSACTION_ROWTIME_LIMIT" '30000',
    "ENABLE_IBM_MESSAGE_PERSISTENCE" 'false',
    "CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
    "CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256'
);
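
A pump that feeds this stream might look like the following sketch (it assumes a source view "DI"."some_view" exposing a matching "test" column; the view and pump names are illustrative):

```sql
CREATE OR REPLACE PUMP "DI"."ibm_output_pump" STOPPED AS
    INSERT INTO "DI"."ibm_output_fs" ("test")
    SELECT STREAM "test"
    FROM "DI"."some_view";

ALTER PUMP "DI"."ibm_output_pump" START;
```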

Foreign Stream Options for Writing to IBM MQ

| Option | Description | Required | Default |
| --- | --- | --- | --- |
| QUEUE_MANAGER | Queue manager name of the IBM MQ server. | True | |
| SERVER_HOST | Hostname of the IBM MQ server. | True | |
| SERVER_PORT | Listener port of the IBM MQ server. | True | |
| SERVER_CHANNEL | Channel name used by the queue where data is to be written. | True | |
| APPLICATION_USER | User name that the application uses to connect to IBM MQ. | False | |
| APPLICATION_PASSWORD | Password of the application user, encoded in base64. | False | |
| QUEUE | Name of the queue to which you are sending data. Specify either QUEUE or TOPIC, but not both. | False | |
| TOPIC | Name of the IBM MQ topic to which you are sending data. Specify either QUEUE or TOPIC, but not both. | False | |
| WATERMARK_QUEUE_NAME | Name of the queue where the watermark is stored. | False | |
| ASYNC_DISABLED | Disables asynchronous puts by the producer. | False | |
| TRANSACTION_ROW_LIMIT | Commit batch size of the messages produced. | False | 5000 |
| TRANSACTION_ROWTIME_LIMIT | Commit time interval in milliseconds, after which a batch is committed. | False | 20000 |
| SQLSTREAM_POSITION_KEY | Name of the column in the row that provides the watermark to the plugin. | False | |
| ENABLE_IBM_MESSAGE_PERSISTENCE | Enables persistence for the messages produced to the queue. | False | False |
| CIPHER_SUITE | MQ-supported CipherSpec corresponding to the channel used. | False | TLS_RSA_WITH_AES_128_CBC_SHA256 |
| CONFIG_FILE_PATH | Path to a properties file supplying Java system properties for the keystore/truststore paths and passwords. | False | |

Security in IBM MQ

To secure messages in transit using Transport Layer Security (TLS) for MQ, provide the CIPHER_SUITE and CONFIG_FILE_PATH options. The properties file at that path can set up anonymous authentication (where only the MQ server provides a certificate) or mutual authentication (where both the MQ server and your client application provide certificates), depending on whether a keystore is provided in the properties. The properties file supports the following properties.

| Property | Description |
| --- | --- |
| trustStore | Path to the client truststore. |
| trustStorePassword | Truststore password, encoded in base64. |
| keyStore | Path to the client keystore. |
| keyStorePassword | Keystore password, encoded in base64. |
| username | User name that the application uses to connect to MQ. |
| password | Password of the application user, encoded in base64. |

A username and password defined in the config properties file override those supplied as options. If you do not want passwords to appear in the SQL, you can use the config file to provide the username and password instead of the option parameters. The following is an example ssl.properties file (assuming the CONFIG_FILE_PATH option points to the ssl.properties file).

trustStore = /home/sqlstream/keystore/clientTruststore.p12
trustStorePassword = cGFzc3cwcmQ=
keyStore = /home/sqlstream/keystore/clientKeystore.p12
keyStorePassword = cGFzc3cwcmQ=
username = app
password = cGFzc3cwcmQ=

Exactly once in IBM MQ

Exactly-once semantics can be achieved using the SQLstream IBM MQ plugin, provided some conditions are satisfied on the IBM MQ server:

  • All messages should have the same priority, and the retention of the messages needs to be very high.
  • The ordering of the source records must remain unchanged. Because the batch files are created by committing the messages in the queue or topic subscription, exactly-once behaviour can fail if the ordering of the messages changes during replay.

To run the pipeline with exactly-once semantics, define the properties as shown below.

IBM MQ Source

We must define a persistent volume path (INGRESS_TEMP_FILE_PATH) in the foreign stream; if the path provided is not reliable, data can be lost on disk failure.

We must provide a WATERMARK_QUERY in the foreign stream. Without it, the file where the messages are stored may be deleted by the retention policy before being sent to the pipeline, if the retention time is less than the pipeline's processing time.

The watermark provided through the watermark query must be in following format:

CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(150)) || '-' || CAST("SQLSTREAM_PROV_FILE_SOURCE_FILE" AS VARCHAR(256)) || '-' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(150)) as "SQLSTREAM_POSITION_KEY"

IBM MQ Sink

We must provide either:

  • two queues to our plugin option, one for data and one for storing the watermark,
  • or one topic for data and one queue for storing the watermark.

If SQLSTREAM_POSITION_KEY is present in the foreign stream, it is used as the watermark column; if not, ROWTIME is used as the default watermark column from which the watermark is taken.

Below is an example pipeline that uses the IBM MQ source and sink with exactly-once semantics. Here we use the same host for both source and sink, but this is not required: you can read from and write to separate MQ servers if needed.

--
-- CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS 
-- SELECT * FROM (VALUES(sys_boot.mgmt.ibmmq_watermark_string_with_config('host', 
-- port, 'channel','queue manager','watermark queue',
-- 'cipher','config path'))) AS options("STARTING_POSITION");


CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.ibmmq_watermark_string_with_config(
    'localhost', 1414, 'DEV.APP1.SVRCONN', 'QM1', 'DEV.QUEUE.3',
    'TLS_RSA_WITH_AES_128_CBC_SHA256', '/home/sqlstream/keystore/ssl.properties'
))) AS options("STARTING_POSITION");

CREATE OR REPLACE FOREIGN STREAM "DI"."ibm_input_fs"
(
    "test" VARCHAR(32),
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
    "SQLSTREAM_PROV_ROW_PAYLOAD" VARCHAR(4096) NOT NULL,
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
    "unparsed_attributes" VARCHAR(4096)
)
SERVER "IBMMQ_SERVER"
OPTIONS (
    "QUEUE_MANAGER" 'QM1',
    "SERVER_HOST" 'localhost',
    "SERVER_PORT" '1414',
    "APPLICATION_USER" 'app',
    "APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
    "QUEUE" 'DEV.QUEUE.1',
    "SERVER_CHANNEL" 'DEV.APP1.SVRCONN',
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "TRANSACTION_ROW_LIMIT" '10000',
    "TRANSACTION_ROWTIME_LIMIT" '20000',
    "INGRESS_TEMP_FILE_RETENTION_IN_MIN" '1',
    "INGRESS_TEMP_FILE_PATH" '/tmp',
    "CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
    "CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256',
    "WATERMARK_QUERY" 'SELECT * FROM "DI".START_POSITION_QUERY'
);

CREATE OR REPLACE VIEW "DI"."view_1" AS
    SELECT STREAM
    "test",
    CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(150)) || '-' || CAST("SQLSTREAM_PROV_FILE_SOURCE_FILE" AS VARCHAR(256)) || '-' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(150)) as "SQLSTREAM_POSITION_KEY"
    FROM "DI"."ibm_input_fs" AS "input";

CREATE OR REPLACE FOREIGN STREAM "DI"."ibm_output_fs"
(
    "test" VARCHAR(128),
    "SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "IBMMQ_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "SERVER_HOST" 'localhost',
    "SERVER_PORT" '1414',
    "SERVER_CHANNEL" 'DEV.APP1.SVRCONN',
    "QUEUE_MANAGER" 'QM1',
    "APPLICATION_USER" 'app',
    "APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
    "QUEUE" 'DEV.QUEUE.2',
    "WATERMARK_QUEUE_NAME" 'DEV.QUEUE.3',
    "TRANSACTION_ROW_LIMIT" '5000',
    "TRANSACTION_ROWTIME_LIMIT" '30000',
    "ENABLE_IBM_MESSAGE_PERSISTENCE" 'false',
    "CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
    "CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256'
);

CREATE OR REPLACE PUMP "DI"."FS_PUMP" STOPPED AS
    INSERT INTO "DI"."ibm_output_fs"
    SELECT  STREAM "test"
    FROM "DI"."view_1" AS "input";

ALTER PUMP "DI".* START;