Since the release of the SAS Airflow Provider, some people at SAS have been exploring how Airflow can help orchestrate and schedule your SAS data pipelines, and trying to figure out how to map common scheduling capabilities to Airflow concepts.
Among these capabilities is the ability to start a process (a DAG, in Airflow terms) when an event occurs, rather than only at a specific time. Typical events of interest are the arrival of a data file, the presence of specific records in a database table, the success of an external task, and so on.
This is where Airflow Sensors come into play. “Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur.”
There is already a lot of very good literature on this topic, so I won't try to explain sensors in depth; this article is a very nice introduction. My intent here is just to give some examples and considerations.
I hear it coming… “be careful with sensors”, “sensors taking up a full worker slot for the entire time they are running”, “the Sensor deadlock issue”, “you should use deferrable operators”, etc.
All right, this makes sense.
But before optimizing fairly complicated systems, let's solve the basic scheduling challenges. Airflow Sensors are very simple to understand and use, even for people like me who are not really proficient in Python, which is more than can be said for deferrable operators.
Anyway, let’s just give an example. You want to run a SAS flow every day that ingests a data file from disk, and you want to start it as soon as the file is created. You will use a File Sensor.
A sensor is nothing more than a specialized operator. Thus, it shows up as a task when you define it in a DAG:
In the graph representation above, you can see a FileSensor task on the left which, once satisfied, triggers the SASStudioOperator task on the right.
What does the FileSensor code look like? Pretty simple:
from airflow.sensors.filesystem import FileSensor

task1 = FileSensor(task_id="1_wait_for_file",
                   filepath="/gelcontent/data/contact_list2.csv",  # file whose arrival we are waiting for
                   fs_conn_id="fs_default",   # Airflow file system connection
                   poke_interval=60,          # check every 60 seconds
                   timeout=3600,              # give up after 1 hour without the file
                   mode="reschedule",         # free the worker slot between pokes
                   dag=dag)
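To show how the two tasks fit together, here is a minimal sketch of a full DAG chaining the sensor and the SAS task. The SASStudioOperator import and parameters follow the SAS Airflow Provider examples, but the DAG name, flow path and settings below are assumptions to adapt to your own environment:

from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from sas_airflow_provider.operators.sas_studio import SASStudioOperator

with DAG(dag_id="ingest_contact_list",
         start_date=datetime(2024, 1, 1),
         schedule="@daily",
         catchup=False) as dag:

    # Wait for the data file to land on the shared file system
    wait_for_file = FileSensor(task_id="1_wait_for_file",
                               filepath="/gelcontent/data/contact_list2.csv",
                               fs_conn_id="fs_default",
                               poke_interval=60,
                               timeout=3600,
                               mode="reschedule")

    # Run the SAS Studio flow once the file is there
    run_sas_flow = SASStudioOperator(task_id="2_run_sas_flow",
                                     path_type="content",                        # flow stored in SAS Content
                                     path="/Public/Airflow/ingest_contacts.flw",  # hypothetical flow path
                                     exec_log=True)                               # stream the SAS log to Airflow

    # The SAS flow only starts after the sensor has found the file
    wait_for_file >> run_sas_flow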
Now, where is the file you are checking?
Generally, the file will be needed by the subsequent task, in our case, SAS. So, it has to be accessible from SAS.
However, it will be checked by Airflow, which, depending on how it is deployed (on bare OS, as containers or in Kubernetes), has its own file access setup.
By default, SAS Viya and Airflow won’t have access to the same resources out of the box.
You will need to give Airflow and SAS shared access to the same file system.
In my case, where both SAS Viya and Airflow are deployed in the same Kubernetes cluster, it is quite easy to set up identical access to the same NFS share, so that both applications have the same "view" of the files.
If, instead, you want to wait for a file on cloud object storage, things are easier because you don't have to deal with local file access: accessing cloud object storage is quite universal. Sensors exist for the main cloud object storage providers (AWS S3, GCS, Azure Blob Storage; I am not sure about ADLS), and they also seem to come in a "deferrable" version (cf. the sensor deadlock issue!).
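As an illustration, here is a minimal sketch of waiting for an object in an S3 bucket with the Amazon provider's S3KeySensor. The bucket, key and connection names are assumptions, and the deferrable flag requires a recent version of the Amazon provider plus a running triggerer:

# Minimal sketch (assumed bucket/key/connection names); deferrable=True hands
# the waiting over to the triggerer instead of occupying a worker slot.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3_file = S3KeySensor(task_id="wait_for_s3_file",
                               bucket_name="my-landing-bucket",          # hypothetical bucket
                               bucket_key="incoming/contact_list2.csv",  # hypothetical object key
                               aws_conn_id="aws_default",
                               poke_interval=60,
                               timeout=3600,
                               deferrable=True,
                               dag=dag)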
Checking a database table for some records before running a SAS job is also something possible with an Airflow Sensor. Let’s see an example of a SqlSensor syntax:
from airflow.providers.common.sql.sensors.sql import SqlSensor

task1 = SqlSensor(task_id='waiting_for_data',
                  conn_id='postgres',    # Airflow connection to the database
                  sql="select * from load_status where data_scope='DWH' and load_date='{{ ds }}'",
                  poke_interval=30,      # check every 30 seconds
                  timeout=60 * 5,        # fail the task after 5 minutes
                  mode='reschedule',     # free the worker slot between pokes
                  dag=dag)
Here, we specify the database connection we want to use (defined globally) and the SQL code we want to run.
The default behavior (which can be customized of course) is the following: the sensor keeps poking until the query returns at least one row and the first cell of the first row evaluates to a truthy value (not None, 0, False or an empty string).
So, you need to write your SQL query in a way that fits your target condition. The query can use the template variables (macros) provided by Airflow; here {{ ds }} represents the DAG run date.
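If the default condition does not fit, the sensor accepts a custom success callable. Here is a minimal sketch under the same connection and table assumptions as above, where the sensor only succeeds once the count returned by the query is greater than zero:

# Minimal sketch: the 'success' callable receives the first cell of the first
# row returned by the query and decides whether the condition is met.
from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_rows = SqlSensor(task_id='waiting_for_data_custom',
                          conn_id='postgres',
                          sql="select count(*) from load_status where data_scope='DWH' and load_date='{{ ds }}'",
                          success=lambda cnt: cnt is not None and cnt > 0,
                          poke_interval=30,
                          timeout=60 * 5,
                          mode='reschedule',
                          dag=dag)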
As a conclusion, I will just mention that I updated my “Airflow – Generate DAG” custom steps on GitHub to include the ability to define a FileSensor (thanks to Lorenzo Toja for this idea):
Next time, I will talk about another Airflow concept that I also included in my tools: Airflow Datasets to make your flows “data-aware”.
Thanks for reading.