airflow使用

Airflow基本概念

Airflow中的相关概念如下: - Operators:Airflow定义的一系列算子/操作符,更直接的理解就是python class。不同的Operator类实现了具体的功能,比如: - BashOperator:可以执行用户指定的一个Bash命令 - PythonOperator:可以执行用户指定的一个python函数 - EmailOperator:可以进行邮件发送 - Sensor:感知器/触发器,可以定义触发条件和动作,在条件满足时执行某个动作。Airflow提供了更具体的Sensor,比如FileSensor,DatabaseSensor等 - DAG(Directed Acyclic Graph): 字面意有向无环图。是执行任务流的图,在此集合中可以定义任务的依赖关系,另外这个DAG是由python实现,存放在$AIRFLOW_HOME路径下的dags文件夹下,可以看成是一个对象,在使用时需要进行实例化。DAG中包含task。 - task:任务,Operators的具体实例。

使用步骤

  1. 根据实际需要,使用不同的Operator
  2. 传入具体的参数,定义一系列的Tasks
  3. 定义Tasks间的关系,形成一个DAG
  4. 调度DAG运行,每个Task会行成一个Instance
  5. 使用命令行或者Web UI进行查看和管理

DAG代码示例

下面是一个官方的DAG的python文件示例,为了方便理解,笔者加入了中文注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# 以下为DAG的默认参数,这些参数会传给每个operator
default_args = {
'owner': 'airflow', # 任务的owner,建议用unix user的用户名
'depends_on_past': False, # 当设置为True时,任务实例将按顺序运行,同时依赖于前一个任务的调度成功
'start_date': datetime(2019, 4, 1), #
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1, # 在任务报失败前应执行的重试次数
'retry_delay': timedelta(minutes=5), # 重试间的延时
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

# 传参建立dag,其中'luke_airflow'即为dag_id,是dag的唯一标识,schedule_interval为执行频率
dag = DAG('luke_airflow', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date', # task_id,任务的唯一标识
bash_command='date', # 执行的bash命令
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)

t2.set_upstream(t1) # 设置t1为t2的前置任务,参数中可以为task的列表。
t3.set_upstream(t1) # 设置t1为t3的前置任务

上面这个文件实质是一个配置文件(像未实例化的对象),这个脚本不能用于不同文件之间通信,如果要交叉通信,需要用Xcom

另外在task中的传参有如下优先级: 1. BashOperator指定的参数; 2. 如果没有,则用传入的default_args; 3. 如果依然没有,则用Operator的默认参数。

测试

解析脚本

把上述的文件放到和airflow.cfg同目录下的dags文件夹下,执行。

1
python3 luke_airflow.py

确保没有报错。

验证脚本

让我们运行一些命令来进一步验证这个脚本。

1
2
3
4
5
6
7
8
# 打印所有激活的dag列表,可以看到luke_airflow的dag在其中
airflow list_dags

# 打印指定id的dag中任务,这里为"luke_airflow",可以看到dag中的任务
airflow list_tasks luke_airflow

# 打印dag中任务树,可以看到dag中任务层级图。
airflow list_tasks luke_airflow --tree

执行测试

1
airflow test luke_airflow templated  2019-03-03

后面跟的日期为模拟执行日期,可以看到执行结果。

其他

以上一些操作也可在建立的airflow 网站上用webUI进行操作。

参考链接

瑾锋 wechat
心明录公众号