Apache Airflow & DAGs by Examples – FLINTERS Developer's Blog (2022)

Introduction

The world is undergoing a huge transformation called Digital Transformation (DX), in which previously manual workflows are being turned into automated ones. To support that shift, companies have been adopting workflow management tools, among them Apache Airflow. This blog takes a closer look at Airflow, its core concept, the Directed Acyclic Graph (DAG), and examples of implementing DAGs to define workflows. Alright, let us begin!

What is Airflow?

Airflow is an automated workflow manager. It lets you programmatically create, run and monitor workflows, no matter how large or complex they are, by representing them as directed acyclic graphs (DAGs) of tasks. Airflow was originally created as an open-source utility to support Airbnb's workflows back in 2014. Airflow is designed to be dynamic, extensible, elegant and scalable.

The idea behind Airflow is "configuration as code": workflow configuration is managed in repositories, in the same way we manage source code, which brings testability, maintainability and collaboration. To realize this idea, Airflow uses Python to define its task graphs instead of a markup language, because Python makes it easy to import existing libraries and classes. A task graph in Airflow can be written in a single file; this is why Airflow is described as "pure Python".

Ever since Airflow became open source, a growing number of cloud providers have been offering managed Airflow services. The most notable is Google Cloud Composer, which combines Airflow's strength in workflow orchestration with the distributed nature of the Google Cloud Platform and its large ecosystem. The result is a user-friendly way to manage workflows that span several services, for instance: getting data from external sources, transferring it to BigQuery, then updating a dashboard in Google Data Studio. Thanks to this seamless integration with services such as BigQuery, Airflow has proven especially useful for automated data workflows (data warehousing, machine learning, and so on). Also note that Airflow itself does not move data around – it simply makes sure that the right tasks happen at the right time.

What does Airflow do?

When we talk about workflows, we are mostly talking about the order in which things are done. For instance, your typical "get out for work" workflow looks like: have breakfast → change your clothes → get out of the house → lock the door → go to work by bus. In IT, a workflow usually means an automated workflow: a number of tasks that run toward a specific goal. A commonly seen example is a cron job, an automatic task we schedule on our computers to run at a specific moment in time, like "defragment the hard disk drive and write a log to a text file at 6:00 AM every Monday". Apache Airflow is very similar in spirit to these kinds of workflows, the difference being that Airflow is designed to tackle much larger and more complex automation problems than a small cron job. Companies worldwide have been using Airflow to solve problems such as:

  • Automatically retrieving data from a source, transforming it and generating insights in visualized form.
  • Building complex machine learning models that combine several other models.
  • Automatically feeding new data to a machine learning model and storing the results in a different database.

and so on. Of course, depending on your creativity, the list of things Airflow can do is much longer than this.

Some Airflow concepts

  • DAG or directed-acyclic graph:
    • Defined as Python scripts, the DAGs represent the tasks to be carried out along with their respective dependencies.
    • An example: a DAG representing the following workflow – task A (get data from a data source and prepare it for analysis), followed by task B (analyze the prepared data and produce a visualized report), and finally task C (send the visualized report by email to the administrator). This gives a linear DAG: A → B → C (a runnable sketch of this DAG appears right after this list).
    • DAGs are placed in Airflow's DAG_FOLDER. By default, Airflow only searches for DAGs in files whose contents contain the strings "airflow" and "dag" (case-insensitive). Also note that Airflow only loads DAGs that are declared in the global namespace of a file. Consider the following example from the Airflow team, where the outer DAG is loaded while the inner one is not:
      dag_1 = DAG('this_dag_will_be_discovered')

      def my_function():
          dag_2 = DAG('but_this_dag_will_not')

      my_function()
    • DAGs in Airflow only represent the order in which tasks are done; the details of how a task is performed are defined by an Operator (discussed later).
    • default_args: a dictionary of arguments that, when passed into the constructor of DAG, will be applied to all of its operators:
      default_args = {
          'start_date': datetime(2016, 1, 1),
          'owner': 'airflow'
      }
      dag = DAG('my_dag', default_args=default_args)
      op = DummyOperator(task_id='dummy', dag=dag)
      print(op.owner)  # airflow
    • DAG run: can be thought of as one specific execution of the workflow, with its own set of task instances. Multiple runs of the same DAG can be in progress at the same time, as long as their execution_date values differ. For instance, a particular DAG may have two DAG runs, one executed on 2021-03-01 and another on 2021-03-02.
  • Operator:
    • An Operator defines a single task to be performed within the workflow. Operators are largely atomic: unless dependencies are declared, they can run independently, standing on their own and sharing nothing with other operators. If two operators need to share information, consider combining them into a single operator.
    • Examples of popular operators include: PythonOperator, BashOperator, MySqlOperator, BigQueryGetDataOperator, DockerOperator, S3FileTransformOperator, PrestoToMySqlOperator, SlackAPIOperator.
    • Operators must be assigned to a DAG before Airflow will load them. To assign an operator to a DAG, either pass the dag as a constructor argument or set the operator's dag field afterwards:
      # sets the DAG explicitly
      explicit_op = DummyOperator(task_id='op1', dag=dag)

      # deferred DAG assignment
      deferred_op = DummyOperator(task_id='op2')
      deferred_op.dag = dag
  • Task:
    • A Task is an instantiated Operator that performs some actual work: the instantiation supplies specific values (arguments) to the operator.
      run_this = PythonOperator(
          task_id='print_the_context',
          python_callable=print_context,
          dag=dag,
      )
    • Task relationships: Basic relationships of Tasks in Airflow include:
      • set_upstream or <<: task2 << task1 means task 2 depends on task 1 (task 1 runs first).
      • set_downstream or >>: task2 >> task1 means task 1 depends on task 2 (task 2 runs first).
    • Task lifecycle: A task goes through various stages from its start to its completion, and the Airflow Web UI displays each task status with its own color.
      The complete task lifecycle looks like the following, with the expected route being: (1) No status, (2) Scheduled, (3) Queued, (4) Running and (5) Success.
  • Sensors: a special kind of Operator that waits for an external condition or event to occur before its downstream tasks are triggered.
  • Scheduler & Executor: the Scheduler is responsible for monitoring the DAG folder and triggering tasks whose dependencies have been met. Triggered tasks are put in a queue and then executed by the Executor. Airflow's default SequentialExecutor is limited because it runs everything sequentially; for performance and availability, the Airflow developers recommend the CeleryExecutor (https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html#celery-executor).
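To make these concepts concrete, here is a minimal sketch (not part of the original post) of the A → B → C workflow mentioned above, written against the Airflow 2.x API. The DAG id and the DummyOperator placeholders are purely illustrative; in a real pipeline each task would be a PythonOperator, BashOperator, etc.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

# a minimal linear DAG: A -> B -> C
dag = DAG(
    'linear_abc_example',               # illustrative DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,              # run only when triggered manually
    catchup=False,
)

task_a = DummyOperator(task_id='task_a', dag=dag)  # e.g. get and prepare data
task_b = DummyOperator(task_id='task_b', dag=dag)  # e.g. analyze and build a report
task_c = DummyOperator(task_id='task_c', dag=dag)  # e.g. email the report

# bitshift composition: A runs first, then B, then C
task_a >> task_b >> task_c
# equivalent to:
# task_a.set_downstream(task_b)
# task_b.set_downstream(task_c)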

How to run Airflow (local first)

Before we can do anything with Airflow (declaring tasks and workflows), we need to set up a small test environment. The page Running Airflow locally gives a very thorough guide. Please follow Apache's guide until you have started the webserver and scheduler successfully, at which point you are presented with the Airflow web UI and its list of DAGs.

Workflows can then be executed either through the command-line interface or through the web GUI. To start a DAG immediately from the UI, either flick the on/off switch at the head of the DAG's row or use the "Trigger DAG" button in the Actions button group.
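The same can be done from the command line. As a rough sketch, assuming the Airflow 2.x CLI and a DAG id of my_dag (replace it with your own):

airflow dags list              # show the DAGs Airflow has discovered
airflow dags unpause my_dag    # enable scheduling for the DAG
airflow dags trigger my_dag    # start a DAG run immediately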

Writing an example DAG

Alright, that was a lot of conceptual groundwork. Now it is time to build a simple DAG. Starting with version 2.0, Airflow offers decorator-based task and DAG declarations; however, since most of the documentation still uses the non-decorator style, it is easier to start with non-decorator DAGs and tasks. We shall start with the following workflow: a print_date task that, once finished, triggers two independent tasks, sleep and templated (print_date → [sleep, templated]).

To keep things easy, we will define all three tasks as PythonOperators that do nothing but execute simple print statements. In reality, users can define custom operators that handle more complex operations, which will be briefly discussed in a later section. For further simplicity, both the workflow and the operator logic are defined in the same file (in practice, they should be separated).

First of all, we need to import Airflow's libraries:

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# timedelta is used in default_args and schedule_interval below
from datetime import timedelta

Now we need to define the DAG first, so that inner operators can be assigned to it.

# default_args to be passed into the DAG constructor - replace with your own
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

# define the DAG
# only the first parameter (the DAG id) is required
dag = DAG(
    "pyop_example",
    default_args=default_args,
    description="A simple example with PythonOperators",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
)
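Side note: Airflow also lets you declare the DAG as a context manager, in which case operators created inside the with block are assigned to the DAG automatically and the dag=dag argument used below can be omitted. A minimal sketch of the same declaration in that style:

with DAG(
    "pyop_example",
    default_args=default_args,
    description="A simple example with PythonOperators",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    ...  # operators defined here are attached to the DAG automatically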

Now we define the operators. First, print_date is a task that prints the current date and time. We start by defining the print_date_time function:

from datetime import datetime

def print_date_time():
    print("Now it is {}!".format(datetime.now()))

Then we define the print_date task as a PythonOperator:

print_date = PythonOperator(
    task_id="print_date",
    python_callable=print_date_time,
    dag=dag
)

Similarly, we will define the other two operators:

# sleep function
def print_sleep():
    print("sleeping...")

sleep = PythonOperator(
    task_id="sleep",
    python_callable=print_sleep,
    dag=dag
)

# templated
def print_templated():
    print("this is a template!")

templated = PythonOperator(
    task_id="templated",
    python_callable=print_templated,
    dag=dag
)
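The callables above take no arguments, but PythonOperator can also pass values to them through op_args / op_kwargs, and those fields are Jinja-templated. As a small sketch (the print_message function and the templated_with_jinja task id are made up for illustration), the "templated" idea could look like this:

def print_message(message):
    print(message)

templated_with_jinja = PythonOperator(
    task_id="templated_with_jinja",
    python_callable=print_message,
    # op_kwargs is a templated field, so Jinja macros such as {{ ds }}
    # (the run's logical date) are rendered before the callable is invoked
    op_kwargs={"message": "this is a template! logical date: {{ ds }}"},
    dag=dag,
)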

Now we are ready to define the dependencies between the tasks. As described above, print_date triggers both sleep and templated. Using bitshift composition, we define the dependency:

print_date >> [sleep, templated]
# which is the same as
# print_date.set_downstream(sleep)
# print_date.set_downstream(templated)
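For longer pipelines, the same dependencies can also be declared with Airflow's chain helper. A minimal sketch, assuming the Airflow 2.x import path:

from airflow.models.baseoperator import chain

# equivalent to: print_date >> [sleep, templated]
chain(print_date, [sleep, templated])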

The DAG file is now complete. Suppose we have saved it as "py_op_example.py" in Airflow's default DAG folder (~/airflow/dags). As a quick sanity check that the file parses and the DAG can be constructed, we can execute:

python ~/airflow/dags/py_op_example.py

If the script raises no exception, the DAG is valid. Because the file lives in the DAG folder, the Airflow scheduler will pick it up and run it on its schedule; you can follow the progress in the Airflow webserver interface.
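You can also exercise individual tasks without waiting for the scheduler. A rough sketch, assuming the Airflow 2.x CLI and the pyop_example DAG defined above:

# run a single task instance for a given logical date, without recording state
airflow tasks test pyop_example print_date 2022-01-01

# run the whole DAG once for a given logical date
airflow dags test pyop_example 2022-01-01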

Writing a DAG with BigQuery operators

Next we come to a slightly more complex (yet still easy to understand) sample workflow, written to interact with Google BigQuery. The full file can be found here. We only discuss one of the featured examples:

This DAG is named example_bigquery_operations_location, and it defines three tasks: create_dataset_with_location, create_table_with_location and delete_dataset_with_location, which are instances of BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator and BigQueryDeleteDatasetOperator respectively. To break down the workflow (a sketch follows the list below):

  • First, a dataset is created in a specific location (create_dataset_with_location)
  • Then a table is created inside the just-created dataset (create_table_with_location)
  • Finally, the dataset and its contents are deleted (delete_dataset_with_location)
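Since the linked file is not reproduced here, the following is a rough sketch of what such a DAG can look like, assuming the Google provider package (apache-airflow-providers-google) is installed; the dataset name, table name, schema and location are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteDatasetOperator,
)

DATASET_NAME = "example_location_dataset"  # placeholder
BQ_LOCATION = "europe-north1"              # placeholder

with DAG(
    "example_bigquery_operations_location",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=["example"],
) as dag:
    create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset_with_location",
        dataset_id=DATASET_NAME,
        location=BQ_LOCATION,
    )

    create_table_with_location = BigQueryCreateEmptyTableOperator(
        task_id="create_table_with_location",
        dataset_id=DATASET_NAME,
        table_id="test_table",
        schema_fields=[
            {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    )

    delete_dataset_with_location = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset_with_location",
        dataset_id=DATASET_NAME,
        delete_contents=True,
    )

    create_dataset_with_location >> create_table_with_location >> delete_dataset_with_location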

While the above example may seem trivial, it successfully demonstrates how we can work with operators from arbitrary providers.

Writing custom operators

Sometimes, the standard operators alone may not be enough for your work. Airflow therefore also provides a mechanism for defining your own operators: simply extend the BaseOperator class and override the execute method. For instance, we can move the aforementioned print_date logic into a separate class, like the following:

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime

def print_date_time():
    print("Now it is {}!".format(datetime.now()))

class PrintDateOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self,
            name: str,
            **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        date_time = datetime.now()
        print("Now it is {}!".format(date_time))
        return date_time

Then PrintDateOperator can be imported and used like any other operator (here we assume the class was saved in a module named custom_operator that is importable from the DAG folder):

from custom_operator import PrintDateOperator

with dag:
    print_date_time = PrintDateOperator(task_id='sample-task', name='foo_bar')
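Because this particular operator does not use anything from the Airflow context, you can also smoke-test it directly in a Python shell before wiring it into a DAG. A small sketch (works only for simple operators like this one):

# direct call outside of Airflow, useful for a quick local check
op = PrintDateOperator(task_id="smoke_test", name="foo_bar")
print(op.execute(context={}))  # prints and returns the current datetime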

Conclusion

In this blog, I have taken you through the most basic parts of Airflow. There is much more about Airflow than I could cover here – see you in later blogs!

