Integrating Azure Event Hubs

Using s-Server, you can read from and write to Azure Event Hubs. SQLstream uses the Kafka API to interact with Event Hubs - see Event Hubs for Kafka.

For more detail on SQLstream's Kafka plugin, please see Integrating Kafka. This section concentrates on the specifics and limitations of the Event Hub API.

Topics covered in this section include:

  • Azure Event Hubs Limitations
  • Security and Authentication
  • Reading from Azure Event Hubs
  • Writing to Azure Event Hubs
  • Using an Index Topic

Azure Event Hubs Limitations

Azure Event Hubs does not provide transaction support, so exactly-once semantics cannot be implemented: we cannot commit offsets as we do with Kafka. Instead, we store offsets in a separate "Index Topic" so that we can provide at-least-once semantics. See Using an Index Topic below.

Security and Authentication

Azure Event Hubs for Kafka requires TLS encryption (all data in transit with Azure Event Hubs is TLS encrypted). To connect, specify the SASL_SSL protocol in your configuration file. The example below uses a Shared Access Signature (SAS) connection string.

Azure Event Hubs Properties

The security parameters required by Azure Event Hubs are defined in a properties file. This file can be referenced by the "kafka.consumer.config" option (when reading from Event Hubs) or the "kafka.producer.config" option (when writing to Event Hubs). Other Kafka consumer or producer properties may also be included in this file.

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ewqsdvasdasdfasdf+s=";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN


Reading from Azure Event Hubs

For convenience, the Kafka options are included here. Remember that transactions are not supported, and do not use the "legacy" Kafka options: use the Kafka10 plugin options only.

Using Provenance Columns

When reading from Kafka, you can declare provenance columns. These return metadata about the Kafka topic from which you are reading.

These are as follows:

Data Type | Name in s-Server 6.0.0 | Name in s-Server 6.0.1+ | Kafka version | Description
VARCHAR(256) | KAFKA_TOPIC | SQLSTREAM_PROV_KAFKA_TOPIC | Kafka v.0.10.2 or later | Returns the name of the Kafka topic.
INTEGER | KAFKA_PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.10.2 or later | Returns the current Kafka partition.
INTEGER | PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.8.2 | Returns the current Kafka partition.
BIGINT | KAFKA_OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.10.2 or later | Returns the current Kafka offset.
BIGINT | OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.8.2 | Returns the current Kafka offset.
TIMESTAMP | KAFKA_TIMESTAMP | SQLSTREAM_PROV_KAFKA_TIMESTAMP | Kafka v.0.10.2 or later | Returns the timestamp of the current Kafka message.
VARBINARY(256) | KAFKA_KEY | SQLSTREAM_PROV_KAFKA_KEY | Kafka v.0.10.2 or later | Returns the key value for the message.
BINARY(32) | PAYLOAD_PREFIX | SQLSTREAM_PROV_KAFKA_PAYLOAD_PREFIX | Kafka v.0.10.2 or later | Returns the first bytes (prefix) of the message payload.
VARCHAR(1000) | N/A | SQLSTREAM_PROV_KAFKA_HEADERS | Kafka v.0.10.2 or later | Returns any headers for the Kafka message as key-value pairs in serialized text format. See Reading and Parsing Headers.
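
Once declared, provenance columns can be queried like any other column. A minimal sketch, assuming the foreign stream "schema"."fs" defined in the reading example later in this section:

SELECT STREAM
    "SQLSTREAM_PROV_KAFKA_PARTITION",  -- partition this row was read from
    "SQLSTREAM_PROV_KAFKA_OFFSET",     -- offset of the message within that partition
    "COL0"                             -- parsed payload column
FROM "schema"."fs";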

Foreign Stream Options for Reading from Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#consumerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.
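
For example, the following sketch (the stream name and option values are placeholders) mixes both styles in one OPTIONS clause:

CREATE OR REPLACE FOREIGN STREAM "schema"."quoting_example"
(
    "COL0" VARCHAR(4000)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    Topic 'topic-03',                                                 -- upper-case option: unquoted, any case
    "SEED_BROKERS" '<namespace>.servicebus.windows.net:9093',         -- upper-case option: may also be double-quoted in upper case
    "PARSER" 'CSV',
    "kafka.consumer.config" '/home/sqlstream/kafka.consumer.config'   -- lower-case option: must be double-quoted and lowercase
);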

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
Option Adapter version Description
TOPIC All Required. Kafka Topic. You can use a regular expression as a "topic wild card". (This is not supported by legacy versions of the adapter.)
SEED_BROKERS All A comma separated list of broker identifiers in the format <broker_host_name>:<port>. For the legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost".
PARTITION All Partition number to read from. If reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions will be read. You can specify a single partition with PARTITION, or an inclusive range (which needs two partition numbers, for example 1-3). Note: partition numbers are 0-based.
PORT Legacy Deprecated since 6.0.1. Port for the Kafka seed brokers.
STARTING_TIME All Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'. Default LATEST.

When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later.

For the legacy Kafka adapter, options are EARLIEST or LATEST.
STARTING_OFFSET All Where to start reading from, as a long integer representing the physical offset within a partition. Default is -1 (the oldest offset).
INDEX_TOPIC_NAME Kafka10 This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic in the Reading from Kafka topic of the s-Server Integration Guide. Index topic may be created on a separate Kafka cluster.
INDEX_TOPIC_SEED_BROKERS Kafka10 The broker(s) where the index topic referenced by INDEX_TOPIC_NAME is managed
MAX_POLL_RECORDS Kafka10 Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, this needs to be set such that you get a "page" full (1MB is the default) of Kafka messages from each partition of the topic(s). It can be roughly calculated as: (numPartitions * 1 MB) / typicalMessageSize (see the worked example after this table).
PARTITION_OFFSET_QUERY Kafka10 This is a SQL query that fetches starting offsets, Kafka partition and Kafka topic for all topic partitions. For both the legacy Kafka adapter and the Kafka10 adapter, you can use a query along the following lines:

PARTITION_OFFSET_QUERY 'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM stored_offsets'

PARTITION should be of type INTEGER. OFFSET should be of type BIGINT. Any partition for which the query does not return anything will either use STARTING_OFFSET or STARTING_TIME to determine where to start.
CLIENT_ID All For the Kafka10 adapter, it is vital to understand that this is the consumer group id (it is used to set the consumer group property "group.id"). CLIENT_ID is also the client key for Yammer metrics; it defaults to client{TOPIC}, or CLIENT{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting; CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics.
OPTIONS_QUERY All Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in select lastOffset as STARTING_OFFSET from TEST.committedOffset;
SCHEMA_ID All If set >= 0, the reader will treat the first byte of data as magic byte, and the next 4 bytes as a schema ID, and will discard any record where the first byte does not match the SCHEMA_ID. Default -1.
PARSER_PAYLOAD_OFFSET Kafka10 The number of bytes to skip at the start of the record to get the value - in case the data is offset (by a schema ID or some other fixed-length data). Default 0.
"kafka.consumer.config" Kafka10 Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
"isolation.level" Kafka10 Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted, or read_committed This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commitments, you need to configure the Kafka adapter to only read committed data--this is, read_committed.
METRICS_PER_PARTITION Legacy True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer grained metrics will be reported).
FETCH_SIZE Legacy Fetch size. Defaults to 1000000.
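
As an illustration of the MAX_POLL_RECORDS calculation above (the figures are hypothetical): for a topic with 4 partitions and a typical message size of 2 KB, (4 * 1,048,576 bytes) / 2,048 bytes ≈ 2048, so MAX_POLL_RECORDS could be set to around '2048'.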

Additional Kafka Options for Use with JNDI Properties

You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so that the passwords are not visible in the SQL.

The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.

So we recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (Applies to all foreign streams that use this foreign server definition).
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (Applies only to the specified foreign stream).
  • A file referenced by "kafka.consumer.config" or "kafka.producer.config" (Applies to all foreign streams that reference the named file).
Option Name | Adapter version | Description
"ssl.truststore.location" | Kafka10 | The location of the trust store file.
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set, access to the trust store is still available, but integrity checking is disabled.
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for the client and can be used for two-way client authentication.
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for the client and only needed if "ssl.keystore.location" is configured.
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for the client.

SQL Example of Reading from Event Hubs

CREATE OR REPLACE FOREIGN STREAM "schema"."fs"
(
"SQLSTREAM_PROV_KAFKA_TOPIC" VARCHAR(128)
,"SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER
,"SQLSTREAM_PROV_KAFKA_TIMESTAMP" TIMESTAMP
,"SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT
,"COL0" VARCHAR(4000)
,"unparsed_attributes" VARCHAR(4096)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"PARTITION" '0-2',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"SEPARATOR" ',',
"SEED_BROKERS" '<namespace>.servicebus.windows.net:9093',
"STARTING_TIME" 'EARLIEST',
"MAX_POLL_RECORDS" '100',
"BUFFER_SIZE" '1048576',
"FETCH_SIZE" '1000000',
"TOPIC" 'topic-03',
"kafka.consumer.config" '/home/sqlstream/kafka.consumer.config',
"CONSUMER_GROUP_ID" 'group-id-01'
);
select stream * from "schema"."fs";

Using Watermarks

Below is an example in which we:

  • read the starting offsets from a view over a Kafka watermark function
  • configure the PARTITION_OFFSET_QUERY option to fetch that watermark, so that the foreign stream starts reading data from the saved watermark.
CREATE OR REPLACE VIEW "schema".KAFKA_OFFSET_PARTITION AS
SELECT * FROM TABLE (sys_boot.mgmt.watermark_offsets_from_index_topic_with_config_file(
    '<namespace>.servicebus.windows.net:9093',
    'topic-03-index',
    'group-id-03',
    '/home/sqlstream/kafka.consumer.config'));

CREATE OR REPLACE FOREIGN STREAM "schema"."fs"
(
"SQLSTREAM_PROV_KAFKA_TOPIC" VARCHAR(128),
"SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER,
"SQLSTREAM_PROV_KAFKA_TIMESTAMP" TIMESTAMP,
"SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT,
"COL0" VARCHAR(4000),
"unparsed_attributes" VARCHAR(4096)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"PARTITION" '0-2',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"SEPARATOR" ',',
"SEED_BROKERS" '<namespace>.servicebus.windows.net:9093',
"TOPIC" 'topic-03',
"kafka.consumer.config" '/home/sqlstream/kafka.consumer.config',
"CONSUMER_GROUP_ID" 'group-id-03',
"PARTITION_OFFSET_QUERY" 'SELECT * FROM "schema".KAFKA_OFFSET_PARTITION'
);
select stream * from "schema"."fs";

Writing to Azure Event Hubs

For convenience, the Kafka options are included here. Remember that transactions are not supported, and do not use the "legacy" Kafka options: use the Kafka10 plugin options only.

Using EGRESS Columns

These special columns are supported by the KAFKA10 plugin. Data arriving in these columns is not included in the outgoing Kafka message payload; instead, it is used to set other attributes of the outgoing message.

Name | Data Type | Description | Name in s-Server 6.0.0
SQLSTREAM_EGRESS_KAFKA_PARTITION | INTEGER | This column is used to determine the partition to which the message will be written. NOTE: SQLstream uses this approach and does not support use of the Kafka producer property "partitioner.class". | KAFKA_PARTITION
SQLSTREAM_EGRESS_KAFKA_TIMESTAMP | TIMESTAMP | This column is used to set the Kafka message timestamp. | KAFKA_TIMESTAMP
SQLSTREAM_EGRESS_KAFKA_KEY | VARBINARY(256) | This column is used as the message key. | KAFKA_KEY
SQLSTREAM_EGRESS_WATERMARK | VARCHAR(256) | This column is used as a watermark and is saved when committing to the sink Kafka topic. The developer should set it to whatever watermark format is supported by the source plugin. The stored watermark string has the format <CommitRowtimeInMillisFromEpoch>,<watermark_column_value>. | -
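
For example, a pump can populate these egress columns as it inserts into a sink foreign stream. A minimal sketch, assuming a hypothetical sink "schema"."keyed_out_fs" declared with the columns "SQLSTREAM_EGRESS_KAFKA_PARTITION" (INTEGER) and "data" (VARCHAR), and a hypothetical source stream "schema"."src" with columns "ACCOUNT_ID" and "data":

CREATE OR REPLACE PUMP "schema"."keyed_output_pump" STOPPED AS
INSERT INTO "schema"."keyed_out_fs"
SELECT STREAM
    MOD("ACCOUNT_ID", 3) AS "SQLSTREAM_EGRESS_KAFKA_PARTITION",  -- routes each account to one of 3 partitions
    "data"                                                       -- payload column; the egress column is not written to the payload
FROM "schema"."src";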

Foreign Stream Options for Writing to Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#producerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
Option Name Adapter version Description
TOPIC All Kafka topic
"bootstrap.servers" Kafka10 hostname:port[,hostname:port] of the Kafka broker(s). Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
SEED_BROKERS Legacy A comma separated list of broker identifiers in the format "<broker_host_name>:<port>". Defaults to "localhost".
"metadata.broker.list" Legacy hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
"key.serializer" Kafka10 Names a serializer class for keys. If no class is given, Kafka uses value.serializer.
"key.serializer.class" Legacy Names a serializer class for keys. If no class is given, Kafka uses serializer.class.
"value.serializer" Kafka10 Names a serializer class for values.
"serializer.class" Legacy Names a serializer class for values. The default encoder takes a byte[] and returns the same byte[].
"partitioner.class" All Fully qualified Java classname of Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner
"compression.type" Kafka10 If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip'
"retry.backoff.ms" All Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing.
"request.timeout.ms" All When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client.
"send.buffer.bytes" All Socket write buffer size.
"client.id" All Using this option, you can specify a string to help you identify the application making calls to the Kafka server.
"transactional.id" Kafka10 This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique transactional.id to publish messages to the topic using transactions. Transactions are used only if Kafka brokers are v0.11.2. These support transactional semantics.

NOTE: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream.

(new in s-Server version 6.0.1) If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to '<fully_qualified_pump_name>_Pump', where <fully_qualified_pump_name> is the name of the pump that instantiates the sink.
"pump.name" Kafka10 Deprecated since version 6.0.1.

Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when you configure the Kafka adapter to run in High Availability (HA) mode. The pump.name needs to be the fully qualified pump name, in the format <catalog_name>.<schema_name>.<pump_name>.

For example: 'LOCALDB.mySchema.ProcessedOrdersPump'
"linger.ms" All In cases where batch.size has not been reached, number of milliseconds that the Kafka producer will wait before batching sends . Defaults to '100', in milliseconds)
"batch.size" All Number of messages sent as a batch. Defaults to '1000'
"kafka.producer.config" Kafka10 Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties.

For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs.

NOTE: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "bootstrap.servers". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
"security.protocol" Kafka10 Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
"transaction.timeout.ms" Kafka10 The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
HA_ROLLOVER_TIMEOUT Kafka10 Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode and is running as a "Follower", it waits for this amount of time without seeing commits from the "Leader".

After the timeout, the "Follower" attempts to take over as the "Leader". There may be multiple follower instances running in a cluster, but only one of them succeeds in being designated as the new "Leader". All other pump instances using the same transactional.id continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself to "Follower" if possible.
COMMIT_METADATA_COLUMN_NAME Kafka10 The name of the column that contains watermark values - default SQLSTREAM_EGRESS_WATERMARK.
SQLSTREAM_EGRESS_WATERMARK_SORT_FUNCTION Kafka10 If the SQLSTREAM_EGRESS_WATERMARK egress column is being used, the value chosen depends on the value of this option. Possible values are:
  • MAX - use the lexical maximum value of all watermarks in the commit batch
  • NONE - (the default) - use the watermark value from the last row in the commit batch
HEADERS_COLUMNS Kafka10 Deprecated - please use HEADERS_COLUMN_LIST
HEADERS_COLUMN_LIST Kafka10 Comma-separated list of foreign stream columns that will be mapped as outgoing headers, rather than to the record value itself. See Writing Headers to Kafka based on Column Data
OPTIONS_QUERY All Lets you query a table to update adapter options at runtime. You can use this, for example, to set the "bootstrap.servers" option from a table, as in select lastOffset as "bootstrap.servers" from TEST.kafka_write_options;
POLL_TIMEOUT Legacy This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms.
PORT Legacy Deprecated option. From s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option.
"producer.type" Legacy This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are 'async' (asynchronous) and 'sync' (synchronous). Default 'sync'
"compression.codec" Legacy The compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". Default 'none'.
"compressed.topics" Legacy If the compression codec is anything other than 'none', enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics.
"message.send.max.retries" Legacy This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Default 3.
"topic.metadata.refresh.interval.ms" Legacy The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600,000ms). If you set this to a negative value, metadata will only get refreshed on failure. Default 600*1000 (10 minutes).
"request.required.acks" Legacy How many other brokers must have committed the data to their log and acknowledged this to the leader? 0 means never wait; 1 means wait for the leader to acknowledge; -1 means wait for all replicas to acknowledge. Default 0 (never wait).
"queue.buffering.max.ms" Legacy Maximum time to buffer data when using async mode. Default 5000.
"queue.buffering.max.messages" Legacy The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. Default 10000.
"queue.enqueue.timeout.ms" Legacy The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. Default -1.
"batch.num.messages" Legacy The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. Default 200.
"send.buffer.bytes" Legacy Socket write buffer size. Default 100x1024.
"SHARD_ID" kafka10 This optional parameter is used in scaling, when you are running multiple shards of a pipeline. It uniquely identifies the current shard. When writing metadata/watermarks to kafka, the SHARD_ID is added as a prefix to the SQLSTREAM_POSITION_KEY. It is also included in the automatically generated transaction id. When creating a restartable pipeline, this SHARD_ID is expected to be the same as the SHARD_ID of the source foreign stream.

Additional Kafka Options for Use with JNDI Properties

You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so that the passwords are not visible in the SQL.

The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.

So we recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (Applies to all foreign streams that use this foreign server definition).
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (Applies only to the specified foreign stream).
  • A file referenced by "kafka.consumer.config" or "kafka.producer.config" (Applies to all foreign streams that reference the named file).
Option Name | Adapter version | Description
"ssl.truststore.location" | Kafka10 | The location of the trust store file.
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set, access to the trust store is still available, but integrity checking is disabled.
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for the client and can be used for two-way client authentication.
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for the client and only needed if "ssl.keystore.location" is configured.
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for the client.

SQL Example of Writing to Event Hubs

CREATE OR REPLACE FOREIGN STREAM "schema"."out_fs"
(
"PARTITION_ID" BIGINT,
"data" VARCHAR(4000),
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
"unparsed_attributes" VARCHAR(4096),
"SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "bootstrap.servers" '<namespace>.servicebus.windows.net:9093',
    "kafka.producer.config" '/home/sqlstream/kafka.producer.config',
    "TOPIC" 'topic-03',
    "INDEX_TOPIC_NAME" 'topic-03-index',
    "COMMIT_METADATA_COLUMN_NAME" 'SQLSTREAM_POSITION_KEY'
);

CREATE OR REPLACE PUMP "schema"."output_pump" STOPPED AS
INSERT INTO "schema"."out_fs"
SELECT  STREAM *
,CAST("PARTITION_ID" AS VARCHAR(50)) || ',' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(50)) as "SQLSTREAM_POSITION_KEY"
FROM "schema"."data_edrflow_fs";

ALTER PUMP "schema".* START;

Using an Index Topic

As Azure Event Hubs does not support transactions, we cannot rely on committing offset metadata into the sink topic. So, in order to achieve at-least-once delivery semantics, the following approach is used.

  • INDEX_TOPIC_NAME and INDEX_TOPIC_SEED_BROKERS are optional properties that allow us to write offset metadata to a separate, dedicated topic (see the sketch after this list).
  • The UDF sys_boot.mgmt.watermark_offsets_from_index_topic_with_config_file is used by the source foreign stream to fetch that metadata from the index topic, as shown in Using Watermarks above.
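
The following sketch shows the sink-side options involved (all values are placeholders), assuming the index topic is hosted on a separate Kafka cluster rather than on Event Hubs itself, with INDEX_TOPIC_SEED_BROKERS pointing at that cluster as described in the option tables above:

CREATE OR REPLACE FOREIGN STREAM "schema"."out_fs_indexed"
(
    "data" VARCHAR(4000),
    "SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "bootstrap.servers" '<namespace>.servicebus.windows.net:9093',    -- Event Hubs Kafka endpoint (the sink topic)
    "kafka.producer.config" '/home/sqlstream/kafka.producer.config',
    "TOPIC" 'topic-03',
    "INDEX_TOPIC_NAME" 'topic-03-index',                              -- offset metadata is written here
    "INDEX_TOPIC_SEED_BROKERS" '<kafka_host>:9092',                   -- separate Kafka cluster hosting the index topic
    "COMMIT_METADATA_COLUMN_NAME" 'SQLSTREAM_POSITION_KEY'
);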

For more information please see Building and Using an Index Topic in the Integrating Kafka topic.