Using Cloud Analytic Services (CAS) to access data streams from SAS® ESP windows

by SAS Employee UttamKumar 05-26-2017 10:31 AM - edited 05-31-2017 02:37 PM

With SAS® Viya™ 3.1 and SAS® Event Stream Processing 4.2, a Cloud Analytic Services (CAS) session can access data streams from a SAS Event Stream Processing (ESP) window. You can use the ESP window as a data source to feed data into a CAS table. CAS provides the loadStreams action set, whose actions load data into a CAS table either from a continuous data stream or from a snapshot of the data.

 

Let’s walk through the steps, with examples, for accessing data streams from SAS Event Stream Processing windows in a CAS session and loading the data into a CAS table. First, you need a running SAS Event Stream Processing server with a model (XML file) and data. The source data for the model can be a text file or a continuous stream of data, depending on how the model was developed. The following example starts a SAS Event Stream Processing server with the vwap.xml model file, which contains the “tradesDemo” project. The model reads data from the trades1M.csv file and aggregates it over three time periods: five minutes, one hour, and 24 hours.

 

   $DFESP_HOME/bin/dfesp_xml_server -model file://vwap.xml -pubsub 5555 -http-admin 5556 -http-pubsub 5557

 

To access the SAS Event Stream Processing window from a CAS session, you must first define a CAS library (caslib) with srcType="esp", along with the SAS Event Stream Processing server name and port number. The following code snippet creates a user session, loads the loadStreams action set, and creates a caslib named “espStatic”. The caslib reads data from the SAS Event Stream Processing server that is hosted on gatekrbhdp01.gatehadoop.com and listens on port 5555.

 

 

Code:

 

CAS mySession host="gatekrbhdp01.gatehadoop.com" SESSOPTS=(CASLIB=casuser TIMEOUT=999 LOCALE="en_US");

proc cas;
   /* Load the action set that reads from ESP windows */
   builtins.loadActionSet / actionSet="loadStreams";

   /* Define a caslib that points at the ESP server and its pub/sub port */
   table.addCaslib /
      name="espStatic"
      dataSource={srcType="esp", server="gatekrbhdp01.gatehadoop.com", port=5555};
run;

 

 

 

Results:

cas_eps_1.png

Once the SAS Event Stream Processing server is running with a pre-defined model and the “espStatic” caslib has been established, you can read the SAS Event Stream Processing project metadata from the CAS session. The following code reads the metadata through the “espStatic” caslib, which is connected to the SAS Event Stream Processing server.

 

 

Code:

 

proc cas;
   /* List the projects, queries, and windows that the ESP server exposes */
   loadStreams.mMetaData / casLib="espStatic";
run;

 

 

Results:

 

 

cas_eps_2.png  
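If you want to inspect the metadata programmatically rather than only read the printed output, one option is to capture the action result in a CASL variable. The following is a minimal sketch, not part of the original example: the variable name espMeta is an illustrative assumption, and the table.caslibInfo call is included only as a quick check that the “espStatic” caslib is defined in the session.

```sas
proc cas;
   /* Confirm that the caslib exists and show its settings */
   table.caslibInfo / caslib="espStatic";

   /* Capture the metadata in a CASL variable (espMeta is an illustrative name) */
   loadStreams.mMetaData result=espMeta / casLib="espStatic";

   /* Show the structure of the result, then its contents */
   describe espMeta;
   print espMeta;
run;
```

The describe output shows how the result is organized, which helps when you want to pick out window names to use later.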

 

Based on the list of windows that the metadata returns, you can provide the name of a window in the espUri= option to read its data into a CAS table. The following example loads snapshot data from the "tradesDemo/trades/aggW5minRet" SAS Event Stream Processing window into the global CAS table “aggW5minRet” in the “casuser” caslib. The table.fetch action then displays the data from the CAS table. You can also specify whether to show the index and which columns to fetch from the CAS table. In this example, we have selected a few columns and excluded the index. The output table in the CAS environment must be a global table, which is why the example sets promote=true. Session-scoped tables do not support SAS Event Stream Processing output that comes from a continuous data stream.

 

 

Code:

 

proc cas;
   /* Load the current snapshot of the window into a global (promoted) table */
   loadStreams.loadSnapshot /
      casLib="espStatic"
      espUri="tradesDemo/trades/aggW5minRet"
      casOut={caslib="casuser", name="aggW5minRet", promote=true};

   /* Display selected columns without the row index */
   table.fetch /
      table={caslib="casuser", name="aggW5minRet",
             vars={'symbol*','awap','minprice','maxprice','std'}}
      index=false;
run;

 

 

 

 

Results:

 

cas_eps_3.png  
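Because the output table has to be global, it can be worth confirming that the promotion took effect before relying on the table from other sessions. The sketch below is an optional check, not part of the original example; it assumes the table and caslib names used above and uses only the standard table action set.

```sas
proc cas;
   /* The Global column in the output indicates whether the table was promoted */
   table.tableInfo / caslib="casuser" name="aggW5minRet";

   /* List column names and types, useful when choosing vars= for table.fetch */
   table.columnInfo / table={caslib="casuser", name="aggW5minRet"};
run;
```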

 

 

To append data from a SAS Event Stream Processing window to an existing CAS table, use the appendSnapshot action. The following code example appends the data from the tradesDemo/trades/aggW5minRet ESP window to the existing CAS table “aggW5minRet”.

 

 

Code:

 

proc cas;
   loadStreams.appendSnapshot /
      casLib="espStatic"
      espUri="tradesDemo/trades/aggW5minRet"
      casOut={caslib="casuser", name="aggW5minRet"};
run;
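To see how many rows an append actually added, you can count the rows before and after the action. This is a minimal sketch under the assumption that the simple action set is available in your session (it normally is by default); the variable names rowsBefore and rowsAfter are illustrative.

```sas
proc cas;
   /* Row count before the append */
   simple.numRows result=rowsBefore /
      table={caslib="casuser", name="aggW5minRet"};

   /* Append the current snapshot of the ESP window */
   loadStreams.appendSnapshot /
      casLib="espStatic"
      espUri="tradesDemo/trades/aggW5minRet"
      casOut={caslib="casuser", name="aggW5minRet"};

   /* Row count after the append */
   simple.numRows result=rowsAfter /
      table={caslib="casuser", name="aggW5minRet"};

   print rowsBefore;
   print rowsAfter;
run;
```

If the two counts match, the window snapshot was probably empty at the time of the call, for example because the model had already finished reading its input file.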

 

 

 

To continuously stream data from a SAS Event Stream Processing window to a CAS table, use the loadStream action. The following code example streams data continuously from the tradesDemo/trades/aggW5minRet ESP window to the CAS table “aggW5minRet”. Because the data streams continuously, we suggest launching this type of operation with an operating system command from a native Python program.

 

 

Code:

 

proc cas;
   loadStreams.loadStream /
      casLib="espStatic"
      espUri="tradesDemo/trades/aggW5minRet"
      casOut={caslib="casuser", name="aggW5minRet", promote=true};
run;
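Note that the loadSnapshot example and the loadStream example both promote a table named “aggW5minRet” in the “casuser” caslib. In CAS, promoting a table generally fails when a global table with the same name already exists, so if you ran the snapshot example first you may need to drop that table before starting the stream. The following sketch assumes that you are willing to discard the existing table contents.

```sas
proc cas;
   /* Drop the existing global table; quiet=true avoids an error if it is absent */
   table.dropTable / caslib="casuser" name="aggW5minRet" quiet=true;
run;
```

Run this before the loadStream call, not after it.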

 

 

Here is the list of CAS actions in the loadStreams action set for querying event streaming projects and loading streaming data.

  • appendSnapshot – Append the current snapshot of a window to a CAS table
  • loadSnapshot – Load the current snapshot of a window to a CAS table
  • loadStream – Stream data from a window into a CAS table
  • mMetaData – Get all the model metadata for projects/queries/windows

 

 

Note: If your SAS Viya 3.1 environment is not able to load data from SAS Event Stream Processing 4.2, you might need to update the ~/viya/home/SASFoundation/sasexe/espact.so library file. After updating this library file, restart the CAS environment.

 

 

For related information about using a CAS session to access data streams from SAS Event Stream Processing windows, see:

  • Streaming Data Action Set: Syntax
  • Accessing ESP streams with CAS
