python写的调度系统,用python脚本,动态生成dag,跨dag依赖,是一个不错的调度系统,下面介绍一些我使用过程中用到的命令和问题的解决方案。
1.operator
1 | BashOperator |
2.给DAG实例传递参数
执行命令
1 | airflow trigger_dag example_passing_params_via_test_command -c '{"foo":"bar"}' |
代码获取变量:
1 | def my_py_command(ds, **kwargs): |
3.填补数据
1 | #清除dag在这段时间内的状态,清除后airflow会自动启动这些任务,如果dag设置了catchup=True;dependency_on_past=True;那么dag会按照时间顺序一天一天跑任务,这对于修补数据很有用哦 |
4.根据depend_on_past
True or False来判断是否需要依赖start_time前段时间跑的相同的任务情况来运行现在的任务。
5.airflow卡住的问题
连接元数据mysql库:select * from task_instance where state = ‘running’;
6.airflow自带变量:
1 | | Variable | Description | |
7.导入导出airflow变量
1 | airflow variables --import variable.json |
8.Template Not Found
TemplateNotFound: sh /data/airflow_dag/dags_migration/sh/export-variables.sh
这是由于airflow使用了jinja2作为模板引擎导致的一个陷阱,当使用bash命令的时候,尾部必须加一个空格
1 | t2 = BashOperator( |
9. 手动触发dag运行
1 | airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE |
10. 手动触发task运行
1 | airflow run dag_id task_id EXEC_DATE |
11. “Failed to fetch log file from worker”
查看task_instance中hostname字段,存储的均为localhost;
分析:修改/etc/hosts文件,删除127.0.0.1 hostname映射;worker log服务获取到hostname后,映射到ip后得到127.0.0.1,故无法访问到log。
12. airflow中每个task对应的执行priority计算方式
dummy2 = DummyOperator(
task_id=’dummy_’ + src_db,
pool=’db’,
priority_weight=weight,
dag=dag
)
所有后置依赖的priority_weight之和,最后一个任务的priority_weight如果没有自定义,默认为1,这样,在同一个pool中做到了任务优先运行;
书山有路勤为径,学海无涯苦作舟。
欢迎关注微信公众号:【程序员写书】
喜欢宠物的朋友可以关注:【电巴克宠物Pets】
一起学习,一起进步。
