Tech Blog

グローバルな家族アプリFammを運営するTimers inc (タイマーズ) の公式Tech Blogです。弊社のエンジニアリングを支える記事を随時公開。エンジニア絶賛採用中!→ https://timers-inc.com/engineering

Embulk-parser-pluginとかを開発した話

はじめましてインターンの学生エンジニアな五段です。

弊社は最近収集されるデータをgoogleのBigQueryに入れるようになりました。 その時に各情報からBigQueryに入れると気に使ったミドルウェアであるEmbulk周りをお手伝いさせていただいた時に得た知見をまとめようと思います。 今回はRubyほぼ未経験な自分がRubyで開発したEmbulkparserの話です。

Embulkとは

バルク処理に特化したデータ転送ミドルウェアです。プラグイン方式でデータ入力先からデータ先を豊富にあり万が一なかったとしても簡単に作ることができます。

BigQueryとは

2012年にGoogleからリリースされたカラム型データストアです。数テラ数ペタでも数秒でデータ抽出を行ってくれるサービスで、かなりの低価格で提供されています。
BigQueryで150万円溶かした人の顔

今回作るもの

データは普通のタブ区切りのログテキストなのですが今回は流し込まれたデータの中にJSONデータが一部含まれていたので、そのJSONをパースする機能と不必要なカラムを削る機能を持つパーサープラグインです。

パースしたのは以下のようなデータ タブ区切りですが、中にJSONのデータとBigQueryに送らなくても良いデータが幾つかあります。

YYYY-MM-DD HH:mm:ss     {"hoge":"0",foo":"1","is_bar":true}      -       0000    000.000.000.000       japanese        1.4.6   /api/hogehoge/foobar/propro/post

EmbulkはYamlで設定ファイルを作成します。今回のプラグインでは下記を追加しました:

  • 不必要なカラムをアウトプットしないよう除外できるように
  • JSON文字列が入っているカラムをパースできるように
parser:
   type: parser_col_json
   delimiter: "\t"
   input_columns:
     - {name: date, type: string}
     - {name: json , type: json}
     - {name: option, type: string}
     - {name: ID, type: string}
     - {name: ip_address, type: string}
     - {name: country, type: string}
     - {name: version, type: string}
     - {name: pass, type: string}

   get_JSON:
     - {name: hoge , type: long}
     - {name: foo , type: long}
     - {name: bar , type: none}

CSVのパーサーと同じ感じになるようにしてます。typeJSONカラム指定用のjsonとアウトプットしないカラム用のnoneで指定するようにしました。またJSONの中身はもう一つ get_JSON 配列オプションに設定を書いてもらう感じです。

テンプレートファイル作成

Embulkにはプラグイン作成用のテンプレートを生成してくれるコマンドがあるのでそれを用います。

$ embulk new ruby-parser parser_col_json
  Creating embulk-parser-parser_col_json/
  Creating embulk-parser-parser_col_json/README.md
  Creating embulk-parser-parser_col_json/LICENSE.txt
  Creating embulk-parser-parser_col_json/.gitignore
︙
︙

いろいろと生成されますが今回のパーサープラグインの場合メインでコードを書き込むのは./embulk-parser-parser_col_json/lib/embulk/parser/parser_col_json.rbの中です。

実装周り

実際に実装したのが下のソースです。

require 'json'

module Embulk
  module Parser

    class ParserColJson < ParserPlugin
      Plugin.register_parser("parser_col_json", self)

      def self.transaction(config, &control)
        # Yamlの設定受け取り
        task = {
            "delimiter" => config.param("delimiter", :string, default: ","), # 区切り文字
            "columns" => config.param("input_columns", :array, default: []), # パースした時のカラム
            "get_json" => config.param("get_JSON", :array, default: []) #Jsonをデコードした時のカラム
        }

        #出力カラム配列を作成
        columns = []
        count = 0
        task["columns"].each do |col|
          #TypeがJSONだったJSONの設定を参照する
          if col["type"] == "json" && !task["get_json"].empty? then
            task["get_json"].each do |col|
              if (col["type"] == "none") then
                next
              end
              columns.push(Column.new(count, col["name"], col['type'].to_sym))
              count += 1
            end
          end
          if (col["type"] == "json" || col["type"] == "none") then
            next
          end
          columns.push(Column.new(count, col["name"], col['type'].to_sym))
          count += 1
        end

        yield(task, columns)
      end

      def init
        # initialization code:
        @delimiter = task["delimiter"]
        @col = task["columns"]
        @get_json = task["get_json"]
      end

      def run(file_input)
        while file = file_input.next_file
            #処理したカラムをカウントしながら Yamlに書かれた情報に則って処理
            #もっとスマートな処理を考える
            text = file.read
            text.each_line do |row|
              count = 0
              row = row.split(@delimiter)
              record=[]
              row.each do |col|
                if @col[count]["type"] == "json" then
                  value = JSON.parse(col)
                  @get_json.each do |data|
                    if data["type"] == "none" then
                      next
                    end
                    record.push(value[data['name']])
                  end
                elsif  @col[count]["type"] != "none" then
                  record.push(col)
                end
                count += 1
                end
              page_builder.add(record)
              end
          end
        page_builder.finish
      end
    end
  end
end

ruby素人感が拭えないコードですが、現状ちゃんと動いてるようです。

def self.transaction以下が読み込まれたYamlファイルを処理する部分です。今回は、アウトプットする形がJsonによって変わるので読み込まれたものを一行ずつforeachして最終的な物に何をアウトプットするカラムをColumn.new(index,カラム名,型)で追加していきます。

        #出力カラム配列を作成
        columns = []
        count = 0
        task["columns"].each do |col|
          #TypeがJSONだったJSONの設定を参照する
          if col["type"] == "json" && !task["get_json"].empty? then
            task["get_json"].each do |col|
              if (col["type"] == "none") then
                next
              end
              columns.push(Column.new(count, col["name"], col['type'].to_sym))
              count += 1
            end
          end
          if (col["type"] == "json" || col["type"] == "none") then
            next
          end
          columns.push(Column.new(count, col["name"], col['type'].to_sym))
          count += 1
        end

        yield(task, columns)
      end

def run以下が実際のファイルを処理する箇所です。 Yamlで指定されたデリミタで分けて、一行ずつrow.each doでひとつひとつ処理していきます。

  def run(file_input)
        while file = file_input.next_file
            #処理したカラムをカウントしながら Yamlに書かれた情報に則って処理
            text = file.read
            text.each_line do |row|
              count = 0
              row = row.split(@delimiter)
              record=[]
              row.each do |col|
                if @col[count]["type"] == "json" then
                  value = JSON.parse(col)
                  @get_json.each do |data|
                    if data["type"] == "none" then
                      next
                    end
                    record.push(value[data['name']])
                  end
                elsif  @col[count]["type"] != "none" then
                  record.push(col)
                end
                count += 1
                end
              page_builder.add(record)
              end
          end
        page_builder.finish
      end

実行してみる

試しに以下のYamlファイルを作ってembulk previewで動作確認してみます。

in:
  type: file
  path_prefix: ./dummy_log
  parser:
   type: parser_col_json
   delimiter: "\t"
   input_columns:
     - {name: date, type: string}
     - {name: json , type: json}
     - {name: option, type: string}
     - {name: ID, type: string}
     - {name: ip_address, type: string}
     - {name: country, type: string}
     - {name: version, type: string}
     - {name: api, type: none }

  get_JSON:
     - {name: hoge , type: long}
     - {name: foo , type: long}
     - {name: is_bar , type: none}

実際に実行したのが以下の物です。ちゃんと分けれて、削られてますね。

$ embulk preview damy.yml -L ./embulk-parser-parser_col_json/
2016-04-18 02:59:17.679 +0000 [INFO] (0001:preview): Loading files [damy_log]
+---------------------+---------------+-----------+-------------------+----------------+----------------+
|         date:string | option:string | ID:string | ip_address:string | country:string | version:string |
+---------------------+---------------+-----------+-------------------+----------------+----------------+
| YYYY-MM-DD HH:mm:ss |             - |      0000 |   000.000.000.000 |       japanese |          1.4.6 |
| YYYY-MM-DD HH:mm:ss |             - |      0000 |   000.000.000.000 |       japanese |          1.4.6 |
+---------------------+---------------+-----------+-------------------+----------------+----------------+

まとめ

いかがでしょうか?
まだまだ修正するべき所は多いですが、必要としていた機能の実装はできました。
EmbulkのPlugin開発はまだ日本語ドキュメントが豊富というわけではないですが、なかなか簡単にできます。 EmbulkのプラグインRuby初心者でも簡単に動くものが作れますので、もし既存のもので不便があったら開発してみるのも手かもしれません。


子育て家族アプリFamm、カップル専用アプリPairyを運営する Timers inc. では、現在エンジニアを積極採用中! 急成長中のサービスの技術の話を少しでも聞いてみたい方、スタートアップで働きたい方など、是非お気軽にご連絡ください! 採用HP : http://timers-inc.com/engineerings

Timersでは各職種を積極採用中!

急成長スタートアップで、最高のものづくりをしよう。

募集の詳細をみる