Configuring External Stream Sinks

External Stream sinks make use of s-Server's Extensible Common Data framework. This framework allows you to read and write rows of data in a range of formats over a range of input/output systems, including the file system, network sockets, AMQP, Amazon Kinesis, and Kafka. All data is sent as a string in CSV, XML, or JSON format.

Using the File System as an External Stream Sink

To write streaming data to the file system, you need two pieces of information:

  • The directory in which the file resides.
  • A pattern for the file's name. Here, you enter part of the file's name, such as output, csv, or log. No quotation marks are necessary.

| Option | Description |
| --- | --- |
| DIRECTORY | Directory in which the file resides. |
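
For reference, s-Server external stream sinks are typically declared in SQL as foreign streams. The following is a minimal sketch of a file-system sink; the server name FILE_SERVER, the column list, and the FILENAME_PREFIX and FORMATTER options are illustrative assumptions, so check the s-Server Integration Guide for the exact names used by your installation.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."FileSinkStream"
    ("id" INTEGER, "reading" DOUBLE, "ts" TIMESTAMP)
    SERVER "FILE_SERVER"                     -- assumed prebuilt server name
    OPTIONS (
        "DIRECTORY" '/tmp/output',           -- directory in which the file resides
        "FILENAME_PREFIX" 'output',          -- pattern for the file's name (assumed option name)
        "FORMATTER" 'CSV'                    -- write rows as CSV (assumption)
    );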

Using a Network Socket as an External Stream Sink

To write data in line, CSV, XML, or JSON format over a network socket, you need to configure the socket connection. You may want to consult with whoever has set up the application with which StreamLab will connect over the socket. Network sockets initiate a connection over TCP or UDP. Connections can be initiated by remote applications or by StreamLab itself. To tell StreamLab to listen to a remote host, use the Remote Host and Remote Port fields. Connections can optionally be made using IPV6.

| Name | Description |
| --- | --- |
| Remote Host | Hostname to send rows to or receive rows from. You can override this to 'LOCALHOST' to listen to only local connections, or a specific IP address, such as 168.212.226.204. |
| Socket uses TCP? | Whether the socket uses TCP (True) or UDP (False). Default is false (UDP). |
| Skip Header | True or false; defaults to false. Specifies whether the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers. |
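
As a rough sketch only, a socket sink might be declared as a foreign stream along the following lines; the server name NET_SERVER, the column list, and the option spellings (REMOTE_HOST, REMOTE_PORT, IS_TCP, FORMATTER) are assumptions mapped from the fields described above, not confirmed names.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."SocketSinkStream"
    ("id" INTEGER, "msg" VARCHAR(256))
    SERVER "NET_SERVER"                      -- assumed prebuilt server name
    OPTIONS (
        "FORMATTER" 'CSV',                   -- send rows as CSV (assumption)
        "REMOTE_HOST" '168.212.226.204',     -- host to send rows to (assumed option name)
        "REMOTE_PORT" '5555',                -- port on the remote host (assumed option name)
        "IS_TCP" 'true'                      -- use TCP rather than UDP (assumed option name)
    );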

Using MongoDB as an External Stream Sink

Options for Writing to a MongoDB Collection

| Option | Definition |
| --- | --- |
| URL | Fully qualified URL starting with mongodb:// and including, at minimum, a host name (or IP address or UNIX domain socket). The URL can also include a username and password (these are passed to the MongoDB instance) and a port number. See https://docs.mongodb.com/manual/reference/connection-string/ for more information. |
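
A hedged sketch of a MongoDB sink follows. The URL option mirrors the table above, while the server name MONGO_SERVER, the column list, and the COLLECTION option are assumptions for illustration only.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."MongoSinkStream"
    ("id" INTEGER, "payload" VARCHAR(1024))
    SERVER "MONGO_SERVER"                                   -- assumed prebuilt server name
    OPTIONS (
        "URL" 'mongodb://user:password@localhost:27017',    -- connection string as described above
        "COLLECTION" 'events'                               -- target collection (assumed option name)
    );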

Using AMQP as an External Stream Sink

To write to an External Stream sink over AMQP, you need to configure the AMQP connection. AMQP stands for Advanced Message Queuing Protocol, and is an open standard for messaging middleware. For more information, see http://www.amqp.org/about/what. You may want to consult with whoever has set up AMQP in your environment.

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides a network wire-level protocol for the exchange of messages, while up to 0.9.1, AMQP employed a system of exchanges, queues, and bindings to exchange messages. As a result of this change, you configure StreamLab for AMQP differently for 1.0 than for versions up to 0.9.1.

| Name | Description |
| --- | --- |
| AMQP URL | Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version. |

AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)

Format:

amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList> ]

Example:

amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'

AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:

Format:

amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>

Example:

amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.

| Option Name | Description |
| --- | --- |
| Partition Expression | You should only use this if DESTINATION includes "{PARTITION}". This should be a dk.brics regular expression, such as <0-3>. |
| Acknowledge Mode | Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack |

Roughly, AUTO asks the container on the AMQP server to acknowledge once the message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.
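
As an illustration only, an AMQP 1.0 sink might be declared as follows; the server name AMQP10_SERVER, the column list, and the CONNECTION_URL, DESTINATION, and FORMATTER option names are assumptions, with the URL following the AMQP10 format shown above.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."AmqpSinkStream"
    ("id" INTEGER, "payload" VARCHAR(1024))
    SERVER "AMQP10_SERVER"        -- assumed prebuilt server name
    OPTIONS (
        "CONNECTION_URL" 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',
        "DESTINATION" 'myqueue',  -- target queue/topic (assumed option name)
        "FORMATTER" 'JSON'        -- send rows as JSON (assumption)
    );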

Using Amazon Kinesis as an External Stream Sink

To write to an External Stream sink over Amazon Kinesis, you need to configure the Amazon Kinesis connection.

| Option Name | Description |
| --- | --- |
| Kinesis Stream Name | Required. Name of Kinesis stream to write to. No default. |
| Kinesis Application Name | Identifies the client in Amazon CloudWatch (defaults to sqlstream). |
| AWS Profile Path | Must point to a credential file on the s-Server file system with the following format: |

[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

This defaults to blank, which goes to ~/.aws/credentials.

You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the SQLstream Integration Guide.

| Option | Definition |
| --- | --- |
| AWS Profile Name | Optional. Profile name to use within the credentials file. Defaults to default. |
| Initial Position | LATEST for latest or TRIM_HORIZON for earliest. Defaults to LATEST. |
| Socket Timeout | Defaults to -1. If set, overrides the Kinesis socket timeout. |
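
The sketch below shows how a Kinesis sink might look as a foreign stream; the server name KINESIS_SERVER, the column list, and the option spellings (KINESIS_STREAM_NAME, AWS_PROFILE_NAME, FORMATTER) are assumptions mapped from the settings described above.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."KinesisSinkStream"
    ("id" INTEGER, "amount" DOUBLE)
    SERVER "KINESIS_SERVER"                    -- assumed prebuilt server name
    OPTIONS (
        "KINESIS_STREAM_NAME" 'my-stream',     -- Kinesis stream to write to (assumed option name)
        "AWS_PROFILE_NAME" 'default',          -- profile within the credentials file (assumed option name)
        "FORMATTER" 'CSV'                      -- write rows as CSV (assumption)
    );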

Using Kafka as an External Stream Sink

To write data in line, CSV, XML, or JSON format over Kafka, you need to configure the connection to Kafka. Kafka is an open-source, real-time publish-subscribe messaging framework. See http://kafka.apache.org/ for more details. You may want to consult with whoever has set up the Kafka messaging system in your environment.

To connect with Kafka, you need two pieces of information:

  • The name and port of the Kafka broker (this defaults to localhost:9092, but the sink will not work unless a Kafka broker is running at this location).
  • The Kafka topic name to which you are writing.

The other configuration options below help manage how data is written to Kafka topics.

Foreign Stream Options for Writing to Kafka

The following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation.html. Aside from defaults, the information in this table is drawn from this page.

| Option Name | Description |
| --- | --- |
| topic | Kafka topic. |
| TRANSACTION_ROWTIME_LIMIT | Kafka10 adapter only. Range in milliseconds. Allows all rows received from the input query that have ROWTIME values within the specified range to be committed in a single transaction to the Kafka broker. Transactions are used only if the Kafka broker supports transaction semantics. If this option is set to 1000 (1 second), then all rows with ROWTIME between 10:00:00.000 and 10:00:00.999 get committed in the same transaction atomically. |
| seed_brokers | Legacy adapter only. A comma-separated list of broker identifiers in the format "<broker_host_name>:<port>". For the legacy adapter, this is a comma-separated list of broker hosts. Defaults to "localhost". |
| transactional.id | Case sensitive; must be lowercase and quoted. This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique "transactional.id" to publish messages to the topic using transactions. Transactions are used only if the Kafka brokers are v0.11.2 or later; these support transaction semantics. Note: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream. |
| pump.name | Case sensitive; must be quoted. Kafka10 adapter only. Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when configured to run in High Availability (HA) mode. The pump name needs to be a fully qualified pump name of the format <catalog_name>.<schema_name>.<pump_name>, for example 'LOCALDB.mySchema.ProcessedOrdersPump'. |
| HA_ROLLOVER_TIMEOUT | Kafka10 adapter only. Time in milliseconds; defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode and is running as a "Follower", it waits for this amount of time for a lack of commits from the "Leader". After the timeout, the "Follower" attempts to take over as the "Leader". There may be multiple follower instances running in a cluster; only one of them succeeds in being designated as the new "Leader". All other pump instances using the same "transactional.id" continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself to "Follower" if possible. |
| POLL_TIMEOUT | This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms. |
| COMMIT_METADATA_COLUMN_NAME | Using this option, you can commit a stringified value of the specified column, along with its ROWTIME in CSV format and the offset of the last published message for each partition, in a transaction. The format of the metadata string is: <CommitRowtimeInMillisFromEpoch>,<metadata_column_value> |
| PORT | Deprecated option. As of s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option. |
| metadata.broker.list (legacy adapter); bootstrap.servers (Kafka10 adapter, case sensitive; must be lowercase and quoted) | hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions, and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers. |
| kafka.producer.config | Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for an SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties; for details, see the Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs. Note: In some cases, you can use these property files to override foreign stream options. For example, the setting for bootstrap.servers will override the foreign stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes, and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
| partitioner.class | Deprecated for Kafka10. Fully qualified Java classname of the Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner |
| serializer.class | Case sensitive; must be lowercase and quoted. Fully qualified Java classname of the Kafka serializer. Defaults to kafka.serializer.DefaultEncoder |
| key.serializer.class | Case sensitive; must be lowercase and quoted. Names a serializer class for keys. If no class is given, Kafka uses serializer.class. |
| producer.type | Case sensitive; must be lowercase and quoted. Specifies whether messages are sent asynchronously in a background thread. Async lets requests be batched, which helps throughput but increases the possibility that a failed client machine results in unsent data. Defaults to async. |
| compression.codec | Case sensitive; must be lowercase and quoted. Specifies the compression codec for generated data: "none", "gzip", or "snappy". |
| compressed.topics | Case sensitive; must be lowercase and quoted. If you have specified a compression.codec (other than "none"), this option lets you limit compression to the topics listed in this option. Empty means apply compression to all topics. |
| message.send.max.retries | Case sensitive; must be lowercase and quoted. If enabled, the producer will automatically retry a failed send request for a set number of retries. Note: using this option may, according to the Kafka documentation, "lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgment to be lost." |
| retry.backoff.ms | Case sensitive; must be lowercase and quoted. Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing. |
| request.required.acks | Case sensitive; must be lowercase and quoted. Controls when a produce request is considered completed: 0 means the producer does not wait for an acknowledgment, 1 means it waits for the leader replica to acknowledge, and -1 means it waits for all in-sync replicas to acknowledge. |
| request.timeout.ms | Case sensitive; must be lowercase and quoted. When request.required.acks is enabled, this lets you specify how long the broker should wait trying to meet the acknowledgment requirement before sending back an error to the client. |
| topic.metadata.refresh.interval.ms | Case sensitive; must be lowercase and quoted. By default, the producer refreshes topic metadata in two cases: first, at regular intervals, which defaults to every 600000 ms (10 minutes); and second, any time there is a failure, such as a partition missing or a leader not being available. This setting lets you change the regular polling interval by specifying a new interval. If you set this number to a negative value, the producer only refreshes on failure. If you set it to zero, the producer refreshes every time a message is sent (this is not recommended). Note: the refresh happens only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed. |
| queue.buffering.max.ms | Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this sets a maximum time for buffering data, in milliseconds. For example, "100" means "try to batch together 100ms of messages." Like most buffering, this improves throughput but adds message delivery latency. |
| queue.buffering.max.messages | Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this lets you specify a maximum number of unsent messages to queue. Once this number is reached, either the producer must be blocked or data must be dropped. |
| queue.enqueue.timeout.ms | Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this lets you specify an amount of time to block before dropping messages when the buffer has reached the value specified in queue.buffering.max.messages. If you set this option to 0, events will be enqueued immediately or dropped if the queue is full. If you set this option to -1, the producer will block indefinitely. |
| batch.num.messages | Case sensitive; must be lowercase and quoted. If async is enabled in producer.type, this lets you specify the number of messages to send in one batch. With this option enabled, the producer will wait until either this number of messages is ready to send or queue.buffering.max.ms is reached. |
| send.buffer.bytes | Case sensitive; must be lowercase and quoted. Socket write buffer size. |
| client.id | Case sensitive; must be lowercase and quoted. Using this option, you can specify a string to help you identify the application making calls to the Kafka server. |
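
To tie several of the options above together, here is a hedged sketch of a Kafka10 sink foreign stream and the pump that feeds it. The topic, bootstrap.servers, transactional.id, and pump.name options come from the table above; the server name KAFKA10_SERVER, the FORMATTER option, the column list, and the source stream "Orders" are assumptions for illustration only.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."KafkaSinkStream"
    ("id" INTEGER, "amount" DOUBLE)
    SERVER "KAFKA10_SERVER"                              -- assumed prebuilt server name
    OPTIONS (
        "topic" 'orders',                                -- Kafka topic to write to
        "bootstrap.servers" 'localhost:9092',            -- broker host:port
        "transactional.id" 'orders_sink_1',              -- unique per foreign stream
        "pump.name" 'LOCALDB.mySchema.OrdersPump',       -- must match the pump below
        "FORMATTER" 'CSV'                                -- write rows as CSV (assumption)
    );

CREATE OR REPLACE PUMP "mySchema"."OrdersPump" STOPPED AS
    INSERT INTO "mySchema"."KafkaSinkStream"
    SELECT STREAM "id", "amount" FROM "mySchema"."Orders";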

Using MQTT as an External Stream Sink

To write data to MQTT, you need to configure the connection to MQTT. StreamLab uses this information to implement an MQTT client that writes data from the foreign stream to the MQTT broker. The minimum required options are TOPIC and CONNECTION_URL.

Foreign Stream Options for Writing to MQTT

| Option | Description |
| --- | --- |
| CONNECTION_URL | Required. Connection URL for the MQTT broker, for example 'tcp://127.0.0.1:1883'. |
| TOPIC | Required. MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client. |
| CLIENT_ID | s-Server implements an MQTT client to connect to the MQTT server. This setting provides an MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to a randomly generated value. |
| QOS | Defines the guarantee of delivery for a specific message: at most once (0), at least once (1), or exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/ |
| USERNAME | Optional. User name for the MQTT connection. Note: s-Server sends user names and passwords as plain text. |
| PASSWORD | Optional. Password for the MQTT connection. Note: s-Server sends user names and passwords as plain text. |
| KEEP_ALIVE_INTERVAL | Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that the broker and client can persist without sending a message. Defaults to 60. |
| CONNECTION_TIMEOUT | Optional. Connection timeout in seconds. Defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. If you set this value to 0, s-Server disables timeout processing, and the client will wait until the network connection is made successfully or fails. Defaults to 30. |
| RETAINED | Output only. True or False. If set to true, tells the broker to store the last retained message and the QOS for this topic. Defaults to false. |
| MAX_IN_FLIGHT | Output only. When QOS is set to 1 or 2, this is the maximum number of outgoing messages that can be in the process of being transmitted at the same time. This number includes messages currently going through handshakes and messages being retried. Defaults to 10. |
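
A hedged sketch of an MQTT sink follows, using option names from the table above; the server name MQTT_SERVER, the column list, and the FORMATTER option are assumptions for illustration only.

CREATE OR REPLACE FOREIGN STREAM "mySchema"."MqttSinkStream"
    ("id" INTEGER, "payload" VARCHAR(1024))
    SERVER "MQTT_SERVER"                          -- assumed prebuilt server name
    OPTIONS (
        "CONNECTION_URL" 'tcp://127.0.0.1:1883',  -- MQTT broker URL
        "TOPIC" 'sensors/readings',               -- topic to publish to
        "CLIENT_ID" 'sqlstream-sink-1',           -- distinct per foreign stream
        "QOS" '1',                                -- at-least-once delivery
        "RETAINED" 'false',
        "FORMATTER" 'JSON'                        -- write rows as JSON (assumption)
    );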