Integrating Pulsar

(new in SQLstream s-Server version 7.2.1)

Using s-Server, you can read from and write to Pulsar streams. For more information on Pulsar Plugin Configurations, refer http://pulsar.apache.org/docs/en/2.5.0/client-libraries-java/#client.

This topic contains the following subtopics:

Pulsar Sink Plugin

Run the following script to install the Pulsar Plugin in sqllineClient.

CREATE OR REPLACE JAR sys_boot.sys_boot.pulsar LIBRARY 'file:plugin/pulsar/pulsar.jar' OPTIONS(0);
alter system add catalog jar sys_boot.sys_boot.pulsar;
CREATE OR REPLACE SERVER PULSAR_SERVER type 'PULSAR' foreign data wrapper ECDA;

Foreign Stream Options for Writing to Pulsar

The SQLstream Pulsar Sink Plugin supports all the configurations mentioned in the official documentation of Apache Pulsar. For more information, refer to http://pulsar.apache.org/docs/en/2.5.0/client-libraries-java/#client.

There are three sets of configurations that are used by the Pulsar Sink Plugin:

All of these configurations can be added either as:

  • OPTIONs in the SQL CREATE FOREIGN STREAM
  • or OPTIONS defined as key=value in a JNDI properties file
  • or as a properties file, the path of which should be provided in the SQL using the OPTION PULSAR_CONFIG_FILE below.

NOTE: As well as providing the configuration options described here for the Pulsar sink, you will also have to define a value for the FORMATTER option; see Output Formats for Writing and also the Pulsar SQL pipeline example.

Client Configurations

This table lists various Client configurations which can be used to write data to the pulsar sink.

Name Type Description Default Value
serviceUrl String Service URL provider for Pulsar service. It is the URL to the Pulsar web service or broker service.

If the Pulsar cluster is not enabled for TLS, enter either the web service or broker service URL in the following format:
- Web service URL: http://<host name>:<web service port> - for example: http://localhost:8080.
- Broker service URL: pulsar://<host name>:<broker service port>. For example: pulsar://localhost:6650

If the Pulsar cluster is enabled for TLS, enter the secure broker service URL in the following format:
pulsar+ssl://<host name>:<broker service TLS port> - for example: pulsar+ssl://localhost:6651
If useTls is set to true then:
pulsar+ssl://localhost:6651

If useTls is set to false then:
pulsar://localhost:6650
useTls boolean Whether to use TLS encryption on the connection. false
tlsTrustCertsFilePath String Path to the trusted TLS certificate file. It is required if useTls is set to true. None
authPluginClassName String Name of the authentication plugin.
Example
For TLS authentication we can have:
org.apache.pulsar.client.impl.auth.AuthenticationTls
None
authParams String String represents parameters for the authentication plugin
Example
key1:val1,key2:val2
For TLS authentication we can have:
tlsCertFile:/home/sqlstream/Downloads/my-ca/admin.cert.pem,tlsKeyFile:/home/sqlstream/Downloads/my-ca/admin.key-pk8.pem
None

Producer Configurations

This table lists various producer configurations which can be used to write data to the pulsar sink.

Name Type Description Default Value
topicName String Represents the topic name. Pass the topicName with the topic namespace as follows:
{persistent|non-persistent}://<namespace>/<topic-name>

If you enter a topic name only and no topic namespace is specified, then Pulsar uses the default location persistent://public/default/

To publish to a persistent topic belonging to the public tenant in the default namespace, simply enter the topic name as follows:
my-topic
If the specified topic does not exist, Pulsar creates the topic when the pipeline starts.

You can use expressions to define the topic name.
For example: if the my-topic field in the record contains the topic name, enter the following as the topic name:
persistent://my-tenant/my-namespace/${record:value("/my-topic")}
Also, the namespace needs to be created explicitly. If the namespace does not exist, pulsar throws an exception.
None
producerName String Represents producer name SQLStreamProducer
sendTimeoutMs long Represents message send timeout in milliseconds. If a message is not acknowledged by a server before the sendTimeout expires, an error occurs. 30000
blockIfQueueFull boolean If it is set to true, when the outgoing message queue is full, the Send and SendAsync methods of producer blocks, rather than failing and throwing errors.

If it is set to false, when the outgoing message queue is full, the Send and SendAsync methods of producer fail and ProducerQueueIsFullError exceptions occur.

The MaxPendingMessages parameter determines the size of the outgoing message queue.
false
maxPendingMessages int Represents the maximum size of a queue holding pending messages - messages waiting to receive an acknowledgment from a broker.

By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true.
1000
messageRoutingMode MessageRoutingMode Message routing logic for producers on partitioned topics.
Note Apply the logic only when setting no key on messages.
Available options are as follows:
1. pulsar.RoundRobinDistribution: round robin
2. pulsar.UseSinglePartition: publish all messages to a single partition
3. pulsar.CustomPartition: a custom partitioning scheme
For more information on custom-router, please see https://pulsar.apache.org/docs/en/2.5.1/cookbooks-partitioned/
pulsar.RoundRobinDistribution
batchingEnabled boolean Enable batching of messages. true
batchingMaxMessages int The maximum number of messages permitted in a batch. 1000
compressionType CompressionType Message data compression type used by a producer. Available options:
1. LZ4
2. ZLIB
3. ZSTD
4. SNAPPY
No compression

Custom Configurations

This table lists some additional producer configurations which can be used to write data to the pulsar sink.

Name Type Description Default Value
sendAsync boolean Enable Asynchronous publishing of messages. true
messageEncryption boolean Enable end-to-end message encryption. If enabled, providing publicKeyFile is mandatory. false
publicKeyFile String The absolute path to the RSA or ECDSA public key which is required to encrypt the messages. Also, make sure that you have specified the corresponding privateKeyFile at the consumer end for the messages to be consumed successfully. null
encryptionKey String The encryption key name. sqlstream.key
COUNT_SCHEDULER_TIME long Use to schedule a counter that prints the total number of messages whose acknowledgement has been received by the client, in the SQLstream trace log files. This is calculated in milliseconds. Set the value to 0 to disable the counter. 0
PULSAR_CONFIG_FILE String Contains the absolute path of the pulsar properties file. If this option is not provided, the Pulsar plugin prints a WARNING and proceeds with the default configurations or the ones passed in the sink options. None

Using Provenance Columns

In writing to Pulsar Sink, you can declare and use these special provenance columns.

Name Type Description Default Value
PULSAR_PARTITION INTEGER When writing rows out of s-Server to a Pulsar topic, you can specify the topic partition to write the data as message to the indicated partition.

In case of a NULL value in a particular row, that message is allocated to a random partition. If this column is not referenced, the data is written in RoundRobin Routing Mode.
RoundRobin
PULSAR_KEY VARBINARY Messages can optionally be tagged with keys, which can be useful for things like topic compaction. These are also used for Routing messages to the partition. For more information please see https://pulsar.apache.org/docs/en/concepts-messaging/#routing-modes None
PULSAR_TIMESTAMP TIMESTAMP This column is used to add EventTime to the message. It is a mandatory (non-nullable) value. If the PULSAR_TIMESTAMP column value for a particular row is NULL, then that row is discarded. System’s Current Time

SQL Pipeline Example

CREATE OR REPLACE SCHEMA UTILS;

SET SCHEMA 'UTILS';

SET PATH 'UTILS';

CREATE OR REPLACE SCHEMA "miq_pcmd";

ALTER PUMP "miq_pcmd".* STOP;

DROP SCHEMA "miq_pcmd" CASCADE;

CREATE OR REPLACE SCHEMA "miq_pcmd";

-- file input foreign stream
CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."http_input_fs"
(
    "sn_start_time" BIGINT,
    "sn_end_time" BIGINT,
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "FILE_SERVER"
OPTIONS (
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '"',
    "SEPARATOR" '^',
    "SKIP_HEADER" 'true',
    "DIRECTORY" '/home/sqlstream/Downloads/',
    "FILENAME_PATTERN" 'data.csv'
);

----------
-- pipeline_1 1: Route to Pulsar
----------

CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."http_out_fs"
(
    "sn_start_time" BIGINT,
    "sn_end_time" BIGINT,
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "PULSAR_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "COUNT_SCHEDULER_TIME" '1000', -- Milli seconds
    "CHARACTER_ENCODING" 'UTF-8',
    "PULSAR_CONFIG_FILE" '/home/sqlstream/Downloads/pulsar.properties'
);

CREATE OR REPLACE PUMP "miq_pcmd"."http_out_fs_pump" STOPPED AS
INSERT INTO "miq_pcmd"."http_out_fs" 
SELECT STREAM * FROM "miq_pcmd"."http_input_fs";

ALTER PUMP "miq_pcmd".* START;

Example pulsar.properties File

###############
# User-Defined
###############
#topicNamespace = persistent://public/sqlstream/
sendAsync = true

# End-to-End Encryption
# To enable end-to-end encryption set messageEncryption = true
# Default value false
# messageEncryption = true 

# Default value for encryptionKey is sqlstream.key
# It is the name for the key
# encryptionKey = sqlstream_1.key

# RSA or ECDSA public and private key
# For producer publicKeyFile is mandatory
# publicKeyFile = /home/sqlstream/Downloads/my-ca/test_ecdsa_pubkey.pem
# privateKeyFile = /home/sqlstream/Downloads/my-ca/test_ecdsa_privkey.pem


##############
# Pre-Defined
##############
# topicName is a required property
# Name of the topic along with namespace specified
# Incase the namespace is not specified, default namespace persistent://public/default/ will be considered
topicName = persistent://public/sqlstream/trial-1

# Service URL
# When TLS enabled use pulsar+ssl://127.0.0.1:6651
# When TLS disabled use pulsar://127.0.0.1:6650
# eg. pulsar://apachepulsar-02.cloud.in.guavus.com:6650
serviceUrl = pulsar+ssl://127.0.0.1:6651

# TLS Encryption
useTls = true
tlsTrustCertsFilePath = /home/sqlstream/Downloads/my-ca/certs/ca.cert.pem

# Authentication
authPluginClassName = org.apache.pulsar.client.impl.auth.AuthenticationTls
authParams = tlsCertFile:/home/sqlstream/Downloads/my-ca/admin.cert.pem,tlsKeyFile:/home/sqlstream/Downloads/my-ca/admin.key-pk8.pem

# Batching and queue size
# By default batching is enabled with batchingMaxMessages 1000
# batchingEnabled = false
# batchingMaxMessages = 1000
# batchingMaxPublishDelayMicros = 1000

# Message Routing Mode
# By default it is RoundRobinPartition
messageRoutingMode = CustomPartition

# Max pending messages queue size
# By default it is 1000
maxPendingMessages = 20000

# Compression Type
# By default None. It supports ZLIB, LZ4, ZSTD and SNAPPY(ver 2.4*)
compressionType = ZLIB

# Message send timeout in ms
# If the message is not acknowledged by the broker before
# the sendTimeout expires, an error occurs
# Set to 0 for infinite. By default 30000
sendTimeoutMs = 0