Sample Code for Source Plugin

The following code creates a source plugin for Kafka. It is useful as an example of how to write a source plugin The topic Reading from Kafka describes the functionality of this plugin.

This sample code works for “message-based” sources, which send out data in messages or buffers with breaks. For sources without such breaks, be sure to make appropriate changes to the segments indicated in the topic - Writing an Extensible Common Data Framework Plugin.

/*
// Aspen dataflow server
// Copyright (C) 2014-<var styleclass="SQL code"><%YEAR%>
SQLstream, Inc.
*/

package com.sqlstream.aspen.namespace.kafka;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.sqlstream.aspen.namespace.common.AbstractDataInputSource;
import com.sqlstream.aspen.namespace.common.BuffersInputStream;
import com.sqlstream.aspen.namespace.common.CommonDataConstants;
import com.sqlstream.aspen.namespace.common.OptionsUtils;
import com.sqlstream.aspen.namespace.common.RowInserter;
import com.sqlstream.aspen.namespace.common.SourceData;
import com.sqlstream.aspen.namespace.common.SourceDataFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class KafkaInputSource extends AbstractDataInputSource
{
   protected final static String [] requiredParameters = { "TOPIC" };
   protected final static String [] optionalParameters = {
       "STARTING_TIME",
       "STARTING_OFFSET",
       "PARTITION_OFFSET_QUERY",
       "SEED_BROKERS",
       "PORT",
       "PARTITION",
       "BUFFER_SIZE",
       "FETCH_SIZE",
       "MAX_WAIT",
       "MIN_BYTES",
       "BACKOFF_WAIT",
       "CLIENT_ID",
       "METRICS_PER_PARTITION",
       "REQUIRE_N_READERS_BEFORE_START",
       CommonDataConstants.PARSER_QUEUE_SIZE,
       CommonDataConstants.OPTIONS_QUERY
   };

   protected final Properties initProps = new Properties();

   protected final static Logger tracer =
       Logger.getLogger("com.sqlstream.aspen.namespace.kafka");

   protected ExecutorService service;

   protected volatile boolean canceled;
   protected int runningReaderCount;

   protected int port;
   protected String topic;
   protected String partitionString;
   protected int firstPartition;
   protected int lastPartition;
   protected long startingTime;
   protected long startingOffset;
   protected int consumerBufferSize;
   protected int fetchSize;
   protected int maxWait;
   protected int minBytes;
   protected int backoffWait;
   protected String seedBrokersString;
   protected String offsetsQuery;
   protected String clientId;
   protected boolean metricsPerPartition;
   protected boolean isOpen;
   protected int requireNReadersBeforeStart;
   protected static int runningKafkaInputSources;
   protected final static Object runningReadersLock = new Object();

   protected KafkaSourceDataFactory sourceDataFactory;

   public KafkaInputSource()
   {
       //parsedRows = new ArrayBlockingQueue<Object[]>(4096);
       canceled = false;

       service = Executors.newSingleThreadExecutor();
   }

    static class KafkaSourceData extends SourceData {
        final int offsetColumn;
        final int partitionColumn;

        long offset;
        int partition;

       public KafkaSourceData(KafkaSourceDataFactory factory, int[] columnNumbers)
        {
            super(factory, columnNumbers);
            offsetColumn = columnNumbers[0];
            partitionColumn = columnNumbers[1];
        }

         @Override
        public String locationDescription()
        {
            return "Offset " + offset;
        }

        @Override
        public void updateLocation(RowInserter inserter)
            throws SQLException
        {
            if (partitionColumn != -1) {
                inserter.setInt(partitionColumn, partition);
            }
            if (offsetColumn != -1) {
                inserter.setLong(offsetColumn, offset);
            }
        }
    }
   
    class KafkaSourceDataFactory extends SourceDataFactory<KafkaSourceData> {

        protected KafkaSourceDataFactory(int numBuffers, int numPartitions)
                throws SQLException
        {
            super(new String[] {"OFFSET", "PARTITION"}, context, numBuffers, 262144, true);
        }

        @Override
        public KafkaSourceData newBufferInfo()
        {
            return new KafkaSourceData(this, columnNumbers);
        }
    }

   long parseStartingTime(String startingTimeString) {
       if (startingTimeString.equalsIgnoreCase("LATEST")) {
           return kafka.api.OffsetRequest.LatestTime();
       } else if (startingTimeString.equalsIgnoreCase("EARLIEST")) {
           return kafka.api.OffsetRequest.EarliestTime();
       }
       return Long.parseLong(startingTimeString);
   }

   public void init(Properties props) throws Exception
   {
       OptionsUtils.filterInitProperties(
           props, requiredParameters, optionalParameters, initProps);
       Properties dynamic;
       try {
           Connection conn = context.getQueryConnection();
           dynamic = OptionsUtils.addDynamicOptions(
               initProps, conn);
       } catch (Exception e) {
           // TODO Auto-generated catch block
           dynamic = initProps;
       }
       seedBrokersString = dynamic.getProperty("SEED_BROKERS", "localhost");
       port = Integer.parseInt(dynamic.getProperty("PORT", "9092"));
       topic = dynamic.getProperty("TOPIC");
       partitionString = dynamic.getProperty("PARTITION");
       startingTime = parseStartingTime(dynamic.getProperty(
           "STARTING_TIME", "LATEST"));
       startingOffset = Long.parseLong(dynamic.getProperty(
           "STARTING_OFFSET", "-1")) ;
       offsetsQuery = dynamic.getProperty(
           "PARTITION_OFFSET_QUERY") ;
       consumerBufferSize = Integer.parseInt(dynamic.getProperty(
           "BUFFER_SIZE", "1048576")) ;
       fetchSize = Integer.parseInt(dynamic.getProperty(
           "FETCH_SIZE", "1000000")) ;
       maxWait = Integer.parseInt(dynamic.getProperty(
           "MAX_WAIT", "500")) ;
       minBytes = Integer.parseInt(dynamic.getProperty(
           "MIN_BYTES", "64")) ;
       backoffWait = Integer.parseInt(dynamic.getProperty(
           "BACKOFF_WAIT", "500")) ;
       clientId = dynamic.getProperty("CLIENT_ID");
       metricsPerPartition = "true".equalsIgnoreCase(dynamic.getProperty("METRICS_PER_PARTITION"));
       if (clientId == null | | clientId.trim().isEmpty()) {
           clientId = "client_" + topic;
           if (!metricsPerPartition && partitionString != null && !partitionString.isEmpty()) {
               clientId += '_' + partitionString;
           }
       }
       if (partitionString == null | | partitionString.trim().length() == 0) {
           firstPartition = 0;
           lastPartition = -1;
       } else {
           String [] partitionEnds = partitionString.split("-");
           firstPartition = Integer.parseInt(partitionEnds[0]);
           if (partitionEnds.length == 1) {
               lastPartition = firstPartition;
           } else {
               lastPartition = Integer.parseInt(partitionEnds[1]);
           }
           if (partitionEnds.length > 2 | | firstPartition > lastPartition) {
               throw new SQLException("Invalid partition range " + partitionString);
           }
       }
       int numPartitions = lastPartition - firstPartition + 1;
       int numBuffers = Integer.parseInt(props.getProperty(
           CommonDataConstants.PARSER_QUEUE_SIZE,
           numPartitions <= 1
               ? CommonDataConstants.DEFAULT_PARSER_QUEUE_SIZE
               : Integer.toString(numPartitions + 1)));
       if (numPartitions == 0) {
           numPartitions = 256;
       }
       sourceDataFactory = new KafkaSourceDataFactory(numBuffers, numPartitions);
       requireNReadersBeforeStart = Integer.parseInt(dynamic.getProperty("REQUIRE_N_READERS_BEFORE_START", "0"));
   }

   public Properties getInitProperties()
   {
       return initProps;
   }

   public static long getLastOffset(
       SimpleConsumer consumer, String topic, int partition,
       long whichTime, String clientName)
       throws SQLException
   {
       TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
       Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
           new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
       requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
       kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
           requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
       OffsetResponse response = consumer.getOffsetsBefore(request);

       if (response.hasError()) {
           tracer.warning(
               "Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
           return 0;
       }
       long[] offsets = response.offsets(topic, partition);
       return offsets[0];
   }

   public static int findMaxPartition(
       int port, String topic,
       List<String> replicaBrokers)
       throws SQLException
   {
       int maxPartition = 0;
       for (String seed : replicaBrokers) {
           SimpleConsumer consumer = null;
           try {
               consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
               List<String> topics = Collections.singletonList(topic);
               TopicMetadataRequest req = new TopicMetadataRequest(topics);
               kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
   
               List<TopicMetadata> metaData = resp.topicsMetadata();
               for (TopicMetadata item : metaData) {
                   for (PartitionMetadata part : item.partitionsMetadata()) {
                       if (part.partitionId() > maxPartition) {
                           maxPartition = part.partitionId();
                       }
                   }
               }
           } catch (Exception e) {
               throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                       + ", " + "] Reason: " + e, e);
           } finally {
               if (consumer != null) consumer.close();
           }
       }
       return maxPartition;
   }
   
   public static PartitionMetadata findLeader(
       int port, String topic, int partition,
       List<String> replicaBrokers)
   throws SQLException
   {
       PartitionMetadata returnMetaData = null;
       loop:

       for (String seed : replicaBrokers) {            SimpleConsumer consumer = null;            try {                consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, “leaderLookup”);                List topics = Collections.singletonList(topic);                TopicMetadataRequest req = new TopicMetadataRequest(topics);                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

               List metaData = resp.topicsMetadata();                for (TopicMetadata item : metaData) {                    for (PartitionMetadata part : item.partitionsMetadata()) {                        if (part.partitionId() == partition) {                            returnMetaData = part;                            break loop;                        }                    }                }            } catch (Exception e) {                throw new SQLException(“Error communicating with Broker [” + seed + “] to find Leader for [” + topic                        + “, " + partition + “] Reason: " + e, e);            } finally {                if (consumer != null) consumer.close();            }        }        if (returnMetaData != null) {            replicaBrokers.clear();            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {                replicaBrokers.add(replica.host());            }        }        return returnMetaData;    }

   public static String findNewLeader(        String a_oldLeader,        int port, String topic, int partition, List replicaBrokers)        throws SQLException {        for (int i = 0; i < 3; i++) {            boolean goToSleep = false;            PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);            if (metadata == null) {                goToSleep = true;            } else if (metadata.leader() == null) {                goToSleep = true;            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {                // first time through if the leader hasn’t changed give ZooKeeper a second to recover                // second time, assume the broker did recover before failover, or it was a non-Broker issue                //                goToSleep = true;            } else {                return metadata.leader().host();            }            if (goToSleep) {                try {                    Thread.sleep(1000);                } catch (InterruptedException ie) {                }            }        }        throw new SQLException(“Unable to find new leader after Broker failure. Exiting”);    }

   public static String findLeadBroker(        int port, String topic, int partition, List replicaBrokers)        throws SQLException    {        // find the meta data about the topic and partition we are interested in        //        PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);        if (metadata == null) {            tracer.warning(“Can’t find metadata for Topic and Partition. Exiting”);            return null;        }        if (metadata.leader() == null) {           tracer.warning(“Can’t find Leader for Topic and Partition. Exiting”);            return null;        }        return metadata.leader().host();    }

   static void kafkaInputSourceStarted() {        synchronized (runningReadersLock) {            runningKafkaInputSources++;            runningReadersLock.notifyAll();        }    }    static void kafkaInputSourceStopped() {        synchronized (runningReadersLock) {            runningKafkaInputSources–;        }    }

   class Impl implements Runnable {        //int rowtimeColumn, int offsetColumn, int payloadColumn,        final String topic;        final int partition;        String leadBroker;                final int port;        final String clientName;        final BuffersInputStream inputStream;        

       private List replicaBrokers = new ArrayList<>();                private Impl(            String topic, int partition, long startingOffset, int port,            List seedBrokers)            throws SQLException        {            this.topic = topic;            this.partition = partition;            this.port = port;            if (metricsPerPartition) {                this.clientName = clientId + ‘_’ + partition;            } else {                this.clientName = clientId;            }            replicaBrokers.clear();            replicaBrokers.addAll(seedBrokers);            this.leadBroker = findLeadBroker(port, topic, partition, replicaBrokers);            this.inputStream = context.getInputStream();        }

   public void run()    {        tracer.fine(“starting kafka”);        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());        kafkaInputSourceStarted();        if (requireNReadersBeforeStart > 0) {            synchronized (runningReadersLock) {                while (requireNReadersBeforeStart > runningKafkaInputSources && !canceled) {                    tracer.config(“waiting for " + (requireNReadersBeforeStart - runningKafkaInputSources) + “more readers to start”);                    try {                        runningReadersLock.wait(1000);                    } catch (InterruptedException e) {                        Thread.currentThread().interrupt();                    }                }                tracer.config(“requred number of readers running”);            }        }        try (BuffersInputStream.Connection connection = inputStream.getConnection()) {            long readOffset = startingOffset;            int retries = 0;            while (!canceled) {                SimpleConsumer consumer = null;                try {                    consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);                    if (readOffset < 0) {                        readOffset = getLastOffset(consumer,topic, partition, startingTime, clientName);                    }                    tracer.fine(“starting with offset " + readOffset);                    int numErrors = 0;                    while (!canceled) {                        if (consumer == null) {                            consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);                        }                        FetchRequest req = new FetchRequestBuilder()                                .clientId(clientName)                                .addFetch(topic, partition, readOffset, fetchSize)                                .maxWait(maxWait)                                .minBytes(minBytes)                                .build();                        //tracer.finest(“about to fetch”);                        FetchResponse fetchResponse = consumer.fetch(req);                        //tracer.finest(“back from fetch”);                                if (fetchResponse.hasError()) {                            numErrors++;                            // Something went wrong!                            short code = fetchResponse.errorCode(topic, partition);                            tracer.warning(“Error fetching data from the Broker:” + leadBroker + " Reason: " + code + " topic: " + topic + " partition: " + partition);                            if (numErrors > 10) {                                canceled = true;                                tracer.severe(“exiting kafka reader topic: " + topic + " partition: " + partitionString);                                return;                            }                            if (code == ErrorMapping.OffsetOutOfRangeCode())  {                                // We asked for an invalid offset. For simple case ask for the last element to reset                                readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);                                continue;                            }                            consumer.close();                            consumer = null;                            if (numErrors > 1) {                                Thread.sleep((2^numErrors));                            }                            leadBroker = findNewLeader(                                leadBroker, port, topic, partition, replicaBrokers);                            continue;                        }                        numErrors = 0;                        int numRead = 0;                        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {                            numRead++;                            long currentOffset = messageAndOffset.offset();                            if (currentOffset < readOffset) {                                tracer.warning(“Found an old offset: " + currentOffset + " Expecting: " + readOffset + " topic: " + topic + " partition: " + partition);                                continue;                            }                            long nextOffset = messageAndOffset.nextOffset();                            if (nextOffset > readOffset) {                                readOffset = nextOffset;                            }                            ByteBuffer payload = messageAndOffset.message().payload();                                    KafkaSourceData bufferInfo = connection.getCurrentWithSize(payload.remaining());                            if (bufferInfo == null) {                                canceled = true;                                break;                            }                            ByteBuffer buf = bufferInfo.getBuffer();                            payload.get(buf.array(), buf.position(), payload.remaining());                            if (tracer.isLoggable(Level.FINE)) {                                tracer.fine(“offset=” + currentOffset + " nextOffset=” + readOffset + " got msg “);                            }                            if (canceled) {                                break;                            }                            bufferInfo.offset = currentOffset;                            bufferInfo.partition = partition;                            connection.submit();                            retries = 0;                        }                        if (numRead == 0) {                            connection.flush();                            Thread.sleep(backoffWait);                        }                    }                } catch (Throwable e) {                    tracer.log(Level.SEVERE, “Error during kafka processing topic: " + topic + " partition: " + partition, e);                    if (retries > 1) {                        Thread.sleep((2^retries));                    }                    leadBroker = findNewLeader(                        leadBroker, port, topic, partition, replicaBrokers);                } finally {                    if (consumer != null) {                        try {                            consumer.close();                        } catch (Throwable e) {                            tracer.log(Level.SEVERE, “Error during while closing consumer”, e);                        }                        consumer = null;                    }                    if (retries > 10) {                        tracer.log(Level.SEVERE, “Error during kafka processing - exiting topic: " + topic + " partition: " + partitionString);                        canceled = true;                        break;                    }                }            }        } catch (Throwable e) {            tracer.log(Level.SEVERE, “Exception in kafka topic: " + topic + " partition: " + partition, e);        } finally {            tracer.config(“exiting kafka reader topic: " + topic + " partition: " + partition);            canceled = true;            kafkaInputSourceStopped();            synchronized (KafkaInputSource.this) {                runningReaderCount –;                if (runningReaderCount == 0) {                    inputStream.sendEOS();                }            }        }    }    }

   public void closeAllocation()    {        tracer.fine(“closing”);        canceled = true;    }

   @Override    public SourceDataFactory<?> getSourceDataFactory()        throws SQLException    {        return sourceDataFactory;    }

   @Override    public void start()        throws SQLException    {        Map<Integer, Long> partitionOffsets = new HashMap<>();        if (offsetsQuery != null && offsetsQuery.trim().length() != 0) {            try (PreparedStatement pstmt = context.getQueryConnection().prepareStatement(offsetsQuery)) {                try (ResultSet rs = pstmt.executeQuery()) {                    while (rs.next()) {                        int partition = rs.getInt(“PARTITION”);                        long offset = rs.getLong(“OFFSET”);                        partitionOffsets.put(partition, offset);                    }                }            }        }        List seedBrokers =            Arrays.asList(seedBrokersString.split(”,"));        if (lastPartition == -1) {            lastPartition = findMaxPartition(port, topic, seedBrokers);        }        runningReaderCount = lastPartition - firstPartition + 1;        for (int partition = firstPartition; partition <= lastPartition; partition++) {            long partitionOffset = partitionOffsets.containsKey(partition)                ? partitionOffsets.get(partition) : startingOffset;            Impl r = new Impl(topic, partition, partitionOffset, port, seedBrokers);            Thread t = new Thread(r);            t.setName(“kafkaReader.” + topic + ‘.’ + partition);            t.start();        }    } }

// End KafkaInputSource.java