안녕하세요, 토스 코어 Data Warehouse 팀의 김문수입니다.
Spark 작업이 효율적으로 실행되는지 모니터링하기 위해서 Spark Analyzer를 만든 경험을 공유하려고 합니다. 저희와 비슷한 고민을 가진 분들께 도움이 되길 바랍니다.
- Spark Job이 많아 관리가 어려워지고, 어디서부터 성능을 개선해야 할지 막막한 분
- Spark 클러스터 비용이 많이 나오거나, 리소스가 부족해서 고통받는 분
- 많은 유저들이 만들어 낸 Spark Job을 직접 고칠 수 있게 돕고 싶은 분
일단 돌아가게 만든 작업들이 모이면, 비싸고 느린 데이터 파이프라인이 됩니다.
Spark는 굉장히 강력한 도구이지만, 잘못 사용하면 비효율적으로 동작하여 시스템에 부담을 줄 수 있습니다. 토스코어에서는 하루 평균 6천 개 이상의 Spark 작업이 매일 또는 특정 주기로 실행되고 있는데요. 여러 팀에서 각기 다른 수준과 방식으로 작업을 추가하고 운영하다 보니 효율성이 떨어지는 작업이 종종 추가되기도 합니다. 그리고 비록 처음에는 효율적으로 만들었다고 하더라도, 시간이 지나면 처리하는 데이터의 양상이 달라져서 비효율적으로 동작하는 경우도 있습니다.
이러한 비효율적인 작업을 모니터링하고 개선할 방법을 고민하던 중, Uber의 Spark Application 안티 패턴을 다룬 블로그 글에서 영감을 받았습니다. Spark 메트릭을 계산해, 성능 개선이 필요한 작업이 있으면 시스템이 자동으로 경고를 보내는 구조를 만들고자 했습니다.
그러던 와중에 DataFlint라는 플러그인을 사용하게 되었습니다. DataFlint는 Spark Application UI와 History Server를 통해 다양한 메트릭을 시각적으로 보여주어 문제를 직관적으로 파악할 수 있게 도와주는 도구입니다.
DataFlint만으로도 Alert 화면에서 이 작업에 대한 문제를 바로 알 수 있어서 도움이 되고, 필요로 하는 핵심적인 metric이 구현되어서 좋긴 합니다만, 화면에 일일이 들어가서 확인하는 것은 비효율적이었습니다. (하루 평균 6천 개…!)
그래서 처음에는 Spark History Server에서 애플리케이션 목록을 구해서 DataFlint로 분석하고, 이를 기반으로 알림을 주기적으로 보내는 시스템을 구상했어요. 그런데 역시 쉽지가 않더라고요. DataFlint는 당시 오픈소스로는 REST API를 제공하지 않고 있기 때문이었어요. 고민이 됐습니다. “API는 없더라도 화면에는 보이니까 web parsing을 해야 할까?” 하고요.
결국 Spark History Server의 API와 자체적으로 계산할 수 있는 메트릭을 활용하는 방향으로 선회했습니다. 직접 계산하는 것이 커스텀 메트릭을 추가하거나 임계치를 조정할 때 훨씬 유연하겠다는 결론을 내렸기 때문이에요.
이제 가능성을 봤으니, 구조를 고민하고 구현하면 됩니다. Spark History Server에서 Spark Job의 메트릭을 수집하고 이를 분석하는 두 가지 주요 단계로 작업을 구성했어요.
첫 번째 단계는 작업 이력 수집입니다. Spark History Server에서 일정 시간 동안 완료된 Job의 메트릭을 수집하여 저장합니다. REST API를 제공하고 있어서 필요한 정보를 가져올 수 있었어요. 수집된 데이터는 application id를 key로 해서 이름, 각 단계(stages, sqls)의 실행 메트릭, 환경 정보 등을 포함합니다.
두 번째 단계는 수집된 메트릭을 바탕으로 Job의 성능 문제를 분석합니다. 주요 문제로는 파티션 불균형, 작은 파일 읽기/쓰기 문제, 중복된 데이터 처리 등 Spark Job에서 흔히 발생할 수 있는 성능 저하 요인들을 들 수 있습니다. 각 작업의 메트릭을 분석해 이러한 문제를 감지하고, 계산한 결과를 다시 저장하여 문제가 있는 작업을 쉽게 파악할 수 있도록 했습니다.
처음에는 두 단계를 합쳐서 하나의 Task로 실행 했었습니다. 그런데 History Server를 많이 호출해야해서 오래 걸리기도 하고, 메트릭 계산을 추가할 때마다 History Server 호출부터 시작하는게 불편해서 원본 데이터 저장과 메트릭 계산 후 저장 단계를 나누었습니다. 그리고 작업의 주기도 일 단위에서 시간 단위로 짧게 줄여서 작업당 처리해야하는 크기를 줄였습니다.
실행 중인 Spark job 이나 streaming job 까지 하면 더 좋을거 같은데? 라는 생각도 했지만, Spark streaming 보다는 flink를 주로 사용하고 있는 환경이었고, 이미 중요하게, 주기적으로 실행되는 작업이 많다보니 지금 알림을 받으나, 내일 아침에 알림을 받으나 큰 차이가 없어서 우선순위를 아주 낮춰버렸습니다.
문제라고 판단할 기준을 잡는 것이 꽤 중요했는데요, 우선적으로 감지할 수 있게 구현한 Alert은 다음과 같습니다.
- 파티션 불균형: 파티션 스큐(Partition Skew)는 작업의 일부 파티션이 과도한 시간을 소모할 때 발생합니다. 각 파티션의 작업 시간이 균형 있게 분배되지 않으면 전체 작업의 성능이 저하됩니다. 파티션 불균형 여부를 아래와 같은 수식으로 계산했습니다. Skew Ratio = Max Task Duration / Median Task Duration
만약 Skew Ratio가 임계치(예: 10배)를 초과하면, 파티션 불균형 문제가 발생했다고 판단합니다. 다만 skew가 발생했다고 하더라도 task duration이 짧다면 큰 영향을 미치지 않기 때문에, max task duration 이 5분 이하인 경우에는 무시하도록 했습니다.
- 작은 파일 읽기 문제: 작은 파일을 과도하게 읽으면 I/O 오버헤드가 발생해 작업 성능이 저하됩니다. Hadoop 네임노드가 파일을 더 많이 관리해야 하기 때문에 Hadoop 클러스터에도 문제가 생길 수 있습니다. 아래와 같은 조건으로 작은 파일 문제를 감지했습니다. Avg File Size = Size of Files Read / Number of Files Read
만약 평균 파일 크기가 특정 임계치(예: 3MB) 이하이면서 읽은 파일의 수가 임계치(예: 100개)를 초과하면, 작은 파일 문제를 경고합니다. 토스에서는 Spark 외에 Impala, Kafka Connector 등의 방식으로도 파일이 쓰이기 때문에 Job에서 작은 파일을 읽는 경우에 소스를 인지할 수 있도록 했습니다.
- 디스크 스필: 메모리 부족으로 인해 디스크에 데이터를 임시로 쓰는 현상은 작업 성능을 크게 저하시킬 수 있기 때문에 중요한 감지 대상입니다. 아래와 같은 수식으로 계산하여 감지했습니다.Spill Size(GB) = Disk Bytes Spilled / 1042^3
Spill 크기가 임계치(예: 1GB)를 초과하면, 해당 작업이 과도한 Spill로 성능 저하를 겪고 있을 가능성이 있다고 판단했습니다.
마지막 단계로 이렇게 계산된 값을 매일 알림으로 제공합니다. 각 문제마다 대표적인 해결 방법을 제안해서 문제 해결에 도움을 주고 있습니다. 매일 실행되는 6000개 이상의 작업 중에서 1000개 이상의 크고 작은 잠재적 문제들을 발견할 수 있었어요.
Spark Analyzer를 통해 Spark Job에서 발생하는 성능 문제를 자동으로 감지하고, 성능 저하의 원인을 분석할 수 있었습니다. 문제는 Job이 너무 많아서 혼자서, 혹은 팀 하나에서는 다 하기 어렵다는 겁니다. 고치는 와중에도 계속 추가되기도 하고요. (하하)
그래서 앞으로는 더 많은 Alert을 추가하는 것과 함께, 문제가 발생한 작업에 대해 담당자에게 JIRA 티켓을 자동으로 발행하여 성능 개선을 유도하는 시스템을 추가하려고 합니다. 스스로 성능 문제를 파악하고 해결할 수 있도록 돕는 것이 목표입니다.