Airflowの動的なDAGでclearする時の話
ハマったのでメモ。(Airflow 1.10.2/Cloud Composer)
状態
- Airflow Variableにも基づき、Airflowのタスクを動的に作成していた
(Apache Airflow: Create dynamic DAG – Big Data & ETLのようにタスク外の部分でループしている感じ) - DAG Runが実行・修了
- Airflow Variableを修正し、DAGにタスクインスタンスが追加される
(Airflow Variablesの変更によって追加されたTaskはstatusが空欄)
通常はclearでTask Instanceを再実行する事ができますが、今回の場合は対象のTask Instanceをclearしても実行されません。
対応
- 対象のTask Instanceを「Mark Failed」にする
- 対象のTask Instanceをclearする
- DAGがRunningになり、Task Instanceが実行される
増えたTask Instanceが複数ある時、どれか一つで1・2を実行すると、追加されたTask Instanceは全て実行されます。
原因
- AirlfowはDAG Run・Task InstanceをDBのレコードとして持っている
- clearを実行すると、Airflowは、Task Instanceのレコードを探しステータスを変える
- DAGが変化した時、過去のDAG Runに対応するTask Instanceのレコードは追加されない(下図)
(画面からは追加されたように見えますが、見えているだけ)
ので、DAGが変化してTask Instanceが増えた時、増えたTask Instanceは単純にはClear出来ないです。
しかし、前述の対応を取ると、
- Mark Failedすると、対象のTask Instanceのレコードが出来る
(=Clear出来るようになる) - ClearするとDAGがRunningになり、StatusがSuccessでもFailedでもないTask Instanceがスケジュール・実行される
(=増えたTask Instanceが実行される)
となり、増えたTask Instanceが実行されます。