Integrating the File System

Using s-Server, you can read from and write to the file system.

Reading from the File System

You can read files from any accessible file system location using the Extensible Common Data Adapter (ECDA) or ECD Agent. Because the adapter and agent use inotify to discern changes in a file, both work only in conventional Linux directories. See the note at the end of this topic.

To read from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See Reading from the File System Using SQL below. To read from remote locations, you configure such options using a properties file and launch the agent at the command line. See Reading from the File System Using the ECD Agent below.

Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide. These errors are also logged in the Global Error Stream.

Reading from the File System Using SQL

To read from the file system, you need to create a foreign stream in SQL that references a prebuilt server object called FILE_SERVER. In the foreign stream's options, you configure how s-Server connects to the file system. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

You will also need to specify a parser for the foreign stream. Specifying "parser" as a foreign stream option tells s-Server that this foreign stream reads data. See Parsers for Reading in this guide for more details.

Streams, like most SQL objects (but unlike data wrappers and servers), must be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named FileReaderStream.

CREATE OR REPLACE SCHEMA FileSource;
SET SCHEMA 'FileSource';

CREATE OR REPLACE FOREIGN STREAM FileSource.FileReaderStream
(
    "id" BIGINT,
    "reported_at" VARCHAR(32),
    "speed" INTEGER,
    "driver_no" BIGINT,
    "prescribed" BOOLEAN,
    "gps" VARCHAR(128),
    "highway" VARCHAR(8)
)
   --prebuilt server for file system
    SERVER "FILE_SERVER"
    OPTIONS (
        "PARSER" 'XML',
        "CHARACTER_ENCODING" 'UTF-8',
        "PARSER_XML_ROW_TAGS" '/Table1',
        "PARSER_XML_USE_ATTRIBUTES" 'false',
        "id_XPATH" '/Table1/id',
        "reported_at_XPATH" '/Table1/reported_at',
        "speed_XPATH" '/Table1/speed',
        "driver_no_XPATH" '/Table1/driver_no',
        "prescribed_XPATH" '/Table1/prescribed',
        "gps_XPATH" '/Table1/gps',
        "highway_XPATH" '/Table1/highway',
        "DIRECTORY" '/tmp',
        "FILENAME_PATTERN" 'buses\.log'
    );
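To make the XPath options concrete, the following Python sketch shows how a row tag and per-column XPath expressions map an input document onto columns. The sample document and its values are invented for illustration, and the relative find() calls are a simplification of real XPath evaluation:

```python
import xml.etree.ElementTree as ET

# Hypothetical input matching PARSER_XML_ROW_TAGS '/Table1' and the
# _XPATH options above; all field values are invented for illustration.
doc = """
<Table1>
  <id>1001</id>
  <reported_at>2019-03-30 03:02:00</reported_at>
  <speed>55</speed>
  <driver_no>9002</driver_no>
  <prescribed>false</prescribed>
  <gps>45.51,-122.68</gps>
  <highway>I-5</highway>
</Table1>
"""

row_tag = ET.fromstring(doc)          # each <Table1> element is one row
columns = ["id", "reported_at", "speed", "driver_no",
           "prescribed", "gps", "highway"]
# Each "<column>_XPATH" option points at a child of the row tag;
# here that is approximated with a relative find() on the row element.
row = {c: row_tag.find(c).text for c in columns}
print(row["id"], row["speed"], row["highway"])
```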

Foreign Stream Options for Reading from the File System

Option Description
DIRECTORY Directory in which file resides from which you are reading.
FILENAME_PATTERN Input only. Java regular expression defining which files to read.

See https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html for more information on Java regular expressions.
FILE_COMPRESSION Inflate / de-compress incoming files using the supplied codec. Currently only 'gzip' is supported (case is not significant). Any other value, or no value, means the file is not de-compressed. Default 'none'.
FILE_COMPRESSION_BUFFER Size in bytes of decompression buffer. Default '65536'.
STATIC_FILES Defaults to false. When you set this to true, you indicate that no files will be added or changed after the file reader is started. The file reader will exit after the last file is read. This lets you use the file reader as a foreign table, which is finite (as opposed to a foreign stream, which is infinite, and handles files that are continually written to).
REPEAT Defaults to 0, meaning do not repeat. Can be a positive whole number, a negative number, or 'FOREVER'.

For positive numbers, after processing all files that match FILENAME_PATTERN, start over again. Repeat for the specified number.

If negative or 'FOREVER', keep reprocessing all files that match FILENAME_PATTERN forever. You must set STATIC_FILES to true in order to use this option.
OPTIONS_QUERY Optional. Lets you query a table to update one or more adapter options at runtime. For example, you can set options using a query such as SELECT DIRECTORY, FILENAME_PATTERN FROM TEST.file_options. For more details, see the topic Using the Options Query Property. Often used to supply STARTING_POSITION or STARTING_TIME (see below).
SORT_BY_TIMESTAMP 'true' or 'false'. If 'true', s-Server searches the file for the first timestamp that matches STARTING_TIME. If 'false', s-Server uses the STARTING_POSITION watermark.
STARTING_POSITION The watermark in the form 'filename:linenumber' from which to start reading. When used, this option is normally retrieved dynamically using an OPTIONS_QUERY. See Using Exactly Once with File as a source.
STARTING_TIME The starting time in the format defined by FILENAME_DATE_FORMAT. Only used if SORT_BY_TIMESTAMP is true. When set, extract the file timestamp from incoming file names using the FILENAME_PATTERN option. The file timestamp is assumed to be extracted as group 1 of the pattern. Skip the file if its timestamp is less than the watermark timestamp. See Using Exactly Once with File as a source.
FILENAME_DATE_FORMAT The format of the date that is embedded in incoming filenames. Only used if SORT_BY_TIMESTAMP is true, to extract the time from the filename for matching to STARTING_TIME.
WATERMARKS Name of the watermark table or view from which to retrieve the WATERMARK for the source.
POSTPROCESS_COMMAND Optional. Input only. A shell script in the $SQLSTREAM_HOME/userbin directory. The script receives the watermark from the sink(s) of the pipeline through the WATERMARKS query, so you can archive all files up to that watermark.
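To illustrate how FILENAME_PATTERN, FILENAME_DATE_FORMAT, and STARTING_TIME work together when SORT_BY_TIMESTAMP is 'true', here is a hedged Python sketch. The file names, pattern, and watermark are invented; the key point is that the timestamp must be capture group 1 of the pattern, and files whose timestamp is less than the watermark are skipped:

```python
import re
from datetime import datetime

# Hypothetical pattern: the timestamp must be capture group 1.
filename_pattern = re.compile(r'buses-(\d{4}-\d{2}-\d{2})\.log')
filename_date_format = '%Y-%m-%d'          # Java pattern: 'yyyy-MM-dd'
starting_time = datetime(2020, 5, 2)       # the STARTING_TIME watermark

files = ['buses-2020-05-01.log', 'buses-2020-05-03.log', 'notes.txt']
selected = []
for name in files:
    m = filename_pattern.fullmatch(name)
    if not m:
        continue                           # no match: file is ignored
    ts = datetime.strptime(m.group(1), filename_date_format)
    if ts < starting_time:
        continue                           # older than the watermark: skip
    selected.append(name)
print(selected)
```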

Configuring the Postprocessing of Consumed Data:

SQLstream lets you purge files from the FILE_SERVER location after a file has been read completely. POSTPROCESS_COMMAND receives the WATERMARK of the data consumed by the sink through a user-defined shell script in the $SQLSTREAM_HOME/userbin/ directory. This option is available for the FILE_SERVER plugin.

The shell script can delete, archive, or rename files up to the defined WATERMARK it receives. The method of archival is at the end user's discretion. You invoke this functionality by specifying the POSTPROCESS_COMMAND argument in the OPTIONS clause of FILE_SERVER:

Example

"POSTPROCESS_COMMAND" 'archive.sh'
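For illustration only, archive.sh might look something like the sketch below. How s-Server actually delivers the watermark to the script is not specified here, so the 'filename:linenumber' argument shown is an assumption:

```shell
#!/bin/sh
# Hypothetical sketch of archive.sh; the argument format is an assumption.
# Assumes the watermark arrives as 'filename:linenumber' in $1.

watermark_file() {
    # strip the ':linenumber' suffix, leaving just the file name
    printf '%s\n' "${1%%:*}"
}

WM_FILE=$(watermark_file "${1:-buses-2020-01-01.log:123}")
# Files up to (and sorting before) the watermark file are safe to archive.
echo "archiving files up to $WM_FILE"
```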

Reading from the File System Using the ECD Agent

You can use the ECD agent to read data from remote locations. See Reading Data from Remote Locations for more details.

The ECD agent takes options similar to the ones you specify in SQL, but these options need to be formatted in a properties file along the lines of the following.

# Location, date format, prefix, suffix
PARSER=XML
CHARACTER_ENCODING=UTF-8
PARSER_XML_ROW_TAGS=/Table1
PARSER_XML_USE_ATTRIBUTES=false
id_XPATH=/Table1/id
reported_at_XPATH=/Table1/reported_at
speed_XPATH=/Table1/speed
driver_no_XPATH=/Table1/driver_no
prescribed_XPATH=/Table1/prescribed
gps_XPATH=/Table1/gps
highway_XPATH=/Table1/highway
DIRECTORY=/tmp
FILENAME_PATTERN=buses\.log
# Schema, name, and parameter signature of origin stream
SCHEMA_NAME=FileSource
TABLE_NAME=FileReaderStream
#columns
ROWTYPE=RECORDTYPE(BIGINT id, VARCHAR(32) reported_at, INTEGER speed, BIGINT driver_no, BOOLEAN prescribed, VARCHAR(128) gps, VARCHAR(8) highway)

Input Format

The code sample above uses XML as a parser. To use other file options, see the Parser Types for Reading topic in this guide.

Note: The ECD file reader currently depends on inotify for efficiency. However inotify is not supported on certain file systems, most notably NFS.

If you use the ECD file reader to tail a file on an NFS file system, it will read up to the current high watermark in the file, but will NOT see any new data that is subsequently appended to the file.

To work around this, you will need to transfer files to a directory in the Linux file system on the same server as the ECDA agent or adapter using rsync --append (this implies --partial and --inplace).

Writing to the File System

You can write data out of s-Server to a file in the file system from a stream or table. The same principles described here apply to Writing to Snowflake and Writing to Hadoop. You can specify how files rotate according to time, size, or both, and you can specify how rotated files are named.

To write from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See Writing to the File System Using SQL below. To write from remote locations, you configure such options using a properties file and launch the agent at the command line. See Writing to the File System Using the ECD Agent below.

Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Writing to the File System Using SQL

To write to the file system, you need to create a foreign stream in SQL that references a prebuilt server object called FILE_SERVER. In the foreign stream's options, you configure how s-Server connects to the file system. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

You will also need to specify a formatter for the foreign stream. Specifying "formatter" as a foreign stream option tells s-Server that this foreign stream writes data. See Output Formats for Writing in this guide for more details.

Streams, like most SQL objects (but unlike data wrappers and servers), must be created within a schema. The following code first creates a schema called stocks in which to run the rest of the sample code below, then creates a simple native stream into which you can insert data, then creates a foreign stream named output_file.

CREATE OR REPLACE SCHEMA stocks;
SET SCHEMA 'stocks';

CREATE OR REPLACE STREAM ticker (
    order_time TIMESTAMP, --Time order was reported.
    amount INTEGER, --amount of order.
    ticker VARCHAR(100) --ticker.
  )
DESCRIPTION 'native stream ticker';

CREATE OR REPLACE FOREIGN STREAM output_file
(
  order_time TIMESTAMP, --these columns match columns in stream created above.
  amount INTEGER,
  ticker VARCHAR(100)
)
SERVER "FILE_SERVER"
OPTIONS (
        "FORMATTER" 'CSV',
        "CHARACTER_ENCODING" 'UTF-8',
        "ROW_SEPARATOR" u&'\000A',
        "SEPARATOR" ',',
        "WRITE_HEADER" 'false',
        "DIRECTORY" '/home/guavus/output',
        "ORIGINAL_FILENAME" 'stocks-output.csv',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.csv',
        "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
        "FILE_ROTATION_SIZE" '20K',
        "FORMATTER_INCLUDE_ROWTIME" 'true'
    );

To test this configuration, you can use code along the following lines:

INSERT INTO stocks.ticker 
    (order_time, ticker, amount)
VALUES
(CAST('2019-03-30 03:02:00.000' as TIMESTAMP), 'ORCL', 20),
(CAST('2019-03-30 03:02:10.000' as TIMESTAMP), 'ORCL', 20),
(CAST('2019-03-30 03:03:00.000' as TIMESTAMP), 'IBM', 30),
(CAST('2019-03-30 03:04:00.000' as TIMESTAMP), 'ORCL', 15),
(CAST('2019-03-30 03:04:30.000' as TIMESTAMP), 'IBM', 40),
(CAST('2019-03-30 03:04:45.000' as TIMESTAMP), 'IBM', 10),
(CAST('2019-03-30 03:05:00.000' as TIMESTAMP), 'MSFT', 15),
(CAST('2019-03-30 05:46:40.000' as TIMESTAMP),NULL, 0)
;

In most cases, you will want to set up a pump that writes data to stocks.output_file. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

You do so with code along the following lines:

CREATE OR REPLACE SCHEMA "Pumps";
SET SCHEMA '"Pumps"';

CREATE OR REPLACE PUMP stocks_outputpump STOPPED AS
INSERT INTO stocks.output_file
SELECT STREAM * FROM stocks.ticker;

To start writing data, use the following code:

ALTER PUMP "Pumps".stocks_outputpump START;

Foreign Stream Options for Writing to File Systems

Option Description
DIRECTORY Directory to which you are writing. s-Server needs permission to write to this directory.
ORIGINAL_FILENAME Name of file to write before rotation. This will be the name of the file that s-Server is currently writing.
FILENAME_PREFIX Prefix of the final output file name, such as "test-".
FILENAME_DATE_FORMAT Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss. Uses java SimpleDateFormat. This specifies how to format a timestamp that appears between the prefix and the suffix. This timestamp is the ROWTIME of the last row in the file.
FILE_ROTATION_WATERMARK_COLUMN This declares the name of a source column whose contents will be used to further distinguish files in the series.
FILENAME_SUFFIX Suffix of the final output file name. If you want this suffix to include a period, you must specify it, e.g. '.csv'.
FILE_ROTATION_TIME Determines when files rotate based on time elapsed since Unix epoch time. Defaults to 0. That means "don't use ROWTIME to trigger file rotation". You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE as an option. You can choose to specify both. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d) - for example '15s', '20m', '2h' or '3d'. These express intervals of time from 1970-01-01; files rotate once a row arrives with a ROWTIME that passes the specified interval. See Using FILE_ROTATION_TIME.
FILE_ROTATION_SIZE Determines when files rotate based on file size. s-Server rotates to the next file once a row arrives that brings the file size over the byte threshold specified by FILE_ROTATION_SIZE. You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE. You can choose to specify both. Lets you specify a file size in kilobytes (k), megabytes (m), or gigabytes (g). Expressed as a positive integer followed by a byte measurement, for example '64k', '100m', '8g'. Defaults to 0 - which means "don't use file size to trigger file rotation". See Using FILE_ROTATION_SIZE.
FILE_ROTATION_RESPECT_ROWTIME 'true' or 'false', case-insensitive. When you use FILE_ROTATION_SIZE, this option lets you specify whether files wait to rotate until all rows with the same ROWTIME have arrived. Defaults to 'true', which means "always respect rowtime". See Using FILE_ROTATION_RESPECT_ROWTIME.
ESCAPE_<column name> True or false; defaults to true. Causes strings to be escaped before being written.
POSTPROCESS_COMMAND The POSTPROCESS_COMMAND option lets you run a script after each output file is written. To use this option, enter the path to the script, along with any parameters needed by the script, substituting <input> for the name of the file. When the file is complete, the script will execute with parameters, and <input> will be replaced by the name of the file.

Example: '/home/sqlstream/scripts/send-to-destination.sh <input> sftp://some.sftp.server.com'
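As a rough illustration of the substitution, the following Python sketch mimics replacing <input> with the finished file's name before the command runs. The paths are taken from the example above; the substitution mechanics are a simplification:

```python
import shlex

# Hypothetical illustration of the <input> substitution: when a file is
# rotated out, its name replaces the <input> placeholder before the
# command executes. The script path and file name below are invented.
command_template = ('/home/sqlstream/scripts/send-to-destination.sh '
                    '<input> sftp://some.sftp.server.com')
finished_file = '/home/guavus/output/output-2020-05-23_19:44:00.csv'

command = command_template.replace('<input>', finished_file)
argv = shlex.split(command)   # roughly what would be handed to exec
print(argv[1])
```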

Rotating Files using FILE_ROTATION options

s-Server rotates files according to options set for FILE_ROTATION_TIME, FILE_ROTATION_SIZE and FILE_ROTATION_RESPECT_ROWTIME.

File rotation options work together in the following ways:

Rotate the file if (the FILE_ROTATION_TIME condition is satisfied OR the FILE_ROTATION_SIZE condition is satisfied) AND (the FILE_ROTATION_RESPECT_ROWTIME condition is satisfied).

Using FILE_ROTATION_TIME

You can use FILE_ROTATION_TIME to rotate files based on time elapsed since Unix epoch time. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval.

Examples:

  • FILE_ROTATION_TIME '15m' rotates files every fifteen minutes from the top of the hour (1:00, 1:15, 1:30, and so on).
  • FILE_ROTATION_TIME '1h' rotates files every hour at the top of the hour.
  • FILE_ROTATION_TIME '1d' rotates files every day at 12:00 AM.

More technically, FILE_ROTATION_TIME works as follows:

  • Let $timePeriod be the number of milliseconds in the time unit bound to FILE_ROTATION_TIME.
  • Let $lastWrittenRowtime be the ROWTIME of the last row in the file.
  • Let $currentRowTime be the ROWTIME of the row about to be written.

s-Server rotates to the next file when:

  • integerPart($lastWrittenRowtime / $timePeriod) < integerPart($currentRowTime / $timePeriod)
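The predicate above can be sketched in a few lines of Python. ROWTIMEs are treated as milliseconds since the Unix epoch; the sample values are invented:

```python
# Sketch of the FILE_ROTATION_TIME check described above, using
# millisecond ROWTIMEs since the Unix epoch.
def should_rotate(last_written_ms, current_ms, period_ms):
    # rotate when the current row falls into a later period bucket
    return last_written_ms // period_ms < current_ms // period_ms

FIFTEEN_MINUTES = 15 * 60 * 1000   # FILE_ROTATION_TIME '15m'

# 01:14:59.999 -> 01:15:00.000 crosses a quarter-hour boundary:
assert should_rotate(4_499_999, 4_500_000, FIFTEEN_MINUTES)
# two rows inside the same quarter hour do not trigger rotation:
assert not should_rotate(4_500_000, 4_799_999, FIFTEEN_MINUTES)
```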

Using FILE_ROTATION_SIZE

You can use FILE_ROTATION_SIZE to rotate files based on their size. You specify a file size in kilobytes (k), megabytes (m), or gigabytes (g), expressed as a positive integer followed by a byte measurement, for example '64k'. Defaults to 0, which means "don't use file size to trigger file rotation".

Note: You cannot use FILE_ROTATION_SIZE for writing to Hive tables.

Examples:

  • FILE_ROTATION_SIZE '20k' means "rotate files when they reach or exceed a size of 20 kilobytes"
  • FILE_ROTATION_SIZE '20m' means "rotate files when they reach or exceed a size of 20 megabytes"
  • FILE_ROTATION_SIZE '1g' means "rotate files when they reach or exceed a size of 1 gigabyte"

When using FILE_ROTATION_SIZE, you can specify that files wait to rotate until all rows with the same ROWTIME have arrived. See FILE_ROTATION_RESPECT_ROWTIME below.
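A hedged sketch of parsing FILE_ROTATION_SIZE values into a byte threshold (whether s-Server uses 1024-based or 1000-based units is an assumption here):

```python
# Sketch of parsing a FILE_ROTATION_SIZE value such as '64k' or '100m'
# into a byte threshold. 1024-based units are an assumption.
UNITS = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3}

def rotation_size_bytes(value):
    if not value or value == '0':
        return 0                     # 0 disables size-based rotation
    return int(value[:-1]) * UNITS[value[-1].lower()]

def size_triggers_rotation(bytes_written, threshold):
    # rotate once the file reaches or exceeds the threshold
    return threshold > 0 and bytes_written >= threshold

assert rotation_size_bytes('20k') == 20 * 1024
assert rotation_size_bytes('8g') == 8 * 1024 ** 3
```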

Using FILE_ROTATION_RESPECT_ROWTIME

Setting FILE_ROTATION_RESPECT_ROWTIME to true ensures that rows with the same rowtime will not be split between two files. For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte), and a new row arrives that causes the file to go over the 1 megabyte threshold, if FILE_ROTATION_RESPECT_ROWTIME is set to true, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte.

If you set FILE_ROTATION_RESPECT_ROWTIME to true, you cannot write files from tables, because table rows lack rowtimes. s-Server raises an error if you try to insert into a file writer foreign stream that has FILE_ROTATION_RESPECT_ROWTIME set to true. If you plan to write rows from a table into a file, you must therefore set FILE_ROTATION_RESPECT_ROWTIME to false.
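The waiting behavior can be sketched as follows. This is a simplification of the real writer, using invented (rowtime, size) pairs; the point is that rows sharing a ROWTIME are never split across files:

```python
# Sketch of FILE_ROTATION_RESPECT_ROWTIME: once the size threshold is
# passed, rotation waits until a row with a *different* ROWTIME arrives,
# so rows sharing a ROWTIME never straddle two files.
def split_into_files(rows, threshold):
    files, current, size = [], [], 0
    for rowtime, nbytes in rows:
        over = size + nbytes > threshold
        new_rowtime = current and current[-1][0] != rowtime
        if over and new_rowtime:
            files.append(current)          # rotate before this row
            current, size = [], 0
        current.append((rowtime, nbytes))
        size += nbytes
    if current:
        files.append(current)
    return files

rows = [(1, 600), (2, 600), (2, 600), (3, 100)]
files = split_into_files(rows, 1000)
# the two rowtime-2 rows stay together even though their file
# grows to 1200 bytes, past the 1000-byte threshold
print([[r for r, _ in f] for f in files])
```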

Configuring File Names

You can specify options for how files are named as they rotate. In setting options for rotated files' names, you can specify a prefix, suffix, and date format for the file name. You can also specify a watermark. Watermarks are drawn from a column in the source from which the file is written.

At minimum, you must specify either FILENAME_PREFIX or FILENAME_SUFFIX. All rotated files include a timestamp.

If you set ORIGINAL_FILENAME, then its value will serve as a temporary file name to use while data is being actively written.

When this file is rotated out, rotated files are named as follows:

<prefix>-<timestamp>-<watermark>-<sequenceNumber><suffix> depending on the options you specify.

If you do not set ORIGINAL_FILENAME, then the file being actively written is given a name of the rotated-file form, using the first ROWTIME in the file.

Note: if you are writing from a table, and do not specify ORIGINAL_FILENAME, <timestamp> will be the system time when the file began writing.

When this file is rotated out, rotated files are named as follows:

<prefix>-<timestamp>-<watermark>-<sequenceNumber><suffix>

For example, the following options:

filename_prefix 'test-',
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
filename_suffix '.csv',                       --note that you need to specify a period for filename_suffix if desired.

produce rotating file names like this:

test-2020-05-23_19:44:00.csv

And the following options:

filename_date_format 'yyyy-MM-dd_HH:mm:ss',
filename_suffix '.csv',                      --note that you need to specify a period for filename_suffix if desired.

produce rotating file names like this:

2020-05-23_19:44:00.csv

Explanation of Elements

  • <prefix> is the string specified by the FILENAME_PREFIX option. If you did not specify FILENAME_PREFIX, then <prefix> is omitted from the filename.
  • <timestamp> is the ROWTIME of the last row written to the file. This element is always present if the source is a stream, but is omitted if the source is a table. s-Server formats <timestamp> as specified by FILENAME_DATE_FORMAT. If you do not specify FILENAME_DATE_FORMAT, then <timestamp> is formatted as yyyy-MM-dd_HH-mm-ss-SSS.
  • <watermark> is the value of a watermark column in the last row of the source. You use FILE_ROTATION_WATERMARK_COLUMN to specify a source watermark column corresponding to the last row of the file. If you do not specify a FILE_ROTATION_WATERMARK_COLUMN, then <watermark> is omitted from the filename.
  • <sequenceNumber> is added only if you specify the FILE_ROTATION_RESPECT_ROWTIME option as false. In that case, files may share the same terminal <timestamp>, because s-Server can write rows with the same rowtime across multiple files. File names in the series with the same terminal <timestamp> are then distinguished from one another by a monotonically increasing seven-digit sequence number, starting at 0000001.
  • <suffix> is the string specified by the FILENAME_SUFFIX option. If you did not specify FILENAME_SUFFIX, then <suffix> is omitted from the filename.
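Putting these elements together, here is a hedged Python sketch of the naming scheme. It matches the earlier 'test-2020-05-23_19:44:00.csv' example; the separators around <watermark> and <sequenceNumber> are assumptions:

```python
from datetime import datetime

# Sketch assembling a rotated file name from the elements above.
# Option values come from the earlier example; watermark and
# sequence handling are simplified assumptions.
def rotated_name(prefix, last_rowtime, suffix, date_format,
                 watermark=None, sequence=None):
    parts = [prefix + last_rowtime.strftime(date_format)]
    if watermark is not None:
        parts.append(watermark)
    if sequence is not None:
        parts.append('%07d' % sequence)    # e.g. 0000001
    return '-'.join(parts) + suffix

# FILENAME_PREFIX 'test-', FILENAME_DATE_FORMAT 'yyyy-MM-dd_HH:mm:ss'
# (written here as the Python strftime equivalent), FILENAME_SUFFIX '.csv'
name = rotated_name('test-', datetime(2020, 5, 23, 19, 44, 0),
                    '.csv', '%Y-%m-%d_%H:%M:%S')
print(name)
```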

Writing to the File System Using the ECD Agent

You can use the ECD agent to write files to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Location, date format, prefix, suffix
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
ROW_SEPARATOR=\000A
SEPARATOR=,
WRITE_HEADER=false
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true
# Schema, name, and parameter signature of origin stream
SCHEMA_NAME=stocks
TABLE_NAME=output_file
#columns
ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line:

$ ./commondataagent.sh --output --props sample.test.properties --io file