今回はembulkBigQueryのview table(クエリー)をMySQL(Aurora)に飛ばすには
どうしたらいいもんか悩んでいたところ。。
embulk-input-bigqueryという非公式プラグインがあるので試してみました!
他のやり方ではview tableをGCSにexportして、そこからbq cliコマンドやらembulkでMySQLに飛ばす!
というなんとも力技な方法がありますが、さすがにつらたんなので、プラグインが楽かと!とりあえずブログしやす。


■構成

S3のログをBigqueryに集約して、さらにきれいにクエリーでviewで作成し、
embulkでそのクエリーをMySQL(Aurora)に飛ばす!あとはDigdagでスケジューリング!というのをやりたいんですわ。


■medjed/embulk-input-bigquery

https://github.com/medjed/embulk-input-bigquery

■Query Options

・max(integer)

結果の1ページあたりに返されるデータの最大行数。 1000などの小さな値に設定して結果をページングすると、
クエリ結果セットが大きい場合に信頼性が向上する可能性があります。 この制限に加えて応答も10 MBに制限されています。
デフォルトでは最大行数はなく、バイト数制限のみが適用されます。

ちなみにBiqQueryで検索してCSVでダウンロードしようとしたときに、
容量制限がかかるので注意しないとですな。

https://cloud.google.com/bigquery/exporting-data-from-bigquery?hl=ja

・cache(boolean)

クエリキャッシュ内の結果を検索するかしないか。 クエリキャッシュはクエリ内のテーブルが変更されるたびに
フラッシュされるベストエフォート型キャッシュです。 デフォルト値はtrue。

・standard_sql(boolean)

BigQueryの標準SQL型を使用するかどうかを指定します。 trueに設定するとクエリは従来のSQLではなく標準SQLを使用します。
また、large_resultsとflattenの値は無視されます。 large_resultsがtrueでflattenがfalseであるかのようにクエリが実行されます。デフォルト値はtrue。

BigQueryは独自SQLの書き方があるのですが、(下記より)
最近では標準のMySQLと同じSQLが書けるようになりました。
なのでlegacy_sqlを使って書かなくてOKかと。😋

・legacy_sql(boolean)

BigQueryのレガシーSQLを使用するかどうかを指定します。 falseに設定するとクエリはBigQueryの標準SQLを使用します。
またlarge_resultsとflattenの値は無視されます。 large_resultsがtrueでflattenがfalseであるかのようにクエリが実行されます。デフォルト値はfalseです。

・location(strong)

データが米国またはEUの複数地域以外の場所にある場合は場所を指定する必要があります。

■Install

・Gemfile

基本Gemfileでバージョン管理したほうがいいので、以下のようにしましょう。

・反映

 ■views.yml.liquid

まずはinputから作っていきます。
とりあえずview tableからsqlでテーブルにある値を標準出力していきましょう。
カラムのdtは日付でtypeをdateにするとembulk側でエラーが出るので一旦stringにしてます。
それとbigquery側は日付がUTCなのでDATETIME_ADDで9時間プラスしてます。
whereでテーブルの日付を指定していますが、digdag run時に特定日付を出力したいためです。
standard_sqlをtrueにしましたがデフォルトがtrueなので書かなくてもOK。

・run embulk

これで値が標準出力できたら次はoutでMySQLに飛ばしましょう!


■embulk/embulk-output-jdbc/embulk-output-mysql

https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql

■column_options

・type

新しいセルを作成するときの列の型(VARCHAR(255)、INTEGER NOT NULL UNIQUEなど)
このプラグインが中間テーブル(insert、insert_truncateおよびmergeモード)を作成し、
ターゲットテーブルを作成するとき(insert_direct、merge_direct、およびreplaceモード)、
および存在しないターゲットテーブルを自動的に作成するときに使用されます。
(文字列、デフォルト:入力列の型に依存BIGINT入力列の型が長い場合はブール値、
ブール値の場合はブール値、倍数の場合は倍精度値、文字列の場合はCLOB、タイムスタンプの場合はTIMESTAMP)

・value_type

INSERT文を作成するために、入力列の型(embulk型)をデータベース型に変換します。
このvalue_typeオプションは、INSERTステートメント内の値のタイプを制御します。
(文字列、デフォルト:カラムのSQL型によって異なります)
使用可能な値のオプションは以下のとおりです
byte、short、int、long、double、float、boolean、string、nstring、日付、時刻、タイムスタンプ、10進数、json、null、pass )

・timestamp_format

入力列タイプ(エンブレークタイプ)がタイムスタンプでvalue_typeがstringまたはnstringの場合、
タイムスタンプ値を文字列にフォーマットする必要があります。 このtimestamp_formatオプションは
タイムスタンプのフォーマットを制御するために使用されます。 (文字列、デフォルト:%Y-%m-%d%H:%M:%S.%6N)

・timezone

入力列タイプ(エンブレークタイプ)がタイムスタンプの場合、タイムスタンプ値をSQL文字列にフォーマットする必要があります。
タイムゾーンオプションを使用してタイムゾーンを制御します。 (文字列、default_timezoneオプションの値がデフォルトで使用されます)

■Modes

replaceにするとinputで指定したとおりにテーブルが再作成されるので、ビビらないように!!
基本日付ごとにテーブルへ插入したいのでinsertでOKです。
ちなみにembulk実行時にTRANSACTIONを発行して処理が終わったらcommitをしているので、
エラーなど出てもincertなどは発行されません。

■install

※こちらも上記のGemfileで管理しましょう!!

■views.yml.liquid

先程作成した設定ファイルにoutとしてMySQLの制御を追加します。
dateの部分はtimestampでフォーマット指定しないと入らないという..
ちなみにテーブルのフォーマットではExtraauto_incrementを指定しています。
これでembulk runできたら成功です!😋

・auto_increment

カラムに値が指定されなかった場合、MySQLが自動的に値を割り当てる
データ型は整数で値は1ずつ増加して連番になる

・desc views

・see  views


■Digdag

・run.dig

・views.dig

・過去ログ投入スクリプト

これでDigdagにスケジューリングすればOK!😋


■まとめ

ブログ長くなってもうた。。ドキュメント理解するのが難しい….
BigQueryの仕様理解してないとだめですな〜。
とりあえずレコメンド機能の一部として実現できました。
次はデータ消して入れ直す作業が。。。

😅


あだちん

1989年生まれ。 ランサーズ/SRE。 ホスティングから大規模なアドテクなどのインフラエンジニアとして携わり、他社インフラレスポンス改善、ランサーズでの副業、ansibleでのインフラコード化を推進し、未経験によるエンジニア勉強会なども実施している。また、「脆弱性スキャナVuls」のOSS活動もしており、自称エバンジェリスト/広報/VulsRepo init file,chatwork通知のコントリビュータでもある。現在はDocker開発環境の提供,AWSで新サービスのインフラ構築,PHPバージョンアップ,分析基盤を担当しておりDigdag,embulk,BigQueryを使いこなそうと奮闘中である。

コメントを残す

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください