A few thoughts:
1. I don't know the details of your program, but you can't "combine" ParTasks and MapReduce. Use one or the other. You should use ParTasks when the tasks on each thread are different. Use MapReduce when the tasks on each thread are similar.
2. While it is possible to partition data so that some observations are on one node and some are on others, I've never done it.
3. Threads on the same node share the same resources for the node. So in SMP mode (controller only), all your threads can process the same data. Perhaps the threads can process different customers? The first thread could keep and process only A-F, the second thread G-M, and so on. If so, you don't need to partition the data across nodes.
My suggestion: Try to solve this problem on one node by using multiple threads. To get started, create a CAS table with about 100 fake customers and see if you can write a MapReduce task that uses 4 threads in which each thread processes only a subset of the data, such as A-F, G-M, N-S, and T-Z. As a first program, see if you can return the number of observations that each thread processes.
Hello,
I'm also very curious about the answer to your question.
Especially this part : << need to have control over how the data will be distributed. >>
And probably my colleague @DaanBijkerk is interested as well.
I am sure @Rick_SAS can help you out !
I can only say this :
Best,
Koen
Hello,
( this is in addition to my answer right above , although I think @Rick_SAS just "cycled in between" 😉)
I can't resist pointing out to you (and all other interested readers) other possibilities of parallelization.
The first two articles in the list below explain you about MP Connect.
I use MP Connect all the time (also in SAS VIYA 4).
The last two show that you can even parallelize (further) within PROC CAS.
Running SAS programs in parallel using SAS/CONNECT®
By Leonid Batkhan on SAS Users January 13, 2021
https://blogs.sas.com/content/sgf/2021/01/13/running-sas-programs-in-parallel-using-sas-connect/
Base SAS + SAS/CONNECT - A simple method to generate load on any number of licensed cores
Posted 04-08-2021 05:46 AM | by SimonWilliams (4316 views)
https://communities.sas.com/t5/SAS-Communities-Library/Base-SAS-SAS-CONNECT-A-simple-method-to-gener...
Using SYSTASK and SAS macro loops for massively parallel processing
By Leonid Batkhan on SAS Users June 14, 2021
https://blogs.sas.com/content/sgf/2021/06/14/using-systask-and-sas-macro-loops-for-massively-paralle...
Tips for parallel processing in CASL
by RICKY THARRINGTON on JULY 6, 2021
https://blogs.sas.com/content/subconsciousmusings/2021/07/06/tips-for-parallel-processing-in-casl/
Parallel Processing in SAS Viya
by RICKY THARRINGTON on MAY 25, 2021
https://blogs.sas.com/content/subconsciousmusings/2021/05/25/parallel-processing-in-sas-viya/
Greetings,
Koen
A few thoughts:
1. I don't know the details of your program, but you can't "combine" ParTasks and MapReduce. Use one or the other. You should use ParTasks when the tasks on each thread are different. Use MapReduce when the tasks on each thread are similar.
2. While it is possible to partition data so that some observations are on one node and some are on others, I've never done it.
3. Threads on the same node share the same resources for the node. So in SMP mode (controller only), all your threads can process the same data. Perhaps the threads can process different customers? The first thread could keep and process only A-F, the second thread G-M, and so on. If so, you don't need to partition the data across nodes.
My suggestion: Try to solve this problem on one node by using multiple threads. To get started, create a CAS table with about 100 fake customers and see if you can write a MapReduce task that uses 4 threads in which each thread processes only a subset of the data, such as A-F, G-M, N-S, and T-Z. As a first program, see if you can return the number of observations that each thread processes.
@Rick_SAS I've resolved it.
What had taken a long time before (if it finished execution at all), now takes only 2 minutes. I use 16 threads and it works like a charm.
Without comments I paste my code.
proc cas;
loadactionset "iml";
source DefineMods;
/* SAS/IML function that computes N random numbers and appends
node and thread information */
start acFunnel(LL);
x=LL$'vv';
help=LL$'comb';
qnummese=help[,2:ncol(help)];
y=help[,1];
XT=(SHAPE(DIF(X), NROW(X)) [, 2:NCOL(X)])#2;
idx=loc((x[, 2:NCOL(X)]+xt)=1);
xt[idx]=1;
t2=unique(qnummese);
t22=(repeat(t2, nrow(x))=qnummese)`*(xt = 2);
t22_sum=t22 [+];
ctr=(xt = 2) [+];
idx1=loc(t22 <> 0);
s = ndx2sub(dimension(t22), idx1);
res0=j(nrow(s), ncol(x)-1, .);
res0_perc=res0;
do i=1 to ncol(idx1);
idx2=loc( ( (qnummese=t2[s[i,1]])# (xt[, s[i,2]]=2) )=1);
/* PRINT IDX2; */
res0[i,s[i,2]] = t22[idx1[i]];
/* PRINT RES0; */
res0[i,(s[i,2]+1):ncol(xt)]=(x [loc(element(y[,1], y[idx2,1])),(s[i,2]+1):ncol(x)-1] <>0) [+,];
/* PRINT RES0; */
res0_perc[i,(s[i,2]):ncol(xt)]=res0[i,(s[i,2]):ncol(xt)]/res0[i,s[i,2]];
end;
y = res0||s;
return y;
finish;
start acSimParTasks(labl, opt, L);
cuts = L$'ord'; vals = L$'xx'; idler=L$'idcomb'; varnames=L$'varn';
namer="L1":"L16";
L1= [#'vv'=vals[1:cuts[1]-1,], #'comb'=idler[1:cuts[1]-1,] ];
L2= [#'vv'=vals[cuts[1]:cuts[2]-1,], #'comb'=idler[cuts[1]:cuts[2]-1,]];
L3= [#'vv'=vals[cuts[2]:cuts[3]-1,], #'comb'=idler[cuts[2]:cuts[3]-1,]];
L4= [#'vv'=vals[cuts[3]:cuts[4]-1,], #'comb'=idler[cuts[3]:cuts[4]-1,]];
L5= [#'vv'=vals[cuts[4]:cuts[5]-1,] , #'comb'=idler[cuts[4]:cuts[5]-1,]];
L6= [#'vv'=vals[cuts[5]:cuts[6]-1,] , #'comb'=idler[cuts[5]:cuts[6]-1,]];
L7= [#'vv'=vals[cuts[6]:cuts[7]-1,] , #'comb'=idler[cuts[6]:cuts[7]-1,]];
L8= [#'vv'=vals[cuts[7]:cuts[8]-1,] , #'comb'=idler[cuts[7]:cuts[8]-1,]];
L9= [#'vv'=vals[cuts[8]:cuts[9]-1,] , #'comb'=idler[cuts[8]:cuts[9]-1,]] ;
L10=[#'vv'=vals[cuts[9]:cuts[10]-1,] , #'comb'=idler[cuts[9]:cuts[10]-1,]];
L11=[#'vv'=vals[cuts[10]:cuts[11]-1,] , #'comb'=idler[cuts[10]:cuts[11]-1,]];
L12=[#'vv'=vals[cuts[11]:cuts[12]-1,] , #'comb'=idler[cuts[11]:cuts[12]-1,]];
L13=[#'vv'=vals[cuts[12]:cuts[13]-1,] , #'comb'=idler[cuts[12]:cuts[13]-1,]];
L14=[#'vv'=vals[cuts[13]:cuts[14]-1,] , #'comb'=idler[cuts[13]:cuts[14]-1,]];
L15=[#'vv'=vals[cuts[14]:cuts[15]-1,] , #'comb'=idler[cuts[14]:cuts[15]-1,]];
L16=[#'vv'=vals[cuts[15]:cuts[16],] , #'comb'=idler[cuts[15]:cuts[16],]];
Tasks = repeat('acFunnel', 1, 16);
Args = [L1, L2, L3, L4, L5, L6, L7, L8, L9, L10, L11, L12, L13, L14, L15, L16];
Results = ParTasks(Tasks, Args, opt);
free M;
do i = 1 to ListLen(Results);
M = M // Results$i;
end;
s=M[, ncol(M)-1:ncol(M)];
MM=M[,1:ncol(M)-2];
qnummese=idler[,2:ncol(idler)];
t2=unique(qnummese);
rown=catx("|", t2 [s[,1]], varnames[s[,2]]);
call MatrixWriteToCAS(MM, '', '_crm_fun', varnames);
/* call MatrixWriteToCAS(res0_perc, '', '_crm_fun_perc', varnames); */
call MatrixWriteToCAS(rown, '', '_crm_fun_id');
/* varNames = {'mean' 'std' 'min' 'max'}; */
/* print M[L=labl F=Best6.]; */
finish;
store module=(acFunnel acSimParTasks);
endsource;
iml / code=DefineMods;
run;
cas mysession sessopts=(caslib="casuser");
/* try your best */
options casdatalimit=all;
proc cas;
loadactionset "iml";
source RandMR;
/* Run MapReduce on all workers and threads */
KeepStmt1 = 'KEEP=codidoc codopera ';
KeepStmt2 = 'KEEP=_numeric_ ';
KeepStmt3 = 'KEEP=QNUMMESE ';
x = matrixCreateFromCAS('PUBLIC', 'exp2', KeepStmt2 );
y=matrixCreateFromCAS('PUBLIC', 'EXP2', KeepStmt1);
QNUMMESE = matrixCreateFromCAS('PUBLIC', 'EXP2', KeepStmt3);
start TasksPerThread(nT, nW, i);
n = floor(nT / nW) + (i <= mod(nT, nW));
return(n);
finish;
nTasks = nrow(y);
nThreads = 16;
i = T(1:nThreads);
n = TasksPerThread(nTasks, nThreads, i);
Thread = char(T(1:nThreads));
/* print n[c={'Num Tasks'} r=Thread L='Tasks per Thread']; */
varnames={&var2s.};
X=X||J(NROW(X),1,0);
XT=(SHAPE(DIF(X), NROW(X)) [, 2:NCOL(X)])#2;
idx=loc((x[, 2:NCOL(X)]+xt)=1);
xt[idx]=1;
y=y[,2]||y[,1];
t2=unique(qnummese);
acum_n=cusum(n);
/* print acum_n; */
call sortndx(idx, y, 1 );
y=y[idx,];
x=x[idx,];
QNUMMESE=QNUMMESE[idx,];
op=y[,1];
acum_ok=acum_n[1:nrow(n)-1];
do i=1 to nrow(n)-1;
flag_ok=(op[acum_n[i]-1]^=op[acum_n[i]]);
do j=1 to 200 until(flag_ok);
flag_ok=(op[acum_n[i]-1+j]^=op[acum_n[i]+j]);
end;
acum_ok[i]=acum_n[i]+j;
end;
help=(acum_ok-1)||(acum_ok) || (acum_ok+1);
help2= rowvec(help)`;
help2=({1}//help2)`;
acum_ok=acum_ok//acum_n[nrow(n)];
op=y[,1]||QNUMMESE;
load module=(acFunnel acSimParTasks);
L = [#'xx'=x, #'ord'=acum_ok, #'idcomb'=op, #'varn'=varnames];
run acSimParTasks('ParTasks: Threads First', {2 0}, L );
endsource;
iml / code=RandMR nthreads=16;
run;
run;
> What had taken a long time before (if it finished execution at all), now takes only 2 minutes
Congratulations. Glad to hear you were successful!
Registration is now open for SAS Innovate 2025 , our biggest and most exciting global event of the year! Join us in Orlando, FL, May 6-9.
Sign up by Dec. 31 to get the 2024 rate of just $495.
Register now!
Learn how to run multiple linear regression models with and without interactions, presented by SAS user Alex Chaplin.
Find more tutorials on the SAS Users YouTube channel.