“Data-Aware” Scheduling with Airflow or How to Specify Data Dependencies in your DAGs
Last time, I explored how Airflow sensors can help start processes when an event occurs, such as the creation of a file or the loading of new records into a database table. Today, I am going to take a look at Airflow Datasets. As explained in the Airflow documentation, Airflow Datasets are useful for making your flows “data-aware”.
In other words, Airflow Datasets allow you to trigger a DAG (Directed Acyclic Graph, a process flow in Airflow) based on the “update” of an input “Dataset”, produced by another DAG for instance. I use quotes because it’s all logical. A Dataset is nothing but a string that describes a logical data source or target. It is not connected to actual data, and that’s ultimately the beauty of it.
It’s simply declarative. For example, you can declare that a flow named MAN_THROWS “updates” a Dataset named BALL, and you can declare that a flow named DOG_RUNS is triggered when Dataset BALL is updated. You just created a dependency between MAN_THROWS and DOG_RUNS. If MAN_THROWS is triggered (for example based on a time event) and runs successfully, then Dataset BALL is marked as updated, which in turn triggers DOG_RUNS. It’s probably a tiny bit more subtle than that, but you get the idea, and we will provide more examples further down.
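To make that concrete, here is a minimal sketch of what such a pair of DAGs could look like. The DAG names, task IDs and the BALL Dataset URI come from the analogy above and are purely illustrative, not anything predefined in Airflow:
from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

# The Dataset is just a logical identifier; Airflow never checks that it points to real data
ball = Dataset("BALL")

# Producer DAG: runs on a time-based schedule and marks BALL as updated when its task succeeds
with DAG(dag_id="MAN_THROWS", start_date=datetime(2024, 3, 14), schedule="@daily", catchup=False):
    EmptyOperator(task_id="throw", outlets=[ball])

# Consumer DAG: triggered every time BALL is marked as updated
with DAG(dag_id="DOG_RUNS", start_date=datetime(2024, 3, 14), schedule=[ball], catchup=False):
    EmptyOperator(task_id="run")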
How to declare an Airflow Dataset?
An Airflow Dataset is defined using a URI, and remember that an Airflow Dataset is not connected to any data system. It’s just a string, and Airflow will not check the existence of the data it represents. So, you can be very creative about it. Let’s say we have a flow (DAG) whose goal is to update a fact table named ORDERS stored as a SAS data set (yes, it’s confusing, right?) in a SAS library named SALES. You could define the Airflow Dataset as:
- sas://sales/orders
And that’s it. You defined an Airflow Dataset URI.
You can define as many Datasets as you want and try to match the reality of your data ecosystem:
- cas://productsCaslib/product_catalog
- gcs://credit/transactions/operations.parquet
- singlestore://energy/water/water_consumption_current_year
In terms of DAG source code (Python), it looks like this (the Dataset class comes from the airflow.datasets module):
example_dataset = Dataset("sas://sales/orders")
How to specify that a DAG updates an Airflow Dataset?
First, we need to specify what “updates” a Dataset. I was probably oversimplifying it earlier when I said that a DAG “updates” a Dataset. Actually, a task (a unit of processing belonging to a DAG) is the element that owns the update concept. In other words, a task within a DAG “produces” (or “updates”) a Dataset. A task can update multiple Datasets. A DAG can contain multiple tasks that update a Dataset.
So, the following flow is NOT what really happens:
Instead, the following one more accurately represents the actual workflow:
Bottom line: a consumer DAG that depends on Datasets – here DAG 3 – may start before the producer DAGs – here DAG 1 and DAG 2 – finish. Indeed, as soon as the tasks that update both Datasets have finished successfully, DAG 3 can start. Having that level of granularity can be useful.
Having said that, let’s see a code example of how to specify that a task updates a Dataset:
from airflow.datasets import Dataset
# SASStudioOperator is provided by SAS's sas-airflow-provider package
from sas_airflow_provider.operators.sas_studio import SASStudioOperator

task1 = SASStudioOperator(task_id="1_LoadOrders.flw",
                          exec_type="flow",
                          path_type="content",
                          path="/Public/Load Orders.flw",
                          compute_context="SAS Studio compute context",
                          connection_name="sas_default",
                          exec_log=True,
                          codegen_init_code=False,
                          codegen_wrap_code=False,
                          outlets=[Dataset("sas://sales/orders")],
                          trigger_rule="all_success",
                          dag=dag)
This is done through the outlets parameter, a standard parameter of Airflow’s base operator class, so it is available on any operator. If this task (among other tasks in a DAG) runs successfully, the Dataset sas://sales/orders will be marked as “updated”. You can specify multiple Datasets in the outlets parameter, as shown in the sketch below.
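For example, here is a minimal sketch, using a generic BashOperator and a second, made-up Dataset URI (sas://sales/order_lines), of a task that marks two Datasets as updated at once:
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

# Hypothetical task that refreshes two logical targets in one run;
# both Datasets are marked as updated only if the task succeeds
refresh = BashOperator(task_id="refresh_sales_tables",
                       bash_command="echo 'refreshing sales tables'",
                       outlets=[Dataset("sas://sales/orders"),
                                Dataset("sas://sales/order_lines")],
                       dag=dag)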
How to specify that a DAG is triggered by an Airflow Dataset update?
Now, how do we tell a DAG that it should start when a Dataset has been marked “updated”? Simply by passing Datasets to the schedule parameter instead of a time-based expression.
The schedule parameter can take a cron expression, a cron preset, a timedelta object, or a Dataset expression such as a list of Datasets:
from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset

dag = DAG(dag_id="Prepare_Data_for_Analytics",
          schedule=[Dataset("sas://sales/orders")],
          start_date=datetime(2024, 3, 14),
          catchup=False)
This DAG will start as soon as the Dataset sas://sales/orders is updated, and it will start every time the Dataset is updated.
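You can also list several Datasets in the schedule; in that case, the DAG is triggered only once all of them have been updated at least once since the DAG’s last run. Here is a small sketch of that, reusing the product catalog Dataset from earlier with a hypothetical DAG name:
from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset

# Hypothetical consumer DAG that depends on two logical inputs;
# it runs only after both Datasets have been updated since its last run
dag = DAG(dag_id="Build_Sales_Report",
          schedule=[Dataset("sas://sales/orders"),
                    Dataset("cas://productsCaslib/product_catalog")],
          start_date=datetime(2024, 3, 14),
          catchup=False)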
If you look at the DAGs list in Airflow, you will observe this:
Get the full code of the two sample DAGs here.
The cherry on the cake: Datasets lineage
The Airflow user interface provides a lineage view (“Datasets” menu) of the relationships between DAGs and Datasets. This is super helpful to understand your orchestration sequence.
To conclude, I will just mention that I updated my “Airflow – Generate DAG” custom steps on GitHub to include the ability to define dependencies between your DAGs using Airflow Datasets.
Thanks for reading.
Find more articles from SAS Global Enablement and Learning here.