We’re smarter together. Learn from this collection of community knowledge and add your expertise.

ESP 4.3 Failover Using Kafka

by SAS Employee MarkThomas on ‎10-19-2017 03:07 PM - edited on ‎02-16-2018 05:05 PM by Community Manager (1,536 Views)

Customers who implement SAS Event Stream Processing for high profile streaming projects will likely require that the software be highly available.  One method used to improve availability of SAS ESP is to deploy a supported message bus to assist with the failover process.  Failover in ESP is accomplished by seamlessly transitioning event processing and subscription from an active instance of an ESP engine to a standby instance when the active instance fails. 


There are four message buses supported for ESP failover: Tervela, Solace, RabbitMQ and Kafka.  Kafka is the most recent message bus added to the supported list.  In this article, we walk through the key steps of setting up failover using Kafka as the message bus for SAS ESP.


First we'll briefly cover some key aspects of Kafka and Zookeeper.  For more details reference the official documentation here and here.


This article assumes some familiarity of ESP concepts and terms.  For a look at failover involving RabbitMQ, see this article.  Like the RabbitMQ article, this is fairly lengthy.  So grab your favorite caffeinated beverage. :-)  




Kafka is an open-source, distributed streaming platform developed by Apache that uses publish-subscribe messaging.  Kafka employs partitions and replication of "transaction" logs to provide scalability and availability.  It was originally developed by LinkedIn and became open source in 2011.  Support for using Kafka as a message bus during the failover process was added in SAS ESP 4.1.


Key Kafka Terms


Before we dive into the setting up the failover scenario, let's review a few terms related to Kafka.


  • Topic - a category or feed name to which messages are published and stored; topics may have zero, one or more consumers that subscribe to it
  • Partition - messages/records are stored in one or more partitions of a topic; messages/records are appended to a structured commit log and assigned a sequence id, called the offset
  • Producer - the application that publishes messages to topics within Kafka
  • Consumer - the application that subscribes to one or more topics and processes the stream of messages produced to them
  • Broker - the Kafka nodes that receive messages from producers, stores messages in topics and retrieves messages for consumers; a Kafka cluster runs on one or more servers and is stateless

Also, here are some key things to know about Kafka.


  • It is recommended that at least three Kafka brokers/servers be configured for high availability. This is based on the replication factor specified for the topic
  • Kafka persists messages in logs within a file system. Therefore it is important to have adequate space on the file system where logs are stored. The default location of /kafka-logs is specified using the log.dirs parameter in server.properties of the config directory.
  • Messages are ordered by timestamp and are immutable
  • Only read operations are permitted. Updates are not allowed and purging is accomplished via retention.
  • Partitions are assigned to brokers and thus data is split at the broker level. The greater the number of partitions, the more concurrent consumers a topic can support.
  • ZooKeeper is a key component that pulls all of the pieces together (coordinator)




ZooKeeper is an open-source, high-performance coordination service for distributed applications.  This centralized service provides configuration management, locking and synchronization for coordinating between distributed systems, registry services and group membership.  ZooKeeper is required to setup ESP failover via Kafka. In order to perform its duties properly, ZooKeeper must be deployed across a minimum of three hosts.  This set of hosts is called an ensemble.  Like the distributed processes it coordinates, Zookeeper is intended to be replicated over a set of hosts.  Each host must be aware of the other hosts.  The state of each image is kept in memory, as well as transaction logs and snapshots to a persistent store. For more information, see the ZooKeeper doc.  


General Architecture and Flow


The architecture of a Kafka failover configuration is similar to that of a RabbitMQ configuration.  The key components are as follows:


  • One or more publishers (e.g. adapter) using the Kafka transport
  • Kafka cluster - three or more brokers used for publishing and subscribing messages
  • ZooKeeper cluster - used to manage the Kafka configuration and detect active/standby status
  • ESP engines - two or more instances running the same model; the model would be built to include Kafka publish and subscribe connectors
  • One or more subscribers using the Kafka transport. The subscriber connectors enabled for failover communicate with ZooKeeper to manage active and standby status



One or more publishers send messages to the model.  These messages are routed to a topic using the Kafka transport of the adapter to the Kafka cluster.  Without the transport the adapter would publish directly to the source window of an individual model.  


The messages are appended to the appropriate inbound topic where the publish connector within a source window of each engine retrieves it.  The model subsequently processes the messages. 


The subscriber connectors from the active engine then stores the resulting message in an outbound topic within Kafka.  Subscriber connectors working with ZooKeeper determine which ESP instance is the active and which are standby. Only the active instance send events.  No events are sent from the standby instances to the Kafka topic, although the instance are essentially in sync in terms of processing. 


Finally the subscriber client, in many cases an adapter, will retrieve the messages from the outbound topic using the Kafka transport.


There are many details not noted here.  Reference the SAS ESP documentation for further info, but basically the Kafka Bus acts as a “buffer” for the messages sent by ESP publisher and received by ESP subscribers.  


Steps to Failover


Now let's dig into what is needed to get this running.  First it is clear from the previous sections that Kafka and ZooKeeper must be deployed and functional.  For purposes of this article,  we will assume that these pieces have already been installed and verified that they are working.  Remember that in a production environment there must be at least three instances of ZooKeeper and three instances of the Kafka broker if we want highly available ZooKeeper/Kafka and ESP.


An internal collection of machines was used as the test environment.  Note that it did not quite meet the requirement of three nodes for ZooKeeper and three for the Kafka broker.  It contained four nodes of ZooKeeper and two nodes for Kafka broker.  Therefore it should be noted that the testing only performed failover for ESP using two ESP engines.  Failover of Kafka was not tested.  The following diagram shows the key components used for testing and the machine on which they reside.  I would be remiss if I didn't mention that my colleague Raphaël Poumarède provided significant contributions to this environment.  Thanks Raphaël!





  With the components in place and basic functionality tested, what is required to proceed with an ESP failover test using Kafka as the message bus?


  • Publish client that uses the Kafka transport (in our case the File and Socket adapter)
  • A kafka.cfg file that specifies configuration information for the client
  • Kafka and ZooKeeper client runtime libraries for the connectors
  • Environment variables set to access appropriate modules
  • Publish and subscribe Kafka connectors within the model to inject events into the model and write events to a Kafka topic
  • A subscribe connector configured with HOTFAILOVER
  • Subscribe client to retrieve events from the model using the Kafka transport

So let's step through these items.  




This configuration file tells the adapter to which host and port to connect to Kafka, as well as several other connection parameters.  This is the format for C++ based adapter.


hostport = "sashdp03:6667,sashdp04:6667"
partition = "0"
initialoffset = "smallest"
groupid = "mygroup"


And here is the format for the Java based adapter.


kafka =
hostport = "sashdp03:6667,sashdp04:6667"
partition = "0"
initialoffset = "smallest"
groupid = "mygroup"
sas =
protobuf = false
protofile = "./GpbHistSimFactory.proto"
protomsg = "GpbTrade"
json = false
dateformat = "%Y-%m-%d %H:%M:%S"



Kafka and ZooKeeper client runtime libraries


These are needed by connectors to talk to Kafka and ZooKeeper, therefore they need to be installed on the machines where the ESP model runs.  The source for these libraries can be downloaded from the following URLs, but they will need to be compiled.


 This article doesn't cover the compile steps, as they are available as part of the download, but the resulting libraries should look like the following.  Note that the supported Kafka client version for ESP 4.3 is 0.9.3.  For ZooKeeper client the version used was 3.4.8.



Environment Variables


To enable the server and clients to find the correct libraries, modules and executables, define the following environment variables.  In our system we have placed these in their own /etc/profile.d file.


export DFESP_HOME=/opt/sas/viya/home/SASEventStreamProcessingEngine/4.3.0
export DFESP_KAFKA_JAR=/opt/sas/kafka/kafka-clients-


Please note that the DFESP_KAFKA_JAR variable needs to be set only for Java-based adapters that use the Kafka transport. For example in our testing this was required to use the CAS adapter.  And only of the Kafka client jar is supported.


Also, it is not required to add $DFESP_HOME/bin to the $PATH variable, but eliminates the need to type $DFESP_HOME/ each time you enter a command.  


Publish and Subscribe Kafka Connectors


Of course it will be necessary to build the model with the correct connectors to interact with Kafka topics.


In order to retrieve messages from Kafka the publish connector must be configured properly.  Here is the XML definition for the publish connector from the model.  Notice that both Kafka brokers are specified. Also notice that Kafka topic used for publishing.  The "I" at the end of the topic indicates "input".


<connector name="dstatkafka" class="kafka">
<property name="type"><![CDATA[pub]]></property>
<property name="kafkahostport"><![CDATA[sashdp03:6667,sashdp04:6667]]></property>
<property name="kafkapartition"><![CDATA[0]]></property>
<property name="kafkatopic"><![CDATA[sashdp04_15555.dstatkafka.cq1.dstatSource.I]]>
<property name="kafkatype"><![CDATA[binary]]></property>
<property name="urlhostport"><![CDATA[sashdp04_15555]]></property>


A subscribe connector is also defined with "hotfailover" to tell the model that more than one instance will be executing.  Not only is "hotfailover" defined, but also notice that the zookeeperhostport is also specified.  Remember that ZooKeeper is the component that keeps track of active and standby servers.


<connector name="subkafka" class="kafka">
<property name="type"><![CDATA[sub]]></property>
<property name="snapshot"><![CDATA[false]]></property>
<property name="kafkahostport"><![CDATA[sashdp03:6667,sashdp04:6667]]></property>
<property name="kafkapartition"><![CDATA[0]]></property>
<property name="kafkatopic"><![CDATA[sashdp04_15555.dstatkafka.cq1.Compute1.O]]></property>
<property name="kafkatype"><![CDATA[binary]]></property>
<property name="urlhostport"><![CDATA[sashdp04_15555]]></property>
<property name="numbufferedmsgs"><![CDATA[5000]]></property>
<property name="hotfailover"><![CDATA[true]]></property>
<property name="zookeeperhostport"><!


Subscribe adapter


Finally we need a client to retrieve events from the Kafka topic, which was specified in the connector above.  This can be a simple file and socket adapter so that it is easy to track events by tailing the output file.


dfesp_fs_adapter -k sub -h "dfESP://sashdp04:15555/dstatkafka/cq1/Compute1?snapshot=false"
-f /tmp/dstat-out.csv -t csv -l kafka


Or if you want the events to be directed to a CAS table, the following CAS adapter can be used.  Keep in mind that the DFESP_KAFKA_JAR environment variable must be set.  The values set for this variable will be added to the classpath of the adapter.  Note, this is not documented.


dfesp_cas_adapter -k sub -h "dfESP://sashdp04:15555/dstatkafka/cq1/Compute1?snapshot=false"
-H "sashdp03:5570" -t dstatdetail -l "info" -L kafka


Time for testing


Now that all the components have been put into place it is time to step through the failure process.


Recalling the diagram from our environment earlier, the model is loaded on an XML server on sashdp03 and sashdp04.  When failover is enabled properly, the following message will be displayed by the active and standby servers.  Unrelated messages were removed for clarity.  


sashdp04 (Active)


[sasdemo01@sashdp04 ~]$ dfesp_xml_server  -pubsub 15555 -http-pubsub 15557 -http-admin 15556
-model file:///home/sasdemo01/dstatkafka.xml -badevents /tmp/dstat_badevents.txt
2017-09-07T10:52:45,026; INFO ; 00000037; DF.ESP; (dfESPkafkaConnector.cpp:1801); dfESPkafkaConnector::start(): Hot failover state switched to standby
. . . .
2017-09-07T10:52:47,211; INFO ; 00000050; DF.ESP; (dfESPkafkaConnector.cpp:275); dfESPkafkaConnector::z_watcher(): Zookeeper watcher event: watcher = SESSION_EVENT, state = CONNECTED_STATE, path =
2017-09-07T10:52:47,216; INFO ; 00000051; DF.ESP; (dfESPkafkaConnector.cpp:305); dfESPkafkaConnector::z_watcher(): Created zookeeper node /ESP/server-n_0000000029
2017-09-07T10:52:47,216; WARN ; 00000038; DF.ESP; (dfESPkafkaConnector.cpp:729); dfESPkafkaConnector::goActive(): Hot failover state switched to active


sashdp03 (Standby)


[sasdemo01@intcas01 ~]$ dfesp_xml_server  -pubsub 15555 -http-pubsub 15557 -http-admin 15556
-model file:///home/sasdemo01/dstatkafka.xml -badevents /tmp/dstat_badevents.txt
2017-09-07T10:52:54,397; INFO ; 00000035; DF.ESP; (dfESPkafkaConnector.cpp:1801);dfESPkafkaConnector::start(): Hot failover state switched to standby
. . . .
2017-09-07T10:52:54,411; INFO ; 00000048; DF.ESP; (dfESPkafkaConnector.cpp:275);
dfESPkafkaConnector::z_watcher(): Zookeeper watcher event: watcher = SESSION_EVENT, state                                                             = CONNECTED_STATE, path =
2017-09-07T10:52:54,413; INFO ; 00000049; DF.ESP; (dfESPkafkaConnector.cpp:305); dfESPkafkaConnector::z_watcher(): Created zookeeper node /ESP/server-n_0000000030
2017-09-07T10:52:54,414; INFO ; 00000050; DF.ESP; (dfESPkafkaConnector.cpp:256); dfESPkafkaConnector::watchActiveZnode(): Watching zookeeper node /ESP/server-n_0000000029


Notice that ZooKeeper creates a "node" for each ESP instance to keep track of which is active and which is standby.  And that the second instance identified the existing node.


Publish adapter


In this test the File and Socket adapter with the Kafka transport will be used to direct events to a Kafka topic.  This quite simply is as easy as starting the adapter with a parameter telling it to use the Kafka transport.


However, at this point no data has been delivered via the publish adapter to the model.  The test model processes events that are continuously added, one per second, to a CSV file via the "dstat" command.  Therefore the adapter uses the growing input file as its source.  So when first started, it processes events already in the file and once it reads through all events, it will publish events as they arrive.


There are two growing CSV files, one on each machine (intcas01 and sashdp04), so two file and socket adapters are started and events published to the source window of the model.


[sasdemo01@intcas01 ~]$ $DFESP_HOME/bin/dfesp_fs_adapter -k pub -h "dfESP://sashdp04:15555/dstatkafka/cq1/dstatSource" -f /tmp/dstat-sashdp03.csv -t csv -l
kafka -i -O -x 7 -F "normal" -d "%d-%m %H:%M:%S"


[sasdemo01@sashdp04 ~]$ $DFESP_HOME/bin/dfesp_fs_adapter -k pub -h "dfESP://sashdp04:15555/dstatkafka/cq1/dstatSource" -f /tmp/dstat-sashdp04.csv -t csv -l
kafka -i -O -x 7 -F "normal" -d "%d-%m %H:%M:%S"


Now that events are flowing into the model, an adapter can be started to retrieve events from the output topic using the Kafka transport.  Events are generated once a second for each machine, so with a CSV file and timestamps in the data it will be fairly easy to ensure that all events are accounted for after a failure.


[sasdemo01@intcas01 ~]$ dfesp_fs_adapter -k sub -h
"dfESP://sashdp04:15555/dstatkafka/cq1/Compute1?snapshot=false" -f /tmp/dstat-out.csv -t csv
-l kafka


If you are using the CAS adapter to subscribe, then the log will look similar to the following:


[sasdemo01@sashdp04 kafka]$ dfesp_cas_adapter -k sub -h
"dfESP://sashdp04:15555/dstatkafka/cq1/Compute1?snapshot=false" -H "sashdp03:5570" -t
dstatdetail -l "info" -L kafka
log4j:WARN No appenders could be found for logger (org.apache.commons.beanutils.converters.BooleanConverter).
log4j:WARN Please initialize the log4j system properly.
Sep 06, 2017 9:58:25 AM com.sas.esp.api.server.ReferenceIMPL.dfESPlibrary logMessage
INFO: handleMessageTag(): Begin appending events to CAS table.
Sep 06, 2017 9:58:25 AM com.sas.esp.api.server.ReferenceIMPL.dfESPlibrary logMessage
INFO: handleMessageTag(): Appended block of 8 records to CAS table.


Finally, the failover test.  In the above scenario the active instance was on sashdp04.  Therefore we want to terminate that instance to verify that the standby machine takes over and no events are missed.  When we terminate the active instance, we see the following messages in the log of the standby instance.  The standby determines the last message and the current message, then retrieve messages from its buffer to ensure that events delivered to Kafka are in sync.


2017-09-07T17:24:30,286; INFO ; 00000038; DF.ESP; (dfESPkafkaConnector.cpp:2175);
setLastMsgId(): Standby taking over, last message id = 2814749767291636
2017-09-07T17:24:30,315; INFO ; 00000040; DF.ESP; (dfESPkafkaConnector.cpp:530); dfESPkafkaConnector::sendSerializedBlock(): Standby sending buffered messages, last message
id = 2814749767291636, current message id = 2814749767291639
2017-09-07T17:24:30,315; INFO ; 00000040; DF.ESP; (dfESPkafkaConnector.cpp:570); dfESPkafkaConnector::sendSerializedBlock(): Standby synced, current message id =
2017-09-07T17:24:30,317; WARN ; 00000040; DF.ESP; (dfESPkafkaConnector.cpp:729); dfESPkafkaConnector::goActive(): Hot failover state switched to active


To verify we review the data in the failure time range to ensure no events were missed.  Remember that one event is generated per second on each machine.  Upon reviewing the data, it appears that the failover was successful and no events were skipped.


I,N, sashdp04,07-09 17:24:30,48.259000,3.980000,47.264000,0.000000,0.000000,0.498000,0.000000,0.000000,5.71629e+09,466944.000000,1.77772e+09,9.16498e+09,0.000000,0.000000,21307.000000,22060.000000,3.000000,0.000000,0.000000,0.000000,0.000000,7.56003e+08,7.69971e+09,179.000000,52.239000,0.000000,0.000000,5.323708,8.535549,0.020320,0.021038,0.000435,1.655628,0.704082,7.170914
I,N, intcas01,07-09
I,N, sashdp04,07-09
I,N, intcas01,07-09


That's all folks! If you made it this far I am impressed.  Thanks for hanging in there.  


Final Thoughts

I'll keep it short.  There are numerous steps required to set up ESP failover using Kafka, some of them not so trivial such as deploying Kafka and ZooKeeper, but once the key pieces are in place it works quite nicely.  



Implementing 1+N-Way Failover

Using the Java Publish/Subscribe API

by Trusted Advisor
on ‎10-20-2017 03:13 AM

Absolutely great post @MarkThomas , thank you very much!

by Community Manager
on ‎10-20-2017 04:38 PM
Many thanks for the nice comment, @JuanS_OCS! I'm glad you found it useful.

Your turn
Sign In!

Want to write an article? Sign in with your profile.

Looking for the Ask the Expert series? Find it in its new home: communities.sas.com/askexpert.