Sample Code for Source Plugin

The following code creates a source plugin for Kafka. It is useful as an example of how to write a source plugin The topic Reading from Kafka describes the functionality of this plugin.

This sample code works for “message-based” sources, which send out data in messages or buffers with breaks. For sources without such breaks, be sure to make appropriate changes to the segments indicated in the topic - Writing an Extensible Common Data Framework Plugin.

/*
// Aspen dataflow server
// Copyright (C) 2014-<var styleclass="SQL code"><%YEAR%>
SQLstream, Inc.
*/

package com.sqlstream.aspen.namespace.kafka;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.sqlstream.aspen.namespace.common.AbstractDataInputSource;
import com.sqlstream.aspen.namespace.common.BuffersInputStream;
import com.sqlstream.aspen.namespace.common.CommonDataConstants;
import com.sqlstream.aspen.namespace.common.OptionsUtils;
import com.sqlstream.aspen.namespace.common.RowInserter;
import com.sqlstream.aspen.namespace.common.SourceData;
import com.sqlstream.aspen.namespace.common.SourceDataFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class KafkaInputSource extends AbstractDataInputSource
{
   protected final static String [] requiredParameters = { "TOPIC" };
   protected final static String [] optionalParameters = {
       "STARTING_TIME",
       "STARTING_OFFSET",
       "PARTITION_OFFSET_QUERY",
       "SEED_BROKERS",
       "PORT",
       "PARTITION",
       "BUFFER_SIZE",
       "FETCH_SIZE",
       "MAX_WAIT",
       "MIN_BYTES",
       "BACKOFF_WAIT",
       "CLIENT_ID",
       "METRICS_PER_PARTITION",
       "REQUIRE_N_READERS_BEFORE_START",
       CommonDataConstants.PARSER_QUEUE_SIZE,
       CommonDataConstants.OPTIONS_QUERY
   };

   protected final Properties initProps = new Properties();

   protected final static Logger tracer =
       Logger.getLogger("com.sqlstream.aspen.namespace.kafka");

   protected ExecutorService service;

   protected volatile boolean canceled;
   protected int runningReaderCount;

   protected int port;
   protected String topic;
   protected String partitionString;
   protected int firstPartition;
   protected int lastPartition;
   protected long startingTime;
   protected long startingOffset;
   protected int consumerBufferSize;
   protected int fetchSize;
   protected int maxWait;
   protected int minBytes;
   protected int backoffWait;
   protected String seedBrokersString;
   protected String offsetsQuery;
   protected String clientId;
   protected boolean metricsPerPartition;
   protected boolean isOpen;
   protected int requireNReadersBeforeStart;
   protected static int runningKafkaInputSources;
   protected final static Object runningReadersLock = new Object();

   protected KafkaSourceDataFactory sourceDataFactory;

   public KafkaInputSource()
   {
       //parsedRows = new ArrayBlockingQueue<Object[]>(4096);
       canceled = false;

       service = Executors.newSingleThreadExecutor();
   }

    static class KafkaSourceData extends SourceData {
        final int offsetColumn;
        final int partitionColumn;

        long offset;
        int partition;

       public KafkaSourceData(KafkaSourceDataFactory factory, int[] columnNumbers)
        {
            super(factory, columnNumbers);
            offsetColumn = columnNumbers[0];
            partitionColumn = columnNumbers[1];
        }

         @Override
        public String locationDescription()
        {
            return "Offset " + offset;
        }

        @Override
        public void updateLocation(RowInserter inserter)
            throws SQLException
        {
            if (partitionColumn != -1) {
                inserter.setInt(partitionColumn, partition);
            }
            if (offsetColumn != -1) {
                inserter.setLong(offsetColumn, offset);
            }
        }
    }
   
    class KafkaSourceDataFactory extends SourceDataFactory<KafkaSourceData> {

        protected KafkaSourceDataFactory(int numBuffers, int numPartitions)
                throws SQLException
        {
            super(new String[] {"OFFSET", "PARTITION"}, context, numBuffers, 262144, true);
        }

        @Override
        public KafkaSourceData newBufferInfo()
        {
            return new KafkaSourceData(this, columnNumbers);
        }
    }

   long parseStartingTime(String startingTimeString) {
       if (startingTimeString.equalsIgnoreCase("LATEST")) {
           return kafka.api.OffsetRequest.LatestTime();
       } else if (startingTimeString.equalsIgnoreCase("EARLIEST")) {
           return kafka.api.OffsetRequest.EarliestTime();
       }
       return Long.parseLong(startingTimeString);
   }

   public void init(Properties props) throws Exception
   {
       OptionsUtils.filterInitProperties(
           props, requiredParameters, optionalParameters, initProps);
       Properties dynamic;
       try {
           Connection conn = context.getQueryConnection();
           dynamic = OptionsUtils.addDynamicOptions(
               initProps, conn);
       } catch (Exception e) {
           // TODO Auto-generated catch block
           dynamic = initProps;
       }
       seedBrokersString = dynamic.getProperty("SEED_BROKERS", "localhost");
       port = Integer.parseInt(dynamic.getProperty("PORT", "9092"));
       topic = dynamic.getProperty("TOPIC");
       partitionString = dynamic.getProperty("PARTITION");
       startingTime = parseStartingTime(dynamic.getProperty(
           "STARTING_TIME", "LATEST"));
       startingOffset = Long.parseLong(dynamic.getProperty(
           "STARTING_OFFSET", "-1")) ;
       offsetsQuery = dynamic.getProperty(
           "PARTITION_OFFSET_QUERY") ;
       consumerBufferSize = Integer.parseInt(dynamic.getProperty(
           "BUFFER_SIZE", "1048576")) ;
       fetchSize = Integer.parseInt(dynamic.getProperty(
           "FETCH_SIZE", "1000000")) ;
       maxWait = Integer.parseInt(dynamic.getProperty(
           "MAX_WAIT", "500")) ;
       minBytes = Integer.parseInt(dynamic.getProperty(
           "MIN_BYTES", "64")) ;
       backoffWait = Integer.parseInt(dynamic.getProperty(
           "BACKOFF_WAIT", "500")) ;
       clientId = dynamic.getProperty("CLIENT_ID");
       metricsPerPartition = "true".equalsIgnoreCase(dynamic.getProperty("METRICS_PER_PARTITION"));
       if (clientId == null | | clientId.trim().isEmpty()) {
           clientId = "client_" + topic;
           if (!metricsPerPartition && partitionString != null && !partitionString.isEmpty()) {
               clientId += '_' + partitionString;
           }
       }
       if (partitionString == null | | partitionString.trim().length() == 0) {
           firstPartition = 0;
           lastPartition = -1;
       } else {
           String [] partitionEnds = partitionString.split("-");
           firstPartition = Integer.parseInt(partitionEnds[0]);
           if (partitionEnds.length == 1) {
               lastPartition = firstPartition;
           } else {
               lastPartition = Integer.parseInt(partitionEnds[1]);
           }
           if (partitionEnds.length > 2 | | firstPartition > lastPartition) {
               throw new SQLException("Invalid partition range " + partitionString);
           }
       }
       int numPartitions = lastPartition - firstPartition + 1;
       int numBuffers = Integer.parseInt(props.getProperty(
           CommonDataConstants.PARSER_QUEUE_SIZE,
           numPartitions <= 1
               ? CommonDataConstants.DEFAULT_PARSER_QUEUE_SIZE
               : Integer.toString(numPartitions + 1)));
       if (numPartitions == 0) {
           numPartitions = 256;
       }
       sourceDataFactory = new KafkaSourceDataFactory(numBuffers, numPartitions);
       requireNReadersBeforeStart = Integer.parseInt(dynamic.getProperty("REQUIRE_N_READERS_BEFORE_START", "0"));
   }

   public Properties getInitProperties()
   {
       return initProps;
   }

   public static long getLastOffset(
       SimpleConsumer consumer, String topic, int partition,
       long whichTime, String clientName)
       throws SQLException
   {
       TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
       Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
           new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
       requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
       kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
           requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
       OffsetResponse response = consumer.getOffsetsBefore(request);

       if (response.hasError()) {
           tracer.warning(
               "Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
           return 0;
       }
       long[] offsets = response.offsets(topic, partition);
       return offsets[0];
   }

   public static int findMaxPartition(
       int port, String topic,
       List<String> replicaBrokers)
       throws SQLException
   {
       int maxPartition = 0;
       for (String seed : replicaBrokers) {
           SimpleConsumer consumer = null;
           try {
               consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
               List<String> topics = Collections.singletonList(topic);
               TopicMetadataRequest req = new TopicMetadataRequest(topics);
               kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
   
               List<TopicMetadata> metaData = resp.topicsMetadata();
               for (TopicMetadata item : metaData) {
                   for (PartitionMetadata part : item.partitionsMetadata()) {
                       if (part.partitionId() > maxPartition) {
                           maxPartition = part.partitionId();
                       }
                   }
               }
           } catch (Exception e) {
               throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                       + ", " + "] Reason: " + e, e);
           } finally {
               if (consumer != null) consumer.close();
           }
       }
       return maxPartition;
   }
   
   public static PartitionMetadata findLeader(
       int port, String topic, int partition,
       List<String> replicaBrokers)
   throws SQLException
   {
       PartitionMetadata returnMetaData = null;
       loop:
```sql
       for (String seed : replicaBrokers) {
           SimpleConsumer consumer = null;
           try {
               consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
               List<String> topics = Collections.singletonList(topic);
               TopicMetadataRequest req = new TopicMetadataRequest(topics);
               kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

               List<TopicMetadata> metaData = resp.topicsMetadata();
               for (TopicMetadata item : metaData) {
                   for (PartitionMetadata part : item.partitionsMetadata()) {
                       if (part.partitionId() == partition) {
                           returnMetaData = part;
                           break loop;
                       }
                   }
               }
           } catch (Exception e) {
               throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                       + ", " + partition + "] Reason: " + e, e);
           } finally {
               if (consumer != null) consumer.close();
           }
       }
       if (returnMetaData != null) {
           replicaBrokers.clear();
           for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
               replicaBrokers.add(replica.host());
           }
       }
       return returnMetaData;
   }

   public static String findNewLeader(
       String a_oldLeader,
       int port, String topic, int partition, List<String> replicaBrokers)
       throws SQLException {
       for (int i = 0; i < 3; i++) {
           boolean goToSleep = false;
           PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);
           if (metadata == null) {
               goToSleep = true;
           } else if (metadata.leader() == null) {
               goToSleep = true;
           } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
               // first time through if the leader hasn't changed give ZooKeeper a second to recover
               // second time, assume the broker did recover before failover, or it was a non-Broker issue
               //
               goToSleep = true;
           } else {
               return metadata.leader().host();
           }
           if (goToSleep) {
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException ie) {
               }
           }
       }
       throw new SQLException("Unable to find new leader after Broker failure. Exiting");
   }

   public static String findLeadBroker(
       int port, String topic, int partition, List<String> replicaBrokers)
       throws SQLException
   {
       // find the meta data about the topic and partition we are interested in
       //
       PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);
       if (metadata == null) {
           tracer.warning("Can't find metadata for Topic and Partition. Exiting");
           return null;
       }
       if (metadata.leader() == null) {
          tracer.warning("Can't find Leader for Topic and Partition. Exiting");
           return null;
       }
       return metadata.leader().host();
   }


   static void kafkaInputSourceStarted() {
       synchronized (runningReadersLock) {
           runningKafkaInputSources++;
           runningReadersLock.notifyAll();
       }
   }
   static void kafkaInputSourceStopped() {
       synchronized (runningReadersLock) {
           runningKafkaInputSources--;
       }
   }

   class Impl implements Runnable {
       //int rowtimeColumn, int offsetColumn, int payloadColumn,
       final String topic;
       final int partition;
       String leadBroker;
       
       final int port;
       final String clientName;
       final BuffersInputStream inputStream;
       

       private List<String> replicaBrokers = new ArrayList<>();
       
       private Impl(
           String topic, int partition, long startingOffset, int port,
           List<String> seedBrokers)
           throws SQLException
       {
           this.topic = topic;
           this.partition = partition;
           this.port = port;
           if (metricsPerPartition) {
               this.clientName = clientId + '_' + partition;
           } else {
               this.clientName = clientId;
           }
           replicaBrokers.clear();
           replicaBrokers.addAll(seedBrokers);
           this.leadBroker = findLeadBroker(port, topic, partition, replicaBrokers);
           this.inputStream = context.getInputStream();
       }


   public void run()
   {
       tracer.fine("starting kafka");
       Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
       kafkaInputSourceStarted();
       if (requireNReadersBeforeStart > 0) {
           synchronized (runningReadersLock) {
               while (requireNReadersBeforeStart > runningKafkaInputSources && !canceled) {
                   tracer.config("waiting for " + (requireNReadersBeforeStart - runningKafkaInputSources) + "more readers to start");
                   try {
                       runningReadersLock.wait(1000);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               }
               tracer.config("requred number of readers running");
           }
       }
       try (BuffersInputStream.Connection<KafkaSourceData> connection = inputStream.getConnection()) {
           long readOffset = startingOffset;
           int retries = 0;
           while (!canceled) {
               SimpleConsumer consumer = null;
               try {
                   consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);
                   if (readOffset < 0) {
                       readOffset = getLastOffset(consumer,topic, partition, startingTime, clientName);
                   }
                   tracer.fine("starting with offset " + readOffset);
                   int numErrors = 0;
                   while (!canceled) {
                       if (consumer == null) {
                           consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);
                       }
                       FetchRequest req = new FetchRequestBuilder()
                               .clientId(clientName)
                               .addFetch(topic, partition, readOffset, fetchSize)
                               .maxWait(maxWait)
                               .minBytes(minBytes)
                               .build();
                       //tracer.finest("about to fetch");
                       FetchResponse fetchResponse = consumer.fetch(req);
                       //tracer.finest("back from fetch");
       
                       if (fetchResponse.hasError()) {
                           numErrors++;
                           // Something went wrong!
                           short code = fetchResponse.errorCode(topic, partition);
                           tracer.warning("Error fetching data from the Broker:" + leadBroker + " Reason: " + code + " topic: " + topic + " partition: " + partition);
                           if (numErrors > 10) {
                               canceled = true;
                               tracer.severe("exiting kafka reader topic: " + topic + " partition: " + partitionString);
                               return;
                           }
                           if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                               // We asked for an invalid offset. For simple case ask for the last element to reset
                               readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                               continue;
                           }
                           consumer.close();
                           consumer = null;
                           if (numErrors > 1) {
                               Thread.sleep((2^numErrors));
                           }
                           leadBroker = findNewLeader(
                               leadBroker, port, topic, partition, replicaBrokers);
                           continue;
                       }
                       numErrors = 0;
                       int numRead = 0;
                       for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                           numRead++;
                           long currentOffset = messageAndOffset.offset();
                           if (currentOffset < readOffset) {
                               tracer.warning("Found an old offset: " + currentOffset + " Expecting: " + readOffset + " topic: " + topic + " partition: " + partition);
                               continue;
                           }
                           long nextOffset = messageAndOffset.nextOffset();
                           if (nextOffset > readOffset) {
                               readOffset = nextOffset;
                           }
                           ByteBuffer payload = messageAndOffset.message().payload();
       
                           KafkaSourceData bufferInfo = connection.getCurrentWithSize(payload.remaining());
                           if (bufferInfo == null) {
                               canceled = true;
                               break;
                           }
                           ByteBuffer buf = bufferInfo.getBuffer();
                           payload.get(buf.array(), buf.position(), payload.remaining());
                           if (tracer.isLoggable(Level.FINE)) {
                               tracer.fine("offset=" + currentOffset + " nextOffset=" + readOffset + " got msg ");
                           }
                           if (canceled) {
                               break;
                           }
                           bufferInfo.offset = currentOffset;
                           bufferInfo.partition = partition;
                           connection.submit();
                           retries = 0;
                       }
                       if (numRead == 0) {
                           connection.flush();
                           Thread.sleep(backoffWait);
                       }
                   }
               } catch (Throwable e) {
                   tracer.log(Level.SEVERE, "Error during kafka processing topic: " + topic + " partition: " + partition, e);
                   if (retries > 1) {
                       Thread.sleep((2^retries));
                   }
                   leadBroker = findNewLeader(
                       leadBroker, port, topic, partition, replicaBrokers);
               } finally {
                   if (consumer != null) {
                       try {
                           consumer.close();
                       } catch (Throwable e) {
                           tracer.log(Level.SEVERE, "Error during while closing consumer", e);
                       }
                       consumer = null;
                   }
                   if (retries > 10) {
                       tracer.log(Level.SEVERE, "Error during kafka processing - exiting topic: " + topic + " partition: " + partitionString);
                       canceled = true;
                       break;
                   }
               }
           }
       } catch (Throwable e) {
           tracer.log(Level.SEVERE, "Exception in kafka topic: " + topic + " partition: " + partition, e);
       } finally {
           tracer.config("exiting kafka reader topic: " + topic + " partition: " + partition);
           canceled = true;
           kafkaInputSourceStopped();
           synchronized (KafkaInputSource.this) {
               runningReaderCount --;
               if (runningReaderCount == 0) {
                   inputStream.sendEOS();
               }
           }
       }
   }
   }

   public void closeAllocation()
   {
       tracer.fine("closing");
       canceled = true;
   }

   @Override
   public SourceDataFactory<?> getSourceDataFactory()
       throws SQLException
   {
       return sourceDataFactory;
   }

   @Override
   public void start()
       throws SQLException
   {
       Map<Integer, Long> partitionOffsets = new HashMap<>();
       if (offsetsQuery != null && offsetsQuery.trim().length() != 0) {
           try (PreparedStatement pstmt = context.getQueryConnection().prepareStatement(offsetsQuery)) {
               try (ResultSet rs = pstmt.executeQuery()) {
                   while (rs.next()) {
                       int partition = rs.getInt("PARTITION");
                       long offset = rs.getLong("OFFSET");
                       partitionOffsets.put(partition, offset);
                   }
               }
           }
       }
       List<String> seedBrokers =
           Arrays.asList(seedBrokersString.split(","));
       if (lastPartition == -1) {
           lastPartition = findMaxPartition(port, topic, seedBrokers);
       }
       runningReaderCount = lastPartition - firstPartition + 1;
       for (int partition = firstPartition; partition <= lastPartition; partition++) {
           long partitionOffset = partitionOffsets.containsKey(partition)
               ? partitionOffsets.get(partition) : startingOffset;
           Impl r = new Impl(topic, partition, partitionOffset, port, seedBrokers);
           Thread t = new Thread(r);
           t.setName("kafkaReader." + topic + '.' + partition);
           t.start();
       }
   }
}

// End KafkaInputSource.java