Sink Output Types

External data stream sinks require you to select an output type. The output type determines where rows are written: the file system, a network socket, AMQP, Kafka, or Amazon Kinesis.

Using a File System Location as a Sink

Setting up a file system location as a sink is reasonably straightforward:

  • Enter a location where the file will be written. This location must be accessible by StreamLab.
  • If desired, enter a prefix and suffix for the file.
  • If desired, change the Filename Date Format for the file. (Files are renamed using this format as they are rotated.)
  • Indicate how the file should be rotated (see the sketch after this list), by entering either:
      • Maximum Bytes per File. Files will be rotated once they reach this size.
      • Maximum Time per File. Files will be rotated once they reach this time limit.
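
Conceptually, rotation works like a rolling log: rows are appended to an active file until a size or time threshold is crossed, and the file is then renamed using the date format. The minimal Python sketch below illustrates this behavior; the thresholds, prefix, suffix, and date format are illustrative placeholders, not StreamLab's actual implementation:

    import os
    import time
    from datetime import datetime

    MAX_BYTES = 1_000_000                 # "Maximum Bytes per File" (placeholder)
    MAX_SECONDS = 60                      # "Maximum Time per File" (placeholder)
    PREFIX, SUFFIX = "out-", ".csv"       # optional prefix and suffix
    DATE_FORMAT = "%Y-%m-%d_%H-%M-%S"     # "Filename Date Format" (placeholder)

    active_path = PREFIX + "active" + SUFFIX
    opened_at = time.time()

    def rotate():
        """Rename the active file with a date stamp and start a fresh one."""
        global opened_at
        stamp = datetime.now().strftime(DATE_FORMAT)
        os.rename(active_path, PREFIX + stamp + SUFFIX)
        opened_at = time.time()

    def write_row(row: str):
        with open(active_path, "a") as f:
            f.write(row + "\n")
        too_big = os.path.getsize(active_path) >= MAX_BYTES
        too_old = time.time() - opened_at >= MAX_SECONDS
        if too_big or too_old:
            rotate()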

Using a Socket as a Sink

Network sockets connect over TCP or UDP. Connections can be initiated by remote applications or by StreamLab itself. To have StreamLab connect to a remote host, use the Remote Host and Remote Port fields. To have StreamLab listen as a server that other clients connect to, use the Server Host and Server Port fields. Connections can optionally be made over IPv6.

• Remote Host: Hostname to send rows to or receive rows from. You can set this to 'localhost' to allow only local connections, or to a specific IP address, such as 168.212.226.204.
• Remote Port: Port to send rows to or receive rows from.
• Server Host: The hostname or IP address to listen on to send/receive rows (defaults to 0.0.0.0).
• Server Port: The port to listen on to send/receive rows.
• Socket uses TCP?: Whether the socket uses TCP (true) or UDP (false). Default is false (UDP).
• Socket uses IPV6?: Whether or not the socket uses IPv6. Default is false.
• Skip Header: True or false; defaults to false. Specifies whether the parser should skip the first row of input files, typically because that row contains column headers.
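
For example, if the sink's Remote Host and Remote Port point at your machine, the receiving application simply listens on that port and reads the rows as they arrive. Below is a minimal Python TCP receiver; the host, port, and one-row-per-line framing are assumptions for illustration:

    import socket

    HOST, PORT = "0.0.0.0", 5555   # match the sink's Remote Host / Remote Port

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((HOST, PORT))
        srv.listen(1)
        conn, addr = srv.accept()          # StreamLab connects here
        with conn:
            buf = b""
            while True:
                data = conn.recv(4096)
                if not data:
                    break
                buf += data
                while b"\n" in buf:        # assume one row per line
                    line, buf = buf.split(b"\n", 1)
                    print(line.decode())

For UDP (Socket uses TCP? set to false), a datagram socket (SOCK_DGRAM) and recvfrom would replace the stream socket above.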

Using AMQP as a Sink

There are distinct differences between the way AMQP works up to version 0.9.1 and the way AMQP 1.0 works. Roughly, AMQP 1.0 provides only a wire-level network protocol for the exchange of messages, while versions up to 0.9.1 employ a system of exchanges, queues, and bindings. As a result of this change, you configure AMQP sinks differently for 1.0 than for versions up to 0.9.1.

Connection Options for AMQP 0.9.1

The AMQP option lets you define a sink with an AMQP 0.9.1 message bus. AMQP stands for Advanced Message Queuing Protocol, an open standard for messaging middleware. For more information, see http://www.amqp.org/about/what. As with other sinks, AMQP simply transmits rows as strings in CSV, XML, or JSON format. To set up an AMQP 0.9.1 sink, you need the following pieces of information:

• Name: Hostname or IP address of the AMQP server (default localhost).
• Port: Port of the AMQP server (default 5672).
• EXCHANGE_TYPE: The type of exchange to connect to (default fanout).
• EXCHANGE_NAME: Required. The name of the exchange to send tuples to or receive tuples from.
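
To consume the rows an AMQP 0.9.1 sink emits, a client binds a queue to the configured exchange. The sketch below uses the pika client; the exchange name, exchange type, host, and port are placeholders corresponding to the options above:

    import pika

    # Placeholders for the Name/Port options above.
    conn = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost", port=5672))
    ch = conn.channel()

    # EXCHANGE_NAME and EXCHANGE_TYPE placeholders; fanout is the documented default.
    ch.exchange_declare(exchange="rows", exchange_type="fanout")
    result = ch.queue_declare(queue="", exclusive=True)
    ch.queue_bind(exchange="rows", queue=result.method.queue)

    def on_message(channel, method, properties, body):
        print(body.decode())   # a row as a CSV, XML, or JSON string

    ch.basic_consume(queue=result.method.queue, on_message_callback=on_message,
                     auto_ack=True)
    ch.start_consuming()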

As with other sinks, AMQP 1.0 simply transmits rows as strings in CSV, XML, or JSON format. To set up an AMQP 1.0 sink, you need the following pieces of information:

• Name: The hostname or IP address of the AMQP server. Defaults to localhost.
• Port: The port of the AMQP server. Defaults to 5672.
• Queue or Topic: String value identifying the target object to read from or write to.
• User ID: Required. The user ID used to connect to the AMQP 1.0 server.
• Password: Required. The password used to connect to the AMQP 1.0 server.
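
Because AMQP 1.0 addresses a queue or topic directly rather than going through an exchange, a receiver connects straight to that address. The sketch below uses the python-qpid-proton library; the URL, address, and credentials are placeholders corresponding to the options above:

    from proton.handlers import MessagingHandler
    from proton.reactor import Container

    class RowReceiver(MessagingHandler):
        """Receives rows from the queue or topic the sink writes to."""
        def __init__(self, url, address):
            super().__init__()
            self.url, self.address = url, address

        def on_start(self, event):
            # User ID and Password correspond to the options above (placeholders).
            conn = event.container.connect(self.url, user="guest",
                                           password="guest")
            event.container.create_receiver(conn, self.address)

        def on_message(self, event):
            print(event.message.body)   # a row as a CSV, XML, or JSON string

    # Placeholder host/port and Queue or Topic value.
    Container(RowReceiver("amqp://localhost:5672", "myqueue")).run()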

Using Kafka as a Sink

When you point a sink at Kafka, you can configure a wide range of options for delivering rows. The following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation.html.

• TOPIC: Kafka topic to write to.
• Client ID: A string that helps you identify the application making calls to the Kafka server.
• Partitioner: Fully qualified Java class name of the Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.common.KafkaOutputSink.RoundRobinPartitioner.
• Serializer: Fully qualified Java class name of the Kafka serializer. Defaults to kafka.serializer.DefaultEncoder.
• Key Serializer: Names a serializer class for keys. If no class is given, Kafka uses serializer.class.
• Producer: Specifies whether messages are sent asynchronously in a background thread. Async mode lets requests be batched, which helps throughput but increases the possibility that a failed client machine results in unsent data. Defaults to async.
• Compression: Specifies the compression codec for generated data: "none", "gzip", or "snappy".
• Compressed Topics: If you have specified a compression codec (other than "none"), this option lets you limit compression to the topics listed here. Empty means apply compression to all topics.
• Message Send Retries: If enabled, the producer automatically retries a failed send request a set number of times. Note: using this option may, according to the Kafka documentation, "lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgment to be lost."
• Request Required Acks: How many acknowledgments the broker should receive before acknowledging a send request to the producer: 0 (do not wait), 1 (wait for the leader), or -1 (wait for all in-sync replicas).
• Request Timeout Ms: When request.required.acks is enabled, this lets you specify how long (in milliseconds) the broker should try to meet the acknowledgment requirement before sending back an error to the client.
• Topic Metadata Refresh: By default, the producer refreshes topic metadata in two cases: first, at a regular interval, which defaults to every 600000 ms (10 minutes); second, any time there is a failure, such as a missing partition or an unavailable leader. This setting lets you change the regular polling interval. If you set it to a negative number, the producer refreshes only on failure. If you set it to zero, the producer refreshes every time a message is sent (not recommended). Note: the refresh happens only after a message is sent, so if the producer never sends a message, the metadata is never refreshed.
• Max Messages (queue.buffering.max.messages): If async is enabled in producer.mode, this lets you specify the maximum number of unsent messages to buffer. Once this number is reached, either the producer must block or data must be dropped.
• Queue Enqueue Timeout: If async is enabled in producer.mode, this lets you specify how long to block before dropping messages when the buffer has reached the value specified in queue.buffering.max.messages. If you set this option to 0, events are enqueued immediately or dropped if the queue is full. If you set it to -1, the producer blocks indefinitely.
• Batch Messages: If async is enabled in producer.mode, this lets you specify the number of messages to send in one batch. The producer waits until either this number of messages is ready to send or queue.buffering.max.ms is reached.
• Send Buffer Size: Socket write buffer size.
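
Rows delivered by the sink can be read with any standard Kafka consumer. The sketch below uses the kafka-python client; the topic name, broker address, and client ID are placeholders corresponding to the options above:

    from kafka import KafkaConsumer

    # Read the rows the sink publishes; the sink's TOPIC option names the topic.
    consumer = KafkaConsumer(
        "streamlab-rows",                     # placeholder for the TOPIC option
        bootstrap_servers="localhost:9092",   # placeholder broker address
        client_id="row-reader",               # analogous to the Client ID option
        auto_offset_reset="earliest",
    )
    for record in consumer:
        # Each record value is a row serialized as a CSV, XML, or JSON string.
        print(record.partition, record.value.decode())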

Using AWS Kinesis as a Sink

To use AWS Kinesis as a sink, you need to specify at minimum the Kinesis stream name.

• Kinesis Stream Name: Required. Name of the Kinesis stream to write to. No default.
• Kinesis Region: Required. Region ID of the Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.
• Partition ID: Optional. Partition ID of the shard to write to. Defaults to "" (empty). Can be overridden by a stream column named PARTITION_ID.
• Buffer Size: Maximum number of bytes per update request.
• Max Retries: Optional. If an insert fails due to throttling, how many retries should be made. The backoff is doubled for each retry, up to the maximum. Defaults to 10.
• Initial Backoff: Optional. If an insert fails due to throttling, how many milliseconds to back off initially. Defaults to 20.
• Max Backoff: Optional. If an insert fails due to throttling, the maximum number of milliseconds to back off. Defaults to 20480.
• Max Records Per Request: Optional. Maximum number of records per update request. Defaults to 500.
• AWS Profile Path: See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide. Must point to a credential file on the s-Server file system with the following format:

    [default]
    aws_access_key_id = xxx
    aws_secret_access_key = yyy

  Defaults to "" (empty), which resolves to ~/.aws/credentials.

  Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide.
• AWS Profile Name: Optional. Profile name to use within the credentials file. Defaults to default.
• Report Frequency: Optional. How often (in milliseconds) to log statistics. Defaults to 0, which means "never."
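
The Max Retries, Initial Backoff, and Max Backoff options describe exponential backoff on throttled writes: the wait doubles after each failed attempt, up to the maximum. The sketch below illustrates that pattern with boto3; the stream name, profile, and region are placeholders, and the code mirrors the documented defaults rather than s-Server's internal implementation:

    import time
    import boto3

    # Placeholders for AWS Profile Name and Kinesis Region.
    session = boto3.Session(profile_name="default", region_name="us-west-2")
    kinesis = session.client("kinesis")

    MAX_RETRIES = 10            # Max Retries default
    INITIAL_BACKOFF_MS = 20     # Initial Backoff default
    MAX_BACKOFF_MS = 20480      # Max Backoff default

    def put_with_backoff(records, stream="my-stream"):
        """Retry throttled records, doubling the backoff up to the maximum."""
        backoff = INITIAL_BACKOFF_MS
        for attempt in range(MAX_RETRIES + 1):
            resp = kinesis.put_records(StreamName=stream, Records=records)
            if resp["FailedRecordCount"] == 0:
                return
            # Keep only the records that failed (were throttled) and retry them.
            records = [r for r, res in zip(records, resp["Records"])
                       if "ErrorCode" in res]
            time.sleep(backoff / 1000.0)
            backoff = min(backoff * 2, MAX_BACKOFF_MS)
        raise RuntimeError("records still throttled after max retries")

    # Example: put_with_backoff([{"Data": b"a,b,c\n", "PartitionKey": "0"}])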