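Well then! It has been two and a half years since I joined my current company, and I have been operating Digdag the whole time, but I had never built it from scratch. So this time I built it on EC2 (Amazon Linux 2). You may ask "why not a container?", but I always start by understanding the manual installation and only then Dockerize it. This blog sure brings back memories, though…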
■Environment
- EC2(Amazon Linux2)
- Java v1.8.0_265
- Digdag v0.9.42
- Embulk v0.9.23
- PostgreSQL v12.4(RDS)
■ansible_digdag
https://github.com/RVIRUS0817/ansible_digdag
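While I was at it, I turned the setup into an Ansible playbook. Once the EC2 instance is running, use the repository above as a reference and run the playbook; it installs and configures the details for you. The layout, and what to do after applying Ansible, are as follows.
- Layout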
    > tree
    .
    ├── README.md
    ├── ansible.cfg
    ├── group_vars
    │   └── all
    ├── hosts
    ├── prd.yml
    └── roles
        ├── common
        │   ├── files
        │   │   └── pgdg-redhat-all.repo
        │   └── tasks
        │       └── main.yml
        ├── digdag
        │   ├── files
        │   │   ├── digdag.service
        │   │   └── server.properties.tmp
        │   └── tasks
        │       └── main.yml
        └── embulk
            └── tasks
                └── main.yml
■What to do after running Ansible
- Create digdag_db
    $ psql -h digdag.xxxxxxxx.xxx.rds.amazonaws.com -U root -d redash_db

    CREATE ROLE digdag WITH PASSWORD 'xxxxxxxx' NOSUPERUSER NOCREATEDB NOCREATEROLE LOGIN;
    GRANT digdag TO root;
    CREATE DATABASE digdag_db WITH OWNER digdag;

    $ psql -h digdag.xxxxxxxx.xxx.rds.amazonaws.com -U digdag -d redash_db

    CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
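I created digdag_db on the same PostgreSQL instance as the Redash database, together with a dedicated digdag user. Digdag uses the uuid-ossp extension module, so don't forget to run CREATE EXTENSION. PostgreSQL installs extensions per database, so the extension has to exist in the database Digdag actually connects to.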
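The SQL above can be parameterised so that the role and database names are not retyped by hand. This is a minimal sketch and not part of the original playbook: the function name digdag_bootstrap_sql and its arguments are hypothetical. It only prints SQL and never connects to a database itself.

```shell
#!/bin/sh
# Hypothetical helper (not from the ansible_digdag repository).
# Prints the bootstrap SQL for a Digdag role and database.
# Usage: digdag_bootstrap_sql ROLE DATABASE PASSWORD ADMIN_ROLE
# Assumption: PASSWORD contains no single quotes (no SQL escaping is done).
digdag_bootstrap_sql() {
  role=$1
  db=$2
  password=$3
  admin=$4
  cat <<EOF
CREATE ROLE ${role} WITH PASSWORD '${password}' NOSUPERUSER NOCREATEDB NOCREATEROLE LOGIN;
GRANT ${role} TO ${admin};
CREATE DATABASE ${db} WITH OWNER ${role};
EOF
}

# Print the statements with the same names used in this post.
digdag_bootstrap_sql digdag digdag_db 'xxxxxxxx' root
```

To apply it, the output could be piped into psql, for example `digdag_bootstrap_sql digdag digdag_db 'xxxxxxxx' root | psql -h <host> -U root -d redash_db`. Reviewing the printed SQL before piping it anywhere is the point of keeping the helper print-only.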
- /etc/systemd/system/digdag.service
    [Unit]
    Description=digdag

    [Service]
    Type=simple
    User=hoge
    WorkingDirectory=/etc/digdag
    Restart=always
    ExecStart=/bin/sh -c "/usr/local/bin/digdag server --max-task-threads 2 --config /etc/digdag/server.properties -b 0.0.0.0 --log /var/log/digdag/digdag_server.log --task-log /var/log/digdag/tasklogs --access-log /var/log/digdag/accesslogs"
    KillMode=process

    [Install]
    WantedBy=multi-user.target
- /var/log/digdag
    /var/log/digdag$ tree
    .
    ├── accesslogs
    │   ├── access.2020-12-09.log
    │   ├── access.2020-12-10.log
    │   └── access.log
    ├── digdag_server.log
    └── tasklogs
        └── 2020-12-10
            └── 0.1run@20201211T000000+0900
                ├── +run+actions@5fd2622139387000.1338@test-digdag.log.gz
                └── +run+actions^sub+actions@5fd26223122dee40.1338@test-digdag.log.gz
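I start Digdag with systemd. If you pass --memory, Digdag uses a local in-memory database, so adjust the configuration as shown below so that it connects to RDS instead. Setting --max-task-threads to 2 keeps a large number of Embulk jobs from running at the same time. Logs are written under /var/log/digdag.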
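Because a stray --memory flag silently bypasses RDS, it can be worth checking the unit file before enabling it. The following is a minimal sketch under that assumption; check_digdag_unit is a hypothetical helper name, and it only inspects the ExecStart line of the file it is given.

```shell
#!/bin/sh
# Hypothetical check (not part of Digdag or the playbook).
# Fails if the unit's ExecStart would start Digdag with an in-memory
# database (--memory) or without a --config file.
check_digdag_unit() {
  unit_file=$1
  exec_start=$(grep '^ExecStart=' "$unit_file") || {
    echo "no ExecStart line in $unit_file" >&2
    return 1
  }
  case $exec_start in
    *--memory*)
      echo "ExecStart uses --memory (in-memory database)" >&2
      return 1
      ;;
  esac
  case $exec_start in
    *--config*)
      return 0
      ;;
    *)
      echo "ExecStart has no --config option" >&2
      return 1
      ;;
  esac
}
```

A possible way to use it: `check_digdag_unit /etc/systemd/system/digdag.service && sudo systemctl daemon-reload && sudo systemctl enable --now digdag`, so that the service is only enabled when the check passes.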
- Digdag configuration file
    $ sudo su -
    # cd /etc/digdag
    # cp server.properties.tmp server.properties
    # echo -n '16_bytes_phrase!' | openssl base64   # use the output as digdag.secret-encryption-key below

    # cat files/server.properties.tmp
    database.type = postgresql
    database.user =
    database.password =
    database.host =
    database.port = 5432
    database.database =
    digdag.secret-encryption-key = Mxxxxxxxxxxxxxxxxx==

    # vim server.properties
    # digdag server --config server.properties
    # systemctl restart digdag
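I manage the Digdag configuration file by copying server.properties.tmp in /etc/digdag to server.properties. Running digdag server --config server.properties applies the initial migrations to the database. After that, start digdag through systemd and the build is complete. All that is left is to write your Embulk configs and run digdag push, and Digdag will run them at the scheduled time.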
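The secret encryption key is easy to get wrong, since the phrase has to be exactly 16 bytes before base64 encoding. Here is a minimal sketch of a sanity check; is_128bit_key is a hypothetical helper name, and the sketch uses the coreutils base64 command rather than openssl base64 as in the steps above (an assumption about what is installed).

```shell
#!/bin/sh
# Hypothetical helper: succeeds only if its argument is base64 text that
# decodes to exactly 16 bytes (a 128-bit key).
# Assumption: the coreutils `base64` command is available.
is_128bit_key() {
  decoded_len=$(printf '%s' "$1" | base64 -d 2>/dev/null | wc -c | tr -d ' ')
  [ "$decoded_len" = "16" ]
}

# Encode the sample phrase from this post and check the result.
key=$(printf '%s' '16_bytes_phrase!' | base64)
if is_128bit_key "$key"; then
  echo "key decodes to 16 bytes"
else
  echo "key does not decode to 16 bytes" >&2
fi
```

printf '%s' is used instead of echo so that no trailing newline ends up inside the encoded phrase, which would make it 17 bytes.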
■Where I got stuck
- ERROR: function "lock_shared_tasks" already exists with same argument types
    $ digdag server --config server.properties
    2020-12-11 13:38:05 +0900: Digdag v0.9.42
    2020-12-11 13:38:06 +0900 [INFO] (main): secret encryption engine: aesgcm
    2020-12-11 13:38:07 +0900 [INFO] (main): Applying database migration:20151204221156
    2020-12-11 13:38:07 +0900 [INFO] (main): Applying database migration:20160602123456
    2020-12-11 13:38:07 +0900 [INFO] (main): Applying database migration:20160602184025
    2020-12-11 13:38:07 +0900 [INFO] (main): Applying database migration:20160610154832
    2020-12-11 13:38:07 +0900 [INFO] (main): Applying database migration:20160623123456
    2020-12-11 13:38:07 +0900 [INFO] (main): Applying database migration:20160719172538
    Exception in thread "main" java.lang.Error: java.lang.reflect.InvocationTargetException
        at org.embulk.guice.LifeCycleModule$1$1.afterInjection(LifeCycleModule.java:81)
        at com.google.inject.internal.MembersInjectorImpl.notifyListeners(MembersInjectorImpl.java:131)
        at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:125)
        at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)
        at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)
        at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:42)
        at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:65)
        at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:113)
        at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)
        at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)
        at com.google.inject.internal.BoundProviderFactory.get(BoundProviderFactory.java:60)
        at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:42)
        at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:65)
        at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:113)
        at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)
        at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)
        at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
        at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
        at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
        at com.google.inject.internal.InternalInjectorCreator.loadEagerSingletons(InternalInjectorCreator.java:211)
        at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:182)
        at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:109)
        at com.google.inject.Guice.createInjector(Guice.java:87)
        at org.embulk.guice.Bootstrap.start(Bootstrap.java:168)
        at org.embulk.guice.Bootstrap.build(Bootstrap.java:130)
        at org.embulk.guice.Bootstrap.initializeCloseable(Bootstrap.java:125)
        at io.digdag.guice.rs.server.undertow.UndertowBootstrap.initialize(UndertowBootstrap.java:70)
        at io.digdag.guice.rs.GuiceRsServletContainerInitializer.processBootstrap(GuiceRsServletContainerInitializer.java:61)
        at io.digdag.guice.rs.GuiceRsServletContainerInitializer.onStartup(GuiceRsServletContainerInitializer.java:36)
        at io.undertow.servlet.core.DeploymentManagerImpl$1.call(DeploymentManagerImpl.java:186)
        at io.undertow.servlet.core.DeploymentManagerImpl$1.call(DeploymentManagerImpl.java:171)
        at io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:42)
        at io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)
        at io.undertow.servlet.core.DeploymentManagerImpl.deploy(DeploymentManagerImpl.java:234)
        at io.digdag.guice.rs.server.undertow.UndertowServer.start(UndertowServer.java:179)
        at io.digdag.server.ServerBootstrap.start(ServerBootstrap.java:81)
        at io.digdag.cli.Server.startServer(Server.java:136)
        at io.digdag.cli.Server.main(Server.java:100)
        at io.digdag.cli.Main.cli(Main.java:191)
        at io.digdag.cli.Main.main(Main.java:83)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.embulk.guice.LifeCycleManager.startInstance(LifeCycleManager.java:203)
        at org.embulk.guice.LifeCycleManager.addInstance(LifeCycleManager.java:185)
        at org.embulk.guice.LifeCycleModule$1$1.afterInjection(LifeCycleModule.java:78)
        ... 39 more
    Caused by: org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException: org.postgresql.util.PSQLException: ERROR: function "lock_shared_tasks" already exists with same argument types [
      statement:"CREATE FUNCTION lock_shared_tasks(target_site_id int, target_site_max_concurrency bigint, limit_count int, lock_expire_seconds int, agent_id text) returns setof bigint as $$ BEGIN IF pg_try_advisory_xact_lock(23300, target_site_id) THEN RETURN QUERY with updated as ( update queued_task_locks set lock_expire_time = cast(extract(epoch from statement_timestamp()) as bigint) + lock_expire_seconds, lock_agent_id = agent_id where id = any( select queued_task_locks.id from queued_task_locks where lock_expire_time is null and site_id = target_site_id and not exists ( select * from ( select queue_id, count(*) as count from queued_task_locks where lock_expire_time is not null and site_id = target_site_id group by queue_id ) runnings join queues on queues.id = runnings.queue_id where runnings.count >= queues.max_concurrency and runnings.queue_id = queued_task_locks.queue_id ) and not exists ( select count(*) from queued_task_locks where lock_expire_time is not null and site_id = target_site_id having count(*) >= target_site_max_concurrency ) order by queue_id, priority desc, id limit limit_count ) returning queue_id, priority, id ) select id from updated order by queue_id, priority desc, id; END IF; END; $$ LANGUAGE plpgsql VOLATILE ",
      located:"CREATE FUNCTION lock_shared_tasks(target_site_id int, target_site_max_concurrency bigint, limit_count int, lock_expire_seconds int, agent_id text) returns setof bigint as $$ BEGIN IF pg_try_advisory_xact_lock(23300, target_site_id) THEN RETURN QUERY with updated as ( update queued_task_locks set lock_expire_time = cast(extract(epoch from statement_timestamp()) as bigint) + lock_expire_seconds, lock_agent_id = agent_id where id = any( select queued_task_locks.id from queued_task_locks where lock_expire_time is null and site_id = target_site_id and not exists ( select * from ( select queue_id, count(*) as count from queued_task_locks where lock_expire_time is not null and site_id = target_site_id group by queue_id ) runnings join queues on queues.id = runnings.queue_id where runnings.count >= queues.max_concurrency and runnings.queue_id = queued_task_locks.queue_id ) and not exists ( select count(*) from queued_task_locks where lock_expire_time is not null and site_id = target_site_id having count(*) >= target_site_max_concurrency ) order by queue_id, priority desc, id limit limit_count ) returning queue_id, priority, id ) select id from updated order by queue_id, priority desc, id; END IF; END; $$ LANGUAGE plpgsql VOLATILE ",
      rewritten:"CREATE FUNCTION lock_shared_tasks(target_site_id int, target_site_max_concurrency bigint, limit_count int, lock_expire_seconds int, agent_id text) returns setof bigint as $$ BEGIN IF pg_try_advisory_xact_lock(23300, target_site_id) THEN RETURN QUERY with updated as ( update queued_task_locks set lock_expire_time = cast(extract(epoch from statement_timestamp()) as bigint) + lock_expire_seconds, lock_agent_id = agent_id where id = any( select queued_task_locks.id from queued_task_locks where lock_expire_time is null and site_id = target_site_id and not exists ( select * from ( select queue_id, count(*) as count from queued_task_locks where lock_expire_time is not null and site_id = target_site_id group by queue_id ) runnings join queues on queues.id = runnings.queue_id where runnings.count >= queues.max_concurrency and runnings.queue_id = queued_task_locks.queue_id ) and not exists ( select count(*) from queued_task_locks where lock_expire_time is not null and site_id = target_site_id having count(*) >= target_site_max_concurrency ) order by queue_id, priority desc, id limit limit_count ) returning queue_id, priority, id ) select id from updated order by queue_id, priority desc, id; END IF; END; $$ LANGUAGE plpgsql VOLATILE ",
      arguments:{ positional:{}, named:{}, finder:[]}]
        at org.skife.jdbi.v2.SQLStatement.internalExecute(SQLStatement.java:1338)
        at org.skife.jdbi.v2.Update.execute(Update.java:56)
        at org.skife.jdbi.v2.BasicHandle.update(BasicHandle.java:298)
        at io.digdag.core.database.migrate.Migration_20160719172538_QueueRearchitecture.migrate(Migration_20160719172538_QueueRearchitecture.java:52)
        at io.digdag.core.database.DatabaseMigrator.applyMigration(DatabaseMigrator.java:240)
        at io.digdag.core.database.DatabaseMigrator.lambda$applyMigrationIfNotDone$1(DatabaseMigrator.java:147)
        at org.skife.jdbi.v2.tweak.transactions.LocalTransactionHandler.inTransaction(LocalTransactionHandler.java:183)
        at org.skife.jdbi.v2.BasicHandle.inTransaction(BasicHandle.java:331)
        at io.digdag.core.database.DatabaseMigrator.applyMigrationIfNotDone(DatabaseMigrator.java:140)
        at io.digdag.core.database.DatabaseMigrator.migrate(DatabaseMigrator.java:97)
        at io.digdag.core.database.DatabaseModule$AutoMigrator.migrate(DatabaseModule.java:62)
        ... 46 more
    Caused by: org.postgresql.util.PSQLException: ERROR: function "lock_shared_tasks" already exists with same argument types
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2458)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2158)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:291)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:432)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:358)
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:171)
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:160)
        at com.zaxxer.hikari.pool.ProxyPreparedStatement.execute(ProxyPreparedStatement.java:44)
        at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.execute(HikariProxyPreparedStatement.java)
        at org.skife.jdbi.v2.SQLStatement.internalExecute(SQLStatement.java:1327)
        ... 56 more
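No matter how many times I ran digdag server, it failed with this error. Recreating the database made it work. I was stuck on this for half a day.....
@hiroysato picked this up for me, but the cause is still unknown… Thanks for the follow-up, as always!
Hiroyuki Sato (@hiroysato), December 11, 2020: "I don't know why it ended up in that state, but it looks like this is the place. https://t.co/OuDbgfchZj"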
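For anyone who hits the same error, a leftover function can be looked for before falling back to recreating the database. This is a minimal sketch, not a diagnosis of the root cause (which stays unknown): both helper names are hypothetical, and they only print SQL. The recreate statements discard everything in the database, so they should only be used on a fresh install like the one in this post.

```shell
#!/bin/sh
# Hypothetical helpers; both only print SQL and never touch a database.

# SQL listing any lock_shared_tasks function already present in the
# database it is run against, with the schema it lives in.
digdag_leftover_check_sql() {
  echo "SELECT n.nspname, p.proname FROM pg_proc p JOIN pg_namespace n ON n.oid = p.pronamespace WHERE p.proname = 'lock_shared_tasks';"
}

# SQL that drops and recreates the database, discarding all state in it.
# Usage: digdag_recreate_sql DATABASE OWNER
digdag_recreate_sql() {
  db=$1
  owner=$2
  cat <<EOF
DROP DATABASE IF EXISTS ${db};
CREATE DATABASE ${db} WITH OWNER ${owner};
EOF
}

digdag_leftover_check_sql
digdag_recreate_sql digdag_db digdag
```

The check is meant to be run while connected to the Digdag database, whereas the recreate statements have to be run from a different database on the same instance, because PostgreSQL refuses to drop the database of the current connection. After recreating, the uuid-ossp extension has to be created again as well.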
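■Summary
Looking back, though, building Digdag was very easy (as long as the DB doesn't trip you up…). Next I'll try building it with Docker! With Ansible you can set it up in no time, so please use the playbook as a reference. I forgot to add the Google Cloud SDK to the Ansible playbook, so for the manual installation, refer to the following!
I bought new headphones, and I recommend them!
adachin👾SRE (@adachin0817), December 10, 2020: "My Anker Soundcore Life Q10 arrived. Wireless, with a built-in mic, usable with a cable too, and cheap at 4,000 yen. The sound quality is decent, so I'm happy with it 🙆‍♂️ pic.twitter.com/xxmv9PiHhg"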