Output Formats for Writing

In s-Server, the destination for data and the format of that data are configured separately, meaning you can, for example, write JSON-formatted data to the file system, a network socket, AMQP, Kafka, Amazon Kinesis, Snowflake, MongoDB, HTTP, or a WebSocket.

s-Server supports the following data formats: CSV, XML, JSON, BSON, and Avro.

You set data format through the FORMATTER parameter of the foreign stream for a data source. For example, the following AMQP foreign stream formats data as CSV.

CREATE OR REPLACE FOREIGN STREAM amqp_stream (
line VARCHAR(4096))
SERVER AMQPSERVER
OPTIONS (DESTINATION 'amq.topic',
CONNECTION_URL 'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672''',
FORMATTER 'CSV'
);

To change the format to XML, specify FORMATTER 'XML', and so on.

Formatting Data as CSV

When writing CSV files, the Extensible Common Data Adapter converts streaming tuples into character-separated output, based on options supplied through the OPTIONS clause of the CREATE FOREIGN STREAM statement.

For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.
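
For example, rather than asking the ECDA formatter to render a timestamp, you can convert it in the SELECT that feeds the sink. A minimal sketch, assuming a source stream "MyStream" with a TIMESTAMP column "ts" (both names are hypothetical) and the "FileWriterStream" sink created later in this section:

CREATE OR REPLACE PUMP "formattingPump" STARTED AS
INSERT INTO "FileWriterStream"
--CAST renders the timestamp as text in SQL, before it reaches the ECDA formatter
SELECT STREAM CAST("ts" AS VARCHAR(32))
FROM "MyStream";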

To write a CSV file, you need to give the Extensible Common Data Adapter the following information:

  • A formatter of CSV
  • A character encoding type for the file.

Using SQL to Write CSV Data

To write to CSV files, you need to set up a server object which references one of the I/O systems.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

Next, you create a foreign stream that references the server object. Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "WebData," and creates a foreign stream called "FileWriterStream." To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would need to specify options for those systems. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA "WebData";
SET SCHEMA '"WebData"';

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("STR1" VARCHAR(32))
SERVER "FileWriterServer" OPTIONS
(directory 'path/to/myfile',
formatter 'CSV',
filename_date_format 'yyyy-MM-dd-HH:mm:ss',
filename_prefix 'test-',
filename_suffix '.csv',
character_encoding 'US-ASCII',
formatter_include_rowtime 'false',
file_rotation_size '20K');
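
With these options, s-Server writes files named along the lines of test-2020-01-15-10:30:00.csv (the prefix, then a timestamp in the given date format, then the suffix), rotating to a new file once the current one reaches roughly 20K. The timestamp shown here is illustrative.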

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "FileWriterStream"
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

Foreign Stream Options for Formatting CSV Data

Option Definition
FORMATTER This needs to be CSV.
WRITE_HEADER Whether or not to write the column names into a header row. Defaults to 'false'.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
ROW_SEPARATOR Character separating rows in CSV data.
SEPARATOR Character separating values. Defaults to ','.
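
For example, a minimal sketch of a foreign stream that writes a header row and separates values with a pipe instead of a comma (the directory and stream name here are hypothetical):

CREATE OR REPLACE FOREIGN STREAM "PipeWriterStream"
("STR1" VARCHAR(32))
SERVER "FileWriterServer" OPTIONS
(directory 'path/to/myfile',
formatter 'CSV',
write_header 'true',
separator '|',
filename_prefix 'piped-',
filename_suffix '.csv',
character_encoding 'US-ASCII');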

Using the ECD Agent to Write CSV Data

You can use the ECD agent to write CSV data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Location, date format, prefix, suffix
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
ROW_SEPARATOR=\000A
SEPARATOR=,
WRITE_HEADER=false
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true
# Schema, name, and parameter signature of origin stream
SCHEMA_NAME=stocks
TABLE_NAME=output_file
#columns
ROWTYPE=RecordType(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)

Formatting Data as XML

For XML files, the Extensible Common Data Adapter takes batches of rows and maps them to XML elements, depending on the options you specify. If no value for DATA_ELEMENTS, DATA_ATTRIBUTES, <column name>_ELEMENTS, or <column name>_ATTRIBUTES is specified, then the column name is used as an element name (not an attribute name). So a column named foo would be written to an XML element named /batch/row/foo if no values were specified.
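
For example, with all defaults in place, a batch of two rows, each with a single column named foo, would be written along the following lines (a sketch; exact whitespace may differ):

<batch>
  <row>
    <foo>value1</foo>
  </row>
  <row>
    <foo>value2</foo>
  </row>
</batch>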

Using SQL to Write XML Data

The following code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would need to specify options for those systems. See Writing to Other Destinations for more details.

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("recNo" INTEGER,
"ts" TIMESTAMP NOT NULL,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
SERVER "FileWriterServer"
OPTIONS (
        "FORMATTER" 'XML',
        "CHARACTER_ENCODING" 'UTF-8',
        "DIRECTORY" '/home/guavus/output',
        "ORIGINAL_FILENAME" 'stocks-output.xml',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.xml',
        "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
        "FILE_ROTATION_SIZE" '20K',
        "FORMATTER_INCLUDE_ROWTIME" 'true'
);


To actually write to a file in /home/guavus/output, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "FileWriterStream"
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

Foreign Stream Options for Formatting XML Data

Option Definition
FORMATTER This needs to be XML.
DOC_ELEMENTS Specifies a list of elements, separated by slashes (/), to use as the root of the XML document to write. Defaults to "batch".
ROW_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for each row of the XML document's DOM. Defaults to "row".
DATA_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for each datum in a row/tuple. Define either DATA_ELEMENTS or DATA_ATTRIBUTES, but not both; using both will produce inconsistent XML.
DATA_ATTRIBUTES Specifies the name of an attribute to add for each datum in a row/tuple. Define either DATA_ELEMENTS or DATA_ATTRIBUTES, but not both; using both will produce inconsistent XML.
<column name>_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for a specific column's datum in each row/tuple.
<column name>_ATTRIBUTES Specifies the name of an attribute to add for a specific column's datum in each row/tuple.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
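
As a sketch of how these options change the output: setting DOC_ELEMENTS 'transactions' and ROW_ELEMENTS 'transaction' (hypothetical element names) would produce XML along these lines for a row with a single column recNo:

<transactions>
  <transaction>
    <recNo>1</recNo>
  </transaction>
</transactions>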

Using the ECD Agent to Write XML Data

You can use the ECD agent to write XML data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=XML
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.xml
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.xml
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as JSON

The ECDA adapter writes batches of rows as JSON tuples. To configure how the adapter writes such tuples, you use the foreign stream options listed below.

Using SQL to Write JSON Data

Here is an example of the SQL used to define a foreign stream for the JSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would need to specify options for those systems. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "JSON_OutputStream"
   ("id" DOUBLE,
   "reported_at" VARCHAR(4096),
   "shift_no" DOUBLE,
   "trip_no" DOUBLE,
   "route_variant_id" VARCHAR(4096),
   "waypoint_id" DOUBLE,
   "last_known_location_state" VARCHAR(4096)
    )
    SERVER "FileWriterServer"
    --note that this uses the server defined above
    OPTIONS
    (
     "DIRECTORY" '/tmp/json_test/',
     --file directory where JSON file will be written.
     "FORMATTER" 'JSON',
     "CHARACTER_ENCODING" 'UTF-8',
     "ORIGINAL_FILENAME" 'stocks-output.csv',
     "FILENAME_PREFIX" 'output-',
     "FILENAME_SUFFIX" '.json',
     "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
     "FILE_ROTATION_SIZE" '20K'
     "FORMATTER_INCLUDE_ROWTIME" 'true'
     );

To actually write to a file in /tmp/json_test/, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "JSON_OutputStream" (
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
)
SELECT STREAM
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
from "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema

Output

[{"id":"5.0115809712E10",
"reported_at":"2014-07-23 20:52:04.527000000",
"shift_no":"NULL",
"trip_no":"653.0",
"route_variant_id":"L38 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"5.0115854098E10",
"reported_at":"2014-07-23 20:52:05.443000000",
"shift_no":"NULL",
"trip_no":"NULL",
"route_variant_id":"310 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"3.46866848031E11",
"reported_at":"2014-07-23 20:52:07.713000000",
"shift_no":"1016.0",
"trip_no":"NULL",
"route_variant_id":"806 160",
"waypoint_id":"1.5588646E7",
"last_known_location_state":"NULL"}]

Foreign Stream Options for Formatting JSON Data

Option Definition
FORMATTER This needs to be JSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Using the ECD Agent to Write JSON Data

You can use the ECD agent to write JSON data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=JSON
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.json
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.json
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as BSON

The ECDA adapter writes batches of rows as BSON tuples. BSON, or Binary JSON, extends the JSON model to provide additional data types and ordered fields. It is used primarily by MongoDB. See Writing to MongoDB for more details. To configure how the adapter writes such tuples, you use the foreign stream options listed below.

Using SQL to Write BSON Data

Here is an example of the SQL used to define a foreign stream for the BSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would need to specify options for those systems. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "BSON_OutputStream"
("id" DOUBLE,
"reported_at" VARCHAR(4096),
"shift_no" DOUBLE,
"trip_no" DOUBLE,
"route_variant_id" VARCHAR(4096),
"waypoint_id" DOUBLE,
"last_known_location_state" VARCHAR(4096)
)
SERVER "FileWriterServer"
--note that this uses the server defined above
OPTIONS (
        "FORMATTER" 'BSON',
        "CHARACTER_ENCODING" 'UTF-8',
        "WRITE_HEADER" 'false',
        --file directory where the BSON file will be written
        "DIRECTORY" '/home/guavus/output',
        "ORIGINAL_FILENAME" 'stocks-output.bson',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.bson',
        "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
        "FILE_ROTATION_SIZE" '20K',
        "FORMATTER_INCLUDE_ROWTIME" 'true'
);

To actually write to a file in /home/guavus/output, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "BSON_OutputStream" (
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
)
SELECT STREAM
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
from "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema

Output

[{"id":"5.0115809712E10",
"reported_at":"2014-07-23 20:52:04.527000000",
"shift_no":"NULL",
"trip_no":"653.0",
"route_variant_id":"L38 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"5.0115854098E10",
"reported_at":"2014-07-23 20:52:05.443000000",
"shift_no":"NULL",
"trip_no":"NULL",
"route_variant_id":"310 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"3.46866848031E11",
"reported_at":"2014-07-23 20:52:07.713000000",
"shift_no":"1016.0",
"trip_no":"NULL",
"route_variant_id":"806 160",
"waypoint_id":"1.5588646E7",
"last_known_location_state":"NULL"}]

Foreign Stream Options for Formatting BSON Data

Option Definition
FORMATTER This needs to be BSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Using the ECD Agent to Write BSON Data

You can use the ECD agent to write BSON data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=BSON
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.bson
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.bson
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as Avro

The Avro formatter formats s-Server rows into Apache Avro data files. To format data as Avro, you need to set FORMATTER to AVRO and provide a location for the Avro schema file. You can do so in either foreign stream options or agent options. (When Avro data is written, its schema is always written with it.)

For more information on Avro schemas, see https://avro.apache.org/docs/current/#schemas.
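
For example, a flat schema file along the lines of the following would work with the formatter (a sketch; the record and field names are hypothetical):

{
  "type": "record",
  "name": "stock_tick",
  "fields": [
    {"name": "ticker", "type": "string"},
    {"name": "amount", "type": "int"},
    {"name": "order_time", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}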

Limitations of the Avro formatter:

  • The Avro formatter does not support nested records or arrays in schemas. That is, the Avro schema needs to be flat.
  • If the schema contains nested records or arrays, the results are unspecified.

Foreign Stream Options for Formatting Avro Data

Option Definition
FORMATTER This needs to be AVRO.
AVRO_SCHEMA_LOCATION Required option to specify the path for the schema file to be used for formatting the Avro payload.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

SQL Example:

   CREATE OR REPLACE FOREIGN STREAM kafka_sink
   (
     KAFKA_KEY VARBINARY(128),
     KAFKA_PARTITION INTEGER,
     "boolean_value" BOOLEAN,
     "char_value" CHAR(15),
     "date_value" DATE,
     "decimal_value" DECIMAL(8,2),
     "double_value" DOUBLE,
     "real_value" REAL,
     "time_value" TIME,
     "timestamp_value" TIMESTAMP,
     "tinyint_value" TINYINT,
     "varchar_value" VARCHAR(15)
   )
   SERVER Kafka10Server
   OPTIONS
   (
       FORMATTER 'AVRO',
       DATE_FORMAT 'yyyy-MM-dd HH:mm:ss.SSS',
       AVRO_SCHEMA_LOCATION '/home/my-location/schema_value_types.avsc',
       "metadata.brokers.list" 'localhost:9092',
       "TOPIC" 'test'
   );
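
As with the file examples above, data only flows once a pump inserts into the sink. A minimal sketch, assuming a source stream "source_stream" with matching columns (the stream name is hypothetical):

CREATE OR REPLACE PUMP "avroWriterPump" STARTED AS
INSERT INTO kafka_sink ("boolean_value", "varchar_value")
SELECT STREAM "boolean_value", "varchar_value"
FROM "source_stream";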