CREATE FOREIGN STREAM

A foreign stream is an instance of a foreign data wrapper that gives s-Server access to a flow of data from or to an external system. Foreign streams provide interfaces for sources of data coming into s-Server and for sinks of data being written out of s-Server.

For example, the Extensible Common Data Adapter supports the creation of a foreign stream that will take rows (from a local stream) and send them by email to a recipient.

Syntax

CREATE [OR REPLACE] FOREIGN STREAM foreign-stream-name (<column_list>) SERVER <server_name> [OPTIONS <options_specification>] [DESCRIPTION string_literal]

For detailed examples of using CREATE FOREIGN STREAM and stream options, see the topics Reading from Other Sources and Writing to Other Sources in the Integrating with Other Systems guide.

The OPTIONS required to create a foreign stream depend upon the server that the foreign stream references. Servers are often defined for an adapter, such as the SQL/MED Adapter or Extensible Common Data Adapter. See the s-Server Integrations page in the Integrating Guavus SQLstream with Other Systems guide for more details.

Using Prebuilt Server Objects

s-Server ships with a number of prebuilt server objects for the Extensible Common Data Adapter. In many cases, these objects will work fine for creating foreign streams for ECDA sources and sinks. You generally only need to create your own server object for ECDA sources and sinks if you know you will have multiple foreign streams that share options. In this case, it may be more efficient to define a custom server object with these shared options. Doing so allows foreign streams that invoke the custom server object to inherit options. See Using SQL/MED Inheritance with Server Objects.

Prebuilt Server Objects Available in s-Server

This table lists the ECD plugin types, and the corresponding prebuilt servers.

The 'Type' is used in the TYPE clause when creating a SERVER, and with the --io switch when running an ECD agent.

Type Plugin Description Name of Prebuilt Server for Foreign Streams Notes
amqp_legacy AMQP Legacy: Reads from and writes to an AMQP message bus for AMQP 0.9. See the topics Reading from AMQP and Writing to AMQP for more details. AMQP_LEGACY_SERVER
amqp10 AMQP 1.0: Reads from and writes to an AMQP message bus for AMQP 1.0. See the topics Reading from AMQP and Writing to AMQP for more details. AMQP10_SERVER
hdfs Allows s-Server to write to the Hadoop/HDFS file system (also supports ADLS Gen 2). See Writing to Hadoop for more details. HDFS_SERVER sink only
hive Allows s-Server to write to Hadoop/Hive tables. See Writing to Hive Tables for more details. HIVE_SERVER sink only; ORC format only
http Allows s-Server to read and write data over HTTP / HTTPS. See the topics Reading over HTTP and Writing over HTTP for more details. HTTP_SERVER
file File System: Reading and writing over the file system. See the topics Reading from the File System and Writing to the File System for more details. FILE_SERVER
filevfs Allows s-Server to read files, in compressed or uncompressed format, from ADLS Gen2, HDFS, the local file system, S3 and SFTP. See the topic Integrating Files using VFS for more details. FILE_VFS_SERVER source only
ibmmq Allows s-Server to read from or write to queues and topics in IBM MQ. See Reading from IBM MQ and Writing to IBM MQ for more details. IBMMQ_SERVER
kafka Allows s-Server to exchange data with Kafka clusters. See the topics Reading from Kafka and Writing to Kafka for more details. KAFKA_SERVER non-transactional
kafka10 Allows s-Server to exchange data with Kafka clusters versions 0.10 and above. See the topics Reading from Kafka and Writing to Kafka for more details. Also supports Azure Event Hubs. KAFKA10_SERVER versions since 0.10.2
kinesis Allows s-Server to exchange data with Kinesis streams. See the topics Reading from Kinesis and Writing to Kinesis for more details. KINESIS_SERVER
pulsar Using s-Server, you can read from and write to Pulsar streams. See the topic Integrating Pulsar for more details. PULSAR_SERVER
mail Allows s-Server to connect to an SMTP server in order to send emails. See the topic Writing to Mail Servers for details. MAIL_SERVER sink only
mongodb Allows s-Server to write to MongoDB. See the topic Writing to MongoDB for more details. MONGO_SERVER sink only
mqtt Allows s-Server to exchange data with MQTT brokers. See the topics Reading from MQTT and Writing to MQTT for more details. MQTT_SERVER
snowflake Allows s-Server to write to Snowflake warehouses. See the topic Writing to Snowflake for more details. SNOWFLAKE_SERVER sink only; uploads files
net Network Sockets (TCP/UDP): Configured for a socket. Reads or writes data streamed from a client program using TCP or UDP. See the topics Reading from Network Sockets and Writing to Network Sockets for more details. NET_SERVER
websocket Allows s-Server to read and write data over web sockets. See the topics Reading from Websockets and Writing to Websockets for more details. WEBSOCKET_SERVER

NOTES:

  • You can select a list of all servers (including these pre-defined servers):
    sqllineClient --run=$SQLSTREAM_HOME/support/sql/showForeignServers

Declaring Columns

You declare columns for a foreign stream by name and data type, as in the example below.

Example

The following example uses the ECDA file writer to write CSV data to the file system. In order to read from or write to the file system, you reference a prebuilt server object called FILE_SERVER. In the foreign stream's options, you configure how s-Server connects to the file system.

Foreign streams must be created in a schema; the example below creates the schema first. This foreign stream declares columns explicitly, but in some cases you can derive column names from the external source.

See the topic Writing to the File System in the Integrating with Other Systems Guide for more information about this example.

CREATE OR REPLACE SCHEMA "FileWriterSchema";
SET SCHEMA '"FileWriterSchema"';
CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER FILE_SERVER
OPTIONS
(directory 'myDirectory',
--directory for the file
formatter 'CSV',
filename_pattern 'myRecord.csv',
--regex for filename pattern to look for
character_encoding 'UTF-8',
write_header 'true');

To start writing, you need to create a pump to insert data into the foreign stream. You do so using code along the following lines. See the topic CREATE PUMP in this guide for more information on pumps.

Note On Writing Pumps

Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
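
The following model shows one way to set up such a pump for the "FileWriterStream" sink defined above. It is a sketch only: the source stream "FileWriterSchema"."SourceStream" is hypothetical and stands in for whatever stream or view supplies rows with matching columns.

CREATE OR REPLACE PUMP "FileWriterSchema"."FileWriterPump" STOPPED AS
INSERT INTO "FileWriterSchema"."FileWriterStream"
("recNo", "ts", "accountNumber", "loginSuccessful", "sourceIP", "destIP", "customerId")
SELECT STREAM "recNo", "ts", "accountNumber", "loginSuccessful", "sourceIP", "destIP", "customerId"
FROM "FileWriterSchema"."SourceStream";
--"SourceStream" is a hypothetical source with the same columns as the sink

--start the pump so that rows begin flowing to the sink
ALTER PUMP "FileWriterSchema"."FileWriterPump" START;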

Options

Options for Reading Data:
  • Reading from ADLS Gen2
  • Reading from AMQP
  • Reading from Azure Event Hubs
  • Reading from the File System
  • Reading from Files with VFS
  • Reading from HDFS
  • Reading from HTTP
  • Reading from IBM MQ
  • Reading from Kafka
  • Reading from Kinesis
  • Reading from MQTT
  • Reading from Network Sockets
  • Reading from S3
  • Reading from SFTP
  • Reading from WebSockets

Options for Writing Data:
  • Writing to ADLS Gen2
  • Writing to AMQP
  • Writing to Azure Event Hubs
  • Writing to the File System
  • Writing to Hadoop
  • Writing to Hive
  • Writing to IBM MQ
  • Writing to Kafka
  • Writing to Kinesis
  • Writing to MongoDB
  • Writing to MQTT
  • Writing to Network Sockets
  • Writing to Prometheus Pushgateway
  • Writing to Pulsar
  • Writing to Snowflake
  • Writing to Websockets

Options for Parsing Data:
  • Parsing Avro
  • Parsing CSV
  • Parsing JSON
  • Parsing Key Values
  • Parsing ProtoBuf
  • Parsing XML
  • None Parser

Options for Formatting Data:
  • Formatting Avro Data
  • Formatting BSON Data
  • Formatting CSV Data
  • Formatting JSON Data
  • Formatting ORC Data
  • Formatting XML Data

You can specify an OPTIONS clause to provide parameters for a foreign stream or for any column.

Each OPTIONS specification is an option-value pair that names the option and gives its intended value.

No two options in the same clause may have the same name.

Options vary greatly depending on the kind of source for which you are setting up the foreign stream. The uses of foreign streams are not limited to the below, but these do represent common uses for foreign streams in s-Server.

Options for Reading Data

Foreign Stream Options for Reading from ADLS Gen2

See the ADLS section of the topic Reading from Files using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS below.

Foreign Stream Options for Reading from AMQP

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides a network wire-level protocol for the exchange of messages, while up to 0.9.1, AMQP employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure the connection URL for AMQP 1.0 differently than for versions up to 0.9.1.

Option Description
CONNECTION_URL Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version.
DESTINATION This can be in the form <destination prefix>{PARTITION}.
PARTITION_EXPRESSION You should only use this if DESTINATION includes "{PARTITION}". This should be a dk.brics regular expression, such as <0-3>.
PARSER_QUEUE_SIZE Queue size for parser. Reading only. Defaults to 2. In most cases, you will not want to change this number.
ACKNOWLEDGE_MODE Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack.
Roughly, AUTO asks the container on the AMQP server to acknowledge once a message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set one or more options using select DESTINATION from TEST.amqp_options. For more details see the topic Using the Options Query Property.

AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)

Format:

amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList> ]

Example:

amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'

AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:

Format:

amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>

Example:

amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.

See the topic Reading from AMQP in the Integration Guide for more details.
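
As a sketch only, a foreign stream reading CSV messages from an AMQP 1.0 queue might look like the following. The stream name, columns, queue name and credentials are illustrative, and the PARSER and CHARACTER_ENCODING options shown here are parsing options described under Options for Parsing Data rather than in the table above.

CREATE OR REPLACE FOREIGN STREAM "AmqpSchema"."AmqpSourceStream"
("ts" TIMESTAMP,
"sensorId" INTEGER,
"reading" DOUBLE)
SERVER AMQP10_SERVER
OPTIONS
(CONNECTION_URL 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',
--illustrative queue name; syntax may vary by broker
DESTINATION 'queue://testQueue',
ACKNOWLEDGE_MODE 'AUTO',
--assumed CSV parsing; see Options for Parsing Data
PARSER 'CSV',
CHARACTER_ENCODING 'UTF-8');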

Foreign Stream Options for Reading from Azure Event Hubs

For reading from Azure Event Hubs we use the Kafka plugin - please see Foreign Stream Options for Reading from Kafka below. For more information see the topic Reading from Azure Event Hubs in the Integration Guide.

Foreign Stream Options for Reading from the File System

Option Description
DIRECTORY Directory containing the file(s) from which you are reading.
FILENAME_PATTERN Input only. Java regular expression defining which files to read.

See https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html for more information on Java regular expressions.
FILE_COMPRESSION Inflate / de-compress incoming files using the supplied codec. Currently only 'gzip' is supported (case is not significant). Any other value, or no value, means the file is not de-compressed. Default 'none'.
FILE_COMPRESSION_BUFFER Size in bytes of decompression buffer. Default '65536'.
STATIC_FILES Defaults to false. When you set this to true, you indicate that no files will be added or changed after the file reader is started. The file reader will exit after the last file is read. This lets you use the file reader as a foreign table, which is finite (as opposed to a foreign stream, which is infinite, and handles files that are continually written to).
REPEAT Defaults to 0, meaning do not repeat. Can be a positive whole number, a negative number, or 'FOREVER'.

For positive numbers, after processing all files that match FILENAME_PATTERN, start over again. Repeat for the specified number.

If negative or 'FOREVER', keep reprocessing all files that match FILENAME_PATTERN forever. You must set STATIC_FILES to true in order to use this option.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set options using select DIRECTORY, FILENAME_PATTERN from TEST.file_options. For more details see the topic Using the Options Query Property. Often used to supply STARTING_POSITION or STARTING_TIME (see below)
SORT_BY_TIMESTAMP 'true' or 'false'. If 'true' we search in the file for the first timestamp that matches STARTING_TIME. If false we use the STARTING_POSITION watermark.
STARTING_POSITION The watermark in the form 'filename:linenumber' from which to start reading. When used, this option is normally retrieved dynamically using an OPTIONS_QUERY. See Using Exactly Once with File as a source.
STARTING_TIME The starting time in the format defined by FILENAME_DATE_FORMAT. Only used if SORT_BY_TIMESTAMP is true. When set, extract the file timestamp from incoming file names using the FILENAME_PATTERN option. The file timestamp is assumed to be extracted as group 1 of the pattern. Skip the file if its timestamp is less than the watermark timestamp. See Using Exactly Once with File as a source.
FILENAME_DATE_FORMAT The format of the date that is embedded in incoming filenames. Only used if SORT_BY_TIMESTAMP is true, to extract the time from the filename for matching to STARTING_TIME.
WATERMARKS Name of the watermark table or a view to retrieve the WATERMARK for the source
POSTPROCESS_COMMAND Optional. Input only. A shell script present in the $SQLSTREAM_HOME/userbin directory. The shell script receives the watermark from the sink(s) of the pipeline through the WATERMARKS query. Users can then archive all files processed before that watermark.

See the topic Reading From the File System in the Integration Guide for more details.
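
A minimal reading counterpart to the file-writing example earlier in this topic might look like the following sketch. The directory, filename pattern and columns are illustrative, and the PARSER and CHARACTER_ENCODING options are parsing options covered under Options for Parsing Data.

CREATE OR REPLACE FOREIGN STREAM "FileReaderSchema"."FileReaderStream"
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER)
SERVER FILE_SERVER
OPTIONS
(DIRECTORY '/tmp/incoming',
--regex for filenames to read; illustrative
FILENAME_PATTERN 'myRecord.*\.csv',
STATIC_FILES 'false',
--assumed CSV parsing; see Options for Parsing Data
PARSER 'CSV',
CHARACTER_ENCODING 'UTF-8');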

Foreign Stream Options for Reading from Files using VFS

Option Description
FILE_LOCATION (Mandatory). The URI of the target directory containing the files to be read - for example file:///tmp/newdirectory represents a directory on the local file system. For more information see URI Formats Supported by FILE-VFS.
FILENAME_PATTERN (Mandatory). A Java regular expression match filter selecting which files are read from the FILE_LOCATION directory. When the SORT_FIELD option has the value TIME_IN_FILENAME or LEXICAL_IN_FILENAME, the first capture group in the regex is used to identify the sub-regex on which the files are sorted.
Example: buses_(\d{14})_[a-z]+.* - files matching this regex will be picked up and the group \d{14} (14 digits) will be used to sort them.
FILE_TYPE The type of file you wish to read. Supported options:
  • none : for uncompressed files [Default]
  • gzip : for GZip compressed files
Default: none
SORT_FIELD The file sorting mechanism you wish to use:
  • MODIFIED_FILE_TIME: Sort files by the file's modified time. If two files have the same modified time, they are sorted lexicographically [Default]
  • TIME_IN_FILENAME: Sort files by the time given in the file name. The first group of the regex (as given in FILENAME_PATTERN) is used to isolate the timestamp in the filename, and FILENAME_TIME_FORMAT specifies the format of that timestamp. If two files have the same time, they are sorted lexicographically
  • LEXICAL_IN_FILENAME: Sort files by a substring of the file name. The first group of the regex (as given in FILENAME_PATTERN) is used to isolate the substring.
See file-based T-sort for more information.
Default: MODIFIED_FILE_TIME
FILENAME_TIME_FORMAT The format of the timestamp when the SORT_FIELD option is set to TIME_IN_FILENAME.
NOTE: This option is mandatory when using TIME_IN_FILENAME.
STARTING_FILE_NAME_AND_OFFSET Reads start with the given file name and offset when the SORT_FIELD option is set to LEXICAL_IN_FILENAME. The default value EARLIEST means all files will be picked up. For example:
  • abc.txt:20 will read from line 20 in file abc.txt
  • abc.txt will read from the start of file abc.txt
NOTE: Ensure the starting file exists when the STARTING_FILE_NAME_AND_OFFSET option is used.
Default: EARLIEST
STARTING_TIME The starting time when the SORT_FIELD option is set to TIME_IN_FILENAME, in the format yyyy-MM-dd HH:mm:ss.SSS. Default: 1752-09-14 00:00:00.000
STARTING_POSITION The starting position for file reads when the SORT_FIELD option is set to MODIFIED_FILE_TIME. The format for STARTING_POSITION is modified_time_epoch[:file_name[:offset]] - file_name and offset are optional.
For example:
  • 1609423921 will pick all files with modified time matching epoch 1609423921 or later
  • 1609423921:abc.txt will ignore modified time and will pick file abc.txt or later
  • 1609423921:abc.txt:10 will ignore modified time and will pick file abc.txt, starting from line number 11 (line numbers start from 0)
Default: 1752-09-14 00:00:00.000
INGRESS_NUM_PARTITIONS Used in scaling. The number of logical partitions into which you want to divide your input data. A single partition is a logical grouping of input data files based on hash computation; files with the same partition value belong to the same partition. The partition value for a file is computed as hash(filename)%INGRESS_NUM_PARTITIONS; see Scaling in File-VFS for more information.
INGRESS_ASSIGNED_PARTITIONS Used in scaling. Identifies the partitions assigned to the current shard. A shard only processes these partitions and skips data from other partitions. Partitions are 0-indexed and can be given as comma-separated values (0,1,2), ranges (4:7, equivalent to 4,5,6), a mix of both (0,1,4:7, equivalent to 0,1,4,5,6), or all (:).
SHARD_ID Used in scaling. An optional parameter, used only when you are running multiple shards of a pipeline. It identifies the current shard, counting from 0.
INGRESS_DELAY_INTERVAL The period (in milliseconds) for file-based T-sort to wait for late files - applies only if SORT_FIELD is set to MODIFIED_FILE_TIME or TIME_IN_FILENAME. Valid values are:
  • -1: disable file-based T-sort completely
  • 0: do not wait - late files will be dropped (the default)
  • >= 1: wait for this interval in milliseconds
Default: 0
INGRESS_FILE_SCAN_WAIT The interval (in milliseconds) at which the reader thread checks the queue for new files to be read. Default: 2000
IGNORE_CORRUPT_FILES If a fatal IOException comes up while reading from a file, should the VFS reader ignore the file? If true, the corrupt file is skipped; otherwise the pump is terminated and no more data will flow. Default: false
NUMBER_OF_BUFFERS The number of buffers the File-VFS plugin can use to read data before it needs to recycle them. Increasing the number of buffers might increase the plugin's performance at the cost of memory. Default: 2
INGRESS_RECURSIVE_LOOKUP Enable / disable recursive file lookups within subdirectories of the provided directory.
When the flag is 'true' the plugin checks each directory entry to distinguish files from sub-directories. If you do not need to include files from child sub-directories, and you know that all directory entries matching FILENAME_PATTERN will be files, you can safely set INGRESS_RECURSIVE_LOOKUP to 'false' to disable this check and save the latency of a round trip to the remote file system for each entry in the FILE_LOCATION directory.
Default: true

Depending on the settings of SORT_FIELD and INGRESS_DELAY_INTERVAL, the file-based T-sort feature may come into operation.

File system specific options

Option File System Default Description
"vfs.hdfs.kerberos.enabled" HDFS 'false' In order to read from HDFS with kerberos authentication, the value for this option should be 'true'
"vfs.hdfs.kerberos.keytab" HDFS no default value In order to read from HDFS with kerberos authentication, the user must provide the location of the keytab here as a string
"vfs.hdfs.kerberos.principal" HDFS no default value In order to read from HDFS with kerberos authentication, the user must provide the value of the principal here as a string
"vfs.s3.useHttps" S3 'true' In order to read from s3 or minio over HTTP (and not HTTPS) the value for this option should be 'false'
AUTH_TYPE_ADLS ADLS Gen2 no default value (Mandatory when using ADLS Gen2). Specify the authentication mechanism. Supported types are 'SharedKey' and 'OAuth'. OAuth2.0 support is provided via Client Credentials
KEY_FILE ADLS Gen2 no default value (Mandatory when using ADLS Gen2). Provides path to the configuration file containing credentials for the appropriate authentication mechanism. Example here
REFRESH_CRED ADLS Gen2 no default value (Optional). Supports values 'yes' and 'no' (case insensitive). When set to yes, any changes made to the KEY_FILE will be used to establish a new connection with ADLS Gen2. If set to no or not provided changed credentials in the KEY_FILE won’t be used to establish connection.

Provenance Columns

When reading from files, you can define provenance columns. These return metadata for the file from which you are reading.

Option Data type Description
SQLSTREAM_PROV_FILE_SOURCE_FILE VARCHAR Adds the name of the file being read to the output columns
SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER BIGINT Adds the current line number to the output columns. Line number starts from 0.
SQLSTREAM_PROV_FILE_MODIFIED_TIME BIGINT Adds the modified time of the file being read to the output columns.
SQLSTREAM_PROV_INGRES_PARTITION_ID INT Adds the partition from which this row was read, calculated by hashing the incoming filename as described in scaling.

See the topic Reading from Files using VFS in the Integration Guide for more details.
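
As a sketch, a File-VFS source sorted by a timestamp embedded in the filename, with one provenance column added, might be declared as follows. The location, pattern, time format and columns are illustrative, and the PARSER and CHARACTER_ENCODING options are parsing options described under Options for Parsing Data.

CREATE OR REPLACE FOREIGN STREAM "VfsSchema"."BusReadingsStream"
("id" INTEGER,
"reported_at" TIMESTAMP,
"speed" DOUBLE,
--provenance column: name of the file each row was read from
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256))
SERVER FILE_VFS_SERVER
OPTIONS
(FILE_LOCATION 'file:///tmp/buses',
FILENAME_PATTERN 'buses_(\d{14})_[a-z]+.*',
SORT_FIELD 'TIME_IN_FILENAME',
--format of the 14-digit capture group above; illustrative
FILENAME_TIME_FORMAT 'yyyyMMddHHmmss',
--assumed CSV parsing; see Options for Parsing Data
PARSER 'CSV',
CHARACTER_ENCODING 'UTF-8');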

Foreign Stream Options for Reading from HDFS

See the HDFS section of the topic Reading from Files using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS above.

Foreign Stream Options for Reading Over HTTP

Option Description
URL URL for HTTP feed.
HEADER_<name_of_header> Tells HTTP reader to add a header to the request called <name_of_header>, with the option value as the value.
PARAM_<name_of_param> Tells HTTP reader to add a query parameter to the request called <name_of_param> with the option value as the value.
POLL_IN_MILLIS How often to request new data, in milliseconds.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx and PARAM_xxx options using select HEADER_ABC, PARAM_XYZ from TEST.http_options. For more details see the topic Using the Options Query Property.

See the topic Reading from HTTP in the Integration Guide for more details.
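
The following sketch polls a hypothetical HTTP endpoint every five seconds, passing one header and one query parameter. The URL, header, parameter and columns are illustrative, and the PARSER and CHARACTER_ENCODING options are parsing options described under Options for Parsing Data.

CREATE OR REPLACE FOREIGN STREAM "HttpSchema"."HttpSourceStream"
("symbol" VARCHAR(16),
"price" DOUBLE)
SERVER HTTP_SERVER
OPTIONS
(URL 'https://example.com/api/prices',
--illustrative header and query parameter
HEADER_Authorization 'Bearer my-token',
PARAM_limit '100',
POLL_IN_MILLIS '5000',
--assumed CSV parsing; see Options for Parsing Data
PARSER 'CSV',
CHARACTER_ENCODING 'UTF-8');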

Foreign Stream Options for Reading from IBM MQ

Option Description Required Default
QUEUE_MANAGER Queue manager name of the IBM MQ. True
SERVER_HOST Hostname of the IBM MQ Server True
SERVER_PORT Listener Port of the IBM MQ Server True
SERVER_CHANNEL Channel name used by the queue with data. True
APPLICATION_USER User name that application uses to connect to MQ False
APPLICATION_PASSWORD Password of the application user encoded in base64 format. False
QUEUE Name of the queue from which you are receiving data. Must specify either QUEUE or TOPIC but not both. False
TOPIC Name of the IBM MQ topic from which you are receiving data. Must specify either QUEUE or TOPIC but not both. False
TRANSACTION_ROW_LIMIT Commit Batch size of the files created from the messages. False 2000
TRANSACTION_ROWTIME_LIMIT Commit time interval in ms after which batch is acknowledged and created if the count of messages doesn’t reach batch size. False 20000
WATERMARK_QUERY Watermark query which returns string in watermark MQ format from the sink. False
CHARACTER_ENCODING Character encoding of the data False UTF-8
INGRESS_TEMP_FILE_PATH Persistent volume path where the files will get created. False /tmp
INGRESS_TEMP_FILE_RETENTION_IN_MIN Retention of files in the persistent path if watermark query is not provided. False 1
INGRESS_TEMP_FILE_LIMIT Limiting maximum number of temp files in persistent path. False 100
CIPHER_SUITE MQ Supported Cipher Spec corresponding to the channel used. False TLS_RSA_WITH_AES_128_CBC_SHA256
CONFIG_FILE_PATH Path of properties file for system properties of Java application for keystore, truststore path and password False

See the topic Reading from IBM MQ in the Integration Guide for more details.

Using Provenance Columns

In reading from Kafka, you can declare provenance columns. These return metadata for the Kafka topic from which you are reading.

These are as follows:

Data Type Name in s-Server 6.0.0 Name in s-Server 6.0.1+ Kafka version Description
VARCHAR(256) KAFKA_TOPIC SQLSTREAM_PROV_KAFKA_TOPIC Kafka v.0.10.2 or later Returns name of Kafka topic.
INTEGER KAFKA_PARTITION SQLSTREAM_PROV_KAFKA_PARTITION Kafka v.0.10.2 or later Returns value of current Kafka partition.
INTEGER PARTITION SQLSTREAM_PROV_KAFKA_PARTITION Kafka v.0.8.2 Returns value of current Kafka partition.
BIGINT KAFKA_OFFSET SQLSTREAM_PROV_KAFKA_OFFSET Kafka v.0.10.2 or later Returns value of current Kafka offset.
BIGINT OFFSET SQLSTREAM_PROV_KAFKA_OFFSET Kafka v.0.8.2 Returns value of current Kafka offset.
TIMESTAMP KAFKA_TIMESTAMP SQLSTREAM_PROV_KAFKA_TIMESTAMP Kafka v.0.10.2 or later Returns current Kafka_timestamp.
VARBINARY(256) KAFKA_KEY SQLSTREAM_PROV_KAFKA_KEY Kafka v.0.10.2 or later Returns key value for message.
BINARY(32) PAYLOAD_PREFIX SQLSTREAM_PROV_KAFKA_PAYLOAD_PREFIX Kafka v.0.10.2 or later Returns key value for message.
VARCHAR(1000) N/A SQLSTREAM_PROV_KAFKA_HEADERS Kafka v.0.10.2 or later Returns any headers for Kafka message topics as key-value pairs in serialized text format. See Reading and Parsing Headers.

Foreign Stream Options for Reading from Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#consumerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
Option Adapter version Description
TOPIC All Required. Kafka Topic. You can use a regular expression as a "topic wild card". (This is not supported by legacy versions of the adapter.)
SEED_BROKERS All A comma separated list of broker identifiers in the format <broker_hostname>:<port>. For the legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost".
PARTITION All Partition number to read from. If you are reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions will be read from. You can specify a single partition with PARTITION or a range (a range needs two partition numbers, for example 1-3, and is inclusive). Note: Partition numbers are 0 based.
PORT Legacy Deprecated since 6.0.1 Port for Kafka seed brokers
STARTING_TIME All Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'. Default LATEST.

When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later.

For the legacy Kafka adapter, options are EARLIEST or LATEST.
STARTING_OFFSET All Where to start reading from (default is -1, the oldest offset) as a long int representing the physical offset within a partition
INDEX_TOPIC_NAME Kafka10 This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic in the Reading from Kafka topic of the s-Server Integration Guide. Index topic may be created on a separate Kafka cluster.
INDEX_TOPIC_SEED_BROKERS Kafka10 The broker(s) where the index topic referenced by INDEX_TOPIC_NAME is managed
MAX_POLL_RECORDS Kafka10 Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, this needs to be set such that you get a "page" full (1MB is the default) of kafka messages from each partition of the topic(s). It can be roughly calculated as: (numPartitions * 1 MB) / typicalMessageSize
PARTITION_OFFSET_QUERY Kafka10 This is a SQL query that fetches starting offsets, Kafka partition and Kafka topic for all topic partitions. For both the legacy Kafka adapter and the Kafka10 adapter, you can use a query along the following lines:

PARTITION_OFFSET_QUERY 'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM stored_offsets'

PARTITION should be of type INTEGER. OFFSET should be of type BIGINT. Any partition for which the query does not return anything will either use STARTING_OFFSET or STARTING_TIME to determine where to start.
CLIENT_ID All For the Kafka10 adapter, it is vital to understand that this is the consumer group id (so is used to set consumer group property "group.id"). Client key for Yammer metrics. CLIENT_ID defaults to client{TOPIC} or CLIENT{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting. CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics.
OPTIONS_QUERY All Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in select lastOffset as STARTING_OFFSET from TEST.committedOffset;
SCHEMA_ID All If set >= 0, the reader will treat the first byte of data as magic byte, and the next 4 bytes as a schema ID, and will discard any record where the first byte does not match the SCHEMA_ID. Default -1.
PARSER_PAYLOAD_OFFSET Kafka10 The number of bytes to skip at the start of the record to get the value - in case the data is offset (by a schema ID or some other fixed-length data). Default 0.
"kafka.consumer.config" Kafka10 Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
"isolation.level" Kafka10 Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted or read_committed. This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commitments, you need to configure the Kafka adapter to read only committed data, that is, read_committed.
METRICS_PER_PARTITION Legacy True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer grained metrics will be reported).
FETCH_SIZE Legacy Fetch size. Defaults to 1000000.

Additional Kafka Options for Use with JNDI Properties

You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so the passwords are not visible in the SQL.

The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.

So we recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (Applies to all foreign streams that use this foreign server definition).
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (Applies only to the specified foreign stream).
  • A file referenced by "kafka.consumer.config" or "kafka.producer.config" (Applies to all foreign streams that reference the named file).
Option Name Adapter version Description
"ssl.truststore.location" Kafka10 The location of the trust store file.
"ssl.truststore.password" Kafka10 The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
"ssl.keystore.location" Kafka10 The location of the key store file. This is optional for client and can be used for two-way authentication for client.
"ssl.keystore.password" Kafka10 The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
"ssl.key.password" Kafka10 The password of the private key in the key store file. This is optional for client.

See the topic Reading from Kafka in the Integration Guide for more details.
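
Pulling these options and the provenance columns together, a sketch of a Kafka10 source might look like the following. The topic, brokers, consumer group and business columns are illustrative, and the PARSER and CHARACTER_ENCODING options are parsing options described under Options for Parsing Data.

CREATE OR REPLACE FOREIGN STREAM "KafkaSchema"."OrdersSourceStream"
("orderId" INTEGER,
"amount" DOUBLE,
--provenance columns (s-Server 6.0.1+ names)
"SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER,
"SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT)
SERVER KAFKA10_SERVER
OPTIONS
(TOPIC 'orders',
SEED_BROKERS 'localhost:9092',
STARTING_TIME 'LATEST',
--for the Kafka10 adapter, CLIENT_ID is the consumer group id
CLIENT_ID 'orders_consumer_group',
--assumed CSV parsing; see Options for Parsing Data
PARSER 'CSV',
CHARACTER_ENCODING 'UTF-8');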

Reading from Kinesis Streams

Reading from Amazon Kinesis Streams uses an agent framework and does not require creation of a foreign stream. Please see Reading from Kinesis.

Foreign Stream Options for Reading from MQTT

Option Description
CONNECTION_URL Connection URL for the MQTT broker, for example 'tcp://127.0.0.1:1883'.
TOPIC MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client.
QOS Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
CLIENT_ID s-Server implements an MQTT client to connect to the MQTT server. This setting provides an MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to a randomly generated ID.
USERNAME Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text.
PASSWORD Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text.
KEEP_ALIVE_INTERVAL Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that the broker and client can go without sending a message. Defaults to 60.
CONNECTION_TIMEOUT Optional. Defaults to 30.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set options using select USERNAME, PASSWORD from TEST.mqtt_options. For more details see the topic Using the Options Query Property.

See the topic Reading from MQTT in the Integration Guide for more details.
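
As an illustrative sketch (broker URL, topic, client ID and columns are assumptions, and the PARSER and CHARACTER_ENCODING options are parsing options covered under Options for Parsing Data):

CREATE OR REPLACE FOREIGN STREAM "MqttSchema"."MqttSourceStream"
("ts" TIMESTAMP,
"payload" VARCHAR(1024))
SERVER MQTT_SERVER
OPTIONS
(CONNECTION_URL 'tcp://127.0.0.1:1883',
TOPIC 'sensors/temperature',
QOS '1',
--distinct client id for this foreign stream; illustrative
CLIENT_ID 'sqlstream_reader_1',
--assumed CSV parsing; see Options for Parsing Data
PARSER 'CSV',
CHARACTER_ENCODING 'UTF-8');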

Foreign Stream Options for Reading from Sockets

The ECD framework can act as a server or a client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST.

Option Description
IS_IPV6 Whether or not the socket uses IPV6. Default is false.
PROTOCOL Whether the socket uses TCP or UDP. Default is TCP.
REMOTE_HOST Hostname to receive tuples from, when ECDA is acting as a client. You can override this to 'LOCALHOST' to listen to only local connections, or a specific ip address, such as 168.212.226.204. When you specify REMOTE_HOST and REMOTE_PORT, this tells the ECD socket code to start the network connection as a client.
REMOTE_PORT Port to receive tuples from when ECDA is acting as a client. The REMOTE_ and SERVER_ options tell ECDA's socket code how to start the network connection (as a server or a client).
SERVER_HOST The hostname or IP address to listen upon to receive tuples, when ECDA is acting as a server (defaults to 0.0.0.0). When you specify SERVER_HOST and SERVER_PORT, this tells the ECD socket code to start the network connection as a server.
SERVER_PORT The port to listen upon to receive tuples when ECDA is acting as a server.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the REMOTE_HOST and REMOTE_PORT options using select REMOTE_HOST, REMOTE_PORT from TEST.socket_options. For more details see the topic Using the Options Query Property.

Using Provenance Columns with the Sockets plugin

In reading from network sockets, you can declare provenance columns. These return metadata for the network socket from which you are reading.

These are as follows:

Data Type Name Value
VARCHAR(1024) SQLSTREAM_PROV_SOCKET_SOURCE_HOST Returns host name for socket.
INTEGER SQLSTREAM_PROV_SOCKET_SOURCE_PORT Returns port for socket.

See the topic Reading from Network Sockets in the Integration Guide for more details.
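
For example, the following sketch listens as a server on a hypothetical port and exposes both provenance columns. The port, columns and parsing options are illustrative; PARSER and CHARACTER_ENCODING are covered under Options for Parsing Data.

CREATE OR REPLACE FOREIGN STREAM "NetSchema"."SocketSourceStream"
("message" VARCHAR(1024),
--provenance columns for the socket connection
"SQLSTREAM_PROV_SOCKET_SOURCE_HOST" VARCHAR(1024),
"SQLSTREAM_PROV_SOCKET_SOURCE_PORT" INTEGER)
SERVER NET_SERVER
OPTIONS
(SERVER_HOST '0.0.0.0',
SERVER_PORT '5555',
PROTOCOL 'TCP',
--assumed CSV parsing; see Options for Parsing Data
PARSER 'CSV',
CHARACTER_ENCODING 'UTF-8');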

Foreign Stream Options for Reading from S3

See the S3 section of the topic Reading from Files using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS above.

Foreign Stream Options for Reading from SFTP

See the SFTP section of the topic Reading from Files using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS above.

Foreign Stream Options for Reading Over WebSockets

Option Description
URL URL for web socket.
HEADER_<name_of_header> Tells the WebSocket reader to add a header to the request called <name_of_header>, with the option value as the value.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx options using select HEADER_ABC, HEADER_XYZ from TEST.http_options. For more details see the topic Using the Options Query Property.

See the topic Reading from WebSockets in the Integration Guide for more details.

Options for Writing Data

Foreign Stream Options for Writing to ADLS Gen2

See the ADLS section of the topic Writing to Hadoop in the Integration Guide for more details, or see Options for Hadoop/HDFS below.

Foreign Stream Options for Writing to AMQP

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides a network wire-level protocol for the exchange of messages, while up to 0.9.1, AMQP employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure the connection URL for AMQP 1.0 differently than for versions up to 0.9.1.

Name Description
CONNECTION_URL Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version.
AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)
Format: amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList>]
Example: amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'
AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:
Format: amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>
Example: amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default
Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.
DESTINATION Required. AMQP 1.0 queue or topic identifier. In general, the destination qualification syntax may be specific to the AMQP message broker implementation. The examples here are specific to ActiveMQ. You can fully qualify the AMQP destination by identifying the destination as a topic or a queue. ActiveMQ supports such qualification.
For a topic: "DESTINATION" 'topic://testTopic'
For a queue: "DESTINATION" 'queue://testTopic'
ActiveMQ treats an unqualified destination as a queue. In other words, for ActiveMQ, DESTINATION 'foo' is equivalent to DESTINATION 'queue://foo'
See http://camel.apache.org/activemq.html for more details.
In s-Server 6.0, DESTINATION can be either an absolute destination or be in the form: <destination prefix>{PARTITION}<destination suffix>
Example: /new/ConsumerGroups/$Default/Partitions/{PARTITION}
ACKNOWLEDGE_MODE Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack. Roughly, AUTO asks the container on the AMQP server to acknowledge once a message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.
DELIVERY_MODE Optional. Delivery mode for messages that ECDA communicates to the AMQP 1.0 server. Options are NON-PERSISTENT or PERSISTENT; defaults to NON-PERSISTENT. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-outbound-channel-adapter-xml-7a

See the topic Writing to AMQP in the Integration Guide for more details.

Foreign Stream Options for Writing to Azure Event Hubs

For writing to Azure Event Hubs we use the Kafka plugin - please see Foreign Stream Options for Writing to Kafka below. For more information see the topic Writing to Azure Event Hubs in the Integration Guide.

Foreign Stream Options for Writing to File Systems

Option Description
DIRECTORY Directory to which you are writing. s-Server needs permission to write to this directory.
ORIGINAL_FILENAME Name of file to write before rotation. This will be the name of the file that s-Server is currently writing.
FILENAME_PREFIX Prefix of the final output file name, such as "test-".
FILENAME_DATE_FORMAT Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss. Uses java SimpleDateFormat. This specifies how to format a timestamp that appears between the prefix and the suffix. This timestamp is the ROWTIME of the last row in the file.
FILE_ROTATION_WATERMARK_COLUMN This declares the name of a source column whose contents will be used to further distinguish files in the series.
FILENAME_SUFFIX Suffix of the final output file name. If you want this suffix to include a period, you must specify it, e.g. ".csv"
FILE_ROTATION_TIME Determines when files rotate based on time elapsed since Unix epoch time. Defaults to 0. That means "don't use ROWTIME to trigger file rotation". You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE as an option. You can choose to specify both. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d) - for example '15s', '20m', '2h' or '3d'. These express intervals of time from 1970-01-01; files rotate once a row arrives with a ROWTIME that passes the specified interval. See Using FILE_ROTATION_TIME.
FILE_ROTATION_SIZE Determines when files rotate based on file size. s-Server rotates to the next file once a row arrives that brings the file size over the byte threshold specified by FILE_ROTATION_SIZE. You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE. You can choose to specify both. Lets you specify a file size in kilobytes (k), megabytes (m), or gigabytes (g). Expressed as a positive integer followed by a byte measurement, for example '64k', '100m', '8g'. Defaults to 0, which means "don't use file size to trigger file rotation". See Using FILE_ROTATION_SIZE.
FILE_ROTATION_RESPECT_ROWTIME 'true' or 'false', case-insensitive. When you use FILE_ROTATION_SIZE, this option lets you specify whether files wait to rotate until all rows with the same ROWTIME have arrived. Defaults to 'true', which means "always respect rowtime". See Using FILE_ROTATION_RESPECT_ROWTIME.
ESCAPE_<column name> True or false; defaults to true. Causes strings to be escaped before being written.
POSTPROCESS_COMMAND The POSTPROCESS_COMMAND option lets you run a script after each output file is written. To use this option, enter the path to the script, along with any parameters needed by the script, substituting <input> for the name of the file. When the file is complete, the script will execute with parameters, and <input> will be replaced by the name of the file.

Example: '/home/sqlstream/scripts/send-to-destination.sh <input> sftp://some.sftp.server.com'

See the topic Writing to the File System in the Integration Guide for more details.
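
Building on the file-writing example earlier in this topic, the following sketch adds file naming and rotation options. The directory, prefix, date format and rotation interval are illustrative; the FORMATTER, CHARACTER_ENCODING and WRITE_HEADER options are formatting options covered under Options for Formatting Data.

CREATE OR REPLACE FOREIGN STREAM "FileWriterSchema"."RotatingFileStream"
("recNo" INTEGER,
"ts" TIMESTAMP,
"amount" DOUBLE)
SERVER FILE_SERVER
OPTIONS
(DIRECTORY '/tmp/output',
--file being written before rotation; illustrative
ORIGINAL_FILENAME 'current.csv',
FILENAME_PREFIX 'orders-',
FILENAME_DATE_FORMAT 'yyyy-MM-dd_HH:mm:ss',
FILENAME_SUFFIX '.csv',
--rotate roughly every 15 minutes of ROWTIME
FILE_ROTATION_TIME '15m',
--formatting options; see Options for Formatting Data
FORMATTER 'CSV',
CHARACTER_ENCODING 'UTF-8',
WRITE_HEADER 'true');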

Foreign Stream Options for Writing to Hadoop/HDFS

Option Name Scope Required? Description
HDFS_OUTPUT_DIR All Required Address for name node of HDFS, such as hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/ or for ADLS Gen2 abfs://@.dfs.core.windows.net/path/to/destination . This is where data will be written to on the HDFS server.
ABFS_CONFIG_FILE ABFS Required Path of the ABFS config file with credentials (only yaml file)
ABFS_AUTH_METHOD ABFS Required Authentication method used to upload data - either "SharedKey" or "OAuth" - which indicates OAuth 2.0. These values are case sensitive.
REFRESH_PROPS ABFS Optional If set to 'true', the config file will be read again and a new instance of the configuration will be used. If set to 'false' or not set, any change in the config file will not be detected unless s-Server is restarted.
AUTH_METHOD HDFS, Hive Optional If desired, specify 'kerberos'. Requires AUTH_USERNAME and AUTH_KEYTAB (the latter should be implemented using JNDI properties; see Hadoop Options for Use with JNDI Properties below).
AUTH_USERNAME HDFS, Hive Optional User name for HDFS. Required if AUTH_METHOD is specified.
CONFIG_PATH HDFS, Hive Optional Specifies the path to an HDFS client configuration file. This will be loaded and used by s-Server's HDFS client throughout its life cycle. Example: /home/me/work/kerberos/core-default.xml
AUTH_KEYTAB HDFS, Hive Optional Path to file containing pairs of Kerberos principals and encrypted keys, such as /tmp/nn.service.keytab Required if AUTH_METHOD is specified.
HIVE_TABLE_NAME Hive Required Table name inside HIVE_SCHEMA_NAME.
HIVE_SCHEMA_NAME Hive Required Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified.
AUTH_METASTORE_PRINCIPAL Hive Required Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file.
HIVE_URI Hive Required JDBC URL for accessing the Hive server when loading data into Hive tables. Must be specified if HIVE_TABLE_NAME is specified.
HIVE_METASTORE_URIS Hive Optional Location of the Hive metastore for loading data into Hive tables. Required if HIVE_TABLE_NAME is specified.
$columnName_HIVE_PARTITION_NAME_FORMAT Hive Optional This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat.
OPTIONS_QUERY All Optional Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the HDFS_OUTPUT_DIR option from a table that contains options, as in select lastOffset as HDFS_OUTPUT_DIR from TEST.hdfs_options. For more details see the topic Using the Options Query Property.

Hadoop options for use with JNDI properties

We recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (Applies to all foreign streams that use this foreign server definition.)
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (Applies only to the specified foreign stream.)

See the topic Writing to Hadoop in the Integration Guide for more details.
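
As a sketch only, an HDFS sink might be declared as follows. The name node address and columns are illustrative; the file naming and rotation options shown are those of the file system writer and are assumed to carry over to the HDFS writer (check the Writing to Hadoop topic), and the FORMATTER and CHARACTER_ENCODING options are covered under Options for Formatting Data.

CREATE OR REPLACE FOREIGN STREAM "HdfsSchema"."HdfsSinkStream"
("recNo" INTEGER,
"ts" TIMESTAMP,
"amount" DOUBLE)
SERVER HDFS_SERVER
OPTIONS
(HDFS_OUTPUT_DIR 'hdfs://mynamenode:8020/user/sqlstream/output/',
--file naming and rotation options, assumed to match the file system writer
FILENAME_PREFIX 'orders-',
FILENAME_DATE_FORMAT 'yyyy-MM-dd_HH:mm:ss',
FILENAME_SUFFIX '.csv',
FILE_ROTATION_TIME '15m',
--formatting options; see Options for Formatting Data
FORMATTER 'CSV',
CHARACTER_ENCODING 'UTF-8');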

Foreign Stream Options for Writing to Hive

Option Description
HIVE_TABLE_NAME Defaults to null. Table name inside HIVE_SCHEMA_NAME.
HIVE_SCHEMA_NAME Defaults to null. Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified.
AUTH_METASTORE_PRINCIPAL Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file.
HIVE_URI Defaults to null. JDBC URL for accessing the Hive server. Must be specified if HIVE_TABLE_NAME is specified.
HIVE_METASTORE_URIS Defaults to null. Location of the Hive metastore. Required if HIVE_TABLE_NAME is specified.
$columnName_HIVE_PARTITION_NAME_FORMAT This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat.

Note: In order to include Kerberos options for an HDFS server, you need to configure your own server object. This server will allow all foreign tables or streams that reference this server to inherit Kerberos options.

See the topic Writing to Hive in the Integration Guide for more details.

Foreign Stream Options for Writing Over HTTP

Option Description
URL URL for HTTP feed.
HEADER_<name_of_header> Tells HTTP writer to add a header to the request called <name_of_header>, with the option value as the value.
PARAM_<name_of_param> Tells HTTP writer to add a query parameter to the request called <name_of_param> with the option value as the value.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx and PARAM_xxx options using select HEADER_ABC, PARAM_XYZ from TEST.http_options. For more details see the topic Using the Options Query Property.

See the topic Writing to HTTP in the Integration Guide for more details.

Foreign Stream Options for Writing to IBM MQ

Option Description Required Default
QUEUE_MANAGER Queue manager name of the IBM MQ. True
SERVER_HOST Hostname of the IBM MQ Server True
SERVER_PORT Listener Port of the IBM MQ Server True
SERVER_CHANNEL Channel name used by the queue where data needs to be written. True
APPLICATION_USER User name that application uses to connect to IBM MQ False
APPLICATION_PASSWORD Password of the application user encoded in base64 format. False
QUEUE Name of the queue to which you are sending data. Must specify either QUEUE or TOPIC but not both. False
TOPIC Name of the IBM MQ topic to which you are sending data. Must specify either QUEUE or TOPIC but not both. False
WATERMARK_QUEUE_NAME Queue name to where the watermark will be stored. False
ASYNC_DISABLED If async is enabled the put property of producer is disabled. False
TRANSACTION_ROW_LIMIT Commit Batch size of the messages produced. False 5000
TRANSACTION_ROWTIME_LIMIT Commit time interval in ms after which batch is committed. False 20000
SQLSTREAM_POSITION_KEY Column name from row where the watermark is provided to the plugin. False
ENABLE_IBM_MESSAGE_PERSISTENCE Persistence for the messages produced to the queue. False False
CIPHER_SUITE MQ Supported Cipher Spec corresponding to the channel used. False TLS_RSA_WITH_AES_128_CBC_SHA256
CONFIG_FILE_PATH Path of properties file for system properties of java application for keystore, truststore path and password False

See the topic Writing to IBM MQ in the Integration Guide for more details.

Using EGRESS Columns

These special egress columns are supported by the KAFKA10 plugin. Data arriving in these columns is not included in the outgoing Kafka message payload; instead they are used to set other message variables.

Name Data Type Description Name in s-Server 6.0.0
SQLSTREAM_EGRESS_KAFKA_PARTITION INTEGER This column is used to determine to which partition the message will be written.

NOTE: SQLstream uses this approach and does not support use of the Kafka producer property "partitioner.class"
KAFKA_PARTITION
SQLSTREAM_EGRESS_KAFKA_TIMESTAMP TIMESTAMP This column is used to set the Kafka message timestamp KAFKA_TIMESTAMP
SQLSTREAM_EGRESS_KAFKA_KEY VARBINARY(256) This column is used as the message key KAFKA_KEY
SQLSTREAM_EGRESS_WATERMARK VARCHAR(256) This column is used as a watermark, and is saved when committing to the sink Kafka topic. The developer should set it to whatever watermark format is supported by the source plugin. The format of the stored watermark string is as follows:
<CommitRowtimeInMillisFromEpoch>,<watermark_column_value>.
-
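
The sketch below combines two of these egress columns with a Kafka10 sink. The topic, brokers and business columns are illustrative; the connection options used here are described in the Foreign Stream Options for Writing to Kafka table that follows, and the FORMATTER and CHARACTER_ENCODING options are formatting options covered under Options for Formatting Data. The key and partition columns steer each message but are not written into its payload.

CREATE OR REPLACE FOREIGN STREAM "KafkaSchema"."OrdersSinkStream"
("orderId" INTEGER,
"amount" DOUBLE,
--egress columns: set the message key and target partition, not part of the payload
"SQLSTREAM_EGRESS_KAFKA_KEY" VARBINARY(256),
"SQLSTREAM_EGRESS_KAFKA_PARTITION" INTEGER)
SERVER KAFKA10_SERVER
OPTIONS
(TOPIC 'orders_out',
"bootstrap.servers" 'localhost:9092',
--formatting options; see Options for Formatting Data
FORMATTER 'CSV',
CHARACTER_ENCODING 'UTF-8');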

Foreign Stream Options for Writing to Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#producerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
Option Name Adapter version Description
TOPIC All Kafka topic
"bootstrap.servers" Kafka10 hostname:port[,hostname:port] of the Kafka broker(s). Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
SEED_BROKERS Legacy A comma separated list of broker identifiers in the format "<broker_host_name>:<port>". Defaults to "localhost".
"metadata.broker.list" Legacy hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
"key.serializer" Kafka10 Names a serializer class for keys. If no class is given, Kafka uses value.serializer.
"key.serializer.class" Legacy Names a serializer class for keys. If no class is given, Kafka uses serializer.class.
"value.serializer" Kafka10 Names a serializer class for values.
"serializer.class" Legacy Names a serializer class for values. The default encoder takes a byte[] and returns the same byte[].
"partitioner.class" All Fully qualified Java classname of Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner
"compression.type" Kafka10 If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip'
"retry.backoff.ms" All Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing.
"request.timeout.ms" All When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client.
"send.buffer.bytes" All Socket write buffer size.
"client.id" All Using this option, you can specify a string to help you identify the application making calls to the Kafka server.
"transactional.id" Kafka10 This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique transactional.id to publish messages to the topic using transactions. Transactions are used only if Kafka brokers are v0.11.2. These support transactional semantics.

NOTE: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream.

(new in s-Server version 6.0.1) If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to '<fully_qualified_pump_name>_Pump', where <fully_qualified_pump_name> is the name of the pump that instantiates the sink.
"pump.name" Kafka10 Deprecated since version 6.0.1.

Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when you configure the Kafka adapter to run in High Availability (HA) mode. The pump.name needs to be the fully qualified pump name, in the format <catalog_name>.<schema_name>.<pump_name>

For example: 'LOCALDB.mySchema.ProcessedOrdersPump'
"linger.ms" All In cases where batch.size has not been reached, number of milliseconds that the Kafka producer will wait before batching sends . Defaults to '100', in milliseconds)
"batch.size" All Number of messages sent as a batch. Defaults to '1000'
"kafka.producer.config" Kafka10 Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties.

For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs.

NOTE: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "bootstrap.servers". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
"security.protocol" Kafka10 Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
"transaction.timeout.ms" Kafka10 The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
HA_ROLLOVER_TIMEOUT Kafka10 Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode, and the pump is running as a "Follower", it waits for this amount of time for a lack of commits from the "Leader".

After the timeout, the "Follower" attempts to takeover as the "Leader". There may be multiple follower instances running in a cluster. Only one of these followers succeeds to be designated as the new "Leader". All other pump instances using the same transactional.id continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself as the "Follower" if possible.
COMMIT_METADATA_COLUMN_NAME Kafka10 The name of the column that contains watermark values - default SQLSTREAM_EGRESS_WATERMARK.
SQLSTREAM_EGRESS_WATERMARK_SORT_FUNCTION Kafka10 If the SQLSTREAM_EGRESS_WATERMARK egress column is being used, the value chosen depends on the value of this option. Possible values are:
  • MAX - use the lexical maximum value of all watermarks in the commit batch
  • NONE - (the default) - use the watermark value from the last row in the commit batch
HEADERS_COLUMNS Kafka10 Deprecated - please use HEADERS_COLUMN_LIST
HEADERS_COLUMN_LIST Kafka10 Comma-separated list of foreign stream columns that will be mapped as outgoing headers, rather than to the record value itself. See Writing Headers to Kafka based on Column Data
OPTIONS_QUERY All Lets you query a table to update adapter options at runtime. You can use this, for example, to set the "bootstrap.servers" option from a table, as in select lastOffset as "bootstrap.servers" from TEST.kafka_write_options;
POLL_TIMEOUT Legacy This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms.
PORT Legacy Deprecated option. From s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option.
"producer.type" Legacy This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are 'async' (asynchronous) and 'sync' (synchronous). Default 'sync'
"compression.codec" Legacy The compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". Default 'none'.
"compressed.topics" Legacy If the compression codec is anything other than 'none', enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics.
"message.send.max.retries" Legacy This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Default 3.
"topic.metadata.refresh.interval.ms" Legacy The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600,000ms). If you set this to a negative value, metadata will only get refreshed on failure. Default 600*1000 (10 minutes).
"request.required.acks" Legacy How many other brokers must have committed the data to their log and acknowledged this to the leader? 0 means never wait; 1 means wait for the leader to acknowledge; -1 means wait for all replicas to acknowledge. Default 0 (never wait).
"queue.buffering.max.ms" Legacy Maximum time to buffer data when using async mode. Default 5000.
"queue.buffering.max.messages" Legacy The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. Default 10000.
"queue.enqueue.timeout.ms" Legacy The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. Default -1.
"batch.num.messages" Legacy The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. Default 200.
"send.buffer.bytes" Legacy Socket write buffer size. Default 100x1024.
"SHARD_ID" kafka10 This optional parameter is used in scaling, when you are running multiple shards of a pipeline. It uniquely identifies the current shard. When writing metadata/watermarks to kafka, the SHARD_ID is added as a prefix to the SQLSTREAM_POSITION_KEY. It is also included in the automatically generated transaction id. When creating a restartable pipeline, this SHARD_ID is expected to be the same as the SHARD_ID of the source foreign stream.

Additional Kafka Options for Use with JNDI Properties

You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so the passwords are not visible in the SQL.

The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.

We therefore recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (Applies to all foreign streams that use this foreign server definition).
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (Applies only to the specified foreign stream).
  • A file referenced by "kafka.consumer.config" or "kafka.producer.config" (Applies to all foreign streams that reference the named file).
Option Name Adapter version Description
"ssl.truststore.location" Kafka10 The location of the trust store file.
"ssl.truststore.password" Kafka10 The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
"ssl.keystore.location" Kafka10 The location of the key store file. This is optional for client and can be used for two-way authentication for client.
"ssl.keystore.password" Kafka10 The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
"ssl.key.password" Kafka10 The password of the private key in the key store file. This is optional for client.

See the topic Writing to Kafka in the Integration Guide for more details.

Foreign Stream Options for Writing to a Kinesis Stream

Option Name Description
KINESIS_STREAM_NAME Required. Name of Kinesis stream to write to. No default.
AWS_REGION Required. Region id of Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.
AWS_PROFILE_PATH See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide. Must point to a credential file on the s-Server file system with the following format:

[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

This defaults to '', which resolves to ~/.aws/credentials.

Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide.
AWS_PROFILE_NAME Optional. Profile name to use within credentials file. Amazon supports multiple named profiles within a configuration file. If you have a named profile, you can reference it here. Defaults to default. See http://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html
INITIAL_BACKOFF Optional. If insert fails due to throttling, how many milliseconds to back off initially. Default to 20.
MAX_BACKOFF Optional. If insert fails due to throttling, how many milliseconds to back off as a maximum. Defaults to 20480.
MAX_RETRIES Optional. If insert fails due to throttling, how many retries should be made. Backoff is doubled for each retry up to max. Default is 10.
BUFFER_SIZE Optional. Maximum number of bytes per update request. Defaults to 4194304.
MAX_RECORDS_PER_REQUEST Optional. Maximum number of records per update request. Defaults to 500.
REPORT_FREQUENCY Optional. How often to log (in milliseconds) statistics. Defaults to 0, which means "never".
KINESIS_DEFAULT_PARTITION_ID Optional. Partition id of shard to write to. Defaults to ''.
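
For example, a Kinesis sink might be sketched as below. The server name KINESIS_SERVER, the stream name, region and formatter are all assumptions; check the prebuilt server table and the topic Writing to Kinesis Streams for the exact names used in your installation.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."kinesisSink"
(
    "eventId" VARCHAR(64),
    "payload" VARCHAR(1024)
)
SERVER KINESIS_SERVER            -- assumed prebuilt server name
OPTIONS (
    KINESIS_STREAM_NAME 'my-kinesis-stream',
    AWS_REGION 'us-east-1',
    AWS_PROFILE_NAME 'default',
    MAX_RETRIES '10',
    FORMATTER 'JSON'             -- assumed; see Options for Formatting Data
);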

See the topic Writing to Kinesis Streams in the Integration Guide for more details.

Foreign Stream Options for Writing to Pulsar

The SQLstream Pulsar Sink Plugin supports all the configurations mentioned in the official documentation of Apache Pulsar. For more information, refer to http://pulsar.apache.org/docs/en/2.5.0/client-libraries-java/#client.

There are three sets of configurations used by the Pulsar Sink Plugin: Client Configurations, Producer Configurations, and Custom Configurations, each described below.

All of these configurations can be added:

  • as OPTIONs in the SQL CREATE FOREIGN STREAM,
  • as OPTIONS defined as key=value pairs in a JNDI properties file, or
  • as a properties file whose path is provided in the SQL using the PULSAR_CONFIG_FILE option described below.

NOTE: As well as providing the configuration options described here for the Pulsar sink, you will also have to define a value for the FORMATTER option; see Output Formats for Writing and also the Pulsar SQL pipeline example.

Client Configurations

This table lists various Client configurations which can be used to write data to the pulsar sink.

Name Type Description Default Value
serviceUrl String Service URL provider for Pulsar service. It is the URL to the Pulsar web service or broker service.

If the Pulsar cluster is not enabled for TLS, enter either the web service or broker service URL in the following format:
- Web service URL: http://<host name>:<web service port> - for example: http://localhost:8080.
- Broker service URL: pulsar://<host name>:<broker service port>. For example: pulsar://localhost:6650

If the Pulsar cluster is enabled for TLS, enter the secure broker service URL in the following format:
pulsar+ssl://<host name>:<broker service TLS port> - for example: pulsar+ssl://localhost:6651
If useTls is set to true then:
pulsar+ssl://localhost:6651

If useTls is set to false then:
pulsar://localhost:6650
useTls boolean Whether to use TLS encryption on the connection. false
tlsTrustCertsFilePath String Path to the trusted TLS certificate file. It is required if useTls is set to true. None
authPluginClassName String Name of the authentication plugin.
Example
For TLS authentication we can have:
org.apache.pulsar.client.impl.auth.AuthenticationTls
None
authParams String String represents parameters for the authentication plugin
Example
key1:val1,key2:val2
For TLS authentication we can have:
tlsCertFile:/home/sqlstream/Downloads/my-ca/admin.cert.pem,tlsKeyFile:/home/sqlstream/Downloads/my-ca/admin.key-pk8.pem
None

Producer Configurations

This table lists various producer configurations which can be used to write data to the pulsar sink.

Name Type Description Default Value
topicName String Represents the topic name. Pass the topicName with the topic namespace as follows:
{persistent|non-persistent}://<namespace>/<topic-name>

If you enter a topic name only and no topic namespace is specified, then Pulsar uses the default location persistent://public/default/

To publish to a persistent topic belonging to the public tenant in the default namespace, simply enter the topic name as follows:
my-topic
If the specified topic does not exist, Pulsar creates the topic when the pipeline starts.

You can use expressions to define the topic name.
For example: if the my-topic field in the record contains the topic name, enter the following as the topic name:
persistent://my-tenant/my-namespace/${record:value("/my-topic")}
Note that the namespace needs to be created explicitly; if the namespace does not exist, Pulsar throws an exception.
None
producerName String Represents producer name SQLStreamProducer
sendTimeoutMs long Represents message send timeout in milliseconds. If a message is not acknowledged by a server before the sendTimeout expires, an error occurs. 30000
blockIfQueueFull boolean If set to true, when the outgoing message queue is full, the Send and SendAsync methods of the producer block, rather than failing and throwing errors.

If set to false, when the outgoing message queue is full, the Send and SendAsync methods of the producer fail and ProducerQueueIsFullError exceptions occur.

The MaxPendingMessages parameter determines the size of the outgoing message queue.
false
maxPendingMessages int Represents the maximum size of a queue holding pending messages - messages waiting to receive an acknowledgment from a broker.

By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set blockIfQueueFull to true.
1000
messageRoutingMode MessageRoutingMode Message routing logic for producers on partitioned topics.
Note: this routing logic applies only when no key is set on messages.
Available options are as follows:
1. pulsar.RoundRobinDistribution: round robin
2. pulsar.UseSinglePartition: publish all messages to a single partition
3. pulsar.CustomPartition: a custom partitioning scheme
For more information on custom-router, please see https://pulsar.apache.org/docs/en/2.5.1/cookbooks-partitioned/
pulsar.RoundRobinDistribution
batchingEnabled boolean Enable batching of messages. true
batchingMaxMessages int The maximum number of messages permitted in a batch. 1000
compressionType CompressionType Message data compression type used by a producer. Available options:
1. LZ4
2. ZLIB
3. ZSTD
4. SNAPPY
No compression

Custom Configurations

This table lists some additional producer configurations which can be used to write data to the pulsar sink.

Name Type Description Default Value
sendAsync boolean Enable Asynchronous publishing of messages. true
messageEncryption boolean Enable end-to-end message encryption. If enabled, providing publicKeyFile is mandatory. false
publicKeyFile String The absolute path to the RSA or ECDSA public key which is required to encrypt the messages. Also, make sure that you have specified the corresponding privateKeyFile at the consumer end for the messages to be consumed successfully. null
encryptionKey String The encryption key name. sqlstream.key
COUNT_SCHEDULER_TIME long Use to schedule a counter that prints the total number of messages whose acknowledgement has been received by the client, in the SQLstream trace log files. This is calculated in milliseconds. Set the value to 0 to disable the counter. 0
PULSAR_CONFIG_FILE String Contains the absolute path of the pulsar properties file. If this option is not provided, the Pulsar plugin prints a WARNING and proceeds with the default configurations or the ones passed in the sink options. None

Using Egress Columns

In writing to the Pulsar sink, you can declare and use these special egress columns.

Name Type Description Default Value
PULSAR_PARTITION INTEGER When writing rows out of s-Server to a Pulsar topic, you can use this column to specify the partition to which each message is written.

If the value in a particular row is NULL, that message is allocated to a random partition. If this column is not referenced, the data is written in RoundRobin routing mode.
RoundRobin
PULSAR_KEY VARBINARY Messages can optionally be tagged with keys, which can be useful for things like topic compaction. These are also used for Routing messages to the partition. For more information please see https://pulsar.apache.org/docs/en/concepts-messaging/#routing-modes None
PULSAR_TIMESTAMP TIMESTAMP This column is used to add EventTime to the message. It is a mandatory (non-nullable) value. If the PULSAR_TIMESTAMP column value for a particular row is NULL, then that row is discarded. System’s Current Time
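
Putting client, producer and egress-column settings together, an illustrative Pulsar sink might look like the sketch below. The server name PULSAR_SERVER, the topic and the formatter are assumptions; mixed-case option names are double-quoted to preserve their case.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."pulsarSink"
(
    "deviceId" VARCHAR(64),
    "reading" DOUBLE,
    PULSAR_KEY VARBINARY(256),           -- optional message key
    PULSAR_TIMESTAMP TIMESTAMP           -- event time; rows with NULL here are discarded
)
SERVER PULSAR_SERVER                     -- assumed prebuilt server name
OPTIONS (
    "serviceUrl" 'pulsar://localhost:6650',
    "topicName" 'persistent://public/default/my-topic',
    "batchingEnabled" 'true',
    "compressionType" 'LZ4',
    FORMATTER 'JSON'                     -- a formatter is required; see Output Formats for Writing
);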

See the topic Writing to Pulsar in the Integration Guide for more details.

Foreign Stream Options for Mail Servers

Option Definition
USERNAME Required. User name for the SMTP server defined in HOST.
HOST Required. Host name for the SMTP server.
PASSWORD Optional. Password to use for the SMTP server defined in HOST. s-Server uses this option with USERNAME when authenticating to the SMTP server. If this option is empty, s-Server will not attempt authentication. Defaults to none.
PORT Optional. Port for SMTP server defined by HOST. Defaults to none.
CONNECTION_SECURITY Optional. Security used to connect to SMTP server. SSL, STARTTLS or NONE. Defaults to NONE.
SENDER Optional. Used in the “Sender” header when sending the email. Can also be specified as a special column in the foreign stream.
RECEIVER Optional. Address used in the “To” header when sending the email. Can also be specified as a special column in the foreign stream. You can specify multiple addresses separated by commas.
SUBJECT Optional. Subject for email. Can also be specified as a special column in the foreign stream.
REPLY_TO Optional. Address used in the "Reply-To" header when sending the email. Can also be specified as a special column in the foreign stream.
CC Optional. Address used in the “CC” header when sending the email. Can also be specified as a special column in the foreign stream. You can specify multiple addresses separated by commas.
BCC Optional. Address used in the “BCC” header when sending the email. Can also be specified as a special column in the foreign stream. You can specify multiple addresses separated by commas.
TIMEOUT Optional. Socket read timeout value in milliseconds. Defaults to 30000.
CONN_TIMEOUT Optional. Socket connection timeout value in milliseconds. Defaults to 30000.
FORMAT_CHARSET_KEY Optional. Charset for formatting mail. Defaults to UTF-8. See https://docs.oracle.com/javase/8/docs/api/java/nio/charset/StandardCharsets.html
LOCALHOST Optional. Used to set the mail.smtp.localhost option in the JavaMail API. See https://javaee.github.io/javamail/docs/api/com/sun/mail/smtp/package-summary.html for more information. It is generally best to leave this blank.
OPTIONS_QUERY Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the PORT option using select PORT from TEST.mail_options. For more details see the topic Using the Options Query Property.
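
For example, the sketch below sends each row as an email. The host, credentials and addresses are hypothetical, the server name MAIL_SERVER is an assumption (check the prebuilt server table), and credentials are better supplied through a properties file than inline.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."alertMailer"
(
    "alertText" VARCHAR(1024)
)
SERVER MAIL_SERVER               -- assumed prebuilt server name
OPTIONS (
    HOST 'smtp.example.com',
    PORT '587',
    USERNAME 'alerts@example.com',
    PASSWORD 'secret',           -- hypothetical; avoid inline passwords in production SQL
    CONNECTION_SECURITY 'STARTTLS',
    SENDER 'alerts@example.com',
    RECEIVER 'ops@example.com',
    SUBJECT 's-Server alert',
    FORMATTER 'CSV'              -- assumed; see Options for Formatting Data
);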

See the topic Writing to Mail in the Integration Guide for more details.

Options for Writing to a MongoDB Collection

Option Definition
URL Fully qualified URL starting with mongodb:// and including, at minimum, a host name (or IP address or UNIX domain socket). The URL can also include a username and password (these are passed to the MongoDB instance) and a port number. See https://docs.mongodb.com/manual/reference/connection-string/ for more information.
COLLECTION MongoDB collection to which data will be written.
OPTIONS_QUERY Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the URL option from a table, as in select myUrl as URL from TEST.mongo_options. For more details see the topic Using the Options Query Property.
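
For example (the connection URL and collection are hypothetical, and the server name MONGODB_SERVER is an assumption; check the prebuilt server table):

CREATE OR REPLACE FOREIGN STREAM "mySchema"."mongoSink"
(
    "sensorId" VARCHAR(64),
    "reading" DOUBLE
)
SERVER MONGODB_SERVER            -- assumed prebuilt server name
OPTIONS (
    URL 'mongodb://user:password@localhost:27017',
    COLLECTION 'readings'
);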

See the topic Writing to MongoDB in the Integration Guide for more details.

Foreign Stream Options for Writing to MQTT

Option Description
CONNECTION_URL Connection URL for the MQTT broker, for example 'tcp://127.0.0.1:1883'.
TOPIC MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client.
CLIENT_ID s-Server implements an MQTT client to connect to the MQTT server. This setting provides an MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should provide a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to a randomly generated value.
QOS Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
USERNAME Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text.
PASSWORD Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text.
KEEP_ALIVE_INTERVAL Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that the broker and client can go without sending a message. Defaults to 60.
CONNECTION_TIMEOUT Optional. Connection timeout in seconds. Defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. If you set this value to 0, s-Server disables timeout processing, and the client will wait until the network connection is made successfully or fails. Defaults to 30.
RETAINED Output only. True or False. If set to true, tells the broker to store the last retained message and the QOS for this topic. Defaults to false.
MAX_IN_FLIGHT Output only. When QOS is set to 1 or 2, this is the maximum number of outgoing messages that can be in the process of being transmitted at the same time. This number includes messages currently going through handshakes and messages being retried. Defaults to 10.
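
A minimal sketch of an MQTT sink follows; the broker URL, topic and client id are hypothetical, the server name MQTT_SERVER is an assumption, and a formatter is assumed to be required as for other ECDA sinks.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."mqttSink"
(
    "deviceId" VARCHAR(64),
    "status" VARCHAR(32)
)
SERVER MQTT_SERVER                     -- assumed prebuilt server name
OPTIONS (
    CONNECTION_URL 'tcp://127.0.0.1:1883',
    TOPIC 'devices/status',
    QOS '1',
    CLIENT_ID 'sserver-mqtt-sink-1',   -- hypothetical; use a distinct id per foreign stream
    FORMATTER 'JSON'                   -- assumed; see Options for Formatting Data
);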

See the topic Writing to MQTT in the Integration Guide for more details.

Foreign Stream Options for Writing to Sockets

The ECD framework can act as a server or a client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST.

Name Description
IS_IPV6 Whether or not the socket uses IPV6. Default is false.
PROTOCOL Whether the socket uses TCP or UDP. Default is TCP.
REMOTE_HOST Hostname to send tuples to when ECDA is acting as a client. You can set this to 'LOCALHOST' to connect only to the local host, or to a specific IP address, such as 168.212.226.204. When you specify REMOTE_HOST and REMOTE_PORT, this tells the ECD socket code to start the network connection as a client.
REMOTE_PORT Port to send tuples to when ECDA is acting as a client. REMOTE_ and SERVER_ tells ECDA's socket code how to start the network connection (as a server or a client).
SERVER_HOST The hostname or IP address to listen upon to send tuples, when ECDA is acting as a server (defaults to 0.0.0.0). When you specify SERVER_HOST and SERVER_PORT, this tells the ECD socket code to start the network connection as a server.
SERVER_PORT The port to listen upon to send tuples when ECDA is acting as a server.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the REMOTE_HOST and REMOTE_PORT options using select REMOTE_HOST, REMOTE_PORT from TEST.socket_options. For more details see the topic Using the Options Query Property.
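
For example, the sketch below writes CSV rows to a remote TCP listener, with ECDA acting as a client. The host, port and the prebuilt server name NET_SERVER are assumptions; check the prebuilt server table for the socket server's actual name.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."socketSink"
(
    "id" INTEGER,
    "message" VARCHAR(256)
)
SERVER NET_SERVER                    -- assumed prebuilt server name for sockets
OPTIONS (
    PROTOCOL 'TCP',
    REMOTE_HOST '168.212.226.204',   -- hypothetical; REMOTE_ options make ECDA connect as a client
    REMOTE_PORT '5555',
    FORMATTER 'CSV'                  -- assumed; see Options for Formatting Data
);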

See the topic Writing to Network Sockets in the Integrating with Other Systems for more details.

Foreign Stream Options for Writing to a Prometheus pushgateway

Option Name Description
URL Required - denotes the Prometheus push gateway URL endpoint to which the metrics generated by the pipeline are sent.
JOB Required - An application identifier used to denote a specific application pipeline.
METRIC_TYPE Required - represents the type of metric, either 'counter' or 'gauge'. For more details, see https://www.prometheus.io/docs/concepts/metric_types/
Namespace A prefix that helps identify multiple metric names used across the application or a system. For example, in the projectname:module:invalid_record_total metric, projectname:module: indicates the namespace.
Labels A global label that is applied to all the metrics using the specific Prometheus sink and can annotate a metric with very specific information such as component=7stream;. For more details, see https://prometheus.io/docs/practices/naming/
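
For example (the gateway URL, job name, column list and the prebuilt server name PROMETHEUS_SERVER are all assumptions; see Writing to Prometheus Pushgateway for how columns map to metrics):

CREATE OR REPLACE FOREIGN STREAM "mySchema"."promSink"
(
    "metric_name" VARCHAR(128),      -- illustrative columns only
    "metric_value" DOUBLE
)
SERVER PROMETHEUS_SERVER             -- assumed prebuilt server name
OPTIONS (
    URL 'http://localhost:9091',
    JOB 'orders_pipeline',
    METRIC_TYPE 'gauge'
);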

See the topic Writing to Prometheus Pushgateway in the Integration Guide for more details.

Foreign Stream Options for Writing to a Snowflake Warehouse

Option Name Description
ACCOUNT The name assigned to your account by Snowflake.
USER The user name for your Snowflake account.
PASSWORD The password for your Snowflake account.
DB The database to write to in Snowflake. This should be an existing database for which user/password has privileges.
SCHEMA The schema to write to in the Snowflake database. This should be an existing schema for which user/password has privileges.
WAREHOUSE The Snowflake warehouse to write to. This should be an existing warehouse for which user/password has privileges.
DTABLE The table to write to in Snowflake. This should be an existing table for which user/password has privileges.
OPTIONS_QUERY Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the SCHEMA option from a table that contains the schema, as in select schemaToUse as SCHEMA from TEST.snowflakeOptions. For more details see the topic Using the Options Query Property.

The minimum configuration options required to write to a Snowflake warehouse are the warehouse name, user name/password, account, database, schema, and stream/table.

As well as these Snowflake options, you must also specify a value for the FORMATTER option; see Options for Formatting Data below.
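
Putting this together, a sketch of a Snowflake sink might look like the following; the account, credentials, object names and the prebuilt server name SNOWFLAKE_SERVER are assumptions, and credentials are better supplied through a properties file than inline.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."snowflakeSink"
(
    "orderId" INTEGER,
    "amount" DOUBLE
)
SERVER SNOWFLAKE_SERVER          -- assumed prebuilt server name
OPTIONS (
    ACCOUNT 'myaccount',
    "USER" 'loaduser',           -- quoted to avoid any clash with the SQL USER keyword
    PASSWORD 'secret',           -- hypothetical; avoid inline passwords in production SQL
    DB 'ANALYTICS',
    SCHEMA 'PUBLIC',
    WAREHOUSE 'LOAD_WH',
    DTABLE 'ORDERS',
    FORMATTER 'CSV'              -- assumed; see Options for Formatting Data
);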

See the topic Writing to Snowflake in the Integration Guide for more details.

Foreign Stream Options for Writing to WebSockets

Option Name Description
URL URL for web socket.
HEADER_<name_of_header> Tells Web Socket writer to add a header called <name_of_header> to the request.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx options using select HEADER_ABC, HEADER_XYZ from TEST.http_options. For more details see the topic Using the Options Query Property.
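
For example (the URL, header and the prebuilt server name WEBSOCKET_SERVER are assumptions; the header option is double-quoted to preserve the case of the header name):

CREATE OR REPLACE FOREIGN STREAM "mySchema"."wsSink"
(
    "payload" VARCHAR(1024)
)
SERVER WEBSOCKET_SERVER                       -- assumed prebuilt server name
OPTIONS (
    URL 'wss://example.com/ingest',           -- hypothetical endpoint
    "HEADER_Authorization" 'Bearer my-token', -- adds an Authorization header
    FORMATTER 'JSON'                          -- assumed; see Options for Formatting Data
);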

See the topic Writing to WebSockets in the Integration Guide for more details.

Options for Formatting Data

Foreign Stream Options for Formatting CSV Data

Option Definition
FORMATTER This needs to be CSV.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
WRITE_HEADER Whether or not to write header information (a header row) into the CSV data. Defaults to 'false'.
ROW_SEPARATOR Character(s) separating rows in CSV data. Defaults to '\n' (a newline). Supports multiple characters, and supports use of Unicode literals such as U&'\000D\000A'
SEPARATOR Character(s) separating field values within a row. Defaults to ','. Supports multi-character strings like '$$' or '@!@', and single or multi-character Unicode literals.
QUOTE_CHARACTER Lets you specify a quotation character to wrap the output value if the SEPARATOR string is present in the column value. There is no default for the quote character. Only a single one-byte character may be used, which limits it to code points between 0 and 127.
STARTING_OUTPUT_ROWTIME If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'.
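
For instance, these formatter options can be combined with a file sink using the prebuilt FILE_SERVER. The file location options are omitted here because their names are documented in Writing to the File System; everything shown is an illustrative sketch.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."csvFileSink"
(
    "id" INTEGER,
    "name" VARCHAR(64)
)
SERVER FILE_SERVER
OPTIONS (
    -- file location options (directory, filename pattern and so on) omitted;
    -- see Writing to the File System for their names
    FORMATTER 'CSV',
    WRITE_HEADER 'true',
    SEPARATOR ',',
    FORMATTER_INCLUDE_ROWTIME 'false'
);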

Foreign Stream Options for Formatting XML Data

Option Name Description
FORMATTER This needs to be XML.
DOC_ELEMENTS Specifies a list of elements, separated by slashes (/), to use as the root of the XML document to write. Defaults to "batch".
ROW_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for each row of the XML document's DOM. Defaults to "row".
DATA_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML.
DATA_ATTRIBUTES Specifies the name of an attribute to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML.
<column_name>_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for a specific column's datum in each row/tuple.
<column_name>_ATTRIBUTES Specifies the name of an attribute to add for a specific column's datum in each row/tuple.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
STARTING_OUTPUT_ROWTIME If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'.

Foreign Stream Options for Formatting JSON Data

Option Definition
FORMATTER This needs to be JSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
NESTED_JSON Whether or not to write data in nested json format. Defaults to 'false'.
SCHEMA_FILE_LOCATION Formats and validates the output JSON based on a JSON schema file.
PATH_COLUMN_NAME Path of the duplicate fields, if any.
STARTING_OUTPUT_ROWTIME If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'.

Foreign Stream Options for Formatting BSON

Option Definition
FORMATTER This needs to be BSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
STARTING_OUTPUT_ROWTIME If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'.

Foreign Stream Options for Formatting Avro Data

Option Definition
FORMATTER This needs to be AVRO.
AVRO_SCHEMA_LOCATION Required option to specify the path for the schema file to be used for formatting the Avro payload.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
STARTING_OUTPUT_ROWTIME If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'.

Foreign Stream Options for Formatting ORC

Option Default Value Required Description
FORMATTER none yes This needs to be ORC.
"orc.version" none yes Currently supported values are 'V_0_11' and 'V_0_12'.
"orc.block.padding" true no Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Padding improves locality and thus the speed of reading, but costs space.
"orc.block.size" 268,435,456 no Set the file system block size for the file. For optimal performance, set the block size to be multiple factors of stripe size.
"orc.direct.encoding.columns" null no Set the comma-separated list of case-sensitive names of columns that should be direct encoded.
"orc.batch.size" 10,000 no Number of rows to batch together before issuing an ORC write.
"orc.user.metadata_XXX" none no Where XXX is a string. XXX is the name of an application-specific metadata key. Therefore, it should be clear that it lives in a namespace unique to the application. There can be many of these options. The value of each option is a SQL BINARY literal with the leading X' stripped off.
STARTING_OUTPUT_ROWTIME none no If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'.

The options below apply only to cases when you are writing to the file system or HDFS. You cannot use these options when writing ORC data to a Hive table.

Option Default Value Description
"orc.bloom.filter.columns" null Column-separated list of bloom filter columns. These are names from the foreign table/stream's column signature.
"orc.bloom.filter.fpp" 0.05 Bloom filter false-positive probability.
"orc.compress" ZLIB Compression algorithm. See Appendix A for legal values.
"orc.compress.size" 262,144 Compression chunk size.
"orc.row.index.stride" 10,000 Number of rows between index entries.
"orc.stripe.size" 67,108,864 Size of in-memory write buffer.

Options for Parsing Data

Foreign Stream Options for Parsing CSV Data

Option Definition
SKIP_HEADER True or false; defaults to false. Specifies if the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers.
DISALLOW_QUOTED_ROW_SEPARATOR True or false: defaults to false.
If true, the parsing will not search for the row separator within a quoted field - so this allows multi-line field values to be ingested.
If false, finding the row separator terminates field parsing even if there is an unmatched start quote.
ROW_SEPARATOR Character(s) separating rows in CSV data. Defaults to '\n' (a newline). Supports multi-character values, and supports use of Unicode literals such as U&'\000D\000A' (CR/LF - for reading from Windows / DOS files).
SEPARATOR Character(s) separating field values within a row. Defaults to ','. Supports multi-character strings like '$$' or '@!@', and single or multi-character Unicode literals.
QUOTE_CHARACTER Lets you specify an expected quotation character (which may be applied to any incoming field, or may be present only when a field value includes the SEPARATOR string). There is no default for the quote character. Only a single one-byte character may be used, which limits it to code points between 0 and 127.
QUOTED_COLUMNS If set to anything non-blank, sets the quote character to a double quote (").
COLUMN_MAPPING Allows the extraction and re-ordering of a subset of fields from the CSV record. Fields can be re-ordered. This may not be combined with UNPARSED_TEXT.
UNPARSED_TEXT What to do with additional trailing data in the CSV record. Options are 'TRUNCATE' (the default), 'LAST COLUMN' or 'NEW ROW'. This may not be combined with COLUMN_MAPPING.
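
For example, a source foreign stream reading CSV files might combine these options as sketched below. The file location options are omitted (see Reading from the File System for their names), and PARSER 'CSV' is assumed here following the pattern of the other parsers in this section.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."csvFileSource"
(
    "id" INTEGER,
    "name" VARCHAR(64)
)
SERVER FILE_SERVER
OPTIONS (
    -- file location options (directory, filename pattern and so on) omitted;
    -- see Reading from the File System for their names
    PARSER 'CSV',
    SKIP_HEADER 'true',
    SEPARATOR ',',
    QUOTE_CHARACTER '"'
);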

Provenance Columns for Parsers

When parsing data, you can define provenance columns for your foreign stream. These return metadata for the parsed data.

For CSV, these are as follows:

Data Type Name Value
BIGINT SQLSTREAM_PROV_PARSE_POSITION Parser position within message of last parse error.
VARCHAR(65535) SQLSTREAM_PROV_PARSE_ERROR Description of parser error.
BIGINT SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER How many lines have been parsed so far. This value is not reset per message or file.

Foreign Stream Options for Parsing JSON Data

Option Definition
PARSER This needs to be JSON.
ROW_PATH This is the JSON path for the row to be found. The JsonPath parser uses a row path to find JSON objects containing a row, then the path for each column is used to find the value to be extracted.
<COLUMN_NAME>_PATH Optional for each column in the ECDA column set. This defaults to $..<COLUMN_NAME>. Here, a column named 'FOO' would have an option named FOO_PATH that defaulted to $..FOO which would return the first property named FOO under the JSON object found by the ROW_PATH.
CHARACTER_ENCODING Character set for data.
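
As an illustration, the sketch below reads JSON messages from Kafka and extracts two columns using JSON paths. The topic, broker list and paths are hypothetical; Kafka connection options for sources are covered in Reading from Kafka.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."jsonKafkaSource"
(
    "orderId" INTEGER,
    "amount" DOUBLE
)
SERVER KAFKA10_SERVER
OPTIONS (
    TOPIC 'orders',                       -- hypothetical topic
    "bootstrap.servers" 'localhost:9092',
    PARSER 'JSON',
    ROW_PATH '$',                         -- each message is one JSON object
    "orderId_PATH" '$.orderId',           -- per-column paths; names follow <COLUMN_NAME>_PATH
    "amount_PATH" '$.amount'
);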

Foreign Stream Options for Parsing XML

Option Definition
PARSER This needs to be XML.
CHARACTER_ENCODING Character set for data.
PARSER_XML_ROW_TAGS An absolute XPATH query that finds the XML element that becomes one row. No default.
<column_name>_XPATH An XPATH that finds the text that becomes the value of the column of the same name. Examples include "RetailStoreID_XPATH" '/POSLog/Body/tri:RetailTransaction/RetailStoreID', "WorkstationID_XPATH" '/POSLog/Body/tri:RetailTransaction/WorkstationID'.
PARSER_XML_USE_ATTRIBUTES True/false (default is false). Specifies the default XPATH query to find each column's data in a tuple. If false, the column name is assigned from a child element's tag name. If true, the default is @<column_name>, meaning an attribute of the XML row element.
CUSTOM_TYPE_PARSER_<column_name> Allows overriding of an individual column's parsing. Specifies a fully qualified Java classname that implements com.sqlstream.aspen.namespace.common.TypeParser.

Provenance Columns for Parsers

When parsing data, you can declare provenance columns. These return metadata for the parsed data.

For XML, these are as follows:

Data Type Name in s-Server 6.0.0 Name in s-Server 6.0.1 Value
BIGINT PARSE_POSITION SQLSTREAM_PROV_PARSE_POSITION Parser position within message of last parse error.
BIGINT PARSE_LINE_NUMBER SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER How many lines have been parsed so far. This value is not reset per message or file.

Foreign Stream Options for Parsing Key Values

Option Definition
PARSER This needs to be KV.
QUOTE_CHARACTER Lets you specify a different quote character, such as a single quote ('). Default is double quote (").
KEY_VALUE_SEPARATOR_CHARACTER Lets you specify the character that connects keys and values. Default is equals symbol (=)
SEPARATOR Lets you specify a character that separates key-value pairs. Default is comma (,).
ROW_SEPARATOR Lets you specify a character that splits lines in the key-value source. Default is \n.

Provenance Columns for Parsers

When parsing data, you can declare provenance columns. These return metadata for the parsed data.

For Key Values, these are as follows:

Data Type Name Description
BIGINT SQLSTREAM_PROV_PARSE_POSITION Parser position within message of last parse error.
VARCHAR(65535) SQLSTREAM_PROV_PARSE_ERROR Description of parser error.
BIGINT SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER How many lines have been parsed so far. This value is not reset per message or file.

Foreign Stream Options for Parsing AVRO

Option Definition
PARSER This needs to be AVRO.
AVRO_SCHEMA_LOCATION This option can either be an HTTP URL to fetch the schema or it can be a path to a file on server host machine or VM.
SCHEMA_HEADER Required option to indicate whether the Avro schema is embedded in the Avro data. This option needs to be set to false for data sources like Kafka or AMQP, where each message can be one or more serialized Avro records without a schema.
ROW_PATH This is the AVRO path for the row to be found. The Avro parser uses a row path to find Avro objects containing a row, then the path for each column is used to find the value to be extracted.
<COLUMN_NAME>_PATH Optional path for each column in the ECDA column set. This defaults to $..<COLUMN_NAME>. Here, a column named 'FOO' would have an option named FOO_PATH that defaulted to $..FOO which would return the first property named FOO under the Avro object found by the ROW_PATH.

Foreign Stream Options for Parsing ProtoBuf

Option Definition
PARSER 'PROTOBUF' Required. Indicates that ECD parser will parse files as protobuf.
SCHEMA_JAR Required. Jar containing compiled Java classes created with the Google protocol buffer compiler (protoc command), such as unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar. Locations are relative to $SQLSTREAM_HOME.

See https://developers.google.com/protocol-buffers/docs/javatutorial for more details. Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream//s-Server.
SCHEMA_CLASS Required. Class name of the outer protobuf record created with the Google protocol buffer compiler (protoc command), such as protobuf.PackageProto.protobuf.PackageProto$protobufPackage. Locations are relative to $SQLSTREAM_HOME. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details.
<column name>_PATH Not required. Lets you specify a path within the schema that maps to a column in the foreign stream. If these are not specified, s-Server populates a column using a field with the same name in the outer level record.
MESSAGE_STREAM True or false (defaults to false). This option tells the ProtoBuf parser to expect multiple messages within a single payload, each preceded by a bytes field containing their encoded length. When you set MESSAGE_STREAM to true, s-Server will expect that messages will be prefixed by this bytes field.

Foreign Stream Options for the None Parser

Option Name Description
PARSER Must be set to NONE to use the None parser.
CHARACTER_ENCODING Only applies with VARCHAR. Defaults to UTF-8. See https://docs.oracle.com/javase/8/docs/api/java/nio/charset/StandardCharsets.html
ROW_SEPARATOR If ROW_SEPARATOR is specified, it should either be a string of HEX digit pairs when going to VARBINARY or a string that encodes properly in the character encoding being used.
If ROW_SEPARATOR is not specified (or is empty) then each row will contain the entire content of one file or one message.