The following code implements a source plugin for Kafka and serves as an example of how to write a source plugin. The topic Reading from Kafka describes the functionality of this plugin.
This sample code works for "message-based" sources, which deliver data as discrete messages or buffers with breaks between them. For sources without such breaks, be sure to make the appropriate changes to the segments indicated in the topic Writing an Extensible Common Data Framework Plugin.
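The core of a message-based source is the hand-off loop in which each incoming message is copied into a buffer obtained from the server's BuffersInputStream and then submitted to the parser. The fragment below is excerpted from the run() method of the sample that follows (the connection, payload, bufferInfo, currentOffset, and partition variables are defined there); it is shown only to highlight the pattern, not as stand-alone code.

// Per-message hand-off, excerpted from KafkaInputSource.Impl.run() below:
KafkaSourceData bufferInfo = connection.getCurrentWithSize(payload.remaining()); // reserve a buffer for one message
ByteBuffer buf = bufferInfo.getBuffer();
payload.get(buf.array(), buf.position(), payload.remaining());   // copy the raw message bytes
bufferInfo.offset = currentOffset;    // recorded so updateLocation() can fill the OFFSET column
bufferInfo.partition = partition;     // recorded so updateLocation() can fill the PARTITION column
connection.submit();                  // hand the buffer to the parser
// ...and when no messages arrive for a while:
connection.flush();                   // push through any partially filled buffer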
/*
// Aspen dataflow server
// Copyright (C) 2014-<%YEAR%> SQLstream, Inc.
*/
package com.sqlstream.aspen.namespace.kafka;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sqlstream.aspen.namespace.common.AbstractDataInputSource;
import com.sqlstream.aspen.namespace.common.BuffersInputStream;
import com.sqlstream.aspen.namespace.common.CommonDataConstants;
import com.sqlstream.aspen.namespace.common.OptionsUtils;
import com.sqlstream.aspen.namespace.common.RowInserter;
import com.sqlstream.aspen.namespace.common.SourceData;
import com.sqlstream.aspen.namespace.common.SourceDataFactory;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
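/**
 * Source plugin that reads messages from one or more partitions of a Kafka
 * topic. start() launches one reader thread (the inner Impl class) per
 * partition; each thread uses the Kafka SimpleConsumer API to find the
 * partition leader, fetch batches of messages, and copy each message payload
 * into a buffer that is submitted to the server's parser through a
 * BuffersInputStream connection. The Kafka offset and partition of each
 * message are exposed through the OFFSET and PARTITION columns.
 */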
public class KafkaInputSource extends AbstractDataInputSource
{
  protected final static String [] requiredParameters = { "TOPIC" };
  protected final static String [] optionalParameters = {
    "STARTING_TIME",
    "STARTING_OFFSET",
    "PARTITION_OFFSET_QUERY",
    "SEED_BROKERS",
    "PORT",
    "PARTITION",
    "BUFFER_SIZE",
    "FETCH_SIZE",
    "MAX_WAIT",
    "MIN_BYTES",
    "BACKOFF_WAIT",
    "CLIENT_ID",
    "METRICS_PER_PARTITION",
    "REQUIRE_N_READERS_BEFORE_START",
    CommonDataConstants.PARSER_QUEUE_SIZE,
    CommonDataConstants.OPTIONS_QUERY
  };
  protected final Properties initProps = new Properties();
  protected final static Logger tracer =
    Logger.getLogger("com.sqlstream.aspen.namespace.kafka");
  protected ExecutorService service;
  protected volatile boolean canceled;
  protected int runningReaderCount;
  protected int port;
  protected String topic;
  protected String partitionString;
  protected int firstPartition;
  protected int lastPartition;
  protected long startingTime;
  protected long startingOffset;
  protected int consumerBufferSize;
  protected int fetchSize;
  protected int maxWait;
  protected int minBytes;
  protected int backoffWait;
  protected String seedBrokersString;
  protected String offsetsQuery;
  protected String clientId;
  protected boolean metricsPerPartition;
  protected boolean isOpen;
  protected int requireNReadersBeforeStart;
  protected static int runningKafkaInputSources;
  protected final static Object runningReadersLock = new Object();
  protected KafkaSourceDataFactory sourceDataFactory;
  public KafkaInputSource()
  {
    //parsedRows = new ArrayBlockingQueue<Object[]>(4096);
    canceled = false;
    service = Executors.newSingleThreadExecutor();
  }
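  /**
   * Per-buffer bookkeeping: records the Kafka offset and partition of the
   * message held in each buffer so that updateLocation() can populate the
   * OFFSET and PARTITION columns of rows parsed from that buffer.
   */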
  static class KafkaSourceData extends SourceData {
    final int offsetColumn;
    final int partitionColumn;
    long offset;
    int partition;
    public KafkaSourceData(KafkaSourceDataFactory factory, int[] columnNumbers)
    {
      super(factory, columnNumbers);
      offsetColumn = columnNumbers[0];
      partitionColumn = columnNumbers[1];
    }
     @Override
    public String locationDescription()
    {
      return "Offset " + offset;
    }
    @Override
    public void updateLocation(RowInserter inserter)
      throws SQLException
    {
      if (partitionColumn != -1) {
        inserter.setInt(partitionColumn, partition);
      }
      if (offsetColumn != -1) {
        inserter.setLong(offsetColumn, offset);
      }
    }
  }
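  /**
   * Factory for KafkaSourceData buffer descriptors; the OFFSET and PARTITION
   * names passed to the superclass identify the columns that
   * updateLocation() fills in.
   */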
  class KafkaSourceDataFactory extends SourceDataFactory<KafkaSourceData> {
    protected KafkaSourceDataFactory(int numBuffers, int numPartitions)
        throws SQLException
    {
      super(new String[] {"OFFSET", "PARTITION"}, context, numBuffers, 262144, true);
    }
    @Override
    public KafkaSourceData newBufferInfo()
    {
      return new KafkaSourceData(this, columnNumbers);
    }
  }
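  /**
   * STARTING_TIME may be LATEST, EARLIEST, or a time in milliseconds;
   * the two keywords map to Kafka's special offset-request times.
   */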
  long parseStartingTime(String startingTimeString) {
    if (startingTimeString.equalsIgnoreCase("LATEST")) {
      return kafka.api.OffsetRequest.LatestTime();
    } else if (startingTimeString.equalsIgnoreCase("EARLIEST")) {
      return kafka.api.OffsetRequest.EarliestTime();
    }
    return Long.parseLong(startingTimeString);
  }
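  /**
   * Parses the plugin options (TOPIC, SEED_BROKERS, PARTITION, STARTING_TIME,
   * and so on), merging in any dynamically supplied options, and creates the
   * source data factory sized for the configured partition range.
   */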
  public void init(Properties props) throws Exception
  {
    OptionsUtils.filterInitProperties(
      props, requiredParameters, optionalParameters, initProps);
    Properties dynamic;
    try {
      Connection conn = context.getQueryConnection();
      dynamic = OptionsUtils.addDynamicOptions(
        initProps, conn);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      dynamic = initProps;
    }
    seedBrokersString = dynamic.getProperty("SEED_BROKERS", "localhost");
    port = Integer.parseInt(dynamic.getProperty("PORT", "9092"));
    topic = dynamic.getProperty("TOPIC");
    partitionString = dynamic.getProperty("PARTITION");
    startingTime = parseStartingTime(dynamic.getProperty(
      "STARTING_TIME", "LATEST"));
    startingOffset = Long.parseLong(dynamic.getProperty(
      "STARTING_OFFSET", "-1")) ;
    offsetsQuery = dynamic.getProperty(
      "PARTITION_OFFSET_QUERY") ;
    consumerBufferSize = Integer.parseInt(dynamic.getProperty(
      "BUFFER_SIZE", "1048576")) ;
    fetchSize = Integer.parseInt(dynamic.getProperty(
      "FETCH_SIZE", "1000000")) ;
    maxWait = Integer.parseInt(dynamic.getProperty(
      "MAX_WAIT", "500")) ;
    minBytes = Integer.parseInt(dynamic.getProperty(
      "MIN_BYTES", "64")) ;
    backoffWait = Integer.parseInt(dynamic.getProperty(
      "BACKOFF_WAIT", "500")) ;
    clientId = dynamic.getProperty("CLIENT_ID");
    metricsPerPartition = "true".equalsIgnoreCase(dynamic.getProperty("METRICS_PER_PARTITION"));
    if (clientId == null || clientId.trim().isEmpty()) {
      clientId = "client_" + topic;
      if (!metricsPerPartition && partitionString != null && !partitionString.isEmpty()) {
        clientId += '_' + partitionString;
      }
    }
    if (partitionString == null || partitionString.trim().length() == 0) {
      firstPartition = 0;
      lastPartition = -1;
    } else {
      String [] partitionEnds = partitionString.split("-");
      firstPartition = Integer.parseInt(partitionEnds[0]);
      if (partitionEnds.length == 1) {
        lastPartition = firstPartition;
      } else {
        lastPartition = Integer.parseInt(partitionEnds[1]);
      }
      if (partitionEnds.length > 2 || firstPartition > lastPartition) {
        throw new SQLException("Invalid partition range " + partitionString);
      }
    }
    int numPartitions = lastPartition - firstPartition + 1;
    int numBuffers = Integer.parseInt(props.getProperty(
      CommonDataConstants.PARSER_QUEUE_SIZE,
      numPartitions <= 1
        ? CommonDataConstants.DEFAULT_PARSER_QUEUE_SIZE
        : Integer.toString(numPartitions + 1)));
    if (numPartitions == 0) {
      numPartitions = 256;
    }
    sourceDataFactory = new KafkaSourceDataFactory(numBuffers, numPartitions);
    requireNReadersBeforeStart = Integer.parseInt(dynamic.getProperty("REQUIRE_N_READERS_BEFORE_START", "0"));
  }
  public Properties getInitProperties()
  {
    return initProps;
  }
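  /**
   * Asks the broker for the last offset before the given time (or the special
   * EARLIEST/LATEST values) for a single topic partition; returns 0 on error.
   */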
  public static long getLastOffset(
    SimpleConsumer consumer, String topic, int partition,
    long whichTime, String clientName)
    throws SQLException
  {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
      new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
      requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
      tracer.warning(
        "Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
      return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
  }
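  /**
   * Queries topic metadata from each seed broker to discover the highest
   * partition id; used by start() when no PARTITION range is configured.
   */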
  public static int findMaxPartition(
    int port, String topic,
    List<String> replicaBrokers)
    throws SQLException
  {
    int maxPartition = 0;
    for (String seed : replicaBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            if (part.partitionId() > maxPartition) {
              maxPartition = part.partitionId();
            }
          }
        }
      } catch (Exception e) {
        throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
            + ", " + "] Reason: " + e, e);
      } finally {
        if (consumer != null) consumer.close();
      }
    }
    return maxPartition;
  }
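  /**
   * Finds the current leader for a partition by asking each known replica
   * broker for topic metadata, and refreshes the replica broker list from
   * the metadata returned.
   */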
  public static PartitionMetadata findLeader(
    int port, String topic, int partition,
    List<String> replicaBrokers)
  throws SQLException
  {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : replicaBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            if (part.partitionId() == partition) {
              returnMetaData = part;
              break loop;
            }
          }
        }
      } catch (Exception e) {
        throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
            + ", " + partition + "] Reason: " + e, e);
      } finally {
        if (consumer != null) consumer.close();
      }
    }
    if (returnMetaData != null) {
      replicaBrokers.clear();
      for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
        replicaBrokers.add(replica.host());
      }
    }
    return returnMetaData;
  }
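  /**
   * After a broker failure, retries the leader lookup up to three times,
   * sleeping between attempts, until a new leader is reported.
   */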
  public static String findNewLeader(
    String a_oldLeader,
    int port, String topic, int partition, List<String> replicaBrokers)
    throws SQLException
  {
    for (int i = 0; i < 3; i++) {
      boolean goToSleep = false;
      PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);
      if (metadata == null) {
        goToSleep = true;
      } else if (metadata.leader() == null) {
        goToSleep = true;
      } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
        // first time through, if the leader hasn't changed, give ZooKeeper a second to recover
        // second time, assume the broker did recover before failover, or it was a non-Broker issue
        //
        goToSleep = true;
      } else {
        return metadata.leader().host();
      }
      if (goToSleep) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }
    }
    throw new SQLException("Unable to find new leader after Broker failure. Exiting");
  }
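  /**
   * Convenience wrapper around findLeader(): returns the leader's host name,
   * or null if no metadata or leader can be found.
   */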
  public static String findLeadBroker(
    int port, String topic, int partition, List<String> replicaBrokers)
    throws SQLException
  {
    // find the metadata for the topic and partition we are interested in
    //
    PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);
    if (metadata == null) {
      tracer.warning("Can't find metadata for Topic and Partition. Exiting");
      return null;
    }
    if (metadata.leader() == null) {
      tracer.warning("Can't find Leader for Topic and Partition. Exiting");
      return null;
    }
    return metadata.leader().host();
  }
  static void kafkaInputSourceStarted() {
    synchronized (runningReadersLock) {
      runningKafkaInputSources++;
      runningReadersLock.notifyAll();
    }
  }

  static void kafkaInputSourceStopped() {
    synchronized (runningReadersLock) {
      runningKafkaInputSources--;
    }
  }
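  /**
   * Reader for a single partition. Locates the lead broker, fetches message
   * batches with SimpleConsumer, and submits each message payload to the
   * parser through the BuffersInputStream connection; handles offset resets,
   * leader failover, and shutdown when the source is canceled.
   */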
  class Impl implements Runnable {
    //int rowtimeColumn, int offsetColumn, int payloadColumn,
    final String topic;
    final int partition;
    String leadBroker;
    final int port;
    final String clientName;
    final BuffersInputStream inputStream;
    private List<String> replicaBrokers = new ArrayList<>();

    private Impl(
      String topic, int partition, long startingOffset, int port,
      List<String> seedBrokers)
      throws SQLException
    {
      this.topic = topic;
      this.partition = partition;
      this.port = port;
      if (metricsPerPartition) {
        this.clientName = clientId + '_' + partition;
      } else {
        this.clientName = clientId;
      }
      replicaBrokers.clear();
      replicaBrokers.addAll(seedBrokers);
      this.leadBroker = findLeadBroker(port, topic, partition, replicaBrokers);
      this.inputStream = context.getInputStream();
    }
    public void run()
    {
      tracer.fine("starting kafka");
      Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
      kafkaInputSourceStarted();
      if (requireNReadersBeforeStart > 0) {
        synchronized (runningReadersLock) {
          while (requireNReadersBeforeStart > runningKafkaInputSources && !canceled) {
            tracer.config("waiting for " + (requireNReadersBeforeStart - runningKafkaInputSources) + " more readers to start");
            try {
              runningReadersLock.wait(1000);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
          tracer.config("required number of readers running");
        }
      }
      try (BuffersInputStream.Connection connection = inputStream.getConnection()) {
        long readOffset = startingOffset;
        int retries = 0;
        while (!canceled) {
          SimpleConsumer consumer = null;
          try {
            consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);
            if (readOffset < 0) {
              readOffset = getLastOffset(consumer, topic, partition, startingTime, clientName);
            }
            tracer.fine("starting with offset " + readOffset);
            int numErrors = 0;
            while (!canceled) {
              if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);
              }
              FetchRequest req = new FetchRequestBuilder()
                  .clientId(clientName)
                  .addFetch(topic, partition, readOffset, fetchSize)
                  .maxWait(maxWait)
                  .minBytes(minBytes)
                  .build();
              //tracer.finest("about to fetch");
              FetchResponse fetchResponse = consumer.fetch(req);
              //tracer.finest("back from fetch");
              if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(topic, partition);
                tracer.warning("Error fetching data from the Broker:" + leadBroker + " Reason: " + code + " topic: " + topic + " partition: " + partition);
                if (numErrors > 10) {
                  canceled = true;
                  tracer.severe("exiting kafka reader topic: " + topic + " partition: " + partitionString);
                  return;
                }
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                  // We asked for an invalid offset. For the simple case, ask for the last element to reset.
                  readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                  continue;
                }
                consumer.close();
                consumer = null;
                if (numErrors > 1) {
                  // brief pause before retrying (note that '^' is XOR, not exponentiation)
                  Thread.sleep((2 ^ numErrors));
                }
                leadBroker = findNewLeader(
                  leadBroker, port, topic, partition, replicaBrokers);
                continue;
              }
              numErrors = 0;
              int numRead = 0;
              for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                numRead++;
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                  tracer.warning("Found an old offset: " + currentOffset + " Expecting: " + readOffset + " topic: " + topic + " partition: " + partition);
                  continue;
                }
                long nextOffset = messageAndOffset.nextOffset();
                if (nextOffset > readOffset) {
                  readOffset = nextOffset;
                }
                ByteBuffer payload = messageAndOffset.message().payload();
                KafkaSourceData bufferInfo = connection.getCurrentWithSize(payload.remaining());
                if (bufferInfo == null) {
                  canceled = true;
                  break;
                }
                ByteBuffer buf = bufferInfo.getBuffer();
                payload.get(buf.array(), buf.position(), payload.remaining());
                if (tracer.isLoggable(Level.FINE)) {
                  tracer.fine("offset=" + currentOffset + " nextOffset=" + readOffset + " got msg ");
                }
                if (canceled) {
                  break;
                }
                bufferInfo.offset = currentOffset;
                bufferInfo.partition = partition;
                connection.submit();
                retries = 0;
              }
              if (numRead == 0) {
                connection.flush();
                Thread.sleep(backoffWait);
              }
            }
          } catch (Throwable e) {
            tracer.log(Level.SEVERE, "Error during kafka processing topic: " + topic + " partition: " + partition, e);
            if (retries > 1) {
              Thread.sleep((2 ^ retries));
            }
            leadBroker = findNewLeader(
              leadBroker, port, topic, partition, replicaBrokers);
          } finally {
            if (consumer != null) {
              try {
                consumer.close();
              } catch (Throwable e) {
                tracer.log(Level.SEVERE, "Error while closing consumer", e);
              }
              consumer = null;
            }
            if (retries > 10) {
              tracer.log(Level.SEVERE, "Error during kafka processing - exiting topic: " + topic + " partition: " + partitionString);
              canceled = true;
              break;
            }
          }
        }
      } catch (Throwable e) {
        tracer.log(Level.SEVERE, "Exception in kafka topic: " + topic + " partition: " + partition, e);
      } finally {
        tracer.config("exiting kafka reader topic: " + topic + " partition: " + partition);
        canceled = true;
        kafkaInputSourceStopped();
        synchronized (KafkaInputSource.this) {
          runningReaderCount--;
          if (runningReaderCount == 0) {
            inputStream.sendEOS();
          }
        }
      }
    }
  }
  public void closeAllocation()
  {
    tracer.fine("closing");
    canceled = true;
  }
  @Override
  public SourceDataFactory<?> getSourceDataFactory()
    throws SQLException
  {
    return sourceDataFactory;
  }
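  /**
   * Resolves per-partition starting offsets (optionally from the
   * PARTITION_OFFSET_QUERY result set), determines the partition range, and
   * starts one reader thread per partition.
   */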
  @Override
  public void start()
    throws SQLException
  {
    Map<Integer, Long> partitionOffsets = new HashMap<>();
    if (offsetsQuery != null && offsetsQuery.trim().length() != 0) {
      try (PreparedStatement pstmt = context.getQueryConnection().prepareStatement(offsetsQuery)) {
        try (ResultSet rs = pstmt.executeQuery()) {
          while (rs.next()) {
            int partition = rs.getInt("PARTITION");
            long offset = rs.getLong("OFFSET");
            partitionOffsets.put(partition, offset);
          }
        }
      }
    }
    List<String> seedBrokers =
      Arrays.asList(seedBrokersString.split(","));
    if (lastPartition == -1) {
      lastPartition = findMaxPartition(port, topic, seedBrokers);
    }
    runningReaderCount = lastPartition - firstPartition + 1;
    for (int partition = firstPartition; partition <= lastPartition; partition++) {
      long partitionOffset = partitionOffsets.containsKey(partition)
        ? partitionOffsets.get(partition) : startingOffset;
      Impl r = new Impl(topic, partition, partitionOffset, port, seedBrokers);
      Thread t = new Thread(r);
      t.setName("kafkaReader." + topic + '.' + partition);
      t.start();
    }
  }
}
// End KafkaInputSource.java