Apache Airflow is a popular platform for designing, scheduling, and monitoring process flows. SAS integrates nicely with Airflow through the SAS Airflow Provider, a package that allows SAS users to run SAS assets from Airflow. We have already presented how SAS works with Airflow here and here. Since the provider's first release earlier this year, several very useful capabilities have been added (the latest version covered by this post is 0.0.7). Let's review them in this post.
To support the next feature (running a SAS program from Airflow), a new operator was introduced that supersedes SASStudioFlowOperator. The new SASStudioOperator (note the name change, reflecting that it is no longer dedicated to running flows) is the operator to use from now on for both flows and programs. The old operator will still work to execute flows, but any new feature will be added only to SASStudioOperator.
This one is great! SASStudioOperator can execute SAS programs stored in SAS content or the SAS Compute file system. You no longer need to copy your code into a flow or create a job definition from a program. You can now directly reference your SAS program in an Airflow DAG through the new SASStudioOperator class.
In the following example, the Airflow task defines the execution of the /Public/Programs/code1.sas program available in SAS content. Note the new exec_type="program" option value.
task1 = SASStudioOperator(task_id="1_code1.sas",
path_type="content",
exec_type="program",
path="/Public/Programs/code1.sas",
compute_context="SAS Studio compute context",
connection_name="sas_default",
exec_log=True,
codegen_init_code=False,
codegen_wrap_code=False,
macro_vars=global_params,
env_vars=global_params,
dag=dag)
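For reference, the task snippets in this post assume that a DAG object and the operator import already exist. A minimal scaffold could look like the sketch below; the import path and DAG name are assumptions based on the provider's repository layout, so verify them against the sas-airflow-provider version you have installed.
# Minimal DAG scaffold assumed by the task snippets in this post (a sketch).
# The import path below is an assumption; check it against your installed
# version of the sas-airflow-provider package.
from datetime import datetime

from airflow import DAG
from sas_airflow_provider.operators.sas_studio import SASStudioOperator

# Parameters reused by several examples in this post
global_params = {
    "VAR1": "val1",
    "VAR2": "val2"
}

dag = DAG(dag_id="demo_sas_airflow_provider",   # hypothetical DAG name
          start_date=datetime(2023, 8, 1),
          schedule_interval=None,               # trigger manually
          catchup=False)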
On the same topic, you can embed the SAS code to run directly in the DAG definition. This can be useful for quick testing.
program1 = '''
%let TEST_var1=john ;
%let TEST_var2=jack ;
%let DEV_var3=jill ;
'''
task1 = SASStudioOperator(task_id="1_code1.sas",
path_type="raw",
exec_type="program",
path=program1,
compute_context="SAS Studio compute context",
connection_name="sas_default",
exec_log=True,
codegen_init_code=False,
codegen_wrap_code=False,
macro_vars=global_params,
env_vars=global_params,
dag=dag)
Note the new path_type="raw" option value and the path= option pointing to the Python object containing the SAS code to run.
It was already possible to send parameters to a flow as environment variables, which are easily resolved in SAS with %sysget(myvar). It is now even easier: parameters can be sent as SAS macro-variables.
With the following task definition (note the new macro_vars= option):
global_params = {
"VAR1": "val1",
"VAR2": "val2"
}
program1 = '''
%let TEST_var1=john ;
%let TEST_var2=jack ;
%let DEV_var3=jill ;
'''
task1 = SASStudioOperator(task_id="1_code1.sas",
path_type="raw",
exec_type="program",
path=program1,
compute_context="SAS Studio compute context",
connection_name="sas_default",
exec_log=True,
codegen_init_code=False,
codegen_wrap_code=False,
macro_vars=global_params,
dag=dag)
You will get this in the SAS log (visible from Airflow):
[2023-08-04, 18:39:21 UTC] {sas_studio.py:197} INFO - Adding 2 macro variables to code
...
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 10 /** Begin macro variables **/
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 11 %LET VAR1 = val1;
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 12 %LET VAR2 = val2;
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 13 /** End macro variables **/
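To see the difference on the SAS side, here is a hedged sketch (the task id and program body are hypothetical) that sends the same global_params both as environment variables (env_vars, read with %sysget) and as macro-variables (macro_vars, read with &name):
# Sketch: the same parameters sent both ways and read differently in SAS.
program_params = '''
%put Read as an environment variable: %sysget(VAR1) ;
%put Read as a macro-variable: &VAR2 ;
'''

task_params = SASStudioOperator(task_id="params_demo.sas",
                                path_type="raw",
                                exec_type="program",
                                path=program_params,
                                compute_context="SAS Studio compute context",
                                connection_name="sas_default",
                                exec_log=True,
                                macro_vars=global_params,   # resolved with &VAR1, &VAR2
                                env_vars=global_params,     # resolved with %sysget(VAR1), %sysget(VAR2)
                                dag=dag)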
It is also possible to extract SAS macro-variables from a task so that they can be sent to downstream tasks. To achieve that, SAS macro-variables whose names start with the output_macro_var_prefix option value are pushed to an XCom, the Airflow mechanism for passing small values between tasks.
You can then reuse those variables by pulling them from the XCom and sending them to a downstream task.
Here is an example of pushing macro-variables to an XCom. Note the output_macro_var_prefix="TEST_" option: only the two macro-variables whose names start with TEST_ will be pushed in this example.
program1 = '''
%let TEST_var1=john ;
%let TEST_var2=jack ;
%let DEV_var3=jill ;
'''
task1 = SASStudioOperator(task_id="1_code1.sas",
path_type="raw",
exec_type="program",
path=program1,
compute_session_id="{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}",
compute_context="SAS Studio compute context",
connection_name="sas_default",
exec_log=True,
codegen_init_code=False,
codegen_wrap_code=False,
macro_vars=global_params,
output_macro_var_prefix="TEST_",
dag=dag)
In a downstream task, you can pull these variables individually and inject them into the macro_vars parameter:
program6 = '''
%put &=TEST_var1 ;
%put &=TEST_var2 ;
%put &=DEV_var3 ;
%put _all_ ;
%put &=systime ;
'''
task6 = SASStudioOperator(task_id="6_code4.sas",
path_type="raw",
exec_type="program",
path=program6,
compute_context="SAS Studio compute context",
connection_name="sas_default",
exec_log=True,
codegen_init_code=False,
codegen_wrap_code=False,
macro_vars={"TEST_var1": "{{ti.xcom_pull(key='TEST_VAR1', task_ids=['1_code1.sas'])|first}}",
"TEST_var2": "{{ti.xcom_pull(key='TEST_VAR2', task_ids=['1_code1.sas'])|first}}"},
dag=dag)
In this particular case, only two macro-variables will resolve. DEV_var3 will not.
Note the Jinja template syntax used to pull a value from a specific task's XCom (the |first filter is needed because xcom_pull returns a list of values when task_ids is passed as a list):
{{ti.xcom_pull(key='TEST_VAR1', task_ids=['1_code1.sas'])|first}}
Airflow provides some interesting contextual environment variables. For example:
AIRFLOW_CTX_DAG_OWNER='airflow'
AIRFLOW_CTX_DAG_ID='aaa_test_5_raw'
AIRFLOW_CTX_TASK_ID='1_code1.sas'
AIRFLOW_CTX_EXECUTION_DATE='2023-08-04T19:00:56.255394+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-04T19:00:56.255394+00:00'
They are now passed by default to a SAS Studio flow or SAS program as environment variables (SASStudioOperator). You can get them using %sysget(AIRFLOW_CTX_DAG_ID) for example.
They are now passed optionally to a SAS job definition as SAS macro-variables (SASJobExecutionOperator). The add_airflow_vars option controls this behavior.
task4 = SASJobExecutionOperator(task_id="4_job12",
job_name="/Public/Jobs/job12",
connection_name="sas_default",
job_exec_log=True,
add_airflow_vars=True,
parameters=global_params,
dag=dag)
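Back on the SASStudioOperator side, here is a hedged sketch of a raw program task (the task id is hypothetical) that writes a few of these context variables to the SAS log with %sysget:
# Sketch: print some Airflow context variables from within SAS.
# SASStudioOperator passes them as environment variables by default.
program_ctx = '''
%put DAG id:  %sysget(AIRFLOW_CTX_DAG_ID) ;
%put Task id: %sysget(AIRFLOW_CTX_TASK_ID) ;
%put Run id:  %sysget(AIRFLOW_CTX_DAG_RUN_ID) ;
'''

task_ctx = SASStudioOperator(task_id="ctx_demo.sas",
                             path_type="raw",
                             exec_type="program",
                             path=program_ctx,
                             compute_context="SAS Studio compute context",
                             connection_name="sas_default",
                             exec_log=True,
                             dag=dag)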
Previously, each task created its own SAS Compute session. Now, you have finer-grained control: you can create the SAS Compute sessions you need yourself, using a new operator named SASComputeCreateSession.
task0 = SASComputeCreateSession(task_id="create_sess", dag=dag)
One of the benefits of the previous feature is the ability to reuse existing Compute sessions. Reusing a Compute session makes a lot of sense when you have a sequence of tasks that run serially: each task runs after the previous one and can therefore use the same session. It is less suitable for process flows that run tasks in parallel; in that case, you keep parallel execution by giving tasks their own Compute sessions rather than reusing one.
How to reuse a Compute session?
Once you have started a new Compute session using the new SASComputeCreateSession operator (see syntax example in previous feature), the session id is stored in the task’s XCom.
You can make subsequent tasks reuse that session by specifying this existing session id in the compute_session_id option:
task1 = SASStudioOperator(task_id="1_code1.sas",
path_type="raw",
exec_type="program",
path=program1,
compute_session_id="{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}",
exec_log=True,
dag=dag)
The following Jinja template expression extracts a value from the create_sess task's XCom:
{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}
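Putting the pieces together, here is a hedged sketch of one possible wiring: a session is created by create_sess, then two serial tasks reuse it by pulling the session id from that task's XCom. Task ids and program bodies are hypothetical, and the SASComputeCreateSession import is omitted; use the module path that matches your provider version.
# Sketch: create one Compute session and reuse it across two serial tasks.
sess_id = "{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}"

create_sess = SASComputeCreateSession(task_id="create_sess", dag=dag)

step1 = SASStudioOperator(task_id="step1",
                          path_type="raw",
                          exec_type="program",
                          path="%put Step 1 running ;",
                          compute_session_id=sess_id,   # reuse the session created above
                          exec_log=True,
                          dag=dag)

step2 = SASStudioOperator(task_id="step2",
                          path_type="raw",
                          exec_type="program",
                          path="%put Step 2 running in the same session ;",
                          compute_session_id=sess_id,   # same session as step1
                          exec_log=True,
                          dag=dag)

# Serial dependencies so that both tasks can share the single Compute session
create_sess >> step1 >> step2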
Getting the SAS log in Airflow was already possible for SAS Studio flows but was missing for SAS job definitions (SASJobExecutionOperator). To view the SAS log, you had to go to SAS Environment Manager > Jobs and Flows > Monitoring.
Now, the job_exec_log option has been implemented in the SAS Job Execution operator. This makes it possible to view the logs of all tasks (flows, programs, and jobs) in the same Airflow UI, which is very convenient.
task3 = SASJobExecutionOperator(task_id="3_job11",
job_name="/Public/Jobs/job11",
connection_name="sas_default",
job_exec_log=True,
add_airflow_vars=False,
parameters=global_params,
dag=dag)
All of these new capabilities make SAS integration with Airflow more efficient and seamless. This is a great complement to the SAS Viya platform.
Thanks to Andrew Shakinovsky for his help on this topic.
Thanks for reading.
Find more articles from SAS Global Enablement and Learning here.
Just noticed that the latest version on GitHub (version 0.13) includes the ability not just to create but also to end a compute session. This is very handy for those who want to control the number of idle compute sessions.