Home [스파크 완벽 가이드] 15. 클러스터에서 스파크 실행하기
Post
Cancel

[스파크 완벽 가이드] 15. 클러스터에서 스파크 실행하기

[스파크 완벽 가이드] 15. 클러스터에서 스파크 실행하기

스파크 애플리케이션의 아키텍처와 컴포넌트 스파크 내/외부에서 실행되는 스파크 애플리케이션 생애주기 파이프라이닝과 같은 중요한 저수준 실행 속성 스파크 애플리케이션을 실행하는 데 필요한 사항

15.1 스파크 애플리케이션의 아키텍쳐

스파크 드라이버

  • 스파크 애플리케이션의 운전자 역할 프로세스
  • 스파크 애플리케이션의 실행을 제어하고, 클러스터의 모든 상태 정보를 유지한다.
  • 클러스터 내 물리적 컴퓨팅 자원 확보와 익스큐터 실행을 위해 클러스터 매니저와 통신할 수 있어야 한다.

실제 정의한 스파크 태스크 코드를 실행하는 주체라고 이해 하면 될듯 하다. 리소스가 사용되는것은 스파크 클러스터의 익스큐터이겠지만, 이 명령을 요청하는 클라이언트 애플리케이션이라고 이해하면 됨. 예를 들면 제플린과 같은 BI툴이 될수도 있고, 사용자가 작성한 애플리케이션 코드(web application일수도, batch application일수도 있음)라고 생각하면 됨.

스파크 익스큐터

  • 스파크 드라이버가 할당한 태스크를 수행하는 프로세서
  • 드라이버가 할당한 태스크를 받아 실행하고 태스크의 상태와 결과를 드라이버에 보고한다.

클러스터 매니저

  • 스파크 드라이버와 익스큐터를 관리하는 클러스터 매니저가 필요하다,
  • 클러스터 매니저를 드라이버 혹은 마스터라고 부르기도 한다.
  • 스파크 애플리케이션이 실제로 실행할 때가 되면 클러스터 매니저에 자원 할당 요청을 한다.

스크린샷 2023-02-05 오후 5 45 33

스파크가 지원하는 클러스터 매니저 종류

  • 스탠드얼론 클러스터 매니저
  • 아파치 메소스
  • 하둡 YARN

15.1.1 실행 모드

  • 실행 모드는 애플리케이션을 실행할 때 요청한 자원의 물리적인 위치를 결정한다.

클러스터 모드

  • 가장 흔하게 사용되는 실행 방식
  • 컴파일된 JAR 파일이나 파이썬 스크립트를 클러스터 매니저에게 전달하고, 매니저는 이 파일을 받아 워커 노드에 드라이버와 익스큐터 프로세스를 실행하는 방식이다.

스크린샷 2023-02-05 오후 5 48 32

클라이언트 모드

  • 애플리케이션을 제출한 클라이언트 머신에 스파크 드라이버가 위치한다는것을 제외하면 클러스터 모드와 비슷하다.
  • 클라이언트 내에 스파크 드라이버 프로세스를 유지하며 매니저는 익스큐터 프로세스를 유지한다.
    • 스파크 앱이 클러스터와는 무관한 머신에서 동작할 수 있음.
    • 이런 머신을 게이트웨이 머신 또는 에지 노드라고 부른다.

스크린샷 2023-02-05 오후 5 49 27

로컬 모드

  • 모든 스파크 애플리케이션이 단일 머신에서 실행되는 구조
  • 병렬 처리는 머신의 스레드를 활용하여 처리된다.
  • 개발 테스트 및 학습용에서 쓰이는 방식

15. 2 스파크 애플리케이션의 생명 주기(스파크 외부)

15.2.1 클라이언트 요청

  • 첫 단계는 스파크 애플리케이션을 제출하는 것.
    • 컴파일된 JAR나 라이브러리 파일 등

스크린샷 2023-02-05 오후 5 52 22

15.2.2 시작

  • 드라이버 프로세스가 클러스터에 배치되면 사용자 코드를 실제로 실행할 차례이다.
  • 사용자 코드에는 반드시 스파크 클러스터를 초기화하는 SparkSession이 포함되어야 한다.
  • 클러스터 매니저는 익스큐터 프로세스를 시작하고 결과를 응답받아 익스큐터 위치와 관련 정보를 드라이버 프로세스로 전송한다.

스크린샷 2023-02-05 오후 5 53 35

15.2.3 실행

  • 클러스터 처리를 위한 준비가 끝나고 이제 실제 코드가 실행된다.
  • 드라이버와 워커는 코드를 실행하고, 데이터를 이동하는 과정에서 서로 통신한다.
  • 드라이버는 각 워커에 태스크를 할당하고, 워커는 태스크의 상태와 성공/실패 여부를 드라이버로 전송한다.

스크린샷 2023-02-05 오후 5 55 20

15.2.4 완료

  • 실행이 완료되면 드라이버 프로세스가 성공/실패 중 하나의 상태로 종료된다.
  • 그 이후 클러스터 매니저는 드라이버가 속한 클러스터의 모든 익스큐터를 종료시킨다.

스크린샷 2023-02-05 오후 5 56 53

15.3 스파크 애플리케이션의 생애주기(스파크 내부)

  • 스파크 앱은 하나 이상의 스파크 잡으로 구성된다.
  • 스레드를 사용해 여러 액션을 병렬로 수행하는 경우가 아니라면 스파크 잡은 차례대로 실행된다.

15.3.1 SparkSession

  • 모든 스파크 앱은 가장 먼저 SparkSession을 생성한다.
    • 대화형 모드에서는 자동으로 생성됨.
    • 개발시 빌더 메소드를 사용해 생성하는 것을 추천함.
1
2
3
4
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Databricks Spark Example")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .getOrCreate()

SparkContext

  • SparkSession의 SparkContext는 스파크 클러스터에 대한 연결을 나타낸다.
  • 이를 이용해 RDD 같은 스파크 저수준 API를 사용할 수 있다.
    • RDD, 어큐뮬레이터, 브로드캐스트 변수를 생성하고 코드를 실행할 수 있음.
  • SparkSession을 통해 SparkContext에 접근할 수 있으므로 명시적으로 초기화하지 않고, getOrCrete()을 사용한다.
1
2
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

15.3.2 논리적 명령

  • 사용자는 SQL, 저수준 RDD처리, 머신러닝 알고리즘 등을 사용해 트랜스포메이션과 액션을 마음대로 구성할 수 있다.
  • 하지만, 스파크 코드는 트랜스포메이션과 액션으로만 구성된다는 점에서 실제 내부동작은 동일하다.
  • 논리적 명령이 물리적 실행계획으로 어떻게 변환되는지 이해하는 것이 중요하다.

논리적 명령을 물리적 실행 계획으로 변환하기

  • 스파크가 사용자 코드를 어떻게 받아들이고, 클러스터에 어떻게 명령을 전달하는지 다시한번 짚어보자.
1
2
3
4
5
6
7
8
9
10
11
12
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")

step4.collect() # 2500000000000

step4.explain()

스크린샷 2023-02-05 오후 6 06 13

  • collect 같은 액션을 호출하면 개별 스테이지와 태스크로 이루어진 스파크 잡이 실행된다.

15.3.3 스파크 잡

  • 액션 하나당 하나의 스파크 잡이 생성되며, 액션은 항상 결과를 반환한다.
  • 스파크 잡은 일련의 스테이지로 나뉘며 스테이지 수는 셔플 작업이 얼마나 많이 발생하는지에 따라 달라진다.

스크린샷 2023-02-05 오후 6 11 59

15.3.4 스테이지

  • 다수의 머신에서 동일한 연산을 수행하는 태스크의 그룹을 나타낸다.
  • 스파크는 가능한 한 많은 태스크(잡의 트랜스포메이션)을 동일한 스테이지로 묶으려 노력한다.
  • 셔플 작업이 일어난 다음에는 반드시 새로운 스테이지를 시작한다.
    • 셔플은 데이터의 물리적 재분배 과정
  • 조인(셔플) 수행시 스테이지 태스크 수가 증가할 수 있는 부분에 대해 유의하여야 한다.
    • spark.sql.shuffle.partitions 기본값이 200인데, 잡 실행 도중 셔플이 수행되면 태스크 수가 200까지 늘어날 수 있는 것이다.
  • 클러스터의 익스큐터 수보다 파티션 수를 더 크게 지정하는 것이 일반적으로 좋음
  • 최종 스테이지 태스크 1개는 드라이버로 결과를 전송하기 전에 파티션마다 개별적으로 수행된 결과를 단일 파티션으로 모으는 작업을 수행하는 부분이다.

15.3.5 태스크

  • 스파크의 스테이지는 태스크로 구성된다.
  • 각 태스크는 단일 익스큐터에서 실행할 데이터의 블록과 다수의 트랜스포메이션의 조합으로 볼수 있다.
  • 데이터셋이 거대한 하나의 파티션인 경우 하나의 태스크만 생성된다.
    • 1,000개의 작은 파티션으로 구성되어 있다면 1,000개의 태스크를 만들어 병렬로 실행할 수 있다.
  • 태스크는 데이터 단위(파티션)에 적용되는 연산 단위를 의미한다.
    • 파티션 수를 늘리면 높은 병렬성을 얻을수 있다.
    • 최적화를 위한 가장 쉬운 방법 중 하나

15.4 세부 실행 과정

  • 스파크의 스테이지와 태스크는 알아두면 좋을만한 중요한 특성을 가지고 있다.
  • 첫째, 스파크는 map 연산 후 다른 map 연산이 이어진다면 함께 실행할 수 있도록 스테이지와 태스크를 자동으로 연결.
  • 둘쨰, 스파카는 모든 셔플을 작업할 때 데이터를 안정적인 저장소(디스크)에 저장하므로 여러 잡에서 재사용할 수 있다.

15.4.1 파이프라이닝

  • 스파크를 인메모리 컴퓨팅 도구로 만들어주는 핵심 요소 중 하나는 메모리나 디스크에 데이터를 쓰기 전에 최대한 많은 단계를 수행한다는 점이다.(맵리듀스와 비교되는 차이점)
  • 스파크가 수행하는 주요 최적화 기법 중 하나는 RDD나 더 RDD보다 더 아래에서 발생하는 파이프라이닝 기법이다.
  • 노드 간의 데이터 이동 없이 각 노드가 데이터를 직접 가공할 수 있는 연산만 모아 태스크의 단일 스테이지를 만들어 준다.

15.4.2 셔플 결과 저장

  • 두번쨰 특성은 셔플 결과를 저장하는 것이다.
  • reduceByKey 연산과 같이 노드간 복제를 유발하는 연산을 실행하면 엔진에서 파이프라이닝을 수행하지 못하므로 네트워크 셔플이 발생한다.
  • 노드간 복제를 유발하는 연산은 각 키에 대한 입력데이터를 먼저 여러 노드로부터 복사하고, 셔플 파일을 로컬 디스크에 기록한다.
  • 다만, 위와 같은 처리를 할때 소스와 관련된 셔플이 다시 실행되지 않고 재사용되기 때문에 사이드이펙트가 발생할 수 있음을 알고 있어야 한다.
  • 이러한 자동 최적화 기능을 통해 동일한 데이터를 사용해 여러 잡을 실행하는 워크로드의 시간을 절약할 수 있다.
  • 더 나은 성능을 얻기 위해 직접 DataFrame이나 RDD의 cache 메소드를 사용할 수도 있다.

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 14. 분산형 공유 변수

[스파크 완벽 가이드] 16. 스파크 애플리케이션 개발하기

Comments powered by Disqus.