Home [스파크 완벽 가이드] 19. 성능 튜닝
Post
Cancel

[스파크 완벽 가이드] 19. 성능 튜닝

[스파크 완벽 가이드] 19. 성능 튜닝

스파크 잡의 최적화 주요 영역

코드 수준의 설계(RRR, DataFrame 중 선택) 보관용 데이터 조인 집계 데이터 전송 애플리케이션별 속성 익스큐터 프로세스의 JVM 워커 노드 클러스터와 배포 환경 속성

19.1 간접적인 성능 향상 기법

  • 하드웨어 개선 같은 방법도 있지만 이부분은 제외합니다.

19.1.1 설계 방안

  • 성능 최적화를 위해서는 좋은 설계 방안 필요합니다.

스칼라 vs 자바 vs 파이썬 vs R

  • 언어 선택에 대한 정답은 없음. 가장 편안하게 사용하는 언어나 상황에 따라 선택하는 것이 좋음.
  • 구조적 API로 만들 수 없는 사용자 정의 트랜스포메이션을 사용해야 한다면 R과 파이썬은 사용하지 않는 것이 좋음.
    • 주 언어가 파이썬이었다고 했을때, 필요한 영역에만 스칼라로 정의해 사용하는 방법이 있다.

DataFrame vs SQL vs Dataset vs RDD

  • 모든 언어에서 DataFrame, Dataset, SQL의 속도는 동일하다.
    • DataFrame을 어떤 언어에서 사용하더라도 성능 차이가 없음.
    • 단, 파이썬이나 R에서 UDF(사용자 정의 함수)를 정의한다면 성능 저하가 발생할 가능성이 있으므로 이때는 자바나 스칼라를 사용해 UDF를 정의하는 것이 좋음.
  • 근본적으로 성능을 개선하고 싶다면 UDF 대신 DataFrame이나 SQL을 사용해야 한다.
    • 이렇게 만들어진 코드는 결과적으로는 RDD로 컴파일되는데, 이때 보통 사용자가 직접 RDD 코드를 작성하는 것보다 스파크 최적화 엔진으로 만들어진 코드가 더 나은 경우가 대부분임.
    • 또, 사용자가 저수준 API를 사용하는것보다 훨씬 더 쉽게 사용할수 있음.
  • RDD를 사용하고자 한다면 스칼라나 자바를 이용해서 애플리케이션을 개발하는것이 좋음.(그게 안될 경우 사용 영역을 최소화)
    • 파이썬에서 RDD를 사용한다고 했을때 큰 데이터 직렬화 비용, 안정성 문제가 있음.

19.1.2 RDD 객체 직렬화

  • Kryo 직렬화가 Java 직렬화보다 효율적이나 애플리케이션에 사용할 클래스 등록해야 하는 번거로움이 있음.

19.1.3 클러스터 설정

동적 할당

  • 워크로드에 따라 애플리케이션이 차지할 자원을 동적으로 조절하는 매커니즘을 제공.
    • spark.dynamicAllocation.enabled=true로 변경 가능

19.1.4 스케줄링

  • 스케줄러 풀, 동적 할당, max-executor-cross 설정과 같이 병렬 실행 관련된 최적화 방법들이 있음.
  • 스케줄링 최적화는 실제로 설정값을 바꿔가면서 테스트해서 최적화 지점을 찾는것이 중요합니다.

19.1.5 보관용 데이터

  • 자주 발생하지 않지만, 여러 가지 분석을 수행하기 위해 동일한 데이터셋을 여러 번 읽는 경우가 있을수 있음.
  • 적절한 저장소, 데이터 포맷을 선택하고, 파티셔닝을 활용해야 한다.

파일 기반 장기 데이터 저장소

  • csv파일, 바이너리 blob 파일, Parquet(파케이) 다양한 포맷 제공합니다.
  • 데이터 저장시 모범 사례를 따르는게 가장 쉬운 최적화 방법이다.
  • 데이터를 바이너리로 저장하려면 구조적 API를 사용하는것이 좋음.
    • CSV같은 파일은 구조화되어 있는것처럼 보이지만 실제로 파싱 속도가 느리고 예외 케이스가 자주 발생함.

분할 가능한 파일 포맷과 압축

  • 분할 가능한 포맷인지 확인 해야 한다.
    • 여러 태스크가 파일의 서로 다른 부분을 동시에 읽을 수 있어야 함.
    • JSON 파일처럼 분할 불가능한 포맷을 사용하면 단일 머신에서 전체 파일을 읽어야 하므로 병렬성이 떨어짐.
    • 압축 포맷도 분할할 수 없어서 병렬로 읽을수 없음.(10 코어 장비에서 단일 코어만을 사용해서 데이터를 읽어야 함)

테이블 파티셔닝

  • 데이터의 날짜 필드 같은 키를 기준으로 개별 디렉터리에 파일을 저장하는것을 의미(아파치 하이브 같은)
  • 키를 기준으로 데이터가 분할되어있다면, 특정 범위의 데이터를 조회할떄 ㅇ를 기준으로 관련 없는 파일을 건너뛸수 있음.
    • customerId나 date 컬럼을 기준으로 자주 필터링 되는 데이터라면 이 2가지 중 하나를 기준으로 파티션을 생성하는것이 좋음.
    • 너무 작은 단위로 분할하면 작은 크기의 파일이 대량으로 생성될수 있으므로 주의(전체 파일 목록을 읽을 떄 오버헤드 발생 가능)

버켓팅

  • 조인이나 집계를 수행하는 방식에 따라 사전 분할(pre-partition) 할수 있음.
  • 버켓팅을 사용하면 데이터를 1~2개의 파티션에 치우치지 않고 전체파티션에 균등하게 분산시킬수 있음. => 성능, 안전성 향상
  • 조인 전에 셔플을 미리 방지할 수 있어서 데이터 접근 속도를 높일 수 있음.

파일 수

  • 데이터를 파티션이나 버켓으로 구성하려면 파일 수파일 크기도 고려해야 합니다.
  • 데이터를 저장하는 방법에서는 트레이드 오프를 고려해야 함.
  • 많은 수의 크기가 작은 파일인 경우 스케줄러는 그만큼 많은 읽기 태스크를 실행해야 하므로 네트워크와 잡 스케줄링 부하가 증가
  • 적은 수의 대용량 파일이 있따면 스케줄러의 부하는 줄일 수 있지만, 태스크 수행시간이 오래 걸립니다. 적은 수의 대용량 파일은 입력 파일 수보다 더 많은 태스크 수를 스파크에 설정해 병렬성을 높일수는 있습니다. -입력 데이터 파일이 최소 수십 메가바이트의 데이터를 갖도록 크기를 조정하는것이 좋음.

데이터 지역성(data locality)

  • 기본적으로 네트워크를 통해 데이터 블록을 교환하지 않고, 특정 데이터를 가진 노드에서 동작할 수 있도록 저장하는 것을 의미함.
  • 저장소 시스템과 스파크가 동일한 노드에 있고, 해당 시스템이 데이터 지역성 정보를 제공한다면 스파크는 입력 데이터 블록과 최대한 가까운 노드에서 태스크를 할당한다.

통계 수집

  • 구조적 API를 사용하면 비용 기반 쿼리 옵티마이저가 내부적으로 동작함.(입력 데이터의 속성을 기반으로 쿼리 실행 계획을 만듬)
  • 비용 기반 옵티마이저를 작동시키려면 통계 를 수집하고 유지해야 한다.(이름이 지정된 테이블에서만 사용 가능)
1
2
3
4
5
// 테이블 수준 통게 수집
ANALYZE TABLE table_name COMPUTE STATISTICS

// 컬럼 수준 통계 수집
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column_name1,  column_name2, ...

19.1.6 셔플 설정

  • 외부 셔플 서비스를 설정하면 머신에서 실행되는 익스큐터가 바쁜 상황에서도(ex. GC) 원격 머신에서 셔플 데이터를 읽을 수 있으므로 성능을 높일수 있음.(코드가 복잡해지고 유지하기 어려워서 운영환경에서는 비추천)
  • 외부 셔플 서비그 구성, 익스큐터당 동시 연결 수 등 셔플 관련 여러 설정들은 기본값을 사용하는것을 권장함.

셔플 파티션 수에 따른 트레이드 오프

  • 파티션 수가 너무 적으면 소수의 노드만 작업하여 데이터 치우침 현상 발생
  • 파티션 수가 너무 많으면 태스크를 많이 실행해야 하므로 부하 발생.

19.1.7 메모리 부족과 가비지 컬렉션

  • 스파크 잡 실행 과정 중에 익스큐터나 드라이버 머신의 메모리가 부족하거나 메모리 압박(memory presure)으로 인해 태스크를 완료하지 못할 수 있다.
    • 첫째, 애플리케이션 실행 중 메모리를 너무 많이 사용한 경우
    • 둘째, 가비지 컬렉션이 자주 수행되는 경우
    • 셋쨰, JVM 내에서 객체가 너무 많이 생성되어 더 이상 사용하지 않은 객체를 GC로 정리하면서 실행속도가 느려지는 경우
      • 이는 구조적 API를 활용해 해결할수 있음.(잡의 효율성 증대, JVM 객체를 전혀 생성하지 않음.)

가비지 컬렉션 영향도 측정

  • GC 발생 빈도와 소요시간에 대한 통계를 모으는게 첫번째 단계이다.
  • spark.executor.extraJavaOptions 속성에 JVM 옵션으로 -verbose:gc, -XX:+PrintGCTimeStamps 값을 추가해 통계를 모을수 있음.

가비지 컬렉션 튜닝

  • spark.executor.extraJavaOptions 속성에 익스큐터에 대한 가비지 컬렉션 튜닝 옵션을 추가할 수 있음.
  • 주기적인 GC 모니터링을 통해 튜닝 포인트를 찾아야 함.

19.2 직접적인 성능 향상 기법

19.2.1 병렬화

  • 특정 스테이지의 처리 속도를 높이려면 병렬성을 높이는 작업부터 시작해야 함.
  • spark.default.parallelismspark.sql.shuffle.partitons 값을 클러스터 코어수에 따라 설정.

19.2.2 향상된 필터링

  • 상황에 따라 데이터소스에 필터링을 위임하여 최종 결과와 무관한 데이터를 스파크에서 읽지 않고 작업을 진행할수 있도록 해야 합니다.

19.2.3 파티션 재분배와 병합

  • 파티션 재분배 과정은 셔플을 수반합니다. 하지만 클러스터 전체에 걸쳐 데이터가 균등하게 분배되므로 잡의 전체 실행 단계를 최적화 할 수 있음.
  • 동일 노드의 파티션을 하나로 합치는 coalesce를 실행해 DataFrame이나 RDD의 전체 파티션 수를 줄이는것이 좋음.
  • repartition 메소드는 부하를 분산하기 위해 네트워크로 데이터를 셔플링
  • 파티션 재분배 과정은 부하를 유발하지만 애플리케이션 전체적인 성능과 스파크 잡의 병렬성을 높일 수 있음을 기억하자

사용자 정의 파티셔닝

  • 잡이 여전히 느리거나 불안정하다면 RDD를 이용한 사용자 정의 파티셔닝 기법을 적용할 수 있다.
  • DataFrame보다 더 정밀한 수준으로 클러스터 전반의 데이터 체계를 제어(아주 드물게 사용되지만, 최적화 기법 중 하나)

19.2.4 사용자 정의 함수(UDF)

  • UDF 사용을 최대한 피하는 것도 좋은 최적화 방법 중 하나이다.
  • UDF는 데이터를 JVM 객체로 변환하고 쿼리에서 레코드당 여러번 수행되므로 많은 자원을 소모함.
  • 최대한 구조적 API를 활용하도록 하자.

19.2.5 임시 데이터 자장소(캐싱)

  • 같은 데이터셋을 계속해서 재사용한다면 캐싱을 사용해서 최적화 할 수 있음.
  • 캐싱은 클러스터의 익스큐터 전반에 걸쳐 만들어진 임시 저장소(메모리나 디스크)에 DataFrame, 테이블 또는 RDD를 보관해 빠르게 접근할 수 있도록 한다.
  • DataFrame이나 RDD의 cache 메소드를 사용해 데이터셋을 캐싱할수 있음.

캐싱이 필요한 상황

  • 스파크의 대화형 세션이나 스탠드얼론 앱에서 특정 데이터셋을 다시 사용하려 할 경우
  • 데이터를 읽고 정제한 다음 다수의 처리에서 동일한 데이터를 재사용하는 경우

캐시 사용시 주의사항

  • RDD는 물리적 데이터를 캐시에 저장함
  • 반면 구조적 API 캐싱은 물리적 실행 계획 기반으로 이뤄짐.
    • 실행 계획을 키로 저장하고 처리 과정 동안 물리적 실행 계획을 참조함.
    • 누군가 원시 데이터를 읽으려 시도했지만, 먼저 캐시해놓은 버전의 데이터를 읽게 되면서 혼란이 발생할 수 있음.

데이터 캐시 저장소 레벨

스크린샷 2022-11-12 오후 6 12 35

캐시된 DataFrame

스크린샷 2022-11-12 오후 6 15 39

  • 위와 같은 과정을 통해 CSV 파일을 읽어 데이터를 파싱하는 부분을 반복적으로 하지 않을수 있음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# DataFrame을 캐싱하지 않는 원본 코드
DF1 = spark.read.format("csv')\
     .option("inferSchema", "true")\
     .option("header", "true)\
     .load("/data/flight-data/csv/2015-summary.csv")

DF2 = DF1 .groupBy("DEST_COUNTRY_NAME").count( ).collect( )
DF3 = DF1 groupBy("0RIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1..groupBy( "count'' ).count( ).collect( )


# 캐싱한다면?
DF1.cache()
// 캐싱은 지연처리이므로, 데이터를 즉시 캐싱하기 위해 count 메서드를 사용.
DF1.count()

DF2 = DF1 .groupBy("DEST_COUNTRY_NAME").count( ).collect( )
DF3 = DF1 groupBy("0RIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1..groupBy( "count'' ).count( ).collect( )

  • 만약 클러스터의 전체 메모리가 가득 찼다면 데이터셋의 일부 데이터만 캐싱함.

19.2.6 조인

  • 최적화를 위한 공통 영역으로, 조인 타입에 따른 특성과 동작 방식을 이해해야 최적화를 잘 할 수 있음.
  • 조인 순서 변경하는 간단한 작업만으로도 성능을 크게 높일 수 있음.
    • 내부 조인을 사용해 필터링하는 것과 동일한 효과
  • 브로드캐스트 조인 힌트를 사용하면 스파크가 쿼리 실행계획을 생성할 때 지능적으로 계획을 세울 수 있음.
  • 안정성과 최적화를 위해 카테시안 조인이나 전체 외부 조인 사용은 지양해야 함.

19.2.7 집계

  • 집계 전에 충분히 많은 수의 파티션을 가질 수 있도록 데이터를 필터링하는것이 최선의 방법
  • RDD를 사용하여 집계 수행 방식을 정확하게 제어하여 성능과 안정성을 개선할 수 있음(groupByKey, reduceByKey)

19.2.8 브로드캐스트 변수

  • 앱에서 사용되는 다수의 UDF에서 큰 데이터 조각을 사용한다면 이 데이터 조각을 개별 노드에 전송해 읽기 전용 복사본으로 저장할 수 있다.
  • 이 기능을 활용하면 잡마다 데이터 조각을 재전송하는 과정을 스킵할 수 있으므로 성능 향상에 도움이 된다.

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 18. 모니터링과 디버깅

[엘라스틱서치 바이블] 1. 엘라스틱 서치 소개

Comments powered by Disqus.