You can write data out of s-Server to a file in Hadoop/HDFS from a stream or table. The Hadoop ECDA adapter automatically uploads files to the Hadoop file system location specified by the HDFS_OUTPUT_DIR property. s-Server formats files for HDFS according to the same principles it uses for writing to the file system; see Writing to the File System for more details. You can specify how files rotate according to time, size, or both, and you can specify how rotated files are named.
To write from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See Writing to HDFS Using SQL below. To write from remote locations, you configure such options using a properties file and launch the agent at the command line. See Writing to HDFS Using the ECD Agent below.
Files can be formatted as CSV, Avro, XML, JSON, or BSON. These files do not require further formatting in order to be integrated with Hadoop. (Once in the Hadoop file system, these files can be accessed by downstream systems such as Hive. See https://cwiki.apache.org/confluence/display/Hive/Home for more details on Hive.)
For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.
To write data to HDFS, you need to create a foreign stream that references the prebuilt server object HDFS_SERVER.
Required parameters are:
All foreign streams must be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named hadoop_schema.hdfs_sink.
CREATE OR REPLACE SCHEMA hadoop_schema;
SET SCHEMA 'hadoop_schema';

CREATE OR REPLACE FOREIGN STREAM hadoop_schema.hdfs_sink (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "baring" INTEGER,
    "active" BOOLEAN)
SERVER HDFS_SERVER
OPTIONS (
    "HADOOP_USER_NAME" 'root',
    "HDFS_OUTPUT_DIR" 'hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/',
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false',
    "DIRECTORY" '/tmp',
    "ORIGINAL_FILENAME" 'bus-output.csv',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
    "FILE_ROTATION_SIZE" '20K',
    "FORMATTER_INCLUDE_ROWTIME" 'true');
To begin writing data to Hadoop with the above code, you INSERT into hadoop_schema.hdfs_sink. When hadoop_schema.hdfs_sink receives rows, s-Server writes data to the Hadoop server you have configured in the foreign stream options.
In most cases, you will want to set up a pump that writes data to hadoop_schema.hdfs_sink. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
You do so with code along the following lines:
CREATE OR REPLACE SCHEMA Pumps;
SET SCHEMA 'Pumps';

-- We recommend creating pumps as stopped,
-- then using ALTER PUMP Pumps.writerPump START to start them.
CREATE OR REPLACE PUMP writerPump STOPPED AS
INSERT INTO hadoop_schema.hdfs_sink
SELECT STREAM * FROM "MyStream";  -- where "MyStream" is a currently existing stream
To start writing data, use the following code:
ALTER PUMP Pumps.writerPump START;
You configure adapter options through foreign streams/tables. You configure agent options through the ECD agent property file.
The following options are specific to HDFS:
|HADOOP_USER_NAME||User name for HDFS, such as 'root'.|
|HDFS_OUTPUT_DIR||Address of the HDFS name node, including the output path, such as hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/|
The following options apply to all file-based sinks:
|DIRECTORY||Directory to which you are writing. s-Server needs permission to write to this directory.|
|ORIGINAL_FILENAME||Name of file to write before rotation. This will be the name of the file that s-Server is currently writing.|
|FILENAME_PREFIX||Prefix of the final output file name, such as "test-".|
|FILENAME_DATE_FORMAT||Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss. Uses Java SimpleDateFormat. This specifies how to format the timestamp that appears between the prefix and the suffix. This timestamp is the ROWTIME of the last row in the file.|
|FILE_ROTATION_WATERMARK_COLUMN||This declares the name of a source column whose contents will be used to further distinguish files in the series.|
|FILENAME_SUFFIX||Suffix of the final output file name. If you want this suffix to include a period, you must specify it, e.g. ".csv"|
|FILE_ROTATION_TIME||Determines when files rotate based on time elapsed since Unix epoch time. Defaults to 0, which means "don't use ROWTIME to trigger file rotation." You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE as an option; you can choose to specify both. You set FILE_ROTATION_TIME as a positive integer with a unit of milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval. Examples: FILE_ROTATION_TIME '15m' rotates files every fifteen minutes from the top of the hour (1:00, 1:15, 1:30, and so on). FILE_ROTATION_TIME '1h' rotates files every hour at the top of the hour. FILE_ROTATION_TIME '1d' rotates files every day at 12:00 AM (UTC, since intervals are counted from the epoch). More technically, FILE_ROTATION_TIME works as follows: let $timePeriod be the number of milliseconds in the interval set by FILE_ROTATION_TIME; let $lastWrittenRowtime be the ROWTIME of the last row in the file; let $currentRowTime be the ROWTIME of the row about to be written. s-Server rotates to the next file when integerPart($lastWrittenRowtime / $timePeriod) < integerPart($currentRowTime / $timePeriod).|
|FILE_ROTATION_SIZE||Determines when files rotate based on file size. You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE; you can choose to specify both. You specify a file size in kilobytes (k), megabytes (m), or gigabytes (g), expressed as a positive integer followed by a unit. Defaults to 0, which means "don't use file size to trigger file rotation." Examples: FILE_ROTATION_SIZE '20k' means "rotate files when they reach or exceed a size of 20 kilobytes". FILE_ROTATION_SIZE '20m' means "rotate files when they reach or exceed a size of 20 megabytes". FILE_ROTATION_SIZE '1g' means "rotate files when they reach or exceed a size of 1 gigabyte". s-Server rotates to the next file once a row arrives that brings the file size over the byte threshold specified by FILE_ROTATION_SIZE.|
|FILE_ROTATION_RESPECT_ROWTIME||'true' or 'false', case-insensitive. When you use FILE_ROTATION_SIZE, this option lets you specify whether files wait to rotate until all rows with the same ROWTIME have arrived. Defaults to 'true', which means "always respect rowtime." Setting FILE_ROTATION_RESPECT_ROWTIME to true ensures that rows with the same ROWTIME will not be split between two files. For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte) and a new row arrives that causes the file to go over the 1 megabyte threshold, then with FILE_ROTATION_RESPECT_ROWTIME set to true, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte. Because table rows lack rowtimes, you cannot write files from tables with FILE_ROTATION_RESPECT_ROWTIME set to true; s-Server raises an error if you try to insert table rows into a file writer foreign stream that has this option set to true. If you plan to write rows from a table into a file, you must set FILE_ROTATION_RESPECT_ROWTIME to false.|
||True or false; defaults to true. Causes strings to be escaped before being written.|
|POSTPROCESS_CMD||The POSTPROCESS_CMD option lets you run a script after the file is written. To use this option, enter the path to the script, along with parameters, substituting <input> for the name of the file. When the file is written, the script will execute with parameters, and <input> will be replaced by the name of the file. Example: 'scripts/runStandaloneSystemML.sh scripts/algorithms/l2-svm-predict.dml -nvargs X=<input> Y=data/haberman.test.labels.csv model=data/l2-svm-model.csv fmt="csv" confusion=data/l2-svm-confusion.csv'|
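The <input> substitution described for POSTPROCESS_CMD can be sketched in Python as follows. This is a minimal illustration, not s-Server's implementation: it assumes the placeholder is replaced first and the resulting command string is then split using POSIX shell quoting rules (via shlex). The helper name build_postprocess_args and the sample file name are hypothetical.

```python
import shlex


def build_postprocess_args(cmd: str, filename: str) -> list[str]:
    """Hypothetical helper: replace every <input> placeholder with the name of
    the file just written, then split the command into an argument list using
    POSIX shell quoting rules (so fmt="csv" becomes fmt=csv)."""
    return shlex.split(cmd.replace("<input>", filename))


# The POSTPROCESS_CMD example from the table above.
cmd = (
    "scripts/runStandaloneSystemML.sh scripts/algorithms/l2-svm-predict.dml "
    "-nvargs X=<input> Y=data/haberman.test.labels.csv "
    'model=data/l2-svm-model.csv fmt="csv" confusion=data/l2-svm-confusion.csv'
)

# Hypothetical rotated file name, used only for illustration.
args = build_postprocess_args(cmd, "output-2017-07-14-02:40:00.csv")
print(args)
# To actually launch the script you could pass args to subprocess.run(args, check=True).
```

Splitting into an argument list rather than passing the whole string to a shell avoids a second round of shell interpretation of the file name.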
s-Server rotates files according to options set for FILE_ROTATION_TIME, FILE_ROTATION_SIZE, and FILE_ROTATION_RESPECT_ROWTIME.
You can use FILE_ROTATION_TIME to rotate files based on time elapsed since Unix epoch time. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval.
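The time-based rule, integerPart($lastWrittenRowtime / $timePeriod) < integerPart($currentRowTime / $timePeriod), can be sketched in Python as follows. This is an illustration of the arithmetic, not s-Server's code. It assumes that ROWTIMEs are expressed as milliseconds since 1970-01-01 UTC, and the helper names parse_rotation_time and should_rotate_on_time are hypothetical.

```python
import re

# Milliseconds per unit accepted by FILE_ROTATION_TIME.
UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


def parse_rotation_time(value: str) -> int:
    """Convert a value such as '15m' or '1d' into a period in milliseconds."""
    match = re.fullmatch(r"(\d+)(ms|s|m|h|d)", value.strip())
    if match is None or int(match.group(1)) == 0:
        raise ValueError(f"expected a positive integer plus a unit, got {value!r}")
    return int(match.group(1)) * UNIT_MS[match.group(2)]


def should_rotate_on_time(last_written_rowtime: int, current_rowtime: int,
                          time_period: int) -> bool:
    """Rotate when the incoming row lands in a later epoch-aligned interval
    than the last row written to the file (all values in milliseconds)."""
    return last_written_rowtime // time_period < current_rowtime // time_period


period = parse_rotation_time("15m")                     # 900,000 ms
print(should_rotate_on_time(0, 899_999, period))        # False: both rows fall in 00:00-00:15
print(should_rotate_on_time(899_999, 900_000, period))  # True: second row starts the 00:15 interval
```

Because the intervals are counted from the epoch rather than from when the stream started, every writer using the same FILE_ROTATION_TIME rotates at the same wall-clock boundaries.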
You can use FILE_ROTATION_SIZE to rotate files based on their size. You specify a file size in kilobytes (k), megabytes (m), or gigabytes (g), expressed as a positive integer followed by a unit. The default is 0, which means "don't use file size to trigger file rotation."
When using FILE_ROTATION_SIZE, you can specify that files wait to rotate until all rows with the same ROWTIME have arrived. To do so, you set FILE_ROTATION_RESPECT_ROWTIME to true (this is the default behavior). For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte) and a new row arrives that causes the file to go over the 1 megabyte threshold, then with FILE_ROTATION_RESPECT_ROWTIME set to true, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte.
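The interaction between FILE_ROTATION_SIZE and FILE_ROTATION_RESPECT_ROWTIME can be modeled as a small state machine. The Python class below is a hypothetical sketch, not s-Server's implementation. It makes the following assumptions:

* The threshold counts as crossed when the file size reaches or exceeds the limit.
* The row that crosses the threshold is still written to the current file.
* Only payload bytes are counted.

```python
class SizeRotator:
    """Hypothetical model of FILE_ROTATION_SIZE plus FILE_ROTATION_RESPECT_ROWTIME.

    Each row is a (rowtime_ms, payload_bytes) pair; each "file" is a list of rows.
    """

    def __init__(self, max_bytes: int, respect_rowtime: bool = True) -> None:
        self.max_bytes = max_bytes              # threshold from FILE_ROTATION_SIZE
        self.respect_rowtime = respect_rowtime  # defaults to true, as in s-Server
        self.files: list[list[tuple[int, bytes]]] = [[]]
        self._size = 0                          # bytes written to the current file
        self._rotation_pending = False          # threshold reached, waiting for a new ROWTIME

    def _start_new_file(self) -> None:
        self.files.append([])
        self._size = 0
        self._rotation_pending = False

    def write(self, rowtime: int, payload: bytes) -> None:
        current = self.files[-1]
        # Deferred rotation: close the file only once the ROWTIME changes,
        # so rows sharing a ROWTIME are never split across two files.
        if self._rotation_pending and rowtime != current[-1][0]:
            self._start_new_file()
            current = self.files[-1]
        current.append((rowtime, payload))
        self._size += len(payload)
        if self._size >= self.max_bytes:
            if self.respect_rowtime:
                self._rotation_pending = True
            else:
                self._start_new_file()          # rotate immediately


rows = [(1, b"aaaa"), (1, b"bbbb"), (1, b"cccc"), (1, b"dddd"), (2, b"eeee")]
respecting = SizeRotator(max_bytes=10)
splitting = SizeRotator(max_bytes=10, respect_rowtime=False)
for rowtime, payload in rows:
    respecting.write(rowtime, payload)
    splitting.write(rowtime, payload)

print([len(f) for f in respecting.files])  # [4, 1]: all ROWTIME 1 rows stay together
print([len(f) for f in splitting.files])   # [3, 2]: ROWTIME 1 rows split across files
```

With respect_rowtime enabled, the first file grows past the 10-byte limit (16 bytes), which matches the documented trade-off of larger files in exchange for never splitting rows that share a ROWTIME.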
You can specify options for how files are named as they rotate. In setting options for rotated files' names, you can specify a prefix, suffix, and date format for the file name. You can also specify a watermark. Watermarks are drawn from a column in the source from which the file is written.
At minimum, you must specify either FILENAME_PREFIX or FILENAME_SUFFIX. All rotated files include a timestamp.
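Putting the naming options together, a rotated file name without a watermark consists of the prefix, the formatted timestamp (the ROWTIME of the last row in the file), and the suffix. The Python sketch below relies on several assumptions and is not the exact s-Server naming code:

* Timestamps are UTC.
* ORIGINAL_FILENAME and FILE_ROTATION_WATERMARK_COLUMN are ignored.
* Python strftime codes stand in for Java SimpleDateFormat patterns, so %Y-%m-%d_%H:%M:%S corresponds to yyyy-MM-dd_HH:mm:ss.

The helper rotated_filename is hypothetical.

```python
from datetime import datetime, timezone


def rotated_filename(prefix: str, suffix: str, last_rowtime_ms: int,
                     date_format: str = "%Y-%m-%d_%H:%M:%S") -> str:
    """Hypothetical helper: build <prefix><timestamp><suffix>, where the
    timestamp is the ROWTIME (ms since epoch, treated as UTC) of the last row."""
    if not prefix and not suffix:
        # Mirrors the rule that at least one of the two options must be set.
        raise ValueError("specify FILENAME_PREFIX or FILENAME_SUFFIX")
    stamp = datetime.fromtimestamp(last_rowtime_ms / 1000, tz=timezone.utc)
    return f"{prefix}{stamp.strftime(date_format)}{suffix}"


print(rotated_filename("test-", ".csv", 1_500_000_000_000))
print(rotated_filename("", ".csv", 0))
```

Note that the suffix is used verbatim, so the period in ".csv" must be supplied explicitly, as the FILENAME_SUFFIX description says.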
If you set ORIGINAL_FILENAME, then the value of ORIGINAL_FILENAME will serve as a temporary file name to use while data is being actively written.
When this file is rotated out, rotated files are named as follows:
If you do not set ORIGINAL_FILENAME, then the file being actively written to is given the following form:
Note: if you are writing from a table and do not specify ORIGINAL_FILENAME, <date> will be the system time when the file began writing.
When this file is rotated out, rotated files are named as follows:
For example, the following options:
filename_prefix 'test-',
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
--note that you need to specify a period for suffix if desired.
filename_suffix '.csv',
produce rotating file names like this:
And the following options:
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
--note that you need to specify a period for suffix if desired.
filename_suffix '.csv',
produce rotating file names like this:
To write to HDFS from a remote location, you set the same options in a properties file for the ECD agent, such as the following:
SEPARATOR=^A
DIRECTORY=/home/sqlstream/customers/sample
FILENAME_PREFIX=test-
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH-mm-ss
MAX_BYTES_PER_FILE=25000000
FILENAME_SUFFIX=.csv
CHARACTER_ENCODING=US-ASCII
ROWTYPE=RecordType(INTEGER COL1, INTEGER COL2, VARCHAR(256) COL3, VARCHAR(256) COL4)
SCHEMA_NAME=HADOOPWRITERTEST
TABLE_NAME=HADOOPWRITERSTREAM
HDFS_OUTPUT_DIR=hdfs://storm-s3.disruptivetech.com:8020/user/sample/
NOCLOBBER=false
To invoke the agent:
From the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/, enter the following at the command line:
$ ./commondataagent.sh --output --props sample.test.properties --io hdfs