カーキ色はヒンディー語らしい

技術記事は https://zenn.dev/notrogue

Airflowの動的なDAGでclearする時の話

ハマったのでメモ。(Airflow 1.10.2/Cloud Composer)

 

状態

  1. Airflow Variableにも基づき、Airflowのタスクを動的に作成していた
    Apache Airflow: Create dynamic DAG – Big Data & ETLのようにタスク外の部分でループしている感じ)
  2. DAG Runが実行・修了
  3. Airflow Variableを修正し、DAGにタスクインスタンスが追加される
    (Airflow Variablesの変更によって追加されたTaskはstatusが空欄)

通常はclearでTask Instanceを再実行する事ができますが、今回の場合は対象のTask Instanceをclearしても実行されません。

 

 

f:id:toukoudo:20200712075431p:plain

Task Instanceが増えた状態のDAG Run

 

f:id:toukoudo:20200712075528p:plain

増えたTask Instanceをclearした時のエラー

 

対応

  1. 対象のTask Instanceを「Mark Failed」にする
  2. 対象のTask Instanceをclearする
  3. DAGがRunningになり、Task Instanceが実行される

増えたTask Instanceが複数ある時、どれか一つで1・2を実行すると、追加されたTask Instanceは全て実行されます。

 

原因

  1. AirlfowはDAG Run・Task InstanceをDBのレコードとして持っている
  2. clearを実行すると、Airflowは、Task Instanceのレコードを探しステータスを変える
  3. DAGが変化した時、過去のDAG Runに対応するTask Instanceのレコードは追加されない(下図)
    (画面からは追加されたように見えますが、見えているだけ)

ので、DAGが変化してTask Instanceが増えた時、増えたTask Instanceは単純にはClear出来ないです。

 

f:id:toukoudo:20200712081324p:plain

Task Instanceが増えた状態のtask_instanceテーブル

 

しかし、前述の対応を取ると、

  1. Mark Failedすると、対象のTask Instanceのレコードが出来る
    (=Clear出来るようになる)
  2. ClearするとDAGがRunningになり、StatusがSuccessでもFailedでもないTask Instanceがスケジュール・実行される
    (=増えたTask Instanceが実行される)

となり、増えたTask Instanceが実行されます。

 

f:id:toukoudo:20200712081347p:plain

MarkFailed&Clear後のtask_instanceテーブル