NexaGrid技术博客

NexaGrid技术博客

一、Airflow 3.0.2 从基础到实战应用(快速上手)

2025-07-01
一、Airflow 3.0.2 从基础到实战应用(快速上手)

本系列文章基于 airflow 3.0.2 版本编写,不同版本的 airflow 在命令和特性上可能会有所出入。

一、安装

在开始使用 Airflow 之前,首先需要完成安装。Airflow 的安装方式有多种,根据不同的使用场景和环境,我们可以选择最适合的安装方法。

1.1 安装方式比较

以下是几种常见的 Airflow 安装方式及其优缺点比较:

安装方式

优点

缺点

适用场景

PyPI 安装

简单直接,适合初学者,易于定制

简单直接,适合初学者,易于定制

简单直接,适合初学者,易于定制

Docker 安装

环境隔离性好,部署一致性高,官方支持

需要了解 Docker 知识

生产环境、多环境协作

Helm Chart 安装

适合 Kubernetes 环境,自动化部署

需要 Kubernetes 集群,学习成本较高

云原生环境,大规模部署

源码安装

高度定制化,可直接修改 Airflow 代码

复杂,需要编译环境,维护成本高

开发贡献、特殊定制需求

1.2 推荐安装步骤(PyPI 方式)

对于初学者和本地开发环境,推荐使用 PyPI 方式安装,步骤如下:

准备 Python 环境:确保系统已安装 Python 3.8 或更高版本。

创建虚拟环境(建议操作):

python -m venv airflow_env

source airflow_env/bin/activate  # Linux/macOS

# Windows系统使用:

# airflow_env\Scripts\activate

安装 Airflow 3.0.2:

pip install 'apache-airflow==3.0.2' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.2/constraints-3.10.txt"

验证安装:

airflow version

预期输出应显示3.0.2版本信息。

二、初始化数据

安装完成后,需要初始化 Airflow 的元数据库,这是 Airflow 存储所有 DAG、任务状态、配置信息等的地方。

2.1 初始化数据库

使用以下命令初始化 Airflow 元数据库:

airflow db init

该命令会自动创建数据库表结构,并初始化默认用户和角色。

2.2 数据库配置

Airflow 默认使用 SQLite 数据库,适合开发和测试环境。对于生产环境,建议使用 PostgreSQL 或 MySQL 等更可靠的数据库。

要配置其他数据库,需修改airflow.cfg文件中的sql_alchemy_conn参数:

[core]
sql_alchemy_conn = postgresql+psycopg2://user:password@localhost:5432/airflow
# 或
sql_alchemy_conn = mysql+pymysql://user:password@localhost:3306/airflow

修改完成后,重新运行airflow db init命令使配置生效。

2.3 管理员用户密码管理

3.x系列移除users 命令,默认使用的SimpleAuth管理器使用json文件管理用户名密码

# 在airflow目录下创建一个passwords.json文件
echo '{"admin": "password"}' > passwords.json
# 使用环境变量告知airflow使用这个文件
AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_PASSWORDS_FILE = ${AIRFLOW_DIR}/passwords.json

三、启动 Web UI

Airflow 提供了一个强大的 Web UI,用于管理和监控 DAG 的运行状态。

3.1 启动 Web 服务器

使用以下命令启动 Web UI 服务:

airflow webserver --port 8080

默认情况下,Web UI 会在端口 8080 上运行。访问http://localhost:8080即可查看 Airflow 界面。

3.2 启动调度器

除了 Web 服务器,还需要启动调度器(Scheduler)来执行 DAG 中的任务:

airflow scheduler

调度器是 Airflow 的核心组件,负责监控所有 DAG 和任务实例的状态,并根据调度策略触发任务执行。

3.3 启动apiserver

3.x系列webserver改动成api-server 命令

airflow api-server

3.4 启动celery worker

airflow celery worker

四、编写第一个 Dag

现在,我们来编写一个简单的 DAG(Directed Acyclic Graph),这是 Airflow 中定义工作流的基本单元。

4.1 DAG 文件结构

DAG 文件通常放在dags目录下,默认路径为~/airflow/dags/。Airflow 会自动扫描该目录下的所有 Python 文件以发现 DAG。

创建一个名为first_dag.py的文件,并添加以下内容:

from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator # airflow 3.x系列引用更规范


# 定义DAG

with DAG(
    dag_id='first_dag',
    start_date=datetime(2025, 1, 1),
    catchup=False
) as dag:
    # 定义任务
    def hello_world():
        print("Hello, World!")

    task1 = PythonOperator(
        task_id='hello_task',
        python_callable=hello_world
    )

    # 设置任务依赖关系(这里只有一个任务,所以不需要)

    task1

4.2 DAG 参数说明

上述示例中使用的关键参数:

  • dag_id

DAG 的唯一标识符,必须在系统中唯一

  • start_date

DAG 的开始日期,任务不会在此时之前运行

  • catchup

是否补全历史任务,设为False可避免启动时运行大量历史任务

4.3 常用任务操作符

Airflow 提供了多种类型的操作符(Operator),用于执行不同类型的任务:

五、测试 Dag 是否正确运行

编写好 DAG 后,需要验证其是否被正确解析和执行。

5.1 检查 DAG 是否被正确加载

使用以下命令检查 DAG 是否被 Airflow 正确加载:

airflow dags list

如果 DAG 被成功加载,你应该在输出中看到first_dag。

5.2 查看 DAG 结构

使用以下命令查看 DAG 的详细信息和任务结构:

airflow dags show first_dag

该命令会显示 DAG 的任务依赖关系图、参数设置等信息。

5.3 测试任务执行

使用以下命令测试特定任务的执行:

airflow tasks test first_dag hello_task 2025-01-01

这里的2025-01-01是执行日期,应与 DAG 的start_date匹配。如果一切正常,你应该在输出中看到Hello, World!。

5.4 检查日志

查看任务执行的日志是排查问题的重要手段:

airflow tasks logs first_dag hello_task 2025-01-01

六、手动触发 Dag

Airflow 允许手动触发 DAG 的执行,这在测试和临时运行任务时非常有用。

6.1 通过 Web UI 触发

访问 Airflow Web UI http://localhost:8080

在左侧导航栏中选择 "DAGs"

找到 "first_dag" 并点击其名称

点击 "Trigger Dag" 按钮

触发后,可以在 "DAG Runs" 选项卡中查看执行状态。

6.2 使用命令行触发

使用以下命令从命令行触发 DAG 执行:

airflow dags trigger first_dag

你还可以传递参数给 DAG:

airflow dags trigger first_dag --conf '{"key": "value"}'

6.3 查看执行结果

触发 DAG 后,可以通过以下方式查看执行结果:

在 Web UI 的 "DAG Runs" 选项卡中查看执行状态

点击具体的 DAG 运行记录,查看任务执行状态

点击任务实例查看详细日志

七、定时触发 Dag

除了手动触发,Airflow 还支持根据时间表自动触发 DAG 执行。

7.1 设置调度

在 DAG 定义中,通过schedule参数设置调度:

with DAG(
    dag_id='first_dag',
    start_date=datetime(2025, 1, 1),
    schedule='0 0 * * *',  # 每天午夜执行
    catchup=False
) as dag:
    # 任务定义

7.2 Cron 表达式语法

schedule支持标准的 Cron 表达式,格式为:

* * * * *

- - - - -

| | | | |

| | | | +----- 星期几 (0-6, 0代表周日)

| | | +------- 月份 (1-12)

| | +--------- 日期 (1-31)

| +----------- 小时 (0-23)

+------------- 分钟 (0-59)

7.3 预定义调度

Airflow 还提供了一些预定义的调度常量:

常量

说明

  • @once:仅执行一次

  • @hourly:每小时执行一次

  • @daily:每天执行一次(相当于0 0 * * *)

  • @weekly:每周日午夜执行一次

  • @monthly:每月 1 日午夜执行一次

  • @yearly:每年 1 月 1 日午夜执行一次

八、结语

通过本文的学习,你已经掌握了 Airflow 的基本使用方法,包括安装、初始化、启动服务、编写 DAG 以及触发执行等操作。这些是使用 Airflow 进行工作流管理的基础。

在后续文章中,我们将深入探讨更多高级主题,包括:

  • 任务依赖管理

  • 参数化 DAG

  • 连接管理

  • 变量和配置

  • 插件开发

  • 监控和报警

  • 生产环境部署最佳实践

Airflow 是一个功能强大且灵活的工作流管理工具,适用于各种规模的数据处理和自动化任务。随着你的深入学习和实践,你将能够充分发挥其潜力,构建复杂而高效的工作流系统。