Sample Code for Parser

The following code creates a parser for Key Value Pairs. It is useful as an example of how to write a parser plugin The topic Extensible Common Data Framework: Parsing Key Value Pairs describes the functionality of this plugin.

/*
// Copyright (C) 2016-<var styleclass="SQL code"><%YEAR%>
SQLstream, Inc.
*/
package com.sqlstream.aspen.namespace.keyvalue;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;

import com.sqlstream.aspen.namespace.common.BuffersInputStream;
import com.sqlstream.aspen.namespace.common.CommonRowParser;
import com.sqlstream.aspen.namespace.common.EcdaPluginFactory;
import com.sqlstream.aspen.namespace.common.OptionsUtils;
import com.sqlstream.aspen.namespace.common.TypeParser;

/*
 * KeyValuePArser is a parser for KeyValue data. It parses a record at a time
 * from a stream of bytes, reading up to the next line delimiter. The line is
 * split into fields by the column delimiter, unless protected by quotes. (The
 * quote and both delimiters are attruibutes. By default " and ,). Then the
 * field values are parsed from byte-strings to scalar values and inserted into
 * the next row of the target table or stream.
 *
 * For the sake of speed the code avoids making java objects. In particular, the
 * current line is not a java String or a java byte array, but simply a range of
 * contiguous bytes in a larger byte array. The same holds for each field to be
 * parsed.
 *
 */
public class KeyValueParser extends CommonRowParser {

    private final static String KEY_VALUE_SEPARATOR_CHARACTER = "KEY_VALUE_SEPARATOR_CHARACTER";

    protected final static String [] REQUIRED_PROPERTIES = {
    };
    protected final static String [] OPTIONAL_PROPERTIES = {
        ROW_SEPARATOR_CHAR_KEY, SEPARATOR_CHAR_KEY, PARSER_SKIP_HEADER_KEY,
        COLUMN_QUOTE_CHARACTER, QUOTED_COLUMNS, KEY_VALUE_SEPARATOR_CHARACTER
    };
    private BytePointer[] columns;
    private ArrayList<BytePointer> unmatchedKeys;

    private byte[] rowDelimiter;
    private byte[] colDelimiter;
    private byte[] quotes;
    private byte[] keyValueSeparator;
    private int quotesLength = 0;
    private int keyValueSeparatorLength;
    private boolean rowDelimiterSpecified;

    protected TypeParser[] rowParsers;
    protected BuffersInputStream.Line currentLine; // without line delimiter

    private boolean rowGenerated;
    private int rowCount = 0;

    private static BytePointer key;

    class Impl implements Runnable {

        public void run() {
            long rowStartPosition = 0;
            try {
                tracer.log(Level.FINE, "Started parsing the stream");
                while (inputStream.getLine(currentLine, rowDelimiter)) {
                    parseRow(rowStartPosition);
                    rowStartPosition = inputStream.getPosition();
                }
                tracer.log(Level.FINE, "Finished parsing the stream");
            } catch (Throwable e) {
                tracer.log(Level.SEVERE, "Exiting parser", e);
            } finally {
                context.close();
            }
        }

    }

    public KeyValueParser() {
        super();
    }

    /*
     * Initializes this formatter and throws an exception if something is
     * underspecified
     */
    public void init(Properties props) throws Exception {
        super.init(props);
        OptionsUtils.filterInitProperties(
            props, REQUIRED_PROPERTIES, OPTIONAL_PROPERTIES, initProps);
        key = new BytePointer();
        // Record separator. By default is a \n character
        String rowSeparator = props.getProperty(ROW_SEPARATOR_CHAR_KEY);
        rowDelimiterSpecified = (rowSeparator != null);
        if (rowSeparator == null) {
            rowSeparator = "\n";
        }
        rowDelimiter = rowSeparator.getBytes(charset);

        // Field (key=value) separator. By default is a comma (,)
        String colSeparator = props.getProperty(SEPARATOR_CHAR_KEY);
        if (colSeparator == null) {
            colSeparator = ",";
        }
        colDelimiter = colSeparator.getBytes(charset);

        initDefaultTypeParsers(props, initProps);
        rowParsers = initRowParsers();

        // The quote character. By defautl it is a double quote character"
        String quoteCharacter = props.getProperty(COLUMN_QUOTE_CHARACTER);
        String quotedColumns = props.getProperty(QUOTED_COLUMNS);
        if (null == quoteCharacter && quotedColumns != null) {
            quoteCharacter = "\"";
        }
        if (quoteCharacter != null) {
            quotes = quoteCharacter.getBytes(charset);
        } else {
            quotes = "\"".getBytes(charset);
        }
        quotesLength = quotes.length;

        // The character that separates a key from a value. By default is a =
        // It can be configured using the KEY_VALUE_SEPARATOR_CHARACTER property
        String keyValueSeparatorCharacter = props.getProperty(KEY_VALUE_SEPARATOR_CHARACTER);
        if (null == keyValueSeparatorCharacter) {
            keyValueSeparatorCharacter = "=";
        }

        keyValueSeparator = keyValueSeparatorCharacter.getBytes(charset);
        keyValueSeparatorLength = keyValueSeparator.length;

    }

    /*
     * Parses a single record
     *
     * @param rowStartPosition
     * @throws Exception
     */
    public void parseRow(long rowStartPosition) throws Exception {
        rowGenerated = false;
        final int rowLength = currentLine.length();
        key.setBytes(currentLine.buffer);
        int a = currentLine.buffer.length;
        assert (rowLength >= 0);
        try {
            traceLine(Level.FINEST, currentLine);

            // String s = new String (currentLine.buffer, charset);
            // Brief explanation of the index variables
            // assuming a a row with a "key=value" field, where the index of k=0
            // and the index of the last 'e' is 8 then
            // startKey = 0
            // endKEy = 2
            // startValue = 4
            // endValue = 8
            // colStart = 0
            // colEnd = 8

            int rowEnd = rowLength;
            int colStart = 0, colEnd = 0; // offsets in currentLine
            boolean sawComma = false;

            boolean finish = false;

            // parse the fields until it reaches the end of the line
            while (!finish) {
                
                // find next field in the line (exclude delimiters)
                colStart = colEnd;
                if (colStart < rowEnd) {
                    if (sawComma) {
                        colStart += colDelimiter.length;
                    }
                    colEnd = currentLine.findBytes(colStart, colDelimiter);
                    if (colEnd < 0) {
                        // it's the last key=value in the row
                        colEnd = rowEnd;
                        sawComma = false;
                        finish = true;
                    } else {
                        sawComma = true;
                    }
                }
                int startKey = colStart;
                int endKey = currentLine.findBytes(startKey, colEnd, keyValueSeparator);

                int startValue = endKey + keyValueSeparatorLength;
                int endValue = colEnd;

                // key = new BytePointer(startKey, endKey, currentLine.buffer);

                key.setLimits(startKey, endKey, currentLine.start);
                //

                // parse the text from colStart to colEnd
                try {
                                    //traceField(Level.FINEST, null, startKey, endKey, currentLine, colStart, colEnd);
                    TypeParser parser = null;
                    int iCol = 0;
                    for (iCol = 0; iCol < numColumns; iCol++) {
                        if (columns[iCol].equals(key)) {
                            parser = rowParsers[iCol];
                            iCol++;
                            break;
                        }
                    }

                    // this condition check if the current key is present in the
                    // table,
                    // if not is is traced (only once for each new key) and the
                    // loop
                    // skips this iteration
                    if (parser == null) {
                        // unmatched key
                        boolean contains = false;
                        for(BytePointer unmatchedKey : this.unmatchedKeys){
                            if(unmatchedKey.equals(key)){
                                contains = true;
                            }
                        }
                        
                        if (!contains) {
                            BytePointer unmatchedKey = (BytePointer)key.clone();
                            this.unmatchedKeys.add(unmatchedKey);
                            tracer.log(Level.WARNING, "Unmatched key: " + unmatchedKey.toString());
                        }

                        continue;
                    }

                    // Checks if this value starts with a quote character
                    if (atQuote(startValue)) {
                        // strip off quotes; but what about string "" vs
                        // string null??
                        int start = startValue + quotesLength;
                        int end = currentLine.findBytes(start, endValue, quotes);
                        if (end < 0) {
                            end = colEnd; // expect a parse err with a msg
                        }
                        traceField(Level.FINEST, "trimmed ", startKey, endKey, currentLine, start, end);
                        parser.parse(currentLine.buffer, currentLine.start + start, currentLine.start + end, inputStmt,
                                iCol);
                    } else {
                        parser.parse(currentLine.buffer, currentLine.start + startValue, currentLine.start + endValue,
                                inputStmt, iCol);
                    }

                } catch (Exception e) {
                    setParserPosition(rowStartPosition - messageStartPosition + colEnd);
                    String msg = e.getMessage() + " while parsing field " + key.toString() + " at line " + getParserLineNumber()
                            + " of " + inputStream.locationDescription();
                    setParserError(msg);
                    tracer.log(Level.WARNING, msg, e);
                }
            }
            rowGenerated = true;
        } finally {
            if (rowGenerated) {
                submitRow(tracer);
            }
            incParserLineNumber();
        }

    }

    /*
     * Logs the given line to the tracer
     *
     * @param level
     * @param line
     */
    private void traceLine(Level level, BuffersInputStream.Line line) {
        if (tracer.isLoggable(level)) {
            StringBuilder sb = new StringBuilder();
            sb.append("row ").append(rowCount).append(" location ").append(inputStream.locationDescription())
                    .append(" line ").append(getParserLineNumber()).append(": ").append(line.asString());
            tracer.log(level, sb.toString());
        }
    }

    /*
     * Logs a single field (key=value) to the tracer
     *
     * @param level
     * @param prefix
     * @param startKey
     * @param endKey
     * @param line
     * @param startValue
     * @param endValue
     * @throws SQLException
     */
    private void traceField(
        Level level, String prefix, int startKey, int endKey, BuffersInputStream.Line line,
        int startValue, int endValue)
        throws SQLException
    {
        if (tracer.isLoggable(level)) {
            StringBuilder sb = new StringBuilder();
            if (prefix != null)
                sb.append(prefix);
            sb.append(" ").append(line.asString(startKey, (endKey - startKey))).append(" parse: ")
                    .append(line.asString(startValue, (endValue - startValue)));
            tracer.log(level, sb.toString());
        }
    }

    /*
     * Initializes the required row parsers
     *
     * @return
     * @throws Exception
     */
    private TypeParser[] initRowParsers() throws Exception {
        Set<String> alreadyMapped = context.getMappedSpecialColumns();
        this.columns = new BytePointer[numColumns];
        this.unmatchedKeys = new ArrayList<>();
        ArrayList<TypeParser> parsers = new ArrayList<>();

        for (int i = 1; i <= numColumns; i++) {
            byte[] fieldName = metaData.getFieldName(i).getBytes();

            this.columns[i - 1] = new BytePointer(0, fieldName.length, fieldName);
            if (alreadyMapped.contains(fieldName)) {
                parsers.add(null);
                continue;
            }
            TypeParser customParser = customParsers.get(fieldName);
            if (customParser != null) {
                if (logLevel.isTraceFine()) {
                    tracer.fine("custom parser " + customParser);
                }
                parsers.add(customParser);
            } else {
                parsers.add(getTypeParser(i));
            }
        }
        if (logLevel.isTraceFinest()) {
            tracer.finest("row parsers size " + parsers.size());
            tracer.finest("using type parsers " + parsers);
        }
        return parsers.toArray(new TypeParser[0]);
    }

    /*
     * Checks if an input field start with a quote
     *
     * @param valueStart
     *            the index of the beginning of the value
     * @return true if this value starts with a quote character
     * @throws IOException
     */
    private boolean atQuote(int valueStart) throws IOException {
        if (quotes == null)
            return false;
        for (int i = 0; i < quotesLength; ++i) {
            byte b = currentLine.buffer[currentLine.start + valueStart + i];
            if (b != quotes[i]) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void closeAllocation() {
        tracer.fine("final rowCount=" + rowCount);
        super.closeAllocation();
    }

    @Override
    public void start() {
        Impl impl = new Impl();
        inputStream = context.getInputStream();
        if (quotes != null) {
            inputStream.setQuoteCharacters(quotes);
        }
        currentLine = inputStream.newLine();
        Thread thread = new Thread(impl);
        thread.start();
    }

    public static class PluginFactory extends EcdaPluginFactory {
        /*
         * Installs the parser
         */
        public void installPlugins() {
            installParser("KV", KeyValueParser.class);
            tracer.log(Level.FINE, "Installed KeyValue parser");
        }
    }

    /*
     *
     * A BytePointer is a type that contains a pointer to a byte[] and a start
     * and end indexes. The BytePointer is used to compares values of the stream
     * (requires less computation power than Strings)
     *
     */
    class BytePointer {
        private int start;
        private int end;
        private byte[] bytes;

        public BytePointer() {
            super();
            this.start = 0;
            this.end = 0;
            this.bytes = null;
        }

        public BytePointer(int start, int end, byte[] bytes) {
            super();
            this.start = start;
            this.end = end;
            this.bytes = bytes;
        }

        public void setLimits(int start, int end, long rowStartPosition) {
            
            this.start = start + (int)rowStartPosition;
            this.end = end + (int)rowStartPosition;
            
        }

        /*
         * hashCode implementation similar to the String hashCode
         */
        @Override
        public int hashCode() {
            int result = 0;
            int n = end - start;
            int i = 1;
            for (int index = start; index < end; index++) {

                result += (bytes[index] * Math.pow(31, n - i));
                i++;
            }

            return result;
        }

        /*
         * This equals compares two bytePointers. In order to be equal, they
         * muys have the same length (end-start) and the same bytes in this
         * interval
         */
        @Override
        public boolean equals(Object obj) {
            try{
            BytePointer other = (BytePointer) obj;
            
            if ((this.end - this.start) != (other.getEnd() - other.getStart())) {
                return false;
            }
            byte[] otherByteArray = other.getBytes();
            int otherStart = other.getStart();
            int len = end - start;

            for (int i = 0; i < len; i++) {
                if (this.bytes[i + start] != otherByteArray[i + otherStart]) {
                    return false;
                }
            }
            return true;
            }catch(Exception e){
                e.printStackTrace();
            }
            return false;
        }

        @Override
        protected Object clone() throws CloneNotSupportedException {
            
            return new BytePointer(0, end-start, Arrays.copyOfRange(bytes, start, end));
            
        }
        
        
        // getters and setters


        @Override
        public String toString() {
            byte[] byteStr = Arrays.copyOfRange(bytes, start, end);
            return new String(byteStr, charset);
        }

        public int getStart() {
            return start;
        }

        public void setStart(int start) {
            this.start = start;
        }

        public int getEnd() {
            return end;
        }

        public void setEnd(int end) {
            this.end = end;
        }

        public byte[] getBytes() {
            return bytes;
        }

        public void setBytes(byte[] bytes) {
            this.bytes = bytes;
        }

    }

}
// End KeyValueParser.java