You have probably come across Databricks Delta tables while working with Azure Databricks. A common question is how to load CAS from a specific snapshot of an Azure Databricks Delta table. This post shows how to do that.
A Databricks Delta table is a table that keeps a history of its data changes. Each time a data change operation is executed on the table, a new snapshot (version) of the data is recorded. This table history lets users query an older snapshot of the data by its version (time travel).
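To make the snapshot idea concrete, here is a toy in-memory model in Python. It is not how Delta Lake is implemented (Delta keeps a transaction log over Parquet files); the class `VersionedTable` and its methods are hypothetical names used only to illustrate that each write produces a new version while older versions remain queryable.

```python
class VersionedTable:
    """Toy model: every write appends a new immutable snapshot."""

    def __init__(self, rows):
        # Version 0 is the snapshot created when the table is first saved.
        self._snapshots = [tuple(rows)]

    @property
    def latest_version(self):
        return len(self._snapshots) - 1

    def insert(self, rows):
        # A write never mutates an older snapshot; it appends a new one.
        self._snapshots.append(self._snapshots[-1] + tuple(rows))
        return self.latest_version

    def count(self, version=None):
        # version=None reads the latest snapshot, like a plain SELECT.
        if version is None:
            version = self.latest_version
        if not 0 <= version <= self.latest_version:
            raise ValueError(f"version {version} does not exist")
        return len(self._snapshots[version])


table = VersionedTable(["a", "b", "c"])   # creates version 0
table.insert(["d", "e"])                  # creates version 1
print(table.count(version=0), table.count(version=1), table.count())
```

Reading version 0 after the insert still returns the original row count, which is the behavior the rest of this post relies on.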
In the Azure Databricks Workspace notebook editor, you can use Python code to read data from ADLS2 storage into the Spark cluster. Before the Spark data table can be shared with other applications, you need to save it as a permanent table in the Databricks environment. When saving the permanent table, you can specify the table format, e.g. parquet, orc, or delta. When a table is saved in the “Delta” format, it becomes a Delta table and starts maintaining the history of the data changes in the table.
A DDL statement can also be used to create a Delta table; details can be found in the Databricks documentation.
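As a rough illustration of the DDL route, the sketch below builds a `CREATE TABLE ... USING DELTA` statement as a string. The helper name `delta_ddl`, the table name `baseball_delta_ddl`, and the column list are all hypothetical; the real column definitions would come from your own data.

```python
def delta_ddl(table: str, columns: dict) -> str:
    """Build a CREATE TABLE ... USING DELTA statement from a name -> type mapping."""
    if not columns:
        raise ValueError("at least one column is required")
    cols = ", ".join(f"{name} {sql_type}" for name, sql_type in columns.items())
    return f"CREATE TABLE IF NOT EXISTS {table} ({cols}) USING DELTA"


# Hypothetical columns, for illustration only.
ddl = delta_ddl(
    "baseball_delta_ddl",
    {"name": "STRING", "team": "STRING", "salary": "DOUBLE"},
)
print(ddl)
# In a Databricks notebook, this string could then be run with spark.sql(ddl).
```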
The following example walks through the creation of a Delta table. It reads a set of parquet data files from ADLS2 storage into the Spark cluster and saves the Spark DataFrame as a Delta table to share with other applications. It then adds more data to the Delta table to create different versions of data snapshots, and queries the Delta table by snapshot version. The following code configures the Spark cluster to access ADLS2 storage for reading and writing data files.
Code:
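# ADLS2 storage account, access key (masked), and file system (container) name.
storage_account_name = "utkumaviya4adls2"
storage_account_access_key = "/+ologr5F9tXXXXXXXXXXXXXXX"
storage_account_fs = "fsdata"
# Register the account key so Spark can authenticate to the storage account.
spark.conf.set("fs.azure.account.key."+storage_account_name+".dfs.core.windows.net", storage_account_access_key)
# Allow the file system to be created on first access, list its root, then switch the setting off again.
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://"+storage_account_fs+"@"+storage_account_name+".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")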
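The same `abfss://` location and configuration key are assembled by string concatenation several times in this post. A small sketch of how they could be built in one place follows; the helper names `abfss_uri` and `account_key_conf` are hypothetical, while the account and file system names are the ones used above.

```python
def abfss_uri(container: str, account: str, path: str = "") -> str:
    """Return abfss://<container>@<account>.dfs.core.windows.net/<path>."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


def account_key_conf(account: str) -> str:
    """Return the Spark config key that holds the storage account access key."""
    return f"fs.azure.account.key.{account}.dfs.core.windows.net"


storage_account_name = "utkumaviya4adls2"
storage_account_fs = "fsdata"

print(account_key_conf(storage_account_name))
print(abfss_uri(storage_account_fs, storage_account_name))
print(abfss_uri(storage_account_fs, storage_account_name, "baseball"))
```

In a real notebook, the access key itself would usually be read from a secret scope with `dbutils.secrets.get(...)` rather than written into the notebook as a literal.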
The following code reads a set of parquet data files from ADLS2 into a Spark DataFrame and displays the data.
Code:
df3 = spark.read.parquet("abfss://"+storage_account_fs+"@"+storage_account_name+".dfs.core.windows.net/baseball")
display(df3)
The following code saves the data from the Spark DataFrame into a permanent table in Delta format.
Code:
write_format = 'delta'
permanent_table_name = 'baseball_delta'
df3.write.format(write_format).saveAsTable(permanent_table_name)
The following screen shows the table details with history. The History tab contains the data change history. Initially there is one entry (version 0); as the data changes, more versions are added.
Now, the next step is to apply a change to the table's data so that a new version/snapshot gets created. The version number will be used to query data from a particular snapshot. Before applying the changes to data, let's query the table for the total number of rows.
Code:
%sql select count(*) from baseball_delta ;
Now add more rows to the existing Delta table, here by inserting the contents of the table baseball_prqt.
Code:
%sql insert into baseball_delta select * from baseball_prqt;
Run the row count on the Delta table again; it now returns more rows.
Now, let’s look at the table's detailed history. Notice that the history now contains more versions.
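The History tab shows the same information that the SQL command `DESCRIBE HISTORY baseball_delta` returns. As a hedged sketch, assume those rows have been collected into a list of dictionaries with `version` and `operation` keys. The sample rows below are made up for illustration (including the operation names) and are not taken from the screens in this post; `summarize_history` is a hypothetical helper.

```python
def summarize_history(history):
    """Return (sorted version numbers, latest version) from history rows."""
    versions = sorted(row["version"] for row in history)
    if not versions:
        raise ValueError("history is empty")
    return versions, versions[-1]


# Made-up rows shaped like DESCRIBE HISTORY output (newest first); illustration only.
history = [
    {"version": 1, "operation": "WRITE"},
    {"version": 0, "operation": "CREATE TABLE AS SELECT"},
]

versions, latest = summarize_history(history)
print(versions, latest)
# In a notebook, the rows could come from
# spark.sql("DESCRIBE HISTORY baseball_delta").collect()
```

Knowing which version numbers exist is useful before issuing a `VERSION AS OF` query, since asking for a version that is not in the history fails.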
Now you have a Databricks Delta table with 2 versions of snapshot data. You can query the table using either version 1 or 0. Version 0 will show fewer rows than version 1 as additional data is added in version 1.
Code:
%sql SELECT count(*) FROM baseball_delta VERSION AS OF 0
Code:
%sql SELECT count(*) FROM baseball_delta VERSION AS OF 1
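When the version number comes from a variable rather than being typed by hand, the query text can be generated. The sketch below only builds the SQL string; the helper name `count_as_of` is hypothetical, and the validation is an added precaution rather than something Databricks requires.

```python
def count_as_of(table: str, version: int) -> str:
    """Build a row-count query against one snapshot version of a Delta table."""
    # Reject booleans, floats, strings, and negative numbers up front.
    if isinstance(version, bool) or not isinstance(version, int) or version < 0:
        raise ValueError("version must be a non-negative integer")
    return f"SELECT count(*) FROM {table} VERSION AS OF {version}"


for v in (0, 1):
    print(count_as_of("baseball_delta", v))
# In a Databricks notebook, each string could be run with spark.sql(...).
```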
CAS can access the Azure Databricks Spark cluster using the SAS Data Connector to Spark. With an Azure Databricks Workspace token, a SPARK-3 cluster, and the Databricks JDBC driver in place, you can use the serial method to load CAS from the Azure Databricks Delta table. The Azure Databricks Workspace token (key) is used as the password to authenticate to the Databricks environment.
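The connection itself is described by a JDBC URL in which the user ID is the literal `token` and the password is the workspace token. The following Python sketch assembles a URL of the same shape as the one used in the caslib statement in this post; the function name `databricks_jdbc_url` is hypothetical, and the token value shown is a placeholder.

```python
def databricks_jdbc_url(host, http_path, uid, pwd, schema="default", port=443):
    """Assemble a jdbc:spark:// URL with the properties used in this post."""
    props = {
        "transportMode": "http",
        "ssl": "1",
        "httpPath": http_path,
        "AuthMech": "3",   # user name / password authentication
        "UID": uid,        # the literal string "token"
        "PWD": pwd,        # the Databricks workspace token
    }
    tail = ";".join(f"{key}={value}" for key, value in props.items())
    return f"jdbc:spark://{host}:{port}/{schema};{tail}"


url = databricks_jdbc_url(
    host="adb-7060859955656306.6.azuredatabricks.net",
    http_path="sql/protocolv1/o/7060859955656306/0210-155120-shop163",
    uid="token",
    pwd="<workspace-token>",  # placeholder, not a real token
)
print(url)
```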
To load CAS from the delta snapshot data, you need to use PROC FEDSQL with an explicit pass-through SQL statement that includes the data snapshot version. The following example code loads CAS from the Delta table.
Note: You can use a standard PROC CASUTIL or PROC CAS statement to load a Delta table, but neither supports specifying the snapshot version number. They always load the latest version of the data.
Code:
/* Note: macro variable values in quotes generate errors, so keep them unquoted. */
%let MYDBRICKS=adb-7060859955656306.6.azuredatabricks.net;
%let MYPWD=dapiaa66843abaXXXXXXXXXXXXXXXXXXXX;
%let MYHTTPPATH=sql/protocolv1/o/7060859955656306/0210-155120-shop163;
%let MYUID=token;
CAS mySession SESSOPTS=( CASLIB=casuser TIMEOUT=99 LOCALE="en_US" metrics=true);
caslib spkcaslib dataSource=(srctype='spark',
url="jdbc:spark://&MYDBRICKS:443/default;transportMode=http;ssl=1;httpPath=&MYHTTPPATH;AuthMech=3;UID=&MYUID;PWD=&MYPWD"
driverclass="com.simba.spark.jdbc.Driver",
classpath="/mnt/myazurevol/config/access-clients/JDBC",
BULKLOAD=NO,
schema="default" );
proc casutil outcaslib="spkcaslib" incaslib="spkcaslib" ;
list files ;
quit;
/* Load CAS from DataBricks Delta table with no version */
proc casutil outcaslib="spkcaslib" incaslib="spkcaslib" ;
load casdata="baseball_prqt" casout="baseball_prqt" replace;
load casdata="baseball_delta" casout="baseball_delta" replace;
quit;
/* Load CAS from DataBricks Delta table with snapshot version 0 and 1 */
proc fedsql sessref=mySession _method ;
create table spkcaslib.baseball_delta_0{options replace=true} as
select * from connection to spkcaslib (select * from default.baseball_delta VERSION AS OF 0 );
create table spkcaslib.baseball_delta_1{options replace=true} as
select * from connection to spkcaslib (select * from default.baseball_delta VERSION AS OF 1 );
quit ;
proc casutil outcaslib="spkcaslib" incaslib="spkcaslib" ;
list tables;
quit;
CAS mySession TERMINATE;
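The two FedSQL statements above differ only in the version number, so for more than a couple of versions the statement text could be generated. The Python sketch below produces the same text as the statements in this post; `fedsql_snapshot_load` is a hypothetical helper, and the generated string would still have to be submitted inside a PROC FEDSQL step.

```python
def fedsql_snapshot_load(caslib: str, table: str, version: int, schema: str = "default") -> str:
    """Build the FedSQL pass-through statement that loads one snapshot version."""
    if isinstance(version, bool) or not isinstance(version, int) or version < 0:
        raise ValueError("version must be a non-negative integer")
    return (
        # Doubled braces produce the literal {options replace=true} table option.
        f"create table {caslib}.{table}_{version}{{options replace=true}} as\n"
        f"select * from connection to {caslib} "
        f"(select * from {schema}.{table} VERSION AS OF {version} );"
    )


for v in (0, 1):
    print(fedsql_snapshot_load("spkcaslib", "baseball_delta", v))
```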
Log extract :
....
..............
104 proc casutil outcaslib="spkcaslib" incaslib="spkcaslib" ;
NOTE: The UUID '78ed5903-c6a3-5947-854e-9743b251667a' is connected using session MYSESSION.
105
105! load casdata="baseball_prqt" casout="baseball_prqt" replace;
NOTE: Executing action 'table.loadTable'.
NOTE: Performing serial LoadTable action using SAS Data Connector to Spark.
NOTE: Cloud Analytic Services made the external data from baseball_prqt available as table BASEBALL_PRQT in caslib spkcaslib.
NOTE: Action 'table.loadTable' used (Total process time):
NOTE: The Cloud Analytic Services server processed the request in 1.367409 seconds.
106
106! load casdata="baseball_delta" casout="baseball_delta" replace;
NOTE: Executing action 'table.loadTable'.
NOTE: Performing serial LoadTable action using SAS Data Connector to Spark.
NOTE: Cloud Analytic Services made the external data from baseball_delta available as table BASEBALL_DELTA in caslib spkcaslib.
NOTE: Action 'table.loadTable' used (Total process time):
NOTE: The Cloud Analytic Services server processed the request in 1.408629 seconds.
107 quit;
NOTE: PROCEDURE CASUTIL used (Total process time):
real time 2.79 seconds
cpu time 0.03 seconds
108
109
110 /* Load CAS from DataBricks Delta table with snapshot version 0 and 1 */
111 proc fedsql sessref=mySession _method ;
112 create table spkcaslib.baseball_delta_0{options replace=true} as
113 select * from connection to spkcaslib (select * from default.baseball_delta VERSION AS OF 0 );
NOTE: Executing action 'fedSql.execDirect'.
Methods for full query plan
----------------------------
SeqScan from SPKCASLIB.__fedsql_cep_1__
Methods for stage 1
--------------------
SeqScan from {Push Down}.Child 1
NOTE: Table BASEBALL_DELTA_0 was created in caslib SPKCASLIB with 1610 rows returned.
NOTE: Action 'fedSql.execDirect' used (Total process time):
114 create table spkcaslib.baseball_delta_1{options replace=true} as
115 select * from connection to spkcaslib (select * from default.baseball_delta VERSION AS OF 1 );
NOTE: Executing action 'fedSql.execDirect'.
Methods for full query plan
----------------------------
SeqScan from SPKCASLIB.__fedsql_cep_1__
Methods for stage 1
--------------------
SeqScan from {Push Down}.Child 1
NOTE: Table BASEBALL_DELTA_1 was created in caslib SPKCASLIB with 3220 rows returned.
NOTE: Action 'fedSql.execDirect' used (Total process time):
NOTE: real time 1.535750 seconds
....
..............
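The row counts reported in the log confirm that the two snapshots differ. As a small sketch, the counts can be pulled out of the log text with a regular expression; the two NOTE lines below are copied from the log extract above, and `rows_by_table` is a hypothetical helper name.

```python
import re

# Matches the NOTE that FedSQL writes after creating a CAS table.
ROWS_RE = re.compile(
    r"NOTE: Table (\S+) was created in caslib (\S+) with (\d+) rows returned\."
)


def rows_by_table(log_text: str) -> dict:
    """Map each created CAS table name to the row count reported in the log."""
    return {m.group(1): int(m.group(3)) for m in ROWS_RE.finditer(log_text)}


log_text = """\
NOTE: Table BASEBALL_DELTA_0 was created in caslib SPKCASLIB with 1610 rows returned.
NOTE: Table BASEBALL_DELTA_1 was created in caslib SPKCASLIB with 3220 rows returned.
"""

counts = rows_by_table(log_text)
print(counts)
# Rows added between version 0 and version 1:
print(counts["BASEBALL_DELTA_1"] - counts["BASEBALL_DELTA_0"])  # prints 1610
```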
Result Output:
CAS table loaded from Azure Databricks Delta table.
Important Links:
CAS Accessing Azure Databricks Spark Cluster