Using Exactly Once Semantics

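This topic contains the following subtopics:

  • Introduction to Exactly Once
  • Using Watermarks
  • Using Exactly Once with a File-based source
  • Using Exactly Once with Kafka as a Source
  • Using Exactly Once with VFS or S3 as a Source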

Introduction to Exactly Once

In information delivery systems, exactly-once means that even if a producer retries sending a message, the message is delivered exactly once to the end consumer. Exactly-once semantics require that applications coordinate between sources (producers of data) and sinks (consumers of data). s-Server accomplishes this coordination by using ROWTIME as a watermark that tells s-Server the point from which data was last read from a source and written to a sink. Since ROWTIME is a component of every stream, you can persist a watermark through a pipeline relatively easily, provided you manage ROWTIME with care.

Exactly once semantics require a pipeline that reads from a source with an identifiable watermark (the last position read) and writes to a sink that supports transactional writes (currently Kafka only). That is, you need a data sink that supports transactions in which each batch of rows produced by a streaming pipeline is committed atomically to the data sink.

While s-Server supports exactly once for any pipeline that reads from a source with an identifiable watermark and writes to a Kafka sink, we have tested exactly once only with Kafka-to-Kafka pipelines. To use exactly once semantics with other sources, you will need to write code that uses these sources' watermarks to coordinate with the Kafka sink. In such cases, you may want to consult Guavus SQLstream technical support.

In essence, exactly once works like this when using ROWTIME as a watermark:

  1. An s-Server pipeline reads from a source with a watermark (for Kafka, a timestamp).
  2. You develop code to promote the source's watermark to ROWTIME (for Kafka, by promoting the Kafka timestamp to ROWTIME).
  3. As long as you do not alter ROWTIME (by doing another promotion, for example), s-Server retains ROWTIME throughout the pipeline.
  4. Using ROWTIME, s-Server commits data in transactions to the sink (for example, by using Kafka's transactional.id).

If a failure occurs, s-Server "rewinds" the data source to the position from which it last read data. For example, when you use Kafka as a source, the starting position can be either a set of partition:offset pairs (one for each partition in the topic) or a STARTING_TIME, in which case messages with timestamps greater than or equal to STARTING_TIME are replayed.

With exactly once, each batch (delta) of rows committed to a sink constitutes a complete and repeatable result of a streaming analytic pipeline. If a failure occurs, s-Server can rewind to a watermark in the source and replay the pipeline from this position. Because the sink commits each batch transactionally, every row is written either once or not at all.
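The rewind-and-replay cycle can be illustrated with a small Python simulation. This is a hypothetical model, not s-Server code: `TransactionalSink` and `run_pipeline` are invented names, ROWTIMEs are plain integers assumed to be unique and increasing, and commits happen every three rows for brevity (s-Server commits at ROWTIME boundaries instead). A batch and its watermark become visible together, so a crash loses only uncommitted rows, which the replay then reprocesses.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class TransactionalSink:
    """Hypothetical sink: a batch and its watermark become visible together or not at all."""
    committed: List[int] = field(default_factory=list)
    watermark: Optional[int] = None  # ROWTIME of the last committed row

    def commit(self, batch: List[int], watermark: int) -> None:
        self.committed.extend(batch)
        self.watermark = watermark


def run_pipeline(source: List[Tuple[int, int]], sink: TransactionalSink,
                 batch_size: int, fail_after_rows: Optional[int] = None) -> None:
    """Replay source rows newer than the sink's watermark, committing in batches.

    source holds (rowtime, value) pairs sorted by rowtime. If fail_after_rows is
    set, a simulated crash discards the uncommitted batch after that many rows.
    """
    start = sink.watermark
    pending: List[int] = []
    last_rowtime = start
    processed = 0
    for rowtime, value in source:
        if start is not None and rowtime <= start:
            continue  # already committed before the failure: skip on replay
        pending.append(value * 10)  # stand-in for the analytic step
        last_rowtime = rowtime
        processed += 1
        if fail_after_rows is not None and processed == fail_after_rows:
            raise RuntimeError("simulated failure: pending rows are lost")
        if len(pending) == batch_size:
            sink.commit(pending, rowtime)
            pending = []
    if pending:
        sink.commit(pending, last_rowtime)


source = [(t, t) for t in range(1, 11)]  # rowtimes 1..10
sink = TransactionalSink()
try:
    run_pipeline(source, sink, batch_size=3, fail_after_rows=8)
except RuntimeError:
    pass  # batches up to rowtime 6 were committed; rows 7 and 8 were not
run_pipeline(source, sink, batch_size=3)  # restart: rewind to watermark 6
```

After the restart, every source row appears in the sink exactly once, even though rows 7 and 8 were processed twice.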

SQLstream can use different types of watermark, depending on the incoming data and the capabilities of the source and target plugins. For example, the file-based plugins support using a location in a file as a watermark (file name plus line number).

Using Watermarks

To retrieve the watermark for a data sink, you use either ROWTIME or provenance columns. Provenance columns provide s-Server with information about where each row came from in a source. Here, we are interested in provenance columns that identify the point in a source from which data was last read into the streaming pipeline.

The following table shows provenance columns that you can use as watermarks. Again, we have only tested this process with Kafka, so if you are going to use another source, we recommend contacting Guavus technical support. For any source, you can also set up a separate pipeline that writes the data to Kafka, and then use that Kafka topic as the source.

Data Source Type | Provenance Columns Required for Watermarks
File             | {SQLSTREAM_PROV_FILE_SOURCE_FILE/SQLSTREAM_PROV_PARSE_POSITION} or {SQLSTREAM_PROV_FILE_SOURCE_FILE/SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER}
File-VFS and S3  | {SQLSTREAM_PROV_FILE_SOURCE_FILE/SQLSTREAM_PROV_FILE_MODIFIED_TIME} or {SQLSTREAM_PROV_FILE_SOURCE_FILE/SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER}
Kafka            | {SQLSTREAM_PROV_KAFKA_PARTITION/SQLSTREAM_PROV_KAFKA_OFFSET} or {SQLSTREAM_PROV_KAFKA_TIMESTAMP}
AMQP             | {SQLSTREAM_PROV_AMQP_PARTITION/SQLSTREAM_PROV_AMQP_CREATION_TIME}
MQTT             | {SQLSTREAM_PROV_MQTT_MESSAGE_ID}
Kinesis          | {SQLSTREAM_PROV_KINESIS_PARTITION_ID/SQLSTREAM_PROV_KINESIS_SEQUENCE_NUMBER}
RDBMS            | The name of a column with monotonically increasing data (such as an ordered timestamp), supplied through the "queryCol" option of the MED/JDBC foreign stream

Using Exactly Once with a File-based source

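To implement exactly once using a file-based source, we need to decide which method to use to generate and track watermarks:

  • STARTING_TIME, using a timestamp encoded in the names of the input files
  • STARTING_POSITION, using the file name and line number provenance columns

We also need a target that is transactional and from which we can read back the latest watermark. The target could be an RDBMS, for example.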

Preparation for the file based examples

For both of these examples we use the Buses data shipped with the s-Server installer, and a PostgreSQL target. Everything needed can be found in the s-Server installation or on the sqlstream/complete Docker image. In these examples we use CSV data (unlike the StreamLab 'Sydney Buses' gallery application, which uses XML).

First, we create a target table in PostgreSQL. We use the public schema in the demo database, and log in as user demo with password demodemo:

    psql -U demo -d demo

    create table vehicle_counts
    ( trans_minute timestamp primary key
    , vehicle_count integer
    , avg_speed real
    , watermark varchar(100)
    );

NOTE: the primary key constraint on trans_minute will catch any duplicate writes.
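To see why the constraint matters, the following Python sketch uses the standard-library sqlite3 module as a stand-in for PostgreSQL (an assumption made only so the example runs without a database server). A replayed write of the same trans_minute is rejected rather than duplicated; PostgreSQL reports the equivalent condition as a unique-violation error (SQLSTATE 23505). The row values are hypothetical.

```python
import sqlite3

# In-memory SQLite database standing in for the PostgreSQL "demo" database
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE vehicle_counts
       ( trans_minute  TIMESTAMP PRIMARY KEY
       , vehicle_count INTEGER
       , avg_speed     REAL
       , watermark     VARCHAR(100)
       )"""
)


def write_minute(row):
    """Insert one per-minute aggregate; return False if the primary key rejects it."""
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("INSERT INTO vehicle_counts VALUES (?, ?, ?, ?)", row)
        return True
    except sqlite3.IntegrityError:
        return False


# Hypothetical aggregate row, written once and then "replayed"
row = ("2014-07-23 20:52:00", 120, 31.5, "buses_2014-07-23T20:52:00.log:120")
first_write = write_minute(row)
replayed_write = write_minute(row)
row_count = conn.execute("SELECT COUNT(*) FROM vehicle_counts").fetchone()[0]
```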

Now we set up the s-Server schema and source foreign stream:

CREATE OR REPLACE SCHEMA "test_watermark";
SET SCHEMA '"test_watermark"';
SET PATH '"test_watermark"';

-- create the source 
CREATE OR REPLACE FOREIGN STREAM "buses_input"
( "id" BIGINT
, "reported_at" VARCHAR(32)
, "shift_no" VARCHAR(20)
, "trip_no" VARCHAR(20)
, "route_variant_id" VARCHAR(20)
, "waypoint_id" VARCHAR(20)
, "last_known_location_state" VARCHAR(20)
, "lat" DOUBLE
, "lon" DOUBLE
, "speed" INTEGER
, "bearing" INTEGER
, "driver_no" VARCHAR(20)
, "prescribed" BOOLEAN
, "highway" VARCHAR(8)
, SQLSTREAM_PROV_FILE_SOURCE_FILE VARCHAR(128)
, SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER BIGINT
)
SERVER "FILE_SERVER"
OPTIONS 
( DIRECTORY          '/tmp/buses'
, FILENAME_PATTERN   'buses_([^.]+).log'
, CHARACTER_ENCODING 'UTF-8'
, SKIP_HEADER        'true'
, PARSER             'CSV'
, OPTIONS_QUERY 'SELECT * FROM "test_watermark"."recovery_options"'
);

NOTES:

  • the OPTIONS_QUERY is not parsed until a SELECT statement reads from the source foreign stream, so the forward reference to "recovery_options" (which we create later) causes no difficulty. If the parsing fails, you will see a runtime error in the Trace.log file.
  • the "recovery_options" view is different depending on which watermark we are using
  • the query must explicitly specify the schema name of the table or view ("test_watermark" in this example).

The next step is to create a SQL/MED server for the PostgreSQL database:

CREATE OR REPLACE SERVER "PostgreSQL_DB"
    FOREIGN DATA WRAPPER "SYS_JDBC"
    OPTIONS (
        "URL" 'jdbc:postgresql://localhost/demo',
        "USER_NAME" 'demo',
        "PASSWORD" 'demodemo',
        "SCHEMA_NAME" 'public',
        "DIALECT" 'PostgreSQL',
        "DRIVER_CLASS" 'org.postgresql.Driver'
    );

And then we create the target table:

-- as the basis for a Postgres sink table
-- we plan to commit every 60,000ms (1 minute)

CREATE OR REPLACE FOREIGN TABLE "vehicle_counts_pg"
SERVER "PostgreSQL_DB"
OPTIONS 
( SCHEMA_NAME 'public'
, TABLE_NAME 'vehicle_counts'
, TRANSACTION_ROWTIME_LIMIT '60000'
);

We now create the transformation pipeline. It promotes reported_at to ROWTIME (for the STARTING_TIME method), constructs the required watermark (for the STARTING_POSITION method), and produces a simple time-based aggregate. Right at the start of the pipeline we introduce a throttle function to slow processing down, which makes it easier to see what is happening.

CREATE OR REPLACE FUNCTION "throttle"
(inputRows CURSOR, throttleScale int)
returns TABLE(inputRows.*)
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'class:com.sqlstream.plugin.timesync.ThrottleStream.throttle';

-- throttle to 10ms between rows; each 5000-record file takes a little under 1 minute to process

CREATE OR REPLACE VIEW "throttled_buses_view"
AS SELECT STREAM * 
FROM STREAM("throttle" 
  ( CURSOR(SELECT STREAM * FROM "buses_input")
  , 10
  ));

-- read from the throttle view, promote rowtime, and expose the latest starting_position

CREATE OR REPLACE VIEW "buses_watermark_view" AS 
SELECT STREAM CAST("reported_at" AS TIMESTAMP) AS ROWTIME
, SQLSTREAM_PROV_FILE_SOURCE_FILE || ':' ||
   CAST(SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER AS VARCHAR(20)) 
   AS STARTING_POSITION
, "id"
, "speed"
, "highway"
FROM  "throttled_buses_view"
WHERE "speed" > 0 
AND NOT ("lat" = 0.0 OR "lon" = 0.0)
;  

-- add a rowtime-based aggregation to demonstrate another typical step

CREATE OR REPLACE VIEW "buses_aggregate_view" AS
SELECT STREAM STEP(s.ROWTIME BY INTERVAL '1' MINUTE) as "trans_minute"
    , COUNT(*) as "vehicle_count"
    , AVG("speed") as "avg_speed"
    , MAX(STARTING_POSITION) as "watermark" 
FROM "buses_watermark_view" s
GROUP BY STEP(s.ROWTIME BY INTERVAL '1' MINUTE);


CREATE OR REPLACE PUMP "buses_output_Pump" STOPPED 
AS
INSERT INTO "vehicle_counts_pg" 
    ("trans_minute", "vehicle_count", "avg_speed", "watermark")
SELECT STREAM "trans_minute"
     , "vehicle_count"
     , "avg_speed"
     , "watermark"
FROM "buses_aggregate_view";
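One point to watch in "buses_aggregate_view": MAX(STARTING_POSITION) is taken over VARCHAR values. Assuming s-Server compares VARCHAR values character by character, as SQL string comparison normally does, the line-number part of the watermark is ordered as text rather than as a number. The Python sketch below shows the effect and one possible workaround (zero-padding the line number); it is an illustration, not a change tested in s-Server.

```python
# Watermarks shaped like those built in "buses_watermark_view": <file>:<line>
source_file = "buses_2014-07-23T20:52:00.log"
line_numbers = [998, 999, 1000, 1001]

plain = [f"{source_file}:{n}" for n in line_numbers]
print(max(plain))  # string comparison picks line 999, not 1001

# Zero-padding the line number makes string order agree with numeric order
padded = [f"{source_file}:{n:010d}" for n in line_numbers]
print(max(padded))


def line_of(watermark):
    # The file names contain ':', so split on the last one only
    return int(watermark.rsplit(":", 1)[1])
```

The pitfall matters whenever the line numbers aggregated into one minute cross a power of ten (for example, 999 to 1000).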

Now that we have created the pipeline, we can construct the method-specific steps:

SQL example - using STARTING_TIME with File as a Source

First, follow the steps in the preparation section above.

We use a timestamp that is encoded into the names of the input files as the watermark. The FILENAME_PATTERN option must capture this timestamp as group 1 of its regular expression, and the timestamp must be supplied in a format like 'yyyy-M-d hh:mm:ss[.SSS]'.

In our example the original data is perfectly ordered, and we generate one buses_yyyy-MM-ddTHH:mm:ss.log file for every minute of data.

FILENAME_PATTERN  | File Name                     | Extracted Timestamp
buses_([^.]+).log | buses_2020-10-11T12:20:00.log | 2020-10-11T12:20:00
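The group 1 convention can be checked with Python's re module. This sketch assumes a full match of the file name and ISO-8601 parsing of the captured text; how s-Server itself matches and parses the name is not shown here. Note that the unescaped '.' before 'log' matches any character; '\.log' would match only a literal dot.

```python
import re
from datetime import datetime
from typing import Optional

# Same pattern as in the table; group 1 captures the timestamp
FILENAME_PATTERN = re.compile(r"buses_([^.]+).log")


def timestamp_from_name(name: str) -> Optional[datetime]:
    """Return the timestamp captured by group 1, or None if the name does not match."""
    match = FILENAME_PATTERN.fullmatch(name)
    if match is None:
        return None
    return datetime.fromisoformat(match.group(1))


extracted = timestamp_from_name("buses_2020-10-11T12:20:00.log")
```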

We promote the event time to ROWTIME (sorting it if necessary - see T-Sorting Stream Input) and then store the high watermark time in the target table (as "trans_minute" in this example).

When we have to restart the pipeline after an interruption, we can get the latest watermark time from the target using an OPTIONS_QUERY select statement that is executed as we restart the pump.

So we add a convenience view of the remote table that finds only the latest STARTING_TIME watermark. It also has to set the SORT_BY_TIME option.

Note that this view could be stated more efficiently if created in PostgreSQL (left as an exercise for the reader). This is the same view we referenced in the source foreign stream's OPTIONS_QUERY.

CREATE OR REPLACE VIEW "recovery_options"
AS
SELECT MAX(p."trans_minute") AS "STARTING_TIME", true AS "SORT_BY_TIME"
FROM "vehicle_counts_pg" p 
WHERE p."trans_minute" = (
    select max("trans_minute") from "vehicle_counts_pg"
);

The file reader identifies files to read in the input directory, and ignores any file whose name includes a timestamp earlier than the STARTING_TIME.

Note that data from the same time could be disordered and spread over two or more files. In that case this approach can be turned into an at-least-once method by making a suitable allowance in the view to ensure that one or two earlier files may be re-read:

CREATE OR REPLACE VIEW "recovery_options"
AS
SELECT MAX(p."trans_minute") - INTERVAL '10' MINUTE AS "STARTING_TIME", true AS "SORT_BY_TIME"
FROM "vehicle_counts_pg" p 
WHERE p."trans_minute" = (
    select max("trans_minute") from "vehicle_counts_pg"
);
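The effect of the two "recovery_options" variants on file selection can be sketched in Python. The function name, the generated file names and the restart time are hypothetical; the filtering rule is the one described above (skip files whose embedded timestamp is earlier than STARTING_TIME), with the optional ten-minute allowance from the second view.

```python
import re
from datetime import datetime, timedelta
from typing import List

PATTERN = re.compile(r"buses_([^.]+)\.log")


def files_to_read(names: List[str], starting_time: datetime,
                  allowance: timedelta = timedelta(0)) -> List[str]:
    """Names whose embedded timestamp is not earlier than starting_time - allowance,
    in timestamp order."""
    cutoff = starting_time - allowance
    selected = []
    for name in names:
        match = PATTERN.fullmatch(name)
        if match:
            file_time = datetime.fromisoformat(match.group(1))
            if file_time >= cutoff:
                selected.append((file_time, name))
    return [name for _, name in sorted(selected)]


# One file per minute from 20:51 to 21:12, like the files generated for testing
base = datetime(2014, 7, 23, 20, 51)
names = [f"buses_{(base + timedelta(minutes=i)).isoformat()}.log" for i in range(22)]
restart_at = datetime(2014, 7, 23, 21, 5)

exactly_once = files_to_read(names, restart_at)
at_least_once = files_to_read(names, restart_at, timedelta(minutes=10))
```

With the allowance, ten extra files are re-read; downstream duplicates must then be handled separately (here, by the primary key).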

Now we can run the example: see Testing the file based recovery.

SQL example - using STARTING_POSITION with File as a Source

This method uses the file position provenance columns SQLSTREAM_PROV_FILE_SOURCE_FILE and SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER as a watermark. We construct a watermark of the form <source_file>:<line_number>, which we store in the target table. Every time we restart the pumps, s-Server reads the latest watermark and uses it to set the STARTING_POSITION option.

First, follow the steps in the preparation section above.

Next we add the convenience view of the remote table that finds only the latest STARTING_POSITION watermark.

Note that this view could be stated more efficiently if created in PostgreSQL (left as an exercise for the reader). This is the same view we referenced in the source foreign stream's OPTIONS_QUERY.

CREATE OR REPLACE VIEW "recovery_options"
AS
SELECT p."watermark" AS "STARTING_POSITION"
FROM "vehicle_counts_pg" p 
WHERE p."trans_minute" = (
    select max("trans_minute") from "vehicle_counts_pg"
);
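Restarting from a STARTING_POSITION watermark can be modelled in Python. This sketch assumes, as the testing section describes, that delivery resumes after the recorded line; `rows_after_watermark` and the sample rows are hypothetical. Because the generated file names contain colons, the parser splits only on the last one.

```python
from typing import Iterable, List, Optional, Tuple

Row = Tuple[str, int, str]  # (source file, line number, payload)


def rows_after_watermark(rows: Iterable[Row], watermark: Optional[str]) -> List[Row]:
    """Keep rows positioned after a '<file>:<line>' watermark (all rows if None).

    Files are compared by name, which is chronological here because the names
    embed ISO-style timestamps.
    """
    rows = list(rows)
    if watermark is None:
        return rows
    # The file names contain ':', so only the last ':' separates the line number
    wm_file, wm_line = watermark.rsplit(":", 1)
    position = (wm_file, int(wm_line))
    return [row for row in rows if (row[0], row[1]) > position]


f1, f2 = "buses_2014-07-23T20:51:00.log", "buses_2014-07-23T20:52:00.log"
rows = [(f, n, f"{f}#{n}") for f in (f1, f2) for n in (1, 2, 3)]
replayed = rows_after_watermark(rows, f"{f1}:2")
```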

Now we can run the example: see Testing the file based recovery.

Testing the file based recovery

Whichever method we have used, we can check the state of the watermark. The columns returned tell you which method it is defined for: STARTING_POSITION, or STARTING_TIME and SORT_BY_TIME.

   select * from "test_watermark"."recovery_options";

No row will be returned at this stage as we haven't executed the pipeline yet.

We want to generate a set of files in /tmp/buses from the compressed data file supplied with s-Server. The data runs from 2014-07-23 20:51:58.547 to 2014-07-23 21:12:27.390, so we use awk to generate one file per minute. The event time is in field 2, and the field separator is a comma.

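# load the s-Server environment variables, including SQLSTREAM_HOME
. /etc/sqlstream/environment
# start with an empty /tmp/buses directory (no errors if it already exists or is empty)
mkdir -p /tmp/buses
rm -f /tmp/buses/*
# split the data into one file per minute, keyed on field 2 (reported_at)
zcat $SQLSTREAM_HOME/demo/data/buses/30-min-at-50-rps.txt.gz | \
   awk -F, '{x=substr($2,1,16); sub(/ /,"T",x); print $0 >> ("/tmp/buses/buses_" x ":00.log");}'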

$ ls -C1 /tmp/buses 

buses_2014-07-23T20:51:00.log
buses_2014-07-23T20:52:00.log
buses_2014-07-23T20:53:00.log
buses_2014-07-23T20:54:00.log
buses_2014-07-23T20:55:00.log
buses_2014-07-23T20:56:00.log
buses_2014-07-23T20:57:00.log
buses_2014-07-23T20:58:00.log
buses_2014-07-23T20:59:00.log
buses_2014-07-23T21:00:00.log
buses_2014-07-23T21:01:00.log
buses_2014-07-23T21:02:00.log
buses_2014-07-23T21:03:00.log
buses_2014-07-23T21:04:00.log
buses_2014-07-23T21:05:00.log
buses_2014-07-23T21:06:00.log
buses_2014-07-23T21:07:00.log
buses_2014-07-23T21:08:00.log
buses_2014-07-23T21:09:00.log
buses_2014-07-23T21:10:00.log
buses_2014-07-23T21:11:00.log
buses_2014-07-23T21:12:00.log
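If you want to check the file naming without running the shell pipeline, the awk program's logic translates directly into Python. Like awk -F, this sketch splits on every comma, so it assumes no quoted field contains a comma; the sample records are made up.

```python
from collections import defaultdict
from typing import Dict, List


def bucket_file_name(csv_line: str) -> str:
    """Same naming rule as the awk program: first 16 characters of field 2,
    first space replaced by 'T', then ':00.log' appended."""
    reported_at = csv_line.split(",")[1]            # awk -F, ... $2
    minute = reported_at[:16].replace(" ", "T", 1)  # substr($2,1,16); sub(/ /,"T",x)
    return f"buses_{minute}:00.log"


def split_by_minute(lines: List[str]) -> Dict[str, List[str]]:
    """Group lines by their target file, preserving input order within each file."""
    files: Dict[str, List[str]] = defaultdict(list)
    for line in lines:
        files[bucket_file_name(line)].append(line)
    return dict(files)


# Made-up records; only the first two fields matter for the file name
sample = [
    "1,2014-07-23 20:51:58.547,S1",
    "2,2014-07-23 20:51:59.120,S2",
    "3,2014-07-23 20:52:00.010,S3",
]
buckets = split_by_minute(sample)
```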

We can now start the application:

ALTER PUMP "test_watermark".* START;

We can track the content of "vehicle_counts_pg" or "recovery_options" as the application is running using sqllineClient:

select * from "test_watermark"."vehicle_counts_pg";

select * from "test_watermark"."recovery_options";

We can interrupt the application using:

ALTER PUMP "test_watermark".* STOP;

When we restart the application pumps, we will see that data is delivered starting after the last committed watermark. No rows are rejected as duplicates by the PostgreSQL primary key.

Using Exactly Once with Kafka as a Source

To implement exactly once using Kafka as a source, you promote a timestamp from a Kafka topic as the ROWTIME column for a pipeline. (See promoting a column to ROWTIME for more details.)

Once you promote Kafka timestamps to ROWTIME, s-Server can use ROWTIME as a watermark for the pipeline. In turn, this lets s-Server roll back changes and replay them from the source if writing to a sink fails. In other words, if a write fails, exactly once tells s-Server "do everything that went along with processing those N input rows and write the result".

This process allows s-Server to ensure that all data written to a sink as a result of this input data tracks this watermark. In turn, promoting a Kafka timestamp to ROWTIME ensures that s-Server knows that every input row has resulted in a correct write to a sink.

Note: Exactly once does not always mean a row-to-row correspondence between input and output rows, since rows may be aggregated in a pipeline (using SUM, for example). It means, instead, that all rows have been processed through a pipeline exactly once.

In order to use exactly once, you need to do the following:

  1. Promote a timestamp from the Kafka source to ROWTIME.
  2. Retain this ROWTIME throughout the entire pipeline (this happens automatically as long as you do not promote another value to ROWTIME).
  3. Use atomic commits when writing to the Kafka sink.

Note: This functionality relies on the Kafka topic retaining data for two weeks. That means you cannot replay more than two weeks of data without adjusting how long your Kafka topics retain data.

Specifying the Starting Position for a Kafka Foreign Stream

To implement at-least-once or exactly-once semantics, you should set STARTING_TIME to a timestamp. This specifies the recovery/replay position for the Kafka topic.

When you promote Kafka message timestamps to ROWTIME (with or without t-sorting), these act as a natural watermark for time-series analytics pipelines on data streaming from Kafka.

You can specify a starting position through a number of SQL/MED options in the foreign stream definition.

s-Server applies these options in the following order of precedence:

  1. PARTITION_OFFSET_QUERY - This is a SQL query text that fetches starting offsets for all partitions of the topic. For example, SELECT "TOPIC", "PARTITION", "OFFSET" FROM stored_offsets;
  2. STARTING_OFFSET - This is a single offset value that can be passed to position within a non-partitioned Kafka topic.
  3. STARTING_TIME - This is the recommended option. As described earlier, this option helps achieve exactly once semantics and recovery/replay of time-series analytics based on message timestamps stored in kafka topics.

If none of the above options is specified, s-Server uses the committed offsets for the consumer group specified through the CLIENT_ID option of the foreign stream. Committed offsets are ignored when any of the above options is specified as the starting position.
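The precedence rules amount to "the first option that is set wins". A minimal Python sketch, in which options are modelled as a dictionary and committed offsets as an opaque value (both assumptions for illustration, not how s-Server stores them):

```python
from typing import Any, Dict, Optional, Tuple

PRECEDENCE = ("PARTITION_OFFSET_QUERY", "STARTING_OFFSET", "STARTING_TIME")


def starting_position(options: Dict[str, Optional[str]],
                      committed_offsets: Any) -> Tuple[str, Any]:
    """Return (source, value) for the first option set, in precedence order;
    fall back to the consumer group's committed offsets."""
    for name in PRECEDENCE:
        value = options.get(name)
        if value is not None:
            return name, value
    return "COMMITTED_OFFSETS", committed_offsets


both = starting_position({"STARTING_TIME": "2020-10-11 12:20:00", "STARTING_OFFSET": "42"}, {0: 7})
neither = starting_position({}, {0: 7})
```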

The best practice is to start a new subscription (the first instance of the query in a consumer group) by specifying the STARTING_TIME option. All subsequent queries should use PARTITION_OFFSET_QUERY. In this way, we use committed offsets as a starting position when we rebalance assigned partitions across all queries with the same CLIENT_ID option.

Kafka allows messages to be slightly out of order. Once the adapter retrieves messages with their respective timestamps, those timestamps can be promoted to ROWTIME with a suitable t-sort interval. A t-sort introduces latency equal to the t-sort interval into the stream processing pipeline, but is not expected to significantly affect the pipeline's throughput, particularly for a small t-sort interval. In most real-world use cases, the t-sort interval is expected to be in the low single-digit seconds. For more information on t-sorting stream input, see the subtopic T-sorting Stream Input in the topic ORDER BY clause in the SQLstream Streaming SQL Reference Guide.
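The idea behind a t-sort can be sketched with a priority queue: rows are held until the newest timestamp seen is at least one t-sort interval ahead of them, then released in timestamp order. This is a simplified model, not s-Server's implementation; in particular, rows that arrive after later rows have already been released are simply collected in a separate list here, which may not match how s-Server treats them.

```python
import heapq
from typing import Iterable, List, Tuple

Row = Tuple[int, str]  # (timestamp in ms, payload)


def t_sort(rows: Iterable[Row], interval_ms: int) -> Tuple[List[Row], List[Row]]:
    """Return (rows in timestamp order, rows that arrived too late to be ordered)."""
    heap: List[Tuple[int, int, str]] = []
    ordered: List[Row] = []
    late: List[Row] = []
    newest = None         # highest timestamp seen so far
    last_released = None  # timestamp of the most recently released row
    for seq, (ts, payload) in enumerate(rows):
        if last_released is not None and ts < last_released:
            late.append((ts, payload))
            continue
        heapq.heappush(heap, (ts, seq, payload))  # seq breaks ties by arrival order
        newest = ts if newest is None else max(newest, ts)
        while heap and heap[0][0] <= newest - interval_ms:
            t, _, p = heapq.heappop(heap)
            ordered.append((t, p))
            last_released = t
    while heap:  # end of input: flush whatever is still buffered
        t, _, p = heapq.heappop(heap)
        ordered.append((t, p))
    return ordered, late


arrivals = [(1000, "a"), (3000, "c"), (2000, "b"), (5000, "d"), (1500, "x")]
ordered, late = t_sort(arrivals, interval_ms=2000)
```

Each row waits until a row at least interval_ms newer arrives, which is where the added latency of one t-sort interval comes from.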

SQL Example for Using Exactly Once for Kafka

The code below shows a simple stream processing pipeline. Here, we pull in streaming data from a Kafka topic, perform time-series analytics on this data, and deliver the results to a Kafka sink.

CREATE OR REPLACE SCHEMA order_schema_kafka;
ALTER PUMP order_schema_kafka.* STOP;
DROP SCHEMA order_schema_kafka CASCADE;
CREATE OR REPLACE SCHEMA order_schema_kafka;
SET SCHEMA 'order_schema_kafka';

CREATE OR REPLACE FOREIGN STREAM order_schema_kafka.orders
(
  MESSAGE_TIMESTAMP TIMESTAMP,
  ORDER_ID BIGINT,
  ITEM_ID   INTEGER,
  ITEM_PRICE DECIMAL(10,2),
  ITEM_QUANTITY INTEGER,
  PRODUCT_ID INTEGER,
  SQLSTREAM_PROV_KAFKA_TIMESTAMP TIMESTAMP
)

SERVER "KAFKA10_SERVER"

OPTIONS (
        "PARSER" 'CSV',
        "SEED_BROKERS" 'localhost:9092',
        --if pipeline has failed, returns watermark for last row written to the Kafka sink.
        --s-Server uses this value as STARTING_TIME.
        OPTIONS_QUERY 'SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp(''localhost:9092'', ''orders_output'', ''LOCALDB.order_schema_kafka.PUMP_Pump''))) AS options(STARTING_TIME)',
        "STARTING_TIME" 'EARLIEST', --s-Server uses this value if the OPTIONS_QUERY returns null.
        "MAX_POLL_RECORDS" '100',
        "STARTING_OFFSET" '-1',
        "BUFFER_SIZE" '1048576',
        "FETCH_SIZE" '1000000',
        "isolation.level" 'read_uncommitted',
        "TOPIC" 'orders_raw'
        );

--this view promotes Kafka's message timestamp to ROWTIME and orders data by message timestamp
CREATE OR REPLACE VIEW order_schema_kafka.tSortedOrders AS
--promotes the Kafka message timestamp (SQLSTREAM_PROV_KAFKA_TIMESTAMP) to ROWTIME
SELECT STREAM SQLSTREAM_PROV_KAFKA_TIMESTAMP AS ROWTIME, *
FROM order_schema_kafka.orders
ORDER BY SQLSTREAM_PROV_KAFKA_TIMESTAMP WITHIN INTERVAL '2' SECOND;

--analytics in a view
--value of ROWTIME remains unaltered (it remains the value of SQLSTREAM_PROV_KAFKA_TIMESTAMP)
CREATE OR REPLACE VIEW order_schema_kafka.AnalyticsView AS
SELECT STREAM *,
  SUM(ITEM_PRICE * ITEM_QUANTITY) OVER lastHour AS quantitySoldLastHour
FROM order_schema_kafka.tSortedOrders
WINDOW lastHour AS (PARTITION BY PRODUCT_ID
                  RANGE INTERVAL '1' HOUR PRECEDING);

CREATE OR REPLACE FOREIGN STREAM order_schema_kafka.output_sink (
  MESSAGE_TIMESTAMP TIMESTAMP,
  ORDER_ID BIGINT,
  ITEM_ID   INTEGER,
  ITEM_PRICE DECIMAL(10,2),
  ITEM_QUANTITY INTEGER,
  PRODUCT_ID INTEGER,
  SQLSTREAM_PROV_KAFKA_TIMESTAMP TIMESTAMP, --provenance column for kafka timestamp
  quantitySoldLastHour INTEGER NOT NULL
)
SERVER KAFKA10_SERVER
OPTIONS (
  TRANSACTION_ROWTIME_LIMIT '60000', -- 1 minute
    --passing this option to Kafka tells Kafka to commit rows atomically
   "transactional.id" 'auto',
   "FORMATTER" 'CSV',
   "FORMATTER_INCLUDE_ROWTIME" 'false',
   "bootstrap.servers" 'localhost:9092',
   "TOPIC" 'orders_output'
);

CREATE OR REPLACE PUMP order_schema_kafka.pump STOPPED AS
INSERT INTO order_schema_kafka.output_sink
SELECT STREAM
    MESSAGE_TIMESTAMP,
    ORDER_ID,
    ITEM_ID,
    ITEM_PRICE,
    ITEM_QUANTITY,
    PRODUCT_ID,
    SQLSTREAM_PROV_KAFKA_TIMESTAMP, --provenance column for kafka timestamp
    quantitySoldLastHour
FROM order_schema_kafka.AnalyticsView;
--this data will be delivered exactly once. Either all rows in the one-minute window
--will be delivered, or no rows will be delivered.

Note that SQLSTREAM_PROV_KAFKA_TIMESTAMP is promoted to ROWTIME. This serves as a watermark for the pipeline.

It is critical to "commit" results of time-series analytics at ROWTIME boundaries. In the code above, order_schema_kafka.output_sink is configured to atomically commit results every minute (TRANSACTION_ROWTIME_LIMIT '60000').

At each one minute boundary, all rows from order_schema_kafka.AnalyticsView from the last one minute are committed atomically. This ensures exactly once semantics.
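The one-minute commit boundaries can be pictured with a short Python sketch that groups ROWTIMEs into transactions. It assumes the boundaries fall on multiples of TRANSACTION_ROWTIME_LIMIT; the exact alignment s-Server uses is not stated here, and the function name is hypothetical.

```python
from itertools import groupby
from typing import List


def transaction_batches(rowtimes_ms: List[int], limit_ms: int = 60_000) -> List[List[int]]:
    """Split sorted ROWTIMEs (ms) into batches, one per limit_ms-wide interval.

    Intervals are assumed to start at multiples of limit_ms, so rows sharing a
    ROWTIME always land in the same batch.
    """
    return [list(batch) for _, batch in groupby(rowtimes_ms, key=lambda t: t // limit_ms)]


batches = transaction_batches([0, 59_999, 60_000, 60_000, 61_000, 180_000])
```

In this model a batch never splits rows that share a ROWTIME, so a ROWTIME watermark read back after a failure always points at a batch boundary, and replaying rows with later ROWTIMEs neither skips nor repeats committed rows.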

Functions to Read Watermarks Committed During Kafka Transactions

The following functions fetch committed watermarks (ROWTIME and/or specified metadata column values). These values are used as watermarks or offsets for recovering from upstream Kafka source topics.

-- This function returns watermark offsets to be used during
-- replay/recovery from the source topic(s) of a pipeline.
-- The topic passed as a parameter, however, is the name of the
-- sink topic. As transactions are committed to the sink, the
-- commit metadata saves the commit timestamp and, optionally,
-- the offset for the source topic.
-- This function returns these offsets for each partition of the
-- topic.
-- The consumer_group, for now, needs to be
-- '<sink_topic>_watermark'

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_offsets(
  bootstrap_servers VARCHAR(128),
  sink_topic VARCHAR(128),
  consumer_group VARCHAR(128)
)
RETURNS TABLE (
  "TOPIC" VARCHAR(128),
  "PARTITION" INTEGER,
  "OFFSET" BIGINT
)

LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkOffsetsFromIndexTopic';


-- This function returns the ROWTIME of the last row committed in
-- the transaction.
-- Parameters are the same as described for watermark_offsets()
-- function above

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_timestamp(
  bootstrap_servers VARCHAR(128),
  sink_topic VARCHAR(128),
  consumer_group VARCHAR(128)
)
RETURNS TIMESTAMP
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkTimestamp';

-- This function returns the HA mode (Leader/Follower) of a pump.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.pump_mode(
  pump_name VARCHAR(128)
)
RETURNS VARCHAR(256)
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getMode';
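To make the behaviour of these functions concrete, here is a hypothetical Python model of the commit metadata they read. It is not the KafkaWatermark implementation: `Commit`, `latest_timestamp` and `latest_offsets` are invented for illustration, and taking the most recent commit for each partition is an assumption about how offsets are chosen. The fallback to 'EARLIEST' mirrors the STARTING_TIME default in the Kafka example.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Commit:
    """Metadata saved with one sink transaction (hypothetical model)."""
    last_rowtime: str                                             # ROWTIME of the last row committed
    source_offsets: Dict[int, int] = field(default_factory=dict)  # optional: partition -> offset


def latest_timestamp(commits: List[Commit]) -> Optional[str]:
    """Like watermark_timestamp: the last committed ROWTIME, or None if nothing was committed."""
    return commits[-1].last_rowtime if commits else None


def latest_offsets(commits: List[Commit], source_topic: str) -> List[Tuple[str, int, int]]:
    """Like watermark_offsets: one (TOPIC, PARTITION, OFFSET) row per partition,
    taken from the most recent commit that recorded that partition."""
    latest: Dict[int, int] = {}
    for commit in commits:
        latest.update(commit.source_offsets)
    return [(source_topic, p, o) for p, o in sorted(latest.items())]


history = [
    Commit("2020-10-11 12:20:00", {0: 10, 1: 7}),
    Commit("2020-10-11 12:21:00", {0: 25}),
]
# The Kafka example falls back to STARTING_TIME 'EARLIEST' when OPTIONS_QUERY returns null
starting_time = latest_timestamp(history) or "EARLIEST"
fresh_start = latest_timestamp([]) or "EARLIEST"
offsets = latest_offsets(history, "orders_raw")
```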

Using Exactly Once with VFS or S3 as a Source

Exactly-once means that even if a producer retries sending a message, the message is delivered exactly once to the end consumer. Exactly once with a File-VFS or S3 source requires a pipeline that reads from a source, such as the local file system or S3, with an identifiable watermark, and writes to a sink, such as Kafka, that supports transactional semantics, so that each batch of rows produced by the streaming pipeline is committed atomically. This can be implemented for a stateless or a stateful pipeline, as follows:

Stateless Pipeline: File-VFS Source and Kafka Sink

When combined effectively, the File-VFS source and the Kafka sink coordinate to achieve exactly once. A similar architecture can be employed with another sink, and when using S3 as the source plugin. For the pipeline to achieve exactly once, SORT_FIELD must be MODIFIED_FILE_TIME, and the watermark must take the form <modified_time_of_the_file_in_seconds>:<file_name>:<line_offset>.

The sample SQL below reads from a File-VFS source and from an S3 source respectively, and writes to a Kafka sink. When the pipeline starts, a Kafka UDF (sys_boot.mgmt.watermark_string, called in the START_POSITION_QUERY view) fetches the watermark value from Kafka; this value serves as the position key for the File-VFS or S3 source. The source then reads files starting from the position given by that watermark. The view "sample"."v1" is crucial: it concatenates SQLSTREAM_PROV_FILE_MODIFIED_TIME, SQLSTREAM_PROV_FILE_SOURCE_FILE and SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER into SQLSTREAM_POSITION_KEY, which is then passed as the watermark into the topic's commit metadata (see the COMMIT_METADATA_COLUMN_NAME option of the sink).
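The SQLSTREAM_POSITION_KEY format can be built and taken apart as in the Python sketch below. The helper names and sample values are hypothetical; the point is that file names may themselves contain ':', and that comparing keys as strings does not order line numbers numerically, so a parsed (modified time, file, line) tuple is the safer thing to compare.

```python
from typing import NamedTuple


class PositionKey(NamedTuple):
    modified_time: int  # SQLSTREAM_PROV_FILE_MODIFIED_TIME
    source_file: str    # SQLSTREAM_PROV_FILE_SOURCE_FILE
    line_number: int    # SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER


def build_key(modified_time: int, source_file: str, line_number: int) -> str:
    """Same concatenation as "sample"."v1": <modified_time>:<file>:<line>."""
    return f"{modified_time}:{source_file}:{line_number}"


def parse_key(key: str) -> PositionKey:
    """Split on the first and last ':' so that ':' inside the file name is kept."""
    head, line = key.rsplit(":", 1)
    modified_time, source_file = head.split(":", 1)
    return PositionKey(int(modified_time), source_file, int(line))


keys = [build_key(1700000000, "/data/in/part:a.csv", n) for n in (9, 10)]
newest = max(parse_key(k) for k in keys)  # numeric comparison of the parsed fields
```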

Sample SQL

Reading from FILE-VFS

CREATE OR REPLACE VIEW "sample"."START_POSITION_QUERY" AS SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string('<insert-kafka-brokers>', '<insert-kafka-topic-name>', 'LOCALDB.sample.<sink-pump-name>_Pump'))) AS options(STARTING_POSITION);

CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
(
    "event_time" TIMESTAMP,
    "Col1" VARCHAR(256),
    "Col2" VARCHAR(256),
    "Col3" VARCHAR(256),
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
    )
    SERVER "FILE_VFS_SERVER"

OPTIONS (
    "FILE_LOCATION" '<insert-input-file-location>',
    "SEPARATOR" ',',
    "UNPARSED_TEXT" 'LAST COLUMN',
    "ROW_SEPARATOR" u&'\000A',
    "FILENAME_PATTERN" '<insert-file-pattern>',
    "PARSER" 'CSV',
    "FILE_TYPE" '<insert-file-type>',
    "CHARACTER_ENCODING" 'UTF-8',
    "SKIP_HEADER" 'true',
    "SORT_FIELD" 'MODIFIED_FILE_TIME',
    "INGRESS_DELAY_INTERVAL" '<insert-ingress-delay-interval>',
    "INGRESS_FILE_SCAN_WAIT" '2000',
    "OPTIONS_QUERY" 'SELECT * FROM "sample".START_POSITION_QUERY'
);

CREATE OR REPLACE VIEW "sample"."v1" AS SELECT STREAM
    "event_time",
    "Col1",
    "Col2",
    "Col3",
CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(50)) || ':' || "SQLSTREAM_PROV_FILE_SOURCE_FILE" || ':' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(50)) as "SQLSTREAM_POSITION_KEY"
FROM "sample"."fs" AS "input";

CREATE OR REPLACE VIEW "sample"."v2" AS
SELECT STREAM "event_time" AS "SQLSTREAM_EGRESS_KAFKA_TIMESTAMP", *
FROM "sample"."v1";

CREATE OR REPLACE FOREIGN STREAM "sample"."out_fs"
(
"SQLSTREAM_EGRESS_KAFKA_TIMESTAMP" TIMESTAMP,
"event_time" BIGINT,
"Col1" VARCHAR(256),
"Col2" VARCHAR(256),
"Col3" VARCHAR(256),
"SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'AVRO',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "AVRO_SCHEMA_LOCATION" '/home/sqlstream/siq_edr_streaming_output.avsc',
    "bootstrap.servers" '<insert-kafka-brokers>',
    "linger.ms" '10',
    "batch.size" '10000',
    "compression.type" 'gzip',
    "TOPIC" '<insert-kafka-brokers-topic-name>',
    "transaction.timeout.ms" '60000',
    "transactional.id" 'auto',
    "TRANSACTION_ROWTIME_LIMIT" '60000',
    "transaction.enable.preemptive.commit" 'false',
    "transaction.first.preemptive.commit.check.percentage" '70',
    "COMMIT_METADATA_COLUMN_NAME" 'SQLSTREAM_POSITION_KEY'
);

CREATE OR REPLACE PUMP "sample"."<sink-pump-name>" STOPPED AS
    INSERT INTO "sample"."out_fs"
    SELECT  STREAM "SQLSTREAM_EGRESS_KAFKA_TIMESTAMP","event_time","Col1","Col2","Col3","SQLSTREAM_POSITION_KEY"
    FROM "sample"."v2" AS "input";

Reading From S3

CREATE OR REPLACE VIEW "sample"."START_POSITION_QUERY" AS SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string('<insert-kafka-brokers>', '<insert-kafka-topic-name>', 'LOCALDB.sample.<sink-pump-name>_Pump'))) AS options(STARTING_POSITION);

CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
(
    "event_time" TIMESTAMP,
    "Col1" VARCHAR(256),
    "Col2" VARCHAR(256),
    "Col3" VARCHAR(256),
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
    )
    SERVER "S3_SERVER"
OPTIONS (
    "FILE_LOCATION" '<insert-input-file-location>',
    "SEPARATOR" ',',
    "UNPARSED_TEXT" 'LAST COLUMN',
    "ROW_SEPARATOR" u&'\000A',
    "FILENAME_PATTERN" '<insert-file-pattern>',
    "PARSER" 'CSV',
    "FILE_TYPE" '<insert-file-type>',
    "CHARACTER_ENCODING" 'UTF-8',
    "SKIP_HEADER" 'true',
    "SORT_FIELD" 'MODIFIED_FILE_TIME',
    "INGRESS_DELAY_INTERVAL" '<insert-ingress-delay-interval>',
    "INGRESS_FILE_SCAN_WAIT" '2000',
    "OPTIONS_QUERY" 'SELECT * FROM "sample".START_POSITION_QUERY'
);

CREATE OR REPLACE VIEW "sample"."v1" AS SELECT STREAM
    "event_time",
    "Col1",
    "Col2",
    "Col3",
CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(50)) || ':' || "SQLSTREAM_PROV_FILE_SOURCE_FILE" || ':' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(50)) as "SQLSTREAM_POSITION_KEY"
FROM "sample"."fs" AS "input";

CREATE OR REPLACE VIEW "sample"."v2" AS
SELECT STREAM "event_time" AS "SQLSTREAM_EGRESS_KAFKA_TIMESTAMP", *
FROM "sample"."v1";

CREATE OR REPLACE FOREIGN STREAM "sample"."out_fs"
(
"SQLSTREAM_EGRESS_KAFKA_TIMESTAMP" TIMESTAMP,
"event_time" BIGINT,
"Col1" VARCHAR(256),
"Col2" VARCHAR(256),
"Col3" VARCHAR(256),
"SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'AVRO',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "AVRO_SCHEMA_LOCATION" '/home/sqlstream/siq_edr_streaming_output.avsc',
    "bootstrap.servers" '<insert-kafka-brokers>',
    "linger.ms" '10',
    "batch.size" '10000',
    "compression.type" 'gzip',
    "TOPIC" '<insert-kafka-brokers-topic-name>',
    "transaction.timeout.ms" '60000',
    "transactional.id" 'auto',
    "TRANSACTION_ROWTIME_LIMIT" '60000',
    "transaction.enable.preemptive.commit" 'true',
    "transaction.first.preemptive.commit.check.percentage" '70',
    "COMMIT_METADATA_COLUMN_NAME" 'SQLSTREAM_POSITION_KEY'
);

CREATE OR REPLACE PUMP "sample"."<sink-pump-name>" STOPPED AS
    INSERT INTO "sample"."out_fs"
    SELECT  STREAM "SQLSTREAM_EGRESS_KAFKA_TIMESTAMP","event_time","Col1","Col2","Col3","SQLSTREAM_POSITION_KEY"
    FROM "sample"."v2" AS "input";

Stateful Pipeline: File-VFS Source and Kafka Sink

The File-VFS source and the Kafka sink can also coordinate to achieve exactly once when the pipeline is stateful. In this case the record's timestamp is promoted to ROWTIME, followed by a t-sort, and we assume that each file name also contains a timestamp. For the pipeline to achieve exactly once, SORT_FIELD must be TIME_IN_FILENAME, and FILENAME_TIME_FORMAT must match the format of the timestamp in the file name. Here, the watermark used for replay is a timestamp, supplied as STARTING_TIME.

The sample SQL below reads from a File-VFS source and writes to Kafka. When the pipeline starts, the Kafka UDF watermark_timestamp (called in the START_TIME_QUERY view) fetches a watermark value from Kafka, which serves as the starting time for the File-VFS source. The File-VFS source then reads files according to the watermark it has received.

Sample Code

CREATE OR REPLACE SCHEMA "sample";

-- Exactly once start time query
CREATE OR REPLACE VIEW "sample"."START_TIME_QUERY" AS
SELECT TIMESTAMP_TO_CHAR('yyyy-MM-dd HH:mm:ss.SSS', STARTING_TIME) AS STARTING_TIME 
FROM (VALUES(sys_boot.mgmt.watermark_timestamp('<comma_separated_kafka_broker_hostname:kafka_broker_port>','<topic_name>','LOCALDB.sample.pipeline_out_fs_pump_Pump') - INTERVAL '1' MINUTE)) AS V(STARTING_TIME);

--  Input Data Schema
CREATE OR REPLACE FOREIGN STREAM "sample"."input_fs"
(
    "event_time" TIMESTAMP,
    "Col1" VARCHAR(256),
    "Col2" VARCHAR(256),
    "Col3" VARCHAR(256)
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
    "PARSER" 'CSV',
    "FILE_TYPE" 'gzip',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '"',
    "SEPARATOR" ',',
    "SKIP_HEADER" 'true',
    "FILE_LOCATION" '<FILE_PATH_STRING>',
    "ROW_SEPARATOR" u&'\000A',
    "FILENAME_PATTERN" '^.*_(?:http|tetheringhttp)_.*_(\d{14})_.*$',
    "SORT_FIELD" 'TIME_IN_FILENAME',
    "FILENAME_TIME_FORMAT" 'MMddyyyyHHmmss',
    "OPTIONS_QUERY" 'SELECT * FROM "sample"."START_TIME_QUERY"',
    "STARTING_TIME" '1970-01-01 00:00:00.0'
);

-- Tsort with interval of 1 minute
CREATE OR REPLACE VIEW "sample"."pipeline_step_1" AS
    SELECT STREAM "event_time" AS ROWTIME, *
    FROM "sample"."input_fs"
    ORDER BY "event_time" WITHIN INTERVAL '1' MINUTE;

-- Route to Kafka as Avro
CREATE OR REPLACE FOREIGN STREAM "sample"."output_fs"
(
    "event_time" TIMESTAMP,
    "Col1" VARCHAR(256),
    "Col2" VARCHAR(256),
    "Col3" VARCHAR(256)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'AVRO',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "AVRO_SCHEMA_LOCATION" '<avro_schema_path>',
    "bootstrap.servers" '<comma_separated_kafka_broker_hostname:kafka_broker_port>',
    "linger.ms" '10',
    "batch.size" '50000',
    "compression.type" 'lz4',
    "transactional.id" 'auto',
    "TOPIC" '<topic_name>',
    "TRANSACTION_ROWTIME_LIMIT" '1000'
);

-- Pump data into output fs pump
CREATE OR REPLACE PUMP "sample"."pipeline_out_fs_pump" STOPPED AS
    INSERT INTO "sample"."output_fs"
    SELECT STREAM * FROM "sample"."pipeline_step_1";

ALTER PUMP "sample".* START;
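To check that FILENAME_PATTERN and FILENAME_TIME_FORMAT agree with your file names, and to see what START_TIME_QUERY computes, the following Python sketch applies the same pattern and format. The file name is hypothetical, the strptime format is a hand translation of 'MMddyyyyHHmmss', and subtracting one minute simply mirrors the sample query (an interval equal to the t-sort interval in "pipeline_step_1").

```python
import re
from datetime import datetime, timedelta
from typing import Optional

# FILENAME_PATTERN from "sample"."input_fs"; group 1 holds the 14-digit timestamp
FILENAME_PATTERN = re.compile(r"^.*_(?:http|tetheringhttp)_.*_(\d{14})_.*$")
# Python equivalent of FILENAME_TIME_FORMAT 'MMddyyyyHHmmss'
PY_TIME_FORMAT = "%m%d%Y%H%M%S"


def time_in_filename(name: str) -> Optional[datetime]:
    """Parse the timestamp captured by group 1, or return None if the name does not match."""
    match = FILENAME_PATTERN.match(name)
    return datetime.strptime(match.group(1), PY_TIME_FORMAT) if match else None


def replay_start(last_committed_rowtime: datetime,
                 tsort_interval: timedelta = timedelta(minutes=1)) -> datetime:
    """Mirror START_TIME_QUERY: last committed ROWTIME minus one minute."""
    return last_committed_rowtime - tsort_interval


name = "site1_http_node7_07152023103000_part1.csv.gz"  # hypothetical file name
file_time = time_in_filename(name)
restart = replay_start(datetime(2023, 7, 15, 10, 30))
```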