Parsing ProtoBuf

You can use s-Server's Extensible Common Data (ECD) framework to parse protocol buffer (protobuf) messages, using the Java classes that the Google protocol buffer compiler generates from .proto files. Using the ECD framework, you create a foreign stream, and the framework converts protobuf messages into columns, which can be specified explicitly. You only need to define columns for the message fields that you want to parse.

Before using the ECD parser for protobuf, you first need to write .proto files and then compile them into Java classes using the protocol buffer compiler (protoc) that Google provides. See https://developers.google.com/protocol-buffers/ for more details.

The Google protocol buffer compiler lets you define fields as optional, repeated, or required. In parsing these message types, the ECD framework allows a single repeated inner message field. If you specify a message as repeated, then one row will be output per repetition, and all non-repeated fields will be duplicated across those rows.
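To illustrate, the following sketch maps a column through a hypothetical repeated inner message. The login message, its logins field, and the dotted path through the repeated field are illustrative assumptions, and the stream reuses the KafkaServer object and schema JAR from the example later in this topic:

--Hypothetical addition to the .proto file shown below:
--  message login { optional string sourceIP = 1; }
--  repeated login logins = 7;
CREATE OR REPLACE FOREIGN STREAM RepeatedLoginTest
("accountNumber" VARCHAR(32),
"loginIP" VARCHAR(32)
)
SERVER KafkaServer
OPTIONS
(topic 'test',
"SEED_BROKERS" 'localhost',
"PORT" '9092',
parser 'PROTOBUF',
accountNumber_PATH 'mytable.accountNumber',
loginIP_PATH 'mytable.logins.sourceIP', --path into the repeated message (assumed syntax)
SCHEMA_JAR 'unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar',
SCHEMA_CLASS 'protobuf.PackageProto.protobuf.PackageProto$protobufPackage'
);

A message carrying three logins repetitions would produce three rows, with "accountNumber" duplicated across all three.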

You also need to set up a server object for the data source. This can be a file, a Kafka topic, an AMQP message bus, a network socket, or an MQSeries topic/queue.
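For example, the following sketch creates a server object for reading files from the local file system. The FILE server type follows the same pattern as the Kafka server created later in this topic; see the topic for each input/output system for its exact server options:

CREATE OR REPLACE SERVER "FileReaderServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;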

Note: You can also input data in larger chunks and parse it later using the Parser UDX. This UDX calls s-Server's parsers within a function. For more information on using functions, see the topic Transforming Data in s-Server.

The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide.

Proto File Used in Example Below

Our example for s-Server uses a simple .proto file, which appears below.

package protobuf_data_definition;

option optimize_for = SPEED;
option java_package = "com.protobuf.types";
option java_outer_classname = "protobuf.PackageProto";

message protobufPackage {
  // ID 1 is reserved in case we ever want to add a self-describing attribute
  // timestamp in millis
  optional uint64 timestamp = 2;
  // accountNumber
  optional string accountNumber = 3;
  // sourceIP
  optional string sourceIP = 4;
  // loginSuccessful
  optional bool loginSuccessful = 5;
  // user customerId
  optional string customerId = 6;
}

Sample Foreign Stream for ProtoBuf

In order to access data from an external source using the Extensible Common Data Adapter, you need to create a special kind of stream called a foreign stream. Once you create this stream, you can query it in s-Server as you would any other table or stream. Options in the stream definition specify settings specific to the data source and the format being parsed.

Note: Use the ECD adapter when you are accessing a data source on the same machine. For data sources in a remote location, use the ECD agent. See the topic Extensible Common Data Agent Overview for more details.

You indicate column names when you set up the stream, as in the following example, which creates a stream with the column names ts, accountNumber, sourceIP, loginSuccessful, and customerId. These five columns will be populated with values from the parsed protobuf messages. In the foreign stream below, each column is explicitly assigned data from a path within the schema using the <column name>_PATH option.

Note: All specified stream columns must map onto protocol buffer message fields. (Some columns may instead be specified and populated by the data source, such as OFFSET for a Kafka source or FILE_NAME for a file source.)

Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called ProtoBufSchema, creates a server object using a Kafka topic as a data source, and creates a foreign stream called ProtoBufTest, which specifies a server called KafkaServer, as well as options specific to both the ProtoBuf parser and the Kafka server. Note that the options topic and STARTING_TIME are both specific to Kafka. To parse ProtoBuf over other input/output systems, such as Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would need to specify the options specific to those systems.

CREATE OR REPLACE SCHEMA "ProtoBufSchema";
SET SCHEMA 'ProtoBufSchema';

CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM ProtoBufTest
("ts" TIMESTAMP,
"accountNumber" INTEGER,
"sourceIP" VARCHAR(32),
"loginSuccessful" BOOLEAN,
"customerId" INTEGER --Columns for the new stream
)
SERVER KafkaServer
OPTIONS
(topic 'test', --Kafka topic
"STARTING_TIME" 'latest', --Time to start reading Kafka topic.
--Options are LATEST, EARLIEST,
--or a long int representing a timestamp.
--Defaults to LATEST.
"SEED_BROKERS" 'localhost',
"PORT" '9092', --Host and port information for Kafka server
parser 'PROTOBUF', --Tells ECD adapter to parse files as ProtoBuf
ts_PATH 'mytable.timestamp', --Schema path that maps to the
--foreign stream column "ts"
--Note how this corresponds to .proto file above.
--The same pattern holds for the next four options.
accountNumber_PATH 'mytable.accountNumber',
sourceIP_PATH 'mytable.sourceIP',
loginSuccessful_PATH 'mytable.loginSuccessful',
customerId_PATH 'mytable.customerId',
SCHEMA_JAR 'unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar',
--JAR generated with Google Protocol Buffer compiler.
SCHEMA_CLASS 'protobuf.PackageProto.protobuf.PackageProto$protobufPackage'
--Outer package generated with Google Protocol Buffer
--compiler. $ separates inner from outer package.
--Note how these correspond to .proto file above.
);
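
Once the foreign stream exists, you can query it as you would any other stream. A minimal example (SELECT STREAM is the standard way to read from a stream in s-Server):

SELECT STREAM * FROM ProtoBufTest;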

Foreign Stream Options for Parsing ProtoBuf

Option Definition

PARSER 'PROTOBUF' Required. Indicates that the ECD parser will parse files as protobuf.

SCHEMA_JAR Required. JAR containing the compiled Java classes created with the Google protocol buffer compiler (protoc command), such as unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar. Locations are relative to $SQLSTREAM_HOME. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details. Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/5.0.XXX/s-Server.

SCHEMA_CLASS Required. Class name of the outer protobuf record created with the Google protocol buffer compiler (protoc command), such as protobuf.PackageProto.protobuf.PackageProto$protobufPackage. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details.

<column name>_PATH Optional. Lets you specify a path within the schema that maps to a column in the foreign stream. If no path is specified, s-Server populates the column using a field with the same name in the outer-level record.

MESSAGE_STREAM True or false (defaults to false). Indicates whether or not to treat messages as a continuous stream. If MESSAGE_STREAM is true, protobuf messages are sent to s-Server as they are received. This can make a difference for sources, such as files or sockets, that don't necessarily deliver data in complete chunks. Note: If MESSAGE_STREAM is true, then all outer fields used must have an index less than any repeated field used.

MESSAGE_LENGTH_PREFIXED True or false (defaults to false). Indicates whether all records are prefixed with a length field. Must be specified if MESSAGE_STREAM is set.
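
For sources that deliver protobuf messages as a continuous stream of bytes, you might combine these last two options as in the following sketch, a variation on the Kafka example above (whether these options apply depends on how your source frames its data):

CREATE OR REPLACE FOREIGN STREAM ProtoBufMessageStreamTest
("ts" TIMESTAMP,
"customerId" INTEGER
)
SERVER KafkaServer
OPTIONS
(topic 'test',
"SEED_BROKERS" 'localhost',
"PORT" '9092',
parser 'PROTOBUF',
ts_PATH 'mytable.timestamp',
customerId_PATH 'mytable.customerId',
SCHEMA_JAR 'unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar',
SCHEMA_CLASS 'protobuf.PackageProto.protobuf.PackageProto$protobufPackage',
MESSAGE_STREAM 'true', --parse messages as they arrive
MESSAGE_LENGTH_PREFIXED 'true' --each record carries a length prefix
);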