Formatting Data as Avro

s-Server's Avro formatter writes data from s-Server rows into Apache Avro data files based on options supplied through the options clause of a foreign table or foreign stream. To format data as Avro, you need to set FORMATTER to AVRO and provide a location for the Avro schema file. You can do so in either foreign stream options or agent options. (When Avro data is written, its schema is always written with it.)

For more information on Avro schemas, see https://avro.apache.org/docs/current/#schemas.

Limitations of Avro formatter:

  • The AVRO formatter does not support nested records or arrays in schemas. That is, the Avro schema needs to be flat.
  • If the schema contains nested records/arrays, the results are unspecified.

When using SQL, the Avro formatter converts rows to Avro that you define in a foreign table or foreign stream, outputting these rows to a data writer that you have defined for the foreign table or stream. When you INSERT into the foreign table or stream, s-Server begins writing Avro to the data writer that you specify.

When using the ECD agent, the Avro converts rows that you define in a properties file to Avro, outputting these rows to a data writer that you have defined for the foreign table or stream.

For a list of writers, and details on how to use them, see the Writing to Other Destinations.

For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.

Using SQL to Write Avro

To write Avro data, you create a foreign table or stream that references one of s-Server's prebuilt server objects. Like all tables and streams, foreign tables and streams must be created within a schema. The example below creates and sets a schema called avro_data and creates a foreign stream called avro_sink that writes data to the file system.

To write data over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP or AMQP, you would need to specify options for these formats. See Writing to Other Destinations for more details.

CREATE OR REPLACE SCHEMA avro_data;
SET SCHEMA 'avro_data';

   CREATE OR REPLACE FOREIGN STREAM avro_sink
   (
     KAFKA_KEY VARBINARY(128),
     KAFKA_PARTITION INTEGER,
     "boolean_value" BOOLEAN,
     "char_value" CHAR(15),
     "date_value" DATE,
     "decimal_value" DECIMAL(8,2),
     "double_value" DOUBLE,
     "real_value" REAL,
     "time_value" TIME,
     "timestamp_value" TIMESTAMP,
     "tinyint_value" TINYINT,
     "varchar_value" VARCHAR(15)
   )
   SERVER Kafka10Server
   OPTIONS
   (
       FORMATTER 'AVRO',
       DATE_FORMAT 'yyyy-MM-dd HH:mm:ss.SSS',
       AVRO_SCHEMA_LOCATION '/home/my-location/schema_value_types.avsc',
       "metadata.brokers.list" 'localhost:9092',
       "TOPIC" 'test'
   );

To actually write data, you need to write a pump containing an INSERT statement along the following lines. For more information on pumps, see the topic CREATE PUMP in the Guavus s-Server Streaming SQL Reference Manual.

CREATE OR REPLACE PUMP writer_pump STOPPED AS
INSERT INTO avro_sink
(
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
)
SELECT STREAM
"id",
"reported_at",
"shift_no",
"trip_no",
"route_variant_id",
"waypoint_id",
"last_known_location_state"
from "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema

Foreign Stream Options for Formatting Avro Data

Option Definition
FORMATTER This needs to be AVRO.
AVRO_SCHEMA_LOCATION Required option to specify the path for the schema file to be used for formatting the Avro payload.
FORMATTER_INCLUDE_ROWTIME Whether or not to include rowtime when writing data. Defaults to 'true'.
STARTING_OUTPUT_ROWTIME It is specified using ISO 8601. If this is set , then all rows with ROWTIME less than the specified value are silently dropped.

Using the ECD Agent to Write Avro

You can use the ECD agent to Avro Data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

The properties below write Avro to the file system.

# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=Avro
CHARACTER_ENCODING=UTF-8
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.avro
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.avro
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line

$ ./commondataagent.sh --output --props sample.properties --io file