一、Airflow 3.0.2 从基础到实战应用(快速上手)
编辑本系列文章基于 airflow 3.0.2 版本编写,不同版本的 airflow 在命令和特性上可能会有所出入。
一、安装
在开始使用 Airflow 之前,首先需要完成安装。Airflow 的安装方式有多种,根据不同的使用场景和环境,我们可以选择最适合的安装方法。
1.1 安装方式比较
以下是几种常见的 Airflow 安装方式及其优缺点比较:
1.2 推荐安装步骤(PyPI 方式)
对于初学者和本地开发环境,推荐使用 PyPI 方式安装,步骤如下:
准备 Python 环境:确保系统已安装 Python 3.8 或更高版本。
创建虚拟环境(建议操作):
python -m venv airflow_env
source airflow_env/bin/activate # Linux/macOS
# Windows系统使用:
# airflow_env\Scripts\activate
安装 Airflow 3.0.2:
pip install 'apache-airflow==3.0.2' \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.2/constraints-3.10.txt"
验证安装:
airflow version
预期输出应显示3.0.2版本信息。
二、初始化数据
安装完成后,需要初始化 Airflow 的元数据库,这是 Airflow 存储所有 DAG、任务状态、配置信息等的地方。
2.1 初始化数据库
使用以下命令初始化 Airflow 元数据库:
airflow db init
该命令会自动创建数据库表结构,并初始化默认用户和角色。
2.2 数据库配置
Airflow 默认使用 SQLite 数据库,适合开发和测试环境。对于生产环境,建议使用 PostgreSQL 或 MySQL 等更可靠的数据库。
要配置其他数据库,需修改airflow.cfg文件中的sql_alchemy_conn参数:
[core]
sql_alchemy_conn = postgresql+psycopg2://user:password@localhost:5432/airflow
# 或
sql_alchemy_conn = mysql+pymysql://user:password@localhost:3306/airflow
修改完成后,重新运行airflow db init命令使配置生效。
2.3 管理员用户密码管理
3.x系列移除users
命令,默认使用的SimpleAuth管理器使用json文件管理用户名密码
# 在airflow目录下创建一个passwords.json文件
echo '{"admin": "password"}' > passwords.json
# 使用环境变量告知airflow使用这个文件
AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_PASSWORDS_FILE = ${AIRFLOW_DIR}/passwords.json
三、启动 Web UI
Airflow 提供了一个强大的 Web UI,用于管理和监控 DAG 的运行状态。
3.1 启动 Web 服务器
使用以下命令启动 Web UI 服务:
airflow webserver --port 8080
默认情况下,Web UI 会在端口 8080 上运行。访问http://localhost:8080即可查看 Airflow 界面。
3.2 启动调度器
除了 Web 服务器,还需要启动调度器(Scheduler)来执行 DAG 中的任务:
airflow scheduler
调度器是 Airflow 的核心组件,负责监控所有 DAG 和任务实例的状态,并根据调度策略触发任务执行。
3.3 启动apiserver
3.x系列webserver
改动成api-server
命令
airflow api-server
3.4 启动celery worker
airflow celery worker
四、编写第一个 Dag
现在,我们来编写一个简单的 DAG(Directed Acyclic Graph),这是 Airflow 中定义工作流的基本单元。
4.1 DAG 文件结构
DAG 文件通常放在dags目录下,默认路径为~/airflow/dags/。Airflow 会自动扫描该目录下的所有 Python 文件以发现 DAG。
创建一个名为first_dag.py的文件,并添加以下内容:
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator # airflow 3.x系列引用更规范
# 定义DAG
with DAG(
dag_id='first_dag',
start_date=datetime(2025, 1, 1),
catchup=False
) as dag:
# 定义任务
def hello_world():
print("Hello, World!")
task1 = PythonOperator(
task_id='hello_task',
python_callable=hello_world
)
# 设置任务依赖关系(这里只有一个任务,所以不需要)
task1
4.2 DAG 参数说明
上述示例中使用的关键参数:
dag_id
DAG 的唯一标识符,必须在系统中唯一
start_date
DAG 的开始日期,任务不会在此时之前运行
catchup
是否补全历史任务,设为False可避免启动时运行大量历史任务
4.3 常用任务操作符
Airflow 提供了多种类型的操作符(Operator),用于执行不同类型的任务:
五、测试 Dag 是否正确运行
编写好 DAG 后,需要验证其是否被正确解析和执行。
5.1 检查 DAG 是否被正确加载
使用以下命令检查 DAG 是否被 Airflow 正确加载:
airflow dags list
如果 DAG 被成功加载,你应该在输出中看到first_dag。
5.2 查看 DAG 结构
使用以下命令查看 DAG 的详细信息和任务结构:
airflow dags show first_dag
该命令会显示 DAG 的任务依赖关系图、参数设置等信息。
5.3 测试任务执行
使用以下命令测试特定任务的执行:
airflow tasks test first_dag hello_task 2025-01-01
这里的2025-01-01是执行日期,应与 DAG 的start_date匹配。如果一切正常,你应该在输出中看到Hello, World!。
5.4 检查日志
查看任务执行的日志是排查问题的重要手段:
airflow tasks logs first_dag hello_task 2025-01-01
六、手动触发 Dag
Airflow 允许手动触发 DAG 的执行,这在测试和临时运行任务时非常有用。
6.1 通过 Web UI 触发
访问 Airflow Web UI http://localhost:8080
在左侧导航栏中选择 "DAGs"
找到 "first_dag" 并点击其名称
点击 "Trigger Dag" 按钮
触发后,可以在 "DAG Runs" 选项卡中查看执行状态。
6.2 使用命令行触发
使用以下命令从命令行触发 DAG 执行:
airflow dags trigger first_dag
你还可以传递参数给 DAG:
airflow dags trigger first_dag --conf '{"key": "value"}'
6.3 查看执行结果
触发 DAG 后,可以通过以下方式查看执行结果:
在 Web UI 的 "DAG Runs" 选项卡中查看执行状态
点击具体的 DAG 运行记录,查看任务执行状态
点击任务实例查看详细日志
七、定时触发 Dag
除了手动触发,Airflow 还支持根据时间表自动触发 DAG 执行。
7.1 设置调度
在 DAG 定义中,通过schedule参数设置调度:
with DAG(
dag_id='first_dag',
start_date=datetime(2025, 1, 1),
schedule='0 0 * * *', # 每天午夜执行
catchup=False
) as dag:
# 任务定义
7.2 Cron 表达式语法
schedule支持标准的 Cron 表达式,格式为:
* * * * *
- - - - -
| | | | |
| | | | +----- 星期几 (0-6, 0代表周日)
| | | +------- 月份 (1-12)
| | +--------- 日期 (1-31)
| +----------- 小时 (0-23)
+------------- 分钟 (0-59)
7.3 预定义调度
Airflow 还提供了一些预定义的调度常量:
常量
说明
@once:仅执行一次
@hourly:每小时执行一次
@daily:每天执行一次(相当于0 0 * * *)
@weekly:每周日午夜执行一次
@monthly:每月 1 日午夜执行一次
@yearly:每年 1 月 1 日午夜执行一次
八、结语
通过本文的学习,你已经掌握了 Airflow 的基本使用方法,包括安装、初始化、启动服务、编写 DAG 以及触发执行等操作。这些是使用 Airflow 进行工作流管理的基础。
在后续文章中,我们将深入探讨更多高级主题,包括:
任务依赖管理
参数化 DAG
连接管理
变量和配置
插件开发
监控和报警
生产环境部署最佳实践
Airflow 是一个功能强大且灵活的工作流管理工具,适用于各种规模的数据处理和自动化任务。随着你的深入学习和实践,你将能够充分发挥其潜力,构建复杂而高效的工作流系统。
- 0
- 0
-
分享