Parsing ProtoBuf

You can use s-Server's Extensible Common Data framework to parse data serialized as Google protocol buffers. ProtoBuf is a definition language and storage format for hierarchical data structures called "messages". See https://developers.google.com/protocol-buffers/ for more details.

In an s-Server (relational data) context, these structures can represent single rows or row sets. For ProtoBuf, row parsing is determined by information about the message's schema that you provide to s-Server (using options called SCHEMA_JAR and SCHEMA_CLASS).

Using the ECD framework, you create a foreign stream; the framework then converts fields in each ProtoBuf message into the stream's columns, which you can map explicitly.

Note: You can also input data in larger chunks and parse it later using the Parser UDX. This UDX calls the parsers listed above in a function. For more information on using functions, see the topic Transforming Data in s-Server.

The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide. Parsing errors are also logged to the Global Error Stream.

Note on Messages Defined as Repeated

The Google protocol buffer compiler lets you define message fields as optional, repeated, or required. In parsing these messages, the ECD framework allows a single repeated inner message field. If you specify a message field as repeated, then one row is output per repetition, and all non-repeated fields are duplicated across those rows, as in the sketch below.
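
For reference, the stream examples in this topic assume a .proto schema along the following lines. This is a hypothetical reconstruction that matches the paths and SCHEMA_CLASS used in the SQL below, not a file shipped with s-Server; note that the inner message field mytable is repeated, so each repetition yields one row:

syntax = "proto2";
package protobuf;

option java_outer_classname = "PackageProto";

// Hypothetical schema sketch. Paths such as "mytable.timestamp" in the
// SQL example refer to fields of the inner message; because "mytable"
// is repeated, each repetition becomes one output row.
message protobufPackage {
  message MyTable {
    optional int64 timestamp = 1;
    optional int32 accountNumber = 2;
    optional string sourceIP = 3;
    optional bool loginSuccessful = 4;
    optional int32 customerId = 5;
  }
  repeated MyTable mytable = 1;
}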

Parsing ProtoBuf with SQL

In order to access data from an external source using the Extensible Common Data Adapter, you need to create a foreign stream. Once you create this stream, you can query it in s-Server as you would any other table or stream. Options in the stream's definition identify the data format and supply format-specific settings.

Note: Use the ECD adapter when you are accessing a data source on the same machine as s-Server. For data sources in a remote location, use the ECD agent; with the agent, you configure the ProtoBuf parser using a properties file, as sketched below. See the topic Using the Extensible Common Data Framework for more details.
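
With the agent, the parser options appear as key=value pairs in that file. The following is a hypothetical sketch whose keys simply mirror the foreign stream options shown later in this topic; consult Using the Extensible Common Data Framework for the agent's full property list:

# Hypothetical agent properties for the ProtoBuf parser; keys mirror
# the foreign stream options used in the SQL example below.
PARSER=PROTOBUF
SCHEMA_JAR=unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar
SCHEMA_CLASS=protobuf.PackageProto$protobufPackage
ts_PATH=mytable.timestamp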

You indicate column names when you set up the stream. The following example creates a stream with the column names ts, accountNumber, sourceIP, loginSuccessful, and customerId. These five columns will be populated with data from the ProtoBuf message. In the foreign stream below, these columns are explicitly assigned data from a path within the schema using the <column name>_PATH option.

Like all streams, foreign streams must be created within a schema. The example below creates and sets a schema called ProtoBufSchema, then creates a foreign stream called ProtoBufTest that reads a Kafka topic via the built-in server KAFKA10_SERVER, specifying options specific to both the ProtoBuf parser and the Kafka server. Note that options such as TOPIC and STARTING_TIME are specific to Kafka. To parse ProtoBuf over other input/output systems, such as Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would instead specify the options appropriate to those systems.

CREATE OR REPLACE SCHEMA "ProtoBufSchema";
SET SCHEMA '"ProtoBufSchema"';

CREATE OR REPLACE FOREIGN STREAM ProtoBufTest
("ts" TIMESTAMP,
"accountNumber" INTEGER,
"sourceIP" VARCHAR(32),
"loginSuccessful" BOOLEAN,
"customerId" INTEGER) --Columns for the new stream
SERVER KAFKA10_SERVER
OPTIONS
(topic 'test', --Kafka topic
"STARTING_TIME" 'latest', --Time to start reading Kafka topic.
--Options are LATEST, EARLIEST,
--or a long int representing a timestamp.
--Defaults to LATEST.
"SEED_BROKERS" 'localhost',
"PORT" '9092', --Host and port information for Kafka server
parser 'PROTOBUF', --Tells ECD adapter to parse files as ProtoBuf
ts_PATH 'mytable.timestamp', --Schema path that maps to the
--foreign stream column "ts"
--Note how this corresponds to .proto file above.
--The same pattern holds for the next four options.
accountNumber_PATH 'mytable.accountNumber',
sourceIP_PATH 'mytable.sourceIP',
loginSuccessful_PATH 'mytable.loginSuccessful',
customerId_PATH 'mytable.customerId',
SCHEMA_JAR 'unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar',
--JAR generated with Google Protocol Buffer compiler.
SCHEMA_CLASS 'protobuf.PackageProto$protobufPackage'
--Outer class generated with the Google Protocol Buffer
--compiler. $ separates the inner message class from the outer class.
);
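
Once the stream exists, parsed rows can be read with an ordinary streaming query. For example:

SELECT STREAM * FROM "ProtoBufSchema".ProtoBufTest;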

Foreign Stream Options for Parsing ProtoBuf

The following options apply to the ProtoBuf parser:

PARSER 'PROTOBUF'
    Required. Indicates that the ECD parser will parse files as ProtoBuf.

SCHEMA_JAR
    Required. JAR containing compiled Java classes created with the Google protocol buffer compiler (the protoc command), such as unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar. Locations are relative to $SQLSTREAM_HOME. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details.
    Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/<version>/s-Server.

SCHEMA_CLASS
    Required. Class name of the outer ProtoBuf record created with the Google protocol buffer compiler (the protoc command), such as protobuf.PackageProto$protobufPackage. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details.

<column name>_PATH
    Not required. Lets you specify a path within the schema that maps to a column in the foreign stream. If no path is specified, s-Server populates the column using a field with the same name in the outer-level record.

MESSAGE_STREAM
    True or false (defaults to false). Tells the ProtoBuf parser to expect multiple messages within a single payload, each preceded by a bytes field containing its encoded length. See Using the MESSAGE_STREAM Option for Multiple-Message Payloads below.

Using the MESSAGE_STREAM Option for Multiple-Message Payloads

The Extensible Common Data framework ingests data in chunks called payloads. A "payload" is a unit of ingestion, which could be a whole file or a whole object from a Kafka topic or other message bus. A payload may correspond to a single row in s-Server or to a row set. (Rows never straddle payload boundaries.) For ProtoBuf, row parsing is determined by the SCHEMA_JAR and SCHEMA_CLASS that you provide to s-Server.

In a ProtoBuf context, a payload will usually consist of a single ProtoBuf message. The message may represent a single row or a large set of rows. You do not need to specify any special options for this use case.

A payload may also consist of multiple back-to-back messages, each prefixed by its encoded length. The ProtoBuf producer creates a payload like this by issuing a series of calls to the following method:

com.google.protobuf.CodedOutputStream.writeByteArrayNoTag(messageSerializedIntoAByteArray);

This method writes the serialized message to the stream as a bytes field: a varint length prefix followed by the message bytes.
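
For example, a producer might pack several messages into one payload along these lines. This is a minimal sketch, assuming the standard protobuf-java library; the class name LengthDelimitedWriter and the file destination are illustrative, not part of s-Server:

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

// Sketch: write several messages into one payload (here, a file),
// each as a length-prefixed bytes field.
public class LengthDelimitedWriter {
    public static void write(List<? extends Message> messages, String path)
            throws IOException {
        try (FileOutputStream out = new FileOutputStream(path)) {
            CodedOutputStream coded = CodedOutputStream.newInstance(out);
            for (Message m : messages) {
                // Varint length prefix followed by the message bytes --
                // the layout that MESSAGE_STREAM 'true' tells s-Server to expect.
                coded.writeByteArrayNoTag(m.toByteArray());
            }
            coded.flush(); // CodedOutputStream buffers internally
        }
    }
}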

In order to parse payloads of this form, you need to specify an additional option in the CREATE FOREIGN TABLE or CREATE FOREIGN STREAM statement that tells s-Server to look for this bytes field. That way, s-Server will be able to identify distinct messages within payloads that contain multiple messages.

MESSAGE_STREAM 'true'

This option tells the ProtoBuf parser to expect multiple messages within a single payload, each preceded by a bytes field containing its encoded length.

For example:

CREATE FOREIGN STREAM message_stream_protobuf
(
"tag" VARCHAR(20),
"bigintCol" BIGINT,
"varcharCol" VARCHAR(100)
)
SERVER FileReaderServer
OPTIONS
(
MESSAGE_STREAM 'true',
DIRECTORY '/tmp/protoData',
FILENAME_PATTERN 'length_separated.protobuf',
PARSER 'PROTOBUF',
SCHEMA_JAR '/home/acme/data/protobuf-data.jar',
SCHEMA_CLASS 'com.acme.proto.MySchemas$SimpleRow'
);