We’re smarter together. Learn from this collection of community knowledge and add your expertise.

How to read SAS Scalable Performance Data Engine Tables on the Hadoop Distributed File System

by SAS Employee SteveSober ‎02-03-2016 10:39 AM - edited ‎02-05-2016 05:44 PM (1,390 Views)

In a previous article, we learned how to create a SAS Scalable Performance Data (SPD) Engine table that is stored on HDFS. Here, as part of the SAS Data Management for Hadoop article series, we’ll explore how to read an SPD Engine table stored on HDFS.

 

I’ll take you through an example whereby the size of my SPD Engine table is 100GB and the WHERE statement returns 14% of the data.

 

Let’s start by reviewing the options in the following SPD Engine LIBNAME statement.

 

LIBNAME MYSPDE SPDE '/user/sasss1' 
        HDFSHOST=DEFAULT 
        PARALLELWRITE=YES 
        PARALLELREAD=YES
        ACCELWHERE=YES;
  1. MYSPDE is the LIBREF we will reference in our SAS code to process the SPD Engine data stored on HDFS
  2. SPDE is the engine SPD Engine uses to process SPD Engine tables
  3. '/user/sasss1' is the path on HDFS that our SPD Engine data will be read from
  4. HDFSHOST=DEFAULT will default to the Hadoop system that the following SAS/ACCESS Interface to Hadoop OPTIONS point to
    1. options set=SAS_HADOOP_CONFIG_PATH=
    2. options set=SAS_HADOOP_JAR_PATH=
  5. PARALLELWRITE=YES tells SPD Engine to use parallel processing to write data to HDFS
  6. PARALLELREAD=YES tells SPD Engine to use parallel processing to read data stored in HDFS
  7. ACCELWHERE=YES tells SPD Engine to push all WHERE clauses down to Hadoop


UN-COMPRESSED data

When SPD Engine data is not compressed, we can push our WHERE statements down to MapReduce by ensuring the SPD Engine LIBNAME option ACCCELWHERE= is set to YES. To validate where the WHERE statements are processed, set the SPD Engine macro SPDSWDEB= to YES. With SPDSWDEB=YES, the SAS log provides information on the processing location of WHERE statements.

 

 

12        +%let SPDSWDEB=YES; **SAS Log info on where WHERE was processed; 
 	 
80        +data MYSPDE.&source.3;
81        +   set MYSPDE.&source;
82        +   where joinvalue < 3;
 
whinit: WHERE (joinvalue<3)
whinit returns: ALL EVAL2
83        +run;

WHERE processing is optimized on the Hadoop cluster
Hadoop Job ID: job_1450299322397_0061
INFO: Parallel write processing is being performed using 4 threads.
NOTE: There were 9375485 observations read from the data set MYSPDE.TESTDATA.
      WHERE joinvalue<3;
NOTE: The data set MYSPDE.TESTDATA3 has 9375485 observations and 204 variables.
NOTE: DATA statement used (Total process time):
      real time           8:55.79
      cpu time            26.11 seconds

86        +proc sql;
87        +   create table MYSPDE.&source._sql as
88        +   select x1, x3, x5, joinvalue from MYSPDE.&source
89        +   where joinvalue < 3;

whinit: WHERE (joinvalue<3)
whinit returns: ALL EVAL2
WHERE processing is optimized on the Hadoop cluster
Hadoop Job ID: job_1450299322397_0062
INFO: Parallel write processing is being performed using 4 threads.
NOTE: Table MYSPDE.TESTDATA_SQL created, with 9375485 rows and 4 columns.

90        +quit;
NOTE: PROCEDURE SQL used (Total process time):
      real time           7:45.81
      cpu time            11.79 seconds



 

When SPD Engine data is not compressed, we can also process that data using the SAS In-Database Code Accelerator for Hadoop. SAS In-Database Code Accelerator for Hadoop is part of the bundle of software you get when you license SAS Data Loader for Hadoop. The SAS In-Database Code Accelerator for Hadoop uses the SAS DS2 language to run DS2 code in a MapReduce framework. To accomplish this, create a DS2 THREAD program that contains your source table(s) and the business rules you need to apply to that data. To execute that THREAD program, in parallel on every data node in your Hadoop cluster, we declared it to the DS2 DATA program and then executed it using the SET statement of that DATA program.

 

 

104       +proc ds2;
105       +THREAD work.thread / overwrite=yes;
106       +dcl double count;
107       +method run ();
108       +   set myspde.&source;
109       +   by joinvalue;
110       +   if first.joinvalue then count = 0;
111       +   count + 1;
112       +   if last.joinvalue;
113       +end;
114       +ENDTHREAD;
115       +DATA MYSPDE.&source.2 (overwrite=yes);
116       +DECLARE THREAD work.thread thrd;
117       +method run ();
118       +   SET FROM thrd;
119       +end;
120       +ENDDATA;
121       +run;
NOTE: Created thread thread in data set work.thread.
NOTE: Running THREAD program in-database
NOTE: Running DATA program in-database
NOTE: Execution succeeded. No rows affected.
122       +quit;

NOTE: PROCEDURE DS2 used (Total process time):
      real time           20:39.18
      cpu time            2.59 seconds

 

When SPD Engine data is not compressed, all SAS-High Performance procedures will lift that data, in parallel, in to the memory of the SAS-High-Performance Analytics Server.  

 

 

128       +PROC HPLOGISTIC DATA = MYSPDE.&source;
129       +performance nodes=all DETAILS;
130       +Class j / PARAM= GLM;
131       +	WEIGHT c98 ;
132       +MODEL y (EVENT = "0" ) = /  link= LOGIT;
133       +run;

NOTE: No explanatory variables have been specified.
INFO: Read the content of utility file /opt/sasinside/XMLS/core-site.xml.
INFO: Read the content of utility file /opt/sasinside/XMLS/hdfs-site.xml.
INFO: Read the content of utility file /opt/sasinside/XMLS/mapred-site.xml.
INFO: Read the content of utility file /opt/sasinside/XMLS/yarn-site.xml.
NOTE: The HPLOGISTIC procedure is executing in the distributed computing environment with 
4 worker nodes.
NOTE: You are modeling the probability that y='0'.
NOTE: Convergence criterion (GCONV=1E-8) satisfied.
NOTE: The PROCEDURE HPLOGISTIC printed pages 7-8.
NOTE: PROCEDURE HPLOGISTIC used (Total process time):
      real time           1:59.46
      cpu time            3.29 seconds 

 

COMPRESSED data

When the SPD Engine data is compressed, we cannot push our WHERE statements down to MapReduce. But do not fret – in my testing, the real time is faster than pushing that WHERE statement down to MapReduce. For the DATA-Step test, the compressed real time was 3:38.48 quicker. For the PROC SQL test, the compressed real time was 3:18.39 quicker.

  

87        +data MYSPDE.&source.3;
88        +   set MYSPDE.&source;
89        +   where joinvalue < 3;
 
whinit: WHERE (joinvalue<3)
The data set is compressed
WHERE processing cannot be optimized on the Hadoop cluster
whinit returns: ALL EVAL2
90        +run;

NOTE: There were 9375485 observations read from the data set MYSPDE.TESTDATA.
      WHERE joinvalue<3;
NOTE: The data set MYSPDE.TESTDATA3 has 9375485 observations and 204 variables.
NOTE: DATA statement used (Total process time):
      real time           5:17.31
      cpu time            5:06.40

93        +proc sql;
94        +   create table MYSPDE.&source._sql as
95        +   select x1, x3, x5, joinvalue from MYSPDE.&source
96        +   where joinvalue < 3;
 
whinit: WHERE (joinvalue<3)
The data set is compressed
WHERE processing cannot be optimized on the Hadoop cluster
whinit returns: ALL EVAL2
NOTE: Table MYSPDE.TESTDATA_SQL created, with 9375485 rows and 4 columns.

97        +quit;
NOTE: PROCEDURE SQL used (Total process time):
      real time           4:27.42
      cpu time            4:45.59

 

When the SPD Engine data is compressed, we cannot process that data using the MapReduce framework using DS2 code and SAS In-Database Code Accelerator for Hadoop. 

 

111       +proc ds2;
112       +thread work.thread / overwrite=yes;
113       +dcl double count;
114       +method run ();
115       +   set MYSPDE.&source;
116       +   by joinvalue;
117       +   if first.joinvalue then count = 0;
118       +   count + 1;
119       +   if last.joinvalue;
120       +end;
121       +endthread;
122       +data MYSPDE.&source.2 (overwrite=yes);
123       +dcl thread work.thread thrd;
124       +method run ();
125       +   set from thrd;
126       +end;
127       +enddata;
128       +run;
NOTE: Created thread thread in data set work.thread.
NOTE: Running THREAD program in-database
NOTE: Running DATA program in-database
ERROR: Failed to run DS2INDB.
ERROR: The path was not found: /user/sasss1/spde//sasds2_spd_k2s8ho_es44t8bpnr.
ERROR: Error returned from tkedsPubINDBDS2.
NOTE: Execution succeeded. No rows affected.
129       +quit;

NOTE: PROCEDURE DS2 used (Total process time):
      real time           7.56 seconds
      cpu time            1.48 seconds 

 

When the SPD Engine data is compressed, SAS-High Performance procedures will “front load” the data in to the memory of the SAS-High-Performance Analytics Server. By “front load” we mean the data must leave the Hadoop cluster and flow through the SAS Server in to the memory of the SAS-High-Performance Analytics Server. In this case, the compressed real time was 8:56.43 slower. 

 

135       +PROC HPLOGISTIC DATA = MYSPDE.&source;
136       +performance nodes = all DETAILS;
137       +Class j / PARAM= GLM;
138       +	WEIGHT c98 ;
139       +MODEL y (EVENT = "0" ) = /  link= LOGIT;
140       +run;

NOTE: No explanatory variables have been specified.
NOTE: Data set MYSPDE.TESTDATA cannot be processed in the Embedded Process Environment because the data set is either encrypted or compressed.
NOTE: The data MYSPDE.TESTDATA are being routed through the client.
NOTE: The HPLOGISTIC procedure is executing in the distributed computing environment with 
4 worker nodes.
NOTE: You are modeling the probability that y='0'.
NOTE: Convergence criterion (GCONV=1E-8) satisfied.
NOTE: There were 62500000 observations read from the data set MYSPDE.TESTDATA.
NOTE: The PROCEDURE HPLOGISTIC printed pages 3-4.
NOTE: PROCEDURE HPLOGISTIC used (Total process time):
      real time           10:15.89
      cpu time            4:23.25 

  

To compress or not to compress? That is the question. The answer depends on how you plan to process that data. If you’ve licensed SAS-High Performance procedures, SAS Visual Analytics, SAS Visual Statistics, or SAS IMSTAT, you will want to store your data in an un-compressed format. You’ll have quicker real time for your SAS-High Performance procedures. For SAS Visual Analytics, SAS Visual Statistics, and SAS IMSTAT, it will lift data in to memory quicker. Lastly, it will allow you to leverage the SAS In-Database Code Accelerator for Hadoop when staging your data for visualization, reporting, and predictive analytics.

 

Stay tuned, in my next article we will explore how non-SAS clients can process SPD Engine data stored on HDFS.

 

And be sure to follow the Data Management section of the SAS Communities Library (Click Subscribe in the pink-shaded bar of the section) for more articles on how SAS Data Management works with Hadoop. Here are links to other posts in the series for reference:

 

Comments
by Super User
on ‎02-04-2016 03:28 AM

Nice post! Getting interesting to see what's happening using real examples.

Could you tel little about the iron available to you for these test?

Your data set isn't incredible large, it would be interesting to see a comparison with a SMP execution. Especially with the 14% where clause filter. Feels like that one would outperform the Hadoop scenario, especially if you would use indexes.

by SAS Employee SteveSober
on ‎02-05-2016 02:48 PM

Thank you for your comments LinusH.

 

The Hadoop environment I have access to is small and not a candidate to run official benchmarks on. I am testing with Cloudera - CDH 5.4.5 on 5 Linux Servers (RHEL 6) with 4 of the servers acting as data nodes. Each server has 64GB RAM with Intel(R) Xeon(R) CPU E7-4880 v2 @ 2.50GHz (4 cores) processors. Each server has 251 gigabytes of local storage and for HDFS we have 3.0 terabytes (NFS mount).

 

Suffice to say in the real world Hadoop clusters are much bigger. The customers I have worked with tend to have 100  to 1,000 data nodes. In these environment due to the size of data, meaning source tables are in the hundreds of gigabytes and it is common when joining 2 source tables that the target table is over a terabyte in size, it is not possible to bring the SPD Engine data back to the SAS server to run in SMP mode.  In these environments everything we do must run via MapReduce. To accomplish this we utilize the SAS In-Database Code Accelerator for Hadoop.

Contributors
Your turn
Sign In!

Want to write an article? Sign in with your profile.


Looking for the Ask the Expert series? Find it in its new home: communities.sas.com/askexpert.