(new in s-Server version 6.0.1)
You can load data from s-Server into Apache Hive tables by connecting to a Hive server and configuring a foreign table or stream with options that tell s-Server how to write into Hive tables. To connect to an HDFS server, you can use the prebuilt server HIVE_SERVER, though you will often want to configure your own server object, since this lets you store your HDFS connection information once and reuse it for multiple Hive tables.
Currently, writing to Hive tables is supported for Optimized Row Columnar (ORC) data only.
In the foreign table or stream, you need to specify the options HIVE_SCHEMA_NAME, HIVE_TABLE_NAME, AUTH_METASTORE_PRINCIPAL, HIVE_URI, and HIVE_METASTORE_URIS to identify the Hive table to which you want to write. You can also specify these options as part of a server definition.
You also need to specify FORMATTER 'ORC' as an option in the foreign table or stream and provide basic configuration for ORC. (At minimum, you need to specify a correct version number for ORC.) See Formatting Data as ORC for more details. You also need to make sure that all columns in your foreign table or stream match those of the target Hive table exactly, in both name and type. In particular, columns for Hive partition keys, sort keys, and cluster keys must be included and match exactly.
You can only write to Hive tables that already exist; s-Server does not explicitly create Hive tables.
|HIVE_TABLE_NAME||Defaults to null. Table name inside HIVE_SCHEMA_NAME.|
|HIVE_SCHEMA_NAME||Defaults to null. Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified.|
|AUTH_METASTORE_PRINCIPAL||Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file.|
|HIVE_URI||Defaults to null. JDBC URL for accessing the Hive server. Must be specified if HIVE_TABLE_NAME is specified.|
|HIVE_METASTORE_URIS||Defaults to null. Location of the Hive metastore. Required if HIVE_TABLE_NAME is specified.|
|$columnName_HIVE_PARTITION_NAME_FORMAT||This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat.|
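As a rough illustration of the last option in the table above, the fragment below sketches how a date format might be bound to a partition key. The column "event_date", the table name hive_daily_bus_stats, and the quoting of the option name are all assumptions made for this sketch; they do not come from the product reference.

```sql
--hypothetical fragment of the OPTIONS clause of a Hive-writing foreign table or stream
--assumes the column signature declares a partition key column "event_date" of type DATE
HIVE_SCHEMA_NAME 'trafficApp',
HIVE_TABLE_NAME 'hive_daily_bus_stats',
--assumed spelling: the partition key column name followed by _HIVE_PARTITION_NAME_FORMAT,
--quoted here so that the lowercase column name is preserved
"event_date_HIVE_PARTITION_NAME_FORMAT" 'yyyy-MM-dd',
```

With a java.text.SimpleDateFormat pattern of yyyy-MM-dd, a row whose "event_date" is 22 February 2019 would be expected to land under a directory level named event_date=2019-02-22.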
Note: In order to include Kerberos options for an HDFS server, you need to configure your own server object. This server will allow all foreign tables or streams that reference this server to inherit Kerberos options.
Hive external tables are declared using the EXTERNAL keyword. To create a new partition at every file rotation point, the MSCK REPAIR command can be used instead of the LOAD command. MSCK REPAIR updates the Hive metadata for partitions that have been newly created in, or removed from, HDFS. This makes those partitions visible to queries that run against the EXTERNAL table.
Note: When a Hive sink is bound to an EXTERNAL table, the HDFS_OUTPUT_DIR option must not be specified. This option identifies a location in the remote HDFS file system where trees of bucket files are temporarily staged before being LOADed into a managed Hive table. For an EXTERNAL table, the bucket files are instead written into the HDFS tree identified by the LOCATION clause of the EXTERNAL table.
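For orientation, here is a minimal HiveQL sketch of an EXTERNAL table and the repair statement described above. The table name, columns, and LOCATION are placeholders invented for this sketch, and the statements are meant to be run in Hive (for example through beeline), not in s-Server.

```sql
--HiveQL; hypothetical EXTERNAL table with two partition keys
CREATE EXTERNAL TABLE trafficApp.ext_bus_stats (
    id BIGINT,
    speed INT
)
PARTITIONED BY (event_year INT, event_month INT)
STORED AS ORC
LOCATION '/data/ext_bus_stats';

--register partition directories that have appeared under LOCATION
--so that queries against the table can see them
MSCK REPAIR TABLE trafficApp.ext_bus_stats;

--optional check: list the partitions the metastore now knows about
SHOW PARTITIONS trafficApp.ext_bus_stats;
```

MSCK REPAIR TABLE only discovers directory levels that follow the key=value naming convention, which is why the restrictions on partition key names and values apply to EXTERNAL tables.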
Certain restrictions apply to partition key names and values when dealing with EXTERNAL tables. These restrictions arise from limitations in Hive’s ability to introspect key names and values from HDFS directory level names. In order for a directory level to be converted into a Hive-readable key/value pair, the name of the level must have the form
keyName = value
These limitations have the following consequences:
Here is an example of binding a Hive sink to an EXTERNAL table. Note the absence of HDFS_OUTPUT_DIR:
CREATE FOREIGN STREAM sample_external_table
(
    "p_bigintcol" BIGINT,
    "p_intcol" INT,
    "bigintcol" BIGINT,
    "intcol" INT,
    "c_bigintcol" BIGINT,
    "c_intcol" INT,
    "s_bigintcol" BIGINT,
    "s_intcol" INT,
    "payload_bigintcol" BIGINT,
    "payload_intcol" INT
)
SERVER HiveWriterServer
OPTIONS
(
    DIRECTORY '/tmp/bug6312/simple_table',
    FORMATTER 'ORC',
    FILENAME_PREFIX 'orc-test',
    FILENAME_SUFFIX '.orc',
    FILE_ROTATION_TIME '1m',
    "orc.version" 'V_0_11',
    "orc.block.padding" 'false',
    "orc.block.size" '10000',
    "orc.direct.encoding.columns" 'c_bigintcol,c_intcol',
    "orc.batch.size" '20000',
    "orc.user.metadata_myWatermark" 'ABCDEF0123456789',
    HIVE_SCHEMA_NAME 'rick',
    HIVE_TABLE_NAME 'simple_table',
    HIVE_METASTORE_URIS 'thrift://sqlstream01-mst-01.cloud.in.guavus.com:9083,thrift://sqlstream01-mst-02.cloud.in.guavus.com:9083',
    HIVE_URI 'jdbc:hive2://sqlstream01-slv-01.cloud.in.guavus.com:2181,sqlstream01-slv-02.cloud.in.guavus.com:2181,sqlstream01-slv-03.cloud.in.guavus.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-hive2',
    CONFIG_PATH '/tmp/bug6312/core-site.xml:/tmp/bug6312/hdfs-site.xml',
    AUTH_METHOD 'kerberos',
    AUTH_USERNAME 'svc_sqlstream_guavus@GVS.GGN',
    AUTH_KEYTAB '/tmp/bug6312/svc_sqlstream_guavus.keytab',
    AUTH_METASTORE_PRINCIPAL 'hive/_HOST@GVS.GGN'
);
For writing to Hive tables, columns in your foreign table or stream must match those of the target Hive table. Both column names and types must match. You also need to include all partition keys, sort keys, and cluster keys from the target Hive table as columns in the foreign table or stream. Missing columns will be filled with nulls.
s-Server file writers produce a sequence of files determined by the file rotation policy that you configure in your foreign table or stream. For writing to files, file rotation encodes the place of the file in the sequence, usually via a terminal timestamp and sometimes including a watermark.
For Hive tables, though, instead of producing a single file per rotation (or "cadence point"), for each rotation, s-Server emits a whole directory tree of result files that s-Server then copies into the Hive table.
The file rotation point serves as a recoverability point. In cases when a crash occurs, you will need to restart your SQLstream pipeline so that you regenerate the Hive tree.
You cannot use the FILE_ROTATION_SIZE option with Hive tables. Instead, you should use FILE_ROTATION_TIME. See Foreign Stream Options for Writing to File Systems below for more details.
When you specify a Hive table, the directory tree is loaded into the target Hive table, and s-Server writes files according to the process below.
Hive tables use partitions, clusters, and sorts to store data. From an s-Server perspective, you need to remember two things to get data loaded correctly:
When data is actually written to Hive, the top level directory of the directory tree is named according to s-Server's specifications for file names detailed below.
The names of the directory levels underneath mimic the conventions used by Hive to represent partition keys and cluster buckets. If a Hive table is specified, then the names of the partition levels will incorporate the names of the partition keys in the target Hive table in the form partitionkey=value. The leaf files are the cluster buckets. Each leaf file is ORC formatted and contains a sorted set of rows for a single cluster key.
Buckets are named as follows:
These trees will appear as follows.
--top directory
busStats-2019-02-22_08:05:19.123.orc
--the following four lines contain directory names in the form partitionkey=value.
--these match columns defined in the s-Server foreign table or stream
    event_year=2019
        event_month=11
            driver_no=12345
                highway=101
--the following three lines are leaf files that contain rows for buckets 1, 2, and 3 for this file rotation point.
--data from all other columns are written to these leaf buckets
                    000001_0_2019-02-22_08-05-19-123
                    000002_0_2019-02-22_08-05-19-123
                    000003_0_2019-02-22_08-05-19-123
For example, consider an ORC-formatted target Hive table, defined via Hive QL as:
USE trafficApp;
CREATE TABLE hive_bus_stats
(
    id BIGINT,
    reported_at TIMESTAMP,
    speed INT,
    gps VARCHAR(128)
)
--These have to be declared in the column signature of your foreign table or stream.
PARTITIONED BY (event_year INT, event_month INT, driver_no BIGINT, highway VARCHAR(8))
--These have to be in the column signature of your foreign table or stream.
CLUSTERED BY (id) SORTED BY (speed, gps) INTO 10 BUCKETS
--s-Server can only write to Hive tables that use ORC
STORED AS ORC
LOCATION '/data/svc_sqlstream_guavus/busStats'
TBLPROPERTIES
(
    "orc.compress" = "SNAPPY",
    "orc.bloom.filter.columns" = "speed,gps"
);
You would write a foreign stream to stream data into this table as follows:
--most likely you will want to create a custom server object in order to reuse it.
--note that the foreign table below uses SERVER HiveWriterServer, which matches the name of this defined server.
CREATE OR REPLACE SERVER HiveWriterServer TYPE 'HIVE'
FOREIGN DATA WRAPPER ECDA
OPTIONS
(
    HIVE_METASTORE_URIS 'thrift://sqlstream.guavus.com:9083,thrift://sqlstream.guavus.com:9083',
    HIVE_URI 'jdbc:hive2://sqlstream.guavus.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver',
    CONFIG_PATH '/tmp/bug6312/core-site.xml:/tmp/bug6312/hdfs-site.xml',
    AUTH_METHOD 'kerberos',
    AUTH_USERNAME 'svc_sqlstream_guavus@GVS.GGN',
    AUTH_KEYTAB '/tmp/bug6312/svc_sqlstream_guavus.keytab',
    AUTH_METASTORE_PRINCIPAL 'hive/_HOST@GVS.GGN'
);

CREATE OR REPLACE SCHEMA HiveWriterSchema;
SET SCHEMA 'HiveWriterSchema';

CREATE OR REPLACE FOREIGN TABLE HiveWriterSchema.busStats
--signature has all columns and partition keys matching the Hive table defined above.
--Order of the columns does not matter. Here, for clarity, the partition key columns appear first,
--but these could appear last without affecting writing.
(
    --these four columns are partitions and will be written as directory names
    "event_year" INTEGER,
    "event_month" INTEGER,
    "driver_no" BIGINT,
    "highway" VARCHAR(8),
    --these four columns will be written into clusters
    --id is used as a cluster column and must be present
    "id" BIGINT,
    "reported_at" TIMESTAMP,
    --speed and gps are used as sort columns and must be 1) present and
    --2) sorted on before writing to Hive (we do so in the pump defined below)
    "speed" INTEGER,
    "gps" VARCHAR(128)
)
SERVER HiveWriterServer
OPTIONS
(
    FORMATTER 'ORC',
    --step 1: the local directory where s-Server initially writes data
    DIRECTORY '/tmp/busStats',
    --step 2: the HDFS directory where s-Server copies data for writing to Hive tables
    HDFS_OUTPUT_DIR 'hdfs://sqlstream01/data/svc_sqlstream_guavus',
    FILENAME_SUFFIX '.orc',
    FILE_ROTATION_TIME '1h',
    --step 3: configuration information for the Hive table itself
    --matches USE trafficApp; from Hive table definition
    HIVE_SCHEMA_NAME 'trafficApp',
    --matches CREATE TABLE hive_bus_stats from Hive table definition
    HIVE_TABLE_NAME 'hive_bus_stats',
    --Most of the ORC configuration is introspected from the Hive table
    --ORC version must be declared correctly; otherwise, data will be unreadable
    "orc.version" 'V_0_11',
    --watermark information for each bucket file
    --not mandatory
    "orc.user.metadata_com.acme.watermark" 'AC3E'
);

--This pump orders the data by ROWTIME, then sort keys.
CREATE PUMP HiveWriterSchema.sort_pump STOPPED AS
INSERT INTO busStats
SELECT STREAM
    "event_year", "event_month", "driver_no", "highway",
    "id", src.ROWTIME AS "reported_at", "speed", "gps"
FROM busStats
--Data going into the Hive table has to be ordered by
--1) rowtime (so that we get checkpoints that align with recoverable instances). Pump will die if rows are out of order.
--2) sort columns so that it ends up in buckets in sort order.
ORDER BY src.ROWTIME, "speed", "gps"
;
To get data flowing, you would use the following code:
ALTER PUMP HiveWriterSchema.sort_pump START;
The options below are standard s-Server configuration options for writing to files. The Hive writer uses this configuration information to write first to the local file system and then to copy files to the HDFS system. In addition, in a Hive context, file names are used to name the top-level directory of a Hive tree, and file rotation is used to configure a checkpoint.
|DIRECTORY||Directory to which you are writing. s-Server needs permission to write to this directory.|
|ORIGINAL_FILENAME||Name of file to write before rotation. This will be the name of the file that s-Server is currently writing.|
|FILENAME_PREFIX||Prefix of the final output file name, such as "test-".|
|FILENAME_DATE_FORMAT||Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss. Uses Java SimpleDateFormat. This specifies how to format a timestamp that appears between the prefix and the suffix. This timestamp is the ROWTIME of the last row in the file.|
|FILE_ROTATION_WATERMARK_COLUMN||This declares the name of a source column whose contents will be used to further distinguish files in the series.|
|FILENAME_SUFFIX||Suffix of the final output file name. If you want this suffix to include a period, you must specify it, e.g. ".csv"|
|FILE_ROTATION_TIME||Determines when files rotate based on time elapsed since Unix epoch time. Defaults to 0, which means "don't use ROWTIME to trigger file rotation." You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE as an option. You can choose to specify both. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval. FILE_ROTATION_TIME '15m' rotates files every fifteen minutes from the top of the hour (1:00, 1:15, 1:30, and so on). FILE_ROTATION_TIME '1h' rotates files every hour at the top of the hour. FILE_ROTATION_TIME '1d' rotates files every day at 12:00 AM. More technically, FILE_ROTATION_TIME works as follows: Let $timePeriod be the number of milliseconds in the time unit bound to FILE_ROTATION_TIME. Let $lastWrittenRowtime be the ROWTIME of the last row in the file. Let $currentRowTime be the ROWTIME of the row about to be written. s-Server rotates to the next file when integerPart($lastWrittenRowtime / $timePeriod) < integerPart($currentRowTime / $timePeriod).|
|FILE_ROTATION_SIZE||Determines when files rotate based on file size. You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE. You can choose to specify both. Lets you specify a file size in kilobytes (k), megabytes (m), or gigabytes (g), expressed as a positive integer followed by a byte measurement. Defaults to 0, which means "don't use file size to trigger file rotation." Examples: FILE_ROTATION_SIZE '20k' means "rotate files when they reach or exceed a size of 20 kilobytes". FILE_ROTATION_SIZE '20m' means "rotate files when they reach or exceed a size of 20 megabytes". FILE_ROTATION_SIZE '1g' means "rotate files when they reach or exceed a size of 1 gigabyte". s-Server rotates to the next file once a row arrives that brings the file size over the byte threshold specified by FILE_ROTATION_SIZE.|
|FILE_ROTATION_RESPECT_ROWTIME||'true' or 'false', case-insensitive. When you use FILE_ROTATION_SIZE, this option lets you specify whether files wait to rotate until all rows with the same ROWTIME have arrived. Defaults to 'true', which means "always respect rowtime." Setting FILE_ROTATION_RESPECT_ROWTIME to true ensures that rows with the same rowtime will not be split between two files. For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte), and a new row arrives that causes the file to go over the 1 megabyte threshold, if FILE_ROTATION_RESPECT_ROWTIME is set to true, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte. Because rows in tables lack rowtimes, you cannot write files from tables while FILE_ROTATION_RESPECT_ROWTIME is set to true: s-Server will raise an error if you try to insert rows from a table into a file writer foreign stream that has this option set to true. If you are planning to write rows from a table into a file, you must set FILE_ROTATION_RESPECT_ROWTIME to false.|
||True or false; defaults to true. Causes strings to be escaped before being written.|
|POSTPROCESS_CMD||The POSTPROCESS_CMD option lets you run a script after the file is written. To use this option, enter the path to the script, along with parameters, substituting <input> for the name of the file. When the file is written, the script will execute with parameters, and <input> will be replaced by the name of the file. Example: 'scripts/runStandaloneSystemML.sh scripts/algorithms/l2-svm-predict.dml -nvargs X=<input> Y=data/haberman.test.labels.csv model=data/l2-svm-model.csv fmt="csv" confusion=data/l2-svm-confusion.csv'|
File rotation options work together in the following ways:
Rotate the file if (the FILE_ROTATION_TIME condition is satisfied OR the FILE_ROTATION_SIZE condition is satisfied) AND (the FILE_ROTATION_RESPECT_ROWTIME condition is satisfied).
You can specify options for how files are named as they rotate. In setting options for rotated files' names, you can specify a prefix, suffix, and date format for the file name. You can also specify a watermark. Watermarks are drawn from a column in the source from which the file is written.
At minimum, you must specify either FILENAME_PREFIX or FILENAME_SUFFIX. All rotated files include a timestamp.
If you set ORIGINAL_FILENAME, then the value of ORIGINAL_FILENAME will serve as a temporary file name to use while data is being actively written.
When this file is rotated out, rotated files are named as follows:
If you do not set ORIGINAL_FILENAME, then the file being actively written to is given the form
Note: If you are writing from a table and do not specify ORIGINAL_FILENAME, <date> will be the system time when the file began writing.
When this file is rotated out, rotated files are named as follows:
For example, the following options:
filename_prefix 'test-',
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
--note that you need to specify a period for suffix if desired.
filename_suffix '.csv',
produce rotating file names like this:
And the following options:
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
--note that you need to specify a period for suffix if desired.
filename_suffix '.csv',
produce rotating file names like this:
s-Server rotates files according to options set for FILE_ROTATION_TIME, FILE_ROTATION_SIZE and FILE_ROTATION_RESPECT_ROWTIME.
File rotation options work together in the following ways:
Rotate the file if (the FILE_ROTATION_TIME condition is satisfied OR the FILE_ROTATION_SIZE condition is satisfied) AND (the FILE_ROTATION_RESPECT_ROWTIME condition is satisfied).
You can use FILE_ROTATION_TIME to rotate files based on time elapsed since Unix epoch time. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval.
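The rotation rule compares the integer part of ROWTIME divided by the interval for the last written row and for the row about to be written. The worked example below applies that comparison to FILE_ROTATION_TIME '15m'. The two ROWTIME values are made up for illustration and are assumed to be expressed as milliseconds since 1970-01-01 UTC.

```latex
\begin{aligned}
\text{timePeriod} &= 15 \times 60 \times 1000 = 900{,}000 \ \text{ms} \\
\text{lastWrittenRowtime} &= 1{,}550{,}823{,}299{,}000 \ \text{ms} \quad \text{(2019-02-22 08:14:59 UTC)} \\
\text{currentRowTime} &= 1{,}550{,}823{,}300{,}000 \ \text{ms} \quad \text{(2019-02-22 08:15:00 UTC)} \\
\left\lfloor \frac{1{,}550{,}823{,}299{,}000}{900{,}000} \right\rfloor &= 1{,}723{,}136 \\
\left\lfloor \frac{1{,}550{,}823{,}300{,}000}{900{,}000} \right\rfloor &= 1{,}723{,}137
\end{aligned}
```

Because 1,723,136 < 1,723,137, the row stamped 08:15:00 starts a new file. A row stamped 08:14:30 would give the same integer part as the last written row and would stay in the current file.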
You can use FILE_ROTATION_SIZE to rotate files based on their size. You specify a file size in kilobytes (k), megabytes (m), or gigabytes (g), expressed as a positive integer followed by a byte measurement. FILE_ROTATION_SIZE defaults to 0, which means "don't use file size to trigger file rotation."
Note: You cannot use FILE_ROTATION_SIZE for writing to Hive tables.
When using FILE_ROTATION_SIZE, you can specify that files wait to rotate until all rows with the same ROWTIME have arrived. For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte), and a new row arrives that causes the file to go over the 1 megabyte threshold, if FILE_ROTATION_RESPECT_ROWTIME is set to true, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte. To do so, you set FILE_ROTATION_RESPECT_ROWTIME to true (this is the default behavior).
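As a sketch of how the size-based options combine for ordinary file writing (not for Hive tables), the fragment below shows options for a sink that is fed from a table, whose rows carry no ROWTIME. It follows the fragment style of the file naming examples; the directory and prefix values are placeholders.

```sql
--hypothetical options fragment for a file-writing foreign stream fed from a table
directory '/tmp/export',
filename_prefix 'test-',
--note that you need to specify a period for suffix if desired.
filename_suffix '.csv',
--rotate once the file reaches or exceeds 20 megabytes
file_rotation_size '20m',
--rows from a table have no ROWTIME, so rotation must not wait on ROWTIME
file_rotation_respect_rowtime 'false'
```

Leaving file_rotation_respect_rowtime at its default of 'true' is the better choice when the source is a stream, since it keeps rows with the same ROWTIME in one file.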
The Hive CREATE TABLE statement can be used to bind a tabular abstraction onto a directory of ORC-formatted files. The following table shows how Hive table properties correspond to settings for the ORC file writer (org.apache.orc.Writer). At the end of the table are further ORC tuning switches which do not need corresponding Hive table properties:
|Property||Default Value||Corresponding method on org.apache.orc.OrcFile.WriterOptions||Description||Introspectible?|
|orc.compress||ZLIB||compress(CompressionKind)||Supported compression techniques are: LZ4, LZO, NONE, SNAPPY, ZLIB, ZSTD.||Yes|
|orc.compress.size||262,144||n/a||Compression chunk size. It is unclear what this maps onto.||Yes|
|orc.stripe.size||67,108,864||stripeSize(long)||Memory buffer size (in bytes) for writing.||Yes|
|orc.row.index.stride||10,000||rowIndexStride(int)||Number of rows between index entries.||Yes|
|orc.create.index||true||n/a (lightweight indexes are always created)||Whether to create indexes. Unclear what this maps onto.||n/a|
|orc.bloom.filter.columns||none||bloomFilterColumns(String)||Comma-separated list of column names. Bloom filters are built on these columns.||Yes|
|orc.bloom.filter.fpp||0.05||bloomFilterFpp(double)||Bloom filter false positive probability.||Yes|
|orc.block.padding||true||blockPadding(boolean)||Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Padding improves locality and thus the speed of reading, but costs space.||No|
|orc.block.size||268435456||blockSize(long)||Set the file system block size for the file. For optimal performance, set the block size to be multiple factors of stripe size.||No|
||||bufferSize(int)||The size of the memory buffers used for compressing and storing the stripe in memory. NOTE: ORC writer may choose to use smaller buffer size based on stripe size and number of columns for efficient stripe writing and memory utilization. To enforce writer to use the requested buffer size use enforceBufferSize().||No|
|orc.direct.encoding.columns||null||directEncodingColumns(String)||Set the comma-separated list of columns that should be direct encoded.||No|
||false||enforceBufferSize()||Enforce writer to use requested buffer size instead of estimating buffer size based on stripe size and number of columns. See bufferSize() method for more info.||No|
||||paddingTolerance(double)||Sets the tolerance for block padding as a percentage of stripe size.||No|