Customers who implement SAS Event Stream Processing for high-profile streaming projects will likely require that the software be highly available. One method used to improve the availability of SAS ESP is to deploy a supported message bus to assist with the failover process. Failover in ESP is accomplished by seamlessly transitioning event processing and subscription from an active instance of an ESP engine to a standby instance when the active instance fails.
There are four message buses supported for ESP failover: Tervela, Solace, RabbitMQ and Kafka. Kafka is the most recent message bus added to the supported list. In this article, we walk through the key steps of setting up failover using Kafka as the message bus for SAS ESP.
This article assumes some familiarity with ESP concepts and terms. For a look at failover involving RabbitMQ, see this article. Like the RabbitMQ article, this one is fairly lengthy, so grab your favorite caffeinated beverage. :-)
Kafka is an open-source, distributed streaming platform from the Apache Software Foundation that uses publish-subscribe messaging. Kafka partitions and replicates its "transaction" (commit) logs to provide scalability and availability. It was originally developed at LinkedIn and became open source in 2011. Support for using Kafka as a message bus during the failover process was added in SAS ESP 4.1.
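To make "partitions" concrete, here is a toy, in-memory model of a topic. It is our own illustration, not Kafka's implementation: each partition is an append-only list, a record's key chooses the partition, and a consumer reads forward from an offset it tracks itself. Replication across brokers is not modeled, and the crc32 hash is an arbitrary choice for the sketch (real Kafka clients use their own partitioners).

```python
from __future__ import annotations

from zlib import crc32


class ToyTopic:
    """Toy stand-in for a Kafka topic: a fixed set of append-only partitions.
    Illustration only; replication across brokers is not modeled."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions: list[list[bytes]] = [[] for _ in range(num_partitions)]

    def append(self, key: bytes, value: bytes) -> tuple[int, int]:
        # Records with the same key always go to the same partition,
        # so their relative order is preserved.
        p = crc32(key) % len(self.partitions)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1   # (partition, offset)

    def read(self, partition: int, offset: int) -> list[bytes]:
        # Consumers remember their own offset and read everything after it.
        return self.partitions[partition][offset:]


topic = ToyTopic(num_partitions=3)
p, off = topic.append(b"sashdp04", b"event-1")
topic.append(b"sashdp04", b"event-2")
print(topic.read(p, off))   # [b'event-1', b'event-2']
```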
Before we dive into setting up the failover scenario, let's review a few terms related to Kafka.
Also, here are some key things to know about Kafka.
ZooKeeper is an open-source, high-performance coordination service for distributed applications. This centralized service provides configuration management, locking and synchronization for coordinating between distributed systems, registry services, and group membership. ZooKeeper is required to set up ESP failover via Kafka. Like the distributed processes it coordinates, ZooKeeper is intended to be replicated over a set of hosts, called an ensemble, and each host must be aware of the others. To perform its duties properly, the ensemble must span a minimum of three hosts. Each server keeps an in-memory image of the state, along with transaction logs and snapshots in a persistent store. For more information, see the ZooKeeper doc.
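Why three hosts? ZooKeeper keeps serving only while a strict majority (a quorum) of the ensemble is up. The standard majority arithmetic below is our own sketch, not ZooKeeper code, and shows that two hosts tolerate no failures, three tolerate one, and four still tolerate only one.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest strict majority of an ensemble with this many servers."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """Servers that can fail while the survivors still form a quorum."""
    return ensemble_size - quorum_size(ensemble_size)


for n in range(1, 6):
    print(f"{n} servers: quorum {quorum_size(n)}, "
          f"tolerates {tolerated_failures(n)} failure(s)")
```

This is also why odd ensemble sizes are usually recommended: adding a fourth server raises the quorum without adding fault tolerance.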
The architecture of a Kafka failover configuration is similar to that of a RabbitMQ configuration. The key components are as follows:
One or more publishers send messages to the model. Using the adapter's Kafka transport, these messages are routed to a topic in the Kafka cluster. Without the transport, the adapter would publish directly to the source window of an individual model.
The messages are appended to the appropriate inbound topic, where the publish connector within a source window of each engine retrieves them. The model then processes the messages.
The subscriber connectors of the active engine then store the resulting messages in an outbound topic within Kafka. Working with ZooKeeper, the subscriber connectors determine which ESP instance is active and which are standby. Only the active instance sends events. No events are sent from the standby instances to the Kafka topic, although the instances are essentially in sync in terms of processing.
Finally, the subscriber client, in many cases an adapter, retrieves the messages from the outbound topic using the Kafka transport.
Many details are not covered here; see the SAS ESP documentation for more information. In essence, the Kafka bus acts as a “buffer” for the messages sent by ESP publishers and received by ESP subscribers.
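The flow described above can be summarized with a toy simulation. Plain Python lists stand in for the Kafka topics and a string transform stands in for the model; none of this is ESP code. Every engine processes every inbound event, which keeps the standby in sync, but only the active engine writes to the outbound topic, so the subscriber sees each result exactly once.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Engine:
    """Toy ESP engine: processes every inbound event, publishes only if active."""
    name: str
    active: bool = False
    processed: list[str] = field(default_factory=list)

    def model(self, event: str) -> str:
        # Stand-in for the real ESP model; it just upper-cases the event.
        return event.upper()

    def consume(self, event: str, outbound: list[str]) -> None:
        result = self.model(event)
        self.processed.append(result)   # the standby stays in sync...
        if self.active:
            outbound.append(result)     # ...but only the active engine publishes


inbound = ["cpu=12", "cpu=15", "cpu=11"]   # written by the publisher adapters
outbound: list[str] = []                   # read by the subscriber client
engines = [Engine("sashdp03"), Engine("sashdp04", active=True)]

for event in inbound:            # each engine's publish connector reads the inbound topic
    for engine in engines:
        engine.consume(event, outbound)

print(outbound)   # ['CPU=12', 'CPU=15', 'CPU=11']
```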
Now let's dig into what is needed to get this running. First, as the previous sections make clear, Kafka and ZooKeeper must be deployed and functional. For the purposes of this article, we assume that these pieces have already been installed and verified to be working. Remember that in a production environment there must be at least three ZooKeeper instances and three Kafka brokers if we want ZooKeeper, Kafka, and ESP all to be highly available.
An internal collection of machines was used as the test environment. Note that it did not quite meet the requirements above: it had four ZooKeeper nodes but only two Kafka brokers. Therefore the testing covered only ESP failover, using two ESP engines; failover of Kafka itself was not tested. The following diagram shows the key components used for testing and the machines on which they reside. I would be remiss if I didn't mention that my colleague Raphaël Poumarède provided significant contributions to this environment. Thanks Raphaël!
With the components in place and basic functionality tested, what is required to proceed with an ESP failover test using Kafka as the message bus?
So let's step through these items.
This configuration file tells the adapter which host and port to use to connect to Kafka, along with several other connection parameters. This is the format for the C++-based adapter.
And here is the format for the Java-based adapter.
These client libraries are needed by the connectors to talk to Kafka and ZooKeeper, so they must be installed on the machines where the ESP model runs. The source for these libraries can be downloaded from the following URLs, but it must be compiled.
This article doesn't cover the compile steps, since they are included with the download, but the resulting libraries should look like the following. Note that the supported Kafka client version for ESP 4.3 is 0.9.3. For the ZooKeeper client, version 3.4.8 was used.
To enable the server and clients to find the correct libraries, modules and executables, define the following environment variables. In our system we have placed these in their own /etc/profile.d file.
Please note that the DFESP_KAFKA_JAR variable needs to be set only for Java-based adapters that use the Kafka transport. For example, in our testing it was required for the CAS adapter. Also, only version 0.9.0.1 of the Kafka client jar is supported.
Also, adding $DFESP_HOME/bin to the $PATH variable is not required, but doing so eliminates the need to type $DFESP_HOME/bin/ each time you enter a command.
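If you want a quick sanity check before starting the server and adapters, a small script like the following can flag missing settings. It is only a sketch: it checks just the variables named in this article, and the "0.9.0.1" test assumes the jar path contains the version string, as in the usual kafka-clients-<version>.jar file name.

```python
from __future__ import annotations

import os
from pathlib import Path


def check_esp_env(env: dict[str, str] | None = None) -> list[str]:
    """Return problems found with the ESP-related variables named in this
    article. Pass a dict to test; defaults to the current environment."""
    env = dict(os.environ) if env is None else env
    problems: list[str] = []

    home = env.get("DFESP_HOME")
    if not home:
        problems.append("DFESP_HOME is not set")
    else:
        # Normalize PATH entries so "/x/bin/" and "/x/bin" compare equal.
        path_dirs = {str(Path(p)) for p in env.get("PATH", "").split(os.pathsep) if p}
        if str(Path(home) / "bin") not in path_dirs:
            # Optional: it only saves typing $DFESP_HOME/bin/ before each command.
            problems.append("$DFESP_HOME/bin is not on PATH (optional)")

    # Needed only by Java-based adapters that use the Kafka transport.
    # Assumption: the jar path includes its version, e.g. kafka-clients-0.9.0.1.jar.
    jar = env.get("DFESP_KAFKA_JAR", "")
    if jar and "0.9.0.1" not in jar:
        problems.append("DFESP_KAFKA_JAR does not appear to be the 0.9.0.1 client jar")

    return problems


if __name__ == "__main__":
    for problem in check_esp_env():
        print("WARNING:", problem)
```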
Of course, the model must be built with the correct connectors to interact with Kafka topics.
To retrieve messages from Kafka, the publish connector must be configured properly. Here is the XML definition of the publish connector from the model. Notice that both Kafka brokers are specified. Also notice the Kafka topic used for publishing; the "I" at the end of the topic name indicates "input".
A subscribe connector is also defined with "hotfailover" to tell the model that more than one instance will be executing. In addition to "hotfailover", notice that zookeeperhostport is specified. Remember that ZooKeeper is the component that keeps track of active and standby servers.
Finally, we need a client to retrieve events from the Kafka topic specified in the connector above. This can be a simple file and socket adapter, which makes it easy to track events by tailing the output file.
Or, if you want the events to be directed to a CAS table, the following CAS adapter can be used. Keep in mind that the DFESP_KAFKA_JAR environment variable must be set; its values are added to the classpath of the adapter. Note that this behavior is not documented.
Now that all the components are in place, it is time to step through the failover process.
Recalling the diagram of our environment earlier, the model is loaded into an XML server on both sashdp03 and sashdp04. When failover is enabled properly, the active and standby servers display the following messages. Unrelated messages were removed for clarity.
Notice that ZooKeeper creates a "node" for each ESP instance to keep track of which is active and which is standby, and that the second instance identified the existing node.
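Tracking active and standby instances with one znode per instance resembles the leader-election recipe from the ZooKeeper documentation: each candidate creates an ephemeral, sequential child znode, the lowest sequence number wins, and when a candidate's session ends its znode disappears so the next one takes over. We have not verified that ESP uses exactly this recipe. The toy class below only mimics the idea in memory; it is not the ZooKeeper client API.

```python
from __future__ import annotations

import itertools


class ToyZooKeeper:
    """In-memory stand-in for one parent znode whose children are ephemeral
    and sequential. Illustration only; this is not the ZooKeeper client API."""

    def __init__(self) -> None:
        self._counter = itertools.count()
        self.children: dict[str, str] = {}   # child znode name -> owner session

    def create_ephemeral_sequential(self, prefix: str, session: str) -> str:
        # Real ZooKeeper appends a zero-padded, 10-digit sequence number.
        name = f"{prefix}{next(self._counter):010d}"
        self.children[name] = session
        return name

    def end_session(self, session: str) -> None:
        # Ephemeral znodes are deleted when the owning session ends,
        # for example when the ESP engine that created them is killed.
        self.children = {n: s for n, s in self.children.items() if s != session}

    def active_session(self) -> str | None:
        # Recipe rule: the owner of the lowest sequence number is active.
        # Names share a prefix and are zero-padded, so min() picks that one.
        return self.children[min(self.children)] if self.children else None


zk = ToyZooKeeper()
zk.create_ephemeral_sequential("esp-", "sashdp04")   # first instance to start
zk.create_ephemeral_sequential("esp-", "sashdp03")   # second instance, standby
print(zk.active_session())   # sashdp04
zk.end_session("sashdp04")   # active engine terminated
print(zk.active_session())   # sashdp03
```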
In this test, the File and Socket adapter with the Kafka transport is used to direct events to a Kafka topic. This is as simple as starting the adapter with a parameter telling it to use the Kafka transport.
However, at this point no data has been delivered to the model via the publish adapter. The test model processes events that the "dstat" command continuously appends, one per second, to a CSV file, so the adapter uses this growing file as its source. When first started, the adapter processes the events already in the file; once it has read through them all, it publishes new events as they arrive.
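Reading what is already in a file and then picking up new lines as they are appended is essentially the `tail -f` pattern. Here is a minimal Python sketch of that pattern for illustration; it is not how the SAS file and socket adapter is implemented, and the file name and `publish` call in the usage comment are hypothetical placeholders.

```python
from __future__ import annotations

import time
from typing import Iterator, TextIO


def follow(f: TextIO, poll_seconds: float = 1.0,
           idle_limit: int | None = None) -> Iterator[str]:
    """Yield the lines already in a file, then keep yielding lines as they are
    appended, similar to `tail -f`. A trailing line without a newline is held
    back until it is complete. Stops after `idle_limit` consecutive empty polls
    if given; otherwise it runs forever."""
    pending = ""   # partial line read before the writer finished it
    idle = 0
    while idle_limit is None or idle < idle_limit:
        chunk = f.readline()
        if chunk:
            idle = 0
            pending += chunk
            if pending.endswith("\n"):
                yield pending.rstrip("\n")
                pending = ""
        else:
            idle += 1              # at end of file for now; wait for more data
            time.sleep(poll_seconds)


# Usage (hypothetical file name and publish function):
# with open("dstat_output.csv") as f:
#     for line in follow(f):
#         publish(line)
```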
There are two growing CSV files, one on each machine (intcas01 and sashdp04), so two file and socket adapters are started, each publishing events to the source window of the model.
Now that events are flowing into the model, an adapter can be started to retrieve events from the output topic using the Kafka transport. Because events are generated once per second on each machine and the CSV data includes timestamps, it is fairly easy to verify that all events are accounted for after a failure.
If you are using the CAS adapter to subscribe, then the log will look similar to the following:
Finally, the failover test. In the above scenario the active instance was on sashdp04, so we terminate that instance to verify that the standby instance takes over and no events are missed. When we terminate the active instance, we see the following messages in the log of the standby instance. The standby determines the last message and the current message, then retrieves messages from its buffer to ensure that the events delivered to Kafka are in sync.
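The idea behind those log messages can be sketched as simple bookkeeping: the standby remembers its most recent output, finds the last event the failed instance actually delivered to Kafka, and sends only what comes after it. The toy class below assumes each output event carries an increasing id; ESP's real mechanism is described in the ESP documentation and may differ in its details.

```python
from __future__ import annotations

from collections import deque


class StandbyBuffer:
    """Toy model of a standby subscriber that remembers its most recent output
    events so it can fill the gap when it becomes active."""

    def __init__(self, max_events: int) -> None:
        # Bounded buffer: the oldest events are dropped once it is full.
        self.recent: deque[tuple[int, str]] = deque(maxlen=max_events)

    def remember(self, event_id: int, payload: str) -> None:
        self.recent.append((event_id, payload))

    def events_to_resend(self, last_published_id: int | None) -> list[tuple[int, str]]:
        """Events the new active instance must publish, given the id of the last
        event the failed instance wrote to the outbound topic."""
        if last_published_id is None:
            return list(self.recent)            # nothing reached the topic yet
        ids = [event_id for event_id, _ in self.recent]
        if last_published_id not in ids:
            # The gap is older than the buffer, so it cannot be filled safely.
            raise RuntimeError("last published event is older than the buffer")
        start = ids.index(last_published_id) + 1
        return list(self.recent)[start:]


buf = StandbyBuffer(max_events=5)
for i in range(1, 9):
    buf.remember(i, f"event-{i}")   # buffer now holds events 4..8
print(buf.events_to_resend(6))      # [(7, 'event-7'), (8, 'event-8')]
```

The buffer size matters: if the active instance fails further behind than the standby's buffer reaches, the gap cannot be filled, which is why the sketch raises an error rather than silently skipping events.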
To verify, we review the data in the failure time range to ensure no events were missed. Remember that one event is generated per second on each machine. Upon reviewing the data, it appears that the failover was successful and no events were skipped.
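A check like this is easy to script. The sketch below groups events by machine and lists every second with no event between the first and last timestamp seen. It assumes the timestamps have been extracted as strings in a single, known format with whole-second resolution; the format string and the sample rows are made up for illustration, so adjust them to your data.

```python
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta

# Assumed timestamp layout; change it to match the timestamps in your CSV data.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def missing_seconds(timestamps: list[str], fmt: str = TIMESTAMP_FORMAT) -> list[datetime]:
    """Return each whole second between the earliest and latest timestamp
    that has no event. Expects one event per second, as in the test setup."""
    seen = {datetime.strptime(ts, fmt) for ts in timestamps}
    if not seen:
        return []
    gaps = []
    current, last = min(seen), max(seen)
    while current <= last:
        if current not in seen:
            gaps.append(current)
        current += timedelta(seconds=1)
    return gaps


def gaps_by_host(rows: list[tuple[str, str]]) -> dict[str, list[datetime]]:
    """Group (host, timestamp) rows by host and check each host separately,
    since every machine produces its own one-event-per-second stream."""
    per_host: dict[str, list[str]] = defaultdict(list)
    for host, ts in rows:
        per_host[host].append(ts)
    return {host: missing_seconds(ts_list) for host, ts_list in per_host.items()}


# Made-up sample rows: intcas01 is missing the event for 00:00:01.
sample = [
    ("intcas01", "2000-01-01 00:00:00"),
    ("intcas01", "2000-01-01 00:00:02"),
    ("sashdp04", "2000-01-01 00:00:00"),
    ("sashdp04", "2000-01-01 00:00:01"),
]
for host, gaps in gaps_by_host(sample).items():
    print(host, [g.strftime(TIMESTAMP_FORMAT) for g in gaps])
```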
That's all folks! If you made it this far I am impressed. Thanks for hanging in there.
I'll keep it short. Setting up ESP failover using Kafka requires numerous steps, some of them nontrivial, such as deploying Kafka and ZooKeeper, but once the key pieces are in place it works quite nicely.