To read data from or write data to remote locations (that is, locations that are not on the same machine as s-Server), you need to configure and use the Extensible Common Data agent. This agent is a standalone Java application that mediates data between SQLstream s-Server and external data sources.
The agent is supplied both as part of the SQLstream s-Server product and as part of the ClientTools download from the SQLstream website (via SQLstream-<VERSION>-clienttools-linux.run or SQLstream-client-tools-<VERSION>-windows.exe).
The ECDA Agent uses the SQLstream JDBC driver either to select from a stream in s-Server (in order to write data to an external target) or to write to a stream (when reading from an external source). You need to create the stream before executing the agent.
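Because the stream has to exist before the agent starts, it can help to keep its DDL in a script. The following is a minimal sketch, assuming the "orders" stream in the "stocks" schema from the Kafka example later in this topic, with columns mirroring that example's ROWTYPE. The file name and location are hypothetical, the schema is assumed to exist already, and the sqllineClient invocation in the final comment is an assumption to check against your installation.

```shell
#!/bin/sh
# Write DDL for the stream the source agent will insert into.
# Column names are quoted so that they stay lower case, matching the ROWTYPE property.
SQL_FILE="${SQL_FILE:-${TMPDIR:-/tmp}/create_orders_stream.sql}"   # hypothetical file name

cat > "$SQL_FILE" <<'EOF'
-- Assumes the schema "stocks" already exists.
CREATE OR REPLACE STREAM "stocks"."orders" (
    "order_time" TIMESTAMP,
    "amount"     INTEGER,
    "ticker"     VARCHAR(100),
    "buyer"      VARCHAR(100)
);
EOF

echo "Wrote $SQL_FILE"
# Assumption: the script is then run with a SQL client, for example
#   $SQLSTREAM_HOME/bin/sqllineClient --run="$SQL_FILE"
```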
The agent itself is a wrapper (or "hoster") for the ECDA (Extensible Common Data Adapter). The agent is configured much like the adapter, except that you use command-line options and properties files instead of foreign stream OPTIONs to configure the agent.
It can also be useful to run the agent on the same host as s-Server. The agent ships with s-Server and is installed alongside it by default.
An example use-case is where data from multiple sources is collected into multiple local directories but then needs to be loaded into the same source stream. Each directory can be separately monitored by its own agent, and each agent can be controlled independently (for example, working to different cron schedules).
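The per-directory arrangement might be sketched as follows. This is a dry run that only prints one agent command per properties file and starts nothing. The CONF_DIR location and the idea of one properties file per monitored directory are assumptions made for illustration; the plugin paths are copied from the file example later in this topic.

```shell
#!/bin/sh
# Dry run: print one agent command per properties file.
# Assumption: each file under $CONF_DIR configures one monitored directory
# and names the same destination stream.
CONF_DIR="${CONF_DIR:-/var/sqlstream/etc/feeds}"   # hypothetical location

agent_cmd_for() {
  # $1 = path to a properties file
  printf './commondataagent.sh --input --io file --props %s --parser CSV --plugin plugin/file.jar --plugin plugin/csv/csv.jar\n' "$1"
}

for props in "$CONF_DIR"/*.properties; do
  [ -e "$props" ] || continue      # the glob matched nothing
  agent_cmd_for "$props"
done
```

Each printed command could then be given its own crontab entry or service definition, so that the agents are started and stopped independently.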
The server hosting the ECDA Agent must have a suitable Java Runtime Environment (or a JDK) installed.
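A quick way to confirm this prerequisite before starting the agent is sketched below. The helper name have_cmd is made up for this example, and the check only looks for a java executable on the PATH; it does not verify that the version is one your s-Server release supports.

```shell
#!/bin/sh
# Report whether a Java runtime is on the PATH of the agent host.
have_cmd() {
  # $1 = command name; succeeds if the command can be found
  command -v "$1" >/dev/null 2>&1
}

if have_cmd java; then
  java -version 2>&1 | head -n 1    # java prints its version on stderr
else
  echo "No java executable found on PATH; install a JRE or JDK first." >&2
fi
```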
This table lists the ECD plugin types, and the corresponding prebuilt servers.
The 'Type' is used in the TYPE clause when creating a SERVER, and with the --io switch when running an ECD agent.
|Type||Plugin Description||Name of Prebuilt Server for Foreign Streams||Notes|
|amqp_legacy||AMQP Legacy: Reads from and writes to an AMQP message bus for AMQP 0.9. See the topics Reading from AMQP and Writing to AMQP for more details.||AMQP_LEGACY_SERVER|
|amqp10||AMQP 1.0: Reads from and writes to an AMQP message bus for AMQP 1.0. See the topics Reading from AMQP and Writing to AMQP for more details.||AMQP10_SERVER|
|hdfs||Allows s-Server to write to the Hadoop/HDFS file system (also supports ADLS Gen 1). See Writing to Hadoop for more details.||HDFS_SERVER||sink only|
|hive||Allows s-Server to write to Hadoop/Hive tables. See Writing to Hive Tables for more details.||HIVE_SERVER||sink only; ORC format only|
|http||Allows s-Server to read and write data over HTTP / HTTPS. See the topics Reading over HTTP and Writing over HTTP for more details.||HTTP_SERVER|
|file||File System: Reading and writing over the file system. See the topics Reading from the File System and Writing to the File System for more details.||FILE_SERVER|
|filevfs||Allows s-Server to read compressed files from the local file system and SFTP. See the topic Integrating Files using VFS for more details.||FILE_VFS_SERVER||source only|
|kafka||Allows s-Server to exchange data with Kafka clusters. See the topics Reading from Kafka and Writing to Kafka for more details.||KAFKA_SERVER||non-transactional|
|kafka10||Allows s-Server to exchange data with Kafka clusters, versions 0.10.2 and above. See the topics Reading from Kafka and Writing to Kafka for more details.||KAFKA10_SERVER||versions since 0.10.2|
|kinesis||Allows s-Server to exchange data with Kinesis streams. See the topics Reading from Kinesis and Writing to Kinesis for more details.||KINESIS_SERVER|
|pulsar||Allows s-Server to read from and write to Pulsar streams. See the topic Integrating Pulsar for more details.||PULSAR_SERVER|
|mail||Allows s-Server to connect to an SMTP server in order to send emails. See the topic Writing to Mail Servers for details.||MAIL_SERVER||sink only|
|mongodb||Allows s-Server to write to MongoDB. See the topic Writing to MongoDB for more details.||MONGODB_SERVER||sink only|
|mqtt||Allows s-Server to exchange data with MQTT brokers. See the topics Reading from MQTT and Writing to MQTT for more details.||MQTT_SERVER|
|s3||Allows s-Server to read compressed files from the local file system and Amazon S3. See the topic Integrating S3 Files for more details.||S3_SERVER||source only|
|snowflake||Allows s-Server to write to Snowflake warehouses. See the topic Writing to Snowflake for more details.||SNOWFLAKE_SERVER||sink only; uploads files|
|net||Network Sockets (TCP/UDP): Configured for a socket. Reads or writes data streamed from a client program using TCP or UDP. See the topics Reading from Network Sockets and Writing to Network Sockets for more details.||NET_SERVER|
|websocket||Allows s-Server to read and write data over web sockets. See the topics Reading from Websockets and Writing to Websockets for more details.||WEBSOCKET_SERVER|
To read data from a source and write it into s-Server using the ECDA Agent:
Define or identify a source stream in s-Server. For more information on creating a stream, see the topic CREATE STREAM in the Streaming SQL Reference Guide. This stream will be at the start of a SQL pipeline for a source agent.
Create a properties file with information about the data source and input stream. Property file options are described below. The following example is for a source agent that reads Avro-formatted messages from a Kafka topic and writes them to a SQLstream stream called "orders" in a schema called "stocks". See the topic Reading Data into s-Server for more information on the ECD adapter options for readers and parsers. Note that in this example some of the Kafka options are themselves defined in another properties file:
# Parser options
PARSER=AVRO
AVRO_SCHEMA_LOCATION=/home/sqlstream/stock_orders.avsc
# Kafka options
TOPIC=production_orders
kafka.consumer.config=/home/sqlstream/stock_topic.conf
# Schema, name, and parameter signature of destination stream
SCHEMA_NAME=stocks
TABLE_NAME=orders
#columns
ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker, VARCHAR(100) buyer)
Run the commondataagent.sh script. See the section below for a full list of command line options. For the same example we state that the agent is run as input, with a parser type of Avro, using the properties file at /var/sqlstream/etc/kafka_demo.properties. Note that as well as specifying the input type and the parser, we must explicitly reference the reader and parser plugin jar files (in this example they are relative to $SQLSTREAM_HOME):
./commondataagent.sh --input --io kafka --props /var/sqlstream/etc/kafka_demo.properties --parser avro --plugin $SQLSTREAM_HOME/plugin/kafka10/kafka10.jar --plugin plugin/avroparser/avroparser.jar
At a minimum, you need to specify the agent (--io
To read data from s-Server and write it to a target using the ECDA Agent:
Define or identify a stream in s-Server. For more information on creating a stream, see the topic CREATE STREAM in the Streaming SQL Reference Guide. This stream will be at the end of a pipeline for a target agent.
Create a properties file with information about the output stream and data target. Property file options are described below. See the topic Writing Data out of s-Server for more information on the ECD adapter options for formatters and writers. The following example is for a target agent that reads from a SQLstream stream called "output_orders" in a schema called "stocks" and writes to a CSV file located in the directory /home/guavus/output:
# Location, date format, prefix, suffix
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
ROW_SEPARATOR=\000A
SEPARATOR=,
WRITE_HEADER=false
DIRECTORY=/home/guavus/output
ORIGINAL_FILENAME=stocks-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K
FORMATTER_INCLUDE_ROWTIME=true
# Schema, name, and parameter signature of source stream
SCHEMA_NAME=stocks
TABLE_NAME=output_orders
#columns
ROWTYPE=RECORDTYPE(TIMESTAMP order_time, INTEGER amount, VARCHAR(100) ticker)
Run the commondataagent.sh script. See the section below for a full list of command line options. The following example specifies that the agent is run as output, with a formatter type of CSV, using a properties file at /var/sqlstream/etc/kafka_demo.properties. Note that as well as specifying the output type and the formatter, we must explicitly reference the writer and formatter plugin jar files:
./commondataagent.sh --output --io file --props /var/sqlstream/etc/kafka_demo.properties --formatter CSV --plugin plugin/file.jar --plugin plugin/csv/csv.jar
At a minimum, you need to specify the agent (--io
The properties file is a typical Java properties file consisting of Key=Value lines. Lines starting with a hash '#' symbol are treated as comments.
The properties file contains two types of property:
Most properties represent one of the OPTIONS that would have been required if defining the same input or output as a foreign stream. For example, when defining a FILE source you would need to define DIRECTORY, FILENAME_PATTERN, and so on.
The ROWTYPE property (which can be overridden by the --rowType command line parameter) defines the signature of the source or target data - standing in for the column descriptions that would be present for an equivalent foreign stream definition.
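To check what a given properties file will tell the agent, a small lookup helper can be handy. The following is a minimal sketch, not part of the product: get_prop is a name invented here, and it handles only plain Key=Value lines. It skips comment lines, lets the last occurrence of a key win, and does not handle whitespace around the equals sign or continuation lines.

```shell
#!/bin/sh
# Print the value of one key from a Key=Value properties file.
# Returns a non-zero status if the key is not present.
get_prop() {
  # $1 = key, $2 = properties file
  awk -v key="$1" '
    /^[[:space:]]*#/ { next }                 # skip comment lines
    {
      pos = index($0, "=")                    # split at the first "="
      if (pos == 0) next
      if (substr($0, 1, pos - 1) == key) { val = substr($0, pos + 1); found = 1 }
    }
    END { if (found) print val; else exit 1 }
  ' "$2"
}

# Example call (path taken from the examples in this topic):
#   get_prop ROWTYPE /var/sqlstream/etc/kafka_demo.properties
```

Splitting at the first equals sign keeps values intact even when they contain further equals signs or commas.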
|Command Line Argument||Definition|
|--input or --output||--input specifies a reader (inserting); --output specifies a writer (selecting). One of these is required.|
|--io||Input source/output target. Defaults to file. Options include: file (reads/writes over the file system); net (reads/writes over UDP or TCP network sockets); amqp10 (reads/writes over AMQP 1.0); kafka and kafka10 (read from/write to Apache Kafka); http (reads from/writes to HTTP servers); hdfs (writes to the Hadoop file system). See the full lists of source agents and target agents.|
|--parser||(With --input only.) Specifies the parser to be used, for example CSV, JSON, XML, or Avro, overriding any value for PARSER in the properties file. See the list of parsers.|
|--formatter||(With --output only.) Specifies the formatter to be used, for example CSV, JSON, XML, or Avro, overriding any value for FORMATTER in the properties file. See the list of formatters.|
|--props||Indicates the properties file to be passed. Some properties, such as --rowType, --tableName, and --schemaName, can also be configured through the command line.|
|--schemaName||Indicates the schema in which to find the stream. Overrides any setting of SCHEMA_NAME in the properties file.|
|--streamName or --tableName||These indicate the s-Server stream to be read from or written to. They override any setting of TABLE_NAME in the properties file.|
|--rowType||Optional in many use cases, because the parser automatically generates column names. The value is always a RECORDTYPE that lists the columns of the stream. For each column, give the column type first, such as VARCHAR(2040) or INTEGER, then the column name, which must be exactly as it appears in the stream declaration. Column names are case-sensitive and implicitly quoted; that is, id in a rowType property is equivalent to "id" in a stream declaration. The --rowType parameter overrides any ROWTYPE setting in the properties file.|
|--uri, --user, --pass||Connection information for the s-Server JDBC driver. You only need to specify these if you are connecting to an s-Server other than the one defined in client-tools/default.conn.properties.|
|--plugin||Two plugin jars need to be specified: one for the reader or writer, and one for the parser or formatter. Give the location of each jar file that contains the appropriate ECDA plugin as a relative or absolute path. For example, for the latest Kafka Agent, this argument would be set to a value like this: plugin/kafka10/kafka10.jar The resulting command for the Kafka Agent would then be:
where propertyFile is the name of the file containing the agent properties (as described in this topic).|
We recommend using the properties file option as far as possible, to minimize the number of command line parameters required.
However, you may wish to use the same agent configuration to connect with another instance of s-Server (for example, you may have test and development servers). In this case, you can configure server connection and stream information from the command line, using the arguments defined above.
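As an illustration, the same properties file might be reused against a test server and a development server as sketched below. The host names are hypothetical, and the JDBC URI format and port shown are assumptions to verify against your installation. The script only prints the commands, and --user and --pass are left out for brevity.

```shell
#!/bin/sh
# Dry run: one properties file, two s-Server instances.
PROPS="/var/sqlstream/etc/kafka_demo.properties"

agent_cmd_for_server() {
  # $1 = JDBC URI, $2 = schema name, $3 = stream name
  printf './commondataagent.sh --input --io kafka --props %s --uri %s --schemaName %s --streamName %s --parser avro --plugin plugin/kafka10/kafka10.jar --plugin plugin/avroparser/avroparser.jar\n' \
    "$PROPS" "$1" "$2" "$3"
}

# Hypothetical hosts; the URI format and port are assumptions.
agent_cmd_for_server "jdbc:sqlstream:sdp://test-host:5570" stocks orders
agent_cmd_for_server "jdbc:sqlstream:sdp://dev-host:5570"  stocks orders
```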