Output Formats for Writing

In s-Server, the destination for writing data and the format of that data are configured separately. This means you can, for example, write JSON-formatted data to the file system, a network socket, AMQP, Kafka, Amazon Kinesis, Snowflake, MongoDB, HTTP, or a WebSocket.

s-Server supports the following data formats for writing: CSV, XML, JSON, BSON, Avro, and ORC.

You set the data format through the FORMATTER parameter in the foreign stream's options. For example, the following AMQP foreign stream formats data as CSV.

CREATE OR REPLACE FOREIGN STREAM amqp_stream (
line VARCHAR(4096))
SERVER AMQPSERVER
OPTIONS (DESTINATION 'amq.topic',
CONNECTION_URL 'amqp://guest:guest@clientid/?brokerlist=''tcp://localhost:5672''',
FORMATTER 'CSV'
);

To change formatting to XML, enter FORMATTER 'XML', and so on.

Formatting Data as CSV

When writing CSV files, the Extensible Common Data Adapter converts streaming tuples into character-separated output, based on options supplied through the OPTIONS clause of the CREATE FOREIGN STREAM statement.

For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.
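
For example, rather than relying on the formatter to render numeric values, you can render them as text in the pump's SELECT and hand the formatter a ready-made string. The sketch below assumes a hypothetical source stream "quotes_stream" with a DOUBLE column "price" and a CSV sink "CsvSink" with a single VARCHAR column; it is an illustration of the approach, not a required pattern.

CREATE OR REPLACE PUMP "csvPrepPump" STARTED AS
INSERT INTO "CsvSink" ("STR1")
SELECT STREAM CAST("price" AS VARCHAR(32))
FROM "quotes_stream";
-- "quotes_stream", "price", and "CsvSink" are hypothetical names;
-- the CSV formatter simply writes the pre-formatted string as-is.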

To write a CSV file, you need to give the Extensible Common Data Adapter the following information:

  • A formatter of CSV
  • A character encoding type for the file.

Using SQL to Write CSV Data

To write to CSV files, you need to set up a server object which references one of the I/O systems.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

Next, you create a foreign stream that references the server object. Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called "WebData" and creates a foreign stream called "FileWriterStream." To write data to other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would need to specify options for those destinations. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA "WebData";
SET SCHEMA '"WebData"';

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("STR1" VARCHAR(32))
SERVER "FileWriterServer" OPTIONS
(directory 'path/to/myfile',
formatter 'CSV',
filename_date_format 'yyyy-MM-dd-HH:mm:ss',
filename_prefix 'test-',
filename_suffix '.csv',
character_encoding 'US-ASCII',
formatter_include_rowtime 'false',
file_rotation_size '20K');

To actually write to a file in path/to/myfile, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "FileWriterStream"
SELECT STREAM "MyStream";
--where "MyStream" is a currently existing stream

Foreign Stream Options for Formatting CSV Data

Option Definition
FORMATTER This needs to be CSV.
WRITE_HEADER Whether or not to write the column names into a header row. Defaults to 'false'.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
ROW_SEPARATOR_CHAR_KEY Character separating rows in CSV data.
SEPARATOR Character separating values. Defaults to ','.
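
For example, to write pipe-separated output with a header row, you might combine these options with the file options from the earlier example. This is a minimal sketch; adjust the directory and column list to match your own sink.

CREATE OR REPLACE FOREIGN STREAM "PipeWriterStream"
("STR1" VARCHAR(32))
SERVER "FileWriterServer" OPTIONS
(directory 'path/to/myfile',
formatter 'CSV',
write_header 'true',
separator '|',
character_encoding 'US-ASCII',
filename_prefix 'test-',
filename_suffix '.csv',
file_rotation_size '20K');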

Using the ECD Agent to Write CSV Data

You can use the ECD agent to write CSV data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Location, date format, prefix, suffix
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
ROW_SEPARATOR=\000A
SEPARATOR=,
WRITE_HEADER=false
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true
# Schema, name, and parameter signature of origin stream
SCHEMA_NAME=stocks
TABLE_NAME=output_file
#columns
ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)

Formatting Data as XML

For XML files, the Extensible Common Data Adapter takes batches of rows and maps them to XML elements, depending on the options you specify. If no value is specified for DATA_ELEMENTS, DATA_ATTRIBUTES, or the per-column <column name>_ELEMENTS or <column name>_ATTRIBUTES options, then the column name is used as an element name (not an attribute name). So a column named foo would be written to an XML element named /batch/row/foo if no values were specified.
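
For example, with the default DOC_ELEMENTS ('batch') and ROW_ELEMENTS ('row'), a batch containing one row with a single column named foo would be rendered along the following lines (an illustrative sketch of the default mapping, not literal s-Server output):

<batch>
  <row>
    <foo>some value</foo>
  </row>
</batch>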

Using SQL to Write XML Data

The following code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("recNo" INTEGER,
"ts" TIMESTAMP NOT NULL,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER,)
SERVER "FileWriterServer"
OPTIONS (
	 "FORMATTER" 'XML',
        "CHARACTER_ENCODING" 'UTF-8',
        "DIRECTORY" '/home/guavus/output',
        "ORIGINAL_FILENAME" 'stocks-output.xml',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.xml',
        "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
        "FILE_ROTATION_SIZE" '20K'
        "FORMATTER_INCLUDE_ROWTIME" 'true'
);


To actually write to a file in /home/guavus/output, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "FileWriterStream"
SELECT STREAM "MyStream";
--where "MyStream" is a currently existing stream

Foreign Stream Options for Formatting XML Data

Option Name Description
FORMATTER This needs to be XML.
DOC_ELEMENTS Specifies a list of elements, separated by slashes (/), to use as the root of the XML document being written. Defaults to "batch".
ROW_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for each row of the XML document. Defaults to "row".
DATA_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for each datum in a row/tuple. You can define DATA_ELEMENTS or DATA_ATTRIBUTES, but not both; using both will produce inconsistent XML.
DATA_ATTRIBUTES Specifies the name of an attribute to add for each datum in a row/tuple. You can define DATA_ELEMENTS or DATA_ATTRIBUTES, but not both; using both will produce inconsistent XML.
<column name>_ELEMENTS Specifies a list of elements, separated by slashes (/), to add for a specific column's datum in each row/tuple.
<column name>_ATTRIBUTES Specifies the name of an attribute to add for a specific column's datum in each row/tuple.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
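
For example, to emit each column's value as an attribute named "value" instead of a nested element, you could declare a sink along these lines (a hedged sketch reusing the file server defined earlier; only a minimal column list is shown):

CREATE OR REPLACE FOREIGN STREAM "XmlAttributeStream"
("recNo" INTEGER,
"sourceIP" VARCHAR(32))
SERVER "FileWriterServer"
OPTIONS (
        "FORMATTER" 'XML',
        "DOC_ELEMENTS" 'batch',
        "ROW_ELEMENTS" 'row',
        "DATA_ATTRIBUTES" 'value',
        "CHARACTER_ENCODING" 'UTF-8',
        "DIRECTORY" '/home/guavus/output',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.xml',
        "FILE_ROTATION_SIZE" '20K'
);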

Using the ECD Agent to Write XML Data

You can use the ECD agent to write XML data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=XML
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.xml
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.xml
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as JSON

The ECDA adapter writes batches of data as JSON tuples. To configure how the adapter writes such tuples, you use the foreign stream options listed below.

Using SQL to Write JSON Data

Here is an example of the SQL used to define a foreign stream for the JSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "JSON_OutputStream"
   ("id" DOUBLE,
   "reported_at" VARCHAR(4096),
   "shift_no" DOUBLE,
   "trip_no" DOUBLE,
   "route_variant_id" VARCHAR(4096),
   "waypoint_id" DOUBLE,
   "last_known_location_state" VARCHAR(4096)
    )
    SERVER "FileWriterServer"
    --note that this uses the server defined above
    OPTIONS
    (
     "DIRECTORY" '/tmp/json_test/',
     --file directory where JSON file will be written.
     "FORMATTER" 'JSON',
     "CHARACTER_ENCODING" 'UTF-8',
     "ORIGINAL_FILENAME" 'stocks-output.csv',
     "FILENAME_PREFIX" 'output-',
     "FILENAME_SUFFIX" '.json',
     "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
     "FILE_ROTATION_SIZE" '20K'
     "FORMATTER_INCLUDE_ROWTIME" 'true'
     );

To actually write to a file in /tmp/json_test/, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "JSON_OutputStream" (
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
)
SELECT STREAM
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
from "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema

Output

[{"id":"5.0115809712E10",
"reported_at":"2014-07-23 20:52:04.527000000",
"shift_no":"NULL",
"trip_no":"653.0",
"route_variant_id":"L38 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"5.0115854098E10",
"reported_at":"2014-07-23 20:52:05.443000000",
"shift_no":"NULL",
"trip_no":"NULL",
"route_variant_id":"310 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"3.46866848031E11",
"reported_at":"2014-07-23 20:52:07.713000000",
"shift_no":"1016.0",
"trip_no":"NULL",
"route_variant_id":"806 160",
"waypoint_id":"1.5588646E7",
"last_known_location_state":"NULL"}]

Foreign Stream Options for Formatting JSON Data

Option Definition
FORMATTER This needs to be JSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Using the ECD Agent to Write JSON Data

You can use the ECD agent to write JSON data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=JSON
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.json
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.json
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as BSON

The ECDA adapter writes batches of data as BSON tuples. BSON, or Binary JavaScript Object Notation, extends the JSON model to provide additional data types and ordered fields. It is used primarily with MongoDB. See Writing to MongoDB for more details. To configure how the adapter writes tuples as BSON, you use the foreign stream options listed below.

Using SQL to Write BSON Data

Here is an example of the SQL used to define a foreign stream for the BSON ECDA adapter. This code uses the file system for output. To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SERVER "FileWriterServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "BSON_OutputStream"
("id" DOUBLE,
"reported_at" VARCHAR(4096),
"shift_no" DOUBLE,
"trip_no" DOUBLE,
"route_variant_id" VARCHAR(4096),
"waypoint_id" DOUBLE,
"last_known_location_state" VARCHAR(4096)
)
SERVER "FileWriterServer"
--note that this uses the server defined above
OPTIONS
(
        "FORMATTER" 'BSON',
        "CHARACTER_ENCODING" 'UTF-8',
        "WRITE_HEADER" 'false',
        "DIRECTORY" '/home/guavus/output',
        --file directory where BSON file will be written
        "ORIGINAL_FILENAME" 'stocks-output.bson',
        "FILENAME_PREFIX" 'output-',
        "FILENAME_SUFFIX" '.bson',
        "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
        "FILE_ROTATION_SIZE" '20K',
        "FORMATTER_INCLUDE_ROWTIME" 'true'
);

To actually write to a file in /home/guavus/output, you need to write a pump containing an INSERT statement along the following lines:

CREATE OR REPLACE PUMP "writerPump" STARTED AS
INSERT INTO "BSON_OutputStream" (
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
)
SELECT STREAM
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
from "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema

Output

[{"id":"5.0115809712E10",
"reported_at":"2014-07-23 20:52:04.527000000",
"shift_no":"NULL",
"trip_no":"653.0",
"route_variant_id":"L38 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"5.0115854098E10",
"reported_at":"2014-07-23 20:52:05.443000000",
"shift_no":"NULL",
"trip_no":"NULL",
"route_variant_id":"310 7",
"waypoint_id":"NULL",
"last_known_location_state":"NULL"},
{"id":"3.46866848031E11",
"reported_at":"2014-07-23 20:52:07.713000000",
"shift_no":"1016.0",
"trip_no":"NULL",
"route_variant_id":"806 160",
"waypoint_id":"1.5588646E7",
"last_known_location_state":"NULL"}]

Foreign Stream Options for Formatting BSON Data

Option Definition
FORMATTER This needs to be BSON.
CHARACTER_ENCODING Character set for data.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

Writing BSON Data Using the ECD Agent

You can use the ECD agent to write BSON data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=BSON
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.bson
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.bson
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

Formatting Data as Avro

The Avro formatter formats s-Server rows into Apache Avro data files. To format data as Avro, you need to set FORMATTER to AVRO and provide a location for the Avro schema file. You can do so in either foreign stream options or agent options. (When Avro data is written, its schema is always written with it.)

For more information on Avro schemas, see https://avro.apache.org/docs/current/#schemas.

Limitations of Avro formatter:

  • The AVRO formatter does not support nested records or arrays in schemas. That is, the Avro schema needs to be flat.
  • If the schema contains nested records/arrays, the results are unspecified.
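
Because the schema must be flat, a schema file referenced through AVRO_SCHEMA_LOCATION might look like the following sketch (the record and field names here are hypothetical, chosen to match a simple source stream with a timestamp, an integer, and a string column):

{
  "type": "record",
  "name": "stock_order",
  "fields": [
    {"name": "order_time", "type": "string"},
    {"name": "amount", "type": "int"},
    {"name": "ticker", "type": "string"}
  ]
}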

Foreign Stream Options for Formatting Avro Data

Option Definition
FORMATTER This needs to be AVRO.
AVRO_SCHEMA_LOCATION Required option to specify the path for the schema file to be used for formatting the Avro payload.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.

SQL Example

   CREATE OR REPLACE FOREIGN STREAM kafka_sink
   (
     KAFKA_KEY VARBINARY(128),
     KAFKA_PARTITION INTEGER,
     "boolean_value" BOOLEAN,
     "char_value" CHAR(15),
     "date_value" DATE,
     "decimal_value" DECIMAL(8,2),
     "double_value" DOUBLE,
     "real_value" REAL,
     "time_value" TIME,
     "timestamp_value" TIMESTAMP,
     "tinyint_value" TINYINT,
     "varchar_value" VARCHAR(15)
   )
   SERVER Kafka10Server
   OPTIONS
   (
       FORMATTER 'AVRO',
       DATE_FORMAT 'yyyy-MM-dd HH:mm:ss.SSS',
       AVRO_SCHEMA_LOCATION '/home/my-location/schema_value_types.avsc',
       "metadata.brokers.list" 'localhost:9092',
       "TOPIC" 'test'
   );

Formatting Data as ORC

(new in s-Server version 6.0.1)

The ORC formatter formats s-Server rows into the Optimized Row Columnar (ORC) file format.

ORC data can only be written to the file system or HDFS as flat files, or to Hive tables as directory trees.

If you write ORC data into Hive tables, ORC-formatted data will be written as a series of directory trees that each correspond to the directory layout of the target Hive table. Otherwise, ORC data will be written to a flat file. See Writing to Hive Tables for more details.

For more information on ORC, see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-ORCFiles

The ORC formatter has the following limitations:

  • The ORC formatter does not support column encryption or Hive indexes.
  • The ORC formatter also does not support HA watermarks in general. Watermarks (for HA recovery) are well-defined only when a stream is used as the source for an INSERT into an ORC-formatted sink; in that case, the ROWTIME of the last row INSERTed into a file is recorded in the file name (or the directory name, if you are writing ORC-formatted data to Hive tables). In all other cases, you need to include watermarks in the target rows or build some other mechanism for tracking recovery points.

Foreign Stream Options for Formatting ORC

Option Default Value Required Description
FORMATTER none yes This needs to be ORC.
"orc.version" none yes Currently supported values are 'V_0_11' and 'V_0_12'.
"orc.block.padding" true no Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Padding improves locality and thus the speed of reading, but costs space.
"orc.block.size" 268,435,456 no Set the file system block size for the file. For optimal performance, set the block size to be multiple factors of stripe size.
"orc.direct.encoding.columns" null no Set the comma-separated list of case-sensitive names of columns that should be direct encoded.
"orc.batch.size" 10,000 no Number of rows to batch together before issuing an ORC write.
"orc.user.metadata_XXX" none no Where XXX is a string. XXX is the name of an application-specific metadata key. Therefore, it should be clear that it lives in a namespace unique to the application. There can be many of these options. The value of each option is a SQL BINARY literal with the leading X' stripped off.

The options below apply only to cases when you are writing to the file system or HDFS. You cannot use these options when writing ORC data to a Hive table.

Option Default Value Description
"orc.bloom.filter.columns" null Column-separated list of bloom filter columns. These are names from the foreign table/stream's column signature.
"orc.bloom.filter.fpp" 0.05 Bloom filter false-positive probability.
"orc.compress" ZLIB Compression algorithm. See Appendix A for legal values.
"orc.compress.size" 262,144 Compression chunk size.
"orc.row.index.stride" 10,000 Number of rows between index entries.
"orc.stripe.size" 67,108,864 Size of in-memory write buffer.

SQL Example

CREATE or REPLACE FOREIGN STREAM busStats
(
  "id" BIGINT,
  "reported_at" TIMESTAMP,
  "speed" INTEGER,
  "driver_no" BIGINT,
  "gps" VARCHAR(128),
  "highway" VARCHAR(8)
  "event_year" INTEGER,
  "event_month" INTEGER
)
SERVER HDFS_SERVER
OPTIONS
(
  FORMATTER 'ORC',
  DIRECTORY '/tmp/busStats',
  CONFIG_PATH '/home/hive/kerberos/core-default.xml',
  AUTH_METHOD 'kerberos',
  AUTH_USERNAME 'sqlstream_guavus@my.GGN',
  AUTH_KEYTAB '/home/hive/kerberos/sqlstream_guavus.keytab',
  AUTH_METASTORE_PRINCIPAL 'hive/_HOST@my.GGN',
  HDFS_OUTPUT_DIR 'hdfs:///hiveInstallation/data/svc_sqlstream_guavus/busStats',
  FILENAME_SUFFIX '.orc',
  FILE_ROTATION_TIME '1h',
  --the following options are specific to writing to a Hive table
  HIVE_SCHEMA_NAME 'trafficApp',
  HIVE_TABLE_NAME 'hive_bus_stats',
  "orc.version" 'V_0_12',
  "orc.user.metadata_com.acme.watermark" 'AC3E'
  );