Output Formats for Writing

In s-Server, the destination for written data and the data format are configured separately, meaning you can, for example, write JSON-formatted data to the file system, a network socket, AMQP, Kafka, Amazon Kinesis, Snowflake, MongoDB, over HTTP, or to a WebSocket.

s-Server supports the following data formats, each covered below:

  • CSV
  • XML
  • JSON
  • BSON

You set data format through the FORMATTER parameter of the foreign stream for a data source. For example, the following AMQP foreign stream formats data as CSV.


CREATE OR REPLACE FOREIGN STREAM amqp_stream (
line VARCHAR(4096))
SERVER AMQPSERVER
OPTIONS (DESTINATION 'amq.topic',
CONNECTION_URL 'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672''',
FORMATTER 'CSV');

To change formatting to XML, enter FORMATTER 'XML', and so on.

Formatting Data as CSV

When writing CSV files, the Extensible Common Data Adapter (ECDA) converts streaming tuples into character-separated output based on options supplied through the OPTIONS section of the CREATE FOREIGN STREAM statement.

For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.

To write a CSV file, you need to give the Extensible Common Data Adapter the following information:

  • A destination directory for the file.
  • A formatter of CSV.
  • A character encoding type for the file.
  • How often the file will rotate, in terms of bytes or time.
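The rotation options determine the names of the files the adapter produces. As a rough sketch (not the adapter's actual implementation), a rotated-file name combines FILENAME_PREFIX, a timestamp rendered with FILENAME_DATE_FORMAT (a Java SimpleDateFormat pattern), and FILENAME_SUFFIX. In Python, with the Java pattern 'yyyy-MM-dd-HH:mm:ss' translated to its strftime equivalent:

```python
from datetime import datetime

def rotated_filename(prefix: str, suffix: str, when: datetime) -> str:
    """Compose a rotated-file name from prefix, timestamp, and suffix.

    Mirrors the FILENAME_PREFIX / FILENAME_DATE_FORMAT / FILENAME_SUFFIX
    options; '%Y-%m-%d-%H:%M:%S' corresponds to the Java pattern
    'yyyy-MM-dd-HH:mm:ss' used in the examples below.
    """
    return f"{prefix}{when.strftime('%Y-%m-%d-%H:%M:%S')}{suffix}"

# With prefix 'test-' and suffix '.csv':
name = rotated_filename("test-", ".csv", datetime(2014, 7, 23, 20, 52, 4))
# → 'test-2014-07-23-20:52:04.csv'
```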

Using SQL to Write CSV Data

To write to CSV files, you need to set up a server object which references one of the I/O systems.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

Finally, you create a foreign stream which references the server object. Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "WebData," and creates a foreign stream called "FileWriterStream." To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA "WebData";
SET SCHEMA '"WebData"';

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("STR1" VARCHAR(32))
SERVER "FileWriterServer" OPTIONS
(directory 'path/to/myfile',
formatter 'CSV',
filename_date_format 'yyyy-MM-dd-HH:mm:ss',
filename_prefix 'test-',
filename_suffix '.csv',
character_encoding 'US-ASCII',
formatter_include_rowtime 'false',
file_rotation_size '20K');

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "FileWriterStream"
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

Foreign Stream Options for Writing CSV Data

Option	Definition
FORMATTER	This needs to be CSV.
WRITE_HEADER	Whether to write the column names into a header row. True or false.
CUSTOM_FORMATTER_<column_name>	Allows overriding an individual column's formatting. Specifies a fully qualified Java classname that implements com.sqlstream.aspen.namespace.common.TypeFormatter<T>, where T is the Java type that matches the SQL type of the column, such as java.lang.String for VARCHAR/CHAR and java.lang.Double for DOUBLE.

Using the ECD Agent to Write CSV Data

You can use the ECD agent to write CSV data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Location, date format, prefix, suffix
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
ROW_SEPARATOR=\000A
SEPARATOR=,
WRITE_HEADER=false
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true
# Schema, name, and parameter signature of origin stream
SCHEMA_NAME=stocks
TABLE_NAME=output_file
#columns
ROWTYPE=RecordType(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)
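The agent reads these settings from a plain Java-style properties file: one KEY=VALUE pair per line, with '#' starting a comment. As an illustration only (this is not the agent's actual parser), such a file reduces to a dictionary of key/value pairs:

```python
def parse_properties(text: str) -> dict:
    """Parse a Java-style .properties fragment (KEY=VALUE, '#' comments)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """
# Location, date format, prefix, suffix
FORMATTER=CSV
FILE_ROTATION_SIZE=20K
"""
props = parse_properties(sample)
# props["FORMATTER"] → 'CSV'
```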

Formatting Data as XML

For XML files, the Extensible Common Data Adapter takes batches of rows and maps them to XML elements, depending on the options you specify. If no value is specified for DATA_ELEMENTS, DATA_ATTRIBUTES, <column_name>_ELEMENTS, or <column_name>_ATTRIBUTES, then the column name is used as an element name (not an attribute name). So a column named foo would be written to an XML element named /batch/row/foo if no values were specified.
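A minimal sketch of this default mapping, using Python's standard ElementTree and assuming the default "batch" and "row" element names:

```python
import xml.etree.ElementTree as ET

def rows_to_xml(rows):
    """Default mapping sketch: one <batch> root, one <row> per tuple,
    and one element named after each column holding that column's value."""
    batch = ET.Element("batch")
    for row in rows:
        row_el = ET.SubElement(batch, "row")
        for column, value in row.items():
            ET.SubElement(row_el, column).text = str(value)
    return ET.tostring(batch, encoding="unicode")

xml = rows_to_xml([{"foo": 1}])
# → '<batch><row><foo>1</foo></row></batch>'
```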

Using SQL to Write XML Data

The following code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("recNo" INTEGER,
"ts" TIMESTAMP NOT NULL,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER,)
SERVER "FileWriterServer"
OPTIONS (
	 "FORMATTER" 'XML',
        "CHARACTER_ENCODING" 'UTF-8',
        "DIRECTORY" '/home/guavus/output',
        "ORIGINAL_FILENAME" 'stocks-output.xml',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.xml',
        "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
        "FILE_ROTATION_SIZE" '20K',
        "FORMATTER_INCLUDE_ROWTIME" 'true'
);


To actually write to a file in /home/guavus/output, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "FileWriterStream"
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

Foreign Stream Options for Writing XML

Option name	Description
FORMATTER	This needs to be XML.
DOC_ELEMENTS	Specifies a list of elements, separated by slashes (/), to use as the root of the XML document to write. Defaults to "batch".
ROW_ELEMENTS	Specifies a list of elements, separated by slashes (/), to add for each row of the XML document's DOM. Defaults to "row".
DATA_ELEMENTS	Specifies a list of elements, separated by slashes (/), to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML.
DATA_ATTRIBUTES	Specifies the name of an attribute to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML.
<column_name>_ELEMENTS	Specifies a list of elements, separated by slashes (/), to add for a specific column's datum in each row/tuple.
<column_name>_ATTRIBUTES	Specifies the name of an attribute to add for a specific column's datum in each row/tuple.
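One plausible reading of the DATA_ELEMENTS versus DATA_ATTRIBUTES distinction, sketched in Python with a hypothetical element/attribute name "data" (the exact layout depends on the option values you supply, so treat this as illustrative only):

```python
import xml.etree.ElementTree as ET

def row_with_elements(row):
    """DATA_ELEMENTS style: each datum is wrapped in a nested element."""
    row_el = ET.Element("row")
    for column, value in row.items():
        col_el = ET.SubElement(row_el, column)
        ET.SubElement(col_el, "data").text = str(value)
    return ET.tostring(row_el, encoding="unicode")

def row_with_attributes(row):
    """DATA_ATTRIBUTES style: each datum becomes an attribute instead."""
    row_el = ET.Element("row")
    for column, value in row.items():
        ET.SubElement(row_el, column, {"data": str(value)})
    return ET.tostring(row_el, encoding="unicode")

row_with_elements({"foo": 1})    # → '<row><foo><data>1</data></foo></row>'
row_with_attributes({"foo": 1})  # → '<row><foo data="1" /></row>'
```

Defining both options at once would mix the two shapes, which is why the table above warns against it.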

Using the ECD Agent to Write XML Data

You can use the ECD agent to write XML data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=XML
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.xml
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.xml
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as JSON

The ECDA adapter writes batches of data as JSON tuples. You configure how the adapter writes these tuples through the foreign stream options listed below.

Using SQL to Write JSON Data

Here is an example of the SQL used to define a foreign stream for the JSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "JSON_OutputStream"
   ("id" DOUBLE,
   "reported_at" VARCHAR(4096),
   "shift_no" DOUBLE,
   "trip_no" DOUBLE,
   "route_variant_id" VARCHAR(4096),
   "waypoint_id" DOUBLE,
   "last_known_location_state" VARCHAR(4096)
    )
    SERVER "FileWriterServer"
    --note that this uses the server defined above
    OPTIONS
    (
     "DIRECTORY" '/tmp/json_test/',
     --file directory where JSON file will be written.
     "FORMATTER" 'JSON',
     "CHARACTER_ENCODING" 'UTF-8',
     "ORIGINAL_FILENAME" 'stocks-output.json',
     "FILENAME_PREFIX" 'output-',
     "FILENAME_SUFFIX" '.json',
     "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
     "FILE_ROTATION_SIZE" '20K',
     "FORMATTER_INCLUDE_ROWTIME" 'true'
     );

To actually write to a file in /tmp/json_test/, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "JSON_OutputStream" (
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
)
SELECT STREAM
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
FROM "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema

Output

[{"id":"5.0115809712E10",
"reported_at":"2014-07-23 20:52:04.527000000",
"shift_no":"NULL",
"trip_no":"653.0",
"route_variant_id":"L38 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"5.0115854098E10",
"reported_at":"2014-07-23 20:52:05.443000000",
"shift_no":"NULL",
"trip_no":"NULL",
"route_variant_id":"310 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"3.46866848031E11",
"reported_at":"2014-07-23 20:52:07.713000000",
"shift_no":"1016.0",
"trip_no":"NULL",
"route_variant_id":"806 160",
"waypoint_id":"1.5588646E7",
"last_known_location_state":"NULL"}]
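Note that every value in the output above is rendered as a JSON string, with SQL NULL rendered as the string "NULL". A sketch of that rendering (illustrative only, not the formatter's actual code):

```python
import json

def rows_to_json(rows):
    """Render tuples as a JSON array of objects, stringifying every
    value and rendering SQL NULL (None) as the string "NULL", to match
    the sample output shown above."""
    return json.dumps(
        [{k: ("NULL" if v is None else str(v)) for k, v in row.items()}
         for row in rows]
    )

out = rows_to_json([{"trip_no": 653.0, "shift_no": None}])
# → '[{"trip_no": "653.0", "shift_no": "NULL"}]'
```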

Foreign Stream Options for Writing JSON or BSON

Option	Definition
FORMATTER	This needs to be JSON or BSON.
CUSTOM_FORMATTER_<column_name>	Allows overriding an individual column's formatting. Specifies a fully qualified Java classname that implements com.sqlstream.aspen.namespace.common.TypeFormatter<T>, where T is the Java type that matches the SQL type of the column, such as java.lang.String for VARCHAR/CHAR and java.lang.Double for DOUBLE.

Using the ECD Agent to Write JSON Data

You can use the ECD agent to write JSON data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=JSON
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.json
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.json
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as BSON

The ECDA adapter writes batches of data as BSON tuples. BSON, or Binary JavaScript Object Notation, extends the JSON model with additional data types and ordered fields. It is used primarily with MongoDB. See Writing to MongoDB for more details. To configure how the adapter writes tuples as BSON, you use the foreign stream options listed below.
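To see concretely what "binary JSON" means, here is a hand-rolled encoding of a one-field BSON document, following the layout defined in the BSON specification (illustration only; real applications should use a BSON library):

```python
import struct

def bson_string_doc(name: str, value: str) -> bytes:
    """Encode the one-field BSON document {name: value} by hand.

    Per the BSON spec, a document is a little-endian int32 total length,
    a sequence of elements, and a 0x00 terminator. A string element is a
    type byte (0x02 = UTF-8 string), the field name as a NUL-terminated
    cstring, an int32 string length (including the trailing NUL), the
    string bytes, and a NUL.
    """
    val = value.encode("utf-8") + b"\x00"
    element = (b"\x02" + name.encode("utf-8") + b"\x00"
               + struct.pack("<i", len(val)) + val)
    body = element + b"\x00"          # elements + document terminator
    return struct.pack("<i", 4 + len(body)) + body

doc = bson_string_doc("a", "b")
# → 14 bytes: b'\x0e\x00\x00\x00\x02a\x00\x02\x00\x00\x00b\x00\x00'
```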

Using SQL to Write BSON Data

Here is an example of the SQL used to define a foreign stream for the BSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "BSON_OutputStream"
("id" DOUBLE,
"reported_at" VARCHAR(4096),
"shift_no" DOUBLE,
"trip_no" DOUBLE,
"route_variant_id" VARCHAR(4096),
"waypoint_id" DOUBLE,
"last_known_location_state" VARCHAR(4096)
)
SERVER "FileWriterServer"
--note that this uses the server defined above
OPTIONS (
        "FORMATTER" 'BSON',
        "CHARACTER_ENCODING" 'UTF-8',
        "WRITE_HEADER" 'false',
        "DIRECTORY" '/home/guavus/output',
        --file directory where the BSON file will be written
        "ORIGINAL_FILENAME" 'stocks-output.bson',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.bson',
        "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
        "FILE_ROTATION_SIZE" '20K',
        "FORMATTER_INCLUDE_ROWTIME" 'true'
    );

To actually write to a file in /home/guavus/output, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "BSON_OutputStream" (
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
)
SELECT STREAM
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
FROM "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema

Output

[{"id":"5.0115809712E10",
"reported_at":"2014-07-23 20:52:04.527000000",
"shift_no":"NULL",
"trip_no":"653.0",
"route_variant_id":"L38 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"5.0115854098E10",
"reported_at":"2014-07-23 20:52:05.443000000",
"shift_no":"NULL",
"trip_no":"NULL",
"route_variant_id":"310 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"3.46866848031E11",
"reported_at":"2014-07-23 20:52:07.713000000",
"shift_no":"1016.0",
"trip_no":"NULL",
"route_variant_id":"806 160",
"waypoint_id":"1.5588646E7",
"last_known_location_state":"NULL"}]

Foreign Stream Options for Writing JSON or BSON

Option	Definition
FORMATTER	This needs to be JSON or BSON.
CUSTOM_FORMATTER_<column_name>	Allows overriding an individual column's formatting. Specifies a fully qualified Java classname that implements com.sqlstream.aspen.namespace.common.TypeFormatter<T>, where T is the Java type that matches the SQL type of the column, such as java.lang.String for VARCHAR/CHAR and java.lang.Double for DOUBLE.

Writing BSON Data Using the ECD Agent

You can use the ECD agent to write BSON data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=BSON
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.bson
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.bson
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true