Integrating Kinesis

Using s-Server, you can read from and write to Amazon Kinesis streams.

Setting Up an AWS Profile Path

To read from or write to Kinesis streams, you first need to set up an AWS profile path.

The aws_access_key_id and aws_secret_access_key can be set up on the AWS Console as follows:

  1. Open the AWS Console.
  2. Click Identity & Access Management.
  3. Click Users.
  4. Click your User ID.
  5. Create an Access Key.
    When you create an access key, the AWS Console will allow you to download a credentials file containing the values for aws_access_key_id and aws_secret_access_key.
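
The downloaded credentials file, typically saved as ~/.aws/credentials, has the following format (xxx and yyy stand in for your actual key values):

[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy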

Reading from Amazon Kinesis

Because of the particular nature of Amazon Kinesis streams, s-Server uses an agent to read from Kinesis streams. Kinesis agents run in s-Server and connect to Amazon Kinesis using options that you pass to the agent. The agent works as a Kinesis consumer. For more information on Kinesis consumers, see https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-consumers

The Kinesis agent is best used within the AWS system. You can read from Kinesis from outside AWS, but this will entail sending rows over the Internet.

The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide. Parser errors are also logged in the Global Error Stream.

Reading from Kinesis involves the steps described in the subsections below.

Setting Up a Destination for Rows

As with other agents, you will need to set up a native stream as a target for the data that the agent reads from Kinesis. As with other sources, this data needs to be parsed. Kinesis, like Kafka and similar systems, stores data in chunks, known as "blobs", that can be any type of data. See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html for more information.
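
For example, the native stream referenced by the agent examples below could be created along the following lines. This is a sketch only; the column names here are hypothetical and must match the columns that your parser produces:

CREATE OR REPLACE SCHEMA "myschema";
SET SCHEMA '"myschema"';

CREATE OR REPLACE STREAM "mystream" (
    "ts" TIMESTAMP,
    "ticker" VARCHAR(10),
    "amount" INTEGER
);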

You set a parser and pass parser options to the agent. See Parser Types for Reading for a list of parser choices.

You give each agent a name, such as my_kinesis_stream. Each agent reads from exactly one Kinesis stream.

You start agents using START_AGENT_HOSTER and stop agents using STOP_AGENT_HOSTER. You will need to restart agents if s-Server is stopped. (Unlike pumps, which start up automatically when s-Server restarts, agents do not start up automatically.) Best practice is to start pumps before agents, so that you do not lose any rows sent by agents.

Agent names are internal to s-Server. This means that you can use agents with the same name on separate instances of s-Server, even if these are running on the same machine.

Simple Example of Starting an Agent

The agent is started with code along the following lines. You can issue this code in SQLline or another JDBC client. In this example, CHARACTER_ENCODING is a setting for the CSV parser; the options from AWS_REGION onward are Kinesis-specific.

CALL SYS_BOOT.MGMT.START_AGENT_HOSTER('my_kinesis_agent',
'kinesis',
'SCHEMA_NAME=myschema&
TABLE_NAME=mystream&
PARSER=CSV&
CHARACTER_ENCODING=ISO-8859-1&
AWS_REGION=us-west-1&
AWS_PROFILE_NAME=my_profile&
AWS_PROFILE_PATH=~/.aws/credentials&
KINESIS_STREAM_NAME=test&
KINESIS_APPLICATION_NAME=testrun3&
STREAM_FANOUT=1&
KINESIS_INITIAL_POSITION_IN_STREAM=TRIM_HORIZON&
KINESIS_MAX_RECORDS_PER_GET=1500&
KINESIS_SOCKET_TIMEOUT=-1&
KINESIS_IDLE_TIME_BETWEEN_READS=-1');

The first parameter ("my_kinesis_agent" above) is the name you are assigning to the agent for use in subsequent calls.

The second parameter ("kinesis" above) is the agent type, in this case kinesis.

The third parameter is either the path to a properties file or a list of properties separated by '&'.
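
For example, instead of the inline option string shown above, you could store the same options in a properties file, one key=value pair per line, and pass its path as the third parameter. The file name and path here are hypothetical:

# kinesis.read.properties
SCHEMA_NAME=myschema
TABLE_NAME=mystream
PARSER=CSV
CHARACTER_ENCODING=ISO-8859-1
AWS_REGION=us-west-1
AWS_PROFILE_NAME=my_profile
AWS_PROFILE_PATH=~/.aws/credentials
KINESIS_STREAM_NAME=test
KINESIS_APPLICATION_NAME=testrun3
KINESIS_INITIAL_POSITION_IN_STREAM=TRIM_HORIZON

CALL SYS_BOOT.MGMT.START_AGENT_HOSTER('my_kinesis_agent',
'kinesis',
'/path/to/kinesis.read.properties');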

Kinesis Agent Options

Option Name Description
SCHEMA_NAME s-Server schema containing the native stream into which data will be read. You need to create this schema before running the agent.
TABLE_NAME s-Server native stream into which data will be read. You need to create this stream before running the agent.
KINESIS_STREAM_NAME Required. Name of the Kinesis stream to read from. No default.
AWS_REGION Required. Region id of the Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.
AWS_PROFILE_PATH Must point to a credentials file on the s-Server file system with the following format:
[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

This defaults to '', which resolves to ~/.aws/credentials.

Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis.
See Setting Up an AWS Profile Path above.
AWS_PROFILE_NAME Optional. Profile name to use within the credentials file. Amazon supports multiple named profiles within a configuration file. If you have a named profile, you can reference it here. Defaults to default. See http://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html
KINESIS_INITIAL_POSITION_IN_STREAM LATEST to start at the latest record or TRIM_HORIZON to start at the earliest. Defaults to LATEST.
STREAM_FANOUT If set to a value n greater than 1, the agent writes to TABLE_NAME1 through TABLE_NAMEn instead of TABLE_NAME, depending on the incoming shard: shard 1 goes to TABLE_NAME1, shard n goes to TABLE_NAMEn, shard n+1 goes to TABLE_NAME1, and so on.
KINESIS_APPLICATION_NAME Identifies the client in Amazon CloudWatch (defaults to sqlstream). Also identifies a group of consumer processes that collaborate to process output from the source Kinesis stream. If two agents read from the same Kinesis stream with the same KINESIS_APPLICATION_NAME, they share the Kinesis shards between them; if they use two different KINESIS_APPLICATION_NAMEs, each independently reads all the data from the stream.
KINESIS_MAX_RECORDS_PER_GET Defaults to -1. If greater than 0, the agent reads up to this many records per request. (Usually a large number is better.)
KINESIS_SOCKET_TIMEOUT Defaults to -1, which leaves the Kinesis setting unchanged. If set, overrides the Kinesis socket timeout, in milliseconds.
KINESIS_IDLE_TIME_BETWEEN_READS Defaults to -1, which leaves the Kinesis setting unchanged. If set, overrides the Kinesis idle time between reads, in milliseconds.
OPTIONS_QUERY Lets you query a table to update adapter options at runtime. You can use this to set options from a configuration table, as in select AWS_REGION, AWS_PROFILE_NAME, AWS_PROFILE_PATH from TEST.kinesis_options. For more details see the topic Using the Options Query Property.
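
As a sketch of how OPTIONS_QUERY might be used, assuming a configuration table matching the example query above (the table definition and values here are hypothetical):

CREATE OR REPLACE SCHEMA TEST;

CREATE TABLE TEST.kinesis_options (
    AWS_REGION VARCHAR(32),
    AWS_PROFILE_NAME VARCHAR(64),
    AWS_PROFILE_PATH VARCHAR(256)
);

INSERT INTO TEST.kinesis_options
VALUES ('us-west-1', 'my_profile', '');

The agent would then pick up these values at startup if you add OPTIONS_QUERY=select AWS_REGION, AWS_PROFILE_NAME, AWS_PROFILE_PATH from TEST.kinesis_options to its option string.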

Provenance Columns for Kinesis

When reading from Kinesis, you can declare provenance columns. These return metadata from the Kinesis stream from which you are reading.

These are as follows:

Data Type Name Value
INTEGER SQLSTREAM_PROV_KINESIS_PARTITION_ID Returns partition key for current Kinesis partition. See https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key
BIGINT SQLSTREAM_PROV_KINESIS_SEQUENCE_NUMBER Sequence number for Kinesis data record. See https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#sequence-number
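
For example, a target native stream could declare these columns alongside its data columns (a sketch; the data columns here are hypothetical):

CREATE OR REPLACE STREAM "myschema"."mystream" (
    "SQLSTREAM_PROV_KINESIS_PARTITION_ID" INTEGER,
    "SQLSTREAM_PROV_KINESIS_SEQUENCE_NUMBER" BIGINT,
    "ts" TIMESTAMP,
    "ticker" VARCHAR(10),
    "amount" INTEGER
);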

Stopping the Agent

To stop the agent, issue code along the following lines:

call sys_boot.mgmt.stop_agent_hoster('my_kinesis_agent');

Extended Example with JSON Parsing

The code below reads from a Kinesis stream called production-stream, and reads data into an s-Server stream called "kinesis_sink_read_json" in a schema called "myschema". It passes options in for JSON parsing. See Reading JSON for more details on these options.

-- Read data from the TARGET Kinesis stream; not normally used, but can be helpful for debugging
-- Read into myschema.kinesis_sink_read_json
-- Expand JSON into the columns as defined for the output
-- agent_name = 'target_listen_json'
-- application_name = 'sqlstream_3'
call sys_boot.mgmt.start_agent_hoster('target_listen_json','kinesis','PARSER=JSON&
SCHEMA_NAME=myschema&
TABLE_NAME=kinesis_sink_read_json&
AWS_REGION=us-east-1&
AWS_PROFILE_NAME=default&
AWS_PROFILE_PATH=&
KINESIS_STREAM_NAME=production-stream&
KINESIS_APPLICATION_NAME=sqlstream_3&
STREAM_FANOUT=1&
KINESIS_INITIAL_POSITION_IN_STREAM=LATEST&
KINESIS_MAX_RECORDS_PER_GET=-1&
KINESIS_SOCKET_TIMEOUT=-1&
KINESIS_IDLE_TIME_BETWEEN_READS=-1&
ROW_PATH=$&
device_key_PATH=$.device_key&
model_code_PATH=$.model_code&
latitude_PATH=$.latitude&
longitude_PATH=$.longitude&
recorded_at_PATH=$.recorded_at&
channel_PATH=$.sensor_readings[0:].channel&
sensor_type_PATH=$.sensor_readings[0:].sensor_type&
metric_value_PATH=$.sensor_readings[0:].metric_value&
other_value_PATH=$.sensor_readings[0:].other_value&
value_PATH=$.sensor_readings[0:].value&
unit_PATH=$.sensor_readings[0:].unit');

Writing to Amazon Kinesis

Writing to Amazon Kinesis Using SQL

The Kinesis ECDA adapter writes batches of data to a Kinesis stream. You can specify CSV, Avro, XML, JSON, or BSON as the format for the data. To write to Kinesis, you define a foreign stream that references the prebuilt server object KINESIS_SERVER, then INSERT into that stream. This topic describes setting up and performing an INSERT into such a foreign stream. The minimum options required to write to a Kinesis stream are the stream name and region.

This adapter works best when run from within AWS. If you write to Kinesis from outside AWS, all rows will be sent over the Internet. (You would also need to install credentials from an AWS account to do so.)

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note On Writing Pumps

Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Sample Code

To write to Kinesis, you need to create a foreign stream in SQL that references a prebuilt server object called KINESIS_SERVER. In the foreign stream's options, you configure how s-Server connects to Kinesis. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

You will also need to specify a formatter for the foreign stream. Specifying "formatter" as a foreign stream option tells s-Server that this foreign stream writes data. See Output Formats for Writing in this guide for more details.

Like all streams (but unlike server objects or data wrappers), foreign streams must be defined within a schema. The following code first creates a schema called KinesisWriterSchema, then creates a foreign stream called KinesisWriterStream with the predefined server KINESIS_SERVER as a server option. To transfer data into Kinesis from this stream, you will need to INSERT into it. This step simply sets up the stream, with named columns and Kinesis-specific options. (These options are discussed below.)

Here is an example of the SQL used to define a foreign stream for the Kinesis adapter. When you INSERT into this stream (using a pump), s-Server writes data to the defined Kinesis location.

CREATE OR REPLACE SCHEMA KinesisWriterSchema;
SET SCHEMA 'KinesisWriterSchema';

CREATE OR REPLACE FOREIGN STREAM KinesisWriterStream
("ts" TIMESTAMP NOT NULL,
"transactionCount" INT NOT NULL)
SERVER KINESIS_SERVER
OPTIONS
(KINESIS_STREAM_NAME 'AggregatedData',
AWS_REGION 'us-west-1',
formatter 'CSV',
row_separator '',
character_encoding 'UTF-8');

CREATE OR REPLACE SCHEMA Pumps;
SET SCHEMA 'Pumps';

CREATE OR REPLACE PUMP writerPump STOPPED AS
--We recommend creating pumps as stopped
--then using ALTER PUMP Pumps.writerPump START to start it
INSERT INTO KinesisWriterSchema.KinesisWriterStream
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

To start writing data, use the following code:

ALTER PUMP Pumps.writerPump START;

If one of the foreign stream columns is named PARTITION_ID, the values in that column will override the KINESIS_DEFAULT_PARTITION_ID option. See Using Partition IDs to Write to a Specific Shard below.

Foreign Stream Options for Writing to a Kinesis Stream

Option Name Description
KINESIS_STREAM_NAME Required. Name of the Kinesis stream to write to. No default.
AWS_REGION Required. Region id of the Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details.
AWS_PROFILE_PATH Must point to a credentials file on the s-Server file system with the following format:
[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

This defaults to '', which resolves to ~/.aws/credentials.

Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide.
AWS_PROFILE_NAME Optional. Profile name to use within the credentials file. Amazon supports multiple named profiles within a configuration file. If you have a named profile, you can reference it here. Defaults to default. See http://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html
INITIAL_BACKOFF Optional. If an insert fails due to throttling, how many milliseconds to back off initially. Defaults to 20.
MAX_BACKOFF Optional. If an insert fails due to throttling, how many milliseconds to back off as a maximum. Defaults to 20480.
MAX_RETRIES Optional. If an insert fails due to throttling, how many retries should be made. Backoff is doubled for each retry up to the maximum. Defaults to 10.
BUFFER_SIZE Optional. Maximum number of bytes per update request. Defaults to 4194304.
MAX_RECORDS_PER_REQUEST Optional. Maximum number of records per update request. Defaults to 500.
REPORT_FREQUENCY Optional. How often, in milliseconds, to log statistics. Defaults to 0, which means "never".
KINESIS_DEFAULT_PARTITION_ID Optional. Partition id of the shard to write to. Defaults to ''.
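
To make the backoff behavior concrete: with the default settings, a throttled insert is retried after backoffs of 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, and 10240 milliseconds; MAX_BACKOFF (20480 by default) caps the doubling if you configure more retries.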

Writing to Amazon Kinesis Using the ECD Agent

You can use the ECD agent to write data to remote locations, including Kinesis streams. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Formatter settings
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
SEPARATOR=,
KINESIS_STREAM_NAME=AggregatedData
AWS_REGION=us-west-1
# Schema, name, and parameter signature of origin stream
SCHEMA_NAME=stocks
TABLE_NAME=output_file
#columns
ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)

To invoke the agent, enter the following at the command line from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/:

$ ./commondataagent.sh --output --props sample.test.properties --io kinesis

Format Type Options

Other options are specific to format type. See Output Formats for Writing.

Note: Parameters that require an ampersand as a value, such as QUOTE_CHARACTER '&' for CSV parsing, cannot be passed. (Parameters are currently passed as a string of key=value pairs delimited by ampersands, such as "key1=value1&key2=value2". There is currently no way of escaping or quoting the ampersand character.)

Using Partition IDs to Write to a Specific Shard

Amazon Kinesis limits write rates to 1,000 records per second per shard or 1 MB per second per shard, whichever is lower, so high-throughput applications need to split data across many shards. You can do so by setting a partition key, using a column in the foreign stream named PARTITION_ID. The value in this column is not delivered to the target stream, but it is used as the basis for choosing the shard that the record is sent to (the value gets hashed). For example, you might shard on a device_key, so that readings from a given device always go to the same shard. It is also important to consider how downstream applications are going to consume the shards.

If you do not use a PARTITION_ID column, all data is written to the shard defined in the KINESIS_DEFAULT_PARTITION_ID parameter.

More Complex Example with Partitioning

The following code uses a device indicator to partition output on Kinesis. In this way, data is "sharded" by device id. See http://docs.aws.amazon.com/streams/latest/dev/key-concepts.html for more information about shards and partition ids.

CREATE OR REPLACE SCHEMA "Kinesis_Schema";
SET SCHEMA '"Kinesis_Schema"';

 -- Define the stream

CREATE FOREIGN STREAM "kinesis_output_sink" (
 --populated by pump from device_key to shard the output
"PARTITION_ID" varchar(32),
"Wind_Chill" DOUBLE,
"Barometric_Pressure" DOUBLE,
"Humidity" DOUBLE,
"Leak_Detection" DOUBLE,
"Rainfall" DOUBLE,
"Rainrate" DOUBLE,
"Remote_Humid" DOUBLE,
"Remote_Temp" DOUBLE,
"Wind_Direction" DOUBLE,
"runningCount" BIGINT,
"readings_uuid" VARCHAR(32),
"device_key" VARCHAR(32),
"model_code" VARCHAR(16),
"latitude" DOUBLE,
"longitude" DOUBLE,
"recorded_at" VARCHAR(32)
)
SERVER KINESIS_SERVER
OPTIONS (
"FORMATTER" 'JSON',
"KINESIS_STREAM_NAME" 'production',
"AWS_REGION" 'us-east-1',
 --"KINESIS_DEFAULT_PARTITION_ID" '',
"BUFFER_SIZE" '4194304',
"MAX_RETRIES" '10',
"INITIAL_BACKOFF" '20',
"MAX_BACKOFF" '20480',
"MAX_RECORDS_PER_REQUEST" '500',
"AWS_PROFILE_NAME" 'default',
"AWS_PROFILE_PATH" '',
"REPORT_FREQUENCY" '0'
)
;
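
The following alternative sink uses the CONCATENATE formatter to write a pre-assembled JSON string from a single "json" column, again using a PARTITION_ID column to shard the output: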

CREATE FOREIGN STREAM "Kinesis_Output_Sink"
( "PARTITION_ID" varchar(32)populated by pump from device_key to shard the output
, "json" VARCHAR(3000)
)
SERVER KINESIS_SERVER
OPTIONS
( "FORMATTER" 'CONCATENATE'
, "KINESIS_STREAM_NAME" 'production-a_production_mar_sensor_readings_post_sql_stream'
, "AWS_REGION" 'us-east-1'
, "BUFFER_SIZE" '4194304'
, "MAX_RETRIES" '10'
, "INITIAL_BACKOFF" '20'
, "MAX_BACKOFF" '20480'
, "MAX_RECORDS_PER_REQUEST" '500'
, "AWS_PROFILE_NAME" 'user1'
, "AWS_PROFILE_PATH" ''
, "REPORT_FREQUENCY" '0'
)
;

CREATE PUMP "kinesis_output_sink-Pump" STOPPED AS
INSERT INTO "kinesis_output_sink"
("PARTITION_ID",
 --populated by pump from device_key to shard the output
"Wind_Chill",
"Barometric_Pressure",
"Humidity",
"Leak_Detection",
"Rainfall",
"Rainrate",
"Remote_Humid",
"Remote_Temp",
"Wind_Direction",
"runningCount",
"readings_uuid",
"device_key",
"model_code",
"latitude",
"longitude",
"recorded_at"
)
SELECT STREAM
"device_key" AS "PARTITION_ID", -- use device_key as the partition id
"Wind_Chill",
"Barometric_Pressure",
"Humidity",
"Leak_Detection",
"Rainfall",
"Rainrate",
"Remote_Humid",
"Remote_Temp",
"Wind_Direction",
"runningCount",
"readings_uuid",
"device_key",
"model_code",
"latitude",
"longitude",
"recorded_at"
FROM "Output_Sink"
;
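
As with the earlier example, start the pump once the rest of the pipeline is in place:

ALTER PUMP "Kinesis_Schema"."kinesis_output_sink-Pump" START;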