数据血缘

Airflow 可以帮助跟踪数据的来源,以及数据发生了什么变化;这有助于实现审计跟踪和数据治理,还可以调试数据流

Airflow 通过任务的 inlets 和 outlets 跟踪数据

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

args = {
    'owner': 'airflow' ,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_lineage', default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60))

f_final = File("/tmp/final")
run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
    inlets={"auto": True},
    outlets={"datasets": [f_final,]})

f_in = File("/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
    outlets.append(f_out)
run_this = BashOperator(
    task_id='run_me_first', bash_command='echo 1', dag=dag,
    inlets={"datasets": [f_in,]},
    outlets={"datasets": outlets}
    )
run_this.set_downstream(run_this_last)

任务定义了参数inletsoutletsinlets可以是一个数据集列表{"datesets":[dataset1,dataset2]},也可以是指定的上游任务outlets像这样{"task_ids":["task_id1","task_id2"]},或者不想指定直接用{"auto":True}也可以,甚至是前面几种的组合。 outlets 也是一个数据集列表{"datesets":[dataset1,dataset2]}。 在运行任务时,数据集的字段会被模板渲染。

注意: 只要 Operator 支持,它会自动地加上 inlets 和 outlets。

在示例 DAG 任务中, run_me_first是一个 BashOperator,它接收CAT1, CAT2, CAT3作为 inlets(译注:根据代码,应为“输出 outlets”)。 其中的execution_date会在任务运行时被渲染成执行时间。

注意: 在底层,Airflow 会在pre_execute方法中准备 lineage 元数据。 当任务运行结束时,会调用post_execute将 lineage 元数据推送到 XCOM 中。 因此,如果您要创建自己的 Operator,并且需要覆写这些方法,确保分别用prepare_lineageapply_lineage装饰这些方法。

Airflow 可以将 lineage 元数据发送到 Apache Atlas。 您需要在airflow.cfg中配置atlas

[lineage]
backend = airflow.lineage.backend.atlas

[atlas]
username = my_username
password = my_password
host = host
port = 21000