Using the Extensible Common Data Framework

The Extensible Common Data Framework lets you read from and write to all non-RDBMS sources. To read from and write to RDBMS sources, you use the SQL/MED plugin.

The Extensible Common Data Framework is implemented as both an adapter and an agent. You manage adapters through SQL, by setting up server and foreign stream objects. You enter connection information for each source in these objects. Connection information might include a file’s location and search pattern (for reading and writing to the file system) or a Kafka topic’s seed broker URL and topic name (for reading and writing to Kafka). Connection information also lets you manage how data is written or read, such as how frequently to rotate a file or where to begin writing to Kafka.

For agents, you set up a properties file that the agent uses when it runs. This properties file references an existing stream in s-Server, either as a source or destination.

Using the ECD Adapter

To use the ECD Adapter, you need to do the following:

  1. Designate a transport medium as a server option.
  2. Designate foreign stream or table options, including:
     a. The location of the file.
     b. Options for how the file will be parsed, such as file pattern, separator character, and format-specific options.
  3. Run a SELECT against the stream.

General Code Example

The example below demonstrates how to parse a CSV file over the file system. To parse other files over other transport systems, you change server and stream options appropriately.

Creating a Server Object

First, you need to create a server object which references the data wrapper ECDA as well as the type of medium.

CREATE OR REPLACE SERVER "FileReaderServer" TYPE 'FILE'
FOREIGN DATA WRAPPER ECDA;

Creating a Foreign Stream

Next, you create a foreign stream object. This object contains connection information for the file, such as the directory, filename pattern, filename date format, filename prefix and suffix, character encoding, whether or not to write a header, and the separator character.

CREATE OR REPLACE SCHEMA "FileSource";
SET SCHEMA '"FileSource"';

CREATE OR REPLACE FOREIGN STREAM "FileReaderStream"
("recNo" INTEGER,
"ts" TIMESTAMP NOT NULL,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER "FileReaderServer"
OPTIONS
(parser 'CSV',
character_encoding 'UTF-8',
separator ',',
skip_header 'false',
directory '/tmp',
filename_pattern 'buses\.log');
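The last step listed earlier, running a SELECT against the stream, can also be done from a client program. The following is a minimal sketch over plain JDBC, not an official client. It assumes the s-Server JDBC driver is on the classpath and that you pass your server's JDBC URL as the first argument; the `SELECT STREAM` form and the class name `FileReaderStreamQuery` are assumptions for illustration. Run without arguments, it only prints the statement it would execute.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class FileReaderStreamQuery {

    // Builds a SELECT over a quoted schema and stream name.
    // SELECT STREAM is assumed here to be the form used for a continuous query.
    static String selectFrom(String schema, String stream) {
        return "SELECT STREAM * FROM \"" + schema + "\".\"" + stream + "\"";
    }

    public static void main(String[] args) throws SQLException {
        String sql = selectFrom("FileSource", "FileReaderStream");
        if (args.length == 0) {
            // No JDBC URL supplied: just show the statement that would run.
            System.out.println(sql);
            return;
        }
        // args[0] is the JDBC URL of your s-Server (check your installation for its form).
        try (Connection conn = DriverManager.getConnection(args[0]);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            // A query on a stream keeps returning rows as the file is read.
            while (rs.next()) {
                System.out.println(rs.getInt("recNo") + " " + rs.getString("sourceIP"));
            }
        }
    }
}
```

Because the query runs against a stream, the loop keeps waiting for new rows until the statement or connection is closed.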

Using the Options Query Property

You can use the Options Query property to create a configuration table. You can then use this table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset.

The Options Query returns one row. Each column name in that row is used as a property name, and the value in that column is used as the property value.

If you set the OPTIONS_QUERY property to

select * from conf

and that query returns one row with one column called DIRECTORY containing the value /user/myuser, then the adapter is configured with the DIRECTORY property set to /user/myuser. Each time the adapter runs, its configuration is computed dynamically from the conf table. You can also use views for the OPTIONS_QUERY.

CREATE OR REPLACE FOREIGN STREAM testOut (
OFFSET BIGINT NOT NULL,
line VARCHAR(4096))
SERVER KAFKASERVER
OPTIONS (TOPIC 'testThroughput', OPTIONS_QUERY 'select lastOffset as STARTING_OFFSET from TEST.committedOffset');
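The mapping from the query's single row to adapter options can be pictured with a short Java sketch. This is an illustration of the rule described above, not the adapter's actual code: the class and method names are hypothetical, the offset value 1500 is made up, and the assumption that values from the query override statically declared options is not confirmed by this topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class OptionsQuerySketch {

    // Copies one result row (column name -> value) over the statically declared options.
    static Properties applyOptionsRow(Properties declared, Map<String, Object> row) {
        Properties effective = new Properties();
        effective.putAll(declared);
        for (Map.Entry<String, Object> column : row.entrySet()) {
            // The column name becomes the property name; the cell becomes its value.
            effective.setProperty(column.getKey(), String.valueOf(column.getValue()));
        }
        return effective;
    }

    public static void main(String[] args) {
        Properties declared = new Properties();
        declared.setProperty("TOPIC", "testThroughput");

        // Stand-in for the single row returned by the OPTIONS_QUERY (illustrative value).
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("STARTING_OFFSET", 1500L);

        Properties effective = applyOptionsRow(declared, row);
        System.out.println(effective.getProperty("TOPIC"));           // prints testThroughput
        System.out.println(effective.getProperty("STARTING_OFFSET")); // prints 1500
    }
}
```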

Using the ECD Agent

The ECD Agent reads files in remote locations. You can use it to pre-parse files at their location, allowing you to send rows over JDBC instead of entire files.

To configure the ECD agent:

  1. Create a stream (and schema, if necessary) in s-Server into which the agent will insert rows (this is known as the destination stream).
  2. Create a properties file for the agent, such as example.properties. You use this file to configure the agent. This file needs to include information such as the input file’s directory, parser, filename pattern, and other identifying information, as well as options for the destination stream, such as schema_name, table_name, and rowtype. See the topic Extensible Common Data Agent Overview for more details. The code below represents a sample properties file that reads from a CSV-formatted file called example.log in a directory called /var/tmp, and inserts rows into a destination stream called BIDS in the schema SALES on the default s-Server defined for this agent.

    # Location, name, and type of file
    DIRECTORY=/var/tmp
    FILENAME_PATTERN=example.log
    # Parser for file. Options are CSV, JSON, XML, and others.
    PARSER=CSV
    # Schema, name, and parameter signature of destination stream
    SCHEMA_NAME=SALES
    STREAM_NAME=BIDS
    ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)
    
  3. Determine the input/output format you will use. The ECD agent defaults to using the file system, but you can also configure it to read over a network socket or an AMQP feed, or to read from and write to a Kafka server, an MQSeries server, or a Hadoop file system. A list of options for the --io property is available in the topic Extensible Common Data Agent Overview.

  4. Launch the agent with the --input property enabled and a --props property that references the properties file you just created:

    ./commondataagent.sh --input --io file --props example.properties
    

If you have configured everything correctly, the agent should begin parsing data from the file location and feeding this data into your destination stream.
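Before launching the agent, it can help to check that the properties file contains the keys you expect. The sketch below is a hypothetical pre-flight check written with the standard java.util.Properties class; it is not part of the agent. Treating the six keys from the sample file as required is an assumption, and the sample text is loaded from a string so the example runs on its own.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class AgentPropertiesCheck {

    // Keys used by the sample file above; treating them as required is an assumption.
    static final String[] EXPECTED_KEYS = {
        "DIRECTORY", "FILENAME_PATTERN", "PARSER", "SCHEMA_NAME", "STREAM_NAME", "ROWTYPE"
    };

    // Returns the expected keys that are absent or blank in the given file contents.
    static List<String> missingKeys(String fileContents) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(fileContents));
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // The sample file from above with the ROWTYPE line left out on purpose.
        String sample = String.join("\n",
            "# Location, name, and type of file",
            "DIRECTORY=/var/tmp",
            "FILENAME_PATTERN=example.log",
            "PARSER=CSV",
            "SCHEMA_NAME=SALES",
            "STREAM_NAME=BIDS");
        System.out.println("Missing keys: " + missingKeys(sample)); // prints Missing keys: [ROWTYPE]
    }
}
```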

Using a Custom Filter with the ECD Agent

At times, you may want to write your own filter for a particular type of file, such as a log file. Such a filter might exclude rows in the log file that fail to meet specific criteria. You can do so by implementing an interface in the Extensible Common Data Agent JAR file called RowFilter, and then naming your implementation in the --filterClassname property of the ECD Agent. See the topic Extensible Common Data Agent in this guide for more details.

To implement a custom filter, you need to create your own Java class as either a JAR file or .class file which implements the interface. Then, you need to update your classpath to add this new class. The class implements specific filtering logic, which is applied to each row in the file.

For each row, you return a Boolean value which determines whether the row is added to a stream or table in s-Server. The interface defines two methods:

  • public void init(Properties tableProps). By default, tableProps contains the options passed on the command line to the agent. These include all the values used to configure the agent, including the rowType, the filterClassname, the streamName, and everything else the agent uses to configure itself. You can add your own custom properties that configure your custom filter. For example, the rowType value might look something like 'RECORD ("time" BIGINT, "ticker" VARCHAR(3), "shares" INTEGER, "price" REAL)'.
  • public boolean filterRow(Object[] row). This provides filtering logic, returning a Boolean value which determines whether or not the row is inserted. For example, you might write code indicating “if the first value in the array is less than 100, return TRUE, otherwise return FALSE.” This filtering logic can reference the columns defined in tableProps. The row array maps onto the set of columns parsed from the log file.

This class would contain code along the following lines:

package <package.name>;

import java.util.Properties;
import com.sqlstream.aspen.namespace.common.RowFilter;

public class <CustomClassName> implements RowFilter {

    public void init(Properties tableProps) {
        // read the properties you need here, such as the rowtype value,
        // which is a string representation of a rowtype
    }

    public boolean filterRow(Object[] row) {
        // add filtering code here that returns a Boolean value
        return true;
    }
}

Implementing the Class

To implement the class for the RowFilter interface, you pass --filterClassname <CustomClassName> on the command line to the ECD Agent, where <CustomClassName> is the name of the class you created above. See the topic Extensible Common Data Agent in this guide for more details. Doing so creates an instance of the filter class using its no-argument constructor. The agent then calls init() to pass in the properties of the stream or table it is filtering. By default, these properties are those defined for the agent, but you can also customize them through the tableProps parameter of the init method. The agent then calls filterRow for each row in the file. For each row, the agent either inserts it into the stream or table (TRUE) or discards it (FALSE), depending on the Boolean value returned by filterRow.
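The sequence described above (construct, init once, filterRow per row) can be sketched in a few lines of Java. This is an illustration of the lifecycle only, not the agent's source. The RowFilter interface is redeclared locally as a stand-in so the sketch compiles without the agent JAR, and LessThanHundredFilter and FilterLifecycleSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Local stand-in mirroring the RowFilter interface, so the sketch compiles without the agent JAR.
interface RowFilter {
    void init(Properties tableProps) throws Exception;
    boolean filterRow(Object[] row);
}

// Hypothetical filter: keeps rows whose first value is less than 100.
class LessThanHundredFilter implements RowFilter {
    public void init(Properties tableProps) {
        // no configuration needed for this filter
    }

    public boolean filterRow(Object[] row) {
        return ((Integer) row[0]) < 100;
    }
}

class FilterLifecycleSketch {

    static List<Object[]> run(String filterClassname, Properties props, List<Object[]> rows)
            throws Exception {
        // 1. Instantiate the named class through its no-argument constructor.
        RowFilter filter = (RowFilter) Class.forName(filterClassname)
                .getDeclaredConstructor().newInstance();
        // 2. Hand it the properties once.
        filter.init(props);
        // 3. Ask it about every row; keep only rows for which it returns true.
        List<Object[]> kept = new ArrayList<>();
        for (Object[] row : rows) {
            if (filter.filterRow(row)) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) throws Exception {
        List<Object[]> rows = Arrays.asList(
                new Object[] {50, "a"}, new Object[] {150, "b"}, new Object[] {99, "c"});
        List<Object[]> kept = run(LessThanHundredFilter.class.getName(), new Properties(), rows);
        System.out.println(kept.size()); // prints 2
    }
}
```

Loading the class by name is what makes the no-argument constructor a requirement: a filter class that only declares constructors with parameters could not be created this way.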

Example Implementation

For example, take a simple log file like the following:

07/Mar/2018:16:05:49, SQLS, 50, $50.00
07/Mar/2018:16:06:51, ORCL, 100, $84.00
07/Mar/2018:16:10:02, MSFT, 200, $78.00
07/Mar/2018:16:11:58, MSFT, 100, $78.00

With the Rowtype property defined as 'RECORD ("time" BIGINT, "ticker" VARCHAR(3), "shares" INTEGER, "price" REAL)', the parsed stream would appear like the following:

| time                 | ticker | shares | price  |
| -------------------- | ------ | ------ | ------ |
| 07/Mar/2018:16:05:49 | SQLS   | 50     | $50.00 |
| 07/Mar/2018:16:06:51 | ORCL   | 100    | $84.00 |
| 07/Mar/2018:16:10:02 | MSFT   | 200    | $78.00 |
| 07/Mar/2018:16:11:58 | MSFT   | 100    | $78.00 |

You could then write code in the RowFilter object to insert only records with the ticker MSFT:

public class <CustomClassName> implements RowFilter {

    private int tickerOrdinal;

    public void init(Properties tableProps) {
        // code that sets tickerOrdinal using tableProps
        if (....) {
            // "ticker" is the second column, so its zero-based index is 1
            tickerOrdinal = 1;
        }
    }

    public boolean filterRow(Object[] row) {
        if ("MSFT".equals(row[tickerOrdinal])) {
            return true;    // insert rows for MSFT into the stream
        }
        return false;       // discard all other rows
    }
}
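The example above leaves open how init works out the position of the ticker column. One possible approach, shown here as a sketch under stated assumptions, is to read the rowtype string from tableProps and find the position of the quoted column name. The property key "ROWTYPE", the class names, and the local stand-in for the RowFilter interface are all assumptions for illustration; check which key your agent actually passes.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Local stand-in mirroring the RowFilter interface, so the sketch compiles without the agent JAR.
interface RowFilter {
    void init(Properties tableProps) throws Exception;
    boolean filterRow(Object[] row);
}

class MsftOnlyFilter implements RowFilter {

    private int tickerOrdinal = -1;

    public void init(Properties tableProps) {
        // "ROWTYPE" as the property key is an assumption.
        String rowType = tableProps.getProperty("ROWTYPE", "");
        // Walk the quoted column names in order; the match count is the zero-based column index.
        Matcher names = Pattern.compile("\"([^\"]+)\"").matcher(rowType);
        for (int i = 0; names.find(); i++) {
            if (names.group(1).equals("ticker")) {
                tickerOrdinal = i;
                break;
            }
        }
        if (tickerOrdinal < 0) {
            throw new IllegalArgumentException("no \"ticker\" column in rowtype: " + rowType);
        }
    }

    public boolean filterRow(Object[] row) {
        // Keep only rows whose ticker column holds MSFT.
        return "MSFT".equals(row[tickerOrdinal]);
    }
}

class MsftOnlyFilterDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("ROWTYPE",
            "RECORD (\"time\" BIGINT, \"ticker\" VARCHAR(3), \"shares\" INTEGER, \"price\" REAL)");
        MsftOnlyFilter filter = new MsftOnlyFilter();
        filter.init(props);
        System.out.println(filter.filterRow(
            new Object[] {"07/Mar/2018:16:10:02", "MSFT", 200, "$78.00"})); // prints true
        System.out.println(filter.filterRow(
            new Object[] {"07/Mar/2018:16:06:51", "ORCL", 100, "$84.00"})); // prints false
    }
}
```

Failing fast in init when the column is missing surfaces a misconfigured rowtype at startup rather than as an array index error on the first row.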

Interface Code

This is the interface to implement:

public interface RowFilter
{
   public void init(Properties tableProps) throws Exception;
   // true means send the row, false means filter it out
   public boolean filterRow(Object[] row);
}