CREATE FOREIGN STREAM

A foreign stream is an instance of a foreign data wrapper that provides access in s-Server to a flow of data either from or to an external system. Foreign streams are often the site where data enters s-Server.

For example, the Extensible Common Data Adapter supports the creation of a foreign stream that will take rows (from a local stream) and send them by email to a recipient.

Syntax

CREATE [OR REPLACE] FOREIGN STREAM foreign-stream-name (<column_list>) SERVER <server_name> [OPTIONS <options_specification>] [DESCRIPTION string_literal]

For detailed examples of using CREATE FOREIGN STREAM and stream options, see the topics Reading from Other Sources and Writing to Other Sources in the Integrating with Other Systems guide.

The OPTIONS required to create a foreign stream depend upon the server that the foreign stream references. Servers are often defined for an adapter, such as the SQL/MED Adapter or Extensible Common Data Adapter. See the s-Server Integrations page in the Integrating Guavus SQLstream with Other Systems guide for more details.

Using Prebuilt Server Objects

s-Server ships with a number of prebuilt server objects for the Extensible Common Data Adapter. In many cases, these objects will work fine for creating foreign streams for ECDA sources and sinks. You generally only need to create your own server object for ECDA sources and sinks if you know you will have multiple foreign streams that share options. In this case, it may be more efficient to define a custom server object with these shared options. Doing so allows foreign streams that invoke the custom server object to inherit options. See Using SQL/MED Inheritance with Server objects.

Prebuilt Server Objects Available in s-Server

TYPE Name of Prebuilt Server
Kafka Legacy KAFKA_SERVER
Kafka 0.10+ KAFKA10_SERVER
AMQP 1.0 AMQP10_SERVER
MQTT MQTT_SERVER
AMQP Legacy AMQP_LEGACY_SERVER
MAIL MAIL_SERVER
Amazon Kinesis KINESIS_SERVER
HTTP HTTP_SERVER
WebSocket WEBSOCKET_SERVER
MongoDB MONGODB_SERVER
Snowflake SNOWFLAKE_SERVER
Hadoop HDFS_SERVER
File System FILE_SERVER
Network Socket NET_SERVER

Example

The following example uses the ECDA file writer to write CSV data to the file system. In order to read from or write to the file system, you must reference a prebuilt server object called FILE_SERVER. In the foreign stream's options, you configure how s-Server connects to the file system. Foreign streams must be created in a schema, so the example creates a schema first. This foreign stream declares columns explicitly, but in some cases you can derive column names from the external source.

See the topic Writing to the File System in the Integrating with Other Systems Guide for more information about this example.

CREATE OR REPLACE SCHEMA "FileWriterSchema";
SET SCHEMA '"FileWriterSchema"';
CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER FILE_SERVER
OPTIONS
(directory 'myDirectory',
--directory for the file
formatter 'CSV',
filename_pattern 'myRecord.csv',
--regex for filename pattern to look for
character_encoding 'UTF-8',
skip_header 'true');

To start writing, you need to create a pump to insert data into the foreign stream. You do so using code along the following lines. See the topic CREATE PUMP in this guide for more information on pumps.

Note On Writing Pumps

Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
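
The following is a minimal sketch of such a pump, assuming the FileWriterSchema objects created above and a hypothetical source stream "MySchema"."MySourceStream" with matching columns; adjust names and columns for your own pipeline.

CREATE OR REPLACE PUMP "FileWriterSchema"."FileWriterPump" STOPPED AS
--create the pump as stopped, then start it once the pipeline is in place
INSERT INTO "FileWriterSchema"."FileWriterStream"
("recNo", "ts", "accountNumber", "loginSuccessful", "sourceIP", "destIP", "customerId")
SELECT STREAM "recNo", "ts", "accountNumber", "loginSuccessful", "sourceIP", "destIP", "customerId"
FROM "MySchema"."MySourceStream";
--hypothetical source stream

ALTER PUMP "FileWriterSchema"."FileWriterPump" START;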

Options

Options fall into the following groups, covered in the sections below:

Options for Reading Data: Reading from Sockets, Reading from AMQP, Reading from Kafka, Reading from MQTT, Reading from HTTP, Reading from WebSockets, Reading from the File System

Options for Writing Data: Writing to the File System, Writing to Sockets, Writing to AMQP, Writing to Kafka, Writing to MQTT, Writing to Amazon Kinesis, Writing to Snowflake, Writing to MongoDB

Options for Parsing Data: Parsing CSV, Parsing XML, Parsing JSON, Parsing Key Values, Parsing Avro, Parsing ProtoBuf, None Parser

Options for Formatting Data: Formatting CSV Data, Formatting XML Data, Formatting JSON Data, Formatting BSON Data, Formatting Avro Data

You can specify an OPTIONS clause to provide parameters for a foreign stream or for any column.

Each OPTIONS specification is an option-value pair that names the option and gives its intended value.

No two options in the same clause may have the same name.

Options vary greatly depending on the kind of source for which you are setting up the foreign stream. The uses of foreign streams are not limited to the below, but these do represent common uses for foreign streams in s-Server.

Options for Reading Data

Foreign Stream Options for Reading from the File System

Option Description
DIRECTORY Directory in which the files to be read reside.
FILENAME_PATTERN Input only. Java regular expression defining which files to read.

See https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html for more information on Java regular expressions.
STATIC_FILES Defaults to false. When you set this to true, you indicate that no files will be added or changed after the file reader is started. File reader will exit after the last file is read. This lets you use the file reader as a foreign table, which is finite (as opposed to a foreign stream, which is infinite, and handles files that are continually written to).
REPEAT Defaults to 0, meaning do not repeat. Can be a positive whole number, a negative number, or 'FOREVER'.

For positive numbers, after processing all files that match FILENAME_PATTERN, start over again, repeating for the specified number of times.

If negative or 'FOREVER', keep reprocessing all files that match FILENAME_PATTERN forever. You must set STATIC_FILES to true in order to use this option.

See the topic Reading From the File System in the Integration Guide for more details.
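
As an illustration of these options, here is a minimal sketch of a file-reading foreign stream. The directory, filename pattern, and columns are hypothetical, and the CSV parsing options shown are described under Options for Parsing Data below; see the Reading from the File System topic for authoritative examples.

CREATE OR REPLACE FOREIGN STREAM "FileReaderSchema"."FileReaderStream"
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER)
SERVER FILE_SERVER
OPTIONS
(directory '/tmp/inputFiles',
--hypothetical directory to watch
filename_pattern 'myRecord.*\.csv',
--Java regular expression for matching input files
parser 'CSV',
character_encoding 'UTF-8',
skip_header 'true');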

Foreign Stream Options for Reading from Sockets

The ECD framework can act as a server or client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST.

Option Description
IS_IPV6 Whether or not the socket uses IPV6. Default is false.
IS_TCP Whether the socket uses TCP (True) or UDP (False). Default is false (UDP).
REMOTE_HOST Hostname to send tuples to or receive tuples from, when ECDA is acting as a client. You can override this to 'LOCALHOST' to listen to only local connections, or a specific IP address, such as 168.212.226.204. When you specify REMOTE_HOST and REMOTE_PORT, this tells the ECD socket code to start the network connection as a client.
REMOTE_PORT Port to send tuples to or receive tuples from when ECDA is acting as a client. The REMOTE_* and SERVER_* options tell ECDA's socket code how to start the network connection (as a server or a client).
SERVER_HOST The hostname or IP address to listen upon to send/receive tuples, when ECDA is acting as a server (defaults to 0.0.0.0). When you specify SERVER_HOST and SERVER_PORT, this tells the ECD socket code to start the network connection as a server.
SERVER_PORT The port to listen upon to send/receive tuples when ECDA is acting as a server.

See the topic Reading from Network Sockets in the Integration Guide for more details.

Foreign Stream Options for Reading from AMQP

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides a network wire-level protocol for the exchange of messages, while versions up to 0.9.1 employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure the connection URL for AMQP 1.0 differently than for versions up to 0.9.1.

Option Description
CONNECTION_URL Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version.
DESTINATION This can be in the form {PARTITION}.
PARTITION_EXPRESSION You should only use this if DESTINATION includes "{PARTITION}". This should be a dk.brics regular expression, such as <0-3>.
PARSER_QUEUE_SIZE Queue size for parser. Reading only. Defaults to 2. In most cases, you will not want to change this number.
ACKNOWLEDGE_MODE Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack.

Roughly, AUTO asks the container on the AMQP server to acknowledge once the message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.

AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)

Format:

amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList> ]

Example:

amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'

AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:

Format:

amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>

Example:

amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.

See the topic Reading from AMQP in the Integration Guide for more details.
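
The sketch below shows a legacy-AMQP (0.9.x) reader using the prebuilt AMQP_LEGACY_SERVER; the broker URL, destination, and columns are hypothetical, and the single quotes inside CONNECTION_URL are doubled as required for SQL string literals.

CREATE OR REPLACE FOREIGN STREAM "AmqpSchema"."AmqpReaderStream"
("id" INTEGER,
"message" VARCHAR(256))
SERVER AMQP_LEGACY_SERVER
OPTIONS
(connection_url 'amqp://guest:guest@clientid/default?brokerlist=''tcp://localhost:5672''',
--single quotes inside the URL are doubled for SQL
destination 'myQueue',
--hypothetical AMQP destination
parser 'CSV',
character_encoding 'UTF-8');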

Foreign Stream Options for Reading from Kafka

Option Description
TOPIC Required. Kafka Topic. You can use a regular expression as a "topic wild card." (This is not supported by legacy versions of the adapter.)
STARTING_TIME Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'.

When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later.

For the legacy Kafka adapter, options are EARLIEST or LATEST.
INDEX_TOPIC_NAME This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index in the Reading from Kafka topic of the s-Server Integration Guide. Index topic may be created on a separate Kafka cluster.
STARTING_OFFSET Where to start reading from, expressed as a long int representing a timestamp (milliseconds since the epoch). Defaults to -1.
SEED_BROKERS A comma separated list of broker identifiers in the format <broker_host_name>:<port>. For the legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost".
PARTITION Partition number to read from. If you are reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions will be read from. You can specify a single partition with PARTITION or an inclusive range, such as 1-3. Note: partition numbers are 0 based.
PORT Deprecated for Kafka10 adapter. For legacy adapter, port for Kafka seed broker
MAX_POLL_RECORDS Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, set this so that you get a "page" full (1 MB is the default) of Kafka messages from each partition of the topic(s). It can be roughly calculated as (numPartitions * 1 MB) / typicalMessageSize.
PARTITION_OFFSET_QUERY A SQL query that fetches starting offsets for all partitions of the topic, for example SELECT "TOPIC", "PARTITION", "OFFSET" FROM stored_offsets. PARTITION should be of type INTEGER and OFFSET should be of type BIGINT. Any partition for which the query does not return anything will use either STARTING_OFFSET or STARTING_TIME to determine where to start.
FETCH_SIZE Fetch size. Defaults to 1000000.
CLIENT_ID For the Kafka10 adapter, it is vital to understand that this is the consumer group id (it is used to set the consumer group property group.id). For the legacy adapter, this is the client key for Yammer metrics; CLIENT_ID defaults to client{TOPIC}, or client{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting; CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics.
METRICS_PER_PARTITION True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer grained metrics will be reported).
OPTIONS_QUERY Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in SELECT lastOffset AS STARTING_OFFSET FROM TEST.committedOffset.
kafka.consumer.config Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
isolation.level Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted or read_committed. This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commitment, you need to configure the Kafka adapter to read only committed data; that is, read_committed.

See the topic Reading from Kafka in the Integration Guide for more details.
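
Here is a minimal sketch of a Kafka reader against the prebuilt KAFKA10_SERVER; the topic, brokers, and columns are hypothetical, and the CSV parsing options are described under Options for Parsing Data below.

CREATE OR REPLACE FOREIGN STREAM "KafkaSchema"."KafkaReaderStream"
("ts" TIMESTAMP,
"accountNumber" INTEGER,
"sourceIP" VARCHAR(32))
SERVER KAFKA10_SERVER
OPTIONS
(topic 'myTopic',
--hypothetical topic name
seed_brokers 'localhost:9092',
starting_time 'LATEST',
"isolation.level" 'read_committed',
parser 'CSV',
character_encoding 'UTF-8');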

Foreign Stream Options for Reading from MQTT

Option Description
CONNECTION_URL Connection URL for the MQTT broker, for example 'tcp://127.0.0.1:1883'.
TOPIC MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client.
QOS Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
CLIENT_ID s-Server implements an MQTT client to connect to the MQTT server. This setting provides a MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to randomly generated.
USERNAME Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text.
PASSWORD Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text.
KEEP_ALIVE_INTERVAL Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that broker and client can persist without sending a message. Defaults to 60.
CONNECTION_TIMEOUT Optional. Connection timeout in seconds. Defaults to 30.

See the topic Reading from MQTT in the Integration Guide for more details.
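
A minimal sketch of an MQTT reader follows; the broker URL, topic, columns, and client id are hypothetical.

CREATE OR REPLACE FOREIGN STREAM "MqttSchema"."MqttReaderStream"
("ts" TIMESTAMP,
"reading" DOUBLE)
SERVER MQTT_SERVER
OPTIONS
(connection_url 'tcp://127.0.0.1:1883',
topic 'sensors/temperature',
--hypothetical MQTT topic
qos '1',
client_id 'sServerReader1',
--use a distinct CLIENT_ID per foreign stream
parser 'CSV');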

Foreign Stream Options for Reading Over HTTP

Option Description
URL URL for HTTP feed.
HEADER_<name_of_header> Tells the HTTP reader to look for a header called <name_of_header>.

See the topic Reading from HTTP in the Integration Guide for more details.

Foreign Stream Options for Reading Over WebSockets

Option Description
URL URL for web socket.
HEADER_<name_of_header> Tells the WebSocket reader to look for a header called <name_of_header>.

See the topic Reading from WebSockets in the Integration Guide for more details.

Options for Writing Data

Foreign Stream Options for Writing to the File System

Option Description
DIRECTORY Directory to which you are writing. s-Server needs permission to write to this directory.
ORIGINAL_FILENAME Name of file to write before rotation. This will be the name of the file that s-Server is currently writing.
FILENAME_PREFIX Prefix of the final output file name, such as "test-".
FILENAME_DATE_FORMAT Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss. Uses Java SimpleDateFormat. This specifies how to format a timestamp that appears between the prefix and the suffix. This timestamp is the ROWTIME of the last row in the file.
FILE_ROTATION_WATERMARK_COLUMN This declares the name of a source column whose contents will be used to further distinguish files in the series.
FILENAME_SUFFIX Suffix of the final output file name. If you want this suffix to include a period, you must specify it, e.g. ".csv"
FILE_ROTATION_TIME Determines when files rotate based on time elapsed since the Unix epoch. Defaults to 0, which means "don't use ROWTIME to trigger file rotation." You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE as an option; you can choose to specify both. You set FILE_ROTATION_TIME as a positive integer with a unit of milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval. Examples: FILE_ROTATION_TIME '15m' rotates files every fifteen minutes from the top of the hour (1:00, 1:15, 1:30, and so on); FILE_ROTATION_TIME '1h' rotates files every hour at the top of the hour; FILE_ROTATION_TIME '1d' rotates files every day at 12:00 AM. More technically, FILE_ROTATION_TIME works as follows. Let $timePeriod be the number of milliseconds in the time unit bound to FILE_ROTATION_TIME, let $lastWrittenRowtime be the ROWTIME of the last row in the file, and let $currentRowTime be the ROWTIME of the row about to be written. s-Server rotates to the next file when integerPart($lastWrittenRowtime / $timePeriod) < integerPart($currentRowTime / $timePeriod).
FILE_ROTATION_SIZE Determines when files rotate based on file size. Defaults to 0, which means "don't use file size to trigger file rotation." You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE; you can choose to specify both. Lets you specify a file size in kilobytes (k), megabytes (m), or gigabytes (g), expressed as a positive integer followed by a byte measurement. Examples: FILE_ROTATION_SIZE '20k' means "rotate files when they reach or exceed a size of 20 kilobytes"; FILE_ROTATION_SIZE '20m' means "rotate files when they reach or exceed a size of 20 megabytes"; FILE_ROTATION_SIZE '1g' means "rotate files when they reach or exceed a size of 1 gigabyte". s-Server rotates to the next file once a row arrives that brings the file size over the byte threshold specified by FILE_ROTATION_SIZE.
FILE_ROTATION_RESPECT_ROWTIME 'true' or 'false', case-insensitive. When you use FILE_ROTATION_SIZE, this option lets you specify whether files wait to rotate until all rows with the same ROWTIME have arrived. Defaults to 'true', which means "always respect rowtime." Setting FILE_ROTATION_RESPECT_ROWTIME to true ensures that rows with the same rowtime will not be split between two files. For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte) and a new row arrives that causes the file to go over the 1 megabyte threshold, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte. If you set FILE_ROTATION_RESPECT_ROWTIME to true, you cannot write files from tables, whose rows lack rowtimes; s-Server will raise an error if you try to insert into a file writer foreign stream that has FILE_ROTATION_RESPECT_ROWTIME set to true. This means that if you are planning to write rows from a table into a file, you must set FILE_ROTATION_RESPECT_ROWTIME to false.
ESCAPE_ True or false; defaults to true. Causes strings to be escaped before being written.
POSTPROCESS_CMD The POSTPROCESS_CMD option lets you run a script after the file is written. To use this option, enter the path to the script, along with parameters, substituting <input> for the name of the file. When the file is written, the script will execute with parameters, and <input> will be replaced by the name of the file.

Example: 'scripts/runStandaloneSystemML.sh scripts/algorithms/l2-svm-predict.dml -nvargs X=<input> Y=data/haberman.test.labels.csv model=data/l2-svm-model.csv fmt="csv" confusion=data/l2-svm-confusion.csv',

See the topic Writing to the File System in the Integration Guide for more details.
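
The sketch below combines several of the rotation options above into one file-writing foreign stream; the directory and file names are hypothetical, and the CSV formatting options are described under Options for Formatting Data below.

CREATE OR REPLACE FOREIGN STREAM "FileWriterSchema"."RotatingFileStream"
("ts" TIMESTAMP,
"accountNumber" INTEGER)
SERVER FILE_SERVER
OPTIONS
(directory '/tmp/outputFiles',
--hypothetical output directory
formatter 'CSV',
original_filename 'output.csv.tmp',
filename_prefix 'output-',
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
filename_suffix '.csv',
file_rotation_time '15m',
--also rotate if a file reaches 20 megabytes
file_rotation_size '20m');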

Foreign Stream Options for Writing to Sockets

The ECD framework can act as a server or client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST.

Name Description
IS_IPV6 Whether or not the socket uses IPV6. Default is false.
IS_TCP Whether the socket uses TCP (True) or UDP (False). Default is false (UDP).
REMOTE_HOST Hostname to send tuples to or receive tuples from, when ECDA is acting as a client. You can override this to 'LOCALHOST' to listen to only local connections, or a specific ip address, such as 168.212.226.204. When you specify REMOTE_HOST and REMOTE_PORT, this tells the ECD socket code to start the network connection as a client.
REMOTE_PORT Port to send tuples to or receive tuples from when ECDA is acting as a client. The REMOTE_* and SERVER_* options tell ECDA's socket code how to start the network connection (as a server or a client).
SERVER_HOST The hostname or IP address to listen upon to send/receive tuples, when ECDA is acting as a server (defaults to 0.0.0.0). When you specify SERVER_HOST and SERVER_PORT, this tells the ECD socket code to start the network connection as a server.
SERVER_PORT The port to listen upon to send/receive tuples when ECDA is acting as a server.

See the topic Writing to Network Sockets in the Integrating with Other Systems guide for more details.

Foreign Stream Options for Writing to AMQP

The connection options for writing to AMQP parallel those listed above for reading from AMQP; in particular, CONNECTION_URL (formatted as shown in the AMQP reading section) and DESTINATION identify the broker and the destination to which rows are published.

See the topic Writing to AMQP in the Integration Guide for more details.

Foreign Stream Options for Writing to Kafka

The following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation.html. Aside from defaults, the information in this table is drawn from that page.

Option name Description
topic Kafka topic
TRANSACTION_ROWTIME_LIMIT Kafka10 adapter only.

Range in milliseconds. Allows all rows received from the input query that have ROWTIME values within the specified range to be committed in a single transaction to the Kafka broker. Transactions are used only if the Kafka broker supports transaction semantics. For example, if this option is set to 1000 (1 second), then all rows with ROWTIME between 10:00:00.000 and 10:00:00.999 get committed atomically in the same transaction.
seed_brokers Legacy adapter only.
A comma separated list of broker identifiers in the format "<broker_host_name>:<port>".

For legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost".
transactional.id Case sensitive; must be lowercase and quoted.

This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique "transactional.id" to publish messages to the topic using transactions. Transactions are used only if the Kafka brokers are v0.11.2 or later, since these support transaction semantics.

Note: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream.
pump.name Case-sensitive; must be quoted. Kafka10 adapter only.

Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when configured to run in High Availability (HA) mode. The pump name needs to be a fully qualified pump name of the format <catalog_name>.<schema_name>.<pump_name>.

For example: 'LOCALDB.mySchema.ProcessedOrdersPump'
HA_ROLLOVER_TIMEOUT Kafka10 adapter only.

Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode and is running as a "Follower", it waits for this amount of time for a lack of commits from the "Leader".

After the timeout, the "Follower" attempts to take over as the "Leader". There may be multiple follower instances running in a cluster. Only one of these followers succeeds in being designated as the new "Leader"; all other pump instances using the same "transactional.id" continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself to a "Follower" if possible.
POLL_TIMEOUT This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms.
COMMIT_METADATA_COLUMN_NAME Using this option, you can commit a stringified value of the specified column along with its ROWTIME in a CSV format, along with the offset of the last published message for each partition in a transaction.

The format of the metadata string is as follows:
<CommitRowtimeInMillisFromEpoch>,<metadata_column_value>
PORT Deprecated option. As of s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option.
metadata.broker.list (legacy adapter) / bootstrap.servers (Kafka10 adapter; case sensitive, must be lowercase and quoted) hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
kafka.producer.config Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties.

For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs. Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
partitioner.class Deprecated for Kafka10

Fully qualified Java classname of the Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner.
serializer.class Case sensitive; must be lowercase and quoted. Fully qualified Java classname of Kafka serializer, Defaults to kafka.serializer.DefaultEncoder
key.serializer.class Case sensitive; must be lowercase and quoted. Names a serializer class for keys. If no class is given, Kafka uses serializer.class.
producer.type Case sensitive; must be lowercase and quoted.

Specifies whether messages are sent asynchronously in a background thread. Async lets requests be batched. This helps throughput but increases the possibility that a failed client machine results in unsent data. Defaults to async.
compression.codec Case sensitive; must be lowercase and quoted. Specifies the compression codec for generated data: "none", "gzip", or "snappy".
compressed.topics Case sensitive; must be lowercase and quoted. If you have specified a compression.codec (other than "none"), this option lets you limit compression to those topics listed in this option. Empty means apply compression to all topics.
message.send.max.retries Case sensitive; must be lowercase and quoted. If enabled, the producer will automatically retry a failed send request for a set number of retries. Note: according to the Kafka documentation, using this option "may lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgment to be lost."
retry.backoff.ms Case sensitive; must be lowercase and quoted. The producer refreshes metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing.
request.required.acks Case sensitive; must be lowercase and quoted. The number of acknowledgments the producer requires before considering a request complete: 0 (do not wait for acknowledgment), 1 (wait for the leader only), or -1 (wait for all in-sync replicas).
request.timeout.ms Case sensitive; must be lowercase and quoted. When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client.
topic.metadata.refresh.interval.ms Case sensitive; must be lowercase and quoted. By default, the producer refreshes topic metadata in two situations: first, at regular intervals, which defaults to every 600000 ms (10 minutes); second, any time there is a failure, such as a partition missing or a leader not being available. This setting lets you change the regular polling interval by specifying a new interval. If you set this number to a negative value, the producer only refreshes on failure. If you set it to zero, the producer refreshes every time a message is sent (this is not recommended).
Note: the refresh happens only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed.
queue.buffering.max.ms Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this sets the maximum time to buffer data, in milliseconds. For example, "100" means "try to batch together 100ms of messages." Like most buffering, this improves throughput but adds message delivery latency.
queue.buffering.max.messages Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this lets you specify a maximum number of unsent messages to queue. Once this number is reached, either the producer must be blocked or data must be dropped.
queue.enqueue.timeout.ms Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this lets you specify an amount of time to block before dropping messages when the buffer has reached the value specified in queue.buffering.max.messages. If you set this option to 0, events will be enqueued immediately or dropped if the queue is full. If you set this option to -1, the producer will block indefinitely.
batch.num.messages Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this lets you specify the number of messages to send in one batch. With this option enabled, the producer will wait until either this number of messages is ready to send or queue.buffering.max.ms is reached.
send.buffer.bytes Case sensitive; must be lowercase and quoted. Socket write buffer size.
client.id Case sensitive; must be lowercase and quoted. Using this option, you can specify a string to help you identify the application making calls to the Kafka server.

See the topic Writing to Kafka in the Integration Guide for more details.
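
As a rough model of the transactional options above, the sketch below pairs a Kafka writer with its dedicated pump. The topic, transactional.id, schema, and source stream are hypothetical; the lowercase option names are double-quoted as the table requires, and the CSV formatter is described under Options for Formatting Data below.

CREATE OR REPLACE FOREIGN STREAM "KafkaSchema"."KafkaWriterStream"
("ts" TIMESTAMP,
"accountNumber" INTEGER)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'processedOrders',
--hypothetical topic name
"transactional.id" 'processedOrders_tx_1',
"pump.name" 'LOCALDB.KafkaSchema.ProcessedOrdersPump',
transaction_rowtime_limit '1000',
formatter 'CSV');

CREATE OR REPLACE PUMP "KafkaSchema"."ProcessedOrdersPump" STOPPED AS
INSERT INTO "KafkaSchema"."KafkaWriterStream"
SELECT STREAM "ts", "accountNumber"
FROM "KafkaSchema"."ProcessedOrders";
--hypothetical source stream

ALTER PUMP "KafkaSchema"."ProcessedOrdersPump" START;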

Foreign Stream Options for Writing to MQTT

Option Description
CONNECTION_URL Connection URL for the MQTT broker, for example 'tcp://127.0.0.1:1883'.
TOPIC MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client.
CLIENT_ID s-Server implements an MQTT client to connect to the MQTT server. This setting provides a MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to randomly generated.
QOS Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
USERNAME Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text.
PASSWORD Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text.
KEEP_ALIVE_INTERVAL Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that broker and client can persist without sending a message. Defaults to 60.
CONNECTION_TIMEOUT Optional. Connection timeout in seconds. Defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. If you set this value to 0, s-Server disables timeout processing, and the client will wait until the network connection is made successfully or fails. Defaults to 30.
RETAINED Output only. True or False. If set to true, tells the broker to store the last retained message and the QOS for this topic. Defaults to false.
MAX_IN_FLIGHT Output only. When QOS is set to 1 or 2, this is the maximum number of outgoing messages that can be in the process of being transmitted at the same time. This number includes messages currently going through handshakes and messages being retried. Defaults to 10.

See the topic Writing to MQTT in the Integration Guide for more details.

Foreign Stream Options for Writing Over HTTP

Option Description
URL URL for HTTP feed.
HEADER_<name_of_header> Tells the HTTP writer to use a header called <name_of_header>.

See the topic Writing to HTTP in the Integration Guide for more details.

Foreign Stream Options for Writing to WebSockets

Option Description
URL URL for web socket.
POLL_IN_MILLIS How often to request new data, in milliseconds.
HEADER_<name_of_header> Tells the WebSocket writer to use a header called <name_of_header>.

See the topic Writing to WebSockets in the Integration Guide for more details.

Foreign Stream Options for Writing to a Kinesis Stream

Option Name Description
KINESIS_STREAM_NAME Required. Name of Kinesis stream to write to. No default.
AWS_REGION Required. Region id of the Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.
AWS_PROFILE_PATH See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide. Must point to a credential file on the s-Server file system with the following format:

[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

This defaults to '', which goes to ~/.aws/credentials. Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide.
AWS_PROFILE_NAME Optional. Profile name to use within credentials file. Amazon supports multiple named profiles within a configuration file. If you have a named profile, you can reference it here. Defaults to default. See http://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html
INITIAL_BACKOFF Optional. If insert fails due to throttling, how many milliseconds to back off initially. Default to 20.
MAX_BACKOFF Optional. If insert fails due to throttling, how many milliseconds to back off as a maximum. Defaults to 20480.
MAX_RETRIES Optional. If insert fails due to throttling, how many retries should be made. Backoff is doubled for each retry up to max. Default is 10.
BUFFER_SIZE Optional. Maximum number of bytes per update request. Defaults to 4194304.
MAX_RECORDS_PER_REQUEST Optional. Maximum number of records per update request. Defaults to 500.
REPORT_FREQUENCY Optional. How often to log (in milliseconds) statistics. Defaults to 0, which means "never."
KINESIS_DEFAULT_PARTITION_ID Optional. Partition id of shard to write to. Defaults to ''.

See the topic Writing to Kinesis Streams in the Integration Guide for more details.
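
A minimal sketch follows, assuming a hypothetical Kinesis stream and region, the default AWS profile described above, and the JSON formatter described under Options for Formatting Data below as an illustrative choice.

CREATE OR REPLACE FOREIGN STREAM "KinesisSchema"."KinesisWriterStream"
("ts" TIMESTAMP,
"accountNumber" INTEGER)
SERVER KINESIS_SERVER
OPTIONS
(kinesis_stream_name 'myKinesisStream',
--hypothetical Kinesis stream
aws_region 'us-west-2',
aws_profile_name 'default',
formatter 'JSON');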

Foreign Stream Options for Writing to a Snowflake Warehouse

Version 5.2 Feature

Option Name Description
ACCOUNT The name assigned to your account by Snowflake.
USER The user name for your Snowflake account.
PASSWORD The password for your Snowflake account.
DB The database to write to in Snowflake. This should be an existing database for which user/password has privileges.
SCHEMA The schema to write to in the Snowflake database. This should be an existing schema for which user/password has privileges.
WAREHOUSE The Snowflake warehouse to write to. This should be an existing warehouse for which user/password has privileges.
DTABLE The table to write to in Snowflake. This should be an existing table for which user/password has privileges.

See the topic Writing to Snowflake in the Integration Guide for more details.
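
A minimal sketch, with hypothetical account, credentials, and target objects; all of the Snowflake objects named in the options must already exist and be accessible to the given user.

CREATE OR REPLACE FOREIGN STREAM "SnowflakeSchema"."SnowflakeWriterStream"
("ts" TIMESTAMP,
"accountNumber" INTEGER)
SERVER SNOWFLAKE_SERVER
OPTIONS
("ACCOUNT" 'myaccount',
--hypothetical Snowflake account and credentials
"USER" 'myuser',
"PASSWORD" 'mypassword',
"DB" 'MY_DATABASE',
"SCHEMA" 'PUBLIC',
"WAREHOUSE" 'MY_WAREHOUSE',
"DTABLE" 'MY_TABLE');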

Options for Writing to a MongoDB Collection

Option Definition
URL Fully qualified URL starting with mongodb:// and including, at minimum, a host name (or IP address or UNIX domain socket). The URL can also include a username and password (these are passed to the MongoDB instance) and a port number. See https://docs.mongodb.com/manual/reference/connection-string/ for more information.
COLLECTION MongoDB collection to which data will be written.

See the topic Writing to MongoDB in the Integration Guide for more details.
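
A minimal sketch, with a hypothetical connection URL and collection; the BSON formatter shown here is an illustrative choice described under Options for Formatting Data below.

CREATE OR REPLACE FOREIGN STREAM "MongoSchema"."MongoWriterStream"
("ts" TIMESTAMP,
"accountNumber" INTEGER)
SERVER MONGODB_SERVER
OPTIONS
(url 'mongodb://localhost:27017/myDb',
--hypothetical MongoDB connection string
collection 'events',
formatter 'BSON');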

Options for Formatting Data

Foreign Stream Options for Formatting CSV Data

Option Definition
FORMATTER This needs to be CSV.
WRITE_HEADER Whether or not to write the column names into a header row. True or false; defaults to false.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
ROW_SEPARATOR_CHAR_KEY Character separating rows in CSV data.
SEPARATOR Character separating values. Defaults to ","

Foreign Stream Options for Formatting XML Data

Option name Description
FORMATTER This needs to be XML.
DOC_ELEMENTS Specifies a list of elements, separated by slashes ( / ), to use as the root of the XML document to write. Defaults to "batch".
ROW_ELEMENTS Specifies a list of elements, separated by slashes ( /), to add for each row of the XML document's DOM. Defaults to "row".
DATA_ELEMENTS Specifies a list of elements, separated by slashes ( /), to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML.
DATA_ATTRIBUTES Specifies a name of an attribute to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML.
<column_name>_ELEMENTS Specifies a list of elements, separated by slashes ( / ), to add for a specific column's datum in each row/tuple.
<column_name>_ATTRIBUTES Specifies a name of an attribute to add for a specific column's datum in each row/tuple.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Foreign Stream Options for Formatting JSON Data

Option Definition
FORMATTER This needs to be JSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Foreign Stream Options for Formatting BSON

Option Definition
FORMATTER This needs to be BSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Foreign Stream Options for Formatting Avro Data

Option Definition
FORMATTER This needs to be AVRO.
AVRO_SCHEMA_LOCATION Required option to specify the path for the schema file to be used for formatting the Avro payload.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Options for Parsing Data

Foreign Stream Options for Parsing CSV Data

Option Definition
SKIP_HEADER True or false; defaults to false. Specifies if the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers.
QUOTE_CHARACTER Lets you specify an expected quotation character. There is no default for quote character.
ROW_SEPARATOR Lets you specify a character that splits lines in the source. Defaults to \n. For example, in the case of reading from a DOS-format file where line separators are CR/LF rather than just LF, you will need to specify row_separator U&'\000D\000A' (this is the hexadecimal Unicode value for CR/LF; see http://unicode.org/standard/reports/tr13/tr13-5.html).
SEPARATOR Lets you specify a character that separates values. Default is comma (,).

Foreign Stream Options for Parsing JSON Data

Option Definition
PARSER This needs to be JSON.
ROW_PATH This is the JSON path for the row to be found. The JsonPath parser uses a row path to find JSON objects containing a row, then the path for each column is used to find the value to be extracted.
<COLUMN_NAME>_PATH Optional for each column in the ECDA column set. This defaults to $..<COLUMN_NAME>. For example, a column named 'FOO' would have an option named FOO_PATH that defaults to $..FOO, which returns the first property named FOO under the JSON object found by the ROW_PATH.
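
To illustrate ROW_PATH and the per-column _PATH options, here is a minimal sketch that reads JSON files through FILE_SERVER; the directory, JSON paths, and columns are hypothetical.

CREATE OR REPLACE FOREIGN STREAM "JsonSchema"."JsonFileReader"
("recNo" INTEGER,
"sourceIP" VARCHAR(32))
SERVER FILE_SERVER
OPTIONS
(directory '/tmp/jsonFiles',
--hypothetical directory
filename_pattern 'events.*\.json',
parser 'JSON',
row_path '$.events[*]',
--hypothetical path to the array of row objects
"recNo_PATH" '$.recNo',
"sourceIP_PATH" '$.sourceIP');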

Foreign Stream Options for Parsing XML

Option Definition
PARSER This needs to be XML.
PARSER_XML_ROW_TAGS An absolute XPATH query that finds the XML element that becomes one row. No default.
<column_name>_XPATH An XPATH that finds the text that becomes the value of the column of the same name. Examples include "RetailStoreID_XPATH" '/POSLog/Body/tri:RetailTransaction/RetailStoreID' and "WorkstationID_XPATH" '/POSLog/Body/tri:RetailTransaction/WorkstationID'.
PARSER_XML_USE_ATTRIBUTES True/false (default is false). Specifies the default XPATH query to find each column's data in a tuple. If false, the column name is assigned from a child element's tag name. If true, the default is @, meaning an attribute of the xml row element.
CUSTOM_TYPE_PARSER_<column_name> Allows overriding of an individual column's parsing. Specifies a fully qualified Java classname that implements com.sqlstream.aspen.namespace.common.TypeParser.

Foreign Stream Options for Parsing Key Values

Option Definition
PARSER This needs to be KV.
QUOTE_CHARACTER Lets you specify a different quote character, such as a single quote ('). Default is double quote (").
KEY_VALUE_SEPARATOR_CHARACTER Lets you specify the character that connects keys and values. Default is equals symbol (=)
SEPARATOR Lets you specify a character that separates key-value pairs. Default is comma (,).
ROW_SEPARATOR Lets you specify a character that splits lines in the key-value source. Default is \n.

Foreign Stream Options for Parsing AVRO

Option Definition
PARSER This needs to be AVRO.
AVROSCHEMAFILE This option can either be an HTTP URL to fetch the schema or it can be a path to a file on server host machine or VM.
SCHEMA_HEADER Required. Indicates whether the Avro schema is embedded in the Avro data. This option needs to be set to false for data sources like Kafka or AMQP, where each message can be one or more serialized Avro records without a schema.
ROW_PATH This is the Avro path for the row to be found. The Avro parser uses a row path to find Avro objects containing a row, then the path for each column is used to find the value to be extracted.
<COLUMN_NAME>_PATH Path for each column in the ECDA column set. This defaults to $..<COLUMN_NAME>. For example, a column named 'FOO' would have an option named FOO_PATH that defaults to $..FOO, which returns the first property named FOO under the Avro object found by the ROW_PATH.

Foreign Stream Options for Parsing ProtoBuf

Option Definition
PARSER Required. Must be 'PROTOBUF'. Indicates that the ECD parser will parse files as protobuf.
SCHEMA_JAR Required. Jar containing compiled Java classes created with the Google protocol buffer compiler (protoc command), such as unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar. Locations are relative to $SQLSTREAM_HOME.

See https://developers.google.com/protocol-buffers/docs/javatutorial for more details.Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX/s-Server.
SCHEMA_CLASS Required. Class name of the outer protobuf record created with the Google protocol buffer compiler (protoc command), such as protobuf.PackageProto$protobufPackage. Locations are relative to $SQLSTREAM_HOME. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details.
<column name>_PATH Not required. Lets you specify a path within the schema that maps to a column in the foreign stream. If these are not specified, s-Server populates a column using a field with the same name in the outer-level record.
MESSAGE_STREAM True or false (defaults to false). Indicates whether or not to treat messages as a continuous stream. If MESSAGE_STREAM is true, then protobuf messages will be sent to s-Server as they're received. This could make a difference for sources, such as files or sockets, which don't necessarily deliver data in complete chunks. Note: If MESSAGE_STREAM is true, then all outer fields used must have an index less than any repeated field used.
MESSAGE_LENGTH_PREFIXED True or false (default is false). Indicates whether or not all records are prefixed with a length field. Must be specified if MESSAGE_STREAM is set.

Foreign Stream Options for the None Parser

Option Name Description
PARSER Must be set to NONE to use the None parser.
CHARACTER_ENCODING Only applies with VARCHAR. Defaults to UTF-8. See https://docs.oracle.com/javase/8/docs/api/java/nio/charset/StandardCharsets.html
ROW_SEPARATOR If ROW_SEPARATOR is specified, it should either be a string of HEX digit pairs when going to VARBINARY or a string that encodes properly in the character encoding being used.