You can write data out of s-Server to a file in a Snowflake warehouse. s-Server formats files for Snowflake according to the same principles as it does for writing to the file system. See Writing to the File System for more details. You can specify how files rotate according to time, size, or both, and you can specify how rotated files are named. Currently, the Snowflake ECDA adapter works with CSV only.
The minimum credentials required to write to a Snowflake warehouse are the warehouse name, user name/password, account, database, schema, and stream/table.
To write from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. See Writing to Snowflake Using SQL below. To write from remote locations, you configure such options using a properties file and launch the agent at the command line. See Writing to Snowflake Using the ECD Agent below. Many of the options for the ECD adapter and agent are common to all I/O systems. The CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide has a complete list of options for the ECD adapter.
Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table, or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
To write to Snowflake, you need to create a foreign stream in SQL that references a prebuilt server object called SNOWFLAKE_SERVER. In the foreign stream's options, you configure how s-Server connects to Snowflake. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.
You will also need to specify a formatter for the foreign stream. Specifying "formatter" as a foreign stream option tells s-Server that this foreign stream writes data. See Output Formats for Writing in this guide for more details.
Streams, like most SQL objects (but unlike data wrappers and servers), must be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named SnowflakeWriterSchema.SnowflakeWriterStream.
```sql
CREATE OR REPLACE SCHEMA SnowflakeWriterSchema;
SET SCHEMA 'SnowflakeWriterSchema';

CREATE FOREIGN STREAM SnowflakeWriterSchema.SnowflakeWriterStream (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "active" BOOLEAN)
SERVER SNOWFLAKE_SERVER
OPTIONS (
    -- credentials for the Snowflake table
    "USER" 'myname',
    "PASSWORD" 'password',
    "ACCOUNT" 'sqlstream',
    "WAREHOUSE" 'DEMO_WH',
    "DB" 'TEST_DB',
    "SCHEMA" 'PUBLIC',
    "DTABLE" 'demo',
    -- formatting for CSV data
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false',
    "DIRECTORY" '/tmp',
    "ORIGINAL_FILENAME" 'bus-output.csv',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
    "FILE_ROTATION_SIZE" '20K',
    "FORMATTER_INCLUDE_ROWTIME" 'true');
```
To begin writing data to Snowflake, you INSERT into SnowflakeWriterSchema.SnowflakeWriterStream. When SnowflakeWriterSchema.SnowflakeWriterStream receives rows, s-Server writes data to the Snowflake warehouse you have configured in the foreign stream options.
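For example, assuming the foreign stream defined above, you could insert a single hand-written row for testing. This is a sketch only: the values below are invented, and in practice rows usually arrive continuously via a pump rather than through literal VALUES.

```sql
-- Illustrative only: insert one hand-written test row into the foreign stream.
-- Column names match the foreign stream definition above; the values are invented.
INSERT INTO SnowflakeWriterSchema.SnowflakeWriterStream
    ("id", "shift_no", "reported_at", "trip_no",
     "latitude", "longitude", "speed", "bearing", "active")
VALUES
    (1, 10.0, TIMESTAMP '2020-01-01 10:00:00', 'TRIP-1',
     51.5074, -0.1278, 30, 90, true);
```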
In most cases, you will want to set up a pump that writes data to SnowflakeWriterSchema.SnowflakeWriterStream. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
You do so with code along the following lines:
```sql
CREATE OR REPLACE SCHEMA Pumps;
SET SCHEMA 'Pumps';

CREATE OR REPLACE PUMP writerPump STOPPED AS
-- We recommend creating pumps as stopped,
-- then using ALTER PUMP "Pumps"."writerPump" START to start the pump.
INSERT INTO SnowflakeWriterSchema.SnowflakeWriterStream
SELECT STREAM * FROM "MyStream";
-- where "MyStream" is a currently existing stream
```
To start writing data, use the following code:
```sql
ALTER PUMP Pumps.writerPump START;
```
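To pause writing later, you can stop the pump with ALTER PUMP and restart it when needed (a sketch, using the pump created above):

```sql
-- Stop the pump: no new rows are passed to the foreign stream
-- until the pump is started again with ALTER PUMP ... START.
ALTER PUMP Pumps.writerPump STOP;
```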
| Option | Description |
| --- | --- |
| ACCOUNT | The name assigned to your account by Snowflake. |
| USER | The user name for your Snowflake account. |
| PASSWORD | The password for your Snowflake account. |
| DB | The database to write to in Snowflake. This should be an existing database for which user/password has privileges. |
| SCHEMA | The schema to write to in the Snowflake database. This should be an existing schema for which user/password has privileges. |
| WAREHOUSE | The Snowflake warehouse to write to. This should be an existing warehouse for which user/password has privileges. |
| DTABLE | The table to write to in Snowflake. This should be an existing table for which user/password has privileges. |
| OPTIONS_QUERY | Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in `select lastOffset as STARTING_OFFSET from TEST.committedOffset`. For more details, see the topic Using Options Query in Integrating Kafka at https://docs.sqlstream.com/integrating-sqlstream/kafka/. |
s-Server rotates files according to options set for FILE_ROTATION_TIME, FILE_ROTATION_SIZE, FILE_ROTATION_ROWCOUNT, and FILE_ROTATION_RESPECT_ROWTIME.
File rotation options work together in the following ways:
Rotate the file if (the FILE_ROTATION_TIME condition is satisfied OR the FILE_ROTATION_SIZE condition is satisfied OR the FILE_ROTATION_ROWCOUNT condition is satisfied) AND (the FILE_ROTATION_RESPECT_ROWTIME condition is satisfied).
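As a sketch, all of these rotation options are set in the foreign stream's OPTIONS clause. The fragment below uses illustrative values only: it would rotate a file after one hour has elapsed or the file exceeds 20 kilobytes, whichever comes first, while still keeping rows that share a ROWTIME in the same file.

```sql
-- Fragment of a foreign stream's OPTIONS clause (illustrative values):
-- rotate when an hour has elapsed OR the file exceeds 20 kilobytes,
-- but never split rows that share a ROWTIME across two files.
"FILE_ROTATION_TIME" '1h',
"FILE_ROTATION_SIZE" '20K',
"FILE_ROTATION_RESPECT_ROWTIME" 'true',
```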
You can use FILE_ROTATION_TIME to rotate files based on time elapsed since the Unix epoch (1970-01-01). You set FILE_ROTATION_TIME as a positive integer with a unit of milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d): for example, 15 minutes, 1 hour, or 1 day. A file rotates once a row arrives with a ROWTIME that passes the specified interval.
You can use FILE_ROTATION_SIZE to rotate files based on their size. You express the value as a positive integer followed by a unit: kilobytes (k), megabytes (m), or gigabytes (g). Defaults to 0, which means "don't use file size to trigger file rotation."
Note: You cannot use FILE_ROTATION_SIZE for writing to Hive tables.
When using FILE_ROTATION_SIZE, you can specify that a file waits to rotate until all rows with the same ROWTIME have arrived. To do so, you set FILE_ROTATION_RESPECT_ROWTIME to true (this is the default behavior). For example, suppose you have set FILE_ROTATION_SIZE to 1m (1 megabyte) and a new row arrives that pushes the file over the 1-megabyte threshold. With FILE_ROTATION_RESPECT_ROWTIME set to true, s-Server waits until a row arrives with a different ROWTIME before rotating, even if accepting the remaining rows with the same ROWTIME causes the file to grow larger than 1 megabyte.
(New in s-Server version 6.0.1.) You can use FILE_ROTATION_ROWCOUNT to rotate files based on the number of rows contained in the file. Expressed as a positive integer. Defaults to 0, which means "don't use row count to determine when to rotate to the next file." If the value is greater than 0, s-Server rotates to the next file (or directory tree, in the case of writing to Hive tables) after FILE_ROTATION_ROWCOUNT rows have been written. If FILE_ROTATION_RESPECT_ROWTIME is true, that rotation is delayed until ROWTIME advances.
As with FILE_ROTATION_SIZE, you can specify that files wait to rotate until all rows with the same ROWTIME have arrived, by setting FILE_ROTATION_RESPECT_ROWTIME to true (this is the default behavior). For example, if you have set FILE_ROTATION_ROWCOUNT to 1,000,000 and a new row arrives that pushes the file over the 1,000,000-row threshold, s-Server waits until all rows with the same ROWTIME have arrived before rotating.
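The row-count example above can be sketched as an OPTIONS fragment (the threshold value is illustrative):

```sql
-- Fragment of a foreign stream's OPTIONS clause (illustrative value):
-- rotate after one million rows, but wait for ROWTIME to advance
-- so that rows sharing a ROWTIME stay in the same file.
"FILE_ROTATION_ROWCOUNT" '1000000',
"FILE_ROTATION_RESPECT_ROWTIME" 'true',
```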
You can specify options for how files are named as they rotate. In setting options for rotated files' names, you can specify a prefix, suffix, and date format for the file name. You can also specify a watermark. Watermarks are drawn from a column in the source from which the file is written.
At minimum, you must specify either FILENAME_PREFIX or FILENAME_SUFFIX. All rotated files include a timestamp.
If you set ORIGINAL_FILENAME, then the value of ORIGINAL_FILENAME will serve as a temporary file name to use while data is being actively written.
When this file is rotated out, rotated files are named as follows:
If you do not set ORIGINAL_FILENAME, then the file being actively written to is named in the following form:
Note: if you are writing from a table and do not specify ORIGINAL_FILENAME, &lt;date&gt; will be the system time at which the file began writing.
When this file is rotated out, rotated files are named as follows:
For example, the following options:
```sql
filename_prefix 'test-',
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
-- note that you need to specify a period for the suffix if desired
filename_suffix '.csv',
```
produce rotating file names like this:
And the following options:
```sql
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
-- note that you need to specify a period for the suffix if desired
filename_suffix '.csv',
```
produce rotating file names like this:
You can use the ECD agent to write files to Snowflake from remote locations. See Writing Data to Remote Locations for more details. The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.
Note: Before using the ECD agent, you need to create a source stream for it. In the example below, you would need to create the foreign stream "SnowflakeWriterStream."
Sample Property File for ECD Agent
```properties
SCHEMA_NAME="SnowflakeWriterSchema"
TABLE_NAME="SnowflakeWriterStream"
USER=myname
PASSWORD=password
ACCOUNT=sqlstream
WAREHOUSE=DEMO_WH
DB=TEST_DB
SCHEMA=PUBLIC
DTABLE=demo
# formatting for CSV data
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
QUOTE_CHARACTER=
SEPARATOR=,
WRITE_HEADER=false
DIRECTORY=/tmp
ORIGINAL_FILENAME=bus-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true
```
To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line:
```shell
$ ./commondataagent.sh --output --props sample.test.properties --io snowflake
```