A while back I built a pipeline that ships each CloudWatch Logs log group to S3 via Amazon Kinesis. Looking at the stored logs, however, you can see that they have been rewrapped in CloudWatch Logs' own format, like this:
```json
{
    "messageType": "DATA_MESSAGE",
    "owner": "xxxxxxxxxxxxx",
    "logGroup": "/app/nginx/access.log",
    "logStream": "i-xxxxxxxxxxxxxxxxx",
    "subscriptionFilters": [
        "app-nginx_access-log_kinesis"
    ],
    "logEvents": [
        {
            "id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
            "timestamp": xxxxxxxx,
            "message": "time:2021-10-30T15:26:13+00:00\tserver_addr:xxx.xxx.xxx.xxx\thost:xx.xxx.xxx.xxx\tmethod:GET\treqsize:2541\turi:/export/environment/\tquery:\tstatus:200\tsize:116\treferer:https://xxxxxxxxxx/login?ref=header_menu\tua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.54 Safari/537.36\tforwardedfor:xxxxxxxxxxxx\treqtime:0.117\tapptime:0.120"
        }
    ]
}
```
What we actually want is the `message` field, written out as raw logs in LTSV format. To load the logs into BigQuery or Athena with Digdag for analysis, each log event has to end up on its own line, so the records need to be unpacked. On top of that, the logs delivered by a CloudWatch Logs subscription arrive gzip-compressed and Base64-encoded. In this post I'd like to show an easy way to build a log pipeline that handles all of this.
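Before getting into the setup, here is a minimal sketch of the unpacking that has to happen: Base64-decode, gunzip, and flatten `logEvents` back into one raw LTSV line per event. This is only to illustrate the record shape; the actual transformation is done by the Firehose blueprint Lambda shown later, and the function name here is just for illustration.

```js
// Minimal sketch: decode one CloudWatch Logs subscription payload
// and return the raw LTSV lines, one log event per line.
const zlib = require('zlib');

function decodeSubscriptionPayload(base64Data) {
    // Subscription data arrives gzip-compressed and Base64-encoded.
    const json = zlib.gunzipSync(Buffer.from(base64Data, 'base64')).toString('utf8');
    const payload = JSON.parse(json);

    // CONTROL_MESSAGE records carry no log data.
    if (payload.messageType !== 'DATA_MESSAGE') return '';

    return payload.logEvents.map(e => `${e.message}\n`).join('');
}
```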
Architecture
- EC2/Amazon Linux 2(awslogs)
- CloudWatch Logs(subscription filter)
- Amazon Kinesis
- Lambda(change LTSV)
- S3
https://dev.classmethod.jp/articles/cloudwatch-logs-kinesis-origin-fireshose-s3-format/
- Kinesis Data Firehose Cloudwatch Logs Processor
- change_cloudwatchlogs_to_ltsv(Node.js 14.x)
```js
const zlib = require('zlib');
const AWS = require('aws-sdk');

function transformLogEvent(logEvent) {
    return Promise.resolve(`${logEvent.message}\n`);
}

function putRecordsToFirehoseStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
    client.putRecordBatch({
        DeliveryStreamName: streamName,
        Records: records,
    }, (err, data) => {
        const codes = [];
        let failed = [];
        let errMsg = err;
        if (err) {
            failed = records;
        } else {
            for (let i = 0; i < data.RequestResponses.length; i++) {
                const code = data.RequestResponses[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }
        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
                putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}

function putRecordsToKinesisStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
    client.putRecords({
        StreamName: streamName,
        Records: records,
    }, (err, data) => {
        const codes = [];
        let failed = [];
        let errMsg = err;
        if (err) {
            failed = records;
        } else {
            for (let i = 0; i < data.Records.length; i++) {
                const code = data.Records[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }
        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
                putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}

function createReingestionRecord(isSas, originalRecord) {
    if (isSas) {
        return {
            Data: Buffer.from(originalRecord.data, 'base64'),
            PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
        };
    } else {
        return {
            Data: Buffer.from(originalRecord.data, 'base64'),
        };
    }
}

function getReingestionRecord(isSas, reIngestionRecord) {
    if (isSas) {
        return {
            Data: reIngestionRecord.Data,
            PartitionKey: reIngestionRecord.PartitionKey,
        };
    } else {
        return {
            Data: reIngestionRecord.Data,
        };
    }
}

exports.handler = (event, context, callback) => {
    Promise.all(event.records.map(r => {
        const buffer = Buffer.from(r.data, 'base64');

        let decompressed;
        try {
            decompressed = zlib.gunzipSync(buffer);
        } catch (e) {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }

        const data = JSON.parse(decompressed);
        // CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        // They do not contain actual data.
        if (data.messageType === 'CONTROL_MESSAGE') {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'Dropped',
            });
        } else if (data.messageType === 'DATA_MESSAGE') {
            const promises = data.logEvents.map(transformLogEvent);
            return Promise.all(promises)
                .then(transformed => {
                    const payload = transformed.reduce((a, v) => a + v, '');
                    const encoded = Buffer.from(payload).toString('base64');
                    if (encoded.length <= 6000000) {
                        return {
                            recordId: r.recordId,
                            result: 'Ok',
                            data: encoded,
                        };
                    } else {
                        return {
                            recordId: r.recordId,
                            result: 'ProcessingFailed',
                        };
                    }
                });
        } else {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }
    })).then(recs => {
        const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
        const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
        const region = streamARN.split(':')[3];
        const streamName = streamARN.split('/')[1];
        const result = { records: recs };
        let recordsToReingest = [];
        const putRecordBatches = [];
        let totalRecordsToBeReingested = 0;
        const inputDataByRecId = {};
        event.records.forEach(r => inputDataByRecId[r.recordId] = createReingestionRecord(isSas, r));

        let projectedSize = recs.filter(rec => rec.result === 'Ok')
            .map(r => r.recordId.length + r.data.length)
            .reduce((a, b) => a + b, 0);
        // 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        for (let idx = 0; idx < event.records.length && projectedSize > 6000000; idx++) {
            const rec = result.records[idx];
            if (rec.result === 'Ok') {
                totalRecordsToBeReingested++;
                recordsToReingest.push(getReingestionRecord(isSas, inputDataByRecId[rec.recordId]));
                projectedSize -= rec.data.length;
                delete rec.data;
                result.records[idx].result = 'Dropped';

                // split out the record batches into multiple groups, 500 records at max per group
                if (recordsToReingest.length === 500) {
                    putRecordBatches.push(recordsToReingest);
                    recordsToReingest = [];
                }
            }
        }

        if (recordsToReingest.length > 0) {
            // add the last batch
            putRecordBatches.push(recordsToReingest);
        }

        if (putRecordBatches.length > 0) {
            new Promise((resolve, reject) => {
                let recordsReingestedSoFar = 0;
                for (let idx = 0; idx < putRecordBatches.length; idx++) {
                    const recordBatch = putRecordBatches[idx];
                    if (isSas) {
                        const client = new AWS.Kinesis({ region: region });
                        putRecordsToKinesisStream(streamName, recordBatch, client, resolve, reject, 0, 20);
                    } else {
                        const client = new AWS.Firehose({ region: region });
                        putRecordsToFirehoseStream(streamName, recordBatch, client, resolve, reject, 0, 20);
                    }
                    recordsReingestedSoFar += recordBatch.length;
                    console.log('Reingested %s/%s records out of %s in to %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, event.records.length, streamName);
                }
            }).then(
                () => {
                    console.log('Reingested all %s records out of %s in to %s stream', totalRecordsToBeReingested, event.records.length, streamName);
                    callback(null, result);
                },
                failed => {
                    console.log('Failed to reingest records. %s', failed);
                    callback(failed, null);
                });
        } else {
            console.log('No records needed to be reingested.');
            callback(null, result);
        }
    }).catch(ex => {
        console.log('Error: ', ex);
        callback(ex, null);
    });
};
```
The difference from last time is that a Lambda function is needed to turn the records back into LTSV. "Do I really have to write a Lambda for this?" No need to worry: the Kinesis data transformation feature provides a Lambda blueprint for exactly this case (which is a huge help). I've pasted the blueprint code above for reference; now let's walk through the setup, starting with EC2.
EC2
- install
```sh
# yum install -y awslogs
```
- awscli.conf
```ini
[plugins]
cwlogs = cwlogs
[default]
region = ap-northeast-1
```
- awslogs.conf
```ini
[general]
state_file = /var/lib/awslogs/agent-state

## nginx
[/app/nginx/access.log]
datetime_format = %d/%b/%Y:%H:%M:%S %z
file = /var/log/nginx/access.log
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /app/nginx/access.log

[/app/nginx/error.log]
datetime_format = %d/%b/%Y:%H:%M:%S %z
file = /var/log/nginx/error.log
buffer_duration = 5000
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /app/nginx/error.log
```
- Nginx (LTSV log format)
- IAM role assignment: CloudWatchLogsFullAccess
This setup uses EC2, so first configure awslogs and get the logs flowing into CloudWatch Logs. With ECS there is nothing special to do: as long as the container writes to standard output, the logs reach CloudWatch Logs without installing awslogs. Note, however, that if Nginx and the application share the same host, awslogs still has to be installed. Nginx also needs to write its access log in LTSV; a sample log_format sketch follows.
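The exact Nginx configuration isn't shown in this post, so the following is only a sketch that maps the field names from the sample log above onto common Nginx variables; adjust it to your own environment (in particular, `host` and `forwardedfor` depend on how your load balancer is set up).

```nginx
# Sketch of an LTSV log_format matching the fields in the sample log (assumed mapping)
log_format ltsv "time:$time_iso8601"
                "\tserver_addr:$server_addr"
                "\thost:$remote_addr"
                "\tmethod:$request_method"
                "\treqsize:$request_length"
                "\turi:$uri"
                "\tquery:$query_string"
                "\tstatus:$status"
                "\tsize:$body_bytes_sent"
                "\treferer:$http_referer"
                "\tua:$http_user_agent"
                "\tforwardedfor:$http_x_forwarded_for"
                "\treqtime:$request_time"
                "\tapptime:$upstream_response_time";

access_log /var/log/nginx/access.log ltsv;
```

With the logs flowing, let's move on to putting everything into Terraform.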
Terraform
- assume_role_policy/firehose.json
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```
- assume_role_policy/change_cloudwatchlogs_to_ltsv_policy.json
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "logs:CreateLogGroup",
      "Resource": "arn:aws:logs:ap-northeast-1:xxxxxxxxxxxxx:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:ap-northeast-1:xxxxxxxxx:log-group:/aws/lambda/change_cloudwatchlogs_to_ltsv:*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecordBatch"
      ],
      "Resource": [
        "arn:aws:firehose:ap-northeast-1:xxxxxxxxxxxx:deliverystream/*"
      ]
    }
  ]
}
```
- assume_role_policy/kinesis_access_s3_logs_bucket.json
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::hoge.log.jp",
        "arn:aws:s3:::hoge.log.jp/*"
      ]
    }
  ]
}
```
- assume_role_policy/kinesis_allow_deliverystreams.json
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Resource": [
        "arn:aws:firehose:ap-northeast-1:xxxxxxxxxxx:deliverystream/app-nginx_access-log",
        "arn:aws:firehose:ap-northeast-1:xxxxxxxxxxx:deliverystream/app-nginx_error-log"
      ]
    }
  ]
}
```
- assume_role_policy/kinesis_basic_role_policy.json
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "logs.ap-northeast-1.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```
- assume_role_policy/lambda.json
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```
- iam.tf
```hcl
## Kinesis (Firehose_Delivery_Role)
resource "aws_iam_role" "firehose_delivery_role" {
  name               = "Firehose_Delivery_Role"
  assume_role_policy = file("files/assume_role_policy/firehose.json")
}

resource "aws_iam_policy" "kinesis_access_s3_logs_bucket" {
  name        = "Kinesis_Access_S3_Logs_Bucket"
  description = "Kinesis_Access_S3_Logs_Bucket"
  policy      = file("files/assume_role_policy/kinesis_access_s3_logs_bucket.json")
}

resource "aws_iam_role_policy_attachment" "firehose_delivery_role-attach" {
  role       = aws_iam_role.firehose_delivery_role.name
  policy_arn = aws_iam_policy.kinesis_access_s3_logs_bucket.arn
}

resource "aws_iam_role_policy_attachment" "firehose_delivery_role-attach_lambda" {
  role       = aws_iam_role.firehose_delivery_role.name
  policy_arn = "arn:aws:iam::aws:policy/AWSLambda_FullAccess"
}

## Kinesis (Kinesis_Basic_Role)
resource "aws_iam_role" "kinesis_basic_role" {
  name               = "Kinesis_Basic_Role"
  assume_role_policy = file("files/assume_role_policy/kinesis_basic_role_policy.json")
}

resource "aws_iam_policy" "kinesis_allow_deliverystreams" {
  name        = "Kinesis_Allow_DeliveryStreams"
  description = "Kinesis_Allow_DeliveryStreams"
  policy      = file("files/assume_role_policy/kinesis_allow_deliverystreams.json")
}

resource "aws_iam_role_policy_attachment" "firehose_basic_role-attach" {
  role       = aws_iam_role.kinesis_basic_role.name
  policy_arn = aws_iam_policy.kinesis_allow_deliverystreams.arn
}

## change_cloudwatchlogs_to_ltsv role
resource "aws_iam_role" "change_cloudwatchlogs_to_ltsv_role" {
  name               = "change_cloudwatchlogs_to_ltsv_role"
  assume_role_policy = file("files/assume_role_policy/lambda.json")
}

resource "aws_iam_policy" "change_cloudwatchlogs_to_ltsv_policy" {
  name        = "change_cloudwatchlogs_to_ltsv_policy"
  description = "change_cloudwatchlogs_to_ltsv_policy"
  policy      = file("files/assume_role_policy/change_cloudwatchlogs_to_ltsv_policy.json")
}

resource "aws_iam_role_policy_attachment" "change_cloudwatchlogs_to_ltsv_role-attach" {
  role       = aws_iam_role.change_cloudwatchlogs_to_ltsv_role.name
  policy_arn = aws_iam_policy.change_cloudwatchlogs_to_ltsv_policy.arn
}
```
Unless the Kinesis (Firehose) side is granted permission to invoke Lambda, via the firehose_delivery_role-attach_lambda attachment above, the Lambda never runs and you get errors like the one below, so for now I attached the full-access policy. When things don't work, check the objects under Amazon S3 destination > S3 bucket error output prefix on the Kinesis side.
"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function."
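If you would rather not attach the managed AWSLambda_FullAccess policy, the Firehose data transformation only needs to invoke the function and read its configuration, so a scoped-down inline policy along these lines should be enough. This is a sketch; the function ARN below is an example account/function, not something used elsewhere in this post.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:change_cloudwatchlogs_to_ltsv:$LATEST"
    }
  ]
}
```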
- kinesis.tf
```hcl
resource "aws_cloudwatch_log_group" "kinesis" {
  name              = "/aws/kinesisfirehose/"
  retention_in_days = 30
}

resource "aws_cloudwatch_log_group" "change_cloudwatchlogs_to_ltsv" {
  name              = "/aws/lambda/change_cloudwatchlogs_to_ltsv"
  retention_in_days = 30
}

resource "aws_kinesis_firehose_delivery_stream" "firehose_s3" {
  name        = element(var.kinesis_name_s3, count.index)
  destination = "extended_s3"
  count       = length(var.kinesis_s3_cloudwatch_logs_group)

  extended_s3_configuration {
    role_arn            = aws_iam_role.firehose_delivery_role.arn
    bucket_arn          = "arn:aws:s3:::hoge.log.jp"
    buffer_size         = 3
    buffer_interval     = 60
    prefix              = join("", [element(var.kinesis_s3_cloudwatch_logs_group, count.index), "/!{timestamp:yyyy-MM-dd/}"])
    error_output_prefix = join("", [element(var.kinesis_s3_cloudwatch_logs_group, count.index), "/result=!{firehose:error-output-type}/!{timestamp:yyyy-MM-dd/}"])
    compression_format  = "GZIP"

    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = aws_cloudwatch_log_group.kinesis.id
      log_stream_name = "DestinationDelivery"
    }

    processing_configuration {
      enabled = "true"

      processors {
        type = "Lambda"

        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${aws_lambda_function.change_cloudwatchlogs_to_ltsv.arn}:$LATEST"
        }
        parameters {
          parameter_name  = "BufferSizeInMBs"
          parameter_value = "1"
        }
        ## Comment this out after applying; otherwise it keeps showing up as a plan diff
        #parameters {
        #  parameter_name  = "BufferIntervalInSeconds"
        #  parameter_value = "60"
        #}
      }
    }
  }
}

resource "aws_cloudwatch_log_subscription_filter" "forward_cloudwatchlogs_to_s3" {
  name            = "forward_cloudwatchlogs_to_kinesis"
  count           = length(var.cloudwatch_logs_group)
  log_group_name  = element(var.cloudwatch_logs_group, count.index)
  filter_pattern  = ""
  destination_arn = element(aws_kinesis_firehose_delivery_stream.firehose_s3.*.arn, count.index)
  role_arn        = aws_iam_role.kinesis_basic_role.arn
  distribution    = "ByLogStream"
}
```
I set the buffer size and interval for delivery to S3 to 5 MB and 300 seconds. The shorter these values, the lower the risk of losing logs, but they do need tuning for your workload. Also, the Kinesis Data Firehose throughput limits can be exceeded (PutRecordBatch allows at most 500 records or 4 MiB per call), which can trigger a ServiceUnavailableException, so on the transform side the buffer size and interval are set to 1 MB and 60 seconds to keep the buffering interval short. Make sure the S3 output prefix is organized by year/month/day.
- lambda.tf
```hcl
# change_cloudwatchlogs_to_ltsv
resource "aws_lambda_function" "change_cloudwatchlogs_to_ltsv" {
  s3_bucket     = "hoge.log.jp"
  s3_key        = "lambda/change_cloudwatchlogs_to_ltsv.zip"
  function_name = "change_cloudwatchlogs_to_ltsv"
  role          = aws_iam_role.change_cloudwatchlogs_to_ltsv_role.arn
  handler       = "index.handler"
  runtime       = "nodejs14.x"
  memory_size   = "512"
  timeout       = "300"
}
```
Have Lambda pull its code from S3. The timeout is deliberately set to 5 minutes: if you leave the default you get when creating the function by hand, Kinesis will complain and ask you to increase the timeout.
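Since the function code is pulled from S3, deploying a new version is just a matter of zipping the blueprint code and uploading it to the bucket/key referenced in lambda.tf. A rough sketch, using the example bucket and paths from this post:

```sh
# Package the transform Lambda and upload it to the bucket/key used in lambda.tf
zip change_cloudwatchlogs_to_ltsv.zip index.js
aws s3 cp change_cloudwatchlogs_to_ltsv.zip s3://hoge.log.jp/lambda/change_cloudwatchlogs_to_ltsv.zip

# Optional: point the existing function at the new object right away
aws lambda update-function-code \
  --function-name change_cloudwatchlogs_to_ltsv \
  --s3-bucket hoge.log.jp \
  --s3-key lambda/change_cloudwatchlogs_to_ltsv.zip
```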
- variables.tf
```hcl
## CloudWatch Logs Group
variable "cloudwatch_logs_group" {
  default = [
    "/app/nginx/access.log",
    "/app/nginx/error.log"
  ]
}

## Kinesis
variable "kinesis_s3_cloudwatch_logs_group" {
  default = [
    "adachin/log/nginx/access",
    "adachin/log/nginx/error"
  ]
}

## Kinesis Data Firehose Name
variable "kinesis_name_s3" {
  default = [
    "app-nginx_access-log",
    "app-nginx_error-log"
  ]
}
```
Because the CloudWatch Logs group names, the S3 prefix destinations, and the Kinesis stream names are all driven by Terraform variables, adding a new log later is just a matter of appending to these lists, which is convenient. After that, just sit back and wait for the logs to land in S3!
Checking the logs
```sh
$ mv app-nginx_access-log-9-2021-11-16-xxxxx.gz log
$ head log
time:2021-11-16T17:51:32+09:00 server_addr:xxx.xxx.xxx.xxx host:xxx.xxx.xxx.xxx method:GET reqsize:2232 uri:/hoge query: status:200 size:27 referer:https://hogehoge ua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95. Safari/537.36 forwardedfor:xx.xx.xxx.xx reqtime:0.107 apptime:0.110
time:2021-11-16T17:51:32+09:00 server_addr:xxx.xxx.xxx.xxx host:xxx.xxx.xxx.xxx method:GET reqsize:2232 uri:/hoge query: status:200 size:27 referer:https://hogehoge ua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95. Safari/537.36 forwardedfor:xx.xx.xxx.xx reqtime:0.107 apptime:0.110
```
I confirmed that the logs are now in proper LTSV format! Logs were collected without any problems even with auto scaling, so everything is ready to be loaded into BigQuery or Athena.
* The .gz file is already decompressed when you download it from S3, so you don't actually need the mv step above; just clicking the compressed file lets you view its contents. (Apparently this is just how AWS behaves.)
* Side note
For Digdag/Embulk, using the embulk-input-s3 plugin with a configuration like the following lets you sync the logs into BigQuery.
- Gemfile
```
embulk-parser-ltsv
```
- access_log.yml.liquid
```yaml
in:
  type: s3
  # ~snip~
  parser:
    type: ltsv
    charset: UTF-8
    newline: CRLF
    schema:
      - {name: "time", type: "timestamp", format: "%Y-%m-%dT%H:%M:%S%z"}
      - {name: "server_addr", type: "string"}
      - {name: "host", type: "string"}
      # ~snip~
```
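As a rough idea of how this could be wired into Digdag, a minimal workflow that runs the Embulk config above might look like the following. The workflow name and schedule handling are assumptions for illustration, not taken from an actual project.

```yaml
# access_log.dig -- hypothetical Digdag workflow that loads the LTSV logs with Embulk
timezone: Asia/Tokyo

+load_nginx_access_log:
  sh>: embulk run access_log.yml.liquid
```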
Summary
We were able to build a log pipeline with Amazon Kinesis quite easily. Fluentd/td-agent has the advantage of making all sorts of log processing easy, but considering how painful version upgrades are and the operational overhead, the low maintenance of awslogs plus leaving the heavy lifting to Kinesis may be the better choice. I'll blog again when new issues come up!!