Fractal Prologue 徒然なるままに...データと遊ぶ

スプレッドシートから BigQuery へ Digdag を使ったデータ連携

はじめに

今回は,teamaya という個人プロジェクトで進めているデータ連携の話になります.コードは以下のリポジトリに置いてあるので,ご自由に使用下さい!

具体的には,手元にあるスプレッドシートのデータを BigQuery の特定のテーブルに連携するまでの話になります.

普段家計簿のデータをスプレッドシートに手入力で管理してるんですが,そのデータを BigQuery に集めて色々と検証できると良いなーという思いから,データ連携を始めました.加えて,可視化も良くしたい思いもあり,Data Studio でダッシュボードを作ったりもしています.

データ連携をするだけであれば,Embulk を単体実行するのでこと足りますが,今回はサーバーモードで立ち上げた Digdag UI を使ってワークフローを実行しています.スケジュール実行,ワークフロー管理や履歴管理などが UI からだとしやすく,使い心地などを知るためにも使用しました.

また,Docker 環境で実行できるように構成しています.Docker 化することで,簡単に別環境に持っていくことができますし,スクラップ&ビルドがしやすいのもあります.

今回は以下の2種類の方法でデータを連携する方法を紹介します.内容的には既に技術記事に書かれているものが多いと思いますが,今回は Docker コンテナで各タスクが実行できるようにしているので,その辺りを参考頂けたらと思います.

  1. Docker コンテナ内から Embulk を直接実行して,データを転送する方法
  2. Digdag UI からワークフローを実行して,データを転送する方法
    • こちらも裏側では,Embulkが実行されます.

今回のデータ連携フローのアーキテクチャーは以下のような感じです.

データ連携フローのアーキテクチャー

アンドリュー・カーネギーの以下の名言にもあるように,機械学習エコシステムを自分で作っていくために,一歩一歩進めています!

最も高い目標を達成するには、一歩一歩進むしかないという事実を、頭に入れておかなければならない。

BigQuery とスプレッドシートの設定

GCP のアカウント登録方法は割愛しますが,gmail があれば簡単に登録できます.登録が完了したら,適当なプロジェクトを作成して下さい.

Google Sheets API の有効化を行う

「APIとサービス」 → 「ライブラリ」と画面遷移し,検索窓に「Google Sheets API」と入力して検索すると,スプレッドシートの API を有効化できる画面に遷移するので,有効化を行います.ここで有効化しておかないと,この後スプレッドシートを使用したデータ連携が出来ないので注意下さい!

Google Sheets API

サービスアカウントの作成

それが終わったら,サービスアカウントを作成します.「IAM と管理」 → 「サービスアカウント」へアクセスした後,必要な情報を入力し,キーの作成から JSON を選択してキーの作成を行います.そうすると,サービスアカウントの JSON ファイルがダウンロードされるので,これを ~/.gcp 配下に置いておきます.

サービスアカウントのメールアドレスをスプレッドシートに登録

データ連携したいスプレッドシートを開き,右上の共有ボタンから「ユーザーやグループを追加」の枠にダウンロードしたサービスアカウントのメールアドレスをコピー&ペーストして,送信をクリックします.そうすることで,このスプレッドシートのデータを登録したサービスアカウントで転送することができます.

ここで,メールアドレスの許可をしていない場合,Embulk 実行時に以下のエラーが発生します.

Error: (ClientError) forbidden: The caller does not have permission

BigQuery にデータセットを作成

データを格納するために,事前にデータセットを作成しておく必要があるので,データセット ID を適当に決めて,データセットの作成を行っておきます.

1. Embulk を直接実行してデータ転送を行う場合

この方法は,Docker コンテナ内からEmbulkを直接実行して,スプレッドシートのデータをBigQueryのテーブルに転送する方法になります.

Embulk の細かい説明は割愛しますが,簡単に言えば,バルクデータローダーの役割として BigQuery などのデータレイク/データウェアハウスにデータ転送を行うことができます.

データ転送を行うために用意するものとしては,以下になります.

  1. Gemfile
  2. Liquid ファイル
  3. Dockerfile & docker-compose.yml ファイル

Gemfile を用意する

Embulk のプラグインを Gemfile/Gemfile.lock でバージョン管理するために用意します.Embulk には,データの Input/Output のプラグインがあリ,これを使うことで様々なデータソースからターゲットにデータを連携することができます.

source 'https://rubygems.org/'

# No versions are specified for 'embulk' to use the gem embedded in embulk.jar.
# Note that prerelease versions (e.g. "0.9.0.beta") do not match the statement.
# Specify the exact prerelease version (like '= 0.9.0.beta') for prereleases.
gem 'embulk'

# input spreadsheets plugin
gem 'embulk-input-google_spreadsheets'
# ouput bigquery plugin
gem 'embulk-output-bigquery'
gem 'tzinfo-data'

Liquid ファイルを作成する

Embulk の設定ファイルとして Liquid ファイルを作成します.YAML ファイルに設定を記述することもできますが,以下のメリットで Liquid ファイルを使用しています.

  • 変数を設定することができる
  • 同じ設定内容を共通ファイルとして使うことができる
  • etc…

BigQuery とスプレッドシートの情報は,.env ファイルを作成して,環境変数として管理しています.これらの変数を Liquid ファイルで使用しています.

in:
  type: google_spreadsheets
  auth_method: service_account
  {% comment %} GCPのサービスアカウントのJSONファイルパス {% endcomment %}
  json_keyfile: {{ env.GCP_SERVICE_JSON }}
  {% comment %} スプレッドシートのURL {% endcomment %}
  spreadsheets_url: {{ env.SPREADSHEETS_TABLE }}
  default_timezone: 'Asia/Tokyo'
  {% comment %} スプレッドシートのワークシートタイトル {% endcomment %}
  worksheet_title: year_purchase_amount_2019
  {% comment %} headerを指定している場合は2行目からとなる {% endcomment %}
  start_row: 2
  {% comment %} カラム名と型を指定する {% endcomment %}
  columns:
    - {name: id, type: long}
    - {name: date, type: timestamp, format: '%Y/%m/%d', timezone: 'Asia/Tokyo'}
    - {name: category, type: string}
    - {name: purchaser, type: string}
    - {name: purchase_amount, type: long}
    - {name: memo, type: string}

out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: {{ env.GCP_SERVICE_JSON }}
  {% comment %} BigQueryのプロジェクト名 {% endcomment %}
  project: {{ env.BIGQUERY_PROJECT }}
  {% comment %} BigQueryのデータセット名 {% endcomment %}
  dataset: {{ env.BIGQUERY_PURCHASE_AMOUNT_DATASET }}
  {% comment %} BigQueryのテーブル名 {% endcomment %}
  table: daily_purchase_amount
  auto_create_table: true
  source_format: NEWLINE_DELIMITED_JSON
  default_timezone: 'Asia/Tokyo'
  default_timestamp_format: '%Y-%m-%d'
  formatter: {type: jsonl}
  encoders:
    - {type: gzip}
  retries: 3

env ファイルの説明も補足でしておきます.適宜設定している環境に合わせて修正します.

SPREADSHEETS_TABLE=<該当するスプレッドシートのURL: https://docs.google.com/spreadsheets/d/hogehoge>
BIGQUERY_PROJECT=<BigQueryのプロジェクト名>
BIGQUERY_PURCHASE_AMOUNT_DATASET=<BigQueryのデータセット名>
GCP_SERVICE_JSON=/root/.gcp/hoge.json

Dockerfile & docker-compose.yml ファイルを作成する

今回は docker 環境から実行するので,Dockerfile と docker-compose.yml ファイルを作成します(後ほど使う Digdag の内容も記載されています).

FROM openjdk:8-alpine
LABEL MAINTAINER=masatakashiwagi

ENV DIGDAG_VERSION="0.9.42"
ENV EMBULK_VERSION="0.9.23"

RUN apk --update add --virtual build-dependencies \
    curl \
    tzdata \
    coreutils \
    bash \
    && curl --create-dirs -o /bin/digdag -L "https://dl.digdag.io/digdag-${DIGDAG_VERSION}" \
    && curl --create-dirs -o /bin/embulk -L "https://dl.embulk.org/embulk-$EMBULK_VERSION.jar" \
    && chmod +x /bin/digdag \
    && chmod +x /bin/embulk \
    && cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime \
    && apk del build-dependencies --purge

ENV PATH="$PATH:/bin"

# Install libc6-compat for Embulk Plugins to use JNI
# cf: https://github.com/jruby/jruby/wiki/JRuby-on-Alpine-Linux
# https://github.com/classmethod/docker-embulk
RUN apk --update add libc6-compat

# Copy Embulk configuration
COPY ./embulk/task /opt/workflow/embulk/task

# Make bundle
WORKDIR /opt/workflow/embulk
RUN embulk mkbundle bundle

# Copy Gemfile file
# This is the workaround, because jruby directory is not created
COPY ./embulk/bundle/Gemfile /opt/workflow/embulk/bundle
COPY ./embulk/bundle/Gemfile.lock /opt/workflow/embulk/bundle
WORKDIR /opt/workflow/embulk/bundle

# Install Embulk Plugins
RUN embulk bundle

# Set up Digdag Server
COPY ./digdag /opt/workflow/digdag
# ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.9.0/wait /bin/wait
# RUN chmod +x /bin/wait
WORKDIR /opt/workflow

CMD ["tail", "-f", "/dev/null"]

bundle 辺りでまわりくどいやり方をしていますが,RUN embulk bundle を実行した際に上手くインストールできなかったため,ワークアラウンドとして,mkbundle した後に Gemfile/Gemfile.lock を docker コンテナ内に COPY した上で,RUN embulk bundle を行っています.

Embulk を docker コンテナで実行する方法

では,実際に Embulk の実行を行うために,まずは Embulk のコンテナサービスを立ち上げます.

# コンテナの立ち上げ
docker compose up -d embulk

バックグラウンドでコンテナを起動しておきます.その後,dry-run を行うために preview コマンドを実行します.

# dry-run
docker exec embulk sh /bin/embulk preview -b embulk/bundle embulk/task/spreadsheet/export_hab_purchase_amount.yml.liquid

dry-run 実行後の結果を載せておきます.

Embulk dry-run

dry-run が大丈夫だった場合,本番実行を行います.本番は run コマンドを実行します.

# production-run
docker exec embulk sh /bin/embulk run -b embulk/bundle embulk/task/spreadsheet/export_hab_purchase_amount.yml.liquid

本番実行が上手くいくと BigQuery にデータが入っていることを確認できます.

Embulk production-run

2. Digdag UI からワークフローを実行してデータ転送を行う場合

次に説明するこちらの方法は,digdag server を立ち上げて,UI 上からワークフローを実行して,スプレッドシートのデータを BigQuery のテーブルに転送する方法になります(裏で Embulk が動いています).

Digdag の細かい説明は割愛しますが,簡単に言えば,設定ファイルでバッチのワークフロー実行を定義・管理できるワークフローエンジンになります.スケジュール実行や失敗時の通知などを行うことができます.

Digdag のワークフローからデータ転送を行うために用意するものとしては,以下になります.

  1. dig ファイル
  2. serverのproperties ファイル
  3. docker-compose.yml ファイル

上記に加えて,1で説明した Embulk 実行に必要なファイルも用意します.

dig ファイルを用意する

個別のワークフローを単発実行する場合は,digdag run hoge.dig で良いのですが,今回はUIから実行したいので,厳密には dig ファイルの中身が要ります.記述する内容としては,ワークフローで実行していくタスクをコードに落としていきます.

Digdag では,エラーの場合や成功した場合に,どういった処理をするのかを書くことができるので,例えば,それぞれの場合で slack に通知を行えたりもできます(今回は slack 通知は実装できていませんが,今後実装していきたいです!).リトライ回数の設定やスケジュール実行の設定もここでできます.

+task:
    _retry: 1
    sh>: /bin/embulk run -b /opt/workflow/embulk/bundle /opt/workflow/embulk/task/spreadsheet/export_hab_purchase_amount.yml.liquid
    _error:
        echo>: workflow error...
+success:
    echo>: workflow success!

今回は以下のワークフローになっています.

  • task:
    • リトライ回数: 1回
    • Embulk の実行
    • エラーの場合は workflow error... を echo する
  • success:
    • workflow success! を echo する

success は task が正常に終了した場合に,実行されることになります.

server の properties ファイルを用意する

サーバーモードで起動するために,引数にオプションを指定する必要があるのですが,それらの設定を server.properties ファイルに集約しています.設定内容は色々とあるので詳しくは公式ドキュメント: server-mode-commandsを参照下さい.

このファイルには,サーバーの情報やデータベースの情報を記載しています.

server.bind = 0.0.0.0
server.port = 65432
server.admin.bind = 0.0.0.0
server.admin.port = 65433
server.access-log.pattern = json
server.access-log.path = /var/log/digdag/access_logs
log-server.type = local

# database情報
# database.type = memory
database.type = postgresql
database.user = digdag
database.password = digdag
database.host = postgres
database.port = 5432
database.database = digdag
database.maximumPoolSize = 32

サーバーモードで起動すると,ワークフローの情報を保存するために,データベースの設定が必要になってきます.今回は PostgreSQL を別コンテナで立てて,Digdag コンテナと接続することにしています.ちなみに,これらの情報をインメモリで保存することもできます(この場合は,database.type = memory として下さい).

docker-compose.yml ファイルを作成する

色々と試して上手くいかなかったのですが,最終的には以下の内容で落ち着きました.service 共通の定義は x-template にまとめています.service は3つありますが,Digdag を使う場合は digdag と postgres のみを立ち上げて使います.

docker の depends_on は依存関係(起動順序)を指定できますが,DB 起動後のアプリ起動までを制御できるわけではないので,postgres の起動完了前に digdag がアクセスしてしまい起動失敗する事があります.

このため,condition: service_started と設定することで,postgres が起動後に digdag が立ち上がるようにしています.

depends_on:
  postgres:
    condition: service_started

また,tty を true に設定しているのは,コンテナが正常終了して止まらないようにするためになります.

version: '3.8'

# Common definition
x-template: &template
  volumes:
      - ~/.gcp:/root/.gcp:cached
      - /tmp:/tmp
  env_file:
      - .env

services:
  digdag:
    container_name: digdag
    build: .
    tty: true
    ports:
      - 65432:65432
      - 65433:65433
    volumes:
        - /var/run/docker.sock:/var/run/docker.sock
    command: ["java", "-jar", "/bin/digdag", "server", "-c", "digdag/server.properties", "--log", "/var/log/digdag/digdag_server.log", "--task-log", "/var/log/digdag/task_logs"]
    depends_on:
      postgres:
        condition: service_started
    <<: *template

  postgres:
    image: postgres:13.1-alpine
    container_name: postgres
    ports:
      - 5432:5432
    environment:
      POSTGRES_DB: digdag
      POSTGRES_USER: digdag
      POSTGRES_PASSWORD: digdag
    volumes:
      - /tmp/data:/var/lib/postgresql/data
    tty: true
    <<: *template

  embulk:
    container_name: embulk
    build: .
    <<: *template

networks:
  default:
    external:
      name: teamaya

Digdag を docker コンテナで実行する方法

では,Digdag UI を使うために Digdag をサーバーモードで起動します.

# コンテナの立ち上げ
docker compose up -d digdag

バックグラウンドでコンテナを起動しておきます.docker ps コマンドでコンテナが立ち上がっていることを確認したら,http://localhost:65432/ で UI にアクセスします.以下の画面が表示されたら OK です.

Digdag UI

New project から Name を設定したら,Add file をクリックして,dig ファイルのコードをコピー&ペーストします.貼り付けたら,Save で内容を保存します.そしたら,Workflows のタブを選択し,先ほど追加したワークフローが表示されているのでクリックします.右上の Run ボタンを押して実行が完了すると,下図のような結果になります.

Digdag UI

Status が Success になっていれば,正常終了で BigQuery にデータが入っていると思うので,確認してみて下さい.

おわりに

今回は,手元にあるスプレッドシートのデータを Digdag/Embulk を用いて BigQuery に連携するを紹介しました.

やっぱり,UI で直感的に状況や情報がわかるのはメリットだなと思います.複数のワークフローが動作したりする環境だとそれらも管理できるので良いと思います.スケジュール実行やエラーや成功時の通知設定なども出来るので活用したいと思います.

また今回は,データ連携に Digdag を用いましたが,他にも Apache Airflow やそのマネージドサービスである Cloud Composer を使っても同等のことができると思います.この辺りも別途試していきたいと思います.

個人的な次のステップとしては,Dataform や dbt を使った「データの品質管理」に興味があるので,データを Transform したり,テストしたりして考えていきたいです.また,データレイク/データウェアハウス/データマートの設計などを併せて学習していきたいと思います!

これとは別軸で機械学習パイプラインの設計もしていこうかなと考えています!

参考


この記事と関連する記事を読んでみる

このエントリーをはてなブックマークに追加

Support

ブログ記事を読んで頂き,ありがとうございます!もしこの記事が良かったり参考になったら,下記ボタンから☕一杯をサポートして頂けるとモチベーションが上がります!どうぞよろしくお願いします🤩