MMP 데이터 분석 파이프라인 구축기 - 3탄 <Streaming and Process>




Streaming and Process

System Architecture

mmp-datamap.png (MMP System Architecture)

지난번 글에서는 AWS API Gateway를 이용해 MMP에서 Data Ingestion을 할 수 있는 체계를 갖추고 Kinesis로 데이터를 안전하게 전송하는 부분에 대해 자세히 알아보았습니다. 이번 글에서는 Stream Storage와 Processing 역할을 하며 Data Lake로 분석 가능하도록 데이터를 보내주는 역할을 하는 Kinesis Data Streams와 Kinesis Data Firehose에 대해 알아보겠습니다.

big_picture.png (AWS Pipeline 구조)

Kinesis Data Streams는 데이터를 신속하고 정확하게 흐를 수 있도록 도와주는 역할(Stream Storage)을 하고 Kinesis Data Firehose는 여기에 적재된 데이터를 최종적으로 원하는 장소에 배달해 주는 역할(Stream Process)을 합니다. 결국, API Gateway에서 전달받은 데이터를 Data Lake까지 안전하게 배달하여 적재해 주는 중간 다리 역할의 Stream Storage와 Stream Process에 대해서 살펴본다고 할 수 있겠습니다.




Stream Storage

태피툰(Tappytoon)의 성장과 함께 사용자가 생성해 내는 이벤트의 양도 점점 증가하고 있습니다. 이러한 빠른 생성 속도에 맞추어 실시간으로 데이터를 손실 없이 안전하게 전송해 주는 파이프라인이 필요한데 그것이 바로 Stream Storage입니다.



analysis-architecture.png (보통의 Data Pipeline 구성 요소의 역할)

먼저, 실시간 데이터 분석 시스템의 주요 구성 요소를 살펴보겠습니다.

  1. MMP와 같은 Data Source에서 데이터가 빠른 속도로 생성된다.
  2. 생성된 데이터를 수집(Ingestion)한다.
  3. 데이터를 Stream Storage에 저장한다.
  4. Batch 또는 실시간으로 Stream Storage에 임시 저장된 데이터를 실시간으로 분석하거나 처리하거나 변경한다.
  5. 최종적으로, Data Lake or 웨어 하우스 or 레이크 하우스에 적재하여 데이터를 소비한다.

이때 데이터들의 입력 순서를 보장하고 재처리가 가능하며 일정 기간 저장할 수 있게 해주는 Stream Storage를 먼저 설명드리겠습니다.

kinesis-kafka.png (AWS인프라를 사용한 Data Pipeline 구성 옵션)

데이터를 수집하고 API를 제공하는 MMP는 Data Source로 볼 수 있고, 이전 글에서 다루었던 AWS API Gateway는 Data 수집(Stream Ingestion)을 담당합니다. Stream Storage는 일정한 주기 또는 일정한 크기의 데이터를 처리할 수 있도록 데이터를 잠시 버퍼링해 주는 저장 공간입니다. Kinesis Data Streams와 Amazon Managed Streaming for Kafka(MSK)는 Stream Storage의 역할을 해주는 대표적인 인프라입니다. 이후 Stream Process를 통해 Data Sink로 데이터가 운반되어 소비됩니다.




Queue, Distributed and Storage

먼저, Kinesis Data Streams나 MSK에는 Stream Storage의 역할을 수행할 수 있는 특징 세 가지가 있습니다. 첫째, 큐(Queue)입니다. 즉, 데이터가 입력된 순서대로 처리되도록 순서를 보장해 줍니다. 둘째, 그냥 한 개의 큐가 아닌, 데이터를 여러 개로 나눠서 처리할 수 있는 분산(Distributed) 큐를 이용하도록 만들어졌습니다. 셋째, 데이터를 저장하기 위해서 메모리가 아닌 디스크와 같은 비휘발성 저장소(Storage)를 사용합니다. 이러한 각각의 특징을 좀 더 자세하게 살펴보겠습니다.



큐(Queue)

queue.png (큐 예시)

첫째로 큐의 특성을 살펴보겠습니다. 큐는 데이터가 입력된 순서대로 처리되도록 순서를 보장해 줍니다. 다음의 예시를 보겠습니다. 유저가 웹툰을 본 기록을 스트림에 저장하고 처리한다고 가정했을 때, 제일 먼저 스트림(또는 큐)에 들어온 ‘벚꽃 지나 봄’의 데이터가 큐에서 빠져나가고(Pop), 이제 막 들어온(Push) ‘나 혼자 레벨업’의 데이터는 큐에서 가장 마지막 순서로 저장됩니다. 이때 데이터를 생성해서 스트림에 보내는 주체를 Producer라고 하고, 스트림에 저장된 데이터를 가져가서 사용하는 주체를 Consumer라고 합니다.

queue_producers.png (Producer와 Consumer)



분산(Distributed)

둘째로, 분산(Distributed)의 특성을 살펴보겠습니다. In-memory 자료 구조인 큐와 Stream Storage의 큐는 조금 다른 점이 있습니다. 만약 위의 그림에서처럼 오른쪽에 Producer가 많아져서 엄청나게 많은 데이터가 큐에 들어온다면, 큐는 대용량의 트래픽을 처리할 수 있어야 합니다. 보통 웹 서버에서는 트래픽이 많아지면 스케일 업을 하거나 스케일 아웃을 해서 트래픽을 처리합니다. Kinesis Streams와 MSK는 스케일 업과 스케일 아웃, 이 두 가지 방법 중에서 스케일 아웃이라는 전략을 사용하여 분산이라는 특징을 갖게 됩니다.

distributed_queue.png (Distributed Queue)

이 예시에서 보시다시피, Kinesis Streams나 MSK는 동일한 큐를 내부적으로 여러 개 가지고 있고, 이러한 큐를 각각 Shard 또는 파티션이라고 부릅니다. 각각의 큐를 구분하기 위해 큐에 번호를 붙여서 Shard 1, Shard 2, Shard 3 등으로 부릅니다. 그리고 효율을 극대화하기 위해, 모든 큐에 비슷한 트래픽의 데이터가 분산될 수 있도록 ‘데이터 부하 분산 전략’이 필요합니다. 그래서 Hash Function(해시 함수)을 이용해서 들어오는 데이터를 샤드나 파티션으로 골고루 분배해 줍니다. 보통 Producer에서 데이터를 보낼 때 파티션 또는 샤드 키를 함께 보내주면 데이터를 해당하는 큐로 보내는 방식입니다. 이렇게 여러 개의 큐로 분산하면, 즉 스케일 아웃을 하면 쓰기 및 읽기의 성능을 올릴 수 있게 됩니다. 위 예시에서는 각 샤드마다 Consumer를 다르게 생성했지만, 여러개의 Consumer가 한 샤드에 있는 데이터를 사용할 수 있으며 원하는 목적에 따라 구성은 달라집니다.



스토리지(Storage)

셋째로, Kinesis Streams와 MSK의 스토리지로서의 역할입니다. 흔히 알고 있는 In-memory 큐를 생각해 볼까요? 큐에서 데이터가 한번 읽히면(Pop) 그 데이터는 해당 큐에서 삭제됩니다. 이러한 특징을 만약 그대로 사용한다면, 한번 읽힌 데이터를 다른 Consumer가 읽으려고 했을 때 해당 데이터는 큐에 없기 때문에 읽을 수가 없게 됩니다. 또한 큐에 저장된 앞쪽부터 순차적으로 사용해야 하죠.

queue_order_1.png (허니블러드 - In-memory 큐)

Kinesis Streams와 MSK에서는 어떤 특정 Consumer가 데이터를 읽었다고 해서 그 데이터를 삭제하는 것이 아니라 큐에 그대로 남겨둡니다. 그다음에 읽을 데이터의 위치만 내부적으로 표시하는 방식입니다. 그래서 다른 Consumer가 읽으려고 해도 큐에 데이터가 남아있기 때문에 처음 또는 특정 위치부터 데이터를 읽는 것이 가능합니다.

queue_order_2.png (허니블러드 - Kinesis Streams와 MSK)

아래 예시를 보면, 각 큐마다 그 다음 Consumer로 보내질 아이템들의 Index를 각각 기록하고 있습니다. Queue에서 데이터가 Push되어 데이터가 Consumer에 전달된다고 하더라도, Index값만 변경될 뿐, 데이터는 Streams Storage에 남아있게 됩니다. 그렇기 때문에, 다른 Consumer가 또다른 요청을 했을 경우, Stream Storage에 남아있는 데이터는, Index에 상관 없이, 전송을 시작할 수 있게 됩니다.

distributed_queue_index.png (Index를 사용하는 Stream Storage의 예시)

이러한 관점에서 Kinesis Data Streams와 MSK를 일종의 Storage라고 생각할 수 있습니다. 하지만 데이터를 얼마나 오랫동안 보관할 것인지를 설정하는 Retention이 존재하고, 이 Retention 기간이 지난 데이터는 오래된 순서부터 자동으로 삭제됩니다. 이 Retention 기간을 길게 잡을수록 인프라 비용은 올라가게 됩니다. 내부 구조가 어떻게 되어있는지 확실하게 설명드릴수는 없지만, 정리해보자면 순서 보장이 되고, 일정 기간 저장할 수 있으며, 이미 전송되었던 데이터도 재사용 할 수 있고, 여러 컨슈머가 사용할 수 있는 리소스입니다.

이러한 세 가지 특징(큐, 분산, 스토리지) 때문에 Kinesis Data Streams와 MSK를 Stream Storage 라고 부릅니다. Data Stream Storage 서비스를 선택하는 과정에서 이 두 인프라를 포함한 여러 가지 기술 스택을 리서치했는데, Kafka및 Kinesis 서비스로 최종 후보군이 압축되었고 결론적으로는 Kinesis Data Streams를 사용하기로 결정했습니다.




Terraform Module

태피툰에서는 IaC(Infrastructure as Code)를 위해 거의 모든 인프라를 Terraform으로 관리하고 있는데, 지난번 글에서 다룬 AWS Gateway API와 같이, Kinesis도 Module을 만들어서 추후 다른 수집 파이프라인을 만들 때에 재사용할 수 있도록 했습니다. Firehose에 Lambda를 넣을 수 있도록 하였고, Kinesis Streams와 Firehose를 동시에 생성한 후 각자의 서로가 Source와 Target로 설정하는 로직을 넣어 주었습니다. 즉, Kinesis Streams는 Firehose의 Source가 되고, 반대로 Firehose는 Streams의 Target이 되는것입니다.

locals {
 lambda_arns = var.lambda_arn != "" ? [var.lambda_arn] : []
}

…(생략)

resource "aws_kinesis_stream" "kinesis_stream" {
 name = format("%s-stream", var.name)
 shard_count = var.stream_shard_count
 retention_period = var.stream_retention_period
 shard_level_metrics = var.stream_shard_level_metrics
 enforce_consumer_deletion = true
}

resource "aws_kinesis_firehose_delivery_stream" "kinesis_firehose_s3" {
 name = format("%s-firehose", var.name)
 destination = "extended_s3"

 kinesis_source_configuration {
   role_arn = var.firehose_role_arn
   kinesis_stream_arn = aws_kinesis_stream.kinesis_stream.arn
 }

 extended_s3_configuration {
   bucket_arn = var.datalake_bucket_arn
   role_arn = var.firehose_role_arn
   buffer_size = var.buffer_size_mb
   buffer_interval = var.buffer_interval_seconds
   compression_format = var.compression_format
   error_output_prefix = "firehose/errors/!{firehose:error-output-type}/${var.bucket_prefix}"
   prefix = var.bucket_prefix

   processing_configuration {
     enabled = length(local.lambda_arns) > 0

     dynamic "processors" {
       for_each = local.lambda_arns

       content {
         type = "Lambda"

         parameters {
           parameter_name = "LambdaArn"
           parameter_value = "${processors.value}:$LATEST"
         }
       }
     }
   }

   s3_backup_mode = "Enabled"
   s3_backup_configuration {
     bucket_arn = var.datalake_bucket_arn
     role_arn = var.firehose_role_arn
     buffer_size = var.buffer_size_mb
     buffer_interval = var.buffer_interval_seconds
     compression_format = var.backup_compression_format
     prefix = "firehose/backups/${var.bucket_prefix}"
   }

   cloudwatch_logging_options {
     enabled = true
     log_group_name = aws_cloudwatch_log_group.log_group.name
     log_stream_name = aws_cloudwatch_log_stream.log_stream.name
   }
 }

 lifecycle {
   ignore_changes = [
     extended_s3_configuration,
   ]
 }
}




Kinesis Data Firehose with Lambda

Firehose

stream_process.png (Stream Process)

Stream Storage에 담겨 있는 정보를 누군가 목적지까지 가져가야만 분석을 진행할 수 있는데, 이를 보통 Stream Process라고 합니다. 이때 바로 Kinesis Data Firehose가 스트리밍 이벤트 데이터를 원하는 목적지에 배달해 주는 배달부 역할을 합니다. Kinesis는 탄력적 확장 기능이 있는 서비스이므로 이를 이용해서 데이터를 배달하면, 트래픽의 양이 늘어나더라도 자동으로 스케일링되기 때문에 특별한 구성을 할 필요가 없습니다. 또한 이벤트의 양이 엄청나게 많다 해도 단일 파일로 만드는 것이 가능합니다.

kinesis.png (AWS Amazon Kinesis Data Firehose)

이 예시처럼 Kinesis를 통해 원하는 목적지에서 데이터를 가져오고 필요하다면 원하는 것을 수정해서 원하는 장소에 저장하도록 할 수 있습니다.

Blog_Part3_Kinesis.png (Tappytoon's Data Streams and Process)

저희 같은 경우에는 Source, 즉 데이터를 받아오는 곳이 Kinesis Data Streams입니다. 그리고 Target, 즉 목적지는 Data Lake(S3)입니다. Data Lake로 이벤트 데이터를 보내기 전에 Lambda Function을 이용해 이벤트 데이터에서 몇 가지를 수정하고 데이터를 넘겨주고 있습니다.

(허니블러드 - Kinesis Data Firehose의 임무)



Lambda

Firehose를 사용하면, Lambda를 이용해 약간의 데이터 전처리를 해줄 수 있다는 장점이 있습니다. Data Lake에 들어간 데이터에 반드시 수정해야 할 점이 있다면, Lambda를 Firehose에 붙여주는 것이 좋은 방법입니다. 저희도 Raw 데이터를 Data Lake에 적재하기 전, 몇 가지 수정할 사항이 보였습니다.

먼저, MMP에서 오는 데이터는 모든 데이터가 Line break 없이 한 줄로 오고 있었습니다. 이 때문에 데이터마다 New line(‘\n’)을 넣어주어야 했습니다. 그렇지 않으면 추후에 Athena에서 쿼리할 때 한 파일에 있는 모든 데이터를 1건으로 인식하기 때문이었습니다.

그리고 추후 분석 시 편의를 위해 Nested된 JSON들을 Flatten하는 작업을 추가했습니다. 그 밖에도 기존 Frontend 앱에서 Appsflyer로 데이터를 전달할 때 명시해 주는 Key name이 Naming convention에 맞지 않게 보내지고 있거나 공백이 들어가 있는 등 여러 가지 문제점이 있었는데, 이런 부분도 알맞게 저장될 수 있도록 Transform 및 Migration을 하는 작업도 진행했습니다.

다음은 Firehose에서 사용된 Lambda 코드의 일부분입니다. 보시는 것처럼, ‘\n’ 캐릭터를 각 데이터마다 넣어 주었고, ‘_’ 를 중심으로 Nested 된 JSON을 Flatten하는 작업을 수행했습니다.


const flatten = require('flat').flatten; const RESULT_OK = 'Ok'; const encodeData = (parsedData) => { return Buffer.from(JSON.stringify(parsedData) + '\n', 'utf-8').toString('base64'); }; const decodeData = (data) => { return JSON.parse(Buffer.from(data, 'base64').toString('utf-8')); }; const flattenData = (data) => { return flatten(data, { delimiter: '_' }); } exports.handler = (event, context, callback) => { const output = event.records.map((record) => ({ recordId: record.recordId, result: RESULT_OK, data: encodeData(flattenData(transformKinesisData(record.data))), })); callback(null, {records: output}); };

이렇게 처리된 데이터는 다음과 같습니다. JSON Parsing과 Flatten작업 등을 거쳐, Athena 또는 Redshift를 이용하면 S3에서 바로 쿼리가 가능한 데이터로 변했습니다.


Before

{"event_name": "view","event_value":"{\"screen_info\":\"Reader\",\"content_list\":{\"comic_id\":321,\"chapter_id\":1234567,\"chapter_sequence\":76,\"comic_title\":\"Father, I don't Want this Marriage\",\"content_type\":\"comic\"},\"user_id\":112233}","event_time": "2022-06-21 00:12:30.017"}{"event_name": "view", "event_value":"{\"screen_info\":\"Reader\",\"content_list\":{\"comic_id\":123,\"chapter_id\":1234,\"chapter_sequence\":76,\"comic_title\":\"Solo Leveling\",\"content_type\":\"comic\"},\"user_id\":112233}","event_time": "2022-06-21 00:12:33.217"}

After

{ "event_name": "view", "event_value_screen_info": "Reader", "event_value_content_list_comic_id": 321, "event_value_content_list_chapter_id": 1234567, "event_value_content_list_chapter_sequence": 76, "event_value_content_list_comic_title": "Father, I don't Want this Marriage", "event_value_content_list_content_type": "comic", "event_value_user_id": 112233, "event_time": "2022-06-21 00:12:30.017", } { "event_name": "view", "event_value_screen_info": "Reader", "event_value_content_list_comic_id": 123, "event_value_content_list_chapter_id": 1234, "event_value_content_list_chapter_sequence": 76, "event_value_content_list_comic_title": "Solo Leveling", "event_value_content_list_content_type": "comic", "event_value_user_id": 112233, "event_time": "2022-06-21 00:12:33.217", }

Firehose에서는 Lambda의 Parsing 작업에 오류가 있어 파일이 잘못 Parsing 되었을 경우를 대비해 Lambda 작업이 되기 전에 완전 Raw한 형태의 Data도 Backup으로 적재할 수 있도록 지원해 줍니다. 그리고 Error가 발생한 파일들도 따로 적재하기 때문에 이렇게 Lambda로 작업하더라도 위험 부담을 줄일 수 있습니다.



Kinesis to Data Lake

Data Lake에 데이터를 저장할 때는 JSON 형태 그대로 저장했습니다. 그런데 앞서 말씀드린 Transform과 Migration 작업으로 인해 필수적인 부분은 가공해 주었지만 최대한 Raw한 형태를 저장하는 게 미래를 위해 안전하다고 판단했습니다. Raw한 저장본이 있기 때문에 Parquet 변환 같은 작업을 추후에 수행해도 안전하게 진행할 수 있었습니다.

이렇게 Gateway에서 넘어온 데이터를 1차 가공하여 S3에 적재하는데, 이때 폴더를 파티션해서 저장해 주었습니다. Firehose를 Default 설정으로 사용하면, 자동으로 Root 폴더 아래에 데이터가 year/month/day/hour로 파티셔닝 되어 저장됩니다. 저희가 구축한 MMP 파이프라인에서는 플랫폼별로 스트리밍 라인이 있기 때문에 플랫폼도 파티션 사항에 추가했습니다. 추후에 블로그에서 다루겠지만, 이렇게 파티셔닝을 잘해서 적재해 두면, 원하는 만큼의 데이터만 읽어서 쿼리하기 때문에 비용적으로나 시간적으로 더 많은 이득을 볼 수 있습니다.

이렇게 Kinesis Data Streams와 Kinesis Data Firehose를 이용하여 AWS Gateway API에서 전달받은 데이터를 실시간으로 S3에 파티션해서 적재하는 파이프라인을 만들었습니다. 다음에는 S3에 적재된 데이터를 좀 더 효율적으로 쿼리할 수 있도록 처리하는 방법과 다양한 방식으로 쿼리하거나 시각화한 내용에 대해 포스팅하도록 하겠습니다.

(허니블러드 - DataLake, 너에게 보내는 편지)



Reference

2022 Summit Korea 데이터 분석 실시간으로 처리하기: Kinesis Data Streams vs MSK


Genie