前回は抽出のみだったので、今回はBigQueyにぶち込めるのか試してみました!
しかもSalesforceにあるカラムが鬼のようにあるのでシェル芸もかましてやりました。
■create-salesforce-config.sh
https://github.com/RVIRUS0817/shellscripts/blob/master/digdag_script/create-salesforce-config.sh
- 実行方法
1 2 3 4 5 6 7 8 9 |
$ ./create-salesforce-config.sh Type: (例) ./create-salesforce-config.sh* テーブル名 カラム名 CSV名 $ ./create-salesforce-config.sh test id,Name test.csv test.dig done! test.yml.liquid done! test.json done! |
csvからJSONファイルを作成し、embulkファイルやらdigdagファイルを自動生成するように作ってみました。
- test.yml.liquid
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
in: type: salesforce_bulk userName: {{ env.SALESFORCE_USER }} password: {{ env.SALESFORCE_PASS }} authEndpointUrl: {{ env.SALESFORCE_URL }} objectType: test pollingIntervalMillisecond: 5000 querySelectFrom: | SELECT Account.Id,Account.IsDeleted,Account.MasterRecordId,Account.Name,Account.Type,Account.RecordTypeId,Account.ParentId,Account.BillingStreet,Account.BillingCity,Account.BillingState,Account.BillingPostalCode,Account.BillingCountry,Account.ShippingStreet,Account.ShippingCity~省略~ FROM Account columns: - {type: string, name: Id} - {type: string, name: IsDeleted} - {type: string, name: MasterRecordId} - {type: string, name: Name} ~省略~ #out: # type: stdout out: type: bigquery mode: replace auth_method: json_key json_keyfile: /digdag/salesforce_to_bigquery/config/bq.key {% if env.EMBULK_ENV == 'production' %} {% include 'db/prod' %} {% else %} {% include 'db/pre' %} {% endif %} auto_create_dataset: true auto_create_table: true dataset: salesforce table: Account schema_file: /digdag/salesforce_to_bigquery/embulk/db/test.json open_timeout_sec: 300 send_timeout_sec: 300 read_timeout_sec: 300 auto_create_gcs_bucket: false gcs_bucket: {{ env.EMBULK_OUTPUT_GCS_BUCKET }} compression: GZIP source_format: NEWLINE_DELIMITED_JSON default_timezone: Asia/Tokyo |
- test.json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
[ { "name": "Id", "type": "STRING" }, { "name": "IsDeleted", "type": "STRING" }, { "name": "MasterRecordId", "type": "STRING" }, { "name": "Name", "type": "STRING" }, ~省略~ ] |
- test.dig
1 2 3 4 |
_error: sh>: export $(cat config/env | xargs) && /digdag/post_slack.sh "[${session_time}][${session_id}] DigDag Fail test" +Account: sh>: export $(cat config/env | xargs) && /usr/local/bin/embulk run -b $EMBULK_BUNDLE_PATH embulk/test.yml.liquid |
- embulkのエラーでハマったところ
1 2 3 4 5 6 7 8 |
2019-02-06 20:00:09.246 +0900 [ERROR] (0018:task-0000): class com.sforce.async.AsyncApiException com.sforce.async.AsyncApiException: InvalidBatch : InvalidBatch : Failed to process query: INVALID_TYPE_FOR_OPERATION: The root entity of the requested query (test02) does not match the entity of the requested Bulk API Job (test). at org.embulk.input.salesforce_bulk.SalesforceBulkWrapper.syncQuery(SalesforceBulkWrapper.java:137) ~[na:na] at org.embulk.input.salesforce_bulk.SalesforceBulkInputPlugin.run(SalesforceBulkInputPlugin.java:211) ~[na:na] at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(LocalExecutorPlugin.java:294) [embulk:0.8.35] at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(LocalExecutorPlugin.java:212) [embulk:0.8.35] at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:257) [embulk:0.8.35] at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:253) [embulk:0.8.35] |
objectTypeは基本salesforceの参照するテーブル名なので気をつけましょう!!!
笑えないくらいハマったw このプラグイン作成者の@mikoto2000さんあざます!
outの設定は事前にstdoutをしてsalesforce側かBigQuery側のエラーなのか判断が楽になります。
ジョブ作成時に指定したobjectと、クエリのFROMで指定したobjectが違うと出るエラーですね。
Bulkは他のAPIと比べてSOQLの制限が多いので、転送量に問題が無ければSOAP, RESTを検討するのも良いかもしれません。(私は単純なクエリしか出したことが無いのでBulk以外使ったことがありませんが…)— 大雪 命 (@mikoto2000) February 6, 2019
■BigQuery
✌️
■まとめ
単なる興味なんですが、何が大変なんですか?Salesforceから出すところ?
— Hiroyuki Sato (@hiroysato) February 6, 2019
理解するとそんなツラくなかったというお話。
0件のコメント