Hello,
I am trying to load a Databricks table into memory in a caslib using PROC CAS and/or PROC CASUTIL, and I cannot get it to work. I can, however, load tables by assigning one libname on the compute server pointing to Databricks and another pointing to a caslib, then using a DATA step to load the table.
I tried both code and the Data Explorer interface, and neither seems to work. Please note that I can connect successfully and list all the files in the schema; it is only when I try to load a table into memory that I run into issues.
This is the log I get when successfully testing the connection from the Data Explorer (no errors; note that the schema is not the same as in the code further below):
/****************************************************************************
* Connection Label : Test databricks
* Connection Name : ttdbwh
* Connection srcType: spark
* Connection Type : 05f6913e-c968-4e0f-94de-4e774a7501ca
* Connection Target : 519ce398-28fc-4720-b371-e8d2239a88cb
****************************************************************************/
/****************************************************************************
* _sdc_ prefix indicates functions and variables used in connections. *
****************************************************************************/
/* Query CAS to see if cas library exists. Return TRUE if it does, FALSE otherwise. */
function _sdc_doesCaslibExist(_sdc_casLibrary);
table.querycaslib result=r_libExists / caslib=_sdc_casLibrary;
_sdc_exists = dictionary(r_libExists, _sdc_casLibrary);
return _sdc_exists;
end func;
/* Drop the CAS library, if it exists. */
function _sdc_dropCaslib(_sdc_casLibrary);
_sdc_exists = _sdc_doesCaslibExist(_sdc_casLibrary);
if (_sdc_exists = FALSE) then do;
print (warn) 'Cannot drop non-existent caslib.';
end;
else do;
print (note) 'Dropping library: "'|| _sdc_casLibrary || '".';
table.dropCaslib status=sc / caslib=_sdc_casLibrary;
if (sc.statusCode != 0) then do;
print (warn) 'Unable to delete caslib "' || _sdc_casLibrary || '".';
print (warn) 'Reason: "' || sc.reason || '".' ;
end;
end;
end func;
/* Verify that a caslib is valid by trying to fetch the table listing */
function _sdc_verifyCaslib(_sdc_casLibrary);
table.fileInfo result=res status=sc / caslib=_sdc_casLibrary;
if (0 = sc.severity) then do;
print (note) 'Successfully verified caslib "' || _sdc_casLibrary || '".';
end;
else do;
print (warn) 'Unable to verify caslib "' || _sdc_casLibrary || '".';
print (warn) 'Reason: "' || sc.reason || '".';
end;
end func;
/* Create caslib */
function _sdc_createCaslib(_sdc_casLibrary);
table.addCaslib / name=_sdc_casLibrary description='' datasource={srctype='spark',
authenticationType='OAUTH2',
httpPath='/sql/1.0/warehouses/#######',
platform='databricks',
port=443,
schema='dbname.dwh',
server='###############.#.azuredatabricks.net'
} session=TRUE;
end func;
/****************************************************************************
* Test *
****************************************************************************/
/* Check if the passed in caslib name exists already. If so, generate a new name */
_sdc_casLibrary='Test databricks';
_sdc_exists = _sdc_doesCaslibExist(_sdc_casLibrary);
if _sdc_exists != 0 then do;
print (note) 'NOTE: Using random alias.';
_sdc_casLibrary='pU0QqDz5';
/* Warn if exists when creating new. */
print (warn) 'Found existing library with name "' || 'Test databricks' || '".';
print (warn) 'Library may give an error on connect if the scope is the same as existing.';
end;
/* Try to create our test caslib. Note that we define this as SESSION ONLY in the template data. */
_sdc_createCaslib(_sdc_casLibrary);
/* Verify */
_sdc_verifyCaslib(_sdc_casLibrary);
/* Remove test caslib */
_sdc_dropCaslib(_sdc_casLibrary);
NOTE: Active Session now server.
NOTE: 'Test databricks' is now the active caslib.
NOTE: Cloud Analytic Services added the caslib 'Test databricks'.
Name Type Description Path
Test databricks spark
Test databricks
Test databricks
Test databricks
Test databricks
Test databricks
Definition Subdirs Local Active
schema = 'dbname.dwh' 0 1 1
authenticationType = 'OAUTH2' . . .
httpPath = '/sql/1.0/warehouses/#######' . . .
server = '###############.#.azuredatabricks.net' . . .
port = 443 . . .
platform = 'databricks' . . .
Personal Hidden Transient TableRedistUpPolicy
0 0 0 Not Specified
. . .
. . .
. . .
. . .
. . .
NOTE: Successfully verified caslib "Test databricks".
NOTE: Dropping library: "Test databricks".
NOTE: 'CASUSER(myemail@mydomain.com)' is now the active caslib.
NOTE: Cloud Analytic Services removed the caslib 'Test databricks'.
This is the SAS code I used to test the different load methods:
%let ctl=%nrstr(samples);
%let schm=nyctaxi;
/*Connect to Databricks using compute*/
libname dbk_tt spark
platform=databricks
auth=oauth2
server="&srv."
port=443
schema="&schm"
httpPath="&hpath."
properties="Catalog=&ctl.;"
;
libname dbk_tt list;
/*Connect to CAS*/
cas casauto sessopts=(messagelevel="ALL" metrics=TRUE);;
/*Assign a libname pointing to caslib*/
libname cuser cas caslib=casuser sessref=casauto;
/*Save data in caslib*/
data cuser.trips;
set dbk_tt.trips;
where datepart(tpep_dropoff_datetime) = '25feb2016'd;
run;
/*Create a table in caslib using fedsql*/
proc fedsql sessref=casauto _method ;
create table casuser.trp10 {options replace=true} as
select *
from casuser.trips a
where a.trip_distance > 10;
quit;
/*Drop table in caslib*/
proc casutil sessref=casauto incaslib=casuser outcaslib=casuser;
droptable casdata='Trips' quiet;
run;
quit;
cas casauto terminate;
libname dbk_tt clear;
/*Start cas server*/
cas casauto sessopts=(messagelevel="ALL" metrics=TRUE);;
/*Connect caslib*/
caslib spkcaslib
dataSource=(
srctype='spark',
platform='databricks',
authenticationType='OAUTH2',
port=443,
httpPath="&hpath.",
schema="&ctl..&schm.",
server="&srv.",
BULKLOAD=NO
);
/*assign libname in compute (not really needed) */
libname ttt cas caslib=spkcaslib;
/*List files in caslib*/
proc casutil sessref=casauto incaslib="spkcaslib" ;
list files ;
run;
quit;
/*Load table into casuser*/
proc casutil ;
load casdata='trips' incaslib=spkcaslib casout='Trips' replace ;
run;
quit;
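As an aside (not part of the log below): as far as I understand, CASUTIL's LOAD statement wraps the table.loadTable action, so I assume the equivalent PROC CAS call would fail the same way. The explicit casOut caslib is my addition:

```sas
/* Equivalent load via the table.loadTable action (my assumption:
   PROC CASUTIL's LOAD wraps this action, so it should behave the same) */
proc cas;
   table.loadTable /
      caslib='spkcaslib'     /* source: the Databricks caslib defined above */
      path='trips'           /* source table name in the schema */
      casOut={name='Trips', caslib='casuser', replace=true};
quit;
```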
cas casauto terminate;
and this is the log:
86 %let ctl=%nrstr(samples);
87 %let schm=nyctaxi;
88
89
90 /*Connect to Databricks using compute*/
91 libname dbk_tt spark
92 platform=databricks
93 auth=oauth2
94 server="&srv."
95 port=443
96 schema="&schm"
97 httpPath="&hpath."
98 properties="Catalog=&ctl.;"
99 ;
NOTE: Libref DBK_TT was successfully assigned as follows:
Engine: SPARK
Physical Name:
jdbc:cdata:databricks:Server=###############.#.azuredatabricks.net;Database=nyctaxi;HTTPPath=/sql/1.0/warehouses/######
######;QueryPassthrough=true;UseCloudFetch=true;InitiateOAuth=OFF;AuthScheme=AzureAD;OAuthAccessToken=******;Catalog=samples
;
100 libname dbk_tt list;
NOTE: Libref= DBK_TT
Scope= Compute Server
Engine= SPARK
Physical Name=
jdbc:cdata:databricks:Server=###############.#.azuredatabricks.net;Database=nyctaxi;HTTPPath=/sql/1.0/warehouses/######
######;QueryPassthrough=true;UseCloudFetch=true;InitiateOAuth=OFF;AuthScheme=AzureAD;OAuthAccessToken=******;Catalog=samples
;
Schema= nyctaxi
Comment= Sample database nyctaxi
Location=
DBMS Major Version= 3
DBMS Minor Version= 1
101
102 /*Connect to CAS*/
103 cas casauto sessopts=(messagelevel="ALL" metrics=TRUE);
NOTE: Executing action 'sessionProp.setSessOpt'.
NOTE: Action 'sessionProp.setSessOpt' used (Total process time):
NOTE: real time 0.000490 seconds
NOTE: cpu time 0.000426 seconds (86.94%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 303.44K (0.00%)
NOTE: The CAS statement request to update one or more session options for session CASAUTO completed.
103! ;
104
105 /*Assign a libname pointing to caslib*/
106 libname cuser cas caslib=casuser sessref=casauto;
NOTE: Libref CUSER was successfully assigned as follows:
Engine: CAS
Physical Name: 61b2075d-b2bb-6143-af5e-c80b28528d92
107
108 /*Save data in caslib*/
109 data cuser.trips;
110 set dbk_tt.trips;
111 where datepart(tpep_dropoff_datetime) = '25feb2016'd;
112 run;
NOTE: Executing action 'table.tableInfo'.
NOTE: Action 'table.tableInfo' used (Total process time):
NOTE: real time 0.000754 seconds
NOTE: cpu time 0.000643 seconds (85.28%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 314.47K (0.00%)
NOTE: There were 373 observations read from the data set DBK_TT.TRIPS.
WHERE DATEPART(tpep_dropoff_datetime)='25FEB2016'D;
NOTE: The data set CUSER.TRIPS has 373 observations and 6 variables.
NOTE: Executing action 'table.addTable'.
NOTE: DATA statement used (Total process time):
real time 2.33 seconds
cpu time 0.11 seconds
113
114
115 /*Create a table in caslib using fedsql*/
116 proc fedsql sessref=casauto _method ;
117 create table casuser.trp10 {options replace=true} as
118 select *
119 from casuser.trips a
120 where a.trip_distance > 10;
NOTE: Executing action 'fedSql.execDirect'.
Methods for full query plan
----------------------------
SeqScan with qual from CASUSER.TRIPS
Methods for stage 1
--------------------
SeqScan from {Push Down}.Child 1
NOTE: CASDAL driver. Creation of a TIMESTAMP column has been requested, but is not supported by the CASDAL driver. A DOUBLE
PRECISION column will be created instead. A DATETIME format will be associated with the column.
NOTE: CASDAL driver. Creation of a TIMESTAMP column has been requested, but is not supported by the CASDAL driver. A DOUBLE
PRECISION column will be created instead. A DATETIME format will be associated with the column.
NOTE: Table TRP10 was created in caslib CASUSER(myname@mydomain.com) with 17 rows returned.
NOTE: Action 'fedSql.execDirect' used (Total process time):
NOTE: real time 0.018885 seconds
NOTE: cpu time 0.030097 seconds (159.37%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 12.60M (0.01%)
121 quit;
NOTE: PROCEDURE FEDSQL used (Total process time):
real time 0.03 seconds
cpu time 0.00 seconds
122
123 /*Drop table in caslib*/
124 proc casutil sessref=casauto incaslib=casuser outcaslib=casuser;
NOTE: The UUID '61b2075d-b2bb-6143-af5e-c80b28528d92' is connected using session CASAUTO.
125
125! droptable casdata='Trips' quiet;
NOTE: Executing action 'table.dropTable'.
NOTE: Action 'table.dropTable' used (Total process time):
NOTE: real time 0.000459 seconds
NOTE: cpu time 0.000425 seconds (92.59%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 311.00K (0.00%)
NOTE: The Cloud Analytic Services server processed the request in 0.000459 seconds.
126 run;
127 quit;
NOTE: PROCEDURE CASUTIL used (Total process time):
real time 0.00 seconds
cpu time 0.01 seconds
128
129 cas casauto terminate;
NOTE: Libref CUSER has been deassigned.
NOTE: Libref TTT has been deassigned.
NOTE: Deletion of the session CASAUTO was successful.
NOTE: The default CAS session CASAUTO identified by SAS option SESSREF= was terminated. Use the OPTIONS statement to set the
SESSREF= option to an active session.
NOTE: Request to TERMINATE completed for session CASAUTO.
130
131 libname dbk_tt clear;
NOTE: Libref DBK_TT has been deassigned.
132
133
134
135
136 /*Start cas server*/
137 cas casauto sessopts=(messagelevel="ALL" metrics=TRUE);
NOTE: The session CASAUTO connected successfully to Cloud Analytic Services sas-cas-server-default-client using port 5570. The UUID
is 531f436a-c5e8-cc4e-9146-917af316f010. The user is myname@mydomain.com and the active caslib is
CASUSER(myname@mydomain.com).
NOTE: The SAS option SESSREF was updated with the value CASAUTO.
NOTE: The SAS macro _SESSREF_ was updated with the value CASAUTO.
NOTE: The session is using 0 workers.
NOTE: Action 'sessionProp.setSessOpt' used (Total process time):
NOTE: real time 0.000184 seconds
NOTE: cpu time 0.000155 seconds (84.24%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 281.06K (0.00%)
NOTE: The CAS statement request to update one or more session options for session CASAUTO completed.
137! ;
138
139 /*Connect caslib*/
140 caslib spkcaslib
141 dataSource=(
142 srctype='spark',
143 platform='databricks',
144 authenticationType='OAUTH2',
145 port=443,
146 httpPath="&hpath.",
147 schema="&ctl..&schm.",
148 server="&srv.",
149 BULKLOAD=NO
150 );
NOTE: Executing action 'table.addCaslib'.
NOTE: 'SPKCASLIB' is now the active caslib.
NOTE: Cloud Analytic Services added the caslib 'SPKCASLIB'.
NOTE: Action 'table.addCaslib' used (Total process time):
NOTE: real time 0.002478 seconds
NOTE: cpu time 0.002449 seconds (98.83%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 952.47K (0.00%)
NOTE: Action to ADD caslib SPKCASLIB completed for session CASAUTO.
151
152 /*assign libname in compute (not really needed) */
153 libname ttt cas caslib=spkcaslib;
NOTE: Libref TTT was successfully assigned as follows:
Engine: CAS
Physical Name: 531f436a-c5e8-cc4e-9146-917af316f010
154
155
156 /*List files in caslib*/
157 proc casutil sessref=casauto incaslib="spkcaslib" ;
NOTE: The UUID '531f436a-c5e8-cc4e-9146-917af316f010' is connected using session CASAUTO.
158
158! list files ;
NOTE: Executing action 'table.caslibInfo'.
Caslib Information
Library SPKCASLIB
Source Type spark
Schema samples.nyctaxi
Session local Yes
Active Yes
Personal No
Hidden No
Transient No
TableRedistUpPolicy Not Specified
BulkLoad false
AuthenticationType OAUTH2
HttpPath /sql/1.0/warehouses/############
Server ###############.#.azuredatabricks.net
Port 443
Platform databricks
NOTE: Action 'table.caslibInfo' used (Total process time):
NOTE: real time 0.000537 seconds
NOTE: cpu time 0.000494 seconds (91.99%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 715.59K (0.00%)
NOTE: Executing action 'table.fileInfo'.
CAS File Information
Name Catalog Schema Type Description
trips SPKCASLIB samples.nyctaxi TABLE
NOTE: Action 'table.fileInfo' used (Total process time):
NOTE: real time 5.861422 seconds
NOTE: cpu time 0.040489 seconds (0.69%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 1.33M (0.00%)
NOTE: Cloud Analytic Services processed the combined requests in 5.861959 seconds.
159 run;
160 quit;
NOTE: PROCEDURE CASUTIL used (Total process time):
real time 5.89 seconds
cpu time 0.06 seconds
161
162
163 /*Load table into casuser*/
164 proc casutil ;
NOTE: The UUID '531f436a-c5e8-cc4e-9146-917af316f010' is connected using session CASAUTO.
165
165! load casdata='trips' incaslib=spkcaslib casout='Trips' replace ;
NOTE: Executing action 'table.loadTable'.
NOTE: Performing serial LoadTable action using SAS Data Connector to Spark.
ERROR: Base table or view not found trips
ERROR: Function failed.
ERROR: The action stopped due to errors.
NOTE: Action 'table.loadTable' used (Total process time):
NOTE: real time 2.243779 seconds
NOTE: cpu time 0.020469 seconds (0.91%)
NOTE: total nodes 1 (16 cores)
NOTE: total memory 125.79G
NOTE: memory 1.05M (0.00%)
NOTE: The Cloud Analytic Services server processed the request in 2.243779 seconds.
166 run;
167 quit;
NOTE: The SAS System stopped processing this step because of errors.
NOTE: PROCEDURE CASUTIL used (Total process time):
real time 2.25 seconds
cpu time 0.02 seconds
168
169 cas casauto terminate;
NOTE: Libref TTT has been deassigned.
NOTE: Deletion of the session CASAUTO was successful.
NOTE: The default CAS session CASAUTO identified by SAS option SESSREF= was terminated. Use the OPTIONS statement to set the
SESSREF= option to an active session.
NOTE: Request to TERMINATE completed for session CASAUTO.
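My current suspicion is the two-level schema value: the working libname uses schema="&schm" plus properties="Catalog=&ctl.;", while the caslib passes schema="&ctl..&schm.", and I am not sure the serial data connector parses a catalog.schema value. A variant I still need to try (I do not know how, or whether, the catalog can be passed separately at the caslib level, so this is only a sketch):

```sas
/* Sketch: same caslib but with a single-level schema; how to supply the
   catalog (samples) at the caslib level is exactly what I am unsure about */
caslib spkcaslib2
   dataSource=(
      srctype='spark',
      platform='databricks',
      authenticationType='OAUTH2',
      port=443,
      httpPath="&hpath.",
      schema="&schm.",          /* schema only, no catalog prefix */
      server="&srv.",
      BULKLOAD=NO
   );
```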
Thanks in advance for your help.