Hello,
I am trying to load a Databricks table into the memory of a caslib using the proc cas and/or proc casutil and I cannot manage to make it work. I can however load tables using the method of assigning a libname on the compute server pointing to databricks, another pointing to a caslib and using the datastep to load the table.
I tried both, using code and the Data Explorer interface and none seems to be working. Please note that I can connect successfully and list all the files in the schema. It is only when I try to load a table into memory that I ran into issues.
This is the log I get when successfully testing the connection using the Data Explorer (No errors and the schema is not the same as below in the code )
/****************************************************************************
* Connection Label : Test databricks
* Connection Name : ttdbwh
* Connection srcType: spark
* Connection Type : 05f6913e-c968-4e0f-94de-4e774a7501ca
* Connection Target : 519ce398-28fc-4720-b371-e8d2239a88cb
****************************************************************************/
/****************************************************************************
* _sdc_ prefix indicates functions and variables used in connections. *
****************************************************************************/
/* Query CAS to see if cas library exists. Return TRUE if it does, FALSE otherwise. */
function _sdc_doesCaslibExist(_sdc_casLibrary);
table.querycaslib result=r_libExists / caslib=_sdc_casLibrary;
_sdc_exists = dictionary(r_libExists, _sdc_casLibrary);
return _sdc_exists;
end func;
/* Drop the CAS library, if it exists. */
function _sdc_dropCaslib(_sdc_casLibrary);
_sdc_exists = _sdc_doesCaslibExist(_sdc_casLibrary);
if (_sdc_exists = FALSE) then do;
print (warn) 'Cannot drop non-existent caslib.';
end;
else do;
print (note) 'Dropping library: "'|| _sdc_casLibrary || '".';
table.dropCaslib status=sc / caslib=_sdc_casLibrary;
if (sc.statusCode != 0) then do;
print (warn) 'Unable to delete caslib "' || _sdc_casLibrary || '".';
print (warn) 'Reason: "' || sc.reason || '".' ;
end;
end;
end func;
/* Verify that a caslib is valid by trying to fetch the table listing */
function _sdc_verifyCaslib(_sdc_casLibrary);
table.fileInfo result=res status=sc / caslib=_sdc_casLibrary;
if (0 = sc.severity) then do;
print (note) 'Successfully verified caslib "' || _sdc_casLibrary || '".';
end;
else do;
print (warn) 'Unable to verify caslib "' || _sdc_casLibrary || '".';
print (warn) 'Reason: "' || sc.reason || '".';
end;
end func;
/* Create caslib */
function _sdc_createCaslib(_sdc_casLibrary);
table.addCaslib / name=_sdc_casLibrary description='' datasource={srctype='spark',
authenticationType='OAUTH2',
httpPath='/sql/1.0/warehouses/#######',
platform='databricks',
port=443,
schema='dbname.dwh',
server='###############.#.azuredatabricks.net'
} session=TRUE;
end func;
/****************************************************************************
* Test *
****************************************************************************/
/* Check if the passed in caslib name exists already. If so, generate a new name */
_sdc_casLibrary='Test databricks';
_sdc_exists = _sdc_doesCaslibExist(_sdc_casLibrary);
if _sdc_exists != 0 then do;
print (note) 'NOTE: Using random alias.';
_sdc_casLibrary='pU0QqDz5';
/* Warn if exists when creating new. */
print (warn) 'Found existing library with name "' || 'Test databricks' || '".';
print (warn) 'Library may give an error on connect if the scope is the same as existing.';
end;
/* Try to create our test caslib. Note that we define this as SESSION ONLY in the template data. */
_sdc_createCaslib(_sdc_casLibrary);
/* Verify */
_sdc_verifyCaslib(_sdc_casLibrary);
/* Remove test caslib */
_sdc_dropCaslib(_sdc_casLibrary);
NOTE: Active Session now server.
NOTE: 'Test databricks' is now the active caslib.
NOTE: Cloud Analytic Services added the caslib 'Test databricks'.
Name Type Description Path
Test databricks spark
Test databricks
Test databricks
Test databricks
Test databricks
Test databricks
Definition Subdirs Local Active
schema = 'dbname.dwh' 0 1 1
authenticationType = 'OAUTH2' . . .
httpPath = '/sql/1.0/warehouses/#######' . . .
server = '###############.#.azuredatabricks.net' . . .
port = 443 . . .
platform = 'databricks' . . .
Personal Hidden Transient TableRedistUpPolicy
0 0 0 Not Specified
. . .
. . .
. . .
. . .
. . .
NOTE: Successfully verified caslib "Test databricks".
NOTE: Dropping library: "Test databricks".
NOTE: 'CASUSER(myemail@mydomain.com)' is now the active caslib.
NOTE: Cloud Analytic Services removed the caslib 'Test databricks'.
This is the SAS code I used to test the different load methods:
%let ctl=%nrstr(samples);
%let schm=nyctaxi;
/*Connect to Databricks using compute*/
libname dbk_tt spark
platform=databricks
auth=oauth2
server="&srv."
port=443
schema="&schm"
httpPath="&hpath."
properties="Catalog=&ctl.;"
;
libname dbk_tt list;
/*Connect to CAS*/
cas casauto sessopts=(messagelevel="ALL" metrics=TRUE);;
/*Assign a libname pointing to caslib*/
libname cuser cas caslib=casuser sessref=casauto;
/*Save data in caslib*/
data cuser.trips;
set dbk_tt.trips;
where datepart(tpep_dropoff_datetime) = '25feb2016'd;
run;
/*Create a table in caslib using fedsql*/
proc fedsql sessref=casauto _method ;
create table casuser.trp10 {options replace=true} as
select *
from casuser.trips a
where a.trip_distance > 10;
quit;
/*Drop table in caslib*/
proc casutil sessref=casauto incaslib=casuser outcaslib=casuser;
droptable casdata='Trips' quiet;
run;
quit;
cas casauto terminate;
libname dbk_tt clear;
/*Start cas server*/
cas casauto sessopts=(messagelevel="ALL" metrics=TRUE);;
/*Connect caslib*/
caslib spkcaslib
dataSource=(
srctype='spark',
platform='databricks',
authenticationType='OAUTH2',
port=443,
httpPath="&hpath.",
schema="&ctl..&schm.",
server="&srv.",
BULKLOAD=NO
);
/*assign libname in compute (not really needed) */
libname ttt cas caslib=spkcaslib;
/*List files in caslib*/
proc casutil sessref=casauto incaslib="spkcaslib" ;
list files ;
run;
quit;
/*Load table into casuser*/
proc casutil ;
load casdata='trips' incaslib=spkcaslib casout='Trips' replace ;
run;
quit;
cas casauto terminate;
and this is the log:
86 %let ctl=%nrstr(samples); 87 %let schm=nyctaxi; 88 89 90 /*Connect to Databricks using compute*/ 91 libname dbk_tt spark 92 platform=databricks 93 auth=oauth2 94 server="&srv." 95 port=443 96 schema="&schm" 97 httpPath="&hpath." 98 properties="Catalog=&ctl.;" 99 ; NOTE: Libref DBK_TT was successfully assigned as follows: Engine: SPARK Physical Name: jdbc:cdata:databricks:Server=###############.#.azuredatabricks.net;Database=nyctaxi;HTTPPath=/sql/1.0/warehouses/###### ######;QueryPassthrough=true;UseCloudFetch=true;InitiateOAuth=OFF;AuthScheme=AzureAD;OAuthAccessToken=******;Catalog=samples ; 100 libname dbk_tt list; NOTE: Libref= DBK_TT Scope= Compute Server Engine= SPARK Physical Name= jdbc:cdata:databricks:Server=###############.#.azuredatabricks.net;Database=nyctaxi;HTTPPath=/sql/1.0/warehouses/###### ######;QueryPassthrough=true;UseCloudFetch=true;InitiateOAuth=OFF;AuthScheme=AzureAD;OAuthAccessToken=******;Catalog=samples ; Schema= nyctaxi Comment= Sample database nyctaxi Location= DBMS Major Version= 3 DBMS Minor Version= 1 101 102 /*Connect to CAS*/ 103 cas casauto sessopts=(messagelevel="ALL" metrics=TRUE); NOTE: Executing action 'sessionProp.setSessOpt'. NOTE: Action 'sessionProp.setSessOpt' used (Total process time): NOTE: real time 0.000490 seconds NOTE: cpu time 0.000426 seconds (86.94%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 303.44K (0.00%) NOTE: The CAS statement request to update one or more session options for session CASAUTO completed. 103! ; 104 105 /*Assign a libname pointing to caslib*/ 106 libname cuser cas caslib=casuser sessref=casauto; NOTE: Libref CUSER was successfully assigned as follows: Engine: CAS Physical Name: 61b2075d-b2bb-6143-af5e-c80b28528d92 107 108 /*Save data in caslib*/ 109 data cuser.trips; 110 set dbk_tt.trips; 111 where datepart(tpep_dropoff_datetime) = '25feb2016'd; 112 run; NOTE: Executing action 'table.tableInfo'. NOTE: Action 'table.tableInfo' used (Total process time): NOTE: real time 0.000754 seconds NOTE: cpu time 0.000643 seconds (85.28%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 314.47K (0.00%) NOTE: There were 373 observations read from the data set DBK_TT.TRIPS. WHERE DATEPART(tpep_dropoff_datetime)='25FEB2016'D; NOTE: The data set CUSER.TRIPS has 373 observations and 6 variables. NOTE: Executing action 'table.addTable'. NOTE: DATA statement used (Total process time): real time 2.33 seconds cpu time 0.11 seconds 113 114 115 /*Create a table in caslib using fedsql*/ 116 proc fedsql sessref=casauto _method ; 117 create table casuser.trp10 {options replace=true} as 118 select * 119 from casuser.trips a 120 where a.trip_distance > 10; NOTE: Executing action 'fedSql.execDirect'. Methods for full query plan ---------------------------- SeqScan with qual from CASUSER.TRIPS Methods for stage 1 -------------------- SeqScan from {Push Down}.Child 1 NOTE: CASDAL driver. Creation of a TIMESTAMP column has been requested, but is not supported by the CASDAL driver. A DOUBLE PRECISION column will be created instead. A DATETIME format will be associated with the column. NOTE: CASDAL driver. Creation of a TIMESTAMP column has been requested, but is not supported by the CASDAL driver. A DOUBLE PRECISION column will be created instead. A DATETIME format will be associated with the column. NOTE: Table TRP10 was created in caslib CASUSER(myname@mydomain.com) with 17 rows returned. NOTE: Action 'fedSql.execDirect' used (Total process time): NOTE: real time 0.018885 seconds NOTE: cpu time 0.030097 seconds (159.37%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 12.60M (0.01%) 121 quit; NOTE: PROCEDURE FEDSQL used (Total process time): real time 0.03 seconds cpu time 0.00 seconds 122 123 /*Drop table in caslib*/ 124 proc casutil sessref=casauto incaslib=casuser outcaslib=casuser; NOTE: The UUID '61b2075d-b2bb-6143-af5e-c80b28528d92' is connected using session CASAUTO. 125 125! droptable casdata='Trips' quiet; NOTE: Executing action 'table.dropTable'. NOTE: Action 'table.dropTable' used (Total process time): NOTE: real time 0.000459 seconds NOTE: cpu time 0.000425 seconds (92.59%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 311.00K (0.00%) NOTE: The Cloud Analytic Services server processed the request in 0.000459 seconds. 126 run; 127 quit; NOTE: PROCEDURE CASUTIL used (Total process time): real time 0.00 seconds cpu time 0.01 seconds 128 129 cas casauto terminate; NOTE: Libref CUSER has been deassigned. NOTE: Libref TTT has been deassigned. NOTE: Deletion of the session CASAUTO was successful. NOTE: The default CAS session CASAUTO identified by SAS option SESSREF= was terminated. Use the OPTIONS statement to set the SESSREF= option to an active session. NOTE: Request to TERMINATE completed for session CASAUTO. 130 131 libname dbk_tt clear; NOTE: Libref DBK_TT has been deassigned. 132 133 134 135 136 /*Start cas server*/ 137 cas casauto sessopts=(messagelevel="ALL" metrics=TRUE); NOTE: The session CASAUTO connected successfully to Cloud Analytic Services sas-cas-server-default-client using port 5570. The UUID is 531f436a-c5e8-cc4e-9146-917af316f010. The user is myname@mydomain.com and the active caslib is CASUSER(myname@mydomain.com). NOTE: The SAS option SESSREF was updated with the value CASAUTO. NOTE: The SAS macro _SESSREF_ was updated with the value CASAUTO. NOTE: The session is using 0 workers. NOTE: Action 'sessionProp.setSessOpt' used (Total process time): NOTE: real time 0.000184 seconds NOTE: cpu time 0.000155 seconds (84.24%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 281.06K (0.00%) NOTE: The CAS statement request to update one or more session options for session CASAUTO completed. 137! ; 138 139 /*Connect caslib*/ 140 caslib spkcaslib 141 dataSource=( 142 srctype='spark', 143 platform='databricks', 144 authenticationType='OAUTH2', 145 port=443, 146 httpPath="&hpath.", 147 schema="&ctl..&schm.", 148 server="&srv.", 149 BULKLOAD=NO 150 ); NOTE: Executing action 'table.addCaslib'. NOTE: 'SPKCASLIB' is now the active caslib. NOTE: Cloud Analytic Services added the caslib 'SPKCASLIB'. NOTE: Action 'table.addCaslib' used (Total process time): NOTE: real time 0.002478 seconds NOTE: cpu time 0.002449 seconds (98.83%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 952.47K (0.00%) NOTE: Action to ADD caslib SPKCASLIB completed for session CASAUTO. 151 152 /*assign libname in compute (not really needed) */ 153 libname ttt cas caslib=spkcaslib; NOTE: Libref TTT was successfully assigned as follows: Engine: CAS Physical Name: 531f436a-c5e8-cc4e-9146-917af316f010 154 155 156 /*List files in caslib*/ 157 proc casutil sessref=casauto incaslib="spkcaslib" ; NOTE: The UUID '531f436a-c5e8-cc4e-9146-917af316f010' is connected using session CASAUTO. 158 158! list files ; NOTE: Executing action 'table.caslibInfo'. Caslib Information Library SPKCASLIB Source Type spark Schema samples.nyctaxi Session local Yes Active Yes Personal No Hidden No Transient No TableRedistUpPolicy Not Specified BulkLoad false AuthenticationType OAUTH2 HttpPath /sql/1.0/warehouses/############ Server ###############.#.azuredatabricks.net Port 443 Platform databricks NOTE: Action 'table.caslibInfo' used (Total process time): NOTE: real time 0.000537 seconds NOTE: cpu time 0.000494 seconds (91.99%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 715.59K (0.00%) NOTE: Executing action 'table.fileInfo'. CAS File Information Name Catalog Schema Type Description trips SPKCASLIB samples.nyctaxi TABLE NOTE: Action 'table.fileInfo' used (Total process time): NOTE: real time 5.861422 seconds NOTE: cpu time 0.040489 seconds (0.69%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 1.33M (0.00%) NOTE: Cloud Analytic Services processed the combined requests in 5.861959 seconds. 159 run; 160 quit; NOTE: PROCEDURE CASUTIL used (Total process time): real time 5.89 seconds cpu time 0.06 seconds 161 162 163 /*Load table into casuser*/ 164 proc casutil ; NOTE: The UUID '531f436a-c5e8-cc4e-9146-917af316f010' is connected using session CASAUTO. 165 165! load casdata='trips' incaslib=spkcaslib casout='Trips' replace ; NOTE: Executing action 'table.loadTable'. NOTE: Performing serial LoadTable action using SAS Data Connector to Spark. ERROR: Base table or view not found trips ERROR: Function failed. ERROR: The action stopped due to errors. NOTE: Action 'table.loadTable' used (Total process time): NOTE: real time 2.243779 seconds NOTE: cpu time 0.020469 seconds (0.91%) NOTE: total nodes 1 (16 cores) NOTE: total memory 125.79G NOTE: memory 1.05M (0.00%) NOTE: The Cloud Analytic Services server processed the request in 2.243779 seconds. 166 run; 167 quit; NOTE: The SAS System stopped processing this step because of errors. NOTE: PROCEDURE CASUTIL used (Total process time): real time 2.25 seconds cpu time 0.02 seconds 168 169 cas casauto terminate; NOTE: Libref TTT has been deassigned. NOTE: Deletion of the session CASAUTO was successful. NOTE: The default CAS session CASAUTO identified by SAS option SESSREF= was terminated. Use the OPTIONS statement to set the SESSREF= option to an active session. NOTE: Request to TERMINATE completed for session CASAUTO.
Thanks in advance for your help
SAS Technical Support have received the case, and replicated the issue.
The CASLIB needs to be changed from having
schema="&ctl..&schm.",
to have
properties="&ctl." schema="&schm."
SAS Technical Support have received the case, and replicated the issue.
The CASLIB needs to be changed from having
schema="&ctl..&schm.",
to have
properties="&ctl." schema="&schm."
Hi @MarkDawson
Your suggestion works but what you copied in the post above is incomplete. You missed the "Catalog=" piece in the Properties statement, i.e.,
caslib spkcaslib
dataSource=(
srctype='spark',
platform='databricks',
authenticationType='OAUTH2',
port=443,
httpPath="&hpath.",
schema="&schm.", /*Before was: schema="&ctl..&schm." */
server="&srv.",
PROPERTIES="Catalog=&ctl." /* Added this statement*/
);
Could you please edit your solution so I can mark it as accepted?
Thank you
SAS Innovate 2025 is scheduled for May 6-9 in Orlando, FL. Sign up to be first to learn about the agenda and registration!