Using Exactly Once Semantics

In information delivery systems, exactly-once semantics mean that even if a producer retries sending a message, the message is delivered exactly once to the end consumer. Exactly-once semantics require that applications coordinate between sources (producers of data) and sinks (consumers of data). s-Server accomplishes this coordination by using ROWTIME as a watermark that tells s-Server the point from which data was last read from a source and written to a sink. Because ROWTIME is a component of every stream, you can persist a watermark through a pipeline relatively easily, provided you manage ROWTIME with care.

Exactly-once semantics require that you create a pipeline that reads from a source with an identifiable watermark (the last position read) and writes to a sink that supports transactional writes (currently Kafka only). That is, you need a data sink that supports transactions, so that each batch of rows produced by a streaming pipeline is committed atomically to the data sink.

While s-Server supports exactly once for any pipeline that reads from a source with an identifiable watermark and writes to a Kafka sink, we have tested exactly once only with Kafka-to-Kafka pipelines. To use exactly-once semantics with other sources, you will need to write code that uses those sources' watermarks to coordinate with the Kafka sink. In such cases, you may want to consult with Guavus SQLstream's technical support.

In essence, exactly once works like this:

  1. An s-Server pipeline reads from a source with a watermark (for Kafka, a timestamp).
  2. You develop code to promote the source's watermark to ROWTIME (for Kafka, by promoting the Kafka timestamp to ROWTIME).
  3. As long as you do not alter ROWTIME (by doing another promotion, for example), s-Server retains ROWTIME throughout the pipeline.
  4. Using ROWTIME, s-Server commits data in transactions to the sink (using Kafka's transactional.id).
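
As a minimal sketch of steps 2 and 4, and assuming a Kafka source foreign stream my_schema.kafka_source that exposes the SQLSTREAM_PROV_KAFKA_TIMESTAMP provenance column, the promotion and the transactional sink options look roughly like this (all names here are placeholders; the complete, runnable pipeline appears in the SQL example later in this topic):

-- Step 2: promote the Kafka message timestamp to ROWTIME
CREATE OR REPLACE VIEW my_schema.watermarked_orders AS
SELECT STREAM SQLSTREAM_PROV_KAFKA_TIMESTAMP AS ROWTIME, *
FROM my_schema.kafka_source;

-- Step 4: the sink commits each batch of rows atomically, using Kafka transactions
CREATE OR REPLACE FOREIGN STREAM my_schema.kafka_sink
(
  ORDER_ID BIGINT,
  ITEM_PRICE DECIMAL(10,2)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
  "transactional.id" 'auto',            -- commit each batch of rows atomically
  TRANSACTION_ROWTIME_LIMIT '60000',    -- commit at one-minute ROWTIME boundaries
  "FORMATTER" 'CSV',
  "bootstrap.servers" 'localhost:9092',
  "TOPIC" 'my_output_topic'
);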

If a failure occurs, s-Server "rewinds" the data source to the position from which it last read data. For example, when you use Kafka as a source, the starting position can be partition:offset pairs for each partition in a topic or a starting_time where messages with timestamps greater than or equal to starting_time are replayed.

With exactly once, each batch (delta) of rows committed to a sink constitutes a complete and repeatable result of a streaming analytic pipeline. If a failure occurs, s-Server can rewind to a watermark in the source and replay the pipeline from that position. Because the sink commits transactionally, all rows are written exactly once or not at all.

This topic contains the following subtopics:

  - Using Watermarks
  - Using Exactly Once with Kafka as a Source
  - Specifying the Starting Position for a Kafka Foreign Stream
  - SQL Example for Using Exactly Once
  - Functions to Read Watermarks Committed During Kafka Transactions

Using Watermarks

To retrieve the watermark for a data source, you use provenance columns. Provenance columns provide s-Server with information about a source's origins. In this case, we are interested in the provenance columns that identify the point in a source from which data was last read into the streaming pipeline.

The following table shows the provenance columns that you can use as watermarks. Again, we have only tested this process with Kafka, so if you are going to use another source, we recommend contacting Guavus technical support. For all sources, you can also set up a separate pipeline that writes to Kafka and use that Kafka topic as a source. A sketch of a Kafka foreign stream that declares watermark provenance columns appears after the table.

Data Source Type   Provenance Columns Required for Watermarks
File               {SQLSTREAM_PROV_FILE_SOURCE_FILE/SQLSTREAM_PROV_PARSE_POSITION} or
                   {SQLSTREAM_PROV_FILE_SOURCE_FILE/SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER}
Kafka              {SQLSTREAM_PROV_KAFKA_PARTITION/SQLSTREAM_PROV_KAFKA_OFFSET} or
                   {SQLSTREAM_PROV_KAFKA_TIMESTAMP}
AMQP               {SQLSTREAM_PROV_AMQP_PARTITION/SQLSTREAM_PROV_AMQP_CREATION_TIME}
MQTT               {SQLSTREAM_PROV_MQTT_MESSAGE_ID}
Kinesis            {SQLSTREAM_PROV_KINESIS_PARTITION_ID/SQLSTREAM_PROV_KINESIS_SEQUENCE_NUMBER}
RDBMS              The name of a column with monotonically increasing data (such as an ordered
                   timestamp), supplied through the "queryCol" option of the MED/JDBC foreign stream.
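
For Kafka, a minimal sketch of a foreign stream that declares these provenance columns alongside the message columns looks like the following. The schema, stream, column, and topic names are placeholders; the provenance column types follow the Kafka example later in this topic. Either the partition/offset pair or the message timestamp can serve as the watermark.

CREATE OR REPLACE FOREIGN STREAM my_schema.kafka_source
(
  ORDER_ID BIGINT,
  ITEM_PRICE DECIMAL(10,2),
  -- provenance columns usable as watermarks
  SQLSTREAM_PROV_KAFKA_PARTITION INTEGER,
  SQLSTREAM_PROV_KAFKA_OFFSET BIGINT,
  SQLSTREAM_PROV_KAFKA_TIMESTAMP TIMESTAMP
)
SERVER "KAFKA10_SERVER"
OPTIONS (
  "PARSER" 'CSV',
  "SEED_BROKERS" 'localhost:9092',
  "TOPIC" 'orders_raw'
);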

Using Exactly Once with Kafka as a Source

To implement exactly once using Kafka as a source, you promote a timestamp from a Kafka topic to the ROWTIME column for the pipeline. (See promoting a column to ROWTIME for more details.)

Once you promote Kafka timestamps to ROWTIME, s-Server can use ROWTIME as a watermark for the pipeline. In turn, this lets s-Server roll back changes and replay them from the source if writing to a sink fails. In other words, if a write fails, exactly once tells s-Server "do everything that went along with processing those N input rows and write the result."

This process allows s-Server to ensure that all data written to the sink as a result of the input data tracks this watermark. Promoting a Kafka timestamp to ROWTIME thus ensures that s-Server knows that every input row has resulted in a correct write to the sink.

Note: Exactly once does not always mean a row-to-row correspondence between input and output rows, since rows may be aggregated in a pipeline (using SUM, for example). It means, instead, that all rows have been processed through a pipeline exactly once.

In order to use exactly once, you need to do the following:

  1. Promote a timestamp from the Kafka source to ROWTIME.
  2. Retain this ROWTIME throughout the entire pipeline (this happens automatically as long as you do not promote another value to ROWTIME).
  3. Use atomic commits when writing to the Kafka sink.

Note: This functionality relies on the Kafka message queue storing data for two weeks. That means you cannot replay more than two weeks of data without adjusting how long your Kafka topic retains data.

Specifying the Starting Position for a Kafka Foreign Stream

To implement at-least-once or exactly-once semantics, you should set STARTING_TIME to a timestamp. This specifies the recovery/replay position for the Kafka topic.

When you promote Kafka message timestamps to ROWTIME (with or without t-sorting), these act as a natural watermark for time-series analytics pipelines on data streaming from Kafka.

You can specify a starting position through a number of SQL/MED options in the foreign stream definition.

These options are applied in the following order of precedence:

  1. PARTITION_OFFSET_QUERY - This is a SQL query text that fetches starting offsets for all partitions of the topic. For example, SELECT "TOPIC", "PARTITION", "OFFSET" FROM stored_offsets;
  2. STARTING_OFFSET - This is a single offset value that can be used to set the starting position within a non-partitioned Kafka topic.
  3. STARTING_TIME - This is the recommended option. As described earlier, this option helps achieve exactly-once semantics and recovery/replay of time-series analytics based on message timestamps stored in Kafka topics.

If none of the above options is specified, then the committed offsets for the specified consumer group (set through the CLIENT_ID option of the foreign stream) are used as the starting position. Committed offsets are ignored when any of the above options is specified as the starting position.

The best practice is to start a new subscription (the first instance of the query in a consumer group) by specifying the STARTING_TIME option. All subsequent queries should use PARTITION_OFFSET_QUERY. In this way, we use committed offsets as a starting position when we rebalance assigned partitions across all queries with the same CLIENT_ID option.
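
For example, here is a minimal sketch (not a definitive implementation) of a foreign stream that uses PARTITION_OFFSET_QUERY to resume from the source offsets committed with the last Kafka transaction, using the sys_boot.mgmt.watermark_offsets function described later in this topic. The broker address, topic names, and column list are placeholders, and the consumer group follows the '<sink_topic>_watermark' convention noted with that function:

CREATE OR REPLACE FOREIGN STREAM my_schema.orders_resume
(
  MESSAGE_TIMESTAMP TIMESTAMP,
  ORDER_ID BIGINT,
  SQLSTREAM_PROV_KAFKA_TIMESTAMP TIMESTAMP
)
SERVER "KAFKA10_SERVER"
OPTIONS (
  "PARSER" 'CSV',
  "SEED_BROKERS" 'localhost:9092',
  "TOPIC" 'orders_raw',
  --fetches the per-partition starting offsets committed with the last transaction
  --on the sink topic 'orders_output'
  "PARTITION_OFFSET_QUERY" 'SELECT "TOPIC", "PARTITION", "OFFSET" FROM TABLE(sys_boot.mgmt.watermark_offsets(''localhost:9092'', ''orders_output'', ''orders_output_watermark''))'
);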

Kafka allows messages to be slightly unordered. Once the adapter retrieves messages with their respective timestamps, those timestamps can be promoted as ROWTIME with a desirable t-sort interval. A t-sort introduces a latency equal to the t-sort interval in the stream processing pipeline but is not expected to significantly affect the throughput of the pipeline, particularly for a small t-sort interval. In most real world use cases, a t-sort interval is expected to be in the low single digit seconds. For more information on t-sorting stream input, see the subtopic T-sorting Stream Input in the topic ORDER BY clause in the SQLstream Streaming SQL Reference Guide.

SQL Example for Using Exactly Once

The code below depicts a simple stream processing pipeline. Here, we pull in streaming data from a Kafka topic, perform time-series analytics on this data, and deliver the results to a Kafka sink.

CREATE OR REPLACE SCHEMA order_schema_kafka;
ALTER PUMP order_schema_kafka.* STOP;
DROP SCHEMA order_schema_kafka CASCADE;
CREATE OR REPLACE SCHEMA order_schema_kafka;
SET SCHEMA 'order_schema_kafka';

CREATE OR REPLACE FOREIGN STREAM order_schema_kafka.orders
(
  MESSAGE_TIMESTAMP TIMESTAMP,
  ORDER_ID BIGINT,
  ITEM_ID   INTEGER,
  ITEM_PRICE DECIMAL(10,2),
  ITEM_QUANTITY INTEGER,
  PRODUCT_ID INTEGER,
  SQLSTREAM_PROV_KAFKA_TIMESTAMP TIMESTAMP
)

SERVER "KAFKA10_SERVER"

OPTIONS (
        "PARSER" 'CSV',
        "SEED_BROKERS" 'localhost:9092',
        --if pipeline has failed, returns watermark for last row written to the Kafka sink.
        --s-Server uses this value as STARTING_TIME.
        OPTIONS_QUERY 'SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp(''localhost:9092'', ''ORDERS_OUTPUT'', ''LOCALDB.order_schema_kafka.PUMP_Pump''))) AS options(STARTING_TIME)',
        "STARTING_TIME" 'EARLIEST', --s-Server uses this value if the OPTIONS_QUERY returns null.
        "MAX_POLL_RECORDS" '100',
        "STARTING_OFFSET" '-1',
        "BUFFER_SIZE" '1048576',
        "FETCH_SIZE" '1000000',
        "isolation.level" 'read_uncommitted',
        "TOPIC" 'orders_raw'
        );

--this view promotes Kafka's message timestamp to ROWTIME and orders data by message timestamp
CREATE OR REPLACE VIEW order_schema_kafka.tSortedOrders AS
--promotes Kafka's MESSAGE_TIMESTAMP to ROWTIME
SELECT STREAM SQLSTREAM_PROV_KAFKA_TIMESTAMP AS ROWTIME, *
FROM order_schema_kafka.orders
ORDER BY SQLSTREAM_PROV_KAFKA_TIMESTAMP WITHIN INTERVAL '2' SECOND;

--analytics in a view
--value of ROWTIME remains unaltered (it remains the value of SQLSTREAM_PROV_KAFKA_TIMESTAMP)
CREATE OR REPLACE VIEW order_schema_kafka.AnalyticsView AS
SELECT STREAM *,
  SUM(ITEM_PRICE * ITEM_QUANTITY) OVER lastHour AS quantitySoldLastHour
FROM order_schema_kafka.tSortedOrders
WINDOW lastHour AS (PARTITION BY PRODUCT_ID
                    RANGE INTERVAL '1' HOUR PRECEDING);

CREATE OR REPLACE FOREIGN STREAM order_schema_kafka.output_sink (
  MESSAGE_TIMESTAMP TIMESTAMP,
  ORDER_ID BIGINT,
  ITEM_ID   INTEGER,
  ITEM_PRICE DECIMAL(10,2),
  ITEM_QUANTITY INTEGER,
  PRODUCT_ID INTEGER,
  SQLSTREAM_PROV_KAFKA_TIMESTAMP TIMESTAMP, --provenance column for kafka timestamp
  quantitySoldLastHour INTEGER NOT NULL
)
SERVER KAFKA10_SERVER
OPTIONS (
  TRANSACTION_ROWTIME_LIMIT '60000', -- 1 minute
    --passing this option to Kafka tells Kafka to commit rows atomically
   "transactional.id" 'auto',
   "FORMATTER" 'CSV',
   "FORMATTER_INCLUDE_ROWTIME" 'false',
   "bootstrap.servers" 'localhost:9092',
   "TOPIC" 'orders_output'
);

CREATE OR REPLACE PUMP order_schema_kafka.pump STOPPED AS
INSERT INTO order_schema_kafka.output_sink
SELECT STREAM
    MESSAGE_TIMESTAMP,
    ORDER_ID,
    ITEM_ID,
    ITEM_PRICE,
    ITEM_QUANTITY,
    PRODUCT_ID,
    SQLSTREAM_PROV_KAFKA_TIMESTAMP, --provenance column for kafka timestamp
    quantitySoldLastHour
FROM order_schema_kafka.AnalyticsView;
--this data will be delivered exactly once. Either all rows in the one-minute window
--will be delivered, or no rows will be delivered.

Note that SQLSTREAM_PROV_KAFKA_TIMESTAMP is promoted to ROWTIME. This serves as a watermark for the pipeline.

It is critical to "commit" results of time-series analytics at ROWTIME boundaries. In the code above, order_schema_kafka.output_sink is configured to atomically commit results every minute (TRANSACTION_ROWTIME_LIMIT '60000').

At each one minute boundary, all rows from order_schema_kafka.AnalyticsView from the last one minute are committed atomically. This ensures exactly once semantics.

Functions to Read Watermarks Committed During Kafka Transactions

The following functions help fetch committed watermarks (ROWTIME and/or specified metadata column values). These values are used as watermarks/offsets when recovering from upstream Kafka source topics.

-- This function returns watermark offsets to be used during
-- replay/recovery from the source topic(s) of a pipeline.
-- The topic passed as a parameter, however, is the name of the
-- sink topic. As transactions are committed to the sink, the
-- commit metadata saves the commit timestamp and, optionally,
-- the offset for the source topic.
-- This function returns these offsets for each partition of the
-- topic.
-- The consumer_group, for now, needs to be
-- '<sink_topic>_watermark'

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_offsets(
  bootstrap_servers VARCHAR(128),
  sink_topic VARCHAR(128),
  consumer_group VARCHAR(128)
)
RETURNS TABLE (
  "TOPIC" VARCHAR(128),
  "PARTITION" INTEGER,
  "OFFSET" BIGINT
)

LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkOffsetsFromIndexTopic';


-- This function returns the ROWTIME of the last row committed in
-- the transaction.
-- Parameters are the same as described for watermark_offsets()
-- function above

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_timestamp(
  bootstrap_servers VARCHAR(128),
  sink_topic VARCHAR(128),
  consumer_group VARCHAR(128)
)
RETURNS TIMESTAMP
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkTimestamp';

-- This function returns the HA mode (Leader/Follower) of a pump.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.pump_mode(
  pump_name VARCHAR(128)
)
RETURNS VARCHAR(256)
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getMode';
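
As a usage sketch, the following queries call these functions directly. The broker address, sink topic, consumer group, and pump name are placeholders; the consumer group follows the '<sink_topic>_watermark' convention described above.

-- per-partition source offsets recorded with the last committed transaction on the sink topic
SELECT * FROM TABLE(sys_boot.mgmt.watermark_offsets('localhost:9092', 'orders_output', 'orders_output_watermark'));

-- ROWTIME of the last row committed to the sink topic
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp('localhost:9092', 'orders_output', 'orders_output_watermark'))) AS t(last_committed_rowtime);

-- HA mode (Leader/Follower) of a pump
SELECT * FROM (VALUES(sys_boot.mgmt.pump_mode('LOCALDB.order_schema_kafka.PUMP'))) AS t(pump_mode);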