Home [스파크 완벽 가이드] 14. 분산형 공유 변수
Post
Cancel

[스파크 완벽 가이드] 14. 분산형 공유 변수

[스파크 완벽 가이드] 14. 분산형 공유 변수

브로드캐스트 변수 어큐뮬레이터

14.1 브로드캐스트 변수

  • 브로드캐스트 변수는 변하지 않는 값(불변성 값)을 클로저 함수의 변수로 캡슐화하지 않고, 클러스터에서 효율적으로 공유하는 방법을 제공한다.
  • 모든 태스크마다 직렬화하지 않고 클러스터의 모든 머신에 캐시하는 불변성 공유 변수
  • 익스큐터 메모리 크기에 맞는 조회용 테이블을 전달하고 함수에서 사용하는 것이 대표적인 예이다.

스크린샷 2023-01-01 오후 6 47 46

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"
  .split(" ")

val supplementalData = Map("Spark" -> 1000, "Definitive" -> 200,
                           "Big" -> -300, "Simple" -> 100)

// 클러스터 모든 노드에 지연 처리방식으로 복제 처리
val suppBroadcast = spark.sparkContext.broadcast(supplementalData)

// 브로드캐스트된 변수 참조
suppBroadcast.value

// 브로드캐스트된 데이터를 사용해 RDD 반환
words.map(word => (word, suppBroadcast.value.getOrElse(word, 0)))
  .sortBy(wordPair => wordPair._2)
  .collect()
  • 브로드캐스트 변수를 사용한 방식과 클로저에 담아 전달하는 방식의 차이점은 브로드캐스트 변수를 사용하는게 훨씬 더 효율적이라는 것이다.
  • 데이터 총량과 익스큐터 수에 따라 다를수 있으나 브로드캐스트 변수에 아주 큰 데이터를 사용하는 경우 효율적이다.
    • 전체 데이터를 직렬화하는 데 발생하는 부하가 커질수 있으므로

14.2 어큐뮬레이터

  • 어큐뮬레이터는 스파크의 두 번째 공유 변수 타입이다.
  • 트랜스포메이션 내부의 다양한 값을 갱신하는데 사용됩니다.
  • 내고장성을 보장하면서 효율적인 방식으로 드라이버에 값을 전달할 수 있습니다.

스크린샷 2023-02-05 오후 5 28 35

  • 결합성과 가환성을 가진 연산을 통해서만 더할 수 있는 변수이므로, 병렬 처리 과정에서 효율적으로 사용할 수 있다.
    • 카운터나 합계를 구하는 용도로 사용 가능
  • 액션을 처리하는 과정에서만 갱신됨에 유의해야 한다.
    • 각 태스크에서 어큐뮬레이터 한번만 갱신하도록 제어한다.
    • 재시작한 태스크는 값을 갱신할 수 없음.
  • 스파크 지연 연산 모델에 영향을 주지않는것에 유의해야 한다.
    • RDD 처리 중에 갱신되면 RDD 연산이 실제로 수행된 시점에 딱 한번만 값을 갱신한다.
    • map 함수 같은 지연 처리 형태의 트랜스포메이션에서 어큐뮬레이터 갱신 작업을 수행하는 경우 실제 실행 전까지는 값이 갱신되지 않는다.

14.2.1 기본 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.spark.util.LongAccumulator

case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flights = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet")
  .as[Flight]


val accUnnamed = new LongAccumulator
val acc = spark.sparkContext.register(accUnnamed)

val accChina = new LongAccumulator
val accChina2 = spark.sparkContext.longAccumulator("China")
spark.sparkContext.register(accChina, "China")
  • 출발지나 도착지가 중국인 항공편의 수를 구하는 어큐뮬레이터를 생성.(SQL로도 처리할수 있음)
  • 어큐뮬레이터를 사용하면 프로그래밍 방식으로 처리가 가능한다.
  • sparkContext을 통해 간단히 생성할수 있고, 이름을 붙여서 사용할 수 있다.
    • 이름을 붙인 경우 Spark UI에서 확인 가능(그 반대는 불가능)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def accChinaFunc(flight_row: Flight) = {
  val destination = flight_row.DEST_COUNTRY_NAME
  val origin = flight_row.ORIGIN_COUNTRY_NAME
  if (destination == "China") {
    accChina.add(flight_row.count.toLong)
  }
  if (origin == "China") {
    accChina.add(flight_row.count.toLong)
  }
}


flights.foreach(flight_row => accChinaFunc(flight_row))

// 수행 후 결과 확인
accChina.value // 953
  • 어큐뮬레이터는 액션에서만 실행을 보장하므로 foreach 메서드(액션)을 수행시킨다.

14.2.2 사용자 정의 어큐뮬레이터

  • 기본적으로 스파크에서는 수치와 관련된 유용한 어큐뮬레이터를 제공한다.
  • 떄에 따라 사용자 정의 어큐뮬레이터가 필요할 수 있다.
  • ACcumulatorV2클래스를 상속받아서 구현할 수 있다.(실제 사용시 최신버전 문서 확인 필요함, 책이 오래전이라..)
    • 파이썬에서는 AccumulatorParam을 상속받아 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.AccumulatorV2

val arr = ArrayBuffer[BigInt]()

class EvenAccumulator extends AccumulatorV2[BigInt, BigInt] {
  private var num:BigInt = 0
  def reset(): Unit = {
    this.num = 0
  }
  def add(intValue: BigInt): Unit = {
    if (intValue % 2 == 0) {
        this.num += intValue
    }
  }
  def merge(other: AccumulatorV2[BigInt,BigInt]): Unit = {
    this.num += other.value
  }
  def value():BigInt = {
    this.num
  }
  def copy(): AccumulatorV2[BigInt,BigInt] = {
    new EvenAccumulator
  }
  def isZero():Boolean = {
    this.num == 0
  }
}

val acc = new EvenAccumulator
val newAcc = sc.register(acc, "evenAcc")

acc.value // 0
flights.foreach(flight_row => acc.add(flight_row.count))
acc.value // 31390

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 13. RDD 고급 개념

[스파크 완벽 가이드] 15. 클러스터에서 스파크 실행하기

Comments powered by Disqus.