Embulk でらくらく ETL 基盤構築

こんにちは! KEPPLE CREATORS LAB でエンジニアリングマネージャーをしている池浦です。

とあるプロジェクトで複数のデータソースを定期的に同期する必要があり、その際に Embulk を利用してみました。今回は Embulk の使い方について簡単に紹介したいと思います。

ETL と Embulk

ETL とは Extract Transform Load の頭文字をとったもので、特定のデータソースからデータを抽出(Extract)し、抽出したデータを変形(Transform)したあと、別のデータソースに保存(Load)する処理を指します。Every Little Thing ではありません。ちなみに、変形と保存の順序を逆にした ELT という考え方も存在します。
Embulk は様々なデータソースに対応したバルクデータローダーになっています。また、サードパーティプラグイン開発も活発であり、ほとんどのデータソースに対応しています。
Embulk では Input / Filter / Output を定義した YAML ファイルで実行します。

  • Extract ⇔ Input

  • Transform ⇔ Filter

  • Load ⇔ Output

上記のように、 ETL のステップと 1:1 で対応しているイメージです。他にも実行時のスレッド数を定義できますが、その多くは上記の 3 ステップで Embulk の定義ファイルを書くことになると思います。
Embulk は Treasure Data 社によってメンテナンスされる企業によるオープンソースでしたが、昨年からメンテナンスがオープン化され、Treasure Data 社以外のメンバーも積極的に関わるようになりました。これにより、Embulk の機能開発もさることながら、その運営体制にも信頼性が増し、技術選定時の指標である運営体制の安全性がさらに高まったと言えます。

Embulk のセットアップ

Embulk 公式Java 11 / Java 17 のテストを進めていると言われているので、筆者は Java 11 で利用することにしてみました。基本は Embulk 公式の Quick Start をベースに進めれば良いですが、Embulk の最新バージョン 0.11.0 を利用する上で 1つ 注意点があります。
公式に記載されている 9.1.15.0 を利用すると、一部の Gem が利用できないため、9.3.x を利用する必要があります。逆に、9.4.x を利用すると Ruby のバージョンが上がりすぎるため、9.3.x に止めるとよいでしょう。

Kintone から PostgreSQL

弊社では、一部のデータを Kintone で管理しています。今回のプロジェクトでは頻繁にデータを参照する必要があり、参照元のデータソースに対して問い合わせによる負荷がかかるため、可能な限り負荷がコントローラブルなデータソースを利用したいということから PostgreSQL へデータを同期することになりました。
Embulk から Input のデータソースとして Kintone を利用する場合は、 embulk-input-kintone プラグインを利用します。Kintone が提供する Cursor API を利用して取得するため、想像以上に素早くデータを取得でき、体験としてはとても良いものになっています。
embulk-input-kintone の最新バージョン 0.1.8 は こちら にあるため、gem install を行う時は明示的に source を指定する必要があります。

java -jar embulk.jar gem install embulk-input-kintone --version 0.1.8 --source "https://[USER]:[SECRET]@rubygems.pkg.github.com/trocco-io"

余談ですが、GitHub の Packages から Gem をインストールする時に GitHub App のトークンは利用できず、 Personal Access Token を利用する必要があるのは、組織で利用する時に少し勝手が悪いため、将来的に改善されることを期待しています。

MySQLPostgreSQL などのデータベースに対してアウトプットする場合は、 embulk-output-jdbc を利用することになります。今回は PostgreSQL へのアウトプットであるため、 embulk-output-postgresql を利用しました。

最終的に、Kintone から PostgreSQL へデータをロードする YAML ファイルとして、以下を作成しました。

in:
  type: kintone
  domain: [Organization Name].cybozu.com
  token: [Kintone API Token]
  app_id: [Kintone App Id]
  fields:
  - {name: $id, type: long}
  - {name: $revision, type: long}
  - {name: name, type: string}
filters:
  - type: rename
    columns:
      $id: id
      $revision: version
out:
  type: postgresql
  driver_path: /path/to/postgresql-42.6.0.jar
  host: [Database Host]
  user: [Database User]
  password: [Database Password]
  database: [Database Name]
  table: [Table Name]
  options: {loglevel: 2}
  mode: truncate_insert
  create_table_constraint: 'PRIMARY KEY (id)'
  column_options:
    id: {type: 'bigint'}
    version: {type: 'bigint'}
    name: {type: 'varchar(255)'}

Filter では、カラム名の変更処理を行っています。Rename Filter Plugin はデフォルトでインストールされているため、追加で Gem をインストールする必要はありません。

上記 YAML ファイルの注意点としては、 embulk-output-jdbc で利用している JDBC ドライバーは 41 系であるため、42 系のドライバーのパスを明示的に指定しています。PostgreSQL 14 系以降を使っている場合には、認証周りの変更から上記の対応が必要になります。

最後に

Dataflow など、ETL 基盤を構築するためのツールは他にもありますが、今回は導入コストの観点から Embulk を利用しました。
レコード数が数万件・数十万件程度であれば比較的少ないメモリ・CPUのコア数でも十分に処理できるため、いまのところパフォーマンスに懸念はないように見えます。
スタートアップ・投資家を取り巻くエコシステムの発展に向けて、これからも多くのデータを蓄積し、それを利用しやすいように加工・保存していく必要があるため、継続的にパフォーマンスを監視し、改善していきたいと考えています。
Embulk の利用を検討されている方の参考になれば幸いです。