
What’s New in SAS Airflow Provider


Apache Airflow is a popular platform for designing, scheduling, and monitoring process flows. SAS integrates nicely with Airflow through the SAS Airflow Provider, a package that allows SAS users to run SAS assets from Airflow. We have already presented how SAS works with Airflow here and here. Since the first release of the provider earlier this year, several very useful capabilities have been added (the latest version covered by this post is 0.0.7). Let’s review them in this post.

 

A new SAS Studio operator

 

To support the next feature (running a SAS program from Airflow), a new operator was introduced that supersedes SASStudioFlowOperator. The new SASStudioOperator (the name change reflects that it is no longer dedicated to running flows) is the operator to use from now on for both flows and programs. The old operator still works for executing flows, but any new feature will be added only to SASStudioOperator.
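
For reference, here is what the import might look like at the top of a DAG script (a minimal sketch; the module path is an assumption based on the sas-airflow-provider package layout):

# Assumption: import path based on the sas-airflow-provider package layout
from airflow import DAG
from sas_airflow_provider.operators.sas_studio import SASStudioOperator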

 

Ability to run a SAS program (in addition to SAS Studio flows and SAS job definitions)

 

This one is great! SASStudioOperator can execute SAS programs stored in SAS content or on the SAS Compute server's file system. You no longer need to copy your code into a flow or create a job definition from a program: you can now reference your SAS program directly in an Airflow DAG through the new SASStudioOperator class.

 

In the following example, the Airflow task defines the execution of the /Public/Programs/code1.sas program available in SAS content. Note the new exec_type="program" option value.  

 

task1 = SASStudioOperator(task_id="1_code1.sas",
                          path_type="content",       # the program lives in SAS content
                          exec_type="program",       # run a program rather than a flow
                          path="/Public/Programs/code1.sas",
                          compute_context="SAS Studio compute context",
                          connection_name="sas_default",
                          exec_log=True,             # surface the SAS log in Airflow
                          codegen_init_code=False,
                          codegen_wrap_code=False,
                          macro_vars=global_params,  # dict of name/value pairs (see below)
                          env_vars=global_params,
                          dag=dag)

 

Ability to embed SAS code as part of the Airflow DAG

 

On the same topic, you can embed the SAS code to run directly in the DAG definition, which can be handy for quick testing.

 

program1 = '''
%let TEST_var1=john ;
%let TEST_var2=jack ;
%let DEV_var3=jill ;
'''

task1 = SASStudioOperator(task_id="1_code1.sas",
                          path_type="raw",
                          exec_type="program",
                          path=program1,
                          compute_context="SAS Studio compute context",
                          connection_name="sas_default",
                          exec_log=True,
                          codegen_init_code=False,
                          codegen_wrap_code=False,
                          macro_vars=global_params,
                          env_vars=global_params,
                          dag=dag)

 

Note the new path_type="raw" option value and the path= option, which now points to the Python string containing the SAS code to run.

 

Ability to pass macro-variables to SAS Studio flows and SAS programs

 

It was already possible to send parameters to a flow as environment variables, which are easily resolved in SAS with %sysget(myvar). Even easier is the new ability to send parameters as SAS macro-variables.
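
As a quick illustration (not from the original example), assuming a parameter named VAR1 is passed both ways (env_vars and macro_vars), the two access patterns look like this in SAS code:

program_compare = '''
/* passed via env_vars: resolve with %sysget */
%put Environment variable: %sysget(VAR1);
/* passed via macro_vars: generated as %LET statements, resolve directly */
%put Macro variable: &VAR1;
'''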

 

With the following task definition (note the new macro_vars= option):

 

global_params = {
   "VAR1": "val1",
   "VAR2": "val2"
}

program1 = '''
%let TEST_var1=john ;
%let TEST_var2=jack ;
%let DEV_var3=jill ;
'''

task1 = SASStudioOperator(task_id="1_code1.sas",
                          path_type="raw",
                          exec_type="program",
                          path=program1,
                          compute_context="SAS Studio compute context",
                          connection_name="sas_default",
                          exec_log=True,
                          codegen_init_code=False,
                          codegen_wrap_code=False,
                          macro_vars=global_params,
                          dag=dag)

 

You will get this in the SAS log (visible from Airflow):

 

[2023-08-04, 18:39:21 UTC] {sas_studio.py:197} INFO - Adding 2 macro variables to code
...
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 10   /** Begin macro variables **/
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 11   %LET VAR1 = val1;
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 12   %LET VAR2 = val2;
[2023-08-04, 18:39:34 UTC] {logging_mixin.py:150} INFO - 13   /** End macro variables **/

 

Ability to extract macro-variables from SAS Studio flows and SAS programs

 

It is also possible to extract SAS macro-variables from a task so that they can be sent to downstream tasks. To achieve that, SAS macro-variables whose names start with the output_macro_var_prefix option value are pushed to an XCom, the Airflow mechanism for sharing small values between tasks.

 

You can then reuse those variables by pulling them from the XCom and sending them to a downstream task.

 

Here is an example of pushing macro-variables to an XCom. Note the output_macro_var_prefix="TEST_" option: only the two macro-variables whose names start with TEST_ will be pushed in this example.

 

program1 = '''
%let TEST_var1=john ;
%let TEST_var2=jack ;
%let DEV_var3=jill ;
'''

task1 = SASStudioOperator(task_id="1_code1.sas",
                          path_type="raw",
                          exec_type="program",
                          path=program1,
                          compute_context="SAS Studio compute context",
                          connection_name="sas_default",
                          exec_log=True,
                          codegen_init_code=False,
                          codegen_wrap_code=False,
                          macro_vars=global_params,
                          output_macro_var_prefix="TEST_",  # push TEST_* macro-variables to the XCom
                          dag=dag)

 

In a downstream task, you can pull these variables individually and inject them in the macro_vars parameter:

 

program6 = '''
%put &=TEST_var1 ;
%put &=TEST_var2 ;
%put &=DEV_var3 ;

%put _all_ ;
%put &=systime ;
'''

task6 = SASStudioOperator(task_id="6_code4.sas",
                          path_type="raw",
                          exec_type="program",
                          path=program6,
                          compute_context="SAS Studio compute context",
                          connection_name="sas_default",
                          exec_log=True,
                          codegen_init_code=False,
                          codegen_wrap_code=False,
                          macro_vars={"TEST_var1": "{{ti.xcom_pull(key='TEST_VAR1', task_ids=['1_code1.sas'])|first}}",
                                      "TEST_var2": "{{ti.xcom_pull(key='TEST_VAR2', task_ids=['1_code1.sas'])|first}}"},
                          dag=dag)

 

In this particular case, only the two TEST_ macro-variables will resolve; DEV_var3 will not, since it was never pushed to the XCom.

 

Note the syntax to pull values from a specific task’s XCom:

 

{{ti.xcom_pull(key='TEST_VAR1', task_ids=['1_code1.sas'])|first}}

 

Ability to pass Airflow context variables to SAS Studio flows / SAS programs / SAS job definitions

 

Airflow provides some interesting contextual environment variables. For example:

 

AIRFLOW_CTX_DAG_OWNER='airflow'
AIRFLOW_CTX_DAG_ID='aaa_test_5_raw'
AIRFLOW_CTX_TASK_ID='1_code1.sas'
AIRFLOW_CTX_EXECUTION_DATE='2023-08-04T19:00:56.255394+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-04T19:00:56.255394+00:00'

 

They are now passed by default to a SAS Studio flow or SAS program as environment variables (SASStudioOperator). You can retrieve them with, for example, %sysget(AIRFLOW_CTX_DAG_ID).
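
For instance, here is a quick way to verify this, reusing the embedded-code feature shown earlier (the task id and program are illustrative):

program_ctx = '''
/* Airflow context variables arrive as environment variables */
%put DAG id:  %sysget(AIRFLOW_CTX_DAG_ID);
%put Task id: %sysget(AIRFLOW_CTX_TASK_ID);
'''

task_ctx = SASStudioOperator(task_id="show_context",
                             path_type="raw",
                             exec_type="program",
                             path=program_ctx,
                             compute_context="SAS Studio compute context",
                             connection_name="sas_default",
                             exec_log=True,
                             dag=dag)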

 

They are passed optionally to a SAS job definition as SAS macro-variables (SASJobExecutionOperator); the add_airflow_vars option controls this behavior.

 

task4 = SASJobExecutionOperator(task_id="4_job12",
                                job_name="/Public/Jobs/job12",
                                connection_name="sas_default",
                                job_exec_log=True,
                                add_airflow_vars=True,
                                parameters=global_params,
                                dag=dag)

 

A new operator to create Compute sessions

 

Previously, each task created its own SAS Compute session. Now you can exercise finer-grained control by creating the SAS Compute sessions yourself, using the new SASComputeCreateSession operator.

 

task0 = SASComputeCreateSession(task_id="create_sess", dag=dag)

 

Ability to reuse Compute sessions

 

One of the benefits of the previous feature is the ability to reuse existing Compute sessions. This can be helpful:

 

  • From a performance perspective: you can minimize the latency of obtaining a new Compute session
  • From a context perspective: you can leverage the state of an existing session (options, macro-variables, tables in the WORK library, etc.)

Reusing Compute sessions makes a lot of sense when you have a sequence of tasks that run serially: each task runs after the previous one and can therefore use the same Compute session. It is less suitable for process flows that run tasks in parallel; in that case, you keep parallel execution by maintaining separate Compute sessions (and not reusing them).

 

How to reuse a Compute session?

 

Once you have started a new Compute session using the new SASComputeCreateSession operator (see the syntax example in the previous section), the session id is stored in the task’s XCom.

 

[Screenshot: the compute_session_id value stored in the create_sess task’s XCom]

 

You can make subsequent tasks reuse that session by specifying this existing session id in the compute_session_id option:

 

task1 = SASStudioOperator(task_id="1_code1.sas",
                          path_type="raw",
                          exec_type="program",
                          path=program1,
                          compute_session_id="{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}",
                          exec_log=True,
                          dag=dag)

 

The following templating syntax (Jinja, rendered by Airflow) extracts a value from the create_sess task’s XCom:

 

{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}
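
Putting it all together, here is a minimal end-to-end sketch (the task ids and program are illustrative, and the import paths are assumptions based on the provider’s package layout):

# Assumption: import paths based on the sas-airflow-provider package layout
from sas_airflow_provider.operators.sas_create_session import SASComputeCreateSession
from sas_airflow_provider.operators.sas_studio import SASStudioOperator

# Start a Compute session once...
task0 = SASComputeCreateSession(task_id="create_sess", dag=dag)

# ...then reuse it downstream by pulling its id from the create_sess task's XCom
task1 = SASStudioOperator(task_id="1_code1.sas",
                          path_type="raw",
                          exec_type="program",
                          path=program1,
                          compute_session_id="{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}",
                          exec_log=True,
                          dag=dag)

# Run serially so the session exists before it is reused
task0 >> task1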

 

Ability to get the SAS log from a job definition execution

 

Getting the SAS log in Airflow was already possible for SAS Studio flows but was missing for SAS job definitions (SASJobExecutionOperator). To view the SAS log, you had to go to SAS Environment Manager > Jobs and Flows > Monitoring.

 

Now the job_exec_log option has been implemented in the SAS Job Execution operator. It is very useful to be able to view the logs of all tasks (flows, programs, and jobs) in the same Airflow UI.

 

task3 = SASJobExecutionOperator(task_id="3_job11",
                                job_name="/Public/Jobs/job11",
                                connection_name="sas_default",
                                job_exec_log=True,
                                add_airflow_vars=False,
                                parameters=global_params,
                                dag=dag)

 

 

All of these new capabilities make SAS integration with Airflow more efficient and seamless. This is a great complement to the SAS Viya platform.

 

Thanks to Andrew Shakinovsky for his help on this topic.

 

Thanks for reading.

 

 

Find more articles from SAS Global Enablement and Learning here.

Comments

Just noticed that the latest version on GitHub (ver 0.13) includes the ability to not just create but also end a compute session. This is very handy for those who want to control the number of idle compute sessions.
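
For reference, ending a session would presumably look symmetrical to creating one. A hedged sketch (the operator and module names are assumptions based on the provider’s repository, not confirmed by this post):

# Assumption: operator/module names inferred from the sas-airflow-provider repository
from sas_airflow_provider.operators.sas_delete_session import SASComputeDeleteSession

task_end = SASComputeDeleteSession(task_id="end_sess",
                                   compute_session_id="{{ ti.xcom_pull(key='compute_session_id', task_ids=['create_sess'])|first }}",
                                   dag=dag)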

