Getting started with DAGs
An example pipeline definition
"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html)
"""
from __future__ import annotations
# [START tutorial]
# [START import_module]
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
# [END import_module]
# [START instantiate_dag]
with DAG(
    "tutorial",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    # [END default_args]
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    # [END instantiate_dag]

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    # [START basic_task]
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    # [END basic_task]

    # [START documentation]
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # [END documentation]

    # [START jinja_template]
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )
    # [END jinja_template]

    t1 >> [t2, t3]
# [END tutorial]
The DAG definition file
Importing modules
Import the modules we need:
# datetime and timedelta are needed for start_date, retry_delay and the schedule interval
from datetime import datetime, timedelta
# dedent is used below for the task documentation and the templated command
from textwrap import dedent
# The DAG class is used to instantiate a DAG object; note that this only
# defines the DAG object, it does not perform the actual data processing
from airflow import DAG
# Operators describe the actual work; this tutorial only uses BashOperator
from airflow.operators.bash import BashOperator
Setting default arguments
These default arguments are passed to every task when it is created; any of them can be overridden on a per-task basis when the operator is initialized.
default_args={
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'trigger_rule': 'all_success'
},
Instantiating a DAG
We need a DAG object into which to nest our tasks. In the code below, we first pass a string ("tutorial") that serves as the DAG's unique identifier, then the default argument dictionary defined above, and finally a schedule interval of one day.
with DAG(
    "tutorial",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    # [END default_args]
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
Tasks
Tasks are created by instantiating operator objects. An object instantiated from an operator is called a task. The first constructor argument, task_id, acts as the task's unique identifier.
t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)
t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)
Notice how we pass a mix of operator-specific arguments (bash_command) and arguments common to all operators (retries, inherited from BaseOperator) to the operator's constructor. This is simpler than passing every argument to every constructor call. Also note that in t2 we override the retries value inherited from default_args and set it to 3.
The precedence rules for a task's arguments are as follows (a short sketch illustrating them appears after this list):
- Explicitly passed arguments
- Values that exist in the default_args dictionary
- The operator's own default value, if one exists
A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.
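As a minimal sketch (not part of the tutorial file; the DAG id "precedence_demo" and the task ids are made up), the snippet below shows how the three precedence levels play out for the retries argument:
# Hypothetical DAG illustrating argument precedence for retries
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "precedence_demo",
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as demo_dag:
    # 1. An explicitly passed argument wins: this task ends up with retries=3
    explicit = BashOperator(task_id="explicit", bash_command="date", retries=3)
    # 2. No explicit value, so the default_args value applies: retries=1
    from_defaults = BashOperator(task_id="from_defaults", bash_command="date")
    # 3. If a value is set neither explicitly nor in default_args, the
    #    operator's own default is used (retries defaults to 0 unless
    #    configured otherwise in the Airflow settings)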
Templating with Jinja
Airflow leverages the power of Jinja templating and provides pipeline authors with a set of built-in parameters and macros. Airflow also provides hooks for pipelines to define their own parameters, macros and templates.
templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)
templated_command contains code logic in {% %} blocks, references parameters such as {{ ds }}, calls a function as in {{ macros.ds_add(ds, 7) }}, and can reference a user-defined parameter via {{ params.my_param }} (a sketch of passing such a parameter follows below).
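The tutorial task above does not define any params, so as a hedged sketch (the task id, parameter name and value are made up), a user-defined parameter could be passed to an operator inside the same DAG like this and then referenced from the template:
# Hypothetical variant of t3: pass a user-defined parameter to the template
t3_with_param = BashOperator(
    task_id="templated_with_param",
    bash_command='echo "{{ params.my_param }}"',
    # the params dict makes my_param available as {{ params.my_param }}
    params={"my_param": "Parameter I passed in"},
)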
Setting up dependencies
We have three tasks, t1, t2 and t3, that do not depend on each other. Here are a few ways you can define dependencies between them:
t1.set_downstream(t2)
# This means that t2 will depend on t1 running successfully;
# it is equivalent to
t2.set_upstream(t1)
# The bit-shift operator can also be used to say that t2 depends on t1
t1 >> t2
# The same dependency written from the other side: t1 is upstream of t2
t2 << t1
# Chaining bit-shift operators sets up multiple dependencies concisely
t1 >> t2 >> t3
# A list of tasks can also be used; the following three statements are equivalent
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
Note that when executing your script, Airflow will raise an exception if it finds a cycle in your DAG or if a dependency is referenced more than once.
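As an illustration only (do not add these lines to tutorial.py), a chain that loops back on itself forms a cycle, which Airflow rejects when it parses the DAG file instead of scheduling it:
# Hypothetical, broken dependency chain: t1 -> t2 -> t1 forms a cycle,
# so Airflow reports the DAG as broken rather than running it
t1 >> t2 >> t1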
Testing
Running the script
First, make sure the code above has been saved as tutorial.py and that the file is located in the dags folder specified in your airflow.cfg, then run it from the command line.
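A minimal check (assuming the default ~/airflow/dags location; adjust the path to your environment) is to execute the file with Python:
python ~/airflow/dags/tutorial.py
If the script runs without raising an exception, the DAG file at least parses correctly.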
Command-line metadata validation
Let's run a few commands to further validate the script (these use the Airflow 2 CLI, matching the imports in the DAG above):
# print the list of active DAGs
airflow dags list
# print the list of tasks in the "tutorial" DAG
airflow tasks list tutorial
# print the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree
Testing
Let's test by running the actual task instances for a specific date. The date specified in this context is the execution_date (the logical date), which simulates the scheduler running your task or DAG for a specific date and time:
# command layout: command subcommand dag_id task_id date
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
You can see how the templated task gets rendered and executed by running the same kind of command:
# testing templated
airflow tasks test tutorial templated 2015-06-01
命令在本地运行任务实例,将其日志输出到stdout(在屏幕上),不依赖于依赖项,并且不向数据库传达状态(运行,成功,失败,...)
Backfill
backfill runs subsections of a DAG for a specified range of dates.
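As a sketch of the corresponding command (the date range below is arbitrary; --start-date and --end-date bound the runs), a backfill of the tutorial DAG with the Airflow 2 CLI looks like this:
# start your backfill on a date range
airflow dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07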