Using the ECD Agent

To read data from or write data to remote locations (that is, locations that are not on the same machine as s-Server), you need to configure and use the Extensible Common Data agent. This agent is a standalone Java application that mediates data between SQLstream s-Server and external data sources.

The agent is supplied both as part of the SQLstream s-Server product and as part of the ClientTools download from the SQLstream website (via SQLstream-<VERSION>-clienttools-linux.run or SQLstream-client-tools-<VERSION>-windows.exe).

The ECDA Agent uses the SQLstream JDBC driver either to select from a stream in s-Server (in order to write data to an external target) or to write to a stream (when reading from an external source). You need to create the stream before executing the agent.

The agent itself is a wrapper (or "hoster") for the ECDA (Extensible Common Data Adapter). The agent is configured much like the adapter, except that you use command-line options and properties files instead of foreign stream OPTIONs to configure the agent.

Using ECD Agent on the same host as s-Server

It can be helpful to run the agent on the same host as s-Server. The agent ships with s-Server and is installed alongside it by default.

An example use-case is where data from multiple sources is collected into multiple local directories but then needs to be loaded into the same source stream. Each directory can be separately monitored by its own agent, and each agent can be controlled independently (for example, working to different cron schedules).
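As a rough sketch of this pattern, the script below prints one agent command per properties file, assuming each monitored directory has its own properties file. The configuration directory /var/sqlstream/etc/agents and the function name print_agent_commands are assumptions made for illustration, not shipped paths or tools. The script only prints the commands (a dry run); each printed line could then go into its own cron entry.

```shell
#!/bin/sh
# Dry run: print one source-agent command per properties file.
# AGENT_CONF_DIR is a hypothetical location; adjust it to your own layout.
AGENT_CONF_DIR="${AGENT_CONF_DIR:-/var/sqlstream/etc/agents}"

print_agent_commands() {
    conf_dir="$1"
    for props in "$conf_dir"/*.properties; do
        # Skip the unexpanded pattern when the directory has no matches.
        [ -e "$props" ] || continue
        # --plugin arguments are omitted here; add the reader and parser jars.
        echo "./commondataagent.sh --input --io file --props $props"
    done
}

print_agent_commands "$AGENT_CONF_DIR"
```

Keeping one properties file per directory means an agent can be stopped, rescheduled, or reconfigured without touching the others.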

Requirements

The server hosting the ECDA Agent must have a suitable Java Runtime Environment (or a JDK) installed.

Prebuilt Server Objects Available in s-Server

This table lists the ECD plugin types, and the corresponding prebuilt servers.

The 'Type' is used in the TYPE clause when creating a SERVER, and with the --io switch when running an ECD agent.

| Type | Plugin Description | Name of Prebuilt Server for Foreign Streams | Notes |
| --- | --- | --- | --- |
| amqp_legacy | AMQP Legacy: Reads from and writes to an AMQP message bus for AMQP 0.9. See the topics Reading from AMQP and Writing to AMQP for more details. | AMQP_LEGACY_SERVER | |
| amqp10 | AMQP 1.0: Reads from and writes to an AMQP message bus for AMQP 1.0. See the topics Reading from AMQP and Writing to AMQP for more details. | AMQP10_SERVER | |
| hdfs | Allows s-Server to write to the Hadoop/HDFS file system (also supports ADLS Gen 2). See Writing to Hadoop for more details. | HDFS_SERVER | sink only |
| hive | Allows s-Server to write to Hadoop/Hive tables. See Writing to Hive Tables for more details. | HIVE_SERVER | sink only; ORC format only |
| http | Allows s-Server to read and write data over HTTP / HTTPS. See the topics Reading over HTTP and Writing over HTTP for more details. | HTTP_SERVER | |
| file | File System: Reading and writing over the file system. See the topics Reading from the File System and Writing to the File System for more details. | FILE_SERVER | |
| filevfs | Allows s-Server to read files in compressed format from ADLS Gen2, HDFS, the local file system, S3 and SFTP. See the topic Integrating Files using VFS for more details. | FILE_VFS_SERVER | source only |
| ibmmq | Allows s-Server to read from or write to queues and topics in IBM MQ. See Reading from IBM MQ and Writing to IBM MQ for more details. | IBMMQ_SERVER | |
| kafka | Allows s-Server to exchange data with Kafka clusters. See the topics Reading from Kafka and Writing to Kafka for more details. | KAFKA_SERVER | non-transactional |
| kafka10 | Allows s-Server to exchange data with Kafka clusters versions 0.10.2 and above. See the topics Reading from Kafka and Writing to Kafka for more details. Also supports Azure Event Hubs. | KAFKA10_SERVER | versions since 0.10.2 |
| kinesis | Allows s-Server to exchange data with Kinesis streams. See the topics Reading from Kinesis and Writing to Kinesis for more details. | KINESIS_SERVER | |
| pulsar | Allows s-Server to read from and write to Pulsar streams. See the topic Integrating Pulsar for more details. | PULSAR_SERVER | |
| mail | Allows s-Server to connect to an SMTP server in order to send emails. See the topic Writing to Mail Servers for details. | MAIL_SERVER | sink only |
| mongodb | Allows s-Server to write to MongoDB. See the topic Writing to MongoDB for more details. | MONGO_SERVER | sink only |
| mqtt | Allows s-Server to exchange data with MQTT brokers. See the topics Reading from MQTT and Writing to MQTT for more details. | MQTT_SERVER | |
| snowflake | Allows s-Server to write to Snowflake warehouses. See the topic Writing to Snowflake for more details. | SNOWFLAKE_SERVER | sink only; uploads files |
| net | Network Sockets (TCP/UDP): Configured for a socket. Reads or writes data streamed from a client program using TCP or UDP. See the topics Reading from Network Sockets and Writing to Network Sockets for more details. | NET_SERVER | |
| websocket | Allows s-Server to read and write data over web sockets. See the topics Reading from Websockets and Writing to Websockets for more details. | WEBSOCKET_SERVER | |

NOTES:

  • To list all servers (including these predefined servers), run:
    sqllineClient --run=$SQLSTREAM_HOME/support/sql/showForeignServers

Reading Data Remotely

To read data from a source and write it into s-Server using the ECDA Agent:

  1. Define or identify a source stream in s-Server. For more information on creating a stream, see the topic CREATE STREAM in the Streaming SQL Reference Guide. This stream will be at the start of a SQL pipeline for a source agent.

  2. Create a properties file with information about the data source and input stream. Property file options are described below. The following example is for a source agent that reads from a Kafka topic formatted as Avro, and writes to a SQLstream stream called "orders" in a schema called "stocks". See the topic Reading Data into s-Server for more information on the ECD adapter options for readers and parsers. Note that in this example some of the Kafka options are themselves defined in another properties file:

    # Parser options
    PARSER=AVRO
    AVRO_SCHEMA_LOCATION=/home/sqlstream/stock_orders.avsc
    # Kafka options
    TOPIC=production_orders
    kafka.consumer.config=/home/sqlstream/stock_topic.conf
    # Schema, name, and parameter signature of destination stream
    SCHEMA_NAME=stocks
    TABLE_NAME=orders
    #columns
    ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker, VARCHAR(100) buyer)
        
  3. Run the commondataagent.sh script. See the section below for a full list of command line options. Continuing the same example, the command below runs the agent as input, with a parser type of Avro, using the properties file at /var/sqlstream/etc/kafka_demo.properties. Note that as well as specifying the input type and the parser, we must explicitly reference the reader and parser plugin jar files (in this example they are relative to $SQLSTREAM_HOME):

    ./commondataagent.sh --input --io kafka --props /var/sqlstream/etc/kafka_demo.properties --parser avro --plugin $SQLSTREAM_HOME/plugin/kafka10/kafka10.jar --plugin plugin/avroparser/avroparser.jar

    At a minimum, you need to specify the agent type (--io, see above), specify that it is a reader (--input), and supply a configuration file (--props). Remaining parameters can be read from the properties file.
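A minimal sketch of a launcher for the Kafka example above, building the argument list step by step so that every --plugin flag is followed by a jar path. The DRY_RUN variable, the run_source_agent function name, and the default value of SQLSTREAM_HOME are assumptions made for illustration. With DRY_RUN=1 (the default here) the command is printed rather than executed.

```shell
#!/bin/sh
# Sketch of a launcher for the Kafka/Avro source agent example.
# DRY_RUN and run_source_agent are illustrative names, not part of the product.
SQLSTREAM_HOME="${SQLSTREAM_HOME:-/opt/sqlstream}"   # assumed install location
DRY_RUN="${DRY_RUN:-1}"

run_source_agent() {
    props="$1"
    # Required: direction, agent type, and properties file.
    set -- --input --io kafka --props "$props"
    # Parser, plus one plugin jar for the reader and one for the parser.
    set -- "$@" --parser avro
    set -- "$@" --plugin "$SQLSTREAM_HOME/plugin/kafka10/kafka10.jar"
    set -- "$@" --plugin "$SQLSTREAM_HOME/plugin/avroparser/avroparser.jar"
    if [ "$DRY_RUN" = "1" ]; then
        echo "./commondataagent.sh $*"
    else
        ./commondataagent.sh "$@"
    fi
}

run_source_agent /var/sqlstream/etc/kafka_demo.properties
```

Setting DRY_RUN=0 would run the agent from the current directory, so the script would need to be started from the directory that contains commondataagent.sh.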

Writing Data Remotely

To read data from s-Server and write it to a target using the ECDA Agent:

  1. Define or identify a stream in s-Server. For more information on creating a stream, see the topic CREATE STREAM in the Streaming SQL Reference Guide. This stream will be at the end of a pipeline for a target agent.

  2. Create a properties file with information about the output stream and data target. Property file options are described below. See the topic Writing Data out of s-Server for more information on the ECD adapter options for formatters and writers. The following example is for a target agent that reads from a SQLstream stream called "output_orders" in a schema called "stocks" and writes to a CSV file located in the directory /home/guavus/output:

    # Location, date format, prefix, suffix
    FORMATTER=CSV
    CHARACTER_ENCODING=UTF-8
    ROW_SEPARATOR=\000A
    SEPARATOR=,
    WRITE_HEADER=false
    DIRECTORY=/home/guavus/output
    ORIGINAL_FILENAME=stocks-output.csv
    FILENAME_PREFIX=output-
    FILENAME_SUFFIX=.csv
    FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
    FILE_ROTATION_SIZE=20K
    FORMATTER_INCLUDE_ROWTIME=true
    # Schema, name, and parameter signature of source stream
    SCHEMA_NAME=stocks
    TABLE_NAME=output_orders
    #columns
    ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)
        
  3. Run the commondataagent.sh script. See the section below for a full list of command line options. The following example specifies that the agent is run as output, with a formatter type of CSV, using a properties file at /var/sqlstream/etc/kafka_demo.properties. Note that as well as specifying the output type and the formatter, we must explicitly reference the writer and formatter plugin jar files:

    ./commondataagent.sh --output --io file --props /var/sqlstream/etc/kafka_demo.properties --formatter CSV --plugin plugin/file.jar --plugin plugin/csv/csv.jar

    At a minimum, you need to specify the agent type (--io, see above), specify that it is a writer (--output), and supply a configuration file (--props). Remaining parameters can be read from the properties file.
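Because each plugin jar has to be referenced explicitly, a wrong path is an easy mistake to make. The following sketch checks that each jar exists before the agent is started. The check_plugins helper is hypothetical and not part of the agent; the jar paths are the ones from the example above.

```shell
#!/bin/sh
# Pre-flight check: confirm each plugin jar exists before starting an agent.
# check_plugins is an illustrative helper, not part of the agent.
check_plugins() {
    missing=0
    for jar in "$@"; do
        if [ ! -f "$jar" ]; then
            echo "missing plugin jar: $jar" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Example: the two jars from the file/CSV target example above.
if check_plugins plugin/file.jar plugin/csv/csv.jar; then
    echo "plugins ok"
else
    echo "fix the --plugin paths before running commondataagent.sh"
fi
```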

Configuring the Properties file

The properties file is a typical Java properties file consisting of Key=Value lines. Lines starting with a hash '#' symbol are treated as comments.
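To illustrate the Key=Value layout, here is a small sketch that reads one value from such a file and skips comment lines. The get_prop helper is hypothetical. It handles only plain KEY=value lines that start in the first column and returns the first match; the richer Java properties syntax (escapes, line continuations) is deliberately not covered.

```shell
#!/bin/sh
# Read one value from a simple Key=Value properties file.
# Handles only plain "KEY=value" lines and "#" comments; the full Java
# properties syntax (escapes, line continuations) is not covered.
get_prop() {
    file="$1"
    key="$2"
    while IFS= read -r line || [ -n "$line" ]; do
        case "$line" in
            "#"*|"") continue ;;                 # comment or blank line
            "$key="*) printf '%s\n' "${line#*=}"; return 0 ;;
        esac
    done < "$file"
    return 1
}
```

For example, get_prop /var/sqlstream/etc/kafka_demo.properties TABLE_NAME would print the stream name configured in that file, which can be handy for checking a configuration before starting the agent.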

The properties file contains two types of property:

  1. Most properties represent one of the OPTIONS that would have been required if defining the same input or output as a foreign stream. For example, when defining a FILE source you would need to define DIRECTORY_NAME, FILENAME_PATTERN etc.

    • See the topic Options for foreign streams for the options used for input (reading and parsing) and output (formatting and writing).
    • Some options may also be specified as command line parameters, which override settings from the properties file:
      • PARSER or FORMATTER (overridden by --parser or --formatter respectively)
      • SCHEMA_NAME (overridden by --schemaName)
      • TABLE_NAME (overridden by --streamName or --tableName)
  2. The ROWTYPE property (which can be overridden by the --rowType command line parameter) defines the signature of the source or target data - standing in for the column descriptions that would be present for an equivalent foreign stream definition.
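The override rule described above can be pictured with a tiny sketch: when a command line value is supplied it wins, otherwise the value from the properties file applies. The effective_value helper is purely illustrative and does not reflect how the agent is implemented internally.

```shell
#!/bin/sh
# Illustrates the precedence rule: a command-line value, when given,
# wins over the value from the properties file.
# effective_value is an illustrative helper, not part of the agent.
effective_value() {
    cli_value="$1"     # e.g. the argument passed to --schemaName (may be empty)
    props_value="$2"   # e.g. SCHEMA_NAME from the properties file
    printf '%s\n' "${cli_value:-$props_value}"
}

effective_value ""        "stocks"   # no --schemaName given: prints stocks
effective_value "staging" "stocks"   # --schemaName staging given: prints staging
```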

Command Line Arguments for ECDA Agent

  • --input, --output: --input specifies a reader (inserting into an s-Server stream); --output specifies a writer (selecting from an s-Server stream). One of these is required.
  • --io: Input source or output target. Defaults to file. Options include:
    • file reads/writes over the file system
    • net reads/writes over UDP or TCP network sockets
    • amqp10 reads/writes over AMQP 1.0
    • kafka and kafka10 read from/write to Apache Kafka
    • http reads from/writes to HTTP servers
    • hdfs writes to the Hadoop file system
    See the full lists of source agents and target agents.
  • --parser: (with --input only) Specifies the parser to be used, for example CSV, JSON, XML, or Avro, overriding any value for PARSER in the properties file. See the list of parsers.
  • --formatter: (with --output only) Specifies the formatter to be used, for example CSV, JSON, XML, or Avro, overriding any value for FORMATTER in the properties file. See the list of formatters.
  • --props: Indicates the properties file to be passed. Some properties, such as the row type, table name, and schema name, can also be set on the command line (--rowType, --tableName, --schemaName).
  • --schemaName: Indicates the schema in which to find the stream. Overrides any setting of SCHEMA_NAME in the properties file.
  • --streamName or --tableName: These indicate the s-Server stream to be read from or written to. They override any setting of TABLE_NAME in the properties file.
  • --rowType: Optional in many use cases, because the parser automatically generates column names. The value is always a RECORDTYPE that lists the columns of the stream. For each column, enter the column type first, such as VARCHAR(2040) or INTEGER, then the column name, which must be exactly as it appears in the stream declaration. Column names are case sensitive with implicit quotation marks; that is, id in a rowType property is equivalent to "id" in a stream declaration:
      ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at)
    The --rowType parameter overrides any ROWTYPE setting in the properties file.
  • --uri, --user, --pass: Connection information for the s-Server JDBC driver. You only need to specify these if you are connecting to an s-Server other than the one defined in client-tools/default.conn.properties.
  • --plugin: Two plugin jars need to be specified: one for the reader or writer, and one for the parser or formatter. The location of each jar file containing the appropriate ECDA plugin must be given as a relative or absolute path. For example, for the latest Kafka Agent, this argument would be set to a value like plugin/kafka10/kafka10.jar. A full command for a Kafka target agent (here using the kafka plugin) would look like this:
      clienttools/commondata/commondataagent.sh --output --io kafka --props propertyFile --plugin plugin/kafka/kafka.jar --plugin plugin/avroformatter/avroformatter.jar
    where propertyFile is the name of the file containing the agent properties (see Configuring the Properties file).
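Since the row type lists each column as a type followed by a name, it can be convenient to assemble the value from individual column specs. The sketch below does this; build_rowtype is a hypothetical helper, and passing its output to --rowType assumes the command line accepts the same RECORDTYPE(...) text as the ROWTYPE property.

```shell
#!/bin/sh
# Build a ROWTYPE value from "TYPE name" column specs (type first, then name).
# build_rowtype is an illustrative helper, not part of the agent.
build_rowtype() {
    cols=""
    for col in "$@"; do
        if [ -z "$cols" ]; then
            cols="$col"
        else
            cols="$cols, $col"
        fi
    done
    printf 'RECORDTYPE(%s)\n' "$cols"
}

build_rowtype "VARCHAR(2040) id" "VARCHAR(2040) reported_at"
# prints: RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at)
```

Remember to quote the result when using it in a shell command, because it contains spaces and parentheses.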

Using Command Line Options Versus Property File Options

We recommend using the properties file option as far as possible, to minimize the number of command line parameters required.

However, you may wish to use the same agent configuration to connect with another instance of s-Server (for example, you may have test and development servers). In this case, you can configure server connection and stream information from the command line, using the arguments defined above.
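As a sketch of this approach, the script below prints two commands that share one properties file but target different servers. The host names, credentials, schema names, and the agent_command helper are placeholders, and the JDBC URI form shown is an assumption; use the form found in your own default.conn.properties. The --parser and --plugin arguments are left out for brevity.

```shell
#!/bin/sh
# Dry run: reuse one properties file against different s-Server instances
# by supplying connection and stream details on the command line.
# Host names, credentials, and schema names below are placeholders.
# --parser and --plugin arguments are omitted for brevity.
agent_command() {
    uri="$1"; user="$2"; pass="$3"; schema="$4"
    echo "./commondataagent.sh --input --io kafka" \
         "--props /var/sqlstream/etc/kafka_demo.properties" \
         "--uri $uri --user $user --pass $pass --schemaName $schema"
}

agent_command "jdbc:sqlstream:sdp://dev-host:5570"  devuser  devpass  stocks_dev
agent_command "jdbc:sqlstream:sdp://test-host:5570" testuser testpass stocks_test
```

Everything that is common to both environments stays in the properties file, so only the connection details and schema differ between the two commands.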