[스파크 완벽 가이드] 14. 분산형 공유 변수
브로드캐스트 변수 어큐뮬레이터
14.1 브로드캐스트 변수
- 브로드캐스트 변수는 변하지 않는 값(불변성 값)을 클로저 함수의 변수로 캡슐화하지 않고, 클러스터에서 효율적으로 공유하는 방법을 제공한다.
- 모든 태스크마다 직렬화하지 않고 클러스터의 모든 머신에 캐시하는 불변성 공유 변수
- 익스큐터 메모리 크기에 맞는 조회용 테이블을 전달하고 함수에서 사용하는 것이 대표적인 예이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"
.split(" ")
val supplementalData = Map("Spark" -> 1000, "Definitive" -> 200,
"Big" -> -300, "Simple" -> 100)
// 클러스터 모든 노드에 지연 처리방식으로 복제 처리
val suppBroadcast = spark.sparkContext.broadcast(supplementalData)
// 브로드캐스트된 변수 참조
suppBroadcast.value
// 브로드캐스트된 데이터를 사용해 RDD 반환
words.map(word => (word, suppBroadcast.value.getOrElse(word, 0)))
.sortBy(wordPair => wordPair._2)
.collect()
- 브로드캐스트 변수를 사용한 방식과 클로저에 담아 전달하는 방식의 차이점은 브로드캐스트 변수를 사용하는게 훨씬 더 효율적이라는 것이다.
- 데이터 총량과 익스큐터 수에 따라 다를수 있으나 브로드캐스트 변수에 아주 큰 데이터를 사용하는 경우 효율적이다.
- 전체 데이터를 직렬화하는 데 발생하는 부하가 커질수 있으므로
14.2 어큐뮬레이터
- 어큐뮬레이터는 스파크의 두 번째 공유 변수 타입이다.
- 트랜스포메이션 내부의 다양한 값을 갱신하는데 사용됩니다.
- 내고장성을 보장하면서 효율적인 방식으로 드라이버에 값을 전달할 수 있습니다.
- 결합성과 가환성을 가진 연산을 통해서만 더할 수 있는 변수이므로, 병렬 처리 과정에서 효율적으로 사용할 수 있다.
- 카운터나 합계를 구하는 용도로 사용 가능
- 액션을 처리하는 과정에서만 갱신됨에 유의해야 한다.
- 각 태스크에서 어큐뮬레이터 한번만 갱신하도록 제어한다.
- 재시작한 태스크는 값을 갱신할 수 없음.
- 스파크 지연 연산 모델에 영향을 주지않는것에 유의해야 한다.
- RDD 처리 중에 갱신되면 RDD 연산이 실제로 수행된 시점에 딱 한번만 값을 갱신한다.
- map 함수 같은 지연 처리 형태의 트랜스포메이션에서 어큐뮬레이터 갱신 작업을 수행하는 경우 실제 실행 전까지는 값이 갱신되지 않는다.
14.2.1 기본 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.spark.util.LongAccumulator
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flights = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet")
.as[Flight]
val accUnnamed = new LongAccumulator
val acc = spark.sparkContext.register(accUnnamed)
val accChina = new LongAccumulator
val accChina2 = spark.sparkContext.longAccumulator("China")
spark.sparkContext.register(accChina, "China")
- 출발지나 도착지가 중국인 항공편의 수를 구하는 어큐뮬레이터를 생성.(SQL로도 처리할수 있음)
- 어큐뮬레이터를 사용하면 프로그래밍 방식으로 처리가 가능한다.
sparkContext
을 통해 간단히 생성할수 있고, 이름을 붙여서 사용할 수 있다.- 이름을 붙인 경우 Spark UI에서 확인 가능(그 반대는 불가능)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def accChinaFunc(flight_row: Flight) = {
val destination = flight_row.DEST_COUNTRY_NAME
val origin = flight_row.ORIGIN_COUNTRY_NAME
if (destination == "China") {
accChina.add(flight_row.count.toLong)
}
if (origin == "China") {
accChina.add(flight_row.count.toLong)
}
}
flights.foreach(flight_row => accChinaFunc(flight_row))
// 수행 후 결과 확인
accChina.value // 953
- 어큐뮬레이터는 액션에서만 실행을 보장하므로 foreach 메서드(액션)을 수행시킨다.
14.2.2 사용자 정의 어큐뮬레이터
- 기본적으로 스파크에서는 수치와 관련된 유용한 어큐뮬레이터를 제공한다.
- 떄에 따라 사용자 정의 어큐뮬레이터가 필요할 수 있다.
ACcumulatorV2
클래스를 상속받아서 구현할 수 있다.(실제 사용시 최신버전 문서 확인 필요함, 책이 오래전이라..)- 파이썬에서는
AccumulatorParam
을 상속받아 구현
- 파이썬에서는
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.AccumulatorV2
val arr = ArrayBuffer[BigInt]()
class EvenAccumulator extends AccumulatorV2[BigInt, BigInt] {
private var num:BigInt = 0
def reset(): Unit = {
this.num = 0
}
def add(intValue: BigInt): Unit = {
if (intValue % 2 == 0) {
this.num += intValue
}
}
def merge(other: AccumulatorV2[BigInt,BigInt]): Unit = {
this.num += other.value
}
def value():BigInt = {
this.num
}
def copy(): AccumulatorV2[BigInt,BigInt] = {
new EvenAccumulator
}
def isZero():Boolean = {
this.num == 0
}
}
val acc = new EvenAccumulator
val newAcc = sc.register(acc, "evenAcc")
acc.value // 0
flights.foreach(flight_row => acc.add(flight_row.count))
acc.value // 31390
Comments powered by Disqus.