BookmarkSubscribeRSS Feed
☑ This topic is solved. Need further help from the community? Please sign in and ask a new question.
acordes
Rhodochrosite | Level 12
I have a large dataset with a functioning program as an IML action.
But it takes a long time to accomplish it.
Beside amendments I could introduce to run it more efficiently, I wonder how I can use several nodes and threads for this computation.

https://documentation.sas.com/doc/en/pgmsascdc/v_006/casactiml/casactiml_iml_examples16.htm

Reading the documentation and its examples, I think that ParTasks together with the MapReduce function should help me out.
Am I on the right track ?
For the logic to unfold I need to have control over how the data will be distributed.
Imagine having 8 millions of rows with 1 million different customers. I need all entries from one customer to be sent to the same thread. If not, the logic doesn't play out. If this criteria is met, then the MapReduce should be easy to implement.

So I must send customers whose surnames start with A,B,C to thread number 1, D--F to thread 2, ...

Is there any example out there that combines ParTasks and MapReduce?

Thanks
1 ACCEPTED SOLUTION

Accepted Solutions
Rick_SAS
SAS Super FREQ

A few thoughts:

1. I don't know the details of your program, but you can't "combine" ParTasks and MapReduce. Use one or the other. You should use ParTasks when the tasks on each thread are different. Use MapReduce when the tasks on each thread are similar.

2. While it is possible to partition data so that some observations are on one node and some are on others, I've never done it. 

3. Threads on the same node share the same resources for the node. So in SMP mode (controller only), all your threads can process the same data. Perhaps the threads can process different customers? The first thread could keep and process only A-F, the second thread G-M, and so on.  If so, you don't need to partition the data across nodes.

 

My suggestion: Try to solve this problem on one node by using multiple threads. To get started, create a CAS table with about 100 fake customers and see if you can write a  MapReduce task that uses 4 threads in which each thread processes only a subset of the data, such as A-F, G-M, N-S, and T-Z.  As a first program, see if you can return the number of observations that each thread processes.

View solution in original post

5 REPLIES 5
sbxkoenk
SAS Super FREQ

Hello,

 

I'm also very curious about the answer to your question.

Especially this part : << need to have control over how the data will be distributed. >>

And probably my colleague @DaanBijkerk is interested as well.

 

I am sure @Rick_SAS can help you out !

 

I can only say this :

  • The THREADS= option specifies the maximum number of threads that might be used. Not every program will use all threads.
  • MAPREDUCE function is indeed distributing computations in parallel across threads (and nodes, if you are running on a grid of machines).
  • If your code does not use any functions that are multithreaded or that are distributed, then the program will run in a single thread (also in PROC CAS using the IML action set).
  • Regarding what runs in parallel, see SAS Help Center: The iml Action
    which contains the quote, “When you run a traditional PROC IML program in the iml action, the program runs in a single thread on the controller node. For a list of SAS/IML functions that are not supported by the iml action, see Differences between the IML Procedure and the iml Action.”

Best,

Koen

sbxkoenk
SAS Super FREQ

Hello,

( this is in addition to my answer right above , although I think @Rick_SAS just "cycled in between" 😉)


I can't resist pointing out to you (and all other interested readers) other possibilities of parallelization.

 

The first two articles in the list below explain you about MP Connect.

I use MP Connect all the time (also in SAS VIYA 4).

 

The last two show that you can even parallelize (further) within PROC CAS.

 

Running SAS programs in parallel using SAS/CONNECT®
By Leonid Batkhan on SAS Users January 13, 2021
https://blogs.sas.com/content/sgf/2021/01/13/running-sas-programs-in-parallel-using-sas-connect/

 

Base SAS + SAS/CONNECT - A simple method to generate load on any number of licensed cores
Posted 04-08-2021 05:46 AM | by SimonWilliams (4316 views)
https://communities.sas.com/t5/SAS-Communities-Library/Base-SAS-SAS-CONNECT-A-simple-method-to-gener...

 

Using SYSTASK and SAS macro loops for massively parallel processing
By Leonid Batkhan on SAS Users June 14, 2021
https://blogs.sas.com/content/sgf/2021/06/14/using-systask-and-sas-macro-loops-for-massively-paralle...

 

Tips for parallel processing in CASL
by RICKY THARRINGTON on JULY 6, 2021
https://blogs.sas.com/content/subconsciousmusings/2021/07/06/tips-for-parallel-processing-in-casl/

 

Parallel Processing in SAS Viya
by RICKY THARRINGTON on MAY 25, 2021
https://blogs.sas.com/content/subconsciousmusings/2021/05/25/parallel-processing-in-sas-viya/

 

Greetings,
Koen

Rick_SAS
SAS Super FREQ

A few thoughts:

1. I don't know the details of your program, but you can't "combine" ParTasks and MapReduce. Use one or the other. You should use ParTasks when the tasks on each thread are different. Use MapReduce when the tasks on each thread are similar.

2. While it is possible to partition data so that some observations are on one node and some are on others, I've never done it. 

3. Threads on the same node share the same resources for the node. So in SMP mode (controller only), all your threads can process the same data. Perhaps the threads can process different customers? The first thread could keep and process only A-F, the second thread G-M, and so on.  If so, you don't need to partition the data across nodes.

 

My suggestion: Try to solve this problem on one node by using multiple threads. To get started, create a CAS table with about 100 fake customers and see if you can write a  MapReduce task that uses 4 threads in which each thread processes only a subset of the data, such as A-F, G-M, N-S, and T-Z.  As a first program, see if you can return the number of observations that each thread processes.

acordes
Rhodochrosite | Level 12

@Rick_SAS I've resolved it. 

What had taken a long time before (if it finished execution at all), now takes only 2 minutes. I use 16 threads and it works like a charm. 

 

Without comments I paste my code. 

proc cas;
loadactionset "iml";
source DefineMods;
/* SAS/IML function that computes N random numbers and appends
   node and thread information */

start acFunnel(LL);
x=LL$'vv'; 
help=LL$'comb';

qnummese=help[,2:ncol(help)];
y=help[,1];

XT=(SHAPE(DIF(X), NROW(X)) [, 2:NCOL(X)])#2;
idx=loc((x[, 2:NCOL(X)]+xt)=1);
xt[idx]=1;
t2=unique(qnummese);

t22=(repeat(t2, nrow(x))=qnummese)`*(xt = 2);

t22_sum=t22 [+];
ctr=(xt = 2) [+]; 

idx1=loc(t22 <> 0);
s = ndx2sub(dimension(t22), idx1);

res0=j(nrow(s), ncol(x)-1, .);
res0_perc=res0;

do i=1 to ncol(idx1);
idx2=loc( ( (qnummese=t2[s[i,1]])# (xt[, s[i,2]]=2) )=1);
/* PRINT IDX2; */
res0[i,s[i,2]] = t22[idx1[i]];
/* PRINT RES0; */
res0[i,(s[i,2]+1):ncol(xt)]=(x [loc(element(y[,1], y[idx2,1])),(s[i,2]+1):ncol(x)-1] <>0) [+,];
/* PRINT RES0; */
res0_perc[i,(s[i,2]):ncol(xt)]=res0[i,(s[i,2]):ncol(xt)]/res0[i,s[i,2]];
end;

   y =  res0||s;
   return y;
finish;

start acSimParTasks(labl, opt, L);
   cuts = L$'ord';  vals = L$'xx'; idler=L$'idcomb'; varnames=L$'varn';
namer="L1":"L16";

L1= [#'vv'=vals[1:cuts[1]-1,],            #'comb'=idler[1:cuts[1]-1,] ];
L2= [#'vv'=vals[cuts[1]:cuts[2]-1,],      #'comb'=idler[cuts[1]:cuts[2]-1,]];
L3= [#'vv'=vals[cuts[2]:cuts[3]-1,],      #'comb'=idler[cuts[2]:cuts[3]-1,]];
L4= [#'vv'=vals[cuts[3]:cuts[4]-1,],      #'comb'=idler[cuts[3]:cuts[4]-1,]];
L5= [#'vv'=vals[cuts[4]:cuts[5]-1,] ,     #'comb'=idler[cuts[4]:cuts[5]-1,]];
L6= [#'vv'=vals[cuts[5]:cuts[6]-1,] ,     #'comb'=idler[cuts[5]:cuts[6]-1,]];
L7= [#'vv'=vals[cuts[6]:cuts[7]-1,] ,     #'comb'=idler[cuts[6]:cuts[7]-1,]];
L8= [#'vv'=vals[cuts[7]:cuts[8]-1,] ,     #'comb'=idler[cuts[7]:cuts[8]-1,]];
L9= [#'vv'=vals[cuts[8]:cuts[9]-1,] ,     #'comb'=idler[cuts[8]:cuts[9]-1,]] ;
L10=[#'vv'=vals[cuts[9]:cuts[10]-1,] ,    #'comb'=idler[cuts[9]:cuts[10]-1,]];
L11=[#'vv'=vals[cuts[10]:cuts[11]-1,] ,   #'comb'=idler[cuts[10]:cuts[11]-1,]];
L12=[#'vv'=vals[cuts[11]:cuts[12]-1,] ,   #'comb'=idler[cuts[11]:cuts[12]-1,]];
L13=[#'vv'=vals[cuts[12]:cuts[13]-1,] ,   #'comb'=idler[cuts[12]:cuts[13]-1,]];
L14=[#'vv'=vals[cuts[13]:cuts[14]-1,] ,   #'comb'=idler[cuts[13]:cuts[14]-1,]];
L15=[#'vv'=vals[cuts[14]:cuts[15]-1,] ,   #'comb'=idler[cuts[14]:cuts[15]-1,]];
L16=[#'vv'=vals[cuts[15]:cuts[16],] ,     #'comb'=idler[cuts[15]:cuts[16],]];


   Tasks = repeat('acFunnel', 1, 16);  
   Args = [L1, L2, L3, L4, L5, L6, L7, L8, L9, L10, L11, L12, L13, L14, L15, L16];      
   Results = ParTasks(Tasks, Args, opt);
   free M;
   do i = 1 to ListLen(Results);
      M = M // Results$i;
   end;

s=M[, ncol(M)-1:ncol(M)];
MM=M[,1:ncol(M)-2];
qnummese=idler[,2:ncol(idler)];
t2=unique(qnummese);

rown=catx("|", t2 [s[,1]], varnames[s[,2]]);

call MatrixWriteToCAS(MM, '', '_crm_fun', varnames);
/* call MatrixWriteToCAS(res0_perc, '', '_crm_fun_perc', varnames); */
call MatrixWriteToCAS(rown, '', '_crm_fun_id');

/*    varNames = {'mean' 'std' 'min' 'max'}; */
/*    print M[L=labl  F=Best6.]; */
finish;

store module=(acFunnel acSimParTasks);
endsource;
iml / code=DefineMods;
run;

cas mysession sessopts=(caslib="casuser");
/* try your best */
options casdatalimit=all;
proc cas;
loadactionset "iml";
source RandMR;
/* Run MapReduce on all workers and threads */
KeepStmt1 = 'KEEP=codidoc codopera  ';
KeepStmt2 = 'KEEP=_numeric_  ';
KeepStmt3 = 'KEEP=QNUMMESE  ';
x = matrixCreateFromCAS('PUBLIC', 'exp2', KeepStmt2 );
y=matrixCreateFromCAS('PUBLIC', 'EXP2', KeepStmt1);
QNUMMESE = matrixCreateFromCAS('PUBLIC', 'EXP2', KeepStmt3);

start TasksPerThread(nT, nW, i);
   n = floor(nT / nW) + (i <= mod(nT, nW));
   return(n);
finish;

nTasks = nrow(y);
nThreads = 16;
i = T(1:nThreads);
n = TasksPerThread(nTasks, nThreads, i);
Thread = char(T(1:nThreads));
/* print n[c={'Num Tasks'} r=Thread L='Tasks per Thread']; */

varnames={&var2s.};

X=X||J(NROW(X),1,0);
XT=(SHAPE(DIF(X), NROW(X)) [, 2:NCOL(X)])#2;
idx=loc((x[, 2:NCOL(X)]+xt)=1);
xt[idx]=1;

y=y[,2]||y[,1];

t2=unique(qnummese);

acum_n=cusum(n);
/* print acum_n; */
call sortndx(idx, y, 1 );
y=y[idx,];
x=x[idx,];
QNUMMESE=QNUMMESE[idx,];

op=y[,1];

acum_ok=acum_n[1:nrow(n)-1];
do i=1 to nrow(n)-1;
flag_ok=(op[acum_n[i]-1]^=op[acum_n[i]]);
do j=1 to 200 until(flag_ok);
flag_ok=(op[acum_n[i]-1+j]^=op[acum_n[i]+j]);

end;
acum_ok[i]=acum_n[i]+j;
end;

help=(acum_ok-1)||(acum_ok) || (acum_ok+1);
help2= rowvec(help)`;
help2=({1}//help2)`;

acum_ok=acum_ok//acum_n[nrow(n)];
op=y[,1]||QNUMMESE;



    load module=(acFunnel acSimParTasks);
       L = [#'xx'=x, #'ord'=acum_ok, #'idcomb'=op, #'varn'=varnames]; 
    run acSimParTasks('ParTasks: Threads First', {2 0}, L );

endsource;
iml / code=RandMR nthreads=16;


run;

run;

 

Rick_SAS
SAS Super FREQ

> What had taken a long time before (if it finished execution at all), now takes only 2 minutes

Congratulations. Glad to hear you were successful!

SAS Innovate 2025: Save the Date

 SAS Innovate 2025 is scheduled for May 6-9 in Orlando, FL. Sign up to be first to learn about the agenda and registration!

Save the date!

Multiple Linear Regression in SAS

Learn how to run multiple linear regression models with and without interactions, presented by SAS user Alex Chaplin.

Find more tutorials on the SAS Users YouTube channel.

From The DO Loop
Want more? Visit our blog for more articles like these.
Discussion stats
  • 5 replies
  • 1073 views
  • 6 likes
  • 3 in conversation