カーキ色はヒンディー語らしい

技術記事は https://zenn.dev/notrogue

今日のPodcast Software Engineering DailyでRedwoodJSの話

softwareengineeringdaily.com

redwoodjs.com

 

Software Engineering Dailyで、RedwoodJSというフレームワークの話。

ゲストはTom Preston-Wernerさんで、Githubの創業者&CEO&RedwoodJSの生みの親の人。

 

  • GraphQL
  • React
  • AWS Lambda

あたりを組み合わせたフルスタックなフレームワークで、簡単に開発&デプロイ出来るのが売りらしい。

(部品を組み合わせて開発するのがJS界隈大変だよねという背景)

 

 

 

 

 

 

 

BigQueryのinsertIdの話

tl;dr

  • insertId付けると、Streaming Insertの重複の可能性が下がる
  • 重複排除はベストエフォートだよ
  • 付けないメリットもあるよ

insertIdとは

https://cloud.google.com/bigquery/streaming-data-into-bigquery?hl=ja#dataconsistency

BigQueryでStreaming Insertを行う時、各行にinsertIdと呼ばれるidを設定する事が可能です。 insertIdを設定することで、行が重複して挿入される事を ベストエフォートで回避します。

ただし、

  • 重複を確認する期間を過ぎた 時
    • (最短)一分
  • Googleのインフラで何かが起きた時
    • Google データセンターの接続が予期せず失われる稀なケースにおいては、自動重複排除を実行できない場合があります」
  • (クライアント側の実装の問題で)内容が同じ行に別のinsertIdが入っている時

場合は、行が重複して挿入される可能性があります。

Next generation BigQuery streaming

2020/5月に「Next generation BigQuery streaming」なる機能がGAになりました。 https://cloud.google.com/bigquery/docs/release-notes?hl=ja#May_20_2020

無駄にカッチョイイ名前ですが、実はinsertIdが設定されないStreaming Insertの事です。

わざわざ重複排除(の可能性)を止めて何が嬉しいかというと、

ようなメリットがあります。

なお、Next generation BigQuery streamingはどこでも使えるわけではなく、2020/5月時点では

  • us(マルチリージョン)
  • eu(マルチリージョン)
  • asia-northeast1(リージョン)

だけです。

Prefectをローカルで動かしてみる

not-rogue.hatenablog.com

では、

  • タスクはローカルのDocker agent
  • メタデータとWebUIはPrefectCloud

でしたが、全部をローカルで動かすことも出来ます。

 

準備

https://docs.prefect.io/core/getting_started/installation.html

  • Docker
  • Docker compose
  • Python

あたりをインストールした後

prefect backend server
prefect server start

を実行します。うまくいくと

http://localhost:8080/

でPrefectのWebUIが見られます。

 

Dockerコンテナが色々起動しています

> docker ps                                                      
CONTAINER ID        IMAGE                          COMMAND                  CREATED             STATUS              PORTS                            NAMES
46b341464ee3        prefecthq/ui:0.10.5            "/intercept.sh"          13 days ago         Up 13 days          80/tcp, 0.0.0.0:8080->8080/tcp   cli_ui_1
bfba03d93db3        prefecthq/apollo:0.10.5        "npm run serve"          13 days ago         Up 13 days          0.0.0.0:4200->4200/tcp           cli_apollo_1
6e90ba52339f        prefecthq/server:0.10.5        "python src/prefect_…"   13 days ago         Up 13 days                                           cli_scheduler_1
13bdd322d7b6        prefecthq/server:0.10.5        "bash -c 'prefect-se…"   13 days ago         Up 13 days          0.0.0.0:4201->4201/tcp           cli_graphql_1
42d010c1bf38        hasura/graphql-engine:v1.1.0   "graphql-engine serve"   13 days ago         Up 13 days          0.0.0.0:3000->3000/tcp           cli_hasura_1
0c4d3a11b8b9        postgres:11                    "docker-entrypoint.s…"   13 days ago         Up 13 days          0.0.0.0:5432->5432/tcp           cli_postgres_1

 

Flowを登録

from prefect import task, Flow

@task
def say_hello():
    print("Hello, world!")

with Flow("Hello") as flow:
    task = say_hello()
    flow.register()
    flow.run_agent

 

> python hello.py
Result Handler check: OK
Flow: http://localhost:8080/flow/9a90a3f4-0221-4a4f-8688-0e1a8ff78b5b

実行

この時点では、Flowが登録されただけで実行はされません。

Flowが実行されるには、

  • FlowRunが作られて、scheduledなステータスになる
  • Flow Runを監視し、実行するagent

が必要です。

 

FlowRunは、Flow画面の右上の「Quick Run」をクリックすると作れます。

agentは何種類かありますがLocalAgentで動かしてみましょう。

 

prefect agent start


 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/

[2020-05-19 13:03:38,521] INFO - agent | Starting LocalAgent with labels ['kimuramakotonoMacBook-Pro.local', 'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage']
[2020-05-19 13:03:38,521] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2020-05-19 13:03:38,521] INFO - agent | Agent connecting to the Prefect API at http://localhost:4200
[2020-05-19 13:03:38,635] INFO - agent | Waiting for flow runs...
[2020-05-19 13:03:38,950] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-05-19 13:03:39,000] INFO - agent | Deploying flow run 705d62e1-c436-4ed6-a90b-47f2679a5567

もう一度WebUIを見ると、Flow Runが作られ、Successになっていることが確認出来るはずです。

 

 

 

Prefect見てみる コンポーネント編

not-rogue.hatenablog.comの続きで、Prefectに関連する概念のメモです。

 

Hybrid Model

www.prefect.io

medium.com

Prefectでは「Hybrid Model」を採用しています。これは、

が担当するという方針です。センシティブなデータやコードを預けなくてすみ、また、好きなインフラを使えるメリットがあります。

 

AirflowのSaasSであるCloud Composer Astronomerは、

  • 処理を行う場所(Worker)
  • 処理する場所に配置するやつ(Executor)
  • DAGファイルの置き場
  • DAG/タスクの状態を制御するやつ(Scheduler)
  • メタデータのデータベース
  • 管理インターフェイス(Webserver、API)

を全てSaaSとして提供しますが、Prefect Cloudでは、

はPrefect Cloudが提供しますが、

  • 処理を行う場所(Environment/Executor)
  • 処理する場所に配置するやつ(Agent)
  • Flowファイルの置き場(Storage)

に関しては、処理するための仕組みはPrefectが提供しますが、実際のインフラはユーザーが(AWSGCPなどに)用意します。

 

例えば、本番用にGKEのクラスタを使うなら、

  • Environment/Executor: DaskKubernetesEnvironment/RemoteDaskExecutor
  • Storage: Flowコードが入ったコンテナイメージ
  • Agent: GKE上でKubernates Agent

開発用にローカルで動かすなら、

  • Environment: 指定しない(デフォルトのRemoteEnvironmentになる)
  • Storage: ローカル
  • Agent: Local

などの設定を行います。環境の例は、Configure Your Environment | Prefect Docsがわかりやすいです。

 

Storage

docs.prefect.io

PrefectはHybrid Modelを採用しているので、Flowのコードは

  • ローカルファイル
  • GCS
  • S3
  • Azure Blob
  • Dockerイメージ

のいずれかに配置します。その設定がStorageです。

 

Agent

docs.prefect.io

flowの状態を監視して、処理するための準備をするのがAgentです。

  • Storageからコードの読み込み
  • flowコードで指定されている、環境変数の設定
  • Environmentの起動

などの処理を行っており、

  • ローカル
  • Docker
  • Kubernates
  • AWS Fargate

で動くAgentが用意されています。

 

 

Environment

docs.prefect.ioflowを処理する環境を定義するのが、Environmentです。

  • LocalEnvironment
  • RemoteEnvironment
  • RemoteDaskEnvironment
  • DaskKubernetesEnvironment(k8s上にDaskクラスターを作成し、そのジョブとして処理)
  • KubernetesJobEnvironment
  • FargateTaskEnvironment

のEnvironmentが用意されています。

なお、指定しない場合はRemoteEnvironment(+LocalExcutor)になります。

 

Executor

docs.prefect.io

Environmentに似た概念にExecutorというのもあります。こちらは、Environmentの上で、「function」(タスクに限らない?)の実行方法を指定するやつで、

  • Local(ローカルのプロセス)
  • Dask
  • Sync/LocalDask(これもDaskで処理)

が用意されています。デフォルトはLocalExecutorですが、DaskExecutorが本番だとオススメらしいです。

 

 

 

 

 

 

Prefect見てみる PrefectCloud触ってみる編その2

not-rogue.hatenablog.comの続きです。

 

前回はPrefect Cloudのアカウントを作ったので、さっそく触ってみます。

 

ダッシュボードの左側、サイドメニューにはチュートリアル(下図)があります。

それをやってみます。

 

f:id:toukoudo:20200505065356p:plain

 

 

1. プロジェクトの選択

Prefectでは、「チーム」の下に「プロジェクト」という概念があります。

flowはプロジェクトにひも付くので、ここでプロジェクトを作成します。

 

2. flowのデプロイ

自動で下のflowのコードがデプロイされます(DockerHubのDockerイメージから)。

 

import prefect
from prefect import task, Flow
from prefect.environments.storage import Docker


@task(name="Welcome", slug="welcome-task")
def welcome_logger():
    logger = prefect.context["logger"]
    with open("/ascii-welcome.txt", "r") as f:
        lines = "\n\n" + "".join(f.readlines()) + "\n\n"

    logger.info(lines)

storage = Docker(
    registry_url="prefecthq",
    image_name="flows",
    image_tag="welcome-flow",
    files={"welcome.txt": "/ascii-welcome.txt"},
)
f = Flow("Welcome Flow", tasks=[welcome_logger], storage=storage)
      

 

3. ローカル環境の設定

  • Python3.6以上
  • Dockerデーモン

が必要です。

その後

pip install prefect -U

を実行します。

 

4. Prefect CLIでログイン

以下の手順でログインします。

  1. トークン名を決めて、「Create token」(名前は何でも良い?)
  2. ローカルでprefect auth   login -t 表示されるトーク
  3. Login successful

 

5. ローカルでDocker agent動かす

PrefectではAgentでflowのスケジュール状態を監視し、適当なタスクをExecution Environmentで実行します。

Agentには、

  • local
  • Docker
  • Kubernates
  • Fargate

の四種類があります。

 

チュートリアルではDocker Agent | Prefect Docsを使用するので、以下の手順で設定します。

  1. (起動していなければ)Docker Daemonの起動
  2. トークン名を決めて、「Create token」
  3. prefect agent start docker -t 表示されるトーク

なお、Docker agentとPrefect CLIのログインの二箇所でトークンを作りましたが、それらは別物です(Personal TokenとAPI Token)。

 

成功すると、Prefect CloudのWebUIにagent動いているよ的な表示が更新されます(下図の右下)。

f:id:toukoudo:20200505072619p:plain

 

6.  デモのFlowを動かす

チュートリアルページに「Run Demo Flow」ボタンが表示されるので、ポチっとしましょう。

 

すると、起動したDocker agentで、Docker imageのpullが行われ、Prefect WebUIでFlowの情報が表示されます。

 

 

ダッシュボード画面

f:id:toukoudo:20200505073933p:plain

 

Flowの詳細画面(ダッシュボードのActivityをクリック)

f:id:toukoudo:20200505074005p:plain

 

Flowのログ(Flow の詳細画面の「LOGS」)。flowコードのタスク(WELOCOME TO PREFECT CLOUDのアスキーアートを表示)が実行されていることがわかります。

f:id:toukoudo:20200505074111p:plain

 

Prefect見てみる PrefectCloud触ってみる編その1

 

Prefect Cloudとは

Prefectのクラウドサービスで、スケジューラーやワーカー、データベース一式を用意してくれます。

 

料金

www.prefect.io

実行出来るFlow、ユーザー数、履歴の保存期間が違う3つの料金プランがあります。

  • Scheduler(無料)
  • Team ($550)
  • Enterprise(要相談)

 

登録

無料のSchedulerプランだと、クレジットカードの登録も無く、二、三分あれば登録出来ます。

 

 

 

f:id:toukoudo:20200505062757p:plain

 

Pricing - Prefectの「GET STARTED」をクリックすると、下のログイン画面に飛びます。

 

f:id:toukoudo:20200505063008p:plain

Google/GitHub/自前のいずれかで認証すると、

  • チーム名(後で変えられます)の設定
  • 料金プランの選択
  • 利用規約の確認

の後、アカウント・チームが作られ、下のようなダッシュボードが表示されます。

 

 

f:id:toukoudo:20200505063640p:plain

 

 

 

 

Prefect見てみる 動的なDAG編

not-rogue.hatenablog.comの続き。

 

PrefectやAirflowで言及されている動的なDAGとは

動的なDAG(Dynamic DAG)は、タスクを、DAGの実行時に決まる回数や、引数で繰り返すDAGです。

 例えば、

  • マルチテナントなシステムで、各テナント毎に処理をしたい
  • 似たようなテーブルが複数あって、それぞれに処理を行いたい

ような場合に使いたくなります。

 

Airflowでの動的なDAG

AIrflowで動的なDAGな対応するには、

などの方法があります。

ただし、

  • リトライの単位が大きくなってしまう
  • タスク固有の制御(リトライやプール)が効かない
  • DAGの構造が変わると過去のDAG Runが不確実になる

などの欠点があり、無理ではないがちょっと考えたくなる感じです。

 

Prefect

Prefectでは、

の2つの方法で動的なDAGを、最初からサポートしています。

 

マップの場合、タスクにリスト(Pythonの普通のやつ)を渡す、もしくは返すことで、実行時に繰り返し数が決まるDAGを定義出来ます。

 

シグナルは、タスク中でLOOPという例外をraiseする事で、同じタスクを(ただし違う引数で)繰り返すDAGを定義出来ます。

 

気になること

そのうち調べます。

  •  マップとシグナルの使い分けは?
  • 複数あるタスクが失敗した時の扱いは?
  • DAG 実行履歴の扱いは?過去のタスクインスタンスの繰り返し回数や、引数を確認出来る?