MMP 데이터 분석 파이프라인 구축기 - 2탄 <AWS Gateway API>


AWS API Gateway


System Architecture

이 시스템의 전체적인 아키텍처는 아래와 같습니다.

mmp-datamap.png

그전에 비해 다양한 인프라를 사용하는 것은 사실입니다. 저도 사실 API Gateway, Kinesis, Lambda 그리고 S3까지, 모두 처음 접해보는 AWS 인프라들이었습니다. 아키텍처가 너무 복잡해지는 게 아닌지, 유지보수가 어렵지는 않을지, 또 구축하고 운영하는 데 힘들지 않을지 고민도 되었습니다.

(겨울 지나 벚꽃)





하지만 이렇게 구성을 한 이후, 생각보다 유지보수에 많은 어려움이 있지는 않았고, 구조적으로는 안정성을 갖게 되었습니다. 또한 데이터 분석 및 시각화가 가능해졌고, 추천 시스템의 트레이닝 데이터로의 활용 등 다양한 리소스 등으로도 사용이 가능해졌습니다. 가장 중요한 포인트는 데이터 양이 늘어나더라도 인프라 관리를 크게 고려하지 않아도 된다는 점입니다. API Gateway, Kinesis Data Streams, Kinesis Firehose, Lambda 그리고 저장장치인 S3까지, 데이터의 양에 비례해서, Auto Scailing이 되는 인프라이기 때문입니다. (물론 회사 규모가 커지고, 데이터가 방대해지게 되면 비용이 부담될 수 있습니다.)





오늘은 저희가 사용 중인 MMP툴에서 제공하는 파이프라인의 문지기, API Gateway에 대해서 설명해보려고 합니다. API Gateway는 실시간 Push API와 연동되어 데이터를 가장 앞단에서 받아내는 역할을 하는 시스템입니다.

실제 구축 단계에서는 API Gateway를 가장 마지막에 생성하게 됩니다. 그 이유는 Gateway에서 Kinesis로 데이터를 보내게 되고, Kinesis에서 S3로 데이터를 보내게 되는데, 각 인프라를 생성하려면 Target이 되는 인프라가 먼저 생성되어 있어야 하기 때문입니다.



AWS API Gateway


AWS API Gateway란 무엇일까요? AWS에서 제공하는 API Gateway는 규모와 관계없이 API를 생성, 게시, 유지, 모니터링 및 보호하기 위한 서비스입니다.
Gateway를 이용하면, 서버를 구축해서 관리할 필요 없이, 안정성이 있는 API를 트래픽 양을 크게 고려하지 않고 손쉽게 관리할 수 있습니다. 이 API는 어플리케이션으로 가는 ‘입구’로 생각할 수 있는데요, 수백 수천 개의 API call들을 지속적으로 받아주거나 처리하는 작업을 할 수 있습니다. IP Whitelisting 기능을 자체적으로 지원하기 때문에, 안전한 곳에서 오는 데이터만 받아주도록 처리할 수도 있습니다.
저희가 사용하고 있는 MMP의 Push API와 연동하기 위해서는 Rest API endpoint를 만들어서 등록해야 했습니다. 처음에는 플랫폼과 무관하게 단일 endpoint로 구성하려고 했으나, downstream은 android, 그리고 ios 스트림 등 플랫폼별로 따로 생성해서, 별개의 스트림으로 보내주는 구성으로 하는 것이 좋겠다고 판단했습니다. ios와 android의 트래픽 양이 크게 차이 나기 때문이었습니다.
만약 같은 스트림으로 두 플랫폼 데이터를 받게 되었을 때, android의 트래픽이 2배 정도라면, 결과적으로 ios데이터가 예상보다 늦게 도착하는 원인이 될 수 있습니다. 그리고 트래픽 양에 따라서 stream을 확장할 때, 다른 플랫폼은 확장할 필요가 없는데도 비용을 더 추가해야 할 경우가 생길 수 있습니다. 또한 단일 endpoint로 구성할 경우 data를 적절한 downstream에 mapping해줘야 하는 추가 운영비용이 발생합니다. 따라서 platform별 다중 endpoint를 구성하는 것으로 결정했습니다.









Allowlisting




















(겨울 지나 벚꽃)


AWS API Gateway에서는 IP Whitelisting 기능을 자체적으로 지원합니다. 저희가 사용 중인 MMP에서 제공하는 Push API는 Public Internet을 경유하는 HTTP 통신만 지원하고 있습니다. 그렇기 때문에 Push API에서 전송되는 traffic만 허용해서 안전한 데이터만 받을 수 있도록 구성해주어야 하는데요, MMP에서 오는 데이터만 받는 로직을 Gateway에 추가해서, 안정성을 강화할 수 있습니다. AWS API Gateway에서는 손쉽게 IP Whitelisting을 구성할 수 있는데, MMP의 allowlisting(서버 IP address range)들만 허용할 수 있도록 아래와 같이 리소스 정책을 생성해서 추가했습니다. 아래는 예시 IP들을 이용한 정책의 예시입니다. 예시에서는 NotIpAddress를 이용해 주어진 IP가 아니라면 허용하지 않는 룰을 보여주고 있습니다.



{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": "*", "Action": "execute-api:Invoke", "Resource": "execute-api:/*/*/*" }, { "Effect": "Deny", "Principal": "*", "Action": "execute-api:Invoke", "Resource": "execute-api:/*/*/*", "Condition": { "NotIpAddress": { "aws:SourceIp": [ "1.2.3.0/22", "4.5.6.7/31", "8.9.10.11/32" ] } } } ] }

특정 IP 주소만 API Gateway REST API에 액세스하도록 허용하려면 어떻게 해야 합니까? (Link)

위의 Policy를 Terraform으로 생성하면 아래와 같습니다.

resource "aws_api_gateway_rest_api_policy" "ip_policy" { rest_api_id = aws_api_gateway_rest_api.this.id policy = jsonencode({ "Version" = "2012-10-17", "Statement" = [ { Effect = "Deny", Principal = "*", Action = "execute-api:Invoke", Resource = "execute-api:/*/*/*", Condition = { "NotIpAddress" = { "aws:SourceIp" = [ "1.2.3.0/22", "4.5.6.7/31", "8.9.10.11/32" ] } } }, { Effect = "Allow", Principal = "*", Action = "execute-api:Invoke", Resource = "execute-api:/*/*/*" } ] }) }

해당 방식에 한 가지 단점이 있다면, MMP에서 제공하는 IP 대역이 변경될 경우, 그에 대응해서 인프라를 수정해야 하는 관리 비용이 들어갑니다. 만약 이 변경사항을 미리 적용하지 않는다면, 안전한 클라이언트로부터의 리퀘스트가 거절되어서 데이터가 유실되는 상황이 발생할 수 있습니다.

하지만 MMP 솔루션에서 IP 대역을 변경하는 경우가 잦지는 않기 때문에, 아직까지는 큰 문제 없이 사용 중입니다.





Terraform Module

태피툰에서는 IaC(Infrastructure as Code)를 위해 Terraform으로 거의 모든 인프라를 관리하고 있는데, 아래 이미지에서 보이는 것과 같은 Gateway 모듈을 만들어서 추후 다른 수집 파이프라인을 만들 때 재사용할 수 있도록 했습니다.

먼저 API Gateway REST API, deployment, stage 그리고 method settings들을 생성해줍니다. 이 리소스들은 각 플랫폼이 공통으로 사용하도록 해주었습니다. MMP에서 API를 호출하기 위해서는 deployment와 stage를 생성해서 연결해야 합니다. (자세한 내용은 AWS 문서를 참고해주세요.)

resource "aws_api_gateway_rest_api" "this" { name = var.api_name description = var.api_description endpoint_configuration { types = [ "REGIONAL" ] } } resource "aws_api_gateway_deployment" "this" { rest_api_id = aws_api_gateway_rest_api.this.id lifecycle { create_before_destroy = true } } resource "aws_api_gateway_stage" "this" { deployment_id = aws_api_gateway_deployment.this.id rest_api_id = aws_api_gateway_rest_api.this.id stage_name = var.stage_name cache_cluster_size = "0.5" lifecycle { ignore_changes = [ deployment_id, ] } } resource "aws_api_gateway_method_settings" "this" { rest_api_id = aws_api_gateway_rest_api.this.id stage_name = aws_api_gateway_stage.this.stage_name method_path = "*/*" settings { metrics_enabled = true logging_level = "INFO" } }

이렇게 stage와 deployment가 세팅된 method를 이용해서 아래와같이 각 플랫폼별로 resource, method request, integration request, integration response 그리고 method response를 생성해주면 됩니다.

resource "aws_api_gateway_resource" "this" { count = length(var.method_info) rest_api_id = aws_api_gateway_rest_api.this.id parent_id = aws_api_gateway_rest_api.this.root_resource_id path_part = var.method_info[count.index].api_gateway_path_part } resource "aws_api_gateway_method" "this" { count = length(var.method_info) rest_api_id = aws_api_gateway_rest_api.this.id resource_id = aws_api_gateway_resource.this[count.index].id http_method = "POST" authorization = "NONE" api_key_required = false } resource "aws_api_gateway_integration" "this" { count = length(var.method_info) rest_api_id = aws_api_gateway_rest_api.this.id resource_id = aws_api_gateway_resource.this[count.index].id http_method = aws_api_gateway_method.this[count.index].http_method type = "AWS" integration_http_method = "POST" uri = "arn:aws:apigateway:${var.kinesis_region}:kinesis:action/PutRecord" credentials = var.credentials passthrough_behavior = "NEVER" request_templates = { "application/json" = jsonencode({ "StreamName": var.method_info[count.index].kinesis_stream_name, "Data": "$util.base64Encode($input.body)", "PartitionKey": "$context.requestId" }) } } resource "aws_api_gateway_method_response" "this" { count = length(var.method_info) rest_api_id = aws_api_gateway_rest_api.this.id resource_id = aws_api_gateway_resource.this[count.index].id http_method = aws_api_gateway_method.this[count.index].http_method status_code = "200" } resource "aws_api_gateway_integration_response" "this" { count = length(var.method_info) rest_api_id = aws_api_gateway_rest_api.this.id resource_id = aws_api_gateway_resource.this[count.index].id http_method = aws_api_gateway_method.this[count.index].http_method status_code = aws_api_gateway_method_response.this[count.index].status_code }



Role and Policy

여기서 gateway api가 Kinesis에 데이터를 보낼 수 있게끔 권한을 주기 위해서는, 아래와 같은 iam role을 생성해주어야 합니다. 아래와 같이 role을 만들어주고, kinesis에 엑세스할 수 있는 policy를 더해주어야 하는데, 이 프로젝트 같은 경우 Kinesis에 데이터를 쓰는 권한이 필요하기 때문에 Full Access를 주었습니다. 아래와 같이 직접 policy를 작성해서 iam role에 추가해줄 수도 있고, 또는 AWS에서 제공하는 IAM-provisioned managed policy인 arn:aws:iam::aws:policy/AmazonKinesisFullAccess를 사용해도 무관합니다.

data "aws_iam_policy_document" "assume_role" { statement { sid = "APIGateWayAssumeRole" effect = "Allow" actions = [ "sts:AssumeRole", ] principals { identifiers = [ "apigateway.amazonaws.com", ] type = "Service" } } } data "aws_iam_policy_document" "kinesis_policy" { statement { sid = "AllowKinesisAll" effect = "Allow" actions = [ "kinesis:*" ] resources = local.all_resources } }

* Create a REST API as an Amazon Kinesis proxy in API Gateway (Link)



MMP to Kinesis

위와 같이 모듈을 만들어준 후, 모듈을 생성해 ios, android를 포함한 다양한 endpoints를 생성할 수 있습니다. 각 endpoint마다 다른 path와 kinesis stream의 이름만 input으로 주고, 모듈을 생성하면 됩니다.

locals { af_android_method_info = { api_gateway_path_part = "android" kinesis_stream_name = module.kinesis_for_appsflyer_android.kinesis_stream_name } af_ios_method_info = { api_gateway_path_part = "ios" kinesis_stream_name = module.kinesis_for_appsflyer_ios.kinesis_stream_name } } module "appsflyer" { source = "../modules/aws-api-gateway-for-mmp" api_name = "mmp-kinesis-stream-proxy-gateway" api_description = "Proxy to handle requests from an external service to a kinesis stream." credentials = module.api_gateway_account_with_role.iam_role_arn kinesis_region = data.aws_region.current.name stage_name = "af" method_info = [ local.af_android_method_info, local.af_ios_method_info, ] }




이렇게 생성한 Gateway API의 구성을 다시 살펴보면, 다음과 같습니다.


  1. MMP툴에, Method Requests의 endpoints들을 등록해서, 허용하는 곳에서 오는 데이터를 받아옵니다.
  2. 이후 Integration Request으로 또 다른 리퀘스트를 보내는데,
  3. Integration Request는 Amazon Kinesis Stream의 API 인 PutRecord를 이용해 Kinesis Stream에 데이터를 보내줍니다.
  4. 이후 Kinesis가 성공적으로 데이터를 받았다면 Integration Response로 응답을 주게 되는데,
  5. 다시 Method Response를 이용해서
  6. MMP에 응답을 보내주는 방식입니다.

그렇게 Gateway에서 받은 MMP raw 데이터는 Kinesis Stream으로 보내지게 되고, Firehose를 통해 최종적으로 Data Lake인 S3버킷에 파티션되어 저장됩니다. Kinesis Stream과 Firehose에 대해서는 다음 글에서 자세하게 설명 드리겠습니다.

Genie