[스파크 완벽 가이드] 19. 성능 튜닝
스파크 잡의 최적화 주요 영역
코드 수준의 설계(RRR, DataFrame 중 선택) 보관용 데이터 조인 집계 데이터 전송 애플리케이션별 속성 익스큐터 프로세스의 JVM 워커 노드 클러스터와 배포 환경 속성
19.1 간접적인 성능 향상 기법
하드웨어 개선
같은 방법도 있지만 이부분은 제외합니다.
19.1.1 설계 방안
- 성능 최적화를 위해서는 좋은 설계 방안 필요합니다.
스칼라 vs 자바 vs 파이썬 vs R
- 언어 선택에 대한 정답은 없음. 가장 편안하게 사용하는 언어나 상황에 따라 선택하는 것이 좋음.
- 구조적 API로 만들 수 없는 사용자 정의 트랜스포메이션을 사용해야 한다면 R과 파이썬은 사용하지 않는 것이 좋음.
- 주 언어가 파이썬이었다고 했을때, 필요한 영역에만 스칼라로 정의해 사용하는 방법이 있다.
DataFrame vs SQL vs Dataset vs RDD
- 모든 언어에서 DataFrame, Dataset, SQL의 속도는 동일하다.
- DataFrame을 어떤 언어에서 사용하더라도 성능 차이가 없음.
- 단, 파이썬이나 R에서 UDF(사용자 정의 함수)를 정의한다면 성능 저하가 발생할 가능성이 있으므로 이때는 자바나 스칼라를 사용해 UDF를 정의하는 것이 좋음.
- 근본적으로 성능을 개선하고 싶다면 UDF 대신 DataFrame이나 SQL을 사용해야 한다.
- 이렇게 만들어진 코드는 결과적으로는 RDD로 컴파일되는데, 이때 보통 사용자가 직접 RDD 코드를 작성하는 것보다 스파크 최적화 엔진으로 만들어진 코드가 더 나은 경우가 대부분임.
- 또, 사용자가 저수준 API를 사용하는것보다 훨씬 더 쉽게 사용할수 있음.
- RDD를 사용하고자 한다면 스칼라나 자바를 이용해서 애플리케이션을 개발하는것이 좋음.(그게 안될 경우 사용 영역을 최소화)
- 파이썬에서 RDD를 사용한다고 했을때 큰 데이터 직렬화 비용, 안정성 문제가 있음.
19.1.2 RDD 객체 직렬화
Kryo 직렬화
가 Java 직렬화보다 효율적이나 애플리케이션에 사용할 클래스 등록해야 하는 번거로움이 있음.
19.1.3 클러스터 설정
동적 할당
- 워크로드에 따라 애플리케이션이 차지할 자원을 동적으로 조절하는 매커니즘을 제공.
spark.dynamicAllocation.enabled=true
로 변경 가능
19.1.4 스케줄링
- 스케줄러 풀, 동적 할당,
max-executor-cross
설정과 같이 병렬 실행 관련된 최적화 방법들이 있음. - 스케줄링 최적화는 실제로 설정값을 바꿔가면서 테스트해서 최적화 지점을 찾는것이 중요합니다.
19.1.5 보관용 데이터
- 자주 발생하지 않지만, 여러 가지 분석을 수행하기 위해 동일한 데이터셋을 여러 번 읽는 경우가 있을수 있음.
- 적절한 저장소, 데이터 포맷을 선택하고, 파티셔닝을 활용해야 한다.
파일 기반 장기 데이터 저장소
- csv파일, 바이너리 blob 파일, Parquet(파케이) 다양한 포맷 제공합니다.
- 데이터 저장시 모범 사례를 따르는게 가장 쉬운 최적화 방법이다.
- 데이터를 바이너리로 저장하려면 구조적 API를 사용하는것이 좋음.
- CSV같은 파일은 구조화되어 있는것처럼 보이지만 실제로 파싱 속도가 느리고 예외 케이스가 자주 발생함.
분할 가능한 파일 포맷과 압축
분할 가능한
포맷인지 확인 해야 한다.- 여러 태스크가 파일의 서로 다른 부분을 동시에 읽을 수 있어야 함.
- JSON 파일처럼 분할 불가능한 포맷을 사용하면 단일 머신에서 전체 파일을 읽어야 하므로 병렬성이 떨어짐.
- 압축 포맷도 분할할 수 없어서 병렬로 읽을수 없음.(10 코어 장비에서 단일 코어만을 사용해서 데이터를 읽어야 함)
테이블 파티셔닝
- 데이터의 날짜 필드 같은 키를 기준으로 개별 디렉터리에 파일을 저장하는것을 의미(아파치 하이브 같은)
- 키를 기준으로 데이터가 분할되어있다면, 특정 범위의 데이터를 조회할떄 ㅇ를 기준으로 관련 없는 파일을 건너뛸수 있음.
- customerId나 date 컬럼을 기준으로 자주 필터링 되는 데이터라면 이 2가지 중 하나를 기준으로 파티션을 생성하는것이 좋음.
- 너무 작은 단위로 분할하면 작은 크기의 파일이 대량으로 생성될수 있으므로 주의(전체 파일 목록을 읽을 떄 오버헤드 발생 가능)
버켓팅
- 조인이나 집계를 수행하는 방식에 따라 사전 분할(pre-partition) 할수 있음.
- 버켓팅을 사용하면 데이터를 1~2개의 파티션에 치우치지 않고 전체파티션에 균등하게 분산시킬수 있음. => 성능, 안전성 향상
- 조인 전에 셔플을 미리 방지할 수 있어서 데이터 접근 속도를 높일 수 있음.
파일 수
- 데이터를 파티션이나 버켓으로 구성하려면
파일 수
와파일 크기
도 고려해야 합니다. - 데이터를 저장하는 방법에서는 트레이드 오프를 고려해야 함.
- 많은 수의 크기가 작은 파일인 경우 스케줄러는 그만큼 많은 읽기 태스크를 실행해야 하므로 네트워크와 잡 스케줄링 부하가 증가
- 적은 수의 대용량 파일이 있따면 스케줄러의 부하는 줄일 수 있지만, 태스크 수행시간이 오래 걸립니다. 적은 수의 대용량 파일은 입력 파일 수보다 더 많은 태스크 수를 스파크에 설정해 병렬성을 높일수는 있습니다. -입력 데이터 파일이 최소 수십 메가바이트의 데이터를 갖도록 크기를 조정하는것이 좋음.
데이터 지역성(data locality)
- 기본적으로 네트워크를 통해 데이터 블록을 교환하지 않고, 특정 데이터를 가진 노드에서 동작할 수 있도록 저장하는 것을 의미함.
- 저장소 시스템과 스파크가 동일한 노드에 있고, 해당 시스템이 데이터 지역성 정보를 제공한다면 스파크는 입력 데이터 블록과 최대한 가까운 노드에서 태스크를 할당한다.
통계 수집
- 구조적 API를 사용하면 비용 기반 쿼리 옵티마이저가 내부적으로 동작함.(입력 데이터의 속성을 기반으로 쿼리 실행 계획을 만듬)
- 비용 기반 옵티마이저를 작동시키려면 통계 를 수집하고 유지해야 한다.(이름이 지정된 테이블에서만 사용 가능)
1
2
3
4
5
// 테이블 수준 통게 수집
ANALYZE TABLE table_name COMPUTE STATISTICS
// 컬럼 수준 통계 수집
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column_name1, column_name2, ...
19.1.6 셔플 설정
- 외부 셔플 서비스를 설정하면 머신에서 실행되는 익스큐터가 바쁜 상황에서도(ex. GC) 원격 머신에서 셔플 데이터를 읽을 수 있으므로 성능을 높일수 있음.(코드가 복잡해지고 유지하기 어려워서 운영환경에서는 비추천)
- 외부 셔플 서비그 구성, 익스큐터당 동시 연결 수 등 셔플 관련 여러 설정들은 기본값을 사용하는것을 권장함.
셔플 파티션 수에 따른 트레이드 오프
- 파티션 수가 너무 적으면 소수의 노드만 작업하여 데이터 치우침 현상 발생
- 파티션 수가 너무 많으면 태스크를 많이 실행해야 하므로 부하 발생.
19.1.7 메모리 부족과 가비지 컬렉션
- 스파크 잡 실행 과정 중에 익스큐터나 드라이버 머신의 메모리가 부족하거나 메모리 압박(memory presure)으로 인해 태스크를 완료하지 못할 수 있다.
- 첫째, 애플리케이션 실행 중 메모리를 너무 많이 사용한 경우
- 둘째, 가비지 컬렉션이 자주 수행되는 경우
- 셋쨰, JVM 내에서 객체가 너무 많이 생성되어 더 이상 사용하지 않은 객체를 GC로 정리하면서 실행속도가 느려지는 경우
- 이는 구조적 API를 활용해 해결할수 있음.(잡의 효율성 증대, JVM 객체를 전혀 생성하지 않음.)
가비지 컬렉션 영향도 측정
- GC 발생 빈도와 소요시간에 대한 통계를 모으는게 첫번째 단계이다.
spark.executor.extraJavaOptions
속성에 JVM 옵션으로-verbose:gc, -XX:+PrintGCTimeStamps
값을 추가해 통계를 모을수 있음.
가비지 컬렉션 튜닝
spark.executor.extraJavaOptions
속성에 익스큐터에 대한 가비지 컬렉션 튜닝 옵션을 추가할 수 있음.- 주기적인 GC 모니터링을 통해 튜닝 포인트를 찾아야 함.
19.2 직접적인 성능 향상 기법
19.2.1 병렬화
- 특정 스테이지의 처리 속도를 높이려면 병렬성을 높이는 작업부터 시작해야 함.
spark.default.parallelism
과spark.sql.shuffle.partitons
값을 클러스터 코어수에 따라 설정.
19.2.2 향상된 필터링
- 상황에 따라 데이터소스에 필터링을 위임하여 최종 결과와 무관한 데이터를 스파크에서 읽지 않고 작업을 진행할수 있도록 해야 합니다.
19.2.3 파티션 재분배와 병합
- 파티션 재분배 과정은 셔플을 수반합니다. 하지만 클러스터 전체에 걸쳐 데이터가 균등하게 분배되므로 잡의 전체 실행 단계를 최적화 할 수 있음.
- 동일 노드의 파티션을 하나로 합치는
coalesce
를 실행해 DataFrame이나 RDD의 전체 파티션 수를 줄이는것이 좋음. repartition
메소드는 부하를 분산하기 위해 네트워크로 데이터를 셔플링- 파티션 재분배 과정은 부하를 유발하지만 애플리케이션 전체적인 성능과 스파크 잡의 병렬성을 높일 수 있음을 기억하자
사용자 정의 파티셔닝
- 잡이 여전히 느리거나 불안정하다면 RDD를 이용한 사용자 정의 파티셔닝 기법을 적용할 수 있다.
- DataFrame보다 더 정밀한 수준으로 클러스터 전반의 데이터 체계를 제어(아주 드물게 사용되지만, 최적화 기법 중 하나)
19.2.4 사용자 정의 함수(UDF)
- UDF 사용을 최대한 피하는 것도 좋은 최적화 방법 중 하나이다.
- UDF는 데이터를 JVM 객체로 변환하고 쿼리에서 레코드당 여러번 수행되므로 많은 자원을 소모함.
- 최대한 구조적 API를 활용하도록 하자.
19.2.5 임시 데이터 자장소(캐싱)
- 같은 데이터셋을 계속해서 재사용한다면 캐싱을 사용해서 최적화 할 수 있음.
- 캐싱은 클러스터의 익스큐터 전반에 걸쳐 만들어진 임시 저장소(메모리나 디스크)에 DataFrame, 테이블 또는 RDD를 보관해 빠르게 접근할 수 있도록 한다.
- DataFrame이나 RDD의
cache
메소드를 사용해 데이터셋을 캐싱할수 있음.
캐싱이 필요한 상황
- 스파크의 대화형 세션이나 스탠드얼론 앱에서 특정 데이터셋을 다시 사용하려 할 경우
- 데이터를 읽고 정제한 다음 다수의 처리에서 동일한 데이터를 재사용하는 경우
캐시 사용시 주의사항
- RDD는 물리적 데이터를 캐시에 저장함
- 반면 구조적 API 캐싱은
물리적 실행 계획
기반으로 이뤄짐.- 실행 계획을 키로 저장하고 처리 과정 동안 물리적 실행 계획을 참조함.
- 누군가 원시 데이터를 읽으려 시도했지만, 먼저 캐시해놓은 버전의 데이터를 읽게 되면서 혼란이 발생할 수 있음.
데이터 캐시 저장소 레벨
캐시된 DataFrame
- 위와 같은 과정을 통해 CSV 파일을 읽어 데이터를 파싱하는 부분을 반복적으로 하지 않을수 있음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# DataFrame을 캐싱하지 않는 원본 코드
DF1 = spark.read.format("csv')\
.option("inferSchema", "true")\
.option("header", "true)\
.load("/data/flight-data/csv/2015-summary.csv")
DF2 = DF1 .groupBy("DEST_COUNTRY_NAME").count( ).collect( )
DF3 = DF1 groupBy("0RIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1..groupBy( "count'' ).count( ).collect( )
# 캐싱한다면?
DF1.cache()
// 캐싱은 지연처리이므로, 데이터를 즉시 캐싱하기 위해 count 메서드를 사용.
DF1.count()
DF2 = DF1 .groupBy("DEST_COUNTRY_NAME").count( ).collect( )
DF3 = DF1 groupBy("0RIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1..groupBy( "count'' ).count( ).collect( )
- 만약 클러스터의 전체 메모리가 가득 찼다면 데이터셋의 일부 데이터만 캐싱함.
19.2.6 조인
- 최적화를 위한 공통 영역으로, 조인 타입에 따른 특성과 동작 방식을 이해해야 최적화를 잘 할 수 있음.
- 조인 순서 변경하는 간단한 작업만으로도 성능을 크게 높일 수 있음.
- 내부 조인을 사용해 필터링하는 것과 동일한 효과
- 브로드캐스트 조인 힌트를 사용하면 스파크가 쿼리 실행계획을 생성할 때 지능적으로 계획을 세울 수 있음.
- 안정성과 최적화를 위해 카테시안 조인이나 전체 외부 조인 사용은 지양해야 함.
19.2.7 집계
- 집계 전에 충분히 많은 수의 파티션을 가질 수 있도록 데이터를 필터링하는것이 최선의 방법
- RDD를 사용하여 집계 수행 방식을 정확하게 제어하여 성능과 안정성을 개선할 수 있음(groupByKey, reduceByKey)
19.2.8 브로드캐스트 변수
- 앱에서 사용되는 다수의 UDF에서 큰 데이터 조각을 사용한다면 이 데이터 조각을 개별 노드에 전송해 읽기 전용 복사본으로 저장할 수 있다.
- 이 기능을 활용하면 잡마다 데이터 조각을 재전송하는 과정을 스킵할 수 있으므로 성능 향상에 도움이 된다.
Comments powered by Disqus.