Integrating Kafka

Using s-Server, you can read from and write to Kafka. To read and write from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. To read and write from remote locations, you configure such options using a properties file and launch the Extensible Common Data agent at the command line.

To read from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. Data can be in CSV, Avro, XML, JSON, BSON, or other formats; you specify a parser as part of the configuration options. See Reading from Kafka Using SQL below. To read from remote locations, you configure these options in a properties file and launch the ECD agent at the command line. See Reading from Kafka Using the ECD Agent below.

Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

See the overview at Reading from Other Sources for more details.

The Kafka adapter is based on the Kafka client API v0.10.0.x and leverages features available on Kafka brokers up to v0.10.0.x. This API also works with Kafka v1.0 and with the Confluent distribution from at least v3.3 to v4.0.

As of Kafka 0.10, every Kafka message has a timestamp field specifying the time when the message was produced. The new adapter uses this feature to let you start processing at a given timestamp: it seeks, in each topic partition, to the offset from which all subsequent messages have a timestamp greater than or equal to the specified STARTING_TIME.

Once you set up a foreign stream for Kafka in s-Server, each query on the stream reads from a Kafka topic via a Kafka consumer. If multiple queries are run on the same stream, and the Kafka topic has partitions, Kafka will distribute data among these queries according to partition. In many cases, you may need to aggregate multiple queries in order to achieve the right results. You can also specify a partition in foreign stream options. See Using the Kafka ECDA to Process Partitioned Streams below.

The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide.

Reading from Kafka Using SQL

To read from Kafka, you need to create a foreign stream in SQL that references a prebuilt server object, such as KAFKA10_SERVER (or KAFKA_SERVER for the legacy adapter). The foreign stream's definition contains connection information for the Kafka server. In Kafka's case, you can also define provenance columns that let you pass metadata on the Kafka source into s-Server. See Using Provenance Columns below. See the CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide for more details on creating a foreign stream.

You will also need to specify a parser for the foreign stream, such as 'CSV'. Specifying "parser" as a foreign stream option tells s-Server that this foreign stream reads data. See Parsers for Reading in this guide for more details.

Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named KafkaReaderStream.

CREATE OR REPLACE SCHEMA KafkaSource;
SET SCHEMA 'KafkaSource';

CREATE OR REPLACE FOREIGN STREAM KafkaReaderStream
(kafka_offset BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"KAFKA_PARTITION" INT NOT NULL,
--special column for Kafka partition
"zipcode" CHAR(10) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'AggregatedData',
seed_brokers 'localhost',
starting_time '2020-02-01 22:23:45.892',
max_poll_records '1000',
parser 'CSV',
character_encoding 'UTF-8',
skip_header 'false');

Input

As with all ECDA adapters, data is only moved from Kafka to s-Server once you execute a SELECT statement on the foreign stream that you have defined for the adapter. (Often, you will define a SELECT statement on the foreign stream as part of a pump, in which case data moves once you start the pump.) For example, the following code block copies the values of the columns KAFKA_PARTITION, zipcode, transactionTotal, and transactionCount into s-Server using the foreign stream defined above, KafkaReaderStream.

SELECT STREAM "ts" AS ROWTIME, "partition", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaSourceStream"
ORDER BY "ts" WITHIN INTERVAL '2' SECOND;

Foreign Stream Options for Reading from Kafka

Option Description
TOPIC Required. Kafka Topic. You can use a regular expression as a "topic wild card." (This is not supported by legacy versions of the adapter.)
STARTING_TIME Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'.

When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later.

For the legacy Kafka adapter, options are EARLIEST or LATEST.
INDEX_TOPIC_NAME This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic below. The index topic may be created on a separate Kafka cluster.
STARTING_OFFSET The offset at which to start reading, as a long integer. Defaults to -1 (no explicit starting offset).
SEED_BROKERS A comma-separated list of broker identifiers in the format <broker_host_name>:<port>. For the legacy adapter, this is a comma-separated list of broker hosts. Defaults to "localhost".
PARTITION Partition number to read from. If you are reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions are read. You can specify a single partition with PARTITION, or a range such as 1-3 (a range needs two partition numbers and is inclusive). Note: Partition numbers are 0-based.
PORT Deprecated for the Kafka10 adapter. For the legacy adapter, the port for the Kafka seed broker.
MAX_POLL_RECORDS Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, set this so that each poll fetches a full "page" (1 MB is the default) of Kafka messages from each partition of the topic(s). It can be roughly calculated as: (numPartitions * 1 MB) / typicalMessageSize
PARTITION_OFFSET_QUERY A SQL query that fetches starting offsets for all partitions of the topic, for example: SELECT "TOPIC", "PARTITION", "OFFSET" FROM stored_offsets. PARTITION should be of type INTEGER and OFFSET of type BIGINT. Any partition for which the query does not return anything will use either STARTING_OFFSET or STARTING_TIME to determine where to start.
FETCH_SIZE Fetch size. Defaults to 1000000.
CLIENT_ID For the Kafka10 adapter, it is vital to understand that this is the consumer group id (it is used to set the consumer group property group.id). It also serves as the client key for Yammer metrics. CLIENT_ID defaults to client{TOPIC}, or CLIENT{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting. CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics.
METRICS_PER_PARTITION True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID is used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number is appended to each CLIENT_ID (and finer-grained metrics are reported).
OPTIONS_QUERY Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in SELECT lastOffset AS STARTING_OFFSET FROM TEST.committedOffset.
kafka.consumer.config Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for an SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see the Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs. Note: In some cases, you can use these property files to override foreign stream options. For example, the setting for bootstrap.servers overrides the foreign stream option SEED_BROKERS. This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
isolation.level Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted or read_committed. This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commits, you need to configure the Kafka adapter to read only committed data, that is, read_committed.
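
As an illustration of the kafka.consumer.config and isolation.level options above, the following sketch points a foreign stream at a consumer properties file and restricts it to committed messages. The stream name, column list, and file path are illustrative assumptions, not part of the product definition.

CREATE OR REPLACE FOREIGN STREAM KafkaCommittedReader
("ts" TIMESTAMP NOT NULL,
"zipcode" CHAR(10))
SERVER KAFKA10_SERVER
OPTIONS
(topic 'AggregatedData',
seed_brokers 'localhost',
--consumer properties file; settings such as bootstrap.servers in this file
--override the corresponding foreign stream options
"kafka.consumer.config" '/path/to/consumer.properties',
--read only messages committed in Kafka transactions
"isolation.level" 'read_committed',
parser 'CSV',
character_encoding 'UTF-8');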

Provenance Columns for Kafka

Option Description
SQLSTREAM_PROV_KAFKA_HEADERS When you specify this column and select it in a query, it returns any headers for Kafka message topics as key-value pairs in serialized text format.

Using Provenance Columns

In reading from Kafka, you can declare several "provenance columns." These return metadata for the Kafka topic for which you are reading.

These are as follows:

Provenance Column Value
KAFKA_PARTITION Returns the number of the Kafka partition.
KAFKA_OFFSET Returns the current Kafka offset.
KAFKA_TOPIC Returns the name of the Kafka topic.
KAFKA_TIMESTAMP Returns the current Kafka timestamp.
KAFKA_KEY Returns the key value for the message.
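
To illustrate, the following is a minimal sketch of a foreign stream that declares all five provenance columns alongside a data column, together with a query that reads them. The stream name, topic, and column sizes are assumptions made for this example; the provenance column names are the ones listed above.

CREATE OR REPLACE FOREIGN STREAM KafkaProvenanceStream
(KAFKA_PARTITION INT,
KAFKA_OFFSET BIGINT,
KAFKA_TOPIC VARCHAR(128),
KAFKA_TIMESTAMP TIMESTAMP,
KAFKA_KEY VARCHAR(128),
"zipcode" CHAR(10))
SERVER KAFKA10_SERVER
OPTIONS
(topic 'AggregatedData',
seed_brokers 'localhost',
parser 'CSV',
character_encoding 'UTF-8');

--the provenance columns are populated by the adapter, not parsed from the message body
SELECT STREAM KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, KAFKA_TIMESTAMP, KAFKA_KEY, "zipcode"
FROM KafkaProvenanceStream;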

Using the Options Query

You can use the Options Query to read from a configuration table. You can then use this table to update adapter options at SQL start time. Usually, this will be when you start application pumps.

You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset.

You can create a configuration table (or view) and read from it using the OPTIONS_QUERY option (not property).

The Options Query should return one row: the column names correspond to option (property) names, and the row's values supply the corresponding property values.

If you set the OPTIONS_QUERY property to

select * from conf

and that query returns one row with one column called TOPIC containing the value "topic1", then the adapter is configured with the TOPIC property set to "topic1". Each time the adapter runs, its configuration is dynamically computed from the conf table. You can also use views for the OPTIONS_QUERY.

CREATE OR REPLACE FOREIGN STREAM testOut (
KAFKA_OFFSET BIGINT NOT NULL,
line VARCHAR(4096))
SERVER KAFKASERVER
OPTIONS (TOPIC 'testThroughput', OPTIONS_QUERY 'select lastOffset as STARTING_OFFSET from TEST.committedOffset');

Using the Kafka ECDA to Process Partitioned Streams

This topic describes an example of setting up SQL to process a pipeline handling one partition. For each partition you can have N pipelines (often on separate instances of s-Server) listening to that partition, where N is your redundancy level. It assumes two Kafka topics have been set up:

  • TransactionData. This takes something like credit card transactions. It should be partitioned with some kind of round robin scheme.
  • AggregatedData. This will be used to communicate between the aggregation pipeline servers and the rollup server.

Each pipeline will

  1. Read from the partition of a topic.
  2. Parse incoming data into columns using the ECDA CSV Parser.
  3. Look up the zipcode for the transaction's recipient in the address table.
  4. Aggregate transaction amounts and counts by zipcode, per second.

In order to ensure all pipelines for the same partition output the same data, the code discards data for the first second's aggregation. This lets you restart an instance of s-Server running pipelines at any time without affecting results.

Results are written to the AggregatedData topic. One or more instances of s-Server will then read that AggregatedData topic, discarding duplicate rows. Aggregates are then rolled up and written to a stream.

CREATE OR REPLACE SCHEMA "kafkaAggregation";
SET SCHEMA '"kafkaAggregation"';
SET PATH '"kafkaAggregation"';

CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA10'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "KafkaPartitionedInputStream"
(KAFKA_OFFSET BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"cardNumber" BIGINT NOT NULL,
"zipcode" CHAR(10) NOT NULL,
"transactionAmount" DOUBLE NOT NULL,
"recipientId" BIGINT NOT NULL,
"transactionId" BIGINT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'TransactionData',
"PARTITION" '1',
"SEED_BROKERS" 'localhost',
"PORT" '9092',
"STARTING_TIME" 'latest',
parser 'CSV',
character_encoding 'UTF-8',
skip_header 'false');

CREATE OR REPLACE FOREIGN STREAM "KafkaOutputStream"
(
"ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedData',
"metadata.broker.list" 'localhost:9092',
formatter 'CSV',
row_separator '',
character_encoding 'UTF-8');

-- Source columns we're interested in
CREATE OR REPLACE VIEW "sourceData" AS
SELECT STREAM "ts" AS ROWTIME, "zipcode", "transactionAmount"
FROM "KafkaPartitionedInputStream";

CREATE OR REPLACE FUNCTION "getZipcode"(
     inputRows CURSOR,
        dsName VARCHAR(64),
     tableName VARCHAR(128),
       colName VARCHAR(128),
     cacheSize INTEGER,
  prefetchRows BOOLEAN,
  fuzzyLookups BOOLEAN)
  RETURNS TABLE(
       inputRows.*,
       "zipcode" CHAR(5))
  LANGUAGE JAVA
  PARAMETER STYLE SYSTEM DEFINED JAVA
  NO SQL
  EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

-- adorn with zipcode using in memory lookup if possible
CREATE OR REPLACE VIEW "transactionAndZipcode" AS
SELECT STREAM
FROM TABLE("getZipcode(CURSOR(SELECT STREAM * FROM "sourceData"), 'customerDb', 'address', 'recipientId', 10000000, true, false));

CREATE OR REPLACE VIEW "aggregatedData" AS
SELECT STREAM "zipcode", SUM("transactionAmount") AS "transactionTotal", COUNT(*) AS "transactionCount"
FROM "sourceData"
GROUP BY FLOOR((("sourceData".ROWTIME - timestamp '1970-01-01 00:00:00') second)/10 to second), "zipcode";

-- Creates output pump
-- Does not output first group of rows (all rows in group will have same rowtime)
-- as this group may be partial if restarting after failure.

CREATE OR REPLACE PUMP "aggregatedDataOutputPump" STOPPED AS
INSERT INTO "kafkaAgg1a"."KafkaOutputStream"
SELECT STREAM ROWTIME AS "ts", 1 AS "partition", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *, a.ROWTIME as thistime, FIRST_VALUE(a.ROWTIME) OVER (ROWS UNBOUNDED PRECEDING) as firsttime from "kafkaAgg1a"."aggregatedData"a) b
where firsttime <> thistime;

ALTER PUMP "aggregatedDataOutputPump" START;

Using the Kafka Adapter for Fault Tolerance

To ensure fault tolerance, you can set up multiple instances of s-Server to listen to each partition, using a program like Puppet to start servers and pipelines. You can run more than one pipeline on each instance of s-Server, but you cannot have multiple pipelines on the same server listening to the same partition.

The diagram below shows multiple instances of s-Server listening to Kafka partitions. This prevents data loss if an s-Server instance goes down.

Adding and Removing Processing Nodes for Kafka

After aggregating data in multiple instances of s-Server, you can create one or more s-Server pumps in order to write to Kafka topics, and add and remove these pumps using the ALTER PUMP command.
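
For example, using the aggregation pump defined earlier in this topic, you can bring a processing node's output online or take it offline with ALTER PUMP (the pump name here is simply the one from this topic's example):

--bring this processing node's Kafka writer online
ALTER PUMP "aggregatedDataOutputPump" START;

--take it offline again, for example before removing the node
ALTER PUMP "aggregatedDataOutputPump" STOP;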

Implementing the Kafka ECDA Adapter to Pump Data to Kafka

Every time you use an adapter, you need to implement it within an s-Server schema. The following code first creates a schema and implements the Kafka ECDA adapter.

CREATE OR REPLACE SCHEMA "kafkaAggregation";
SET SCHEMA '"kafkaAggregation"';
SET PATH '"kafkaAggregation"';

CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "KafkaAggregatedData"
(KAFKA_OFFSET BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"PARTITION" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"

OPTIONS
(topic 'AggregatedData',
seed_brokers 'localhost',
starting_time 'latest',
parser 'CSV',
character_encoding 'UTF-8',
skip_header 'false');

Setting up the Pump

The code below creates a stream with three columns, zipcode, transactionTotal, and transactionCount. This stream will be used to pump data from a Kafka topic.

CREATE OR REPLACE STREAM "AggregatedData"(
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL);

The next code block creates a view on the foreign stream KafkaAggregatedData, ordered by timestamp ("ts") and selecting the columns PARTITION, zipcode, transactionTotal, and transactionCount.

CREATE OR REPLACE VIEW "AggregatedDataWithRowTime" AS
SELECT STREAM "ts" AS ROWTIME, "PARTITION", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaAggregatedData"
ORDER BY "ts" WITHIN INTERVAL '2' SECOND;

The next code block creates a view, AggregatedDataDeDuped, that uses a WHERE clause to identify and discard duplicate rows.

CREATE OR REPLACE VIEW "AggregatedData" AS
SELECT STREAM "partition", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *,
COUNT(*) OVER (PARTITION BY "partition", "zipcode" RANGE INTERVAL '0' SECOND PRECEDING) AS c
FROM "AggregatedDataWithRowTime") AS dd
WHERE c = 1;

The next code block pumps the column zipcode, the sum of transactionTotal, and the sum of transactionCount into the AggregatedData stream.

CREATE OR REPLACE PUMP "rollupPump" STOPPED AS
INSERT INTO "AggregatedData"
SELECT STREAM "zipcode", sum("transactionTotal"), sum("transactionCount")
FROM "AggregatedDataDeDuped"
GROUP BY "AggregatedDataDeDuped".ROWTIME, "zipcode";

Starting and Stopping the Pump

You can then start and stop the pump using the ALTER PUMP command:

ALTER PUMP "rollupPump" START;
ALTER PUMP "rollupPump" STOP;

See the topic ALTER PUMP in the SQLstream Streaming SQL Reference Guide for more details on starting and stopping pumps.

Using the TableLookup UDX to Prefetch a Partitioned Part of a Kafka Topic

Once you have read in data from Kafka and created a view of this data, as in Using the Kafka Adapter for Fault Tolerance above, you can use the TableLookup UDX to prefetch a partitioned portion of a database to form a preloaded cache. The code below prefetches zip codes.

CREATE OR REPLACE FUNCTION "getZipcode"(
     inputRows CURSOR,
        dsName VARCHAR(64),
     tableName VARCHAR(128),
       colName VARCHAR(128),
     cacheSize INTEGER,
  prefetchRows BOOLEAN,
  fuzzyLookups BOOLEAN)
  RETURNS TABLE(
       inputRows.*,
       "zipcode" CHAR(5))
  LANGUAGE JAVA
  PARAMETER STYLE SYSTEM DEFINED JAVA
  NO SQL
  EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

You can then use the function to prefetch zip code from a partitioned portion of a database:

CREATE OR REPLACE VIEW "transactionAndZipcode" AS
SELECT STREAM
FROM TABLE("getZipcode(CURSOR(SELECT STREAM * FROM "sourceData"), 'customerDb', 'address', 'recipientId', 10000000, true, false));

Reading from Kafka Using the ECD Agent

You can use the ECD agent to read files from remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes options similar to the ones you format in SQL, but these options need to be formatted in a properties file along the lines of the following.
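
The following is a minimal sketch of such a properties file for reading, modeled on the writer example in Writing to Kafka Using the ECD Agent below. The topic, broker, schema, stream name, and row type shown here are assumptions for this example; adjust them to your environment and check the exact property names against the ECD agent documentation.

PARSER=CSV
CHARACTER_ENCODING=UTF-8
SEED_BROKERS=localhost:9092
TOPIC=AggregatedData
STARTING_TIME=LATEST
SCHEMA_NAME=KAFKASOURCE
TABLE_NAME=KAFKAREADERSTREAM
ROWTYPE=RECORDTYPE(TIMESTAMP ts, CHAR(10) zipcode, DOUBLE transactionTotal, INTEGER transactionCount)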

Writing to Kafka

The Kafka ECDA adapter writes batches of data to a Kafka topic. In order to write to a Kafka topic, you must first define a server object for the Kafka server with information on its seed broker(s) and topic (at minimum). s-Server writes data to Kafka formatted as CSV, JSON, XML, or BSON.

The Kafka adapter works with Kafka 0.10. It uses atomic commits; using it, you can achieve exactly once semantics between s-Server and Kafka. See the subtopic Using Exactly Once Semantics below.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. Many of the options for the ECD adapter and agent are common to all I/O systems. The topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter. To actually write data, you INSERT into the foreign stream.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Writing to Kafka Using SQL

Like all streams (but unlike server objects or data wrappers), foreign streams must be defined within schemas. The following code first creates a schema called KafkaWriterSchema, then creates a foreign stream called KafkaWriterStream that uses the predefined server KAFKA10_SERVER. To transfer data into Kafka using this stream, you will need to INSERT into it. This step simply sets up the stream, with named columns and Kafka-specific options. (These options are discussed below.)

Here is an example of the SQL used to define a foreign stream for the Kafka adapter:

CREATE OR REPLACE SCHEMA KafkaWriterSchema;
SET SCHEMA 'KafkaWriterSchema';

CREATE OR REPLACE FOREIGN STREAM KafkaWriterStream
(
"ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'AggregatedData',
"metadata.broker.list" 'localhost:9092',
formatter 'CSV',
row_separator '',
character_encoding 'UTF-8');

CREATE OR REPLACE SCHEMA Pumps;
SET SCHEMA 'Pumps';
--we recommend creating pumps in their own schema
--so that you can stop or start all of them at once.

CREATE OR REPLACE PUMP writerPump STOPPED AS
--We recommend creating pumps as stopped
--then using ALTER PUMP Pumps.writerPump START to start it
INSERT INTO KafkaWriterSchema.KafkaWriterStream
SELECT STREAM * FROM MyStream;
--where"MyStream is a currently existing stream

To start writing data, use the following code:

ALTER PUMP Pumps.writerPump START;

Foreign Stream Options for Writing to Kafka

The following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation.html. Aside from defaults, the information in this table is drawn from that page.

Option Name Description
topic Kafka topic
TRANSACTION_ROWTIME_LIMIT Kafka10 adapter only.

Range in milliseconds. Allows all rows received from the input query that have ROWTIME values within the specified range to be committed in a single transaction to the Kafka broker. Transactions are used only if the Kafka broker supports transaction semantics. If this option is set to 1000 (1 second), then all rows with ROWTIME between 10:00:00.000 and 10:00:00.999 get committed in the same transaction atomically.
PORT Deprecated option. As of s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option.
Kafka (legacy adapter): metadata.broker.list

Kafka10 adapter: bootstrap.servers (case sensitive; must be lowercase and quoted)
hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
key.serializer Case sensitive; must be lowercase and quoted. Names a serializer class for keys. If no class is given, Kafka uses value.serializer.
partitioner.class Deprecated for Kafka10

Fully qualified Java classname of the Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner
compression.type If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip'
retry.backoff.ms Case sensitive; must be lowercase and quoted. Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing.
request.timeout.ms Case sensitive; must be lowercase and quoted. When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client.
send.buffer.bytes Case sensitive; must be lowercase and quoted. Socket write buffer size.
client.id Case sensitive; must be lowercase and quoted. Using this option, you can specify a string to help you identify the application making calls to the Kafka server.
transactional.id Case sensitive; must be lowercase and quoted.

This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique "transactional.id" to publish messages to the topic using transactions. Transactions are used only if the Kafka brokers are v0.11.2 or later, which support transaction semantics.

Note: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" of its own. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream.
pump.name Case-sensitive; must be quoted. Kafka10 adapter only.

Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance is running (Leader or Follower) when configured to run in High Availability (HA) mode. The pump name needs to be fully qualified, in the format <catalog_name>.<schema_name>.<pump_name>.

For example: 'LOCALDB.mySchema.ProcessedOrdersPump'
linger.ms In cases where batch.size has not been reached, the number of milliseconds that the Kafka producer will wait before sending the batch. Defaults to '100' (milliseconds).
batch.size Number of messages sent as a batch. Defaults to '1000'
kafka.producer.config Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for an SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties.

For details, see the Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs. Note: In some cases, you can use these property files to override foreign stream options. For example, the setting for bootstrap.servers overrides the foreign stream option SEED_BROKERS. This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
HA_ROLLOVER_TIMEOUT Kafka10 adapter only.

Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode and is running as a "Follower", it waits this amount of time for commits from the "Leader".

After the timeout, the "Follower" attempts to take over as the "Leader". There may be multiple follower instances running in a cluster; only one of these followers succeeds in being designated the new "Leader". All other pump instances using the same "transactional.id" continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself to a "Follower" if possible.
POLL_TIMEOUT This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms.
COMMIT_METADATA_COLUMN_NAME Using this option, you can commit a stringified value of the specified column along with its ROWTIME in a CSV format, along with the offset of the last published message for each partition in a transaction.

The format of the metadata string is as follows:
<CommitRowtimeInMillisFromEpoch>,<metadata_column_value>
seed_brokers Legacy adapter only.
A comma separated list of broker identifiers in the format "<broker_host_name>:<port>".

For legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost".
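
To illustrate the kafka.producer.config option above, the sketch below attaches a producer configuration file to a Kafka sink. The stream name, column list, and file path are assumptions for this example; settings such as bootstrap.servers in the referenced file override the corresponding foreign stream options.

CREATE OR REPLACE FOREIGN STREAM KafkaSecureWriterStream
("ts" TIMESTAMP NOT NULL,
"zipcode" CHAR(5) NOT NULL)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'AggregatedData',
"bootstrap.servers" 'localhost:9092',
--producer properties file, for example SSL/SASL settings
"kafka.producer.config" '/path/to/producer.properties',
formatter 'CSV',
character_encoding 'UTF-8');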

Additional Kafka Options for Use with JNDI Properties

You can also enable security for the Kafka plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server.

Because these options involve storing passwords, you should implement them using JNDI properties. The SQL/MED framework used by s-Server supports loading options through .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files.

We recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<server_name>.properties (Applies to all foreign streams that use this foreign server definition.)
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<stream_name>.properties (Applies only to the foreign stream.)
Option Name Description
ssl.truststore.location Location of the trust store file.
ssl.truststore.password Password for the trust store file.
ssl.keystore.location The location of the key store file. Use this to implement two-way authentication for the Kafka client used by s-Server.
ssl.keystore.password The store password for the key store file. Only needed if you have configured ssl.keystore.location.
ssl.key.password The password of the private key in the key store file.

Using Columns to Specify Partition and Key for Kafka Topics

When writing rows out of s-Server to a Kafka topic, you can specify 1) partition and 2) key by including columns named, respectively, kafka_partition and kafka_key. Data will be written as a message to the indicated partition of the topic, and kafka_key will serve as the key of the key-value pair that constitutes a Kafka message.

If you do not include a kafka_partition column in the foreign stream, Kafka will assign a partition number to the message being published. If you have included a kafka_key column, partition number is determined by hashing the kafka_key column. If you have not included the kafka_key column, then Kafka assigns partition numbers by a "round robin" method.

If you do not include a kafka_key column in the foreign stream, (key, value) will be (null, value), where value is the serialized row produced by the FORMATTER option. The KAFKA10 plugin excludes the kafka_key and kafka_partition columns from being serialized into the value.
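
As a minimal sketch, the foreign stream below adds kafka_partition and kafka_key columns (shown quoted in lowercase, as in the option descriptions above) to a Kafka sink like the one defined earlier in this topic. The stream name, column types, and sample INSERT are assumptions for this example; only zipcode and transactionTotal are serialized into the message value, while kafka_partition and kafka_key determine the target partition and the message key.

CREATE OR REPLACE FOREIGN STREAM KafkaKeyedWriterStream
("kafka_partition" INT,
"kafka_key" VARCHAR(32),
"zipcode" CHAR(5),
"transactionTotal" DOUBLE)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'AggregatedData',
"bootstrap.servers" 'localhost:9092',
formatter 'CSV');

--publishes one message with key 'zip-94107' to partition 0 of AggregatedData
INSERT INTO KafkaKeyedWriterStream
VALUES (0, 'zip-94107', '94107', 42.50);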

Using Atomic Commitments

As of 6.0.0, s-Server writes messages to Kafka atomically. A multi-step operation (writing multiple records to a database as one transaction) is said to be atomic when it requires all the steps in the transaction to be completed successfully or else rolls back all changes. In a ten-row insertion, for example, either all 10 rows are inserted successfully or none at all.

In order to commit insertions atomically, the adapter makes use of Kafka broker support for transactional semantics. As a result, in order to implement atomic commits, you must be using Kafka v1.0 or later. When creating a foreign stream that connects to Kafka v1.0 or later, use the prebuilt server KAFKA10_SERVER in the foreign stream definition. (KAFKA_SERVER is the legacy Kafka plugin.)

To implement atomic commits, set TRANSACTION_ROWTIME_LIMIT. For example, if you set it to 60000 (1 minute):

  1. The adapter begins a new transaction at the top of each minute (that is, at ROWTIME = FLOOR(ROWTIME TO MINUTE)).
  2. It sends messages to the Kafka broker until it receives the last row for the minute.
  3. When the first row for the next minute is received, it commits the transaction opened earlier and starts a new transaction.
  4. ROWTIME is promoted to KAFKA_TIMESTAMP.

You also need to use an OPTIONS_QUERY in the foreign stream, along the following lines:

    CREATE OR REPLACE FOREIGN STREAM my_kafka10_source (
    KAFKA_TIMESTAMP TIMESTAMP,
    ....
    )
    SERVER KAFKA10_SERVER
    OPTIONS (
        ....
        -- parameters are: bootstrap_server, topic_name, consumerGroup used for the sink
        -- 6 seconds are subtracted from the watermark to replay 6 seconds of history
        -- to prime the pipeline.
        "OPTIONS_QUERY" 'SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp(''localhost:9092'', ''result_topic'', ''result_topic_watermark'') - INTERVAL ''6'' SECOND)) AS options(STARTING_TIME)',
        ....
    );

Using Exactly Once Semantics

SQLstream supports exactly once semantics when you use Kafka as both a source (reading from) and a sink (writing to). Exactly once means that even if a producer retries sending a message, the message is delivered exactly once to the end consumer.

We do so by using timestamps from a Kafka topic as the rowtime column for a pipeline. (This process is known as promoting a column to rowtime.) Once you promote Kafka timestamps to rowtime, s-Server can use rowtime as a watermark for the pipeline. In turn, this lets s-Server roll back changes and replay them from the source if writing to a sink fails. In other words, if a write fails, exactly once tells s-Server to "do everything that went along with processing those N input rows and write the result."

Any time you want to perform a rowtime promotion, you need to set up a Kafka staging topic.

This process allows s-Server to make sure that all data written to a sink as a result of this input data tracks this watermark. This ensures that s-Server knows that every input row has resulted in a correct write to a sink. This does not always mean a row-to-row correspondence between input and output rows, since rows may be aggregated in a pipeline (using SUM, for example). It means, instead, that all rows have been processed through a pipeline exactly once.

In order to use exactly once, you need to do the following:

  1. Promote a timestamp from the Kafka source to rowtime.
  2. Retain this rowtime throughout the entire pipeline (this happens automatically as long as you do not promote another value to rowtime).
  3. Use atomic commitments when writing to the Kafka sink.

Note: This functionality relies on the Kafka message queue storing data for two weeks. That means you cannot replay more than two weeks of data without adjusting how long your Kafka topic retains data.

Specifying the Starting Position for a Kafka Foreign Stream

In order to implement at least or exactly once semantics, you should set STARTING_TIME to a timestamp. This specifies the recovery/replay position for the Kafka topic.

When you promote Kafka message timestamps to ROWTIME (with or without t-sorting), these act as natural watermarks for time-series analytics pipelines on data streaming from Kafka.

You can specify a starting position through a number of SQL/MED options in the foreign stream definition. These options are applied in the following precedence order.

  1. PARTITION_OFFSET_QUERY - This is a SQL query text that fetches starting offsets for all partitions of the topic. For example, SELECT "TOPIC", "PARTITION", "OFFSET" FROM stored_offsets;
  2. STARTING_OFFSET - This is a single offset value that can be passed to position within the non-partitioned kafka topic.
  3. STARTING_TIME - This is the recommended option. As described earlier, this option helps achieve exactly once semantics and recovery/replay of time-series analytics based on message timestamps stored in kafka topics.

If none of the above options is specified, then committed offsets for the specified consumer group (set through the CLIENT_ID option of the foreign stream) are used as the starting position. Committed offsets are ignored when any of the above options is specified as the starting position.

The best practice is to start a new subscription (the first instance of the query in a consumer group) by specifying the STARTING_TIME option. All subsequent queries should pass none of these options. In this way, committed offsets are used as the starting position when assigned partitions are rebalanced across all queries with the same CLIENT_ID option.

For example:

CREATE OR REPLACE FOREIGN STREAM OrdersStream (
  ...
)
SERVER KAFKA10_SERVER
OPTIONS (
   TOPIC 'OrdersTopic',
   SEED_BROKERS 'localhost:9092',
   PARSER 'CSV',
   OPTIONS_QUERY 'SELECT * FROM utils.kafka_options'
);

CREATE OR REPLACE VIEW utils.kafka_options AS
SELECT *
FROM (VALUES('CONSUMER_GROUP1', '2018-02-01 00:00:00')) AS v(CLIENT_ID, STARTING_TIME);

To pass the STARTING_TIME option, start the first instance of the "distributed" query with the kafka_options view defined:

SELECT STREAM * FROM OrdersStream;

Next, redefine the view that is used to start new instances of the "distributed" query.

CREATE OR REPLACE VIEW utils.kafka_options AS SELECT *
FROM (VALUES('CONSUMER_GROUP1')) AS v(CLIENT_ID);

All subsequent queries on OrdersStream will start without the STARTING_TIME option. s-Server will use offsets committed by earlier instances during automatic partition rebalancing.

SELECT STREAM * FROM OrdersStream;

Kafka allows messages to be slightly unordered. Once the adapter retrieves messages with their respective timestamps, those timestamps can be promoted as ROWTIME with a desirable t-sort interval. Please note that a t-sort introduces a latency equal to the t-sort interval in the stream processing pipeline but is not expected to significantly affect the throughput of the pipeline, particularly for a small t-sort interval. In most real world use cases, a t-sort interval is expected to be in the low single digit seconds. See the discussion of T-sorting Stream Input in the SQLstream Streaming SQL Reference Guide.

In order to implement at least or exactly once semantics, we recommend that you specify the recovery/replay position for the Kafka topic as a timestamp.

SQL Example

The code below depicts a simple stream processing pipeline. Here, we pull streaming data in from a Kafka topic, perform time-series analytics on this data, and deliver it to another Kafka topic that serves as a "sink."

CREATE OR REPLACE SERVER "KafkaServer"
TYPE 'KAFKA10'
FOREIGN DATA WRAPPER ECDA;

--Code to read from Kafka topic
CREATE OR REPLACE SCHEMA "OnlineOrders";
SET SCHEMA '"OnlineOrders"';

CREATE OR REPLACE FOREIGN STREAM OrdersStream (
  MESSAGE_TIMESTAMP TIMESTAMP,
  ORDER_ID BIGINT,
  ITEM_ID   INT,
  ITEM_PRICE DECIMAL(10,2),
  ITEM_QUANTITY INT,
  PRODUCT_ID INT
)
SERVER KAFKA10_SERVER
OPTIONS (
  SEED_BROKERS 'localhost:9092',
  TOPIC 'OrdersTopic',
  STARTING_TIME '2018-02-01 00:00:00',
   --parser because we are reading here
  PARSER 'JSON'
);

CREATE OR REPLACE VIEW tSortedOrders AS
SELECT STREAM MESSAGE_TIMESTAMP AS ROWTIME, *
--promotes MESSAGE_TIMESTAMP to ROWTIME
FROM OrdersStream
ORDER BY MESSAGE_TIMESTAMP WITHIN INTERVAL '2' SECOND;

--analytics in a view
CREATE OR REPLACE VIEW AnalyticsView AS
SELECT STREAM *,
  SUM(ITEM_PRICE * ITEM_QUANTITY) OVER lastHour AS quantitySoldLastHour
FROM tSortedOrders
WINDOW lastHour AS (PARTITION BY PRODUCT_ID
                    RANGE INTERVAL '1' HOUR PRECEDING);

--foreign stream for writing; uses same server
--because connection information is the same
CREATE OR REPLACE FOREIGN STREAM "OnlineOrders"."ProcessedOrdersStream" (
)
SERVER KAFKA10_SERVER
OPTIONS (
  TRANSACTION_ROWTIME_LIMIT '60000', -- 1 minute
   --this tells s-Server to commit rows atomically
  "transactional.id" 'transactional_Processed_Orders',
  "pump.name" 'LOCALDB.OnlineOrders.ProcessedOrdersPump',
  FORMATTER 'JSON'
);
--this is the "sink" that lets us write to Kafka.
--note that data is formatted as JSON.
CREATE OR REPLACE PUMP "OnlineOrders"."ProcessedOrdersPump" STOPPED AS
INSERT INTO "ProcessedOrdersStream"
SELECT STREAM * FROM "OnlineOrders".AnalyticsView;
--this data will be delivered exactly once. Either all rows in the one-minute window
--will be delivered, or no rows will be delivered.

Note that MESSAGE_TIMESTAMP is promoted as ROWTIME. This serves as a watermark for the pipeline.

It is critical to "commit" results of time-series analytics at "ROWTIME" boundaries. In the code above, KafkaSinkStream is configured to atomically commit results every minute (TRANSACTION_ROWTIME_LIMIT '60000'). At each one minute boundary, all rows from AnalyticsView for the last one minute are committed atomically. This ensures exactly once semantics.

Using Transaction_ID for Load Balancing

When a whole pipeline dies, we have to replay everything; this can take the entire time block defined by TRANSACTION_ROWTIME_LIMIT. For high availability situations, we recommend creating a duplicate pipeline. This pipeline processes the same data in the same order, but does not commit. Using Kafka's transactional.id option, you can set up pipelines that read from redundant Kafka sessions in a single Kafka producer while ensuring that rows are processed only once. To do so, you must identify:

  1. the transactional.id for the session
  2. the pump for the pipeline.

If s-Server identifies two foreign streams/pipelines that use the same transactional.id, one foreign stream/pipeline is designated the leader and all others are designated followers. s-Server will actively read from the leader only. If the leader fails, one of the followers will be designated the leader, and so on.

If a follower catches up to the leader and the leader has not committed, the follower "sleeps" for a few seconds.

If the follower still does not see the leader commit, it concludes that the leader has probably failed. The follower then takes over from the leader and starts committing rows.

The "transactional.id" option functions as a baton: it is what lets a follower declare itself the leader.

With three redundant pipelines running, you would have one leader and two followers. If the leader fails, one of the other two becomes the leader.

Functions to Read Watermarks Committed During Kafka Transactions

The following functions help fetch committed watermarks (ROWTIME and/or specified metadata column values). These values are used as watermarks/offsets for recovering from upstream Kafka source topics.

-- This function returns watermark offsets to be used during
-- replay/recovery from the source topic(s) of a pipeline.
-- The topic passed as a parameter, however, is the name of the
-- sink topic. As transactions are committed to the sink, the
-- commit metadata saves the commit timestamp and, optionally,
-- the offset for the source topic.
-- This function returns these offsets for each partition of the
-- topic.
-- The consumer_group, for now, needs to be
-- '<sink_topic>_watermark'

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_offsets(
  bootstrap_servers VARCHAR(128),
  sink_topic VARCHAR(128),
  consumer_group VARCHAR(128)
)
RETURNS TABLE (
  "TOPIC" VARCHAR(128),
  "PARTITION" INTEGER,
  "OFFSET" BIGINT
)

LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkOffsetsFromIndexTopic';


-- This function returns the ROWTIME of the last row committed in
-- the transaction.
-- Parameters are the same as described for watermark_offsets()
-- function above

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_timestamp(
  bootstrap_servers VARCHAR(128),
  sink_topic VARCHAR(128),
  consumer_group VARCHAR(128)
)
RETURNS TIMESTAMP
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkTimestamp';

-- This function returns the HA mode (Leader/Follower) of a pump.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.pump_mode(
  pump_name VARCHAR(128)
)
RETURNS VARCHAR(256)
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getMode';
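
For example, assuming the broker, sink topic, and '<sink_topic>_watermark' consumer group naming used in the OPTIONS_QUERY example earlier in this topic, and the pump from the exactly once example, these functions could be invoked as follows (the names here are illustrative):

-- per-partition offsets committed for the sink topic
SELECT * FROM TABLE(sys_boot.mgmt.watermark_offsets(
  'localhost:9092', 'result_topic', 'result_topic_watermark'));

-- ROWTIME of the last row committed to the sink
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp(
  'localhost:9092', 'result_topic', 'result_topic_watermark'))) AS t(last_commit_time);

-- HA mode (Leader or Follower) of a running pump
SELECT * FROM (VALUES(sys_boot.mgmt.pump_mode('LOCALDB.OnlineOrders.ProcessedOrdersPump'))) AS t(pump_mode);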

Building and Using an Index Topic

In order to automatically rebalance assigned partitions or implement exactly once semantics, s-Server relies on a Kafka v0.10.2 feature called Time Index. This feature provides the timestamps that you promote to rowtime, which serve as a natural watermark for the pipeline.

But what if you're working with an earlier version of Kafka, or have other trouble getting timestamps into your pipeline?

In these cases, you can still do automatic partition rebalancing and exactly once semantics, but you will need to create your own timestamps for each message. You can do so by building an index topic.

To do so, you set up a stream that records the first and last offsets of the source topic and applies a timestamp at build time. Using the offset range for the actual source topic, s-Server reads records from the source topic and returns the timestamp from the index topic for all records (messages) in that offset range. The index topic contains entries like the following:

MESSAGE_TIMESTAMP START_OFFSET END_OFFSET
2018-01-01 00:00:01 64000 67903
2018-01-01 00:00:02 67904 68112
2018-01-01 00:00:04 68113 72892

For example:

CREATE OR REPLACE SCHEMA "IndexBuilder";
SET SCHEMA '"IndexBuilder"';

CREATE OR REPLACE FOREIGN STREAM "IndexBuilder"."OrdersTopicIndex" (
  "PARTITION" INTEGER,
  "MESSAGE_TIMESTAMP" TIMESTAMP,
  "START_OFFSET" BIGINT,
  "END_OFFSET" BIGINT
)

SERVER KAFKA10_SERVER
OPTIONS (
  SEED_BROKERS 'localhost:9092',
  TOPIC 'OrdersTopic_index',
  "COMMIT_METADATA_COLUMN_NAME" 'END_OFFSET',
  "transactional.id" 'OrdersTopic_index_builder',
  "pump.name" 'LOCALDB.IndexBuilder.Index1Pump,
  "TRANSACTION_ROWTIME_LIMIT" '1000' -- 1 second
  FORMATTER 'CSV'
);

CREATE OR REPLACE PUMP "IndexBuilder"."Index1Pump" STOPPED AS
INSERT INTO "IndexBuilder"."OrdersTopicIndex"
SELECT STREAM * FROM "IndexBuilder".AggregationView;

Once the index topic is built, you can use it as follows:

CREATE OR REPLACE FOREIGN STREAM OrdersStream (
   MESSAGE_TIMESTAMP TIMESTAMP,
   ...
   ORDER_ID BIGINT,
   ...
)

SERVER KAFKA10_SERVER
OPTIONS (
   SEED_BROKERS 'localhost:9092',
   TOPIC 'OrdersTopic',
   INDEX_TOPIC_NAME_SUFFIX '_index', -- use OrdersTopic_index to determine starting position
   STARTING_TIME '2018-02-01 00:00:00',
   ...,
   PARSER 'JSON'
);

Writing to Kafka Using the ECD Agent

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

FORMATTER=CSV
TOPIC=AggregatedData
"metadata.broker.list"=localhost:9092
ROW_SEPARATOR=''
CHARACTER_ENCODING=UTF-8
SCHEMA_NAME=KAFKASOURCE
TABLE_NAME=KAFKAWRITER_STREAM
ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

Working with Kafka Message Headers

(new in s-Server version 6.0.1) Messages published to a Kafka topic can have associated headers. s-Server lets you both read and write these headers.

Writing Headers to Kafka Based on Column Data

For Kafka10 Sinks, the KAFKA10 adapter supports an option called HEADERS_COLUMN_LIST. This is a comma-separated list of column names (case sensitive) that are turned into Kafka headers in the following form:

(column_name1=column_value1),...,(column_nameN=column_valueN)

For example, the following stream specifies that headers will be written to Kafka using the columns prescribed and highway:

--this line creates a schema for use in the two examples that follow
CREATE OR REPLACE SCHEMA headers_example;

CREATE OR REPLACE FOREIGN STREAM headers_example.headers_egress
(
    "id" BIGINT,
    "reported_at" VARCHAR(32),
    "speed" INTEGER,
    "driver_no" BIGINT,
    "prescribed" BOOLEAN,
    "highway" VARCHAR(8)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'JSON',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "bootstrap.servers" 'localhost:9092',
    --option for writing headers
    "HEADERS_COLUMN_LIST" 'prescribed,highway',
    "TOPIC" 'headers_test'
);

Reading and Parsing Headers

Once you have written these headers to a Kafka message, you can read them by specifying a stream column called SQLSTREAM_PROV_KAFKA_HEADERS, defined as VARCHAR. (This column is known as a provenance column because it tells you something about the source of your data.)

Headers do not need to have been written by s-Server; the KAFKA10 adapter will read any headers present in a Kafka topic's messages. If there are no headers for the Kafka message, then the column SQLSTREAM_PROV_KAFKA_HEADERS returns null.

Limitations:

  • Header values are assumed to be 'UTF-8' encodings of String values
  • Keys & values in headers cannot contain '<double-quote>', '<newline>', '\='

When you specify this column and select it in a query, it returns headers as key-value pairs, in serialized text format. If a Kafka message has two headers, ("key1", "value1") and ("key2", "value2"), the value of SQLSTREAM_PROV_KAFKA_HEADERS will be 'key1=value1<newline-character>key2=value2', where <newline-character> is an actual newline character.

Each Kafka message may have an arbitrary number of headers (key-value pairs). You can use the Parser UDX to parse SQLSTREAM_PROV_KAFKA_HEADERS in a pipeline using PARSER = 'KV' as a setting for the UDX.

The following is an example of a KAFKA10 foreign stream with SQLSTREAM_PROV_KAFKA_HEADERS specified as a provenance column:

CREATE OR REPLACE FOREIGN STREAM headers_example.headers_ingest
(
    "id" BIGINT,
    "reported_at" TIMESTAMP,
    "prescribed" BOOLEAN,
    "highway" VARCHAR(8),
    "SQLSTREAM_PROV_KAFKA_HEADERS" VARCHAR(256)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "ROW_PATH" '$',
    "SEED_BROKERS" 'localhost:9092',
    "STARTING_TIME" 'LATEST',
    "isolation.level" 'read_uncommitted',
    "TOPIC" 'headers_test'
);

For a stream with headers written by s-Server using "HEADERS_COLUMN_LIST" 'prescribed,highway' (as defined above), the results might look like the following. Note that you select SQLSTREAM_PROV_KAFKA_HEADERS using quotation marks. Also note that headers return with a line break.

SELECT STREAM "id", "SQLSTREAM_PROV_KAFKA_HEADERS" FROM headers_example.headers_ingest;

'id','SQLSTREAM_PROV_KAFKA_HEADERS'
'50115871536','prescribed=false
highway=MR549'
'346866848365','prescribed=false
highway=MR620'
'50116198282','prescribed=false
highway=MR184'