If you have a SAS Viya with SingleStore deployment, you will notice that a CAS load from a SingleStore table (with an auto-increment field) is blazing fast. The CAS and SingleStore (S2) database services are tightly integrated. When CAS loads from a SingleStore table, it does not copy all the data into CAS; instead, it streams the data required by CAS actions on the fly. This integration enables users to instantly view new data in CAS as soon as it is ingested into an S2 table.
The SingleStore (S2) Pipeline plays a vital role in data ingestion into S2 tables. An S2 Pipeline continuously loads data into an S2 table from an external data source as the data arrives in the source environment. It enables users to ingest new datasets into the S2 table instantly and see the additional data reflected in the S2 table, CAS, and SAS Visual Analytics reports.
This post highlights the SingleStore Pipeline from ADLS2 cloud storage to CAS. It is an extension of “SAS Viya with SingleStore: Near Real-Time Dashboarding Using SAS Visual Analytics”, posted by @NicolasRobert.
The SingleStore (S2) Pipeline is a built-in component of the S2 cloud database and can be configured against various data sources such as Azure Blob Storage, S3 storage, GCS storage, HDFS storage, Kafka, and an external linked database. The S2 Pipeline can perform ETL (Extract, Transform, and Load) tasks without the need for any additional tools.
The S2 Pipeline supports the JSON, Avro, Parquet, and CSV file formats for ingestion into an S2 table.
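The pipeline created later in this post loads CSV files. For the other formats, the FORMAT clause of CREATE PIPELINE selects the parser. As a rough, hypothetical sketch (the JSON folder path and key names below are illustrative and not part of this post's example; the table and date handling mirror the CSV pipeline shown later):
/* Hypothetical JSON variant of the CSV pipeline defined later in this post */
/* FORMAT JSON maps JSON keys to table columns with the <- operator */
CREATE AGGREGATOR PIPELINE IF NOT EXISTS prdsale_json_pl
AS LOAD DATA AZURE 'blobdata/data/prdsale_json/'
CREDENTIALS '{"account_name": "", "account_key": ""}'
INTO TABLE prdsale_from_pl
FORMAT JSON
(actual <- actual, predict <- predict, country <- country,
region <- region, division <- division, prodtype <- prodtype,
product <- product, quarter <- quarter, year <- year, @month <- month)
SET month = STR_TO_DATE(@month, '%d%b%Y') ;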
With the S2 database instance up and running and data files of a known schema located in ADLS2 storage, you can use the following steps to create the S2 Pipeline. The pipeline requires the ADLS2 connection information, including the Storage Account Key, to access the data files.
The following screenshot shows the data files located at the ADLS2 location. The data files in the folder share the same structure/schema.
With the data file schema known, first create an S2 database table.
/* Set the working database */
use geldm ;
/* Drop objects if exist */
DROP TABLE IF EXISTS prdsale_from_pl ;
/* Create the prdsale table */
CREATE TABLE IF NOT EXISTS prdsale_from_pl (
actual double,
predict double,
country varchar(20),
region varchar(20),
division varchar(20),
prodtype varchar(20),
product varchar(20),
quarter double,
year double,
month date,
/* auto_increment column that will be useful for CAS later */
rowId bigint(20) NOT NULL AUTO_INCREMENT ,
KEY rowId (rowId) USING CLUSTERED COLUMNSTORE,
SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL AUTOSTATS_HISTOGRAM_MODE=CREATE AUTOSTATS_SAMPLING=ON SQL_MODE='STRICT_ALL_TABLES' ;
/* Check the created table */
DESCRIBE prdsale_from_pl ;
With the table created, define the S2 Pipeline against the ADLS2 folder. The CREDENTIALS values are left blank here as placeholders for your storage account name and key.
use geldm ;
/* Drop objects if exist */
DROP PIPELINE IF EXISTS prdsale_pl ;
/* Define the pipeline */
/* AGGREGATOR pipeline to support AUTO_INCREMENT */
/* Specific notation to parse the date from the CSV file */
CREATE AGGREGATOR PIPELINE IF NOT EXISTS prdsale_pl
AS LOAD DATA AZURE 'blobdata/data/prdsale/'
CREDENTIALS '{"account_name": "", "account_key": ""}'
INTO TABLE prdsale_from_pl
FIELDS TERMINATED BY ','
(actual,predict,country,region,division,prodtype,product,quarter,year,@month)
SET month = STR_TO_DATE(@month, '%d%b%Y') ;
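Before starting the pipeline, you can optionally dry-run it with the TEST PIPELINE statement, which extracts and parses source data without loading any rows into the table:
/* Dry-run the pipeline: parse source data without loading it */
TEST PIPELINE prdsale_pl LIMIT 1 ;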
You can check the status of the pipeline and the files it has detected:
use geldm ;
/* Show status of pipelines */
SHOW PIPELINES ;
/* Show status of detected files by the pipeline */
SELECT * FROM information_schema.PIPELINES_FILES ;
Next, start the pipeline to begin ingesting the detected files:
use geldm ;
/* Start the pipeline */
START PIPELINE IF NOT RUNNING prdsale_pl ;
/* Show status of pipelines */
SHOW PIPELINES ;
/* Show status of detected files by the pipeline */
SELECT * FROM information_schema.PIPELINES_FILES ;
Once the pipeline has loaded the files, query the S2 table to verify the ingested data:
use geldm;
/* Print a sample */
select * from prdsale_from_pl limit 10 ;
/* Simple aggregation */
select country, sum(actual) as actual, sum(predict) as predict from prdsale_from_pl group by country ;
Before you run the following statement, load the S2 table into CAS as a global CAS table using a “singlestore” type CASLIB.
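A minimal sketch of that step follows, assuming the session name (mysession), CASLIB name (s2), and output table name (prdsale_from_s2) used elsewhere in this post; the exact CASLIB options depend on your deployment:
/* Start a CAS session; the name matches the fedSQL examples below */
cas mysession ;
/* Assumed: a "singlestore"-type CASLIB pointing at the geldm database */
caslib s2 datasource=(srctype="singlestore", database="geldm") global ;
/* Load the S2 table and promote it as a global CAS table */
proc casutil sessref=mysession incaslib="s2" outcaslib="s2" ;
load casdata="prdsale_from_pl" casout="prdsale_from_s2" promote ;
quit ;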
/* simple aggregation at CAS table */
proc fedSQL sessref=mysession ;
select country, sum(actual) as actual, sum(predict) as predict
from s2.prdsale_from_s2
group by country ;
quit ;
The following screenshot shows the additional data files added to the ADLS2 location. Because the S2 pipeline is active and running, it loads the new data into the S2 table, and the data becomes available in the CAS table as well.
You can verify that the pipeline detected and loaded the new files, and rerun the aggregation on the S2 table:
use geldm ;
/* Show status of detected files by the pipeline */
SELECT * FROM information_schema.PIPELINES_FILES ;
/* Simple aggregation */
select country, sum(actual) as actual, sum(predict) as predict from prdsale_from_pl group by country ;
Because the two new data files were detected by the S2 pipeline and loaded into the S2 table, the aggregate SQL now returns two more groups.
Because the global CAS table streams from the S2 table, data newly ingested into the S2 table is immediately available in the CAS table as well. The same aggregation query on the CAS table now returns two more groups.
/* simple aggregation at CAS table */
proc fedSQL sessref=mysession ;
select country, sum(actual) as actual, sum(predict) as predict
from s2.prdsale_from_s2
group by country ;
quit ;
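If new rows do not appear as expected, the pipeline's error log on the S2 side is a good place to look. A small sketch (PIPELINES_ERRORS is the standard information_schema view; filtering by pipeline name is an assumption about your setup):
/* Inspect pipeline ingestion errors for this pipeline */
SELECT * FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'prdsale_pl' ;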
To apply changes to an S2 Pipeline, use the ALTER PIPELINE statement. For example, if the Azure Storage Account Key changes, you can use the following statements to update the pipeline with the latest key. A pipeline must be stopped before it can be altered, so stop it first and restart it afterward.
/* Stop the pipeline before altering it */
STOP PIPELINE prdsale_pl ;
/* Update the stored credentials */
ALTER PIPELINE prdsale_pl SET
CREDENTIALS '{"account_name": "XXXXXXXX", "account_key": "F8tLDXMXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}';
START PIPELINE IF NOT RUNNING prdsale_pl ;
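To confirm the change took effect, you can review the pipeline's current definition:
/* Review the pipeline definition after the ALTER */
SHOW CREATE PIPELINE prdsale_pl ;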
While creating S2 Pipelines, you may use the SKIP, IGNORE, or REPLACE clauses to control how errors are handled. You can use one or more of these options in the same CREATE PIPELINE statement. More information is available at Additional CREATE S2 PIPELINE Examples. Skeleton examples (placeholders in angle brackets):
CREATE PIPELINE <pipeline_name> AS LOAD DATA <source> SKIP CONSTRAINT ERRORS INTO TABLE <table_name> ;
CREATE PIPELINE <pipeline_name> AS LOAD DATA <source> IGNORE PARSER ERRORS REPLACE INTO TABLE <table_name> ;
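As a concrete, hypothetical variant of this post's pipeline (not part of the original example), here is a sketch that skips rows that fail CSV parsing instead of failing the whole batch:
/* Hypothetical variant: skip malformed rows rather than erroring the batch */
CREATE AGGREGATOR PIPELINE IF NOT EXISTS prdsale_pl_skip
AS LOAD DATA AZURE 'blobdata/data/prdsale/'
CREDENTIALS '{"account_name": "", "account_key": ""}'
SKIP PARSER ERRORS
INTO TABLE prdsale_from_pl
FIELDS TERMINATED BY ','
(actual,predict,country,region,division,prodtype,product,quarter,year,@month)
SET month = STR_TO_DATE(@month, '%d%b%Y') ;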
Many thanks to @NicolasRobert for his help and contributions to this post.
Important Links:
SAS Viya with SingleStore: Near Real-Time Dashboarding Using SAS Visual Analytics
References:
Additional CREATE S2 PIPELINE Examples
Find more articles from SAS Global Enablement and Learning here.