Airflow基本概念
Airflow中的相关概念如下: - Operators:Airflow定义的一系列算子/操作符,更直接的理解就是python class。不同的Operator类实现了具体的功能,比如: - BashOperator:可以执行用户指定的一个Bash命令 - PythonOperator:可以执行用户指定的一个python函数 - EmailOperator:可以进行邮件发送 - Sensor:感知器/触发器,可以定义触发条件和动作,在条件满足时执行某个动作。Airflow提供了更具体的Sensor,比如FileSensor,DatabaseSensor等 - DAG(Directed Acyclic Graph): 字面意有向无环图。是执行任务流的图,在此集合中可以定义任务的依赖关系,另外这个DAG是由python实现,存放在$AIRFLOW_HOME路径下的dags文件夹下,可以看成是一个对象,在使用时需要进行实例化。DAG中包含task。 - task:任务,Operators的具体实例。
使用步骤
- 根据实际需要,使用不同的Operator
- 传入具体的参数,定义一系列的Tasks
- 定义Tasks间的关系,形成一个DAG
- 调度DAG运行,每个Task会行成一个Instance
- 使用命令行或者Web UI进行查看和管理
DAG代码示例
下面是一个官方的DAG的python文件示例,为了方便理解,笔者加入了中文注释: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# 以下为DAG的默认参数,这些参数会传给每个operator
default_args = {
    'owner': 'airflow', # 任务的owner,建议用unix user的用户名
    'depends_on_past': False, # 当设置为True时,任务实例将按顺序运行,同时依赖于前一个任务的调度成功
    'start_date': datetime(2019, 4, 1), #
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1, # 在任务报失败前应执行的重试次数
    'retry_delay': timedelta(minutes=5), # 重试间的延时
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
# 传参建立dag,其中'luke_airflow'即为dag_id,是dag的唯一标识,schedule_interval为执行频率
dag = DAG('luke_airflow', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date', # task_id,任务的唯一标识
    bash_command='date', # 执行的bash命令
    dag=dag)
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
t2.set_upstream(t1) # 设置t1为t2的前置任务,参数中可以为task的列表。
t3.set_upstream(t1) # 设置t1为t3的前置任务
上面这个文件实质是一个配置文件(像未实例化的对象),这个脚本不能用于不同文件之间通信,如果要交叉通信,需要用Xcom
另外在task中的传参有如下优先级: 1. BashOperator指定的参数; 2. 如果没有,则用传入的default_args; 3. 如果依然没有,则用Operator的默认参数。
测试
解析脚本
把上述的文件放到和airflow.cfg同目录下的dags文件夹下,执行。 1
python3 luke_airflow.py
确保没有报错。
验证脚本
让我们运行一些命令来进一步验证这个脚本。 1
2
3
4
5
6
7
8# 打印所有激活的dag列表,可以看到luke_airflow的dag在其中
airflow list_dags
# 打印指定id的dag中任务,这里为"luke_airflow",可以看到dag中的任务
airflow list_tasks luke_airflow
# 打印dag中任务树,可以看到dag中任务层级图。
airflow list_tasks luke_airflow --tree
执行测试
| 1 | airflow test luke_airflow templated 2019-03-03 | 
后面跟的日期为模拟执行日期,可以看到执行结果。
其他
以上一些操作也可在建立的airflow 网站上用webUI进行操作。
