A really common use case is when you have multiple partners (A, B and C in this example) and you wait for the data coming from them each day at a more or less specific time. For example, Partner A sends you data at 9:00 AM, B at 9:30 AM and C at 10:00 AM. Hopefully without delay, but we will come back to this later. So, your goal is to wait for all files to be available before moving to the task Process.

Ok, that being said, what are the tasks Partner A, B and C exactly? Well, when people are not aware of Sensors, they tend to use the PythonOperator. As they need to wait for a file, they create a Python function, do their stuff in it to wait for that file, and call that function with the PythonOperator. By doing so, you will not leverage the benefits of Airflow, and it will be a nightmare to maintain.

What is a Sensor operator?

A Sensor is an operator that evaluates, at a time interval, whether a criterion/condition is met. If yes, it succeeds; if not, it checks again until it times out. Concretely, your goal is to verify if a file exists at a specific location. With a Sensor, every 30 seconds it checks if the file exists at that location: if it does, the Sensor succeeds; otherwise, it eventually times out.

You need to wait for something? Use an Airflow Sensor. Airflow brings different sensors; here is a non-exhaustive list of the most commonly used:

The FileSensor: Waits for a file or folder to land in a filesystem.
The S3KeySensor: Waits for a key to be present in an S3 bucket.
The SqlSensor: Runs an SQL statement repeatedly until a criterion is met.
The HivePartitionSensor: Waits for a partition to show up in Hive.
The ExternalTaskSensor: Waits for a different DAG, or a task in a different DAG, to complete for a specific execution date.
The DateTimeSensor: Waits until the specified datetime (useful to add some delay to your DAGs).
The TimeDeltaSensor: Waits for a timedelta after the task’s execution_date + schedule interval (looks similar to the previous one, no?).

Take a look at the full list of Sensors available in Airflow.

The King of Airflow Sensors

If you truly want to understand Sensors, you have to understand their base class, the BaseSensorOperator. Regardless of the Sensor used, they are all based on the BaseSensorOperator class, which means they share common attributes that you are going to discover right now!

When you add a Sensor, the first step is to define the time interval at which the criterion/condition will be evaluated. To do this, set the poke_interval attribute, which expects a float (a number of seconds). For instance, a FileSensor with poke_interval set to 30 checks every 30 seconds whether a file exists.

Now you might wonder: what is the best poke interval? What is the optimal value for it? Well, there is no right answer for that. The shorter the poke interval, the faster the check, and so the task completion. But if it is too short, there is no guarantee that Airflow will be able to evaluate each interval in time. For instance, checking if a record exists in a SQL table means creating a new connection and paying the network latency at each interval. Ultimately, it depends on your condition, your criteria. My advice to you: define a “reasonable” value. The default one (60 seconds) is usually a good fit. Behind the scenes, the poke process is nothing more than a loop verifying your condition with a sleep in it.

Speaking of the time interval, what if the file never arrives? You should always define a timeout for your Airflow Sensors. Easy to forget, but it is super important! To understand why, we need to step back a little bit and look at what’s going on when a task is executed in Airflow.

Pools and Concurrency

In Airflow, unless told otherwise, all operators share a common pool soberly called “default_pool”. Airflow pools are used to limit the execution parallelism on arbitrary sets of tasks. You can see the default pool by going to Admin -> Pools in the Airflow UI.

Each time a task is running, a slot is given to that task throughout its execution. Once the task is finished, the slot is free again and ready to be given to another task. A slot is given regardless of the resources a task needs. It’s really just a slot: 1 task = 1 slot. If there are no more slots available, the tasks are queued, and so the number of queued slots increases. By default, you can execute at most 128 tasks at the same time, as the default pool has 128 slots.

In addition, the concurrency, that is, the maximum number of running tasks for a given DAG, is set to 16 by default (the dag_concurrency setting, or the concurrency argument of a DAG). In other words, you can run 16 tasks at the same time for the same DAG.

The Deadlock

Remember: your tasks share the same pool, a running task locks its slot until it is done, and a DAG can run at most 16 tasks at the same time by default. Now, what’s the link with the Sensor timeout?
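The poke process was described as nothing more than a loop verifying your condition with a sleep in it. Here is a minimal, simplified sketch of such a loop with a timeout, in plain Python. It is not Airflow’s actual code. `poke_until`, `SensorTimeout` and `FakeClock` are hypothetical names, and the fake clock only exists so the example runs instantly. On the Airflow side, BaseSensorOperator exposes the same two knobs as `poke_interval` (60 seconds by default) and `timeout` (7 days by default), both expressed in seconds.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is still not met after `timeout` seconds."""


def poke_until(condition: Callable[[], bool],
               poke_interval: float,
               timeout: float,
               clock: Callable[[], float] = time.monotonic,
               sleep: Callable[[float], None] = time.sleep) -> int:
    """Evaluate `condition` every `poke_interval` seconds until it is true.

    Returns how many pokes were needed. Raises SensorTimeout as soon as more
    than `timeout` seconds have elapsed without the condition being met.
    (Simplified sketch, not Airflow's implementation.)
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started > timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        sleep(poke_interval)  # the "sleep" part of the poke loop


class FakeClock:
    """Hypothetical test helper: sleeping just advances a virtual time."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


# The file "lands" 90 seconds after the sensor starts; we poke every 30 seconds.
clock = FakeClock()
pokes = poke_until(lambda: clock.now >= 90, poke_interval=30, timeout=600,
                   clock=clock.time, sleep=clock.sleep)
print(pokes)  # 4 pokes: at t=0, 30, 60 and 90

# The file never arrives: with a 100-second timeout the loop gives up.
clock = FakeClock()
try:
    poke_until(lambda: False, poke_interval=30, timeout=100,
               clock=clock.time, sleep=clock.sleep)
except SensorTimeout as err:
    print(err)  # condition not met after 5 pokes
```

Without the timeout, the second call would loop forever, which is exactly the “what if the file never arrives?” situation.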
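To make the “1 task = 1 slot” rule of pools concrete, here is a toy model of a pool in plain Python. It is a simplification, not Airflow’s scheduler. The `Pool` class and its methods are hypothetical. Only the 128-slot default and the running/queued behavior come from the Pools and Concurrency section.

```python
from collections import deque


class Pool:
    """Toy pool: each running task holds exactly one slot, whatever its needs."""

    def __init__(self, name: str, slots: int = 128) -> None:
        self.name = name
        self.slots = slots
        self.running = set()   # task ids currently holding a slot
        self.queued = deque()  # task ids waiting for a free slot

    @property
    def open_slots(self) -> int:
        return self.slots - len(self.running)

    def submit(self, task_id: str) -> None:
        # A task gets a slot if one is free; otherwise it is queued.
        if self.open_slots > 0:
            self.running.add(task_id)
        else:
            self.queued.append(task_id)

    def finish(self, task_id: str) -> None:
        # Finishing frees the slot, which goes to the oldest queued task.
        self.running.remove(task_id)
        if self.queued:
            self.running.add(self.queued.popleft())


pool = Pool("default_pool")  # 128 slots, like the default pool
for i in range(130):
    pool.submit(f"task_{i}")
print(len(pool.running), len(pool.queued), pool.open_slots)  # 128 2 0

pool.finish("task_0")  # one slot is released and handed to task_128
print(len(pool.running), len(pool.queued))  # 128 1
```

The model deliberately ignores CPU and memory. This mirrors the point that a slot is given regardless of the resources a task needs.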
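One way to picture the link between locked slots and the Sensor timeout is a rough toy simulation. This is our own simplified model, not Airflow’s scheduler. It gives one DAG the default concurrency of 16 and starts Sensors that wait for a file that never arrives. `simulate_dag` and its parameters are hypothetical.

```python
from collections import deque
from typing import Optional


def simulate_dag(num_sensors: int, num_other_tasks: int, concurrency: int = 16,
                 sensor_timeout: Optional[int] = None, horizon: int = 100) -> int:
    """Return how many non-sensor tasks complete within `horizon` time steps.

    Toy model: sensors wait for a file that never arrives and keep their slot
    until they time out (forever if sensor_timeout is None); every other task
    finishes after one step. Sensors are queued first, as if scheduled first.
    """
    queue = deque(["sensor"] * num_sensors + ["task"] * num_other_tasks)
    running = []  # (kind, steps_elapsed) for every task holding a slot
    completed = 0
    for _ in range(horizon):
        # Fill free slots, never exceeding the DAG's concurrency limit.
        while queue and len(running) < concurrency:
            running.append((queue.popleft(), 0))
        still_running = []
        for kind, elapsed in running:
            elapsed += 1
            if kind == "task":
                completed += 1  # done: its slot is released
            elif sensor_timeout is not None and elapsed >= sensor_timeout:
                pass  # sensor timed out (failed): its slot is released
            else:
                still_running.append((kind, elapsed))  # still poking: slot stays locked
        running = still_running
    return completed


print(simulate_dag(16, 5))                     # 0: all 16 slots held by sensors forever
print(simulate_dag(16, 5, sensor_timeout=10))  # 5: slots freed once the sensors time out
print(simulate_dag(15, 5))                     # 5: one free slot is enough to make progress
```

In this toy model, the timeout is what turns a permanently stuck DAG into one that eventually moves on. That is one more reason to always set it.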