
Waiting for Something to Occur before Triggering SAS Jobs: Airflow Sensors


Since the release of the SAS Airflow Provider, some people at SAS have been exploring how Airflow can help orchestrate and schedule your SAS data pipelines, and trying to figure out how to map common scheduling capabilities to Airflow concepts.

 

Among them is the ability to start a process (a DAG in the context of Airflow) upon the occurrence of an event (other than just a specific time event). Typical events of interest are the arrival of a data file, the presence of specific records in a database table, the success of an external task, etc.

 

This is where Airflow Sensors come into play. As the Airflow documentation puts it: “Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur.”

 

There is already a lot of very good literature on this topic, and I won’t try to explain sensors in more depth; this article is a very nice introduction. My intent here is just to give some examples and considerations.

 

I hear it coming… “be careful with sensors”, “sensors taking up a full worker slot for the entire time they are running”, “the Sensor deadlock issue”, “you should use deferrable operators”, etc.

 

All right, this makes sense.

 

But before optimizing fairly complicated systems, let’s try to solve basic scheduling challenges. Airflow Sensors are very simple to understand and use, even for people like me who are not really proficient in Python. The opposite is true of deferrable operators.

 

Anyway, let’s just give an example. You want to run a SAS flow every day that ingests a data file from disk, and you want to start it as soon as the file is created. You will use a File Sensor.

 

A sensor is nothing more than a specific kind of operator. Thus, it shows up as a task when you define it in a DAG:

 

[Image: Airflow graph view showing a FileSensor task linked to a SASStudioOperator task]

 

In the graph representation above, you can see a FileSensor task on the left which, once satisfied, triggers the SASStudioOperator task on the right.

 

What does the FileSensor code look like? Pretty simple:

 

from airflow.sensors.filesystem import FileSensor

task1 = FileSensor(task_id="1_wait_for_file",
   filepath="/gelcontent/data/contact_list2.csv",
   fs_conn_id="fs_default",
   poke_interval=60,
   timeout=3600,
   mode="reschedule",
   dag=dag)

 

  • filepath defines the file whose existence is to be checked.
  • fs_conn_id references the file system connection (where to check for the file), defined globally in Airflow.
  • poke_interval defines the number of seconds to wait before checking again.
  • timeout defines the number of seconds to wait before the task times out in case the file does not arrive (the FileSensor task then gets a FAILED status by default).
  • mode defines whether the task runs continuously while doing its file checks (“poke”), occupying a worker slot the entire time, or stops after each check and restarts before the next one (“reschedule”), freeing up the worker slot between two checks. The sketch below shows how this sensor wires into a complete DAG.
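
For illustration, here is a minimal sketch of the complete DAG behind the graph above: the FileSensor gates a SASStudioOperator that runs the ingestion flow. It assumes Airflow 2.4+ and the SAS Airflow Provider installed; the DAG id, schedule, and flow path are hypothetical, and the operator parameters follow the provider’s examples.

from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from sas_airflow_provider.operators.sas_studio import SASStudioOperator

with DAG(dag_id="ingest_contact_list",
   start_date=datetime(2024, 1, 1),
   schedule="@daily",
   catchup=False) as dag:

   wait_for_file = FileSensor(task_id="1_wait_for_file",
      filepath="/gelcontent/data/contact_list2.csv",
      fs_conn_id="fs_default",
      poke_interval=60,
      timeout=3600,
      mode="reschedule")

   run_sas_flow = SASStudioOperator(task_id="2_run_ingestion_flow",
      exec_type="flow",                        # run a SAS Studio flow
      path_type="content",                     # flow stored in SAS Content
      path="/Public/ingest_contact_list.flw",  # hypothetical flow path
      exec_log=True)                           # surface the SAS log in Airflow

   # The SAS flow only starts once the sensor has found the file
   wait_for_file >> run_sas_flow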

Now, where is the file you are checking?

 

Generally, the file will be needed by the subsequent task (in our case, a SAS task), so it has to be accessible from SAS.

 

However, it will be checked by Airflow, which, depending on how it is deployed (on a bare OS, in containers, or in Kubernetes), has its own file access setup.

 

By default, SAS Viya and Airflow won’t have access to the same resources out of the box.

 

You will need to give Airflow and SAS shared access to the same file system.

 

In my case, where both SAS Viya and Airflow are deployed in the same Kubernetes cluster, it is quite easy to set up identical access to the same NFS server. Both applications then share the same “view” of the files.

 

If, instead, you want to wait for a file on cloud object storage, things are easier because you don’t have to deal with local file access: accessing cloud object resources is quite universal. Sensors exist for the main cloud object storage providers (AWS S3, GCS, Azure Blob Storage; I’m not sure about ADLS), and they seem to provide “deferrable” versions (cf. the Sensor deadlock issue!).
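
For illustration, here is what the S3 variant could look like, using the S3KeySensor from the Amazon provider package (apache-airflow-providers-amazon); the bucket and key names are hypothetical, and deferrable=True assumes a provider version recent enough to support it:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3_file = S3KeySensor(task_id="wait_for_s3_file",
   bucket_name="my-landing-bucket",
   bucket_key="incoming/contact_list2.csv",
   aws_conn_id="aws_default",     # AWS connection defined globally in Airflow
   poke_interval=60,
   timeout=3600,
   deferrable=True,               # hands the wait to the triggerer, freeing the worker slot
   dag=dag)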

 

Checking a database table for specific records before running a SAS job is also possible with an Airflow Sensor. Let’s look at an example of SqlSensor syntax:

 

from airflow.providers.common.sql.sensors.sql import SqlSensor

task1 = SqlSensor(
   task_id='waiting_for_data',
   conn_id='postgres',
   sql="select * from load_status where data_scope='DWH' and load_date='{{ ds }}'",
   poke_interval=30,
   timeout=60 * 5,
   mode='reschedule',
   dag=dag
)

 

Here, we specify the database connection we want to use (defined globally) and the SQL code we want to run.

 

The default behavior (which can of course be customized; see the sketch after this list) is the following:

  • if the query does not return any row, the condition is not satisfied, and the sensor will then try it again later.
  • if the query returns rows, the condition is satisfied, the sensor gets a SUCCESS status, and all linked downstream tasks are run.
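
For illustration, here is one possible customization: a minimal sketch using the sensor’s success callable, which receives the first cell of the first returned row; the query and threshold are hypothetical.

from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_loaded_data = SqlSensor(task_id="waiting_for_loaded_data",
   conn_id="postgres",
   sql="select count(*) from load_status where data_scope='DWH' and load_date='{{ ds }}'",
   success=lambda cnt: cnt >= 1,  # succeed only once at least one matching row exists
   poke_interval=30,
   timeout=60 * 5,
   mode="reschedule",
   dag=dag)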

 

So, you need to write your SQL query in a way that fits your target condition. The query can use template variables (macros) provided by Airflow; here, {{ ds }} represents the DAG run’s logical date (in YYYY-MM-DD format).

 

To conclude, I will just mention that I updated my “Airflow – Generate DAG” custom steps on GitHub to include the ability to define a FileSensor (thanks to Lorenzo Toja for this idea):

 

[Image: FileSensor options in the “Airflow – Generate DAG” custom step]

 

Next time, I will talk about another Airflow concept that I also included in my tools: Airflow Datasets to make your flows “data-aware”.

 

Thanks for reading.
