The File-VFS source plugin is based on the Apache Commons VFS library, which provides support for multiple file systems and compression formats. The supported file systems, the URI formats to use when specifying the FILE_LOCATION option, and their respective options are described below.
This plugin supports reading data in uncompressed and Gzip-compressed formats.
Option | Description | Default |
---|---|---|
FILE_LOCATION | (Mandatory). The URI of the target directory containing the files to be read - for example, file:///tmp/newdirectory represents a directory on the local file system. For more information see URI Formats Supported by FILE-VFS. | |
FILENAME_PATTERN | (Mandatory). A Java regular expression filter that selects which files to read from the FILE_LOCATION directory. When the SORT_FIELD option has the value TIME_IN_FILENAME or LEXICAL_IN_FILENAME, the first capture group in the regex identifies the sub-expression on which files are sorted. Example: buses_(\d{14})_[a-z]+.* - files matching this regex are picked, and the captured group (\d{14}) (14 digits) is used to sort them. | |
FILE_TYPE | The type of file to read: 'none' (uncompressed) or 'gzip'. | none |
SORT_FIELD | The file sorting mechanism: MODIFIED_FILE_TIME, TIME_IN_FILENAME or LEXICAL_IN_FILENAME. | MODIFIED_FILE_TIME |
FILENAME_TIME_FORMAT | The format of the timestamp when SORT_FIELD is set to TIME_IN_FILENAME. NOTE: This option is mandatory when using TIME_IN_FILENAME. | |
STARTING_FILE_NAME_AND_OFFSET | When SORT_FIELD is set to LEXICAL_IN_FILENAME, reading starts from the given file name and its offset value. The default value EARLIEST means all files are picked. | EARLIEST |
STARTING_TIME | The starting time when SORT_FIELD is set to TIME_IN_FILENAME, in the format yyyy-MM-dd HH:mm:ss.SSS. | 1752-09-14 00:00:00.000 |
STARTING_POSITION | The starting position for the file read when SORT_FIELD is set to MODIFIED_FILE_TIME. The format is modified_time_epoch[:file_name[:offset]] - file_name and offset are optional. | 1752-09-14 00:00:00.000 |
INGRESS_NUM_PARTITIONS | Used in scaling. The number of logical partitions into which you want to divide your input data. A single partition is a logical grouping of input data files based on hash computation; files with the same partition value belong to the same partition. The partition value for a file is computed as hash(filename) % INGRESS_NUM_PARTITIONS. See Scaling in File-VFS for more information. | |
INGRESS_ASSIGNED_PARTITIONS | Used in scaling. Identifies the partitions assigned to the current shard; the shard processes only these partitions and skips data from all others. Partitions are 0-indexed and can be given as comma-separated values (0,1,2), as a range (4:7, equivalent to 4,5,6), as a mix of both (0,1,4:7, equivalent to 0,1,4,5,6), or as all (:). | |
SHARD_ID | Used in scaling. Optional; used only when running multiple shards of a pipeline. Identifies the current shard, counting from 0. | |
INGRESS_DELAY_INTERVAL | The period (in milliseconds) for which the file-based T-sort waits for late files. Applies only if SORT_FIELD is set to MODIFIED_FILE_TIME or TIME_IN_FILENAME. | 0 |
INGRESS_FILE_SCAN_WAIT | The interval (in milliseconds) at which the reader thread checks the queue for new files to read. | 2000 |
IGNORE_CORRUPT_FILES | Whether the VFS reader should ignore a file when a fatal IOException occurs while reading it. If 'true', the corrupt file is skipped; otherwise the pump is terminated and no more data flows. | false |
NUMBER_OF_BUFFERS | The number of buffers the File-VFS plugin can use to read data before it needs to recycle them. Increasing the number of buffers may improve performance at the cost of memory. | 2 |
INGRESS_RECURSIVE_LOOKUP | Enables or disables recursive file lookups within subdirectories of the provided directory. When 'true', the plugin checks each directory entry to distinguish files from subdirectories. If you do not need to include files from child subdirectories, and you know that all directory entries matching FILENAME_PATTERN will be files, you can safely set this option to 'false' to skip the check and save the latency of a round trip to the remote file system for each entry in the FILE_LOCATION directory. | true |
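For example, to sort files on a timestamp embedded in the filename, the capture group in FILENAME_PATTERN and the FILENAME_TIME_FORMAT option work together. Here is a minimal sketch (the directory, column list and timestamp format are illustrative assumptions, not taken from a shipped sample):
CREATE OR REPLACE FOREIGN STREAM "sample"."buses_fs"
(
"COL0" INTEGER    -- illustrative single column
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
"PARSER" 'CSV',
"FILE_LOCATION" 'file:///tmp/buses/',
"FILENAME_PATTERN" 'buses_(\d{14})_[a-z]+.*',   -- the capture group supplies the sort key
"SORT_FIELD" 'TIME_IN_FILENAME',
"FILENAME_TIME_FORMAT" 'yyyyMMddHHmmss',        -- assumed format for the 14 captured digits
"STARTING_TIME" '2024-01-01 00:00:00.000'
);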
Depending on the settings of SORT_FIELD and INGRESS_DELAY_INTERVAL, the file-based T-sort feature may come into operation.
Option | File System | Default | Description |
---|---|---|---|
"vfs.hdfs.kerberos.enabled" | HDFS | 'false' | In order to read from HDFS with kerberos authentication, the value for this option should be 'true' |
"vfs.hdfs.kerberos.keytab" | HDFS | no default value | In order to read from HDFS with kerberos authentication, the user must provide the location of the keytab here as a string |
"vfs.hdfs.kerberos.principal" | HDFS | no default value | In order to read from HDFS with kerberos authentication, the user must provide the value of the principal here as a string |
"vfs.s3.useHttps" | S3 | 'true' | In order to read from s3 or minio over HTTP (and not HTTPS) the value for this option should be 'false' |
AUTH_TYPE_ADLS | ADLS Gen2 | no default value | (Mandatory when using ADLS Gen2). Specify the authentication mechanism. Supported types are 'SharedKey' and 'OAuth'. OAuth2.0 support is provided via Client Credentials |
KEY_FILE | ADLS Gen2 | no default value | (Mandatory when using ADLS Gen2). Provides path to the configuration file containing credentials for the appropriate authentication mechanism. Example here |
REFRESH_CRED | ADLS Gen2 | no default value | (Optional). Supports values 'yes' and 'no' (case insensitive). When set to yes, any changes made to the KEY_FILE will be used to establish a new connection with ADLS Gen2. If set to no or not provided changed credentials in the KEY_FILE won’t be used to establish connection. |
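For example, to read from a MinIO endpoint that is served over plain HTTP rather than HTTPS, you would add the following option to the foreign stream definition (all other S3 options remain as usual):
"vfs.s3.useHttps" 'false'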
When reading from files, you can define provenance columns. These return metadata for the file from which you are reading.
Option | Data type | Description |
---|---|---|
SQLSTREAM_PROV_FILE_SOURCE_FILE | VARCHAR | Adds the name of the file being read to the output columns |
SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | BIGINT | Adds the current line number to the output columns. Line number starts from 0. |
SQLSTREAM_PROV_FILE_MODIFIED_TIME | BIGINT | Adds the modified time of the file being read to the output columns. |
SQLSTREAM_PROV_INGRES_PARTITION_ID | INT | Adds the partition from which this row was read, calculated by hashing the incoming filename as described in Scaling in File-VFS. |
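To use a provenance column, declare it alongside the data columns in the foreign stream definition; the plugin populates it for every row. A minimal sketch (the data column and location are illustrative):
CREATE OR REPLACE FOREIGN STREAM "sample"."prov_fs"
(
"COL0" INTEGER,                                   -- illustrative data column
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),   -- filled with the source file name
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT    -- filled with the line number, starting at 0
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
"PARSER" 'CSV',
"FILE_LOCATION" 'file:///tmp/data/',
"FILENAME_PATTERN" '.*\.csv'
);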
Sharding is the practice of running multiple instances of SQLstream pipelines, each using the same SQL code but configured so that each shard processes a subset of the data.
Term | Description |
---|---|
Partition | A single partition is defined as a logical grouping of input data files based on hash computation. Files having the same partition value belong to the same partition. The partition for a file is computed as: hash(filename) % total_num_partitions, where total_num_partitions is the total number of partitions into which you want to divide your data. See the glossary term partition for more information. |
Shard | The s-Server process launched for processing input data. See the glossary term shard for more information. |
Let’s say you divide the input data into P logical partitions. A hash is computed for each input file (based on its name) as it arrives, and the file is mapped to a particular partition by taking the modulo of the hash with the number of partitions. The number of partitions is static, so the file-to-partition mapping is always fixed. For example, with P = 4, a file whose name hashes to 10 always lands in partition 10 % 4 = 2, on every shard.
In a sharded pipeline the sharding / partitioning options will be set as follows:
Option | Notes |
---|---|
SHARD_ID | The shard ID of the current shard. No two shards should have the same SHARD_ID. |
INGRESS_NUM_PARTITIONS | The total number of partitions across the entire data set; all shards must use the same value for this option. |
INGRESS_ASSIGNED_PARTITIONS | The specific partitions assigned to the current shard; any given partition should be allocated to only a single shard. |
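For example, to split 4 partitions evenly across two shards, each shard would run the same SQL with only these options differing (an illustrative assignment):
-- shard 0 takes partitions 0 and 1
"INGRESS_NUM_PARTITIONS" '4',
"INGRESS_ASSIGNED_PARTITIONS" '0:2',
"SHARD_ID" '0',
-- shard 1 takes partitions 2 and 3
"INGRESS_NUM_PARTITIONS" '4',
"INGRESS_ASSIGNED_PARTITIONS" '2:4',
"SHARD_ID" '1',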
Normally, the values for these three options are assigned by external configuration as the pipeline starts and passed to the foreign stream using an OPTIONS_QUERY, which overrides any options set in the CREATE FOREIGN STREAM statement.
Here is a SQL example of a sharded pipeline; it is assumed that we are working with 4 partitions and there is only a single instance running with SHARD_ID = 0.
CREATE OR REPLACE SCHEMA "sample";
ALTER PUMP "sample".* STOP;
ALTER STREAM "sample".* RESET;
-- model of the START_POSITION_QUERY showing the meaning of each parameter
--
-- CREATE OR REPLACE VIEW "sample"."START_POSITION_QUERY" AS
-- SELECT * FROM (
-- VALUES(sys_boot.mgmt.watermark_string('bootstrap-servers', 'topic_name', 'consumerGroupPrefix','assignedPartitions',totalPartitions))
-- ) AS options(STARTING_POSITION);
CREATE OR REPLACE VIEW "sample"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string('my.kafka.example.com:6667', 'demo', '0_',':',4))) AS options(STARTING_POSITION);
CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
(
"event_time" TIMESTAMP,
"Col1" INTEGER,
"Col2" INTEGER,
"Col3" INTEGER,
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
"SQLSTREAM_PROV_INGRES_PARTITION_ID" VARCHAR(256)
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
"PARSER" 'CSV',
"SEPARATOR" ',',
"QUOTE_CHARACTER" '"',
"ROW_SEPARATOR" u&'\000A',
"CHARACTER_ENCODING" 'UTF-8',
"SKIP_HEADER" 'false',
"FILE_LOCATION" 'file:///path/to/demo/',
"FILENAME_PATTERN" '.*',
"SORT_FIELD" 'MODIFIED_FILE_TIME',
"INGRESS_DELAY_INTERVAL" '0',
"INGRESS_FILE_SCAN_WAIT" '2000',
"INGRESS_NUM_PARTITIONS" '4',
"INGRESS_ASSIGNED_PARTITIONS" ':',
"SHARD_ID" "0",
"INGRESS_RECURSIVE_LOOKUP" 'false',
"OPTIONS_QUERY" 'SELECT * FROM "sample".START_POSITION_QUERY'
);
CREATE OR REPLACE VIEW "sample"."v1" AS SELECT STREAM
"event_time",
"Col1",
"Col2",
"Col3",
CAST("SQLSTREAM_PROV_INGRES_PARTITION_ID" AS VARCHAR(50)) || ',' ||
CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(50)) || ':' ||
"SQLSTREAM_PROV_FILE_SOURCE_FILE" || ':' ||
CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(50))
as "SQLSTREAM_EGRESS_WATERMARK"
FROM "sample"."fs" AS "input";
CREATE OR REPLACE VIEW "sample"."v2" AS
SELECT STREAM *
FROM "sample"."v1";
CREATE OR REPLACE FOREIGN STREAM "sample"."out_fs"
(
"event_time" TIMESTAMP,
"Col1" INTEGER,
"Col2" INTEGER,
"Col3" INTEGER,
"SQLSTREAM_EGRESS_WATERMARK" VARCHAR(512)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"FORMATTER" 'JSON',
"FORMATTER_INCLUDE_ROWTIME" 'false',
"bootstrap.servers" 'my.kafka.example.com:6667',
"linger.ms" '10',
"batch.size" '10000',
"compression.type" 'gzip',
"TOPIC" 'demo',
"transaction.timeout.ms" '60000',
"transactional.id" 'auto',
"TRANSACTION_ROWTIME_LIMIT" '60000',
"transaction.enable.preemptive.commit" 'false',
"transaction.first.preemptive.commit.check.percentage" '70',
"SHARD_ID" "0"
);
CREATE OR REPLACE PUMP "sample"."testpump" STOPPED AS
INSERT INTO "sample"."out_fs"
SELECT STREAM "event_time","Col1","Col2","Col3","SQLSTREAM_EGRESS_WATERMARK"
FROM "sample"."v2" AS "input";
ALTER PUMP "sample"."testpump" START;
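You can inspect the pipeline interactively at any point by selecting from the intermediate view - for example (a simple check, run from any SQL session attached to s-Server):
SELECT STREAM * FROM "sample"."v2";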
For reading from the local file system, the URI format is:
file:///absolute/path/to/directory
For example:
"FILE_LOCATION" 'file:///sample_directory/data'
Note: When the file:// prefix is used, the leading / of the absolute path makes a total of 3 forward slash characters.
CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
( "COL0" INTEGER,
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
) SERVER "FILE_VFS_SERVER"OPTIONS
("FILE_LOCATION" 'file:///shubham/data',
"SEPARATOR" ',',
"ROW_SEPARATOR" u&'\000A',
"FILENAME_PATTERN" '.*\.csv',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"SKIP_HEADER" 'false',
"SORT_FIELD" 'MODIFIED_FILE_TIME',
"INGRESS_DELAY_INTERVAL" '0',
"INGRESS_FILE_SCAN_WAIT" '2000',
"QUOTE_CHARACTER" '"',
"INGRESS_NUM_PARTITIONS" '1',
"INGRESS_ASSIGNED_PARTITIONS" ':',
"SHARD_ID" '',
"INGRESS_RECURSIVE_LOOKUP" 'true',
);
ABFS (Azure Blob File System) is a connector used to connect to Azure Data Lake Storage Gen2. It is available as an extension of the Hadoop File system.
For reading files from an ADLS Gen2 account, the URI format is:
abfs://container@storage-account.dfs.core.windows.net/
A sample URI is abfs://sample-file-system@sample-account.dfs.core.windows.net/ where:
- sample-file-system is the name of the container created in ADLS Gen2.
- sample-account is the name of the storage account.
The following example uses SharedKey authentication:
CREATE OR REPLACE SCHEMA "abfs";
CREATE OR REPLACE FOREIGN STREAM "abfs"."abfs_source"
(
"Str1" VARCHAR(30),
"Str2" VARCHAR(30),
"Str3" VARCHAR(30),
"int1" INTEGER,
"int2" INTEGER,
"int3" INTEGER,
"int4" INTEGER,
"doub1" DOUBLE,
"doub2" DOUBLE,
"lon1" BIGINT,
"lon2" BIGINT
)
SERVER "FILE_VFS_SERVER"
OPTIONS
(
"CHARACTER_ENCODING" 'UTF-8',
"FILE_LOCATION" 'abfs://container-name@account-name.dfs.core.windows.net/folder/',
"AUTH_TYPE_ADLS" 'SharedKey',
"FILENAME_PATTERN" '.*.txt',
"SEPARATOR" ',',
"ROW_SEPARATOR" u&'\000A',
"PARSER" 'CSV',
"FILE_TYPE" 'none',
"SKIP_HEADER" 'true',
"SORT_FIELD" 'MODIFIED_FILE_TIME',
"KEY_FILE" '~/config-abfs.yaml',
"REFRESH_CRED" 'yes',
"INGRESS_NUM_PARTITIONS" '1',
"INGRESS_ASSIGNED_PARTITIONS" ':',
"SHARD_ID" '',
"INGRESS_RECURSIVE_LOOKUP" 'false',
);
CREATE OR REPLACE SCHEMA "abfs";
CREATE OR REPLACE FOREIGN STREAM "abfs"."abfs_source"
(
"Str1" VARCHAR(30),
"Str2" VARCHAR(30),
"Str3" VARCHAR(30),
"int1" INTEGER,
"int2" INTEGER,
"int3" INTEGER,
"int4" INTEGER,
"doub1" DOUBLE,
"doub2" DOUBLE,
"lon1" BIGINT,
"lon2" BIGINT
)
SERVER "FILE_VFS_SERVER"
OPTIONS
(
"CHARACTER_ENCODING" 'UTF-8',
"FILE_LOCATION" 'abfs://container-name@account-name.dfs.core.windows.net/folder/',
"AUTH_TYPE_ADLS" 'OAuth',
"FILENAME_PATTERN" '.*.txt',
"SEPARATOR" ',',
"ROW_SEPARATOR" u&'\000A',
"PARSER" 'CSV',
"FILE_TYPE" 'none',
"SKIP_HEADER" 'true',
"SORT_FIELD" 'MODIFIED_FILE_TIME',
"KEY_FILE" '~/config-abfs.yaml',
"REFRESH_CRED" 'yes',
"INGRESS_RECURSIVE_LOOKUP" 'false',
);
# config-abfs.yaml
SharedKey:
KEY: <access-key>
OAuth:
TenantID: <tenant ID>
ClientID: <client ID>
ClientSecret: <client secret>
For example:
# config-abfs.yaml
SharedKey:
KEY: 'XpPQt0TpcjiIyNYaZNIdRMj8A9823r43RKHKPJjfUmk9U/4bop7A6oLLmsGiytbAgRuUrDl3mbXoVZ4cZ4XQQ2g=='
OAuth:
TenantID: '0a998bfc-809a-47c3-b5a2-5b38321e4690'
ClientID: '9bg2680f-dacd-7098-88ab-2f07eda87513'
ClientSecret: 'AbB.ce7Zd~-MZ2_5~_IKv42RtBL8Q71c7a'
For reading files from HDFS, the URI format is:
hdfs://hostname[:port][absolute-path]
For example:
hdfs://sample-mst-02.cloud.in.com:8020/sample-absolute-path/data
SERVER "FILE_VFS_SERVER"
OPTIONS (
"CHARACTER_ENCODING" 'UTF-8',
"FILE_LOCATION" 'hdfs://rafsoak001-mgt-01.cloud.in.guavus.com:2443/shubham/data2',
"FILENAME_PATTERN" '.*\.csv',
"SEPARATOR" ',', "vfs.hdfs.kerberos.enabled" 'true',
"vfs.hdfs.kerberos.keytab" '/tmp/hdfs.headless.keytab',
"vfs.hdfs.kerberos.principal" 'hdfs-rafsoak001@GVS.GGN',
Using the S3 plugin, you can read compressed and uncompressed files from Amazon S3, or from other in-cloud or on-premises S3-API-compatible sources such as MinIO.
For reading files over S3, the URI format is:
s3://[Access-Key]:[Secret-Key]@Endpoint-uri-containing-bucket-name-and-region/prefix-for-objects
For example, if the access key is ABCEDABCEDABCEDABCED, the percent-encoded secret key is Sd9ADVCfHEIETyXSXU9niFn9%2Fw8fMkN9lcZpUD%2By, the bucket is test-01, the region is us-east-1 and the object prefix is multi, the URI would be:
s3://ABCEDABCEDABCEDABCED:Sd9ADVCfHEIETyXSXU9niFn9%2Fw8fMkN9lcZpUD%2By@test-01.s3.us-east-1.amazonaws.com/multi
If the region is supplied alongside the credentials instead of in the endpoint, the URI would be:
s3://ABCEDABCEDABCEDABCED:Sd9ADVCfHEIETyXSXU9niFn9%2Fw8fMkN9lcZpUD%2By:us-east-1@test-01.s3.amazonaws.com/multi
NOTE: The endpoint URI must not include the http/https protocol, and the components of the URI must not contain any HTML reserved characters (for example, / in a secret key is encoded as %2F). For more information, refer to the section Encoding HTML reserved characters in the URL.
CREATE OR REPLACE FOREIGN STREAM "sample"."fs1"
(
"start-time" VARCHAR(32),
"end-time" VARCHAR(32),
"duration" DECIMAL,
"src-addr" VARCHAR(64),
"dst-addr" VARCHAR(64),
"protocol" VARCHAR(32),
"src-port" INTEGER,
"dst-port" INTEGER,
"tcp-flags" VARCHAR(6),
"packets" BIGINT,
"bytes" BIGINT,
"sensor" VARCHAR(64),
"in-if" INTEGER,
"out-if" INTEGER,
"log-time" VARCHAR(32),
"src-asn" BIGINT,
"dst-asn" BIGINT,
"flow-src" VARCHAR(32),
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(128),
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
"CHARACTER_ENCODING" 'UTF-8',
"FILE_LOCATION" 's3://ABCEDABCEDABCEDABCED:Sd9ADVCfHEIETyXSXU9niFn9%2Fw8fMkN9lcZpUD%2By@test-01.s3.us-east-1.amazonaws.com/multi',
"FILENAME_PATTERN" '.*\.csv',
"SEPARATOR" ',',
"ROW_SEPARATOR" u&'\000A',
"PARSER" 'CSV',
"FILE_TYPE" 'none',
"SKIP_HEADER" 'false',
"SORT_FIELD" 'MODIFIED_FILE_TIME',
"STARTING_POSITION" '1600682400:sample_2020-12-25-14-00.gz:300000',
"INGRESS_DELAY_INTERVAL" '60000',
"vfs.s3.useHttps" 'true'
);
For reading files from a remote file system over SFTP, the URI format is:
sftp://username[:password]@hostname[:port]/relative-path
For example, if:
- user = root
- password = root@123
- hostname = sample-mst-01.cloud.in.guavus.com
- path = /root/inputsource
the URI becomes:
sftp://root:root%40123@sample-mst-01.cloud.in.guavus.com/inputsource
Note:
The components of the URI must not contain any HTML reserved characters. If they do, they must be percent-encoded as shown in the table below. For example, if your password is root@1, it should be encoded as root%401.
Reserved character | Encoded value |
---|---|
Line Feed | %0A |
Carriage Return | %0D |
space | %20 |
# | %23 |
$ | %24 |
% | %25 |
& | %26 |
/ | %2F |
: | %3A |
; | %3B |
< | %3C |
> | %3E |
? | %3F |
@ | %40 |
[ | %5B |
\ | %5C |
] | %5D |
^ | %5E |
` | %60 |
{ | %7B |
| | %7C |
} | %7D |
~ | %7E |
CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
(
"COL0" INTEGER,
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
) SERVER "FILE_VFS_SERVER"
OPTIONS (
"FILE_LOCATION" 'sftp://guavus:guavus%40123@di001-mst-01.cloud.in.guavus.com/shubham/data',
"SEPARATOR" ',',
"ROW_SEPARATOR" u&'\000A',
"FILENAME_PATTERN" '.*\.csv',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"SKIP_HEADER" 'false',
"SORT_FIELD" 'MODIFIED_FILE_TIME',
"INGRESS_DELAY_INTERVAL" '0',
"INGRESS_FILE_SCAN_WAIT" '2000',
"QUOTE_CHARACTER" '"'
);