This time I tried syncing a compressed JSON file stored in S3, whose contents are a single JSON array, to BigQuery with Embulk. A top-level array is a fairly unusual case, but if you read the plugin's official docs properly, it can be done!
■test.json
[ "Id": "xxxxx", "ID_c": "1111", "Name": "あだちん工業", "MyNumberBiz_c": "123456789", "date_c": "2018-06-22 06:44:04", "LastModifiedDate": "2019-05-19 11:10:31", "Jigsaw": null, "Fax": "****" }, { "Id": "xxxxxxxxx", "ID_c": "1920", "Name": "アダチン", "MyNumberBiz_c": "1234567890333", "date_c": "2018-06-22 08:29:51", "LastModifiedDate": "2019-05-19 10:23:17" } ] |
■hiroyuki-sato/embulk-parser-jsonpath
https://github.com/hiroyuki-sato/embulk-parser-jsonpath
All you have to do is set the parser to type: jsonpath.
Note: this assumes the embulk-input-s3 plugin is already installed.
https://github.com/embulk/embulk-input-s3
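For reference, a minimal parser section might look like the following. The root option and the column list here are just an illustrative sketch (the run log further down shows the plugin using JSONPath = $, which appears to be the default), so adjust them for your own file:

parser:
  type: jsonpath
  root: "$"            # JSONPath pointing at the array to expand; $ (the whole document) by default
  columns:
    - {name: Id, type: string}
    - {name: Name, type: string}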
- Plugin installation
$ gem search -r embulk-parser-jsonpath

*** REMOTE GEMS ***

embulk-parser-jsonpath (0.3.1)

$ vim digdag/vendor/Gemfile
gem 'embulk-parser-jsonpath' # added
- Apply
$ embulk bundle

$ grep embulk-parser-jsonpath Gemfile.lock
    embulk-parser-jsonpath (0.3.1)
    embulk-parser-jsonpath
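As an aside, if you are not managing plugins through a Gemfile/bundle like this, a direct install of the plugin should also work (not what I used here):

$ embulk gem install embulk-parser-jsonpath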
■embulk/digdag
- test.yml.liquid
in:
  type: s3
  path_prefix: db/test/{{ env.s3_target_date }}/test.json
  bucket: {{ env.EMBULK_INPUT_S3_BUCKET }}
  access_key_id: {{ env.AWS_ACCESS_KEY }}
  secret_access_key: {{ env.AWS_SECRET_KEY }}
  parser:
    type: jsonpath
    charset: UTF-8
    newline: CRLF
    columns:
      - {name: Id, type: string}
      - {name: ID_c, type: string}
      - {name: Name, type: string}
      - {name: MyNumberBiz_c, type: string}
      # ~ snip ~
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: /opt/redash/digdag/s3_to_bigquery/config/bq.key
  {% if env.EMBULK_ENV == 'production' %}
  {% include 'db/prod_bigquery' %}
  {% else %}
  {% include 'db/pre_bigquery' %}
  {% endif %}
  auto_create_dataset: true
  auto_create_table: true
  dataset: adachin_db
  table: test
  schema_file: /opt/redash/digdag/s3_to_bigquery/embulk/db/test.json
  open_timeout_sec: 300
  send_timeout_sec: 300
  read_timeout_sec: 300
  auto_create_gcs_bucket: false
  gcs_bucket: {{ env.EMBULK_OUTPUT_GCS_BUCKET }}
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON
  default_timezone: Asia/Tokyo
  default_timestamp_format: '%Y-%m-%d %H:%M:%S'
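Incidentally, before kicking off the real run, the parser settings can be sanity-checked with embulk preview against the same config (optional, just a quick sketch of the command I would use):

$ embulk preview -b $EMBULK_BUNDLE_PATH embulk/test.yml.liquid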
- embulk/db/_pre_bigquery.yml.liquid
# project: pre-adachin
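The {% include 'db/pre_bigquery' %} tag in test.yml.liquid resolves to this underscore-prefixed partial (Embulk's Liquid includes look up files named _name.yml.liquid), and the production branch would point at a db/_prod_bigquery.yml.liquid in the same directory. A sketch of what that counterpart might contain; the project ID below is a placeholder, not the real one:

# embulk/db/_prod_bigquery.yml.liquid (sketch, hypothetical project ID)
project: my-prod-project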
- test.json
[ { "name": "ID", "type": "STRING" }, { "name": "ID_c", "type": "INT64" }, { "name": "Name", "type": "STRING" }, { "name": "MyNumberBiz_c", "type": "STRING" }, ~省略~ ] |
- test.dig
_export:
  s3_target_date: ${moment(session_date).subtract(1,'days').format('YYYY-MM-DD')}
  bq_target_date: ${moment(session_date).subtract(1,'days').format('YYYYMMDD')}

_error:
  sh>: export $(cat config/env | xargs) && /opt/redash/digdag/post_slack.sh "[${session_time}][${session_id}] DigDag Fail test"

+load:
  sh>: export $(cat config/env | xargs) && /usr/local/bin/embulk run -b $EMBULK_BUNDLE_PATH embulk/test.yml.liquid
- Run
$ digdag run test.dig --rerun
2019-05-26 22:40:19 +0900: Digdag v0.9.19
2019-05-26 22:40:21 +0900 [WARN] (main): Reusing the last session time 2019-05-24T00:00:00+00:00.
2019-05-26 22:40:21 +0900 [INFO] (main): Using session /opt/redash/digdag/s3_to_bigquery/.digdag/status/20190524T000000+0000.
2019-05-26 22:40:21 +0900 [INFO] (main): Starting a new session project id=1 workflow name=account session_time=2019-05-24T00:00:00+00:00
2019-05-26 22:40:23 +0900 [INFO] (0016@[0:default]+test+load): sh>: export $(cat config/env | xargs) && /usr/local/bin/embulk run -b $EMBULK_BUNDLE_PATH embulk/test.yml.liquid
2019-05-26 22:40:23.447 +0900: Embulk v0.8.35
2019-05-26 22:40:34.663 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-s3 (0.2.14)
2019-05-26 22:40:34.787 +0900 [INFO] (0001:transaction): Loaded plugin embulk-parser-jsonpath (0.3.1)
2019-05-26 22:40:36.425 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2019-05-26 22:40:36.437 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2019-05-26 22:40:36.502 +0900 [INFO] (0017:task-0000): JSONPath = $
1234567890111,あだちん株式会社,1925,
1234567890222,あだちん工業,1926,
1234567890333,アダチン,1928,
2019-05-26 22:40:36.972 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2019-05-26 22:40:36.978 +0900 [INFO] (main): Committed.
2019-05-26 22:40:36.978 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"db/adachin/test/dt=2019-05-23/test_20190523.log_0.gz"},"out":{}}
Success. Task state is saved at /opt/redash/digdag/s3_to_bigquery/.digdag/status/20190524T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
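As the hint at the end of the log says, a specific day can be targeted instead of reusing the last session time by passing --session explicitly, for example:

$ digdag run test.dig --rerun --session "2019-05-24"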
Incidentally, for this check I switched the out: section to type: stdout to confirm that the parsed records could be written to standard output, and as the log above shows, they came through fine.
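The out: section used for that check was essentially just the following (a minimal sketch of the stdout swap, not the production config):

out:
  type: stdout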
- BigQuery
(screenshot of the loaded table in the BigQuery console)
■Summary
Array or not, this plugin handles all sorts of JSON shapes, so it is remarkably versatile.
And I even got a reply from @hiroysato, the author of the plugin!
Thanks for the write-up. These days (v0.9.16 and later) the built-in json parser offers similar functionality, so that is also worth a look. https://t.co/9R33kzVqxE https://t.co/41X7SAtduh (drop the __experimental_xx prefix)
— Hiroyuki Sato (@hiroysato) May 27, 2019
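Going by that tip, switching from the plugin to Embulk's built-in json parser would look roughly like this. I have not verified it here and the exact option names can differ by Embulk version, so treat it purely as a sketch:

parser:
  type: json          # built-in parser, no extra gem needed (Embulk v0.9.16+ per the tweet above)
  columns:
    - {name: Id, type: string}
    - {name: Name, type: string}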