s-Server's Avro formatter writes data from s-Server rows into Apache Avro data files based on options supplied through the options clause of a foreign table or foreign stream. To format data as Avro, you need to set FORMATTER to AVRO and provide a location for the Avro schema file. You can do so in either foreign stream options or agent options. (When Avro data is written, its schema is always written with it.)
For more information on Avro schemas, see https://avro.apache.org/docs/current/#schemas.
Limitations of Avro formatter:
When using SQL, the Avro formatter converts rows to Avro that you define in a foreign table or foreign stream, outputting these rows to a data writer that you have defined for the foreign table or stream. When you INSERT into the foreign table or stream, s-Server begins writing Avro to the data writer that you specify.
When using the ECD agent, the Avro converts rows that you define in a properties file to Avro, outputting these rows to a data writer that you have defined for the foreign table or stream.
For a list of writers, and details on how to use them, see the Writing to Other Destinations.
For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.
To write Avro data, you create a foreign table or stream that references one of s-Server's prebuilt server objects. Like all tables and streams, foreign tables and streams must be created within a schema. The example below creates and sets a schema called avro_data and creates a foreign stream called avro_sink that writes data to the file system.
To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.
CREATE OR REPLACE SCHEMA avro_data; SET SCHEMA 'avro_data'; CREATE OR REPLACE FOREIGN STREAM avro_sink ( KAFKA_KEY VARBINARY(128), KAFKA_PARTITION INTEGER, "boolean_value" BOOLEAN, "char_value" CHAR(15), "date_value" DATE, "decimal_value" DECIMAL(8,2), "double_value" DOUBLE, "real_value" REAL, "time_value" TIME, "timestamp_value" TIMESTAMP, "tinyint_value" TINYINT, "varchar_value" VARCHAR(15) ) SERVER Kafka10Server OPTIONS ( FORMATTER 'AVRO', DATE_FORMAT 'yyyy-MM-dd HH:mm:ss.SSS', AVRO_SCHEMA_LOCATION '/home/my-location/schema_value_types.avsc', "metadata.brokers.list" 'localhost:9092', "TOPIC" 'test' );
To actually write data, you need to write a pump containing an INSERT statement along the following lines. For more information on pumps, see the topic CREATE PUMP in the Guavus s-Server Streaming SQL Reference Manual.
CREATE OR REPLACE PUMP writer_pump STOPPED AS INSERT INTO avro_sink ( "id", "reported_at", "shift_no", "trip_no", "route_variant_id", "waypoint_id", "last_known_location_state" ) SELECT STREAM "id", "reported_at", "shift_no", "trip_no", "route_variant_id", "waypoint_id", "last_known_location_state" from "buses_stream"; --this assumes that a stream called "buses_stream" exists in the same schema
|FORMATTER||This needs to be AVRO.|
|AVRO_SCHEMA_LOCATION||Required option to specify the path for the schema file to be used for formatting the Avro payload.|
|FORMATTER_INCLUDE_ROWTIME||Whether or not to include rowtime when writing data. Defaults to 'true'.|
You can use the ECD agent to Avro Data to remote locations. See Writing Data to Remote Locations for more details.
The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.
The properties below write Avro to the file system.
# Column types for the source stream ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7) FORMATTER=Avro CHARACTER_ENCODING=UTF-8 DIRECTORY=/home/guavus/output ORIGINAL_FILENAME=stocks-output.avro FILENAME_PREFIX=output- FILENAME_SUFFIX=.avro FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss FILE_ROTATION_SIZE=20K FORMATTER_INCLUDE_ROWTIME=true
To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line
$ ./commondataagent.sh --output --props sample.properties --io file