Writing an ECD Plugin

This topic provides information on creating new plugins to process data for s-Server using the Extensible Common Data Framework. Plugins are written in Java, so developing one requires knowledge of Java programming. After you write a plugin, you need to install it.

Currently, you can write plugins to read and parse data from external sources. We do not support writing plugins to write data to external sources.

The directory $SQLSTREAM_HOME/examples/sdk contains two scripts used to build and install plugins. You can access the SQLstream Software Development Kit API as a Javadoc or as a zip file.

Introduction

The Extensible Common Data Framework lets you independently write a plugin that handles a currently unsupported data source or data format. You can then use these plugins to create foreign data sources in SQL, set up foreign streams which can be queried, analyzed, merged with native streams or tables, archived in an RDBMS source, or otherwise manipulated.

For a list of supported sources and formats, see the topics Reading from Other Sources and Writing to Other Destinations in this guide.

  • With parsers, we are concerned with transforming data into row-column pairs. Parsers first validate the data and then parse it out to match columns defined in a foreign stream.
  • With data sources, we are primarily concerned with delivering buffers and buffer metadata (information about where the data originated, such as sequence number or names of files). This metadata is entered into s-Server streams so that you can later poll it.

Currently, only parsers and sources are supported for writing your own ECD plugin. For help with writing a custom writer or sink, please contact SQLstream technical support.

Two code examples are provided at the end of this topic: Sample Code for Parser and Sample Code for Source Plugin.

Setting Up the ECD Plugin

The first part of developing an ECD plugin involves setting up a Maven project for your plugin. For more information on Maven, see https://maven.apache.org/

Using Maven, create your project. As a parent pom, use $SQLSTREAM_HOME/examples/sdk/pom.xml.

Invoke $SQLSTREAM_HOME/examples/sdk/install.sh to install artifacts.

You will most likely want to modify one of the pom files in $SQLSTREAM_HOME/examples/sdk/keyvalue.

A sample pom file might look like the following:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.sqlstream.ecda</groupId>
    <artifactId>ecda</artifactId>
    <version>6.0.0-SNAPSHOT</version>
  </parent>
  <artifactId>keyvalue</artifactId>
  <packaging>jar</packaging>
  <name>ECDA Key-Value Parser Adapter</name>
  <properties>
    <plugin.factory>com.sqlstream.aspen.namespace.keyvalue.KeyValueParser$PluginFactory</plugin.factory>
  </properties>
</project>

Writing the ECDA Plugin

To write a plugin, begin by creating a subclass of com.sqlstream.aspen.namespace.common.EcdaPluginFactory. In this subclass, invoke one or more of the following methods:

  • public static void installSource(String name, Class<? extends DataInputSource> clazz);
  • public static void installParser(String name, Class<? extends DataInputParser> clazz);
  • public static void installFactoryPlugin(String name, Class<? extends DataFactoryPlugin> clazz);

For example, for an ECDA parser, you might use code along the following lines:

public class ProtoBufParser implements DataInputParser

EcdaPluginFactory

For any Extensible Common Data plugin, you need to subclass EcdaPluginFactory and reference that subclass in your POM (the plugin.factory property). When you install the plugin, s-Server looks through the JAR manifest for the ECDA plugin factory.

The plugin factory subclass also names the plugin. You invoke this name in SQL through either the FORMAT TYPE parameter (parsers) or SERVER TYPE parameter (sources).

For example, the following code names a parser “PROTOBUF”.

public static class PluginFactory extends EcdaPluginFactory {
    public void installPlugins() {
        installParser("PROTOBUF", ProtoBufParser.class);
    }
}

Managing Properties

For all plugins, you need to manage the plugin's properties. These are accessible in SQL and control how the plugin works, for example how it collects metadata or handles offsets. You do so using the OptionsUtils.filterInitProperties method. You should validate properties and throw a user-understandable exception on error.

getInitProperties should return the subset of properties that are interesting for this data source. getInitProperties is inherited from the interface com.sqlstream.aspen.core.RuntimeObject.

In init(), you can use the following helper method for this purpose:

initProps = OptionsUtils.filterInitProperties(
             props, requiredParameters, optionalParameters, initProps);
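
For example, an init() implementation might look like the following. This is a minimal sketch: the option names MY_TOPIC and POLL_MILLIS, and the fields that hold them, are hypothetical and only illustrate filtering the interesting properties, validating an option, and failing with a message the SQL user can act on.

protected final static String [] REQUIRED_PROPERTIES = { "MY_TOPIC" };    // hypothetical option
protected final static String [] OPTIONAL_PROPERTIES = { "POLL_MILLIS" }; // hypothetical option

protected final Properties initProps = new Properties();
protected String topic;
protected long pollMillis;

public void init(Properties props) throws Exception {
    // Keep only the properties that are interesting to this plugin.
    OptionsUtils.filterInitProperties(
        props, REQUIRED_PROPERTIES, OPTIONAL_PROPERTIES, initProps);

    topic = initProps.getProperty("MY_TOPIC");
    String poll = initProps.getProperty("POLL_MILLIS", "1000");
    try {
        pollMillis = Long.parseLong(poll);
    } catch (NumberFormatException e) {
        // Throw a user-understandable exception on error.
        throw new SQLException("POLL_MILLIS must be an integer, got: " + poll, e);
    }
}

// Inherited from com.sqlstream.aspen.core.RuntimeObject; returns the subset
// of properties relevant to this data source.
public Properties getInitProperties() {
    return initProps;
}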

Writing a Source Plugin

Source plugins deliver buffers and metadata about buffers. Metadata is generally information about where the data originated, and might include, for example, sequence numbers or file names. To write a source plugin, you need to do the following:

  • Extend the class SourceData.
  • Extend the class SourceDataFactory.
  • Implement the interface DataInputSource.

For more information on all three classes, see the SQLstream SDK Javadoc. Sources generally deliver data in one of two ways:

  • Message-based sources, which have breaks between messages or buffers. Kafka is an example of such a source.
  • Non-message-based sources, which have no natural breaks between messages. Network sockets and the file system both work this way.

Your source type determines how you implement the plugin.

You should use these implementations and extensions in the following way:

SourceDataFactory

Override newBufferInfo(), the boilerplate method that constructs your SourceData subclass. In the constructor, pass the names of the metadata columns that are of interest to the source plugin to the superclass.

For Multiple Message Submissions (Like Kafka)

For message-based sources that submit multiple buffers/messages per work unit, use the following code model for SourceDataFactory. Use BuffersInputStream.Connection.getCurrent() and BuffersInputStream.Connection.submit() for each work unit.

/**
     * @param supportedColumnNames Supported metadata Column names
     * @param context context of DataInputSource
     * @param numWorkUnits number of work units to allow queuing in BuffersInputStream
     * @param workSize Maximum number of bytes per work unit
     * @param sharedBuffer  Use shared buffer for SourceDatas within work unit
     * @throws SQLException
     */
    protected SourceDataFactory(
        String [] supportedColumnNames, EcdaReaderContext context,
        int numWorkUnits, int workSize, boolean sharedBuffer)
        throws SQLException
    {
        this(supportedColumnNames, context, numWorkUnits, 0, workSize, sharedBuffer, false);
    }

For Single Message Submissions (Like a Log File)

For continuous, message-based sources that submit a single buffer/message per work unit, use the following code model for SourceDataFactory. Use BuffersInputStream.Connection.getCurrent() and BuffersInputStream.Connection.submit() for each work unit.

/**
     * @param supportedColumnNames Supported metadata Column names
     * @param context context of DataInputSource
     * @param numBuffers number of buffers to allow queuing in BuffersInputStream
     * @throws SQLException
     */
    protected SourceDataFactory(
        String [] supportedColumnNames, EcdaReaderContext context,
        int numBuffers)
        throws SQLException
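
Putting this together, a SourceDataFactory subclass might look like the following minimal sketch, modeled on the Kafka example at the end of this topic. The SEQUENCE_NUMBER metadata column, the MySourceData class (sketched in the next section), and the 262144-byte work-unit size are assumptions for illustration.

// Minimal sketch of a SourceDataFactory subclass for a message-based source.
class MySourceDataFactory extends SourceDataFactory<MySourceData> {

    protected MySourceDataFactory(EcdaReaderContext context, int numWorkUnits)
        throws SQLException
    {
        // Metadata column names, context, number of work units,
        // maximum bytes per work unit, shared buffer.
        super(new String[] {"SEQUENCE_NUMBER"}, context, numWorkUnits, 262144, true);
    }

    @Override
    public MySourceData newBufferInfo()
    {
        // Boilerplate: construct the SourceData subclass with the column
        // numbers resolved from the metadata names passed above.
        return new MySourceData(this, columnNumbers);
    }
}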

SourceData

Override updateLocation to update any metadata columns associated with messages (such as sequence numbers). Use the columnNumbers array to know what columns to update. The columnNumbers array is set based on the list of metadata column names passed to the super class in the constructor of the SourceDataFactory subclass.

For sources that deliver messages using more than one buffer, override the methods isStartOfMessage and isEndOfMessage. Use LocationDescription to debug (or in other cases where you need to know the current location in the data).

In the constructor, allocate the buffer if the SourceData manages its own buffer. If the SourceData does not manage the buffer, it is a good idea to override resetBeforeFree to set the buffer to null.
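
A SourceData subclass might look like the following minimal sketch, again modeled on the Kafka example at the end of this topic; the single SEQUENCE_NUMBER metadata column and the setter used by the reader loop are assumptions for illustration.

// Minimal sketch of a SourceData subclass exposing one metadata column.
static class MySourceData extends SourceData {
    final int sequenceColumn;
    private long sequenceNumber;

    public MySourceData(MySourceDataFactory factory, int[] columnNumbers)
    {
        super(factory, columnNumbers);
        // columnNumbers is ordered to match the metadata names passed to the
        // SourceDataFactory constructor; -1 means the column is not selected.
        sequenceColumn = columnNumbers[0];
    }

    public void setSequenceNumber(long sequenceNumber)
    {
        this.sequenceNumber = sequenceNumber;
    }

    @Override
    public String locationDescription()
    {
        // Used for debugging and error messages.
        return "sequence " + sequenceNumber;
    }

    @Override
    public void updateLocation(RowInserter inserter) throws SQLException
    {
        // Copy the metadata for the current message into the output row.
        if (sequenceColumn != -1) {
            inserter.setLong(sequenceColumn, sequenceNumber);
        }
    }
}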

DataInputSource

start() should spawn a thread if needed and process incoming messages. init() processes properties, finds the ones that are interesting, and validates them. closeAllocation() should free any resources and set canceled to true.

You should then write a loop along one of the two following models:

For Multiple Message Submissions (Like Kafka)

public void run()
{
    BuffersInputStream inputStream = context.getInputStream();
    // If important for client libraries:
    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    try (BuffersInputStream.Connection<SourceDataSubclass> connection = inputStream.getConnection()) {
        while (!canceled) {
            try {
                ByteBuffer buf = dataSupplier.getBufferIfAvailable();
                if (buf == null) {
                    connection.flush();
                    dataSupplier.waitForDataToBeAvailable(); // or could sleep for a short time
                    continue;
                }

                SourceDataSubclass dataSource = connection.getCurrentWithSize(buf.remaining());

                // when using a shared buffer -
                ByteBuffer savedBuf = dataSource.getBuffer();
                buf.get(savedBuf.array(), savedBuf.position(), buf.remaining());

                dataSource.setSequenceNumber(dataSupplier.getSequenceNumber());

                connection.submit();
            } catch (Exception e) {
                if (e.isRecoverable()) {
                    tracer.log(...);
                    connection.reset(); // only needed if there is more than one connection
                } else {
                    throw ...
                }
            }
        }
    } catch (Throwable e) {
        tracer.log(Level.SEVERE, "Exception in reader", e);
    } finally {
        tracer.config("exiting reader");
        inputStream.sendEOS();
    }
}

For Single Message Submissions (Like a Log File)

public void run()
{
    BuffersInputStream inputStream = context.getInputStream();
    // If important for client libraries:
    Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    try (BuffersInputStream.Connection<SourceDataSubclass> connection = inputStream.getConnection()) {
        while (!canceled) {
            try {
                ByteBuffer buf = dataSupplier.getBuffer();
                SourceDataSubclass dataSource = connection.getCurrent();
                dataSource.setBuffer(buf);
                dataSource.setSequenceNumber(dataSupplier.getSequenceNumber());
                connection.submit();
            } catch (Exception e) {
                if (e.isRecoverable()) {
                    tracer.log(...);
                    connection.reset(); // only needed if there is more than one connection
                } else {
                    throw ...
                }
            }
        }
    } catch (Throwable e) {
        tracer.log(Level.SEVERE, "Exception in reader", e);
    } finally {
        tracer.config("exiting reader");
        inputStream.sendEOS();
    }
}

Writing a Parser Plugin

Parser plugins receive data, validate this data, and parse the data into matched columns. The particulars of how you write this code will depend on your data source, but you can get ideas about it by looking at the Sample Code for Parser.

To write a parser plugin, you need to implement DataInputParser.

The list of columns to parse is available from context.getMetaData(). context.getMappedSpecialColumns() returns the set of column names that have already been filled in by the source and should not be parsed.
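
For example, a parser might scan the target columns along the following lines. This is a minimal sketch that assumes fields such as numColumns, metaData, and tracer like those provided by the base class in the Sample Code for Parser below; treat the exact names as assumptions.

// Minimal sketch: decide which columns this parser is responsible for.
Set<String> alreadyMapped = context.getMappedSpecialColumns();
for (int i = 1; i <= numColumns; i++) {
    String fieldName = metaData.getFieldName(i);
    if (alreadyMapped.contains(fieldName)) {
        // Already filled in by the source plugin; the parser must not set it.
        continue;
    }
    // Otherwise choose a TypeParser for column i (see initRowParsers() in
    // the Sample Code for Parser below).
    tracer.fine("parser will populate column " + fieldName);
}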

Use code along the following lines to save the passed-in context (the same as for a source plugin):

public void setContext(EcdaReaderContext context);
public void setInserter(RowInserter inserter) throws SQLException;

Use code along the following lines to set the ExecutorService (if necessary):

public void setExecutorService(ExecutorService service);

Use code along the following lines to return the Charset expected by the parser (this could be specified by a property). Binary parsers should return null.

public Charset getCharset();
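
For example, a minimal sketch of getCharset(), assuming a hypothetical CHARACTER_ENCODING option saved into initProps during init(); a binary parser would simply return null:

public Charset getCharset() {
    // CHARACTER_ENCODING is a hypothetical option name used for illustration.
    String encoding = initProps.getProperty("CHARACTER_ENCODING", "UTF-8");
    return Charset.forName(encoding);
}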

When implementing public void start() throws SQLException, do not clear all parameters in the RowInserter; clear only the parameters set by the parser.

All parser plugins process data either as non-streaming or streaming. It is possible to write a plugin that does either depending on the context. Non-streaming data is processed by line or chunk, and is implemented with a code block along the following lines:

class NonStreamingReaderImpl implements Runnable
// Non-streaming vs. streaming applies to all parsers, but the
// either-one-or-the-other split presented here does not.
{
    public void run()
    {
        final BuffersInputStream inputStream = context.getInputStream();
        // without line delimiter
        final BuffersInputStream.Line currentLine = inputStream.newLine();
        try {
            if (nonRepeatedProtoReader != null) {
                nonRepeatedProtoReader.reset(inputStmt);
            }
            if (repeatedProtoReader != null) {
                repeatedProtoReader.reset(inputStmt);
            }
            // Keep going until either the stream ends or the parser is canceled.
            // Iterate over a record at a time; getting a line here means getting
            // the whole record.
            while (inputStream.getLine(currentLine, null) && !canceled) {
                try {
                    if (nonRepeatedProtoReader != null) {
                        CodedInputStream in = CodedInputStream.newInstance(
                            currentLine.buffer, currentLine.start, currentLine.length());
                        // protobuf-input specific
                        nonRepeatedProtoReader.processRecord(
                            in, inputStmt, messageSeparator, messageLengthPrefixed);
                    }
                    if (repeatedProtoReader != null) {
                        CodedInputStream in = CodedInputStream.newInstance(
                            currentLine.buffer, currentLine.start, currentLine.length());
                        repeatedProtoReader.processRecord(
                            in, inputStmt, messageSeparator, messageLengthPrefixed);
                        if (nonRepeatedProtoReader != null) {
                            nonRepeatedProtoReader.reset(inputStmt);
                        } else {
                            repeatedProtoReader.reset(inputStmt);
                        }
                    }
                } catch (Exception e) {
                    tracer.log(Level.WARNING, "Error while parsing protobuf stream", e);
                }
            }
        } catch (Exception e) {
            tracer.log(Level.SEVERE, "Error while reading protobuf stream", e);
        } finally {
            // No matter what happens, call context.close() when leaving this
            // section of code. Every parser should do this; otherwise things
            // are left in a bad state.
            context.close();
        }
    }
}

Streaming data is processed as it comes in, and is handled by a code block along the following lines:

class StreamingReaderImpl implements Runnable {
    public void run()
    {
        final BuffersInputStream inputStream = context.getInputStream();
        // Wrap the input in a new stream, turning it into a protobuf input stream.
        CodedInputStream in = CodedInputStream.newInstance(inputStream);

        ProtoMessageReader protoReader = nonRepeatedProtoReader != null
            ? nonRepeatedProtoReader : repeatedProtoReader;
        try {
            if (nonRepeatedProtoReader != null) {
                nonRepeatedProtoReader.reset(inputStmt);
            }
            if (repeatedProtoReader != null) {
                repeatedProtoReader.reset(inputStmt);
            }
            while (!canceled) {
                // Important pattern: try, but if you get an error, skip the
                // message. This gives you a place to start parsing again.
                try {
                    protoReader.processRecord(
                        in,
                        inputStmt,
                        messageSeparator,
                        messageLengthPrefixed);
                } catch (Exception e) {
                    try {
                        inputStream.skipCurrentMessage();
                        if (inputStream.isAtEos()) {
                            return;
                        }
                        in = CodedInputStream.newInstance(inputStream);
                    } catch (Exception e1) {
                        tracer.log(
                            Level.SEVERE,
                            "Error while reading protobuf stream",
                            e1);
                        return;
                    }
                    tracer.log(
                        Level.SEVERE,
                        "Error while reading protobuf stream",
                        e);
                }
            }
        } catch (Exception e) {
            tracer.log(Level.SEVERE, "Error while reading protobuf stream", e);
        } finally {
            // No matter what happens, call context.close() when leaving this
            // section of code. Every parser should do this; otherwise things
            // are left in a bad state.
            context.close();
        }
    }
}

Compiling and Building the ECD Plugin

In your pom add the following property override:

<properties>
<plugin.factory>your.PluginFactorySubclassname</plugin.factory>
</properties>

Build the plugin by invoking the following command. The makeTarball.sh script is available in /s-Server/examples/sdk/makeTarball.sh.

makeTarball.sh <plugin dir name>

Installing the ECD plugin

Unpack the tarball into the directory $SQLSTREAM_HOME/plugins. This should create a directory tree plugins/<plugin-name>.

In the created directory, you should find install.sql. To install your plugin, invoke sqllineClient (available in $SQLSTREAM_HOME/bin) with this script.

Alternatively, you can build the plugin with mvn package or mvn install, then add code along the following lines to your SQL script to install the JAR:

CREATE JAR "GPSFuncsJavaLib"
    LIBRARY 'file:/home/aspen/gps.jar'
    OPTIONS(0);
--- This code loads the installed Java JAR file

Installing the ECD plugin in a Docker Container

If you want to add a new plugin to an existing SQLstream container, you have two choices:

Using the original SQLstream image

If you are using one of the published SQLstream images - sqlstream/complete, sqlstream/slim or sqlstream/micro:

  • When using docker run to start a container from the image, mount a volume which includes your plugin directory
  • In your ENTRYPOINT script, create a soft link to that directory from the $SQLSTREAM_HOME/plugins directory
  • After starting s-Server, but before attempting to create your application schemas and objects, run the installEcdaPlugin.sql script that is in the plugin directory.

In a new image derived from a SQLstream image

  • In the Dockerfile, ADD the plugin tarball to the $SQLSTREAM_HOME/plugins directory. ADD will extract the content as needed.
  • If you plan to create the application schema at image creation time, ensure the <plugin>/install.sql script is included before any SQL scripts that need to use the plugin.

Sample Code for Parser

The following code creates a parser for Key Value Pairs. It is useful as an example of how to write a parser plugin. The topic Extensible Common Data Framework: Parsing Key Value Pairs describes the functionality of this plugin.

/*
// Copyright (C) 2016-<%YEAR%> SQLstream, Inc.
*/
package com.sqlstream.aspen.namespace.keyvalue;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;

import com.sqlstream.aspen.namespace.common.BuffersInputStream;
import com.sqlstream.aspen.namespace.common.CommonRowParser;
import com.sqlstream.aspen.namespace.common.EcdaPluginFactory;
import com.sqlstream.aspen.namespace.common.OptionsUtils;
import com.sqlstream.aspen.namespace.common.TypeParser;

/**
 * KeyValueParser is a parser for KeyValue data. It parses a record at a time
 * from a stream of bytes, reading up to the next line delimiter. The line is
 * split into fields by the column delimiter, unless protected by quotes. (The
 * quote and both delimiters are attributes; by default " and ,.) Then the
 * field values are parsed from byte-strings to scalar values and inserted into
 * the next row of the target table or stream.
 *
 * For the sake of speed the code avoids making Java objects. In particular, the
 * current line is not a Java String or a Java byte array, but simply a range of
 * contiguous bytes in a larger byte array. The same holds for each field to be
 * parsed.
 *
 **/
public class KeyValueParser extends CommonRowParser {

    private final static String KEY_VALUE_SEPARATOR_CHARACTER = "KEY_VALUE_SEPARATOR_CHARACTER";

    protected final static String [] REQUIRED_PROPERTIES = {
    };
    protected final static String [] OPTIONAL_PROPERTIES = {
        ROW_SEPARATOR_CHAR_KEY, SEPARATOR_CHAR_KEY, PARSER_SKIP_HEADER_KEY,
        COLUMN_QUOTE_CHARACTER, QUOTED_COLUMNS, KEY_VALUE_SEPARATOR_CHARACTER
    };
    private BytePointer[] columns;
    private ArrayList<BytePointer> unmatchedKeys;

    private byte[] rowDelimiter;
    private byte[] colDelimiter;
    private byte[] quotes;
    private byte[] keyValueSeparator;
    private int quotesLength = 0;
    private int keyValueSeparatorLength;
    private boolean rowDelimiterSpecified;

    protected TypeParser[] rowParsers;
    protected BuffersInputStream.Line currentLine; // without line delimiter

    private boolean rowGenerated;
    private int rowCount = 0;

    private static BytePointer key;

    class Impl implements Runnable {

        public void run() {
            long rowStartPosition = 0;
            try {
                tracer.log(Level.FINE, "Started parsing the stream");
                while (inputStream.getLine(currentLine, rowDelimiter)) {
                    parseRow(rowStartPosition);
                    rowStartPosition = inputStream.getPosition();
                }
                tracer.log(Level.FINE, "Finished parsing the stream");
            } catch (Throwable e) {
                tracer.log(Level.SEVERE, "Exiting parser", e);
            } finally {
                context.close();
            }
        }

    }

    public KeyValueParser() {
        super();
    }

    /**
     * Initializes this formatter and throws an exception if something is
     * underspecified
     */
    public void init(Properties props) throws Exception {
        super.init(props);
        OptionsUtils.filterInitProperties(
            props, REQUIRED_PROPERTIES, OPTIONAL_PROPERTIES, initProps);
        key = new BytePointer();
        // Record separator. By default it is a \n character
        String rowSeparator = props.getProperty(ROW_SEPARATOR_CHAR_KEY);
        rowDelimiterSpecified = (rowSeparator != null);
        if (rowSeparator == null) {
            rowSeparator = "\n";
        }
        rowDelimiter = rowSeparator.getBytes(charset);

        // Field (key=value) separator. By default it is a comma (,)
        String colSeparator = props.getProperty(SEPARATOR_CHAR_KEY);
        if (colSeparator == null) {
            colSeparator = ",";
        }
        colDelimiter = colSeparator.getBytes(charset);

        initDefaultTypeParsers(props, initProps);
        rowParsers = initRowParsers();

        // The quote character. By default it is a double quote character (")
        String quoteCharacter = props.getProperty(COLUMN_QUOTE_CHARACTER);
        String quotedColumns = props.getProperty(QUOTED_COLUMNS);
        if (null == quoteCharacter && quotedColumns != null) {
            quoteCharacter = "\"";
        }
        if (quoteCharacter != null) {
            quotes = quoteCharacter.getBytes(charset);
        } else {
            quotes = "\"".getBytes(charset);
        }
        quotesLength = quotes.length;

        // The character that separates a key from a value. By default it is =.
        // It can be configured using the KEY_VALUE_SEPARATOR_CHARACTER property.
        String keyValueSeparatorCharacter = props.getProperty(KEY_VALUE_SEPARATOR_CHARACTER);
        if (null == keyValueSeparatorCharacter) {
            keyValueSeparatorCharacter = "=";
        }

        keyValueSeparator = keyValueSeparatorCharacter.getBytes(charset);
        keyValueSeparatorLength = keyValueSeparator.length;

    }

    /**
     * Parses a single record
     *
     * @param rowStartPosition
     * @throws Exception
     */
    public void parseRow(long rowStartPosition) throws Exception {
        rowGenerated = false;
        final int rowLength = currentLine.length();
        key.setBytes(currentLine.buffer);
        int a = currentLine.buffer.length;
        assert (rowLength >= 0);
        try {
            traceLine(Level.FINEST, currentLine);

            // String s = new String (currentLine.buffer, charset);
            // Brief explanation of the index variables:
            // assuming a row with a "key=value" field, where the index of k is 0
            // and the index of the last 'e' is 8, then
            // startKey = 0
            // endKey = 2
            // startValue = 4
            // endValue = 8
            // colStart = 0
            // colEnd = 8

            int rowEnd = rowLength;
            int colStart = 0, colEnd = 0; // offsets in currentLine
            boolean sawComma = false;

            boolean finish = false;

            // parse the fields until it reaches the end of the line
            while (!finish) {

                // find next field in the line (exclude delimiters)
                colStart = colEnd;
                if (colStart < rowEnd) {
                    if (sawComma) {
                        colStart += colDelimiter.length;
                    }
                    colEnd = currentLine.findBytes(colStart, colDelimiter);
                    if (colEnd < 0) {
                        // it's the last key=value in the row
                        colEnd = rowEnd;
                        sawComma = false;
                        finish = true;
                    } else {
                        sawComma = true;
                    }
                }
                int startKey = colStart;
                int endKey = currentLine.findBytes(startKey, colEnd, keyValueSeparator);

                int startValue = endKey + keyValueSeparatorLength;
                int endValue = colEnd;

                // key = new BytePointer(startKey, endKey, currentLine.buffer);

                key.setLimits(startKey, endKey, currentLine.start);
                //

                // parse the text from colStart to colEnd
                try {
                                    //traceField(Level.FINEST, null, startKey, endKey, currentLine, colStart, colEnd);
                    TypeParser parser = null;
                    int iCol = 0;
                    for (iCol = 0; iCol < numColumns; iCol++) {
                        if (columns[iCol].equals(key)) {
                            parser = rowParsers[iCol];
                            iCol++;
                            break;
                        }
                    }

                    // This condition checks whether the current key is present
                    // in the table. If not, it is traced (only once for each
                    // new key) and the loop skips this iteration.
                    if (parser == null) {
                        // unmatched key
                        boolean contains = false;
                        for(BytePointer unmatchedKey : this.unmatchedKeys){
                            if(unmatchedKey.equals(key)){
                                contains = true;
                            }
                        }

                        if (!contains) {
                            BytePointer unmatchedKey = (BytePointer)key.clone();
                            this.unmatchedKeys.add(unmatchedKey);
                            tracer.log(Level.WARNING, "Unmatched key: " + unmatchedKey.toString());
                        }

                        continue;
                    }

                    // Checks if this value starts with a quote character
                    if (atQuote(startValue)) {
                        // strip off quotes; but what about string "" vs
                        // string null??
                        int start = startValue + quotesLength;
                        int end = currentLine.findBytes(start, endValue, quotes);
                        if (end < 0) {
                            end = colEnd; // expect a parse err with a msg
                        }
                        traceField(Level.FINEST, "trimmed ", startKey, endKey, currentLine, start, end);
                        parser.parse(currentLine.buffer, currentLine.start + start, currentLine.start + end, inputStmt,
                                iCol);
                    } else {
                        parser.parse(currentLine.buffer, currentLine.start + startValue, currentLine.start + endValue,
                                inputStmt, iCol);
                    }

                } catch (Exception e) {
                    setParserPosition(rowStartPosition - messageStartPosition + colEnd);
                    String msg = e.getMessage() + " while parsing field " + key.toString() + " at line " + getParserLineNumber()
                            + " of " + inputStream.locationDescription();
                    setParserError(msg);
                    tracer.log(Level.WARNING, msg, e);
                }
            }
            rowGenerated = true;
        } finally {
            if (rowGenerated) {
                submitRow(tracer);
            }
            incParserLineNumber();
        }

    }

    /**
     * Logs the given line to the tracer
     *
     * @param level
     * @param line
     */
    private void traceLine(Level level, BuffersInputStream.Line line) {
        if (tracer.isLoggable(level)) {
            StringBuilder sb = new StringBuilder();
            sb.append("row ").append(rowCount).append(" location ").append(inputStream.locationDescription())
                    .append(" line ").append(getParserLineNumber()).append(": ").append(line.asString());
            tracer.log(level, sb.toString());
        }
    }

    /**
     * Logs a single field (key=value) to the tracer
     *
     * @param level
     * @param prefix
     * @param startKey
     * @param endKey
     * @param line
     * @param startValue
     * @param endValue
     * @throws SQLException
     */
    private void traceField(
        Level level, String prefix, int startKey, int endKey, BuffersInputStream.Line line,
        int startValue, int endValue)
        throws SQLException
    {
        if (tracer.isLoggable(level)) {
            StringBuilder sb = new StringBuilder();
            if (prefix != null)
                sb.append(prefix);
            sb.append(" ").append(line.asString(startKey, (endKey - startKey))).append(" parse: ")
                    .append(line.asString(startValue, (endValue - startValue)));
            tracer.log(level, sb.toString());
        }
    }

    /**
     * Initializes the required row parsers
     *
     * @return
     * @throws Exception
     */
    private TypeParser[] initRowParsers() throws Exception {
        Set<String> alreadyMapped = context.getMappedSpecialColumns();
        this.columns = new BytePointer[numColumns];
        this.unmatchedKeys = new ArrayList<>();
        ArrayList<TypeParser> parsers = new ArrayList<>();

        for (int i = 1; i <= numColumns; i++) {
            String fieldNameString = metaData.getFieldName(i);
            byte[] fieldName = fieldNameString.getBytes();

            this.columns[i - 1] = new BytePointer(0, fieldName.length, fieldName);
            if (alreadyMapped.contains(fieldNameString)) {
                parsers.add(null);
                continue;
            }
            TypeParser customParser = customParsers.get(fieldNameString);
            if (customParser != null) {
                if (logLevel.isTraceFine()) {
                    tracer.fine("custom parser " + customParser);
                }
                parsers.add(customParser);
            } else {
                parsers.add(getTypeParser(i));
            }
        }
        if (logLevel.isTraceFinest()) {
            tracer.finest("row parsers size " + parsers.size());
            tracer.finest("using type parsers " + parsers);
        }
        return parsers.toArray(new TypeParser[0]);
    }

    /**
     * Checks if an input field start with a quote
     *
     * @param valueStart
     *            the index of the beginning of the value
     * @return true if this value starts with a quote character
     * @throws IOException
     */
    private boolean atQuote(int valueStart) throws IOException {
        if (quotes == null)
            return false;
        for (int i = 0; i < quotesLength; ++i) {
            byte b = currentLine.buffer[currentLine.start + valueStart + i];
            if (b != quotes[i]) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void closeAllocation() {
        tracer.fine("final rowCount=" + rowCount);
        super.closeAllocation();
    }

    @Override
    public void start() {
        Impl impl = new Impl();
        inputStream = context.getInputStream();
        if (quotes != null) {
            inputStream.setQuoteCharacters(quotes);
        }
        currentLine = inputStream.newLine();
        Thread thread = new Thread(impl);
        thread.start();
    }

    public static class PluginFactory extends EcdaPluginFactory {
        /**
         * Installs the parser
         */
        public void installPlugins() {
            installParser("KV", KeyValueParser.class);
            tracer.log(Level.FINE, "Installed KeyValue parser");
        }
    }

    /**
     *
     * A BytePointer is a type that contains a pointer to a byte[] plus start
     * and end indexes. The BytePointer is used to compare values of the stream
     * (it requires less computation power than Strings).
     *
     */
    class BytePointer {
        private int start;
        private int end;
        private byte[] bytes;

        public BytePointer() {
            super();
            this.start = 0;
            this.end = 0;
            this.bytes = null;
        }

        public BytePointer(int start, int end, byte[] bytes) {
            super();
            this.start = start;
            this.end = end;
            this.bytes = bytes;
        }

        public void setLimits(int start, int end, long rowStartPosition) {

            this.start = start + (int)rowStartPosition;
            this.end = end + (int)rowStartPosition;

        }

        /**
         * hashCode implementation similar to the String hashCode
         */
        @Override
        public int hashCode() {
            int result = 0;
            int n = end - start;
            int i = 1;
            for (int index = start; index < end; index++) {

                result += (bytes[index] * Math.pow(31, n - i));
                i++;
            }

            return result;
        }

        /**
         * This equals compares two BytePointers. In order to be equal, they
         * must have the same length (end - start) and the same bytes in this
         * interval.
         */
        @Override
        public boolean equals(Object obj) {
            try{
            BytePointer other = (BytePointer) obj;

            if ((this.end - this.start) != (other.getEnd() - other.getStart())) {
                return false;
            }
            byte[] otherByteArray = other.getBytes();
            int otherStart = other.getStart();
            int len = end - start;

            for (int i = 0; i < len; i++) {
                if (this.bytes[i + start] != otherByteArray[i + otherStart]) {
                    return false;
                }
            }
            return true;
            }catch(Exception e){
                e.printStackTrace();
            }
            return false;
        }

        @Override
        protected Object clone() throws CloneNotSupportedException {

            return new BytePointer(0, end-start, Arrays.copyOfRange(bytes, start, end));

        }


        // getters and setters


        @Override
        public String toString() {
            byte[] byteStr = Arrays.copyOfRange(bytes, start, end);
            return new String(byteStr, charset);
        }

        public int getStart() {
            return start;
        }

        public void setStart(int start) {
            this.start = start;
        }

        public int getEnd() {
            return end;
        }

        public void setEnd(int end) {
            this.end = end;
        }

        public byte[] getBytes() {
            return bytes;
        }

        public void setBytes(byte[] bytes) {
            this.bytes = bytes;
        }

    }

}
// End KeyValueParser.java


Sample Code for Source Plugin

The following code creates a source plugin for Kafka. It is useful as an example of how to write a source plugin. The topic Reading from Kafka describes the functionality of this plugin.

This sample code works for “message-based” sources, which send out data in messages or buffers with breaks. For sources without such breaks, be sure to make appropriate changes to the segments indicated for single messages.

/*
// Aspen dataflow server
// Copyright (C) 2014-2018
SQLstream, Inc.
*/

package com.sqlstream.aspen.namespace.kafka;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.sqlstream.aspen.namespace.common.AbstractDataInputSource;
import com.sqlstream.aspen.namespace.common.BuffersInputStream;
import com.sqlstream.aspen.namespace.common.CommonDataConstants;
import com.sqlstream.aspen.namespace.common.OptionsUtils;
import com.sqlstream.aspen.namespace.common.RowInserter;
import com.sqlstream.aspen.namespace.common.SourceData;
import com.sqlstream.aspen.namespace.common.SourceDataFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 *
 * @author Hunter Payne
 * @version $Id: //depot/aspen/doc/booksource/IntGuideSource/Topics/int_Writing_ECDA_Plugin.xml#9 $
 **/
public class KafkaInputSource extends AbstractDataInputSource
{
    protected final static String [] requiredParameters = { "TOPIC" };
    protected final static String [] optionalParameters = {
        "STARTING_TIME",
        "STARTING_OFFSET",
        "PARTITION_OFFSET_QUERY",
        "SEED_BROKERS",
        "PORT",
        "PARTITION",
        "BUFFER_SIZE",
        "FETCH_SIZE",
        "MAX_WAIT",
        "MIN_BYTES",
        "BACKOFF_WAIT",
        "CLIENT_ID",
        "METRICS_PER_PARTITION",
        "REQUIRE_N_READERS_BEFORE_START",
        CommonDataConstants.PARSER_QUEUE_SIZE,
        CommonDataConstants.OPTIONS_QUERY
    };

    protected final Properties initProps = new Properties();

    protected final static Logger tracer =
        Logger.getLogger("com.sqlstream.aspen.namespace.kafka");

    protected ExecutorService service;

    protected volatile boolean canceled;
    protected int runningReaderCount;

    protected int port;
    protected String topic;
    protected String partitionString;
    protected int firstPartition;
    protected int lastPartition;
    protected long startingTime;
    protected long startingOffset;
    protected int consumerBufferSize;
    protected int fetchSize;
    protected int maxWait;
    protected int minBytes;
    protected int backoffWait;
    protected String seedBrokersString;
    protected String offsetsQuery;
    protected String clientId;
    protected boolean metricsPerPartition;
    protected boolean isOpen;
    protected int requireNReadersBeforeStart;
    protected static int runningKafkaInputSources;
    protected final static Object runningReadersLock = new Object();

    protected KafkaSourceDataFactory sourceDataFactory;

    public KafkaInputSource()
    {
        //parsedRows = new ArrayBlockingQueue<Object[]>(4096);
        canceled = false;

        service = Executors.newSingleThreadExecutor();
    }

     static class KafkaSourceData extends SourceData {
         final int offsetColumn;
         final int partitionColumn;

         long offset;
         int partition;

        public KafkaSourceData(KafkaSourceDataFactory factory, int[] columnNumbers)
         {
             super(factory, columnNumbers);
             offsetColumn = columnNumbers[0];
             partitionColumn = columnNumbers[1];
         }

          @Override
         public String locationDescription()
         {
             return "Offset " + offset;
         }

         @Override
         public void updateLocation(RowInserter inserter)
             throws SQLException
         {
             if (partitionColumn != -1) {
                 inserter.setInt(partitionColumn, partition);
             }
             if (offsetColumn != -1) {
                 inserter.setLong(offsetColumn, offset);
             }
         }
     }

     class KafkaSourceDataFactory extends SourceDataFactory<KafkaSourceData> {

         protected KafkaSourceDataFactory(int numBuffers, int numPartitions)
                 throws SQLException
         {
             super(new String[] {"OFFSET", "PARTITION"}, context, numBuffers, 262144, true);
         }

         @Override
         public KafkaSourceData newBufferInfo()
         {
             return new KafkaSourceData(this, columnNumbers);
         }
     }

    long parseStartingTime(String startingTimeString) {
        if (startingTimeString.equalsIgnoreCase("LATEST")) {
            return kafka.api.OffsetRequest.LatestTime();
        } else if (startingTimeString.equalsIgnoreCase("EARLIEST")) {
            return kafka.api.OffsetRequest.EarliestTime();
        }
        return Long.parseLong(startingTimeString);
    }

    public void init(Properties props) throws Exception
    {
        OptionsUtils.filterInitProperties(
            props, requiredParameters, optionalParameters, initProps);
        Properties dynamic;
        try {
            Connection conn = context.getQueryConnection();
            dynamic = OptionsUtils.addDynamicOptions(
                initProps, conn);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            dynamic = initProps;
        }
        seedBrokersString = dynamic.getProperty("SEED_BROKERS", "localhost");
        port = Integer.parseInt(dynamic.getProperty("PORT", "9092"));
        topic = dynamic.getProperty("TOPIC");
        partitionString = dynamic.getProperty("PARTITION");
        startingTime = parseStartingTime(dynamic.getProperty(
            "STARTING_TIME", "LATEST"));
        startingOffset = Long.parseLong(dynamic.getProperty(
            "STARTING_OFFSET", "-1")) ;
        offsetsQuery = dynamic.getProperty(
            "PARTITION_OFFSET_QUERY") ;
        consumerBufferSize = Integer.parseInt(dynamic.getProperty(
            "BUFFER_SIZE", "1048576")) ;
        fetchSize = Integer.parseInt(dynamic.getProperty(
            "FETCH_SIZE", "1000000")) ;
        maxWait = Integer.parseInt(dynamic.getProperty(
            "MAX_WAIT", "500")) ;
        minBytes = Integer.parseInt(dynamic.getProperty(
            "MIN_BYTES", "64")) ;
        backoffWait = Integer.parseInt(dynamic.getProperty(
            "BACKOFF_WAIT", "500")) ;
        clientId = dynamic.getProperty("CLIENT_ID");
        metricsPerPartition = "true".equalsIgnoreCase(dynamic.getProperty("METRICS_PER_PARTITION"));
        if (clientId == null || clientId.trim().isEmpty()) {
            clientId = "client_" + topic;
            if (!metricsPerPartition && partitionString != null && !partitionString.isEmpty()) {
                clientId += '_' + partitionString;
            }
        }
        if (partitionString == null || partitionString.trim().length() == 0) {
            firstPartition = 0;
            lastPartition = -1;
        } else {
            String [] partitionEnds = partitionString.split("-");
            firstPartition = Integer.parseInt(partitionEnds[0]);
            if (partitionEnds.length == 1) {
                lastPartition = firstPartition;
            } else {
                lastPartition = Integer.parseInt(partitionEnds[1]);
            }
            if (partitionEnds.length > 2 || firstPartition > lastPartition) {
                throw new SQLException("Invalid partition range " + partitionString);
            }
        }
        int numPartitions = lastPartition - firstPartition + 1;
        int numBuffers = Integer.parseInt(props.getProperty(
            CommonDataConstants.PARSER_QUEUE_SIZE,
            numPartitions <= 1
                ? CommonDataConstants.DEFAULT_PARSER_QUEUE_SIZE
                : Integer.toString(numPartitions + 1)));
        if (numPartitions == 0) {
            numPartitions = 256;
        }
        sourceDataFactory = new KafkaSourceDataFactory(numBuffers, numPartitions);
        requireNReadersBeforeStart = Integer.parseInt(dynamic.getProperty("REQUIRE_N_READERS_BEFORE_START", "0"));
    }

    public Properties getInitProperties()
    {
        return initProps;
    }

    public static long getLastOffset(
        SimpleConsumer consumer, String topic, int partition,
        long whichTime, String clientName)
        throws SQLException
    {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            tracer.warning(
                "Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    public static int findMaxPartition(
        int port, String topic,
        List<String> replicaBrokers)
        throws SQLException
    {
        int maxPartition = 0;
        for (String seed : replicaBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() > maxPartition) {
                            maxPartition = part.partitionId();
                        }
                    }
                }
            } catch (Exception e) {
                throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                        + ", " + "] Reason: " + e, e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        return maxPartition;
    }

    public static PartitionMetadata findLeader(
        int port, String topic, int partition,
        List<String> replicaBrokers)
    throws SQLException
    {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : replicaBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                        + ", " + partition + "] Reason: " + e, e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

    public static String findNewLeader(
        String a_oldLeader,
        int port, String topic, int partition, List<String> replicaBrokers)
        throws SQLException {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        throw new SQLException("Unable to find new leader after Broker failure. Exiting");
    }

    public static String findLeadBroker(
        int port, String topic, int partition, List<String> replicaBrokers)
        throws SQLException
    {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);
        if (metadata == null) {
            tracer.warning("Can't find metadata for Topic and Partition. Exiting");
            return null;
        }
        if (metadata.leader() == null) {
           tracer.warning("Can't find Leader for Topic and Partition. Exiting");
            return null;
        }
        return metadata.leader().host();
    }


    static void kafkaInputSourceStarted() {
        synchronized (runningReadersLock) {
            runningKafkaInputSources++;
            runningReadersLock.notifyAll();
        }
    }
    static void kafkaInputSourceStopped() {
        synchronized (runningReadersLock) {
            runningKafkaInputSources--;
        }
    }

    class Impl implements Runnable {
        //int rowtimeColumn, int offsetColumn, int payloadColumn,
        final String topic;
        final int partition;
        String leadBroker;

        final int port;
        final String clientName;
        final BuffersInputStream inputStream;


        private List<String> replicaBrokers = new ArrayList<>();

        private Impl(
            String topic, int partition, long startingOffset, int port,
            List<String> seedBrokers)
            throws SQLException
        {
            this.topic = topic;
            this.partition = partition;
            this.port = port;
            if (metricsPerPartition) {
                this.clientName = clientId + '_' + partition;
            } else {
                this.clientName = clientId;
            }
            replicaBrokers.clear();
            replicaBrokers.addAll(seedBrokers);
            this.leadBroker = findLeadBroker(port, topic, partition, replicaBrokers);
            this.inputStream = context.getInputStream();
        }


    public void run()
    {
        tracer.fine("starting kafka");
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        kafkaInputSourceStarted();
        if (requireNReadersBeforeStart > 0) {
            synchronized (runningReadersLock) {
                while (requireNReadersBeforeStart > runningKafkaInputSources && !canceled) {
                    tracer.config("waiting for " + (requireNReadersBeforeStart - runningKafkaInputSources) + "more readers to start");
                    try {
                        runningReadersLock.wait(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                tracer.config("requred number of readers running");
            }
        }
        try (BuffersInputStream.Connection<KafkaSourceData> connection = inputStream.getConnection()) {
            long readOffset = startingOffset;
            int retries = 0;
            while (!canceled) {
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);
                    if (readOffset < 0) {
                        readOffset = getLastOffset(consumer,topic, partition, startingTime, clientName);
                    }
                    tracer.fine("starting with offset " + readOffset);
                    int numErrors = 0;
                    while (!canceled) {
                        if (consumer == null) {
                            consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);
                        }
                        FetchRequest req = new FetchRequestBuilder()
                                .clientId(clientName)
                                .addFetch(topic, partition, readOffset, fetchSize)
                                .maxWait(maxWait)
                                .minBytes(minBytes)
                                .build();
                        //tracer.finest("about to fetch");
                        FetchResponse fetchResponse = consumer.fetch(req);
                        //tracer.finest("back from fetch");

                        if (fetchResponse.hasError()) {
                            numErrors++;
                            // Something went wrong!
                            short code = fetchResponse.errorCode(topic, partition);
                            tracer.warning("Error fetching data from the Broker:" + leadBroker + " Reason: " + code + " topic: " + topic + " partition: " + partition);
                            if (numErrors > 10) {
                                canceled = true;
                                tracer.severe("exiting kafka reader topic: " + topic + " partition: " + partitionString);
                                return;
                            }
                            if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                                // We asked for an invalid offset. For simple case ask for the last element to reset
                                readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                                continue;
                            }
                            consumer.close();
                            consumer = null;
                            if (numErrors > 1) {
                                Thread.sleep((2^numErrors));
                            }
                            leadBroker = findNewLeader(
                                leadBroker, port, topic, partition, replicaBrokers);
                            continue;
                        }
                        numErrors = 0;
                        int numRead = 0;
                        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                            numRead++;
                            long currentOffset = messageAndOffset.offset();
                            if (currentOffset < readOffset) {
                                tracer.warning("Found an old offset: " + currentOffset + " Expecting: " + readOffset + " topic: " + topic + " partition: " + partition);
                                continue;
                            }
                            long nextOffset = messageAndOffset.nextOffset();
                            if (nextOffset > readOffset) {
                                readOffset = nextOffset;
                            }
                            ByteBuffer payload = messageAndOffset.message().payload();

                            KafkaSourceData bufferInfo = connection.getCurrentWithSize(payload.remaining());
                            if (bufferInfo == null) {
                                canceled = true;
                                break;
                            }
                            ByteBuffer buf = bufferInfo.getBuffer();
                            payload.get(buf.array(), buf.position(), payload.remaining());
                            if (tracer.isLoggable(Level.FINE)) {
                                tracer.fine("offset=" + currentOffset + " nextOffset=" + readOffset + " got msg ");
                            }
                            if (canceled) {
                                break;
                            }
                            bufferInfo.offset = currentOffset;
                            bufferInfo.partition = partition;
                            connection.submit();
                            retries = 0;
                        }
                        if (numRead == 0) {
                            connection.flush();
                            Thread.sleep(backoffWait);
                        }
                    }
                } catch (Throwable e) {
                    tracer.log(Level.SEVERE, "Error during kafka processing topic: " + topic + " partition: " + partition, e);
                    if (retries > 1) {
                        Thread.sleep((2^retries));
                    }
                    leadBroker = findNewLeader(
                        leadBroker, port, topic, partition, replicaBrokers);
                } finally {
                    if (consumer != null) {
                        try {
                            consumer.close();
                        } catch (Throwable e) {
                            tracer.log(Level.SEVERE, "Error during while closing consumer", e);
                        }
                        consumer = null;
                    }
                    if (retries > 10) {
                        tracer.log(Level.SEVERE, "Error during kafka processing - exiting topic: " + topic + " partition: " + partitionString);
                        canceled = true;
                        break;
                    }
                }
            }
        } catch (Throwable e) {
            tracer.log(Level.SEVERE, "Exception in kafka topic: " + topic + " partition: " + partition, e);
        } finally {
            tracer.config("exiting kafka reader topic: " + topic + " partition: " + partition);
            canceled = true;
            kafkaInputSourceStopped();
            synchronized (KafkaInputSource.this) {
                runningReaderCount --;
                if (runningReaderCount == 0) {
                    inputStream.sendEOS();
                }
            }
        }
    }
    }

    public void closeAllocation()
    {
        tracer.fine("closing");
        canceled = true;
    }

    @Override
    public SourceDataFactory<?> getSourceDataFactory()
        throws SQLException
    {
        return sourceDataFactory;
    }

    @Override
    public void start()
        throws SQLException
    {
        Map<Integer, Long> partitionOffsets = new HashMap<>();
        if (offsetsQuery != null && offsetsQuery.trim().length() != 0) {
            try (PreparedStatement pstmt = context.getQueryConnection().prepareStatement(offsetsQuery)) {
                try (ResultSet rs = pstmt.executeQuery()) {
                    while (rs.next()) {
                        int partition = rs.getInt("PARTITION");
                        long offset = rs.getLong("OFFSET");
                        partitionOffsets.put(partition, offset);
                    }
                }
            }
        }
        List<String> seedBrokers =
            Arrays.asList(seedBrokersString.split(","));
        if (lastPartition == -1) {
            lastPartition = findMaxPartition(port, topic, seedBrokers);
        }
        runningReaderCount = lastPartition - firstPartition + 1;
        for (int partition = firstPartition; partition <= lastPartition; partition++) {
            long partitionOffset = partitionOffsets.containsKey(partition)
                ? partitionOffsets.get(partition) : startingOffset;
            Impl r = new Impl(topic, partition, partitionOffset, port, seedBrokers);
            Thread t = new Thread(r);
            t.setName("kafkaReader." + topic + '.' + partition);
            t.start();
        }
    }
}

// End KafkaInputSource.java