Integrating Files using VFS

Introduction

The File-VFS source plugin is based on the Apache Commons VFS library, which provides support for multiple sources and compression formats. The file systems currently supported by this plugin (the local file system, ADLS Gen2, HDFS, S3 and SFTP), the URI formats to be used when specifying the FILE_LOCATION option, and their respective options are described in the sections below.

This plugin also supports reading data in uncompressed and Gzip compressed formats.

Foreign Stream Options for Reading from Files using VFS

Option Description Default
FILE_LOCATION (Mandatory). Represents the URI of the target directory containing the files to be read - for example, file:///tmp/newdirectory represents a directory on the local file system. For more information, see URI Formats Supported by FILE-VFS.
FILENAME_PATTERN (Mandatory). A Java regular expression used to select which files are read from the FILE_LOCATION directory. When the SORT_FIELD option has the value TIME_IN_FILENAME or LEXICAL_IN_FILENAME, the first capture group in the regex is used to identify the substring on which the files will be sorted.
Example:
buses_(\d{14})_[a-z]+.* - files matching this regex will be picked, and the group \d{14} (14 digits) will be used to sort the files.
FILE_TYPE The type of file you wish to read. Supported options:
  • none : for uncompressed files [Default]
  • gzip : for GZip compressed files
none
SORT_FIELD The file sorting mechanism you wish to use:
  • MODIFIED_FILE_TIME: Sort the files on the basis of each file's modified time. If two files have the same modified time, the files will be sorted lexicographically [Default]
  • TIME_IN_FILENAME: Sort the files on the basis of the time embedded in the file name. The first capture group of the regex (as specified in FILENAME_PATTERN) will be used to isolate the timestamp in the filename, and FILENAME_TIME_FORMAT will be used to specify the format of the timestamp. If two files have the same time, the files will be sorted lexicographically
  • LEXICAL_IN_FILENAME: Sort the files on the basis of a substring in the file name. The first capture group of the regex (as specified in FILENAME_PATTERN) will be used to isolate the substring in the filename.
See file-based T-sort for more information.
MODIFIED_FILE_TIME
FILENAME_TIME_FORMAT The format of the timestamp in case the SORT_FIELD option is set to TIME_IN_FILENAME.

NOTE: This option is mandatory when SORT_FIELD is set to TIME_IN_FILENAME.
STARTING_FILE_NAME_AND_OFFSET Reads start from the given file name and offset when the SORT_FIELD option is set to LEXICAL_IN_FILENAME. The default value EARLIEST means all files will be picked. For example:
  • abc.txt:20 will read from line 20 in file abc.txt
  • abc.txt will read from the start of file abc.txt
NOTE: Ensure the starting file exists when STARTING_FILE_NAME_AND_OFFSET option is used.
EARLIEST
STARTING_TIME The starting time in case the SORT_FIELD option is set to TIME_IN_FILENAME - in the format yyyy-MM-dd HH:mm:ss.SSS.
1752-09-14 00:00:00.000
STARTING_POSITION The starting position value for file read in case the option SORT_FIELD is set to MODIFIED_FILE_TIME. The format for STARTING_POSITION is modified_time_epoch[:file_name[:offset]] - file_name and offset are optional.
For example,
  • 1609423921 will pick all files with modified time matching epoch 1609423921 or later
  • 1609423921:abc.txt will ignore modified time and will pick file abc.txt or later
  • 1609423921:abc.txt:10 will ignore modified time and will pick file abc.txt and start from line number 11 (line number starts from 0)
1752-09-14 00:00:00.000
INGRESS_NUM_PARTITIONS Used in scaling. Denotes the number of logical partitions into which you want to divide your input data. A single partition is defined as a logical grouping of input data files based on hash computation. Files having the same partition value belong to the same partition. The partition value for a file is computed as hash(filename) % INGRESS_NUM_PARTITIONS; see Scaling in File-VFS for more information.
INGRESS_ASSIGNED_PARTITIONS Used in scaling. This identifies which partitions are assigned to the current shard. A shard only processes these partitions and skips the data from other partitions. Partitions are 0-indexed and can be given as comma-separated values (0,1,2), ranges (4:7, which is equivalent to 4,5,6), a mix of both (0,1,4:7, which is equivalent to 0,1,4,5,6), or all (:).
SHARD_ID Used in scaling. This is an optional parameter, used only when you are running multiple shards of a pipeline. It identifies the current shard, counting from 0.
INGRESS_DELAY_INTERVAL The period (in milliseconds) for file-based T-sort to wait for late files - applies only if SORT_FIELD is set to MODIFIED_FILE_TIME or TIME_IN_FILENAME. Valid values are:
  • -1 : disable file-based T-sort completely
  • 0 : do not wait - late files will be dropped (the default)
  • >= 1 : wait for this interval in milliseconds
0
INGRESS_FILE_SCAN_WAIT The interval (in milliseconds) at which the reader thread checks the queue for new files to be read. 2000
IGNORE_CORRUPT_FILES Determines whether the VFS reader should ignore a file when a fatal IOException occurs while reading it. If 'true', the corrupt file is skipped; otherwise the pump is terminated and no more data will flow. False
NUMBER_OF_BUFFERS The number of buffers the File-VFS plugin can use to read data before it needs to recycle the buffers. Increasing the number of buffers might increase the plugin’s performance at the cost of memory. 2
INGRESS_RECURSIVE_LOOKUP Enable / disable recursive file lookups within subdirectories of the provided directory.

When the flag is 'true', the plugin checks each directory entry to distinguish files from sub-directories. If you do not need to include files from child sub-directories, and you know that all directory entries matching FILENAME_PATTERN will be files, you can safely set INGRESS_RECURSIVE_LOOKUP to 'false' to disable this check and save the latency of a round trip to the remote file system for each entry in the FILE_LOCATION directory.
true

Depending on the settings of SORT_FIELD and INGRESS_DELAY_INTERVAL, the file-based T-sort feature may come into operation.
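
For example, the following is a minimal sketch (the stream name, column list, directory and time format value are hypothetical) showing how FILENAME_PATTERN, FILENAME_TIME_FORMAT and STARTING_TIME work together when SORT_FIELD is set to TIME_IN_FILENAME, assuming CSV files named like buses_20240101123000_route.csv:

CREATE OR REPLACE FOREIGN STREAM "sample"."buses_fs"
(
    "Col1" INTEGER,
    "Col2" VARCHAR(64)
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "FILE_LOCATION" 'file:///tmp/newdirectory',
    -- the first capture group (\d{14}) isolates the 14-digit timestamp used for sorting
    "FILENAME_PATTERN" 'buses_(\d{14})_[a-z]+.*',
    "SORT_FIELD" 'TIME_IN_FILENAME',
    -- mandatory with TIME_IN_FILENAME; assumed here to be a Java-style time pattern
    "FILENAME_TIME_FORMAT" 'yyyyMMddHHmmss',
    -- only files whose embedded time is at or after this point are read
    "STARTING_TIME" '2024-01-01 00:00:00.000'
);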

File system specific options

Option File System Default Description
"vfs.hdfs.kerberos.enabled" HDFS 'false' To read from HDFS with Kerberos authentication, the value for this option should be 'true'
"vfs.hdfs.kerberos.keytab" HDFS no default value To read from HDFS with Kerberos authentication, the user must provide the location of the keytab here as a string
"vfs.hdfs.kerberos.principal" HDFS no default value To read from HDFS with Kerberos authentication, the user must provide the value of the principal here as a string
"vfs.s3.useHttps" S3 'true' To read from S3 or MinIO over HTTP (rather than HTTPS), the value for this option should be 'false'
AUTH_TYPE_ADLS ADLS Gen2 no default value (Mandatory when using ADLS Gen2). Specify the authentication mechanism. Supported types are 'SharedKey' and 'OAuth'. OAuth 2.0 support is provided via Client Credentials
KEY_FILE ADLS Gen2 no default value (Mandatory when using ADLS Gen2). Provides the path to the configuration file containing credentials for the appropriate authentication mechanism. See the ABFS Key File section below for an example
REFRESH_CRED ADLS Gen2 no default value (Optional). Supports values 'yes' and 'no' (case insensitive). When set to 'yes', any changes made to the KEY_FILE will be used to establish a new connection with ADLS Gen2. If set to 'no' or not provided, changed credentials in the KEY_FILE won't be used to establish the connection.

Provenance Columns

When reading from files, you can define provenance columns. These return metadata for the file from which you are reading.

Option Data type Description
SQLSTREAM_PROV_FILE_SOURCE_FILE VARCHAR Adds the name of the file being read to the output columns
SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER BIGINT Adds the current line number to the output columns. Line number starts from 0.
SQLSTREAM_PROV_FILE_MODIFIED_TIME BIGINT Adds the modified time of the file being read to the output columns.
SQLSTREAM_PROV_INGRES_PARTITION_ID INT Adds the partition from which this row was read, calculated by hashing the incoming filename as described in Scaling Out / Sharding pipeline instances in File-VFS below.

Scaling Out / Sharding pipeline instances in File-VFS

Sharding is the practice of running multiple instances of SQLstream pipelines, each using the same SQL code but configured so that each shard processes a subset of the data.

Term Description
Partition A single partition is defined as a logical grouping of input data files based on hash computation. Files having the same partition value belong to the same partition. The partition for a file is computed as: hash(filename) % total_num_partitions. Here, total_num_partitions is the total number of partitions that you want to divide your data into. See the glossary term partition for more information.
Shard The s-Server process launched for processing input data. See the glossary term shard for more information.

Let's say you divide the input data into P logical partitions. A hash is computed for each input file (on the basis of its name) as it arrives, and the file is mapped to a particular partition by taking the modulo of the hash with the number of partitions. The number of partitions is static; hence the file-to-partition mapping is always fixed.

NOTES:

  • There is no physical partitioning occurring in the input source; the partitioning that we are talking about is entirely logical.
  • It is the developer's responsibility to ensure that each shard is configured correctly (by setting SHARD_ID, INGRESS_NUM_PARTITIONS and INGRESS_ASSIGNED_PARTITIONS). The developer must ensure that across all instances, each partition is assigned to only one instance.
  • Although it is normal to map each shard to a separate s-Server installation, it is also possible to map multiple shards onto a single s-Server by running the same pipeline in multiple different schemas.
  • Each shard reads all the filenames of the source data as files arrive, computes the partition number for each input file (Hash(filename) % P) and only processes the file if this computed partition is one of those that are assigned to this shard. Otherwise the current shard skips the file.

In a sharded pipeline the sharding / partitioning options will be set as follows:

Option Notes
SHARD_ID the shard id of the current shard. No two shards should have the same SHARD_ID.
INGRESS_NUM_PARTITIONS the total number of partitions across the entire data set; all shards must use the same value for this option.
INGRESS_ASSIGNED_PARTITIONS the specific partitions that are assigned to the current shard; any given partition should only be allocated to a single shard.

Normally the values for these three options will be assigned as the pipeline starts by external configuration and passed to the foreign stream using an OPTIONS_QUERY, which overrides any options set in the CREATE FOREIGN STREAM statement.
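
For example, a view like the following could supply the sharding options through OPTIONS_QUERY, following the same VALUES ... AS options(...) pattern used for START_POSITION_QUERY in the example below. This is a minimal sketch; the view name and the option values shown are hypothetical:

-- hypothetical view supplying sharding options for shard 0 of a 4-partition data set;
-- the foreign stream would reference it with
--   "OPTIONS_QUERY" 'SELECT * FROM "sample".SHARDING_OPTIONS_QUERY'
CREATE OR REPLACE VIEW "sample"."SHARDING_OPTIONS_QUERY" AS
SELECT * FROM (
    VALUES ('0', '4', '0:2')
) AS options("SHARD_ID", "INGRESS_NUM_PARTITIONS", "INGRESS_ASSIGNED_PARTITIONS");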

Example SQL pipeline for File-VFS to Kafka

Here is a SQL example of a sharded pipeline; it is assumed that we are working with 4 partitions and there is only a single instance running with SHARD_ID = 0.

NOTES:

  • The SHARD_ID for the Kafka sink is set to be the same as the SHARD_ID in the file source.
  • The watermark from the source is stored into the sink by setting the SQLSTREAM_EGRESS_WATERMARK column.
CREATE OR REPLACE SCHEMA "sample";
ALTER PUMP "sample".* STOP;
ALTER STREAM "sample".* RESET;

-- model of the START_POSITION_QUERY showing the meaning of each parameter
--
-- CREATE OR REPLACE VIEW "sample"."START_POSITION_QUERY" AS
-- SELECT * FROM (
--      VALUES(sys_boot.mgmt.watermark_string('bootstrap-servers', 'topic_name', 'consumerGroupPrefix','assignedPartitions',totalPartitions))
--      ) AS options(STARTING_POSITION);

CREATE OR REPLACE VIEW "sample"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string('my.kafka.example.com:6667', 'demo', '0_',':',4))) AS options(STARTING_POSITION);

CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
(
    "event_time" TIMESTAMP,
    "Col1" INTEGER,
    "Col2" INTEGER,
    "Col3" INTEGER,
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
    "SQLSTREAM_PROV_INGRES_PARTITION_ID" VARCHAR(256)
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
    "PARSER" 'CSV',
    "SEPARATOR" ',',
    "QUOTE_CHARACTER" '"',
    "ROW_SEPARATOR" u&'\000A',
    "CHARACTER_ENCODING" 'UTF-8',
    "SKIP_HEADER" 'false',
    "FILE_LOCATION" 'file:///path/to/demo/',
    "FILENAME_PATTERN" '.*',
    "SORT_FIELD" 'MODIFIED_FILE_TIME',
    "INGRESS_DELAY_INTERVAL" '0',
    "INGRESS_FILE_SCAN_WAIT" '2000',
    "INGRESS_NUM_PARTITIONS" '4',
    "INGRESS_ASSIGNED_PARTITIONS" ':',
    "SHARD_ID" '0',
    "INGRESS_RECURSIVE_LOOKUP" 'false',
    "OPTIONS_QUERY" 'SELECT * FROM "sample".START_POSITION_QUERY'
);

CREATE OR REPLACE VIEW "sample"."v1" AS SELECT STREAM
    "event_time",
    "Col1",
    "Col2",
    "Col3",
    CAST("SQLSTREAM_PROV_INGRES_PARTITION_ID" AS VARCHAR(50)) || ',' ||
        CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(50)) || ':' ||
        "SQLSTREAM_PROV_FILE_SOURCE_FILE" || ':' ||
        CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(50))
            as "SQLSTREAM_EGRESS_WATERMARK"
FROM "sample"."fs" AS "input";

CREATE OR REPLACE VIEW "sample"."v2" AS
SELECT STREAM *
FROM "sample"."v1";

CREATE OR REPLACE FOREIGN STREAM "sample"."out_fs"
(
	"event_time" TIMESTAMP,
	"Col1" INTEGER,
	"Col2" INTEGER,
	"Col3" INTEGER,
	"SQLSTREAM_EGRESS_WATERMARK" VARCHAR(512)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'JSON',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "bootstrap.servers" 'my.kafka.example.com:6667',
    "linger.ms" '10',
    "batch.size" '10000',
    "compression.type" 'gzip',
    "TOPIC" 'demo',
    "transaction.timeout.ms" '60000',
    "transactional.id" 'auto',
    "TRANSACTION_ROWTIME_LIMIT" '60000',
    "transaction.enable.preemptive.commit" 'false',
    "transaction.first.preemptive.commit.check.percentage" '70',
    "SHARD_ID" '0'
);

CREATE OR REPLACE PUMP "sample"."testpump" STOPPED AS
    INSERT INTO "sample"."out_fs"
    SELECT  STREAM "event_time","Col1","Col2","Col3","SQLSTREAM_EGRESS_WATERMARK"
    FROM "sample"."v2" AS "input";

ALTER PUMP "sample"."testpump" START;

Local File System

URI format for Local File System

For reading from the local file system, the URI format is file:// followed by the absolute path of the directory from which the files must be read:

file:///absolute/path/to/directory

For example:

"FILE_LOCATION" 'file:///sample_directory/data' 

Note: When the file:// prefix is used, the leading / of the absolute path makes a total of 3 forward slash characters.

Sample SQL for Local File System

CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
(
    "COL0" INTEGER,
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
    "FILE_LOCATION" 'file:///shubham/data',
    "SEPARATOR" ',',
    "ROW_SEPARATOR" u&'\000A',
    "FILENAME_PATTERN" '.*\.csv',
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "SKIP_HEADER" 'false',
    "SORT_FIELD" 'MODIFIED_FILE_TIME',
    "INGRESS_DELAY_INTERVAL" '0',
    "INGRESS_FILE_SCAN_WAIT" '2000',
    "QUOTE_CHARACTER" '"',
    "INGRESS_NUM_PARTITIONS" '1',
    "INGRESS_ASSIGNED_PARTITIONS" ':',
    "SHARD_ID" '',
    "INGRESS_RECURSIVE_LOOKUP" 'true'
);

ADLS Gen2

ABFS URI Format

ABFS (Azure Blob File System) is a connector used to connect to Azure Data Lake Storage Gen2. It is available as an extension of the Hadoop File system.

For reading files from an ADLS Gen2 account, the URI format is:

abfs://container@storage-account.dfs.core.windows.net/

A sample URI is abfs://sample-file-system@sample-account.dfs.core.windows.net/

  • sample-file-system is the name of the container created in ADLS Gen2.
  • sample-account is the name of the storage account.

ABFS SQL Examples

SharedKey Example

CREATE OR REPLACE SCHEMA "abfs";

CREATE OR REPLACE FOREIGN STREAM "abfs"."abfs_source"
(         
  "Str1" VARCHAR(30),         
  "Str2" VARCHAR(30),         
  "Str3" VARCHAR(30),         
  "int1" INTEGER,         
  "int2" INTEGER,         
  "int3" INTEGER,         
  "int4" INTEGER,         
  "doub1" DOUBLE,         
  "doub2" DOUBLE,         
  "lon1" BIGINT,         
  "lon2" BIGINT
)
SERVER "FILE_VFS_SERVER"
OPTIONS
(
  "CHARACTER_ENCODING" 'UTF-8',
  "FILE_LOCATION" 'abfs://container-name@account-name.dfs.core.windows.net/folder/',
  "AUTH_TYPE_ADLS" 'SharedKey',
  "FILENAME_PATTERN" '.*.txt',
  "SEPARATOR" ',',
  "ROW_SEPARATOR" u&'\000A',
  "PARSER" 'CSV',
  "FILE_TYPE" 'none',
  "SKIP_HEADER" 'true',
  "SORT_FIELD" 'MODIFIED_FILE_TIME',
  "KEY_FILE" '~/config-abfs.yaml',
  "REFRESH_CRED" 'yes',
  "INGRESS_NUM_PARTITIONS" '1',
  "INGRESS_ASSIGNED_PARTITIONS" ':',
  "SHARD_ID" '',
  "INGRESS_RECURSIVE_LOOKUP" 'false'
);

OAuth Example

CREATE OR REPLACE SCHEMA "abfs";

CREATE OR REPLACE FOREIGN STREAM "abfs"."abfs_source"
(         
  "Str1" VARCHAR(30),         
  "Str2" VARCHAR(30),         
  "Str3" VARCHAR(30),         
  "int1" INTEGER,         
  "int2" INTEGER,         
  "int3" INTEGER,         
  "int4" INTEGER,         
  "doub1" DOUBLE,         
  "doub2" DOUBLE,         
  "lon1" BIGINT,         
  "lon2" BIGINT
)
SERVER "FILE_VFS_SERVER"
OPTIONS
(
  "CHARACTER_ENCODING" 'UTF-8',
  "FILE_LOCATION" 'abfs://container-name@account-name.dfs.core.windows.net/folder/',
  "AUTH_TYPE_ADLS" 'OAuth',
  "FILENAME_PATTERN" '.*.txt',
  "SEPARATOR" ',',
  "ROW_SEPARATOR" u&'\000A',
  "PARSER" 'CSV',
  "FILE_TYPE" 'none',
  "SKIP_HEADER" 'true',
  "SORT_FIELD" 'MODIFIED_FILE_TIME',
  "KEY_FILE" '~/config-abfs.yaml',
  "REFRESH_CRED" 'yes',
  "INGRESS_RECURSIVE_LOOKUP" 'false'
);

ABFS Key File

# config-abfs.yaml

SharedKey:
        KEY: <access-key>
OAuth:
        TenantID: <tenant ID>
        ClientID: <client ID>
        ClientSecret: <client secret>

For example:

# config-abfs.yaml

SharedKey:
        KEY: 'XpPQt0TpcjiIyNYaZNIdRMj8A9823r43RKHKPJjfUmk9U/4bop7A6oLLmsGiytbAgRuUrDl3mbXoVZ4cZ4XQQ2g=='
OAuth:
        TenantID: '0a998bfc-809a-47c3-b5a2-5b38321e4690'
        ClientID: '9bg2680f-dacd-7098-88ab-2f07eda87513'
        ClientSecret: 'AbB.ce7Zd~-MZ2_5~_IKv42RtBL8Q71c7a'

HDFS

URI Format for HDFS

For reading files from HDFS, the URI format is:

hdfs://hostname[:port][absolute-path]

For example:

hdfs://sample-mst-02.cloud.in.com:8020/sample-absolute-path/data

Sample SQL for HDFS

-- the stream name and column list shown here are illustrative; PARSER 'CSV' is assumed
CREATE OR REPLACE FOREIGN STREAM "sample"."hdfs_fs"
(
    "COL0" INTEGER
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "FILE_LOCATION" 'hdfs://rafsoak001-mgt-01.cloud.in.guavus.com:2443/shubham/data2',
    "FILENAME_PATTERN" '.*\.csv',
    "SEPARATOR" ',',
    "vfs.hdfs.kerberos.enabled" 'true',
    "vfs.hdfs.kerberos.keytab" '/tmp/hdfs.headless.keytab',
    "vfs.hdfs.kerberos.principal" 'hdfs-rafsoak001@GVS.GGN'
);

S3

Using the S3 plugin, you can read compressed and uncompressed files from Amazon S3 or other in-cloud or on-premise S3 API compatible sources such as MinIO.

URI Format for S3

For reading files over S3, the URI format is:

s3://[Access-Key]:[Secret-Key]@Endpoint-uri-containing-bucket-name-and-region/prefix-for-objects 

For example, if:

  • Access-Key: ABCEDABCEDABCEDABCED
  • Secret-Key: Sd9ADVCfHEIETyXSXU9niFn9/w8fMkN9lcZpUD+y
  • Endpoint URI: test-01.s3.us-east-1.amazonaws.com
  • Prefix (directory): multi

The URI would be:

s3://ABCEDABCEDABCEDABCED:Sd9ADVCfHEIETyXSXU9niFn9%2Fw8fMkN9lcZpUD%2By@test-01.s3.us-east-1.amazonaws.com/multi 

If:

  • Access-Key: ABCEDABCEDABCEDABCED
  • Secret-Key: Sd9ADVCfHEIETyXSXU9niFn9/w8fMkN9lcZpUD+y
  • Endpoint URI: test-01.s3.amazonaws.com
  • Region: us-east-1
  • Prefix (directory): multi

The URI would be:

s3://ABCEDABCEDABCEDABCED:Sd9ADVCfHEIETyXSXU9niFn9%2Fw8fMkN9lcZpUD%2By:us-east-1@test-01.s3.amazonaws.com/multi 

NOTE: The endpoint URI must not include an http:// or https:// prefix, and the components of the URI must not contain any HTML reserved characters. For more information, refer to the section Encoding HTML reserved characters in the URL below.

Sample SQL for S3

CREATE OR REPLACE FOREIGN STREAM "sample"."fs1"
(
    "start-time" VARCHAR(32),
    "end-time" VARCHAR(32),
    "duration" DECIMAL,
    "src-addr" VARCHAR(64),
    "dst-addr" VARCHAR(64),
    "protocol" VARCHAR(32),
    "src-port" INTEGER,
    "dst-port" INTEGER,
    "tcp-flags" VARCHAR(6),
    "packets" BIGINT,
    "bytes" BIGINT,
    "sensor" VARCHAR(64),
    "in-if" INTEGER,
    "out-if" INTEGER,
    "log-time" VARCHAR(32),
    "src-asn" BIGINT,
    "dst-asn" BIGINT,
    "flow-src" VARCHAR(32),
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(128),
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT
)
    SERVER "FILE_VFS_SERVER"
OPTIONS (
    "CHARACTER_ENCODING" 'UTF-8',
    "FILE_LOCATION" 's3://ABCEDABCEDABCEDABCED:Sd9ADVCfHEIETyXSXU9niFn9%2Fw8fMkN9lcZpUD%2By@test-01.s3.us-east-1.amazonaws.com/multi',
    "FILENAME_PATTERN" '.*\.csv',
    "SEPARATOR" ',',
    "ROW_SEPARATOR" u&'\000A',
    "PARSER" 'CSV',
    "FILE_TYPE" 'none',
    "SKIP_HEADER" 'false',
    "SORT_FIELD" 'MODIFIED_FILE_TIME',
    "STARTING_POSITION" '1600682400:sample_2020-12-25-14-00.gz:300000',
    "INGRESS_DELAY_INTERVAL" '60000',
    "vfs.s3.useHttps" 'true'
);

SFTP

URI Format for SFTP

For reading files from a remote file system over SFTP, the URI format is:

sftp://username[:password]@hostname[:port]relative-path 
  1. Password is optional - reading files over SFTP requires either a password in the URI or the .ssh/id_rsa file in the SQLstream user’s home directory (such as /home/sqlstream/) for authentication. In the second case the RSA private key is being used for authentication. Ensure that the user sqlstream has read permissions on .ssh/id_rsa.
  2. Port is optional; if port is not provided, port 22 will be used.
  3. The components of the URI must not contain HTML reserved characters. If they do, they should be percent-encoded as described in the section Encoding HTML reserved characters in the URL below.
  4. The directory’s path (on the remote machine) is provided as the relative-path in the URL (relative to the remote user's home directory). To read from /root/inputsource with the username root, the relative path becomes /inputsource since /root is the root user’s home directory.

For example, if:

    user = root,
    password = root@123,
    hostname = sample-mst-01.cloud.in.guavus.com
    path =  /root/inputsource

the URI becomes:

    sftp://root:root%40123@sample-mst-01.cloud.in.guavus.com/inputsource

Encoding HTML reserved characters in the URL

The components of the URI must not contain any HTML reserved characters. In case they do, they should be percent-encoded as shown in the table below. Hence if your password is root@1, it should be encoded as root%401.

Reserved character Encoded value
Line Feed %0A
Carriage Return %0D
space %20
# %23
$ %24
% %25
& %26
/ %2F
: %3A
; %3B
< %3C
> %3E
? %3F
@ %40
[ %5B
\ %5C
^ %5E
] %5D
` %60
{ %7B
| %7C
} %7D
~ %7E

Sample SQL For SFTP

CREATE OR REPLACE FOREIGN STREAM "sample"."fs"
(
    "COL0" INTEGER,
    "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
    "SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
)
SERVER "FILE_VFS_SERVER"
OPTIONS (
    "FILE_LOCATION" 'sftp://guavus:guavus%40123@di001-mst-01.cloud.in.guavus.com/shubham/data',
    "SEPARATOR" ',',
    "ROW_SEPARATOR" u&'\000A',
    "FILENAME_PATTERN" '.*\.csv',
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "SKIP_HEADER" 'false',
    "SORT_FIELD" 'MODIFIED_FILE_TIME',
    "INGRESS_DELAY_INTERVAL" '0',
    "INGRESS_FILE_SCAN_WAIT" '2000',
    "QUOTE_CHARACTER" '"'
);
