Implementing Federation for s-Server

Using the SQLstream JDBC driver, you can federate (link together) multiple instances of s-Server with fine-grained topology. You do so using DDL similar to that used for a SQL/MED connection to other databases. See the topic Reading Data from RDBMS Sources in the Integrating Guavus SQLstream with Other Systems guide for more details.

Once you federate multiple instances of s-Server, you can query from and insert into streams and tables in the federated instances of s-Server. This means that you can perform pipeline operations such as parsing, stream enrichment through stream-table joins, filtering and aggregation over these federated instances of s-Server. These operations are described in the section Automatic Distribution of Streaming Workloads.

Federation allows for complex topologies declared entirely in SQL. See the section Scale-up and Scale-Out of Streaming Applications.

Setting Up Federation

Setting up federation requires three steps:

  1. Creating a foreign data wrapper for the other instance of s-Server that references the JDBC driver.
  2. Creating a server object that references this foreign data wrapper and includes connection information for the server.
  3. Creating a foreign stream that references the server object.

Creating a Foreign Data Wrapper

To reference the SQLstream JDBC driver, you create a foreign data wrapper along the following lines:

CREATE OR REPLACE FOREIGN DATA WRAPPER CLIENT_JDBC
 LIBRARY 'class com.sqlstream.aspen.namespace.jdbc.AspenMedJdbcForeignDataWrapper'
 LANGUAGE JAVA
 OPTIONS (DRIVER_CLASS 'com.sqlstream.jdbc.Driver');

For more detail, see the topic CREATE FOREIGN DATA WRAPPER in the Streaming SQL Reference Guide.

Creating a Server Object

To create a federation server, you create a data server using the same options as a MED/JDBC foreign data server. See the subtopic Foreign Server Definition in this guide for details on the options defined below.

CREATE OR REPLACE SERVER "my-federation-server" FOREIGN DATA WRAPPER CLIENT_JDBC
OPTIONS(
 URL 'jdbc:vjdbc:sdp://remote-host:5570',
 USER_NAME 'sa',
 PASSWORD '',
 SCHEMA_NAME 'remote-schema-name'
);

For more detail, see the topic CREATE SERVER in the Streaming SQL Reference Guide.

Creating a Foreign Stream for the Federated Server

In order to access data from the federated server, you need to create a special kind of stream called a foreign stream. Options in the stream identify the schema and the stream, table, or view on the remote s-Server to read from, along with any other options supported by the server definition, such as a shard ID.

The example below creates and sets a SCHEMA called “FederationData” and then creates a FOREIGN STREAM called “my-federation-stream”.


CREATE OR REPLACE SCHEMA "FederationData";
SET SCHEMA '"FederationData"';

CREATE OR REPLACE FOREIGN STREAM "my-federation-stream"
(
"recNo" INTEGER,
"ts" TIMESTAMP NOT NULL,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER
)
SERVER "my-federation-server"
--set options for foreign s-Server
OPTIONS ("SCHEMA_NAME" 'remote-schema-name',
   "STREAM_NAME" 'remote-stream-or-view',
   "SHARD_ID" '3')
;

In addition to a column list, this foreign stream sets three OPTIONS:

  1. SCHEMA_NAME refers to the SCHEMA in the foreign s-Server.
  2. TABLE_NAME or STREAM_NAME refers to the TABLE, FOREIGN STREAM, or VIEW in the foreign s-Server.
  3. SHARD_ID refers to a particular shard. Options specified here or in the SERVER definition are expanded in VIEWs within the FOREIGN SERVER as session variables.

Like all streams, foreign streams must be created within a SCHEMA. Both SCHEMA_NAME and STREAM_NAME are required.

Querying Streams

Once you have set up the foreign data wrapper, server, and foreign stream above, you can query streams and tables in the foreign s-Server as if they were on the local server, using the local schema name as a qualifier, as in the following code:

SELECT STREAM * FROM "FederationData"."my-federation-stream";

For the code above, the data is pulled from “remote-schema-name”.“remote-stream-or-view” on the remote s-Server by the foreign data wrapper.
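
Because the foreign stream behaves like a local stream, you can layer ordinary pipeline logic on top of it. The view below is a minimal sketch, assuming the column list defined earlier; the view name “FailedLoginsView” is hypothetical.

-- Hypothetical local view that filters the federated stream down to
-- failed logins, a simple pipeline step applied to remote data.
CREATE OR REPLACE VIEW "FederationData"."FailedLoginsView" AS
SELECT STREAM "ts", "accountNumber", "sourceIP", "destIP"
FROM "FederationData"."my-federation-stream"
WHERE "loginSuccessful" = FALSE;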

Note: You cannot insert into a stream on the remote s-Server. You also cannot create or drop streams and tables in the remote schema.

Automatic Distribution of Streaming Workloads Across Federated s-Server Instances

This section describes how streaming pipeline operations such as parsing, stream enrichment through stream-table joins, filtering, and, in particular, aggregation can be distributed across federated s-Server instances in a cluster of compute nodes.

You can use message queue middleware such as Apache Kafka or AWS Kinesis to distribute raw payloads across multiple s-Server instances, which lets you perform analytics on those payloads in a distributed manner. For more information on using Kafka and Kinesis with s-Server, see the topics Reading from Kafka, Writing to Kafka, Reading from Kinesis, and Writing to Kinesis.

Streaming transformations such as parsing and enrichment tend to be stateless. Such transformations can be distributed across nodes easily.
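
For example, an enrichment view that joins each event to a reference table holds no windowed state, so the identical view can be created on every node. The following is a sketch only; the “Customers” table and its columns are hypothetical.

-- Hypothetical stateless enrichment: a stream-table join that adds
-- customer attributes to each event by customerId.
CREATE OR REPLACE VIEW "FederationData"."EnrichedLoginsView" AS
SELECT STREAM s."ts", s."accountNumber", s."sourceIP",
    c."customerName", c."customerTier"
FROM "FederationData"."my-federation-stream" AS s
JOIN "FederationData"."Customers" AS c
ON s."customerId" = c."customerId";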

Distributing stateful transformations such as aggregations is slightly more difficult, but still straightforward. The following sections describe techniques that you can use to distribute aggregations across federated s-Server instances.

An application may aggregate data in either of two ways (both are sketched below):

  1. Using the GROUP BY clause of the SELECT statement, known as tumbling windows.
  2. Using the OVER clause of the SELECT statement, known as sliding windows.
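
For reference, the queries below sketch both styles against the federated stream defined earlier; the one-minute windows and column choices are illustrative assumptions, not part of the original example.

-- Tumbling window: one result row per account per minute (GROUP BY).
SELECT STREAM
    FLOOR(s.ROWTIME TO MINUTE) AS "minuteStart",
    s."accountNumber",
    COUNT(*) AS "loginAttempts"
FROM "FederationData"."my-federation-stream" AS s
GROUP BY FLOOR(s.ROWTIME TO MINUTE), s."accountNumber";

-- Sliding window: a running count over the preceding minute (OVER).
SELECT STREAM
    s."ts",
    s."accountNumber",
    COUNT(*) OVER (
        PARTITION BY s."accountNumber"
        RANGE INTERVAL '1' MINUTE PRECEDING
    ) AS "attemptsLastMinute"
FROM "FederationData"."my-federation-stream" AS s;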

You can easily distribute these aggregation workloads across multiple nodes in a cluster using the simple SQL scripts described in the following sections.

Parallel Aggregations

Tumbling Windows/GROUP BY

For tumbling windows, you can use rollup aggregation, partitioned parallel aggregation, or both. You can parallelize a GROUP BY operation in two ways:

  • Rollup Aggregation
  • Partitioned Parallel Aggregation

Sliding Windows/OVER

For sliding windows, you need to use Partitioned Parallel Aggregation.

Using Rollup Aggregation to Aggregate Tumbling Windows

The diagram below illustrates aggregating data from four instances of s-Server into a single RollupAggView on another instance of s-Server.

[Diagram ad_rollupaggregation: rollup aggregation of four s-Server instances into a single RollupAggView]
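
A minimal sketch of this pattern follows. It assumes that each remote node publishes a per-minute partial aggregate (with columns “minuteStart” and “loginAttempts”) and that foreign streams “node1-agg” through “node4-agg” have already been declared for them on the rollup instance; all of these names are hypothetical.

-- Union the partial aggregates arriving from the four federated nodes.
CREATE OR REPLACE VIEW "FederationData"."AllNodesAggView" AS
SELECT STREAM * FROM "FederationData"."node1-agg"
UNION ALL
SELECT STREAM * FROM "FederationData"."node2-agg"
UNION ALL
SELECT STREAM * FROM "FederationData"."node3-agg"
UNION ALL
SELECT STREAM * FROM "FederationData"."node4-agg";

-- Roll the per-node partial counts up into a single cluster-wide
-- total for each minute.
CREATE OR REPLACE VIEW "FederationData"."RollupAggView" AS
SELECT STREAM
    FLOOR(u.ROWTIME TO MINUTE) AS "minuteStart",
    SUM(u."loginAttempts") AS "totalLoginAttempts"
FROM "FederationData"."AllNodesAggView" AS u
GROUP BY FLOOR(u.ROWTIME TO MINUTE);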

Using Partitioned Parallel Aggregation to Aggregate Tumbling or Sliding Windows

In partitioned parallel aggregation, you aggregate data from the same partitions on four different nodes. The diagram below illustrates views on each node’s partition being aggregated into an aggregated view of partitions on all four nodes. For each node, the views named Partition1View, for example, are aggregated into a view called PartitionUnion1View. In the last step, the four aggregated views are combined using a UNION ALL statement into AllPartitionsView.

[Diagram ad_partitioned_parallel_agg: partitioned parallel aggregation across four nodes, combined into AllPartitionsView]
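
The sketch below shows only the final combining step, assuming foreign streams for the four remote PartitionUnion views have already been declared on the combining node; the names follow the diagram but are otherwise hypothetical. When the aggregation keys align with the partitioning, the UNION ALL alone yields complete per-group results, so no further re-aggregation is needed.

-- Combine the per-partition aggregates from the four federated nodes
-- into a single view covering every partition.
CREATE OR REPLACE VIEW "FederationData"."AllPartitionsView" AS
SELECT STREAM * FROM "FederationData"."PartitionUnion1View"
UNION ALL
SELECT STREAM * FROM "FederationData"."PartitionUnion2View"
UNION ALL
SELECT STREAM * FROM "FederationData"."PartitionUnion3View"
UNION ALL
SELECT STREAM * FROM "FederationData"."PartitionUnion4View";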

Scale-up and Scale-Out of Streaming Applications

The diagram below shows how a streaming application can be distributed across multiple nodes in a cluster. The distribution of queries across nodes achieves the following objectives:

  • Each node in a cluster subscribes to a subset of partitions of a partitioned Kafka topic.
  • Each node creates a dedicated pipeline of parsers/data cleansing operations for each Kafka partition.
  • All operators in a given pipeline execute concurrently, taking advantage of multiple CPU cores on the node. This is referred to as “Pipeline Parallelism”, “vertical scaling”, or “scale up” (see the sketch at the end of this section).
  • The results of these pipelines are routed to “aggregator operators” instantiated on each node in the cluster. In short, the aggregation operator is distributed “horizontally” across all nodes in the cluster. This is also known as “horizontal scaling” or “scale out”.
  • The entire topology is described in declarative SQL, taking advantage of s-Server’s federation capabilities.
  • The application is “pinned” to node 1 from the consumer’s perspective.
[Diagram ad_scaleup_scaleout: scale-up and scale-out topology across a cluster of s-Server nodes]
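
As a rough illustration of the per-node “scale up” part, the cascaded views below form a cleanse-then-aggregate chain whose stages execute concurrently on one node. The source stream “RawLoginsStream” and both view names are hypothetical stand-ins for the per-partition Kafka pipelines described above.

-- Hypothetical per-partition pipeline on one node: each view is a
-- pipeline stage, and the stages run concurrently (pipeline
-- parallelism, i.e. scale up).
CREATE OR REPLACE VIEW "FederationData"."CleanLoginsView" AS
SELECT STREAM "ts", "accountNumber", "sourceIP", "loginSuccessful"
FROM "FederationData"."RawLoginsStream"   -- hypothetical parsed source
WHERE "accountNumber" IS NOT NULL;        -- data-cleansing stage

-- Local aggregator stage; its output would then be combined with the
-- other nodes' aggregators as shown in the previous sections.
CREATE OR REPLACE VIEW "FederationData"."NodeAggView" AS
SELECT STREAM
    FLOOR(c.ROWTIME TO MINUTE) AS "minuteStart",
    COUNT(*) AS "loginAttempts"
FROM "FederationData"."CleanLoginsView" AS c
GROUP BY FLOOR(c.ROWTIME TO MINUTE);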