今日のPodcast Software Engineering DailyでRedwoodJSの話
BigQueryのinsertIdの話
tl;dr
- insertId付けると、Streaming Insertの重複の可能性が下がる
- 重複排除はベストエフォートだよ
- 付けないメリットもあるよ
insertIdとは
https://cloud.google.com/bigquery/streaming-data-into-bigquery?hl=ja#dataconsistency
BigQueryでStreaming Insertを行う時、各行にinsertIdと呼ばれるidを設定する事が可能です。 insertIdを設定することで、行が重複して挿入される事を ベストエフォートで回避します。
ただし、
- 重複を確認する期間を過ぎた 時
- (最短)一分
- Googleのインフラで何かが起きた時
- 「Google データセンターの接続が予期せず失われる稀なケースにおいては、自動重複排除を実行できない場合があります」
- (クライアント側の実装の問題で)内容が同じ行に別のinsertIdが入っている時
場合は、行が重複して挿入される可能性があります。
Next generation BigQuery streaming
2020/5月に「Next generation BigQuery streaming」なる機能がGAになりました。 https://cloud.google.com/bigquery/docs/release-notes?hl=ja#May_20_2020
無駄にカッチョイイ名前ですが、実はinsertIdが設定されないStreaming Insertの事です。
わざわざ重複排除(の可能性)を止めて何が嬉しいかというと、
- クオータが優しくなる
- 例えば「秒あたりの最大バイト数」は 100MB/秒から 1GB/秒 になる
- 早くなる(らしい)
- Apache Beamのチケットで、Googleの人(たぶん)が「If user choose to opt out of using insert ids, they could potentially to be opt into using our current new streaming backend which gives higher speed」って書いてます
- BigQuery側のドキュメントは不明
ようなメリットがあります。
なお、Next generation BigQuery streamingはどこでも使えるわけではなく、2020/5月時点では
- us(マルチリージョン)
- eu(マルチリージョン)
- asia-northeast1(リージョン)
だけです。
Prefectをローカルで動かしてみる
では、
- タスクはローカルのDocker agent
- メタデータとWebUIはPrefectCloud
でしたが、全部をローカルで動かすことも出来ます。
準備
https://docs.prefect.io/core/getting_started/installation.html
- Docker
- Docker compose
- Python
あたりをインストールした後
を実行します。うまくいくと
でPrefectのWebUIが見られます。
Dockerコンテナが色々起動しています
Flowを登録
リンクを開くとFlowが登録されていることを確認出来ます。
実行
この時点では、Flowが登録されただけで実行はされません。
Flowが実行されるには、
- FlowRunが作られて、scheduledなステータスになる
- Flow Runを監視し、実行するagent
が必要です。
FlowRunは、Flow画面の右上の「Quick Run」をクリックすると作れます。
agentは何種類かありますがLocalAgentで動かしてみましょう。
Prefect見てみる コンポーネント編
not-rogue.hatenablog.comの続きで、Prefectに関連する概念のメモです。
Hybrid Model
Prefectでは「Hybrid Model」を採用しています。これは、
- 処理コードとデータはユーザー
- オーケストレーション(状態やメタデータの管理)はPrefect
が担当するという方針です。センシティブなデータやコードを預けなくてすみ、また、好きなインフラを使えるメリットがあります。
AirflowのSaasSであるCloud Composer やAstronomerは、
- 処理を行う場所(Worker)
- 処理する場所に配置するやつ(Executor)
- DAGファイルの置き場
- DAG/タスクの状態を制御するやつ(Scheduler)
- メタデータのデータベース
- 管理インターフェイス(Webserver、API)
を全てSaaSとして提供しますが、Prefect Cloudでは、
はPrefect Cloudが提供しますが、
- 処理を行う場所(Environment/Executor)
- 処理する場所に配置するやつ(Agent)
- Flowファイルの置き場(Storage)
に関しては、処理するための仕組みはPrefectが提供しますが、実際のインフラはユーザーが(AWSやGCPなどに)用意します。
例えば、本番用にGKEのクラスタを使うなら、
- Environment/Executor: DaskKubernetesEnvironment/RemoteDaskExecutor
- Storage: Flowコードが入ったコンテナイメージ
- Agent: GKE上でKubernates Agent
開発用にローカルで動かすなら、
- Environment: 指定しない(デフォルトのRemoteEnvironmentになる)
- Storage: ローカル
- Agent: Local
などの設定を行います。環境の例は、Configure Your Environment | Prefect Docsがわかりやすいです。
Storage
PrefectはHybrid Modelを採用しているので、Flowのコードは
- ローカルファイル
- GCS
- S3
- Azure Blob
- Dockerイメージ
のいずれかに配置します。その設定がStorageです。
Agent
flowの状態を監視して、処理するための準備をするのがAgentです。
- Storageからコードの読み込み
- flowコードで指定されている、環境変数の設定
- Environmentの起動
などの処理を行っており、
- ローカル
- Docker
- Kubernates
- AWS Fargate
で動くAgentが用意されています。
Environment
docs.prefect.ioflowを処理する環境を定義するのが、Environmentです。
- LocalEnvironment
- RemoteEnvironment
- RemoteDaskEnvironment
- DaskKubernetesEnvironment(k8s上にDaskクラスターを作成し、そのジョブとして処理)
- KubernetesJobEnvironment
- FargateTaskEnvironment
のEnvironmentが用意されています。
なお、指定しない場合はRemoteEnvironment(+LocalExcutor)になります。
Executor
Environmentに似た概念にExecutorというのもあります。こちらは、Environmentの上で、「function」(タスクに限らない?)の実行方法を指定するやつで、
- Local(ローカルのプロセス)
- Dask
- Sync/LocalDask(これもDaskで処理)
が用意されています。デフォルトはLocalExecutorですが、DaskExecutorが本番だとオススメらしいです。
Prefect見てみる PrefectCloud触ってみる編その2
not-rogue.hatenablog.comの続きです。
前回はPrefect Cloudのアカウントを作ったので、さっそく触ってみます。
ダッシュボードの左側、サイドメニューにはチュートリアル(下図)があります。
それをやってみます。
1. プロジェクトの選択
Prefectでは、「チーム」の下に「プロジェクト」という概念があります。
flowはプロジェクトにひも付くので、ここでプロジェクトを作成します。
2. flowのデプロイ
自動で下のflowのコードがデプロイされます(DockerHubのDockerイメージから)。
import prefect from prefect import task, Flow from prefect.environments.storage import Docker @task(name="Welcome", slug="welcome-task") def welcome_logger(): logger = prefect.context["logger"] with open("/ascii-welcome.txt", "r") as f: lines = "\n\n" + "".join(f.readlines()) + "\n\n" logger.info(lines) storage = Docker( registry_url="prefecthq", image_name="flows", image_tag="welcome-flow", files={"welcome.txt": "/ascii-welcome.txt"}, ) f = Flow("Welcome Flow", tasks=[welcome_logger], storage=storage)
3. ローカル環境の設定
- Python3.6以上
- Dockerデーモン
が必要です。
その後
pip install prefect -U
を実行します。
4. Prefect CLIでログイン
以下の手順でログインします。
5. ローカルでDocker agent動かす
PrefectではAgentでflowのスケジュール状態を監視し、適当なタスクをExecution Environmentで実行します。
Agentには、
- local
- Docker
- Kubernates
- Fargate
の四種類があります。
チュートリアルではDocker Agent | Prefect Docsを使用するので、以下の手順で設定します。
なお、Docker agentとPrefect CLIのログインの二箇所でトークンを作りましたが、それらは別物です(Personal TokenとAPI Token)。
成功すると、Prefect CloudのWebUIにagent動いているよ的な表示が更新されます(下図の右下)。
6. デモのFlowを動かす
チュートリアルページに「Run Demo Flow」ボタンが表示されるので、ポチっとしましょう。
すると、起動したDocker agentで、Docker imageのpullが行われ、Prefect WebUIでFlowの情報が表示されます。
ダッシュボード画面
Flowの詳細画面(ダッシュボードのActivityをクリック)
Flowのログ(Flow の詳細画面の「LOGS」)。flowコードのタスク(WELOCOME TO PREFECT CLOUDのアスキーアートを表示)が実行されていることがわかります。
Prefect見てみる PrefectCloud触ってみる編その1
Prefect Cloudとは
Prefectのクラウドサービスで、スケジューラーやワーカー、データベース一式を用意してくれます。
料金
実行出来るFlow、ユーザー数、履歴の保存期間が違う3つの料金プランがあります。
- Scheduler(無料)
- Team ($550)
- Enterprise(要相談)
登録
無料のSchedulerプランだと、クレジットカードの登録も無く、二、三分あれば登録出来ます。
Pricing - Prefectの「GET STARTED」をクリックすると、下のログイン画面に飛びます。
- チーム名(後で変えられます)の設定
- 料金プランの選択
- 利用規約の確認
の後、アカウント・チームが作られ、下のようなダッシュボードが表示されます。
Prefect見てみる 動的なDAG編
PrefectやAirflowで言及されている動的なDAGとは
動的なDAG(Dynamic DAG)は、タスクを、DAGの実行時に決まる回数や、引数で繰り返すDAGです。
例えば、
- マルチテナントなシステムで、各テナント毎に処理をしたい
- 似たようなテーブルが複数あって、それぞれに処理を行いたい
ような場合に使いたくなります。
Airflowでの動的なDAG
AIrflowで動的なDAGな対応するには、
- タスクの中でループする
- タスクの外側(DAG定義の部分)でループ(例:Apache Airflow: Create dynamic DAG – Big Data & ETL)
- DAGを動的にDAGから作る(https://www.astronomer.io/guides/dynamically-generating-dags/)
などの方法があります。
ただし、
- リトライの単位が大きくなってしまう
- タスク固有の制御(リトライやプール)が効かない
- DAGの構造が変わると過去のDAG Runが不確実になる
などの欠点があり、無理ではないがちょっと考えたくなる感じです。
Prefect
Prefectでは、
の2つの方法で動的なDAGを、最初からサポートしています。
マップの場合、タスクにリスト(Pythonの普通のやつ)を渡す、もしくは返すことで、実行時に繰り返し数が決まるDAGを定義出来ます。
シグナルは、タスク中でLOOPという例外をraiseする事で、同じタスクを(ただし違う引数で)繰り返すDAGを定義出来ます。
気になること
そのうち調べます。
- マップとシグナルの使い分けは?
- 複数あるタスクが失敗した時の扱いは?
- DAG 実行履歴の扱いは?過去のタスクインスタンスの繰り返し回数や、引数を確認出来る?