
Scheduling SAS Jobs and SAS Studio Flows with Apache Airflow

Started ‎06-15-2023 by
Modified ‎06-15-2023 by

Apache Airflow is an excellent tool for scheduling and monitoring custom workflows. These workflows, or pipelines, are written entirely in Python and can be monitored in an incredibly user-friendly local website. Airflow has existed as an open-source tool for a few years now. Recently, a new package named SAS Airflow Provider was released, which provides methods to integrate SAS Studio flows and SAS Jobs in Airflow pipelines. By combining SAS's powerful analytic engine with Airflow's simple yet robust scheduling tools, SAS Airflow Provider has opened up many opportunities for SAS users to automate and schedule important tasks.

 

Introducing: Airflow

 

Installation

 

The Airflow developers provide several ways to install the program, depending on the type of system that best suits your needs. Airflow can be installed via pip (PyPI), a Docker image, a Helm chart for Kubernetes, and more. To install Airflow, follow the extensive instructions available on the Apache Airflow website.

 

DAGs and Tasks

 

Airflow structures all its workflows as DAGs (directed acyclic graphs), which are composed of nodes called tasks. Each task performs a small action, such as running a Python script, running a bash command, or sending an email. Dependencies are formed between the tasks in a DAG, meaning that an Airflow user can ensure that tasks occur in a particular order. Once a DAG is scheduled, Airflow ensures that the workflow runs on a set interval.

 

Before exploring how to work with DAGs, it's important to know exactly what they are and how they work. A directed graph means that every connection between one task and another has directionality (a parent task must occur before its child task). An acyclic graph means there cannot be a cycle, meaning that a child node cannot point to an ancestor such that it is possible to return back to itself during execution. When you combine these two concepts, you have a directed acyclic graph, which can be utilized to ensure a consistent ordering of tasks each time it is run. When viewing the Airflow main page, all pipelines are listed on the DAGs page.
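To make the "directed" and "acyclic" ideas concrete, here is a minimal sketch that uses only Python's standard library (graphlib, available in Python 3.9 and later), not Airflow itself. The task names are hypothetical. It shows that a DAG always yields a valid execution order, and that adding a cycle makes ordering impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks that must finish before it can start.
# The task names are hypothetical.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "report": {"transform"},
    "notify": {"transform"},
}

# A DAG can always be flattened into an order that respects every dependency.
order = list(TopologicalSorter(dependencies).static_order())
print(order)


def has_cycle(graph):
    """Return True if the graph cannot be ordered because it contains a cycle."""
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False


# Making "extract" wait on "report" creates a loop: extract -> transform -> report -> extract.
cyclic = dict(dependencies, extract={"report"})
print(has_cycle(dependencies), has_cycle(cyclic))
```

Airflow performs a comparable check when it loads a DAG file, which is why a pipeline with a circular dependency is rejected rather than scheduled.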

 

me_1_DAGPage.png


 

Clicking on a DAG on the DAGs page will show you details about its previous and current runs, its schedule, and the tasks it is composed of. From the Grid view, the details for the DAG's previous runs are viewable, such as the total number of runs, total successes and failures, and the length of runs. Additionally, the left side of the screen shows a number of very recent runs, with squares representing each of the tasks inside the DAG. The color of each square represents whether the task was successful or failed. In the top right, the schedule for the DAG is shown. In the example below, the schedule is set to @daily, meaning that the DAG will run every night at midnight UTC (8:00pm EDT).

 

me_2_DAGGrid.png
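Because Airflow schedules are expressed in UTC by default, it helps to convert a scheduled time to your local time before relying on it. The sketch below uses only the standard library and fixed UTC offsets (UTC-4 for US Eastern daylight time, UTC-5 for standard time); the date is chosen purely for illustration.

```python
from datetime import datetime, timedelta, timezone

# Fixed offsets for US Eastern time: daylight time is UTC-4, standard time is UTC-5.
EDT = timezone(timedelta(hours=-4), "EDT")
EST = timezone(timedelta(hours=-5), "EST")

# A @daily run fires at midnight UTC.
run_utc = datetime(2023, 6, 15, 0, 0, tzinfo=timezone.utc)

run_edt = run_utc.astimezone(EDT)
run_est = run_utc.astimezone(EST)

print(run_edt.strftime("%Y-%m-%d %H:%M %Z"))
print(run_est.strftime("%Y-%m-%d %H:%M %Z"))
```

Midnight UTC lands on the previous calendar day in US Eastern time: 8:00pm during daylight time and 7:00pm during standard time.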

 

Clicking on the Graph view for a DAG, you can see the dependencies for each task expressed as a drawn-out graph. Hovering over a task shows important details about the task, such as its id and Operator type.

 

From any view for a DAG, clicking the play button in the top right will trigger the DAG to run, regardless of when it is scheduled. When running in the Graph view, the outline for each task will update as it changes states from scheduled to queued to running to either success or failed.

 

me_3_runDAG.gif

 

Just because a DAG has a schedule does not mean that it will run at its scheduled time. By default, some DAGs will be paused. Clicking the switch in the top left next to the name of the DAG will pause and unpause the DAG. To ensure that a scheduled DAG runs at its expected time, be sure to unpause it.

 

me_4_DAGPauseUnpause.png

 

 

Creating DAGs

 

DAG object

 

As discussed earlier, the basis for every Airflow workflow is the DAG. In Python, all Airflow pipelines must contain a DAG object. To create a DAG object, use the following code:

 

from datetime import datetime
from airflow import DAG

dag = DAG('sas_example_dag',
          description='Short DAG description',
          start_date=datetime(2023, 5, 24),
          catchup=False,
          schedule_interval='@daily'
)

 

The DAG object has many parameters. The first parameter is the dag_id, which is the id and name for the DAG that is used throughout the Airflow UI. The description appears next to the dag_id on the DAG's page. The start_date is a Python datetime object that describes the first day the DAG should run at its scheduled time. catchup is a boolean value that determines whether or not the DAG should run to account for days missed if the start_date value is prior to the DAG’s creation. For example, if the DAG is scheduled to run every day, the start date was 3 days before the DAG was created, and catchup was set to True, the DAG would run 3 times to catch up.
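The catchup behavior can be sketched with plain datetime arithmetic. This is a simplified model, not Airflow's scheduler: it assumes a fixed daily interval and counts one run for each interval that has fully elapsed between the start_date and the moment the DAG is created. The function name and dates are hypothetical.

```python
from datetime import datetime, timedelta


def missed_runs(start_date, now, interval=timedelta(days=1)):
    """Return the start of every interval that has fully elapsed between start_date and now."""
    runs = []
    current = start_date
    while current + interval <= now:
        runs.append(current)
        current += interval
    return runs


# The DAG is created on May 24, but its start_date is three days earlier.
start_date = datetime(2023, 5, 21)
created = datetime(2023, 5, 24, 9, 30)

backlog = missed_runs(start_date, created)
print(len(backlog), [run.date().isoformat() for run in backlog])
```

In this model, catchup=True would trigger all three backlog runs, while catchup=False would skip the backlog.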

 

The schedule_interval describes the scheduled time the DAG should run. This can be expressed in many different ways, including cron strings, some shorthand strings, Python timedelta objects, Airflow Timetable objects, and more. In this blog, I will focus on the cron strings and the shorthand strings. A cron string, or cron expression, is a standardized way to describe a schedule. It consists of five space-separated fields: minute, hour, day of the month, month, and day of the week. Each field is expressed as a value inside the expected range for the corresponding period (0-59 for minute, 0-23 for hour, 1-31 for day of the month, 1-12 for month, and 0-6 for day of the week). For the day of the week, 0 represents Sunday and 6 represents Saturday. Additionally, a star (*) can be used to mean "any value."

 

An example of a cron string to represent "every Friday at 6:00AM UTC" would be "0 6 * * 5". Cron strings can be very simple, like "0 0 * * *" for "every night at midnight UTC", or very complicated, implementing concepts like steps and lists ("every other hour on the hour starting at midnight in January and March" is "0 */2 * 1,3 *"). To further explore cron strings, including explanations of the more unusual syntax, visit crontab.guru.
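To show how the five fields are read, here is a small illustrative matcher written with only the standard library. It is a sketch, not the parser Airflow uses: it supports stars, single values, lists, ranges, and steps, and it simply requires every field to match (real cron implementations treat day of the month and day of the week specially when both are restricted). The function names are hypothetical.

```python
from datetime import datetime


def field_matches(field, value, low, high):
    """Check one cron field (e.g. '*/2', '1,3', '1-5') against a value."""
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # "n/step" means "from n to the top of the range"; a bare "n" is just n.
            end = high if step_text else start
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expression, moment):
    """Return True if the datetime falls on the schedule described by the cron string."""
    minute, hour, day, month, weekday = expression.split()
    # Python counts Monday as 0; cron counts Sunday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute, 0, 59)
        and field_matches(hour, moment.hour, 0, 23)
        and field_matches(day, moment.day, 1, 31)
        and field_matches(month, moment.month, 1, 12)
        and field_matches(weekday, cron_weekday, 0, 6)
    )


friday_6am = datetime(2023, 6, 16, 6, 0)    # a Friday
thursday_6am = datetime(2023, 6, 15, 6, 0)  # a Thursday

print(cron_matches("0 6 * * 5", friday_6am))
print(cron_matches("0 6 * * 5", thursday_6am))
print(cron_matches("0 */2 * 1,3 *", datetime(2023, 1, 10, 4, 0)))
```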

 

To avoid having to write out some common cron strings, there are some shorthand strings that can be used instead. Earlier, I used the shorthand "@daily" instead of writing "0 0 * * *" for "every day at midnight". Additionally, there is @yearly (once a year on Jan 1 at midnight), @monthly (once a month on the first day at midnight), @weekly (once a week at midnight on Sunday morning), and @hourly (once an hour on the hour).
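The shorthand strings listed above correspond to the following cron strings. The lookup helper is a hypothetical convenience for illustration, not part of Airflow.

```python
# Shorthand schedules and the cron strings they stand for.
PRESETS = {
    "@hourly": "0 * * * *",   # once an hour, on the hour
    "@daily": "0 0 * * *",    # every day at midnight
    "@weekly": "0 0 * * 0",   # every Sunday at midnight
    "@monthly": "0 0 1 * *",  # first day of each month at midnight
    "@yearly": "0 0 1 1 *",   # January 1 at midnight
}


def expand_schedule(schedule):
    """Return the cron string for a shorthand, or the input unchanged if it is not a shorthand."""
    return PRESETS.get(schedule, schedule)


print(expand_schedule("@weekly"))
print(expand_schedule("0 6 * * 5"))
```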

 

Operator object

 

In order to create a task for a DAG, you have to use an Operator object. There are many native Operators available for Airflow, such as the BashOperator to run a bash command, the PythonOperator to run a Python function, and even an EmailOperator to send an email. Each Operator type requires a different set of parameters, but all share two common parameters: task_id, a unique id for the task that also appears as the task name in the Graph view of the DAG screen, and dag, the DAG that the task belongs to.

 

To create a BashOperator object, use the following code:

 

from airflow.operators.bash import BashOperator
bashTask = BashOperator(task_id='bash',
bash_command='echo "Testing bash"',
dag=dag
)

 

The BashOperator has the bash_command parameter, which contains the bash code that is submitted and run with the task.

 

To create a Python operator, slightly different syntax is used:

 

from datetime import datetime
from airflow.decorators import task
@task(task_id="python", dag=dag)
def python_task(**kwargs):
    print("Today is: " + str(datetime.today()))

pythonTask = python_task()

 

The @task decorator above the function definition is used to convert the function into an Airflow operator. As expected of all Operators, the task_id and dag are described inside the decorator. Then, the task is called into a variable, which readies it to be referenced like a normal task.

 

Creating Relationships

 

Now that we have a DAG and two Operators, we want to define how the two tasks will run relative to one another. We would like the BashOperator task to run first, so we use the following syntax underneath the DAG and task definitions:

 

bashTask.set_downstream(pythonTask)

 

This instructs Airflow to place pythonTask downstream of bashTask, meaning that pythonTask will only start after bashTask has successfully run. Our full code is shown below:

 

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.decorators import task

# Create a DAG
dag = DAG('sas_example_dag',
description='Short DAG description',
schedule_interval='@daily',
start_date=datetime(2023, 5, 24),
catchup=False)

# Create a Bash task
bashTask = BashOperator(task_id='bash',
bash_command='echo "Testing bash"',
dag=dag)

# Create a Python task
@task(task_id="python", dag=dag)
def python_task(**kwargs):
    print("Today is: " + str(datetime.today()))

pythonTask = python_task()

# Set up the execution order for our DAG with our tasks
bashTask.set_downstream(pythonTask)

 

Once you save this full code as a Python file inside the Airflow DAGs folder, the DAG becomes visible and runnable in the Airflow UI.
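As an aside on the dependency syntax: Airflow tasks also accept the bitshift operator, so bashTask >> pythonTask has the same effect as bashTask.set_downstream(pythonTask). The toy class below is a stand-in written for illustration, not Airflow's implementation; it only shows how a >> operator can be wired to set_downstream and why chains such as a >> b >> c work.

```python
class ToyTask:
    """Stand-in for an Airflow task, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        """Record that `other` must run after this task."""
        self.downstream.append(other)

    def __rshift__(self, other):
        """Make `self >> other` behave like `self.set_downstream(other)`."""
        self.set_downstream(other)
        # Returning the right-hand task lets the next >> in a chain attach to it.
        return other


bash = ToyTask("bash")
python = ToyTask("python")
email = ToyTask("email")

# Reads left to right: bash runs first, then python, then email.
bash >> python >> email

print([task.task_id for task in bash.downstream])
print([task.task_id for task in python.downstream])
```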

 

SAS and Airflow

 

While Airflow comes built-in with several native Operators, its real power comes from the various Operators that can be installed, imported, and used to create tasks. SAS offers the sas-airflow-provider package to bring two new Operators to Airflow users: SASStudioFlowOperator and SASJobExecutionOperator. These enable users to schedule SAS Studio flows and SAS Jobs using Airflow.

 

SAS Airflow Provider

 

Before you are able to use sas-airflow-provider, you need to install the package via pip. If you installed Airflow via pip (PyPI), return to the same python environment where you installed Airflow and run the following command:

 

pip install sas-airflow-provider

 

If you installed Airflow with Docker, update your Dockerfile so that the pip install command above runs when your image is built.

 

Once you have installed sas-airflow-provider, you must then create a SAS connection via the Airflow UI. To do so, navigate to your Airflow homepage, and click Admin -> Connections.

 

me_5_connectionsPage.png

 

From the Connections page, click the blue plus button to create a new connection. On the add connection page, enter sas_default for the connection_id and select SAS from the Connection Type dropdown. For host, enter the base url for your Viya environment (everything before "SASStudio" in the url for SAS Studio), and enter your username and password in their respective fields.

 

me_6_createConnection.png

 

The remaining fields can be left blank. Click Save in the bottom left to establish a SAS connection for Airflow.
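If you prefer not to click through the UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_ followed by the upper-cased connection id. The sketch below builds such a value in Airflow's JSON connection format using only the standard library. Two things here are assumptions to verify against your own installation: that your Airflow version accepts JSON-formatted connection variables, and that the provider registers its connection type under the name "sas". The host and credentials are placeholders.

```python
import json
import os


def sas_connection_json(host, login, password):
    """Build a JSON connection definition (the 'sas' conn_type is an assumption)."""
    return json.dumps(
        {
            "conn_type": "sas",
            "host": host,
            "login": login,
            "password": password,
        }
    )


# Placeholder values; in practice, read the password from a secret store.
os.environ["AIRFLOW_CONN_SAS_DEFAULT"] = sas_connection_json(
    host="https://viya.example.com",
    login="example_user",
    password="example_password",
)

print(os.environ["AIRFLOW_CONN_SAS_DEFAULT"])
```

The variable must be set in the environment of the Airflow scheduler and workers, not just in an interactive shell, for tasks to see it.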

 

SASStudioFlowOperator

 

The SASStudioFlowOperator is designed to run an existing SAS Studio flow using Airflow. The code to create one is seen here:

 

from sas_airflow_provider.operators.sas_studioflow import SASStudioFlowOperator
flow_task = SASStudioFlowOperator(task_id='studio_flow',
flow_path='/Public/Airflow/demo_studio_flow_1.flw',
flow_path_type='content',
flow_exec_log=True,
compute_context="SAS Studio compute context",
connection_name='sas_default',
env_vars={'var1':'val1'},
dag=dag)

 

This Operator has many parameters, but most are self-explanatory. As expected, there is a task_id and a dag parameter. The flow_path is the path of the SAS Studio flow to execute. The value of flow_path_type can be either content or compute, which indicates where the flow is stored. When viewing your flow in the Explorer pane in SAS Studio, if the flow exists in the SAS Content folder, the flow_path_type is content. If your flow is in the file system mapped for Compute server access (often named SAS Server or Files), the flow_path_type is compute. The flow_exec_log parameter determines whether the flow log is written to the Airflow log. The compute_context parameter is the name of the compute context the flow should use. The connection_name is the name that was chosen when setting up your SAS connection in Airflow (following the directions earlier, this should be sas_default). The env_vars parameter is a Python dictionary of variable/value pairs that are set in the environment in which the flow runs.

 

SASJobExecutionOperator

 

The SASJobExecutionOperator is designed to run a SAS Viya platform job using Airflow. The code to create one is seen here:

 

from sas_airflow_provider.operators.sas_jobexecution import SASJobExecutionOperator
hello_task = SASJobExecutionOperator(task_id='hello_task',
job_name='/Public/Airflow/Hello-World',
parameters={'param1':'val1'},
dag=dag)

 

The Operator has very few parameters. Besides the expected task_id and dag, the Operator requires the job_name, the path of the job to execute, and the parameters, a Python dictionary of the macro variables/values that should be passed to the SAS Job.

 

Example: ValidateViya and Airflow

 

ValidateViya

 

To demonstrate a possible use case for the SASStudioFlowOperator, I decided to create a workflow that utilizes ValidateViya. This command line tool is one of the many pyviyatools and is designed to quickly test that a Viya environment is operating as expected. To explore ValidateViya, check out my blog Validating Viya Environments Has Never Been this Easy. This workflow uses ValidateViya in two main ways: creating an HTML report with a full test suite and recording the results of these tests in a SAS table.

 

XComs for Communication Between Tasks

 

XComs provide an important mechanism for workflows, allowing for data to move between tasks. Many tasks automatically push some amount of data to an XCom after they complete a run. Using another mechanism called Jinja templating, the data held in an XCom from an upstream task can be pulled into a downstream task’s parameters. In my example, XComs and Jinja templating are both used to move the ValidateViya data generated in one task to the task responsible for appending that data to a SAS table via a SAS Studio flow.
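The mechanism can be pictured with a toy model built from the standard library. This is not Airflow code: the dictionary stands in for the XCom store, the first function mimics a BashOperator pushing the last line of its output, and the second mimics the template rendering step for one specific placeholder. The function names and sample output are hypothetical.

```python
import re

# Stand-in for Airflow's XCom storage: task_id -> pushed value.
xcom_store = {}

# Matches a placeholder such as {{ ti.xcom_pull(task_ids="generate_csv") }}.
PULL_PATTERN = re.compile(r'\{\{\s*ti\.xcom_pull\(task_ids="([^"]+)"\)\s*\}\}')


def run_bash_like_task(task_id, output):
    """Mimic a bash task: keep only the last line of output as the pushed value."""
    lines = output.splitlines()
    last_line = lines[-1] if lines else ""
    xcom_store[task_id] = last_line
    return last_line


def render(template, store):
    """Replace each xcom_pull placeholder with the value pushed by the named task."""
    return PULL_PATTERN.sub(lambda match: store[match.group(1)], template)


# Upstream task prints several lines; only the final one is kept.
run_bash_like_task("generate_csv", "Logging in...\nname,id alice,1")

# Downstream task parameter written as a template.
env_vars = {"_input1": '{{ ti.xcom_pull(task_ids="generate_csv") }}'}
rendered = {key: render(value, xcom_store) for key, value in env_vars.items()}
print(rendered)
```

This is why it matters that an upstream task emits everything it wants to share on a single final line of output.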

 

Code: Airflow Pipeline

 

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from sas_airflow_provider.operators.sas_studioflow import SASStudioFlowOperator

# DAG
# The dag is scheduled to run each day at 6am UTC (2am EDT).
dag = DAG('validate_viya', description='Validates Viya environment on a daily basis',
          schedule_interval='0 6 * * *',
          start_date=datetime(2023, 5, 15), catchup=False)

# Some important variables for all of the BashOperators
endpoint = "https://exampleenv.example.race.sas.com/"
user = "example_user"
password = "hunter2"

# Each of the BashOperators must first set some important environment variables before they can run.
namespace = "exampleenv"
envExport = (
    "export GELLOW_NAMESPACE=" + namespace + "; "
    "export SAS_CLI_PROFILE=${GELLOW_NAMESPACE}; "
    "export SSL_CERT_FILE=~/.certs/${GELLOW_NAMESPACE}_trustedcerts.pem; "
    "export REQUESTS_CA_BUNDLE=${SSL_CERT_FILE};"
)

# Setup pyviyatools
# Test preferences for validateviya; only tests 4 and 7 are active.
test_preferences = '''{
  "tests": [
    {"id": "0", "name": "Logged in User", "active": "False",
     "req": ["/identities/users/@currentUser"],
     "cols": ["name", "id"], "type": "Data Collection"},
    {"id": "1", "name": "List Users", "active": "False",
     "req": ["/identities/users?limit=10000"],
     "cols": ["name", "id"], "type": "Data Collection"},
    {"id": "2", "name": "List Base Folders", "active": "False",
     "req": ["/folders/rootFolders?limit=10000"],
     "cols": ["name", "description"], "type": "Data Collection"},
    {"id": "3", "name": "List CAS Servers", "active": "False",
     "req": ["/casManagement/servers?limit=10000"],
     "cols": ["name", "host", "port", "description"], "type": "Data Collection"},
    {"id": "4", "name": "List CAS Server Metrics", "active": "True",
     "req": ["/casManagement/servers/", "/metrics"],
     "reqVariable": "servers", "servers": [["cas-shared-default"]],
     "cols": ["serverName", "systemNodes", "systemCores", "cpuSystemTime", "memory"],
     "type": "Data Collection"},
    {"id": "5", "name": "List CAS Server Caslibs", "active": "False",
     "req": ["/casManagement/servers/", "/caslibs?limit=10000"],
     "reqVariable": "servers", "servers": [["cas-shared-default"]],
     "cols": ["name", "scope", "description"], "type": "Data Collection"},
    {"id": "6", "name": "List CASLib Tables", "active": "False",
     "req": ["/casManagement/servers/", "/caslibs/", "/tables?limit=10000"],
     "reqVariable": "caslibs", "caslibs": [["cas-shared-default", "systemData"]],
     "cols": ["serverName", "caslibName", "name"], "type": "Data Collection"},
    {"id": "7", "name": "Run Test SAS Code", "active": "True",
     "cols": ["runSuccessful", "jobState"], "type": "Computation"}
  ],
  "count": 8
}'''

setup_code = (
    envExport
    + " cd ~/pyviyatools; python3 setup.py;"
    + " /opt/sas/viya/home/bin/sas-viya profile init --colors-enabled=true --output=json"
    + " --sas-endpoint=\"" + endpoint + "\";"
    + " echo '" + test_preferences + "' > csvTests.json"
)

# This is a BashOperator, which executes commands in a Bash shell.
# The following commands are used to set up the environment in order to use validateviya.
# When the BashOperator is done running, the environment that the code is running in is reset.
# This means that each time we run a BashOperator, we must start by exporting the important
# environment variables necessary to run validateviya and its related programs.
setup = BashOperator(
    task_id="setup_pyviyatools",
    bash_command=setup_code,
    dag=dag)

# Validate viya full
# This BashOperator runs validateviya to create an HTML report of the results.
# The report is saved as a .html file in the ~/pyviyatools folder.
vv_full_code = (
    envExport
    + " cd ~/pyviyatools;"
    + " /opt/sas/viya/home/bin/sas-viya auth login -u=" + user + " -p=" + password + ";"
    + " python3 validateviya.py -o report-full;"
)
validate_viya_full = BashOperator(
    task_id="generate_report",
    bash_command=vv_full_code,
    dag=dag)

# Validate viya csv
# This BashOperator runs validateviya to create a csv string of the results. Each row of data is
# typically delimited by \n, but I use tr to replace all \n characters with spaces, allowing the
# data to be contained in one line of output. This is important for moving results from one task
# to another, as the last line of bash output by a BashOperator is pushed to an XCom and is
# therefore accessible to other tasks.
vv_csv_code = (
    envExport
    + " cd ~/pyviyatools;"
    + " /opt/sas/viya/home/bin/sas-viya auth login -u=" + user + " -p=" + password + ";"
    + " python3 validateviya.py -o csv -s -c csvTests.json | tr -d ' ' | tr '\n' ' '"
)
validate_viya_csv = BashOperator(
    task_id="generate_csv",
    bash_command=vv_csv_code,
    dag=dag)

# Push CSV data to SAS table
# This SASStudioFlowOperator gets the results from validate_viya_csv and submits them to a SAS
# Studio flow that is responsible for cleaning up some of the input and then publishing it to a
# table. The env_vars field uses templating with {{ ti.xcom_pull(task_ids="generate_csv") }}.
# Directly before execution of the task, this is evaluated: everything between {{ }} is filled in
# with the XCom value pushed by the task with the id "generate_csv". Because all BashOperators
# push the last line of output to an XCom, this evaluates to the last line of output for the
# task above.
validate_viya_SAS = SASStudioFlowOperator(
    task_id='csv_to_SAS',
    flow_path_type='content',
    flow_path='/Public/create_data.flw',
    flow_exec_log=True,
    compute_context="SAS Studio compute context",
    connection_name='sas_default',
    env_vars={"_input1": '{{ ti.xcom_pull(task_ids="generate_csv") }}'},
    dag=dag)

# Set up relationships between each of the tasks:
# Ensure that setup happens before the report generating task
setup.set_downstream(validate_viya_full)

# Ensure that setup happens before the csv data generating task
# Also ensure that the csv data generating task happens before the data uploading task
setup.set_downstream(validate_viya_csv)
validate_viya_csv.set_downstream(validate_viya_SAS)

 

Code: SAS Studio Flow

 

The SAS Studio flow used by the SASStudioFlowOperator is shown below. It contains three nodes: a Python Program node responsible for retrieving and cleaning data, an Insert Rows node, and a Table node with the table that contains the validateviya data.

 

me_7_SASStudioFlow.png

 

Python Program Code:

 

import os
import io
import pandas as pd
from datetime import date

# Pull the data from the environment variable _input1
data = os.getenv("_input1")

# Separate the data (temporarily) into the server metrics data and the run code data
serverMetricsData = data[:data.find("runSuccessful")].replace(' ', '\n')
runCodeData = data[data.find("runSuccessful"):].replace(' ', '\n')

# Get the date today as a string
todayString = str(date.today())

# Convert the two datasets into Pandas Dataframe objects
serverMetricsAsDf = pd.read_csv(io.StringIO(serverMetricsData))
runCodeAsDf = pd.read_csv(io.StringIO(runCodeData))

# Add the date to one of the datasets
serverMetricsAsDf.insert(0, "Date", todayString)

# Combine the two dataframes
dataAsDf = pd.concat([serverMetricsAsDf, runCodeAsDf], axis=1)

# Convert the Pandas dataframe into a SAS table on the output port
SAS.df2sd(dataAsDf, _output1)
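To see what the split-and-replace steps do to the one-line value, the sketch below walks through the same idea outside SAS Studio using only the standard library's csv module in place of pandas. The payload is made-up sample data shaped like the two active tests (server metrics and the run-code check); the real output of validateviya may differ. The function names are hypothetical.

```python
import csv
import io

# Hypothetical one-line payload, as it might look after `tr` has removed spaces
# from the values and turned every newline into a space.
payload = (
    "serverName,systemNodes,systemCores,cpuSystemTime,memory "
    "cas-shared-default,1,8,0.5,1024 "
    "runSuccessful,jobState "
    "True,completed "
)


def split_payload(data, marker="runSuccessful"):
    """Split the payload at the second header row and restore the newlines."""
    cut = data.find(marker)
    if cut == -1:
        raise ValueError("marker not found in payload: " + marker)
    first = data[:cut].replace(" ", "\n")
    second = data[cut:].replace(" ", "\n")
    return first, second


def parse_csv(text):
    """Parse CSV text (header row plus data rows) into a list of dictionaries."""
    return list(csv.DictReader(io.StringIO(text.strip())))


metrics_text, run_text = split_payload(payload)
metrics_rows = parse_csv(metrics_text)
run_rows = parse_csv(run_text)

# Combine the two single-row tables side by side, as the flow does with concat(axis=1).
combined = {**metrics_rows[0], **run_rows[0]}
print(combined)
```

The approach relies on the values themselves containing no spaces, which is why the bash pipeline deletes spaces before converting newlines.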

 

Results

 

Once this DAG has run for a few days, you can see the overall health of your Viya environment in your SAS table.

 

me_8_resultTable.png

 

Conclusion

 

Airflow is an incredible scheduling tool thanks to its seamless integration with so many existing services. Adding SAS Jobs and SAS Studio flows among the many other possible operators enables SAS users to bring a powerful analytic engine to their existing Airflow pipelines. Additionally, examples like the one I showed earlier in this blog show how SAS tables can be integrated into your workflows to ensure your data exists in SAS from the moment it is collected. Airflow's user-friendly UI, Python-driven development, and countless integrations make it yet another excellent open-source tool for SAS users to benefit from.

 

More Resources

 

Apache Airflow Documentation

SAS Airflow Provider

Crontab.guru

Validating Viya Environments Has Never Been this Easy

 

Find more articles from SAS Global Enablement and Learning here.

Comments

@michaelerickson , this is great! VERY informative. Thanks for posting.

