
“Data-Aware” Scheduling with Airflow or How to Specify Data Dependencies in your DAGs


Last time, I explored how Airflow sensors can help start processes when an event occurs, such as the creation of a file or the loading of new records into a database table. Today, I am going to take a look at Airflow Datasets. As explained in the Airflow documentation, Airflow Datasets are useful to make your flows “data-aware”.

 

In other words, Airflow Datasets allow you to trigger a DAG (Directed Acyclic Graph, a process flow in Airflow) based on the “update” of an input “Dataset”, produced by another DAG for instance. I use quotes because it is all purely logical. A Dataset is nothing but a string that describes a logical data source or target. It is not connected to actual data, and that is ultimately the beauty of it.

 

It’s simply declarative. For example, you can declare that a flow named MAN_THROWS “updates” a Dataset named BALL, and you can declare that a flow named DOG_RUNS is triggered when Dataset BALL is updated. You have just created a dependency between MAN_THROWS and DOG_RUNS. If MAN_THROWS is triggered (for example, on a time-based schedule) and runs successfully, then Dataset BALL will be marked as updated, which in turn will trigger DOG_RUNS. It’s a tiny bit more subtle than that, but you get the idea, and there are more examples further down.
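
Here is a minimal sketch of what this could look like in DAG code, assuming Airflow 2.4 or later; the DAG names, the BALL Dataset URI, and the BashOperator commands are purely illustrative:

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

ball = Dataset("ball")  # a purely logical identifier, not a pointer to real data

# Producer: runs on a time-based schedule and marks the BALL Dataset as updated on success
with DAG(dag_id="MAN_THROWS", schedule="@daily",
   start_date=datetime(2024,3,14), catchup=False):
   BashOperator(task_id="throw",
      bash_command="echo 'throwing the ball'",
      outlets=[ball])

# Consumer: triggered whenever the BALL Dataset is marked as updated
with DAG(dag_id="DOG_RUNS", schedule=[ball],
   start_date=datetime(2024,3,14), catchup=False):
   BashOperator(task_id="run",
      bash_command="echo 'the dog runs'")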

 

How to declare an Airflow Dataset?

 

An Airflow Dataset is defined using a URI. Remember that an Airflow Dataset is not connected to any data system: it’s just a string, and Airflow will not check the existence of the data it represents. So, you can be very creative about it. Let’s say we have a flow (DAG) whose goal is to update a fact table named ORDERS, stored as a SAS data set (yes, it’s confusing, right?) in a SAS library named SALES. You could define the Airflow Dataset as:

 

  • sas://sales/orders

 

And that’s it. You defined an Airflow Dataset URI.

 

You can define as many Datasets as you want, and you can make them reflect the reality of your data ecosystem:

 

  • cas://productsCaslib/product_catalog
  • gcs://credit/transactions/operations.parquet
  • singlestore://energy/water/water_consumption_current_year

 

In terms of DAG source code (Python), it looks like this:

 

from airflow.datasets import Dataset

example_dataset = Dataset("sas://sales/orders")

 

How to specify that a DAG updates an Airflow Dataset?

 

First, we need to specify what “updates” a Dataset. I was probably oversimplifying it earlier when I said that a DAG “updates” a Dataset. Actually, a task (a unit of processing belonging to a DAG) is the element that owns the update concept. In other words, a task within a DAG “produces” (or “updates”) a Dataset. A task can update multiple Datasets. A DAG can contain multiple tasks that update a Dataset.
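
For instance, here is a hypothetical sketch of a single task that updates two Datasets at once (a plain BashOperator stands in for a real loading task; the names and the command are illustrative only):

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

with DAG(dag_id="Load_Sales_and_Catalog", schedule="@daily",
   start_date=datetime(2024,3,14), catchup=False):
   # Both Datasets listed in outlets are marked as updated
   # when (and only when) this task finishes successfully
   BashOperator(task_id="load_sales_and_catalog",
      bash_command="echo 'loading sales and product data'",
      outlets=[Dataset("sas://sales/orders"),
         Dataset("cas://productsCaslib/product_catalog")])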

 

So, the following flow is NOT what really happens:

 

[Image: incorrect flow]

 

Instead, the following one more accurately represents the actual workflow:

 

[Image: correct flow]

 

Bottom line: a consumer DAG that depends on Datasets (here, DAG 3) may start before the producer DAGs (here, DAG 1 and DAG 2) finish. Indeed, as soon as the tasks that update both Datasets have completed successfully, DAG 3 can start. This level of granularity can be useful.

 

Having said that, let’s see a code example of how to specify that a task updates a Dataset:

 

# Assumed imports, from the SAS Airflow provider (sas-airflow-provider) and Airflow core:
# from sas_airflow_provider.operators.sas_studio import SASStudioOperator
# from airflow.datasets import Dataset

task1 = SASStudioOperator(task_id="1_LoadOrders.flw",
   exec_type="flow",
   path_type="content",
   path="/Public/Load Orders.flw",
   compute_context="SAS Studio compute context",
   connection_name="sas_default",
   exec_log=True,
   codegen_init_code=False,
   codegen_wrap_code=False,
   outlets=[Dataset("sas://sales/orders")],  # marks the Dataset as updated when the task succeeds
   trigger_rule='all_success',
   dag=dag)

 

This is done through the outlets parameter (a standard parameter of Airflow’s base operator class, so it is available on any operator). If this task (among other tasks in a DAG) runs successfully, the Dataset sas://sales/orders will be marked as “updated”. You can specify multiple Datasets in the outlets parameter.

 

How to specify that a DAG is triggered by an Airflow Dataset update?

 

Now, how do we tell a DAG that it should start when a Dataset has been marked “updated”? Simply by passing Datasets to the schedule parameter instead of a time-based schedule.

 

The schedule parameter can take a cron expression, a cron preset, a timedelta object, or a list of Datasets:

 

dag = DAG(dag_id="Prepare_Data_for_Analytics",
   schedule=[Dataset("sas://sales/orders")],
   start_date=datetime(2024,3,14),
   catchup=False)

 

This DAG will start as soon as the Dataset sas://sales/orders is updated, and it will start again every time the Dataset is updated.

 

[Image: DAG relationship]
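
Note that schedule can also take a list of several Datasets. In that case (standard Airflow behavior), the consuming DAG waits until every listed Dataset has been updated at least once since its last run. Here is a hypothetical variant of the DAG above:

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset

# Triggered only after BOTH Datasets have been updated at least once
# since the previous run of this DAG
dag = DAG(dag_id="Prepare_Data_for_Analytics_multi",
   schedule=[Dataset("sas://sales/orders"),
      Dataset("cas://productsCaslib/product_catalog")],
   start_date=datetime(2024,3,14),
   catchup=False)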

 

If you look at the DAGs list in Airflow, you will observe this:

 

[Image: DAG schedule in the DAGs list]

 

Get the full code of the two sample DAGs here.

 

The cherry on top: Dataset lineage

 

The Airflow user interface provides a lineage view (“Datasets” menu) of the relationships between DAGs and Datasets. This is super helpful for understanding your orchestration sequence.

 

[Image: Datasets lineage view]

 

To conclude, I will just mention that I have updated my “Airflow – Generate DAG” custom steps on GitHub to include the ability to define dependencies between your DAGs using Airflow Datasets:

 

[Image: Airflow – Generate DAG custom steps]

 

Thanks for reading.

 

 

Find more articles from SAS Global Enablement and Learning here.

