Integrating AMQP

You can read from and write to AMQP message buses from s-Server.

This topic contains the following subtopics:

Reading from AMQP

To read from an AMQP message bus, you define a foreign stream. AMQP stands for Advanced Message Queuing Protocol, and is an Open Standard for Messaging Middleware. For more information, see https://cwiki.apache.org/confluence/display/CAMEL/AMQP.

To read from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See Reading from AMQP Using SQL below. To write from remote locations, you configure such options using a properties file and launch the agent at the command line. See Reading from AMQP Using the ECD Agent below.

See the topic CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide for more information on creating foreign streams.

Reading from AMQP Using SQL

To read from AMQP, you need to create a foreign stream in SQL that references a prebuilt server object called AMQP_LEGACY_SERVER or AMQP10_SERVER. The foreign stream's definition contains connection information for the AMQP server.

In these streams each row represents a single message, and has two columns: the message headers and the message body. The message body is one varbinary column. The message headers are concatenated to form one varchar column, with the format TAG=VALUE TAG=VALUE ....

See CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide .

For AMQP 0.9

The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named AMQPStream.

--For AMQP 0.9

CREATE OR REPLACE SCHEMA AMQPSchema;
SET SCHEMA 'AMQPSchema';

CREATE OR REPLACE FOREIGN STREAM AMQPStream
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER AMQP_LEGACY_SERVER
OPTIONS (DESTINATION 'amq.topic',
CONNECTION_URL 'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672''',
PARSER 'CSV');

For AMQP 1.0

The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named AMQP10Stream.

--For AMQP 1.0

CREATE OR REPLACE SCHEMA AMQP10Schema;
SET SCHEMA 'AMQP10Schema';

CREATE OR REPLACE FOREIGN STREAM AMQP10Stream
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER AMQP10_SERVER
OPTIONS (DESTINATION 'amq.topic',
CONNECTION_URL 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',
PARSER 'CSV');

Foreign Stream Options for Reading from AMQP

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides network wire-level protocol for the exchange of messages, while up to 0.9.1, AMQP employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure the connection URL for AMQP differently for 1.0 than for up to 0.9.1

Option Description
CONNECTION_URL Required. Connection URL for AMQP legacy server. This includes the servers hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version.
DESTINATION This can be in the form <destination prefix>{PARITITION}.
PARTITION_EXPRESSION You should only use this if DESTINATION includes "{PARTITION}". This should be a dk.brics regular expression, such as <0-3>.
PARSER_QUEUE_SIZE Queue size for parser. Reading only. Defaults to 2. In most cases, you will not want to change this number.
ACKNOWLEDGE_MODE Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack.

Roughly, AUTO asks the container on the AMQP server to acknowledge once message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set one or more options using select DESTINATION from TEST.amqp_options. For more details see the topic Using the Options Query Property.

AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)

Format:

amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList> ]`

Example:

amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'

AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:

Format:

amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>'

Example:

amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation.Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.

Special Columns Generated by for AMQP 1.0 (Input Only)

The Extensible Common Data Adapter generates one special row column when parsing AMQP 1.0. You can declare this column to make it part of a foreign stream or table.

Special Column Type Meaning
CREATION_TIME TIMESTAMP The time the containing message was created.

Input Format

The code sample above uses CSV as a format. To use other file options, see the Input Formats for Reading topic in this guide.

Reading from AMQP Using the ECD Agent

You can use the ECD agent to read data from remote locations. See Reading Data from Remote Locations for more details.

The ECD agent takes similar options as the ones you format in SQL, but these options need to be formatted in a properties file along the lines of the following.

For AMQP 0.9

PARSER=CSV
FILENAME_PATTERN=myRecord\.csv
DESTINATION=amq.topic
CONNECTION_URL=amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672
CHARACTER_ENCODING=UTF-8
SCHEMA_NAME="AmqpReader"
TABLE_NAME="AMQPStream"
ROWTYPE=RECORDTYPE(VARCHAR(2040) line)

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line

$ ./commondataagent.sh --input --props sample.test.properties --io amqp

For AMQP 1.0

PARSER=CSV
FILENAME_PATTERN=myRecord\.csv
DESTINATION=amq.topic
CONNECTION_URL=amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672
CHARACTER_ENCODING=UTF-8
SCHEMA_NAME="AmqpReader"
TABLE_NAME="AMQPStream"
ROWTYPE=RECORDTYPE(VARCHAR(2040) line)

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line

$ ./commondataagent.sh --input --props sample.test.properties --io amqp10

Writing to AMQP

To write data to an AMQP message bus, use the Extensible Common Data Framework. AMQP stands for Advanced Message Queuing Protocol, and is an Open Standard for Messaging Middleware. For more information, see https://cwiki.apache.org/confluence/display/CAMEL/AMQP.

To write data, you first define a server object with connection information. Once you define this server object, you can write to AMQP by referencing it. See the topic CREATE FOREIGN SERVER in the s-Server Streaming SQL Reference Guide for more details.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the s-Server Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note On Writing Pumps

Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Writing to AMQP Using SQL

In these streams each row represents a single message, and has two columns: the message headers and the message body. The message body is one varbinary column. The message headers are concatenated to form one varchar column, with the format TAG=VALUE TAG=VALUE ....

To write to AMQP, you need to create a foreign stream in SQL that references a prebuilt server object called AMQP10_SERVER or AMQP_LEGACY. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

You will also need to specify a formatter for the foreign stream. Specifying "formatter" as a foreign stream option tells s-Server that this foreign stream writes data. See Output Formats for Writing in this guide for more details.

In the foreign stream's options, you configure how s-Server connects to AMQP. This configuration includes connection information for the AMQP server, such as format type, destination topic, connection url, and formatter. Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named "amqp_stream". Note that the options are slightly different for AMQP 0.9 vs. AMQP 1.0; you need to configure the CONNECTION_URL option differently for these.

For AMQP 0.9

CREATE OR REPLACE SCHEMA "AmqpWriterSchema"
SET SCHEMA 'AmqpWriterSchema';

CREATE OR REPLACE FOREIGN STREAM amqp_stream (
line VARCHAR(4096))
SERVER AMQP_LEGACY
OPTIONS (DESTINATION 'amq.topic',
CONNECTION_URL 'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672''',
FORMATTER 'CSV');

For AMQP 1.0

CREATE OR REPLACE SCHEMA "AmqpWriterSchema"
SET SCHEMA 'AmqpWriterSchema';

CREATE OR REPLACE FOREIGN STREAM "AmqpWriterStream" (
line VARCHAR(4096))
SERVER AMQP10_SERVER
OPTIONS (DESTINATION 'amq.topic',
CONNECTION_URL 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',
FORMATTER 'CSV');

Again, to get data moving, you need to create and start a pump. You do so with code along the following lines:

CREATE OR REPLACE SCHEMA "Pumps";
SET SCHEMA '"Pumps"';

CREATE OR REPLACE PUMP "writerPump" STOPPED AS
--We recommend creating pumps as stopped
--then using ALTER PUMP "Pumps"."writerPump" START to start it
INSERT INTO "AmqpWriterSchema"."AmqpWriterStream"
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

Foreign Stream Options for Writing to AMQP

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides network wire-level protocol for the exchange of messages, while up to 0.9.1, AMQP employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure the connection URL for AMQP differently for 1.0 than for up to 0.9.1

Name Description
CONNECTION_URL Required. Connection URL for AMQP server. This includes the servers hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version.
AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)
Format: amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList>]
Example: amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'
AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:
Format: amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>'
Example: amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default
Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.
DESTINATION Required. AMQP 1.0 queue or topic identifier. In general, the destination qualification syntax may be specific to the AMQP message broker implementation. The examples here are specific to ActiveMQ. You can fully qualify the AMQP destination by identifying the destination as a topic or a queue. ActiveMQ supports such qualification.
For a topic: "DESTINATION" 'topic://testTopic'
For a queue: "DESTINATION" 'queue://testTopic'
ActiveMQ treats an unqualified destination as a queue. In other words, for ActiveMQ, DESTINATION 'foo' is equivalent to DESTINATION 'queue://foo'
See http://camel.apache.org/activemq.html for more details.
In s-Server 6.0, DESTINATION can be either an absolute destination or be in the form: <destination prefix>{PARITITION}<destination suffix>
Example: /new/ConsumerGroups/$Default/Partitions/{PARTITION}
ACKNOWLEDGE_MODE Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack Roughly, AUTO asks the container on the AMQP server to acknowledge once message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.
DELIVERY_MODE Optional. Delivery mode for messages that ECDA communicates to the AMQP 1.0 server. Options are NON-PERSISTENT or PERSISTENT; defaults to NON-PERSISTENT. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-outbound-channel-adapter-xml-7a

Writing to AMQP Using the ECD Agent

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above. The following options are for AMQP 1.0.

For AMQP 0.9

DESTINATION=amq.topic
CONNECTION_URL=amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672''
FORMATTER=CSV
# Schema, name, and parameter signature of origin stream
SCHEMA=AMQPWRITER
TABLE_NAME=AmqpStream
#columns
ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line

$ ./commondataagent.sh --output --props sample.test.properties --io amqp

For AMQP 1.0

DESTINATION=amq.topic
CONNECTION_URL=amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default
FORMATTER=CSV
# Schema, name, and parameter signature of origin stream
SCHEMA=AMQPWRITER
TABLE_NAME=AmqpStream
#columns
ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line

$ ./commondataagent.sh --output --props sample.test.properties --io amqp10