Integrating Hadoop

You can write data out of s-Server to a file in Hadoop/HDFS or Azure/ADLS from a stream or table. The Hadoop ECDA adapter automatically uploads files to the Hadoop file system location designated by the HDFS_OUTPUT_DIR property. s-Server implements an HDFS client to connect to an HDFS server.

s-Server formats files for HDFS according to the same principles as it does for writing to the file system. See Writing to the File System for more details. You can specify how files rotate according to time, size, or both, and you can specify how rotated files are named.

To write from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See Writing to HDFS Using SQL below. To write from remote locations, you configure such options using a properties file and launch the agent at the command line. See Writing to HDFS Using the ECD Agent below.

Files can be formatted as CSV, Avro, XML, JSON, or BSON. These files do not require further formatting in order to be integrated with Hadoop/ADLS. (Once in the Hadoop file system, these files will be accessed by second-level systems such as Hive. See https://cwiki.apache.org/confluence/display/Hive/Home for more details on Hive.)

For ADLS, only ADLS Gen2 is supported: data is sent to the ADLS Gen2 location designated by the HDFS_OUTPUT_DIR property, using this pattern: abfs://<container_name>@<account_name>/path/to/destination .

This topic includes the following sections:

  • Writing to HDFS Using SQL
  • Writing to ADLS Using SQL
  • Options for HDFS
  • Rotating Files using FILE_ROTATION options
  • Configuring File Names
  • Writing to HDFS Using the ECD Agent

Writing to HDFS Using SQL

To use the adapter from within s-Server, you configure and launch it in SQL, using either server or foreign stream/table options. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

To write data to HDFS, you need to create a foreign stream that references the prebuilt server object HDFS_SERVER.

Required parameters are:

  • AUTH_USERNAME
  • HDFS_OUTPUT_DIR
  • FORMATTER
  • FILENAME_PREFIX
  • FILENAME_SUFFIX
  • Either FILE_ROTATION_TIME or FILE_ROTATION_SIZE

All foreign streams must be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named hadoop_schema.hdfs_sink.

CREATE OR REPLACE SCHEMA hadoop_schema;
SET SCHEMA 'hadoop_schema';

CREATE or REPLACE FOREIGN STREAM hadoop_schema.hdfs_sink (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "baring" INTEGER,
    "active" BOOLEAN)
    --built-in server object for HDFS
    SERVER HDFS_SERVER
    OPTIONS (
    "CONFIG_PATH" '/home/mydirectory/default.xml',
    "AUTH_METHOD" 'kerberos',
    "AUTH_USERNAME" 'nn/hadoop.docker.com@EXAMPLE.COM',
    "AUTH_KEYTAB" '/tmp/nn.service.keytab',
    "HDFS_OUTPUT_DIR" 'hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/',
    "FORMATTER" 'CSV',
    "QUOTE_CHARACTER" '',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false',
    "DIRECTORY" '/tmp',
    "ORIGINAL_FILENAME" 'bus-output.csv',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
    "FILE_ROTATION_SIZE" '20K'
    "FORMATTER_INCLUDE_ROWTIME" 'true');

To begin writing data to Hadoop with the above code, you INSERT into hadoop_schema.hdfs_sink. When hadoop_schema.hdfs_sink receives rows, s-Server writes data to the Hadoop server you have configured in the foreign stream options.
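
For example, to smoke-test the sink you could insert a single row by hand from a client such as sqllineClient. This is a minimal sketch only; the values below are illustrative and do not come from any real feed:

INSERT INTO hadoop_schema.hdfs_sink
    ("id", "shift_no", "reported_at", "trip_no",
     "latitude", "longitude", "speed", "bearing", "active")
VALUES
    (1, 7.0, TIMESTAMP '2020-05-23 19:44:00', 'T100',
     37.7749, -122.4194, 35, 270, true);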

In most cases, you will want to set up a pump that writes data to hadoop_schema.hdfs_sink. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

You do so with code along the following lines:

CREATE OR REPLACE SCHEMA "Pumps";
SET SCHEMA '"Pumps"';

CREATE OR REPLACE PUMP "Pumps"."writerPump" STOPPED AS
--We recommend creating pumps as stopped,
--then starting them with ALTER PUMP "Pumps"."writerPump" START
INSERT INTO hadoop_schema.hdfs_sink
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;

Writing to ADLS Using SQL

To write data to ADLS, you need to create a foreign stream that references the prebuilt server object HDFS_SERVER.

Required parameters are:

  • ABFS_AUTH_METHOD
  • ABFS_CONFIG_FILE
  • HDFS_OUTPUT_DIR
  • FORMATTER
  • FILENAME_PREFIX
  • FILENAME_SUFFIX
  • Either FILE_ROTATION_TIME or FILE_ROTATION_SIZE

Below is the sample SQL code:

CREATE OR REPLACE SCHEMA hadoop_schema;
SET SCHEMA 'hadoop_schema';

CREATE or REPLACE FOREIGN STREAM hadoop_schema.hdfs_sink (
"Col1" VARCHAR(30),
"Col2" INTEGER,
"Col3" BIGINT,
)
SERVER HDFS_SERVER
OPTIONS (
"CONFIG_PATH" '/home/mydirectory/default.xml',
"ABFS_CONFIG_FILE" '/home/sqlstream/config.yml',
"ABFS_AUTH_METHOD" 'SharedKey',
"HDFS_OUTPUT_DIR" 'abfs://<container_name>@<account_name>/path/to/destination',
"FORMATTER" 'CSV',
"QUOTE_CHARACTER" '',
"SEPARATOR" ',',
"WRITE_HEADER" 'false',
"DIRECTORY" '/tmp',
"ORIGINAL_FILENAME" 'output-bus.csv',
"FILENAME_PREFIX" 'output-',
"FILENAME_SUFFIX" '.csv',
"REFRESH_PROPS" 'true',
"FILE_ROTATION_SIZE" '100m',
"FORMATTER_INCLUDE_ROWTIME" 'true'
);

The rest of the pipeline is similar to the one shown above for Hadoop/HDFS, as sketched below.
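
For example, here is a hedged sketch of a pump feeding the ADLS sink, mirroring the HDFS example above. The pump name "adlsWriterPump" is illustrative, and "MyStream" is assumed to be an existing stream with a column layout matching the foreign stream:

CREATE OR REPLACE SCHEMA "Pumps";
SET SCHEMA '"Pumps"';

CREATE OR REPLACE PUMP "Pumps"."adlsWriterPump" STOPPED AS
INSERT INTO hadoop_schema.hdfs_sink
SELECT STREAM * FROM "MyStream";

ALTER PUMP "Pumps"."adlsWriterPump" START;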

ABFS Config file

The config file is a YAML file; its parameter names must match exactly the formats shown below. SQLstream currently supports two authentication mechanisms: Shared Key and OAuth 2.0.

Either one or both of these can be provided in the config file; the plugin uses the mechanism you specify in the ABFS_AUTH_METHOD option.

Shared Key

Config file format
SharedKey:
  accountName: <ADLS storage account name>
  SharedKey: <access key>
  fileSystemName: <container name>
SQL Example for Shared Key
CREATE OR REPLACE SCHEMA hadoop_schema;
SET SCHEMA 'hadoop_schema';

CREATE or REPLACE FOREIGN STREAM hadoop_schema.hdfs_sink (
"Col1" VARCHAR(30),
"Col2" INTEGER,
"Col3" BIGINT,
)
SERVER HDFS_SERVER
OPTIONS (
"CONFIG_PATH" '/home/mydirectory/default.xml',
"ABFS_CONFIG_FILE" '/home/sqlstream/config.yml',
"ABFS_AUTH_METHOD" 'SharedKey',   --case sensitive
"HDFS_OUTPUT_DIR" 'abfs://<container_name>@<account_name>.dfs.core.windows.net/path/to/destination',
"FORMATTER" 'CSV',
"QUOTE_CHARACTER" '',
"SEPARATOR" ',',
"WRITE_HEADER" 'false',
"DIRECTORY" '/tmp',
"ORIGINAL_FILENAME" 'output-bus.csv',
"FILENAME_PREFIX" 'output-',
"FILENAME_SUFFIX" '.csv',
"REFRESH_PROPS" 'true',
"FILE_ROTATION_SIZE" '100m',
"FORMATTER_INCLUDE_ROWTIME" 'true');
  • accountName refers to the storage account name. The access key is the shared key, which you can copy from the Azure portal under 'Security + networking' > 'Access keys'.
  • fileSystemName is the name of the container that you created in your ADLS storage account. For more details on access key authentication, see the Microsoft Azure documentation.

OAuth

To write data to ADLS using OAuth 2.0, your app registration should have contributor access on Azure AD.

Config file format
OAuth:
  providerType: org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
  clientEndpoint: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
  clientId: <CLIENT_ID>
  clientSecret: <CLIENT_SECRET>
SQL Example for OAuth
CREATE OR REPLACE SCHEMA hadoop_schema;
SET SCHEMA 'hadoop_schema';

CREATE or REPLACE FOREIGN STREAM hadoop_schema.hdfs_sink (
"Col1" VARCHAR(30),
"Col2" INTEGER,
"Col3" BIGINT,
)
SERVER HDFS_SERVER
OPTIONS (
"CONFIG_PATH" '/home/mydirectory/default.xml',
"ABFS_CONFIG_FILE" '/home/sqlstream/config.yml',
"ABFS_AUTH_METHOD" 'OAuth', --case sensitive
"HDFS_OUTPUT_DIR" 'abfs://<container_name>@<account_name>.dfs.core.windows.net/path/to/destination',
"FORMATTER" 'CSV',
"QUOTE_CHARACTER" '',
"SEPARATOR" ',',
"WRITE_HEADER" 'false',
"DIRECTORY" '/tmp',
"ORIGINAL_FILENAME" 'output-bus.csv',
"FILENAME_PREFIX" 'output-',
"FILENAME_SUFFIX" '.csv',
"REFRESH_PROPS" 'true',
"FILE_ROTATION_SIZE" '100m',
"FORMATTER_INCLUDE_ROWTIME" 'true');
  • providerType is the OAuth mechanism you are using, Client Credentials in this case; leave this value as shown.
  • clientEndpoint is your token endpoint; replace <TENANT_ID> with your tenant (directory) ID.
  • clientId is your application (client) ID, which you can copy from your app registration in the Azure portal.
  • For clientSecret, create a client secret for the app registration and copy its value. Note that you need the value of the client secret, not the secret ID. For more details on OAuth, see the Microsoft Azure documentation.

Options for HDFS

You configure adapter options through foreign streams/tables. You configure agent options through the ECD agent property file.

The following options are specific to HDFS:

Foreign Stream Options for Writing to Hadoop/HDFS

Option Name Scope Required? Description
HDFS_OUTPUT_DIR All Required Address of the HDFS name node, such as hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/, or for ADLS Gen2 abfs://<container_name>@<account_name>.dfs.core.windows.net/path/to/destination. This is where data will be written on the HDFS (or ADLS) server.
ABFS_CONFIG_FILE ABFS Required Path to the ABFS config file containing credentials (YAML file only).
ABFS_AUTH_METHOD ABFS Required Authentication method used to upload data: either "SharedKey" or "OAuth" (which indicates OAuth 2.0). These values are case-sensitive.
REFRESH_PROPS ABFS Optional If set to 'true', the config file is re-read and a new instance of the configuration is used. If set to 'false' or omitted, changes to the config file are not detected unless s-Server is restarted.
AUTH_METHOD HDFS, Hive Optional If desired, specify 'kerberos'. Requires AUTH_USERNAME and AUTH_KEYTAB (the latter should be supplied using JNDI properties; see Hadoop Options for Use with JNDI Properties below).
AUTH_USERNAME HDFS, Hive Optional User name for HDFS. Required if AUTH_METHOD is specified.
CONFIG_PATH HDFS, Hive Optional Specifies the path to an HDFS client configuration file, which is loaded and used by s-Server's HDFS client throughout its life cycle. Example: /home/me/work/kerberos/core-default.xml
AUTH_KEYTAB HDFS, Hive Optional Path to a file containing pairs of Kerberos principals and encrypted keys, such as /tmp/nn.service.keytab. Required if AUTH_METHOD is specified.
HIVE_TABLE_NAME Hive Required Table name inside HIVE_SCHEMA_NAME.
HIVE_SCHEMA_NAME Hive Required Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified.
AUTH_METASTORE_PRINCIPAL Hive Required Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file.
HIVE_URI Hive Required JDBC URL for accessing the Hive server when loading data into Hive tables. Must be specified if HIVE_TABLE_NAME is specified.
HIVE_METASTORE_URIS Hive Optional Location of the Hive metastore for loading data into Hive tables. Required if HIVE_TABLE_NAME is specified.
$columnName_HIVE_PARTITION_NAME_FORMAT Hive Optional This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat.
OPTIONS_QUERY All Optional Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the HDFS_OUTPUT_DIR option from a table that contains options, as in select lastOffset as HDFS_OUTPUT_DIR from TEST.hdfs_options. For more details see the topic Using the Options Query Property.
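
For example, the OPTIONS_QUERY option could be combined with the other options as in the following hedged sketch. The stream name hdfs_sink_dynamic and the AUTH_USERNAME value are illustrative, and TEST.hdfs_options is the example options table from the description above, not a built-in object:

CREATE OR REPLACE FOREIGN STREAM hadoop_schema.hdfs_sink_dynamic (
    "id" INTEGER,
    "reported_at" TIMESTAMP NOT NULL)
SERVER HDFS_SERVER
OPTIONS (
    --HDFS_OUTPUT_DIR is looked up at runtime from the options table
    "OPTIONS_QUERY" 'select lastOffset as HDFS_OUTPUT_DIR from TEST.hdfs_options',
    "AUTH_USERNAME" 'sqlstream',
    "FORMATTER" 'CSV',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILE_ROTATION_SIZE" '20K');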

Hadoop options for use with JNDI properties

We recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (applies to all foreign streams that use this foreign server definition)
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (applies only to that foreign stream)

The following options apply to all file-based sinks.

Note: When you specify DIRECTORY with a Hadoop sink, this location refers to where s-Server writes data locally before copying it to HDFS.

Foreign Stream Options for Writing to File Systems

Option Description
DIRECTORY Directory to which you are writing. s-Server needs permission to write to this directory.
ORIGINAL_FILENAME Name of file to write before rotation. This will be the name of the file that s-Server is currently writing.
FILENAME_PREFIX Prefix of the final output file name, such as "test-".
FILENAME_DATE_FORMAT Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss. Uses java SimpleDateFormat. This specifies how to format a timestamp that appears between the prefix and the suffix. This timestamp is the ROWTIME of the last row in the file.
FILE_ROTATION_WATERMARK_COLUMN This declares the name of a source column whose contents will be used to further distinguish files in the series.
FILENAME_SUFFIX Suffix of the final output file name. If you want this suffix to include a period, you must specify it, e.g. ".csv"
FILE_ROTATION_TIME Determines when files rotate based on time elapsed since Unix epoch time. Defaults to 0. That means "don't use ROWTIME to trigger file rotation". You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE as an option. You can choose to specify both. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d) - for example '15s', '20m', '2h' or '3d'. These express intervals of time from 1970-01-01; files rotate once a row arrives with a ROWTIME that passes the specified interval. See Using FILE_ROTATION_TIME.
FILE_ROTATION_SIZE Determines when files rotate based on file size. s-Server rotates to the next file once a row arrives that brings the file size over the byte threshhold specified by FILE_ROTATION_SIZE. You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE. You can choose to specify both. Lets you specify a file size in kilobytes (k), megabytes (m), or gigabytes (g). Expressed as a positive integer followed by a byte measurement, for example '64k', '100m', '8g'. Defaults to 0 - which means "don't use file size to trigger file rotation". See Using FILE_ROTATION_SIZE.
FILE_ROTATION_RESPECT_ROWTIME 'true' or 'false', case-insensitive. When you use FILE_ROTATION_SIZE, this option lets you specify whether files wait to rotate until all rows with the same ROWTIME have arrived. Defaults to 'true', which means "always respect rowtime". See Using FILE_ROTATION_RESPECT_ROWTIME.
ESCAPE_<column name> True or false; defaults to true. Causes strings to be escaped before being written.
POSTPROCESS_COMMAND The POSTPROCESS_COMMAND option lets you run a script after each output file is written. To use this option, enter the path to the script, along with any parameters needed by the script, substituting <input> for the name of the file. When the file is complete, the script will execute with parameters, and <input> will be replaced by the name of the file.

Example: '/home/sqlstream/scripts/send-to-destination.sh <input> sftp://some.sftp.server.com'

Rotating Files using FILE_ROTATION options

s-Server rotates files according to options set for FILE_ROTATION_TIME, FILE_ROTATION_SIZE and FILE_ROTATION_RESPECT_ROWTIME.

File rotation options work together in the following ways:

Rotate the file if (the FILE_ROTATION_TIME condition is satisfied OR the FILE_ROTATION_SIZE condition is satisfied) AND (the FILE_ROTATION_RESPECT_ROWTIME condition is satisfied).
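
As a hedged sketch, the following foreign stream combines all three options: it rotates whenever an hour of ROWTIME elapses or the file passes roughly 100 MB, but only once all rows sharing the current ROWTIME have arrived. The stream name, columns, and AUTH_USERNAME value are illustrative:

CREATE OR REPLACE FOREIGN STREAM hadoop_schema.rotation_demo_sink (
    "id" INTEGER,
    "reported_at" TIMESTAMP NOT NULL)
SERVER HDFS_SERVER
OPTIONS (
    "AUTH_USERNAME" 'sqlstream',
    "HDFS_OUTPUT_DIR" 'hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/',
    "FORMATTER" 'CSV',
    "FILENAME_PREFIX" 'rotation-demo-',
    "FILENAME_SUFFIX" '.csv',
    --rotate on whichever condition is met first
    "FILE_ROTATION_TIME" '1h',
    "FILE_ROTATION_SIZE" '100m',
    --but never split rows that share a ROWTIME
    "FILE_ROTATION_RESPECT_ROWTIME" 'true');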

Using FILE_ROTATION_TIME

You can use FILE_ROTATION_TIME to rotate files based on time elapsed since Unix epoch time. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval.

Examples:

  • FILE_ROTATION_TIME '15m' rotates files every fifteen minutes from the top of the hour (1:00, 1:15, 1:30, and so on).
  • FILE_ROTATION_TIME '1h' rotates files every hour at the top of the hour.
  • FILE_ROTATION_TIME '1d' rotates files every day at 12:00 AM.

More technically, FILE_ROTATION_TIME works as follows:

  • Let $timePeriod be the number of milliseconds in the time unit bound to FILE_ROTATION_TIME.
  • Let $lastWrittenRowtime be the ROWTIME of the last row in the file.
  • Let $currentRowTime be the ROWTIME of the row about to be written.

s-Server rotates to the next file when:

  • integerPart($lastWrittenRowtime / $timePeriod) < integerPart($currentRowTime / $timePeriod)
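
For example, with FILE_ROTATION_TIME '1h', $timePeriod is 3,600,000 ms. If the last row written has ROWTIME 2020-05-23 10:59:59 and the next row has ROWTIME 2020-05-23 11:00:01, the two rowtimes fall into different hour-sized intervals counted from 1970-01-01 (their integer parts differ by one), so s-Server rotates to a new file before writing the new row.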

Using FILE_ROTATION_SIZE

You can use FILE_ROTATION_SIZE to rotate files based on their size. You specify a file size in kilobytes (k), megabytes (m), or gigabytes (g), expressed as a positive integer followed by a unit, for example '64k', '100m', or '8g'. Defaults to 0, which means "don't use file size to trigger file rotation".

Note: You cannot use FILE_ROTATION_SIZE for writing to Hive tables.

Examples:

  • FILE_ROTATION_SIZE '20k' means "rotate files when they reach or exceed a size of 20 kilobytes"
  • FILE_ROTATION_SIZE '20m' means "rotate files when they reach or exceed a size of 20 megabytes"
  • FILE_ROTATION_SIZE '1g' means "rotate files when they reach or exceed a size of 1 gigabyte"

When using FILE_ROTATION_SIZE, you can specify that files wait to rotate until all rows with the same ROWTIME have arrived. See FILE_ROTATION_RESPECT_ROWTIME below.

Using FILE_ROTATION_RESPECT_ROWTIME

Setting FILE_ROTATION_RESPECT_ROWTIME to true ensures that rows with the same rowtime will not be split between two files. For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte), and a new row arrives that causes the file to go over the 1 megabyte threshold, if FILE_ROTATION_RESPECT_ROWTIME is set to true, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte.

If you set FILE_ROTATION_RESPECT_ROWTIME to true, you cannot write files from tables, because table rows lack rowtimes. s-Server raises an error if you try to insert into a file-writer foreign stream that has FILE_ROTATION_RESPECT_ROWTIME set to true. This means that if you plan to write rows from a table into a file, you must set FILE_ROTATION_RESPECT_ROWTIME to false, as in the sketch below.
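
Here is a hedged sketch for writing rows from a table. The sink name, the table hadoop_schema."MyTable", its columns, and the AUTH_USERNAME value are all illustrative:

CREATE OR REPLACE FOREIGN STREAM hadoop_schema.table_export_sink (
    "id" INTEGER,
    "name" VARCHAR(64))
SERVER HDFS_SERVER
OPTIONS (
    "AUTH_USERNAME" 'sqlstream',
    "HDFS_OUTPUT_DIR" 'hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/',
    "FORMATTER" 'CSV',
    "FILENAME_PREFIX" 'table-export-',
    "FILENAME_SUFFIX" '.csv',
    "FILE_ROTATION_SIZE" '20K',
    --required when the source rows have no rowtimes
    "FILE_ROTATION_RESPECT_ROWTIME" 'false');

INSERT INTO hadoop_schema.table_export_sink
SELECT "id", "name" FROM hadoop_schema."MyTable";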

Configuring File Names

You can specify options for how files are named as they rotate. In setting options for rotated files' names, you can specify a prefix, suffix, and date format for the file name. You can also specify a watermark. Watermarks are drawn from a column in the source from which the file is written.

At minimum, you must specify either FILENAME_PREFIX or FILENAME_SUFFIX. All rotated files include a timestamp.

If you set ORIGINAL_FILENAME, then its value will serve as a temporary file name to use while data is being actively written.

When this file is rotated out, rotated files are named as follows:

<prefix>-<timestamp>-<watermark>-<sequenceNumber><suffix> depending on the options you specify.

If you do not set ORIGINAL_FILENAME, then the file being actively written to is named using the same pattern, with the timestamp taken from the first rowtime in the file.

Note: if you are writing from a table and do not specify ORIGINAL_FILENAME, <timestamp> will be the system time when the file began writing.

When this file is rotated out, rotated files are named as follows:

<prefix>-<timestamp>-<watermark>-<sequenceNumber><suffix>

For example, the following options:

filename_prefix 'test-',
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
filename_suffix '.csv',                       --note that you need to specify a period for filename_suffix if desired.

produce rotating file names like this:

test-2020-05-23_19:44:00.csv

And the following options:

filename_date_format 'yyyy-MM-dd_HH:mm:ss',
filename_suffix '.csv',                      --note that you need to specify a period for filename_suffix if desired.

produce rotating file names like this:

2020-05-23_19:44:00.csv

Explanation of Elements

  • <prefix> is the string specified by the FILENAME_PREFIX option. If you did not specify FILENAME_PREFIX, then <prefix> is omitted from the filename.
  • <timestamp> is the ROWTIME of the last row written to the file. This element is always present if the source is a stream, but is omitted if the source is a table. s-Server formats <timestamp> as specified by FILENAME_DATE_FORMAT. If you do not specify FILENAME_DATE_FORMAT, then <timestamp> is formatted as yyyy-MM-dd_HH-mm-ss-SSS.
  • <watermark> is the value of a watermark column in the last row of the source. You use FILE_ROTATION_WATERMARK_COLUMN to specify a source watermark column corresponding to the last row of the file. If you do not specify a FILE_ROTATION_WATERMARK_COLUMN, then <watermark> is omitted from the filename.
  • <sequenceNumber> is added only if you specify the FILE_ROTATION_RESPECT_ROWTIME option as false. In that case, files may have the same terminal <timestamp>, because s-Server can write rows with the same rowtime across multiple files. In these cases, file names in the series with the same terminal <timestamp> are distinguished from one another by a monotonically increasing 7-digit sequence number, starting at 0000001.
  • <suffix> is the string specified by the FILENAME_SUFFIX option. If you did not specify FILENAME_SUFFIX, then <suffix> is omitted from the filename.
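
For example, with FILENAME_PREFIX 'test-', the default date format, FILE_ROTATION_WATERMARK_COLUMN '"trip_no"', and FILE_ROTATION_RESPECT_ROWTIME 'false' (all illustrative choices), a rotated file might be named along the lines of test-2020-05-23_19-44-00-000-T100-0000001.csv, where T100 is the watermark value from the last row in the file.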

Writing to HDFS Using the ECD Agent

You can use the ECD agent to write files to HDFS from remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

ROWTYPE=RECORDTYPE(INTEGER COL1, INTEGER COL2, VARCHAR(256) COL3, VARCHAR(256) COL4)
SCHEMA_NAME=HADOOPWRITERTEST
TABLE_NAME=HADOOPWRITERSTREAM
CONFIG_PATH=/home/mydirectory/default.xml
AUTH_METHOD=kerberos
AUTH_USERNAME=nn/hadoop.docker.com@EXAMPLE.COM
AUTH_KEYTAB=/tmp/nn.service.keytab
HDFS_OUTPUT_DIR=hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/
FORMATTER=CSV
QUOTE_CHARACTER=
SEPARATOR=,
WRITE_HEADER=false
DIRECTORY=/tmp
ORIGINAL_FILENAME=bus-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

To invoke the agent:

From the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/, enter the following at the command line:

$ ./commondataagent.sh --output --props sample.test.properties --io hdfs