[스파크 완벽 가이드] 13. RDD 고급 개념
핵심 주제
집계와 키-값 형태의 RDD 사용자 정의 파티셔닝 RDD조인
13.1 키-값 형태의 기초(키-값 형태의 RDD)
- 데이터를 키-값 형태로 다룰수 있는 메소드
~byKey
류는 PairRDD 만 사용 가능 (ex.Pair(“s”, “Scala”)
)
1
words.map(word => (word.toLowerCase, 1))
13.1.1 keyBy
- 현재 값으로부터 키 생성
1
val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)
13.1.2 값 매핑하기
mapValues
- 스파크에서는 튜플인 경우 첫번째 요소를 키, 두번쨰 요소를 값으로 추정
- 튜플에서 값만 추출
1
2
3
4
5
6
7
8
9
10
11
12
13
14
keyword.mapValues(word => word.toUpperCase).collect( )
[
('s', 'SPARK'),
('t', 'THE'),
('d', 'DEFINITIVE'),
('g', ,'GUIDE'),
(':' ':'),
('b', 'BIG'),
('d', 'DATA'),
('p', 'PROCESSING'),
('m', 'MADE'),
('s', 'SIMPLE')
]
flatMapValues
1
keyword.flatMapValues(word =>word.toUpperCase).collect()
13.1.3 키와 값 추출하기
keys
,values
1
2
keyword.keys.collect()
keyWord.values.collect()
13.1.4 lookup
- 특정 키에 관한 결과 검색
1
2
3
keyword.lookup("s")
// 키가 "s"인 Spark와 Simpe 반환
13.1.5 sampleByKey
- 근사치나 정확도를 이용해서 키 기반 RDD 샘플 생성
- 간단한 무작위 샘플링을 사용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import scala.util.Random
val distinctchars = words.flatMap(word => word.toLowerCase.toSeq).distinct
.collect()
// sampleByKey
val sampleMap = distinctChars.map(c => (c, new Random().nextDouble())).toMap
words
.map(word => (word.toLowerCase.toSeq(0), word))
.sampleByKey(true, sampleMap, 6L)
.collect()
// sampleByKeyExact
val sampleByKeyExact = words
.map(word => (word.toLowerCase.toSeq(0), word))
.sampleByKeyExact(true, sampleMap, 6L)
.collect()
- 99.99% 신뢰도를 가진 모든 키 값에 대해서 RDD를 추가로 처리하므로
math.ceil (numltems * samplingRate)
의 합과 완전히 동일한 크기의 샘플 데이터를 생성
13. 2 집계
- RDD 또는 PairRDD를 사용해 집계 수행
1
2
3
4
5
6
7
val chars = words.flatMap(word => word.toLowerCase.toSeq)
val KVcharacters = chars.map(letter => (letter, 1))
def maxFunc(left:Int, right:Int) = math.max(left, right)
def addFunc(left:Int, right:Int) = left + right
val nums = sc.parallelize(1 to 30, 5)
13.2.1 countByKey
- 각 키의 아이템 수를 구하고 맵으로 결과 반환
- Scala 또는 java에서는 timeout과 신뢰도를 인수로 지정해 근사치를 구할 수 있음.
1
2
3
4
5
val timeout = 1000L // 일리세컨드 단위
val confidence = 0.95
KVcharacters.countByKey()
KVcharacters.countByKeyApprox(timeout, confidence)
13.2.2 집계 연산 구현 방식 이해
- 키-값 형태의 PairRDD 생성하는 몇가지 방식
groupByKey
1
2
3
4
KVcharacters
.groupByKey()
.map(row => (row._1, row._2.reduce(addFunc)))
.count()
- 사용시 주의사항
- 모든 익스큐터에서 함수를 적용하기 전에 해당 키와 관련된 모든 값을 메모리에 올려야하는 문제
- 심각하게 치우쳐진 키가 있다면 일부 파티션에서 OOM이 발생할 수 있음.
- 각 키에 대한 값의 크기가 일정하고, 익스큐터에 할당된 메모리에서 처리 가능할 정도의 크기 인 경우에만
groupByKey
사용
- 모든 익스큐터에서 함수를 적용하기 전에 해당 키와 관련된 모든 값을 메모리에 올려야하는 문제
reduceByKey
- 각 파티션에서 리듀스 작업을 수행하기 떄문에 훨씬 안정적이고, 모든 값을 메모리에 유지할 필요가 없음.
- 최종 리듀스 과정을 제외한 모든 작업은 개별 워커에서 처리하므로 연산 수행 속도도 향상될 수 있음.
1
2
3
4
5
6
KVcharacters
.reduceByKey(addFunc)
.collect()
// key별 그룹 RDD Array 반환
// Array((d,4), (p,3), (t,3), (b,1), (h,1), (n,2), (a,4), (i,7), (k,1), (u,1), (o,1), (g,3), (m,2), (c,1))
- 정렬되어 있지 않기 떄문에 작업 부하를 줄일수는 있으나, 순서가 중요한 경우 적합하지 않음.
13.2.3 기타 집계 메서드
- 대부분은 구조적 API를 사용하면 간단히 집계를 수행할수 있음.
- 고급 집계 함수를 사용해 클러스터 노드에서 수행하는 집계를 구체적이고 세밀하게 제어 가능
aggregate
1
2
3
4
5
nums.aggregate(0)(maxFunc, addFunc)
// nulll 또는 집계의 시작값 필요
// 첫번쨰 함수 : 파티션 내에서 수행
// 두번째 함수 : 모든 파티션에 걸쳐 수행
- 드라이버에서 최종 집계를 수행하므로 성능에 영향이 있음.
- executor의 결과가 너무 크면 OOM 발생할수 있음.
treeAggreate
1
2
val depth = 3
nums.treeAggregate(0)(maxFunc, addFunc, depth)
treeAggregate
를 이용하면 처리과정은 다르지만 같은 결과를 얻을 수 있음.- executor끼리 트리를 형성해 집계 처리의 일부 하위 과정을 push down 방식으로 먼저 수행
aggregateByKey
- aggregate 함수와 동일하지만 파티션 대신 키를 기준으로 연산 수행
1
2
3
KVcharacters
.aggregateByKey(0)(addFunc, maxFunC)
.collect()
combineByKey
- 집계 함수 대신 comniner를 사용
- key를 기준으로 연산을 수행하고, 파라미터로 사용된 함수에 따라 값을 병합
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val valToCombiner = (value:Int) => List(value)
val mergeValuesFunc = (vals: List[Int], valToAppend: Int) => valToAppend :: vals
val mergeCombinerFunc = (vals1: List[Int], vals2:List[Int]) => vals1 ::: vals2
// 함수형 변수 정의도 가능
val outputPartitions = 6
val result = KVcharacters
.combineByKey(
valToCombiner,
mergeValuesFunc,
mergeCombinerFunc,
outputPartitions
)
.collect()
foldByKey
- 결합 함수와 항등원(neutral)인 제로값을 이용해 각 키의 값을 병합
1
2
3
KVcharacters
.foldByKey(0)(addFunc)
.collect()
13.3 cogroup
- RDD에 대한 그룹 기반의 조인을 수행
- 스칼라 : 3개, 파이썬 2개의 키-값 형태의 RDD를 그룹화할 수 있음.
- 출력 파티션 수나 클러스터에 데이터 분산 방식 제어를 위한 사용자 정의 파라미터 제공
1
2
3
4
5
6
7
8
9
10
11
12
13
import scala.util.Random
val distinctChars = words
.flatMap(word => word.toLowerCase.toSeq)
.distinct
val charRDD = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD2 = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD3 = distinctChars.map(c => (c, new Random().nextDouble()))
charRDD
.cogroup(charRDD2, charRDD3)
.take(5)
13.4 JOIN
- 구조적 API와 동일한 조인 방식을 가지고 있지만, RDD를 사용하면 사용자가 많은 부분을 관여해야 함.
- 출력 파티션 수나 사용자 정의 파티션 함수 파라미터 사용
13.4.1 INNER JOIN
1
2
3
4
5
val keyedChars = distinctChars.map(C => (C, new Random().nextDouble()))
val outputPartitions = 10
KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()
- 이 외의 조인들도 동일한 기본 형식을 따름
13.4.2 zip
- 실제로 JOIN은 아니고 두 개의 RDD를 결합하는 방식
- 동일한 길이의 2개의 RDD를 zipper를 잠그듯이 연결하여 PairRDD 생성
- 요소와 파티션수가 일치해야 함.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val numRange = sc.parallelize(0 to 9, 2)
words.zip(numRange).collect()
[
('Spark', 0),
('The', 1),
('Definitive', 2),
('Guide', 3),
(':', 4),
('Big', 5),
('Data', 6),
('ProceSsing', 7),
('Made', 8),
('Simple', 9)
]
13.5 파티션 제어하기
- RDD를 사용하면 데이터가 클러스터 전체에 물리적으로 정확히 분산되는 방식으로 정의 할 수 있음.
- 기본적으로 구조적 API와 동일
구조적 API와 차이점
- 파티션 함수를 파라미터로 사용할 수 있음.
- 사용자 지정 Partitioner
13.5.1 coalesce
- 파티션을 재분배할 떄 발생하는 데이터 셔플을 방지하기 위해 동일한 워커에 존재하는 파티션을 합치는 메서드
- 2개의 파티션으로 구성된 words RDD를 셔플링 없이 하나의 파티션으로 합친다.
1
words.coalesce(1).getNumPartitions // 값은1
13.5.2 repartition
- 파티션 수를 늘리거나 줄일 때 사용(노드간 셔플 발생할 수 있음)
- 파티션 수를 늘리면 필터 map or filter 타입의 연산을 수행할때 병렬 처리 수준을 높일 수 있음
1
words.repartition(10) // 10개의 파티션이 생성됩니다
13.5.3 repartitionAndSortWithinPartitions
- 파티션을 재분배할 수 있고, 재분배된 결과 파티션의 정렬 방식을 지정
- partition연산을 할때 key에대해서 sorting
- 문서는 보기 좋지 않아보임.
- 셔플링 단계에서 정렬 작업을 함께 수행하기 때문에
repartition
을 호출한 후 직접 정렬하는 것보다 성능이 더 좋음
13.5.4 사용자 정의 파티셔닝
- RDD를 사용하는 가장 큰 이유 중 하나
- 구조적 API에서는 사용자 정의 파티셔너를 사용할 수 없음.
- 저수준 API의 세부적인 구현 방식
- 데이터 치우짐 문제를 피하고자 클러스터 전체에 데이터를 균등하게 분배하는 목적으로 사용될 수 있음.
- 구조적 API로 RDD를 얻고, 사용자 정의 파티셔너를 적용한 뒤 다시 DataFrame 또는 Dataset으로 변환하여 사용
- 구조적 API와 RDD 장점을 모두 활용 가능
- Partitioner를 확장한 클래스를 직접 구현해야 하므로 문제에 대한 업무 지식을 충분히 가진 경우에만 사용
- 단일 값이나 다수 값(다수 컬럼)을 파티셔닝해야 한다면 DataFrame API를 사용하는 것이 좋음.
1
2
3
4
5
6
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("/data/retail-data/all/")
val rdd = df.coalesce(10).rdd
HashPartitioner, RangePartitoner
- RDD API에서만 사용할 수 있는 내장형
- 이산형과 연속형 값을 다룰 때 사용
1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.HashPartitioner
rdd
.map(r => r(6))
.take(5)
.foreach(println)
val keyedRDD = rdd
.keyBy(row => row(6).asInstanceOf[Int].toDouble)
keyedRDD
.partitionBy(newHashPartitioner(10))
.take(10)
- 이는 유용하지만 매우 기초적인 기능을 제공.
- 매우 큰 데이터나 심각하게 치우친 키를 다뤄야 한다면 고급 파티셔닝 기능을 사용해야 함.
키 치우침 현상
- 특정 키가 다른 키들에 비해 아주 많은 데이터를 가지는 현상
- 병렬성을 개선하고 실행과정에서 OOM을 방지하기 위해서는 키를 최대한 분할해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Class DomainPartitioner extends Partitioner {
def numPartitions = 3
def getPartition(key: Any): Int = {
val customerld = key.aslnstanceOf[Double].tolnt
if (customerld == 17850.0 || customerld == 12583.0) {
return 0
} else {
return new java.util.Random().nextlnt(2) + 1
}
}
}
keyedRDD
.partitionBy(new DomainPartitioner)
.map(_._1)
.glom()
.map(_.toSet.toSeq.length)
.take(5)
- 사용자 정의 키 분배 로직은 RDD 수준에서만 사용 가능.
- 임의의 로직을 사용해 물리적인 방식으로 클러스터에 데이터를 분배하는 강력한 방법
13.6 사용자 정의 직렬화
Kryo 직렬화
- 병렬화 대상인 모든 객체나 함수는 직렬화할 수 있어야 한다.
1
2
3
4
5
6
7
8
9
10
class SomeClass extends Serializable {
var someValue = 0
def setSomeValue(i:Int) = {
somevalue= i
this
}
}
sc.parallelize(1 to 10)
.map(num => new SomeClass().setSomeValue(num))
- 기본 직렬화는 매우 느리다.
- Kryo를 사용해 빠르게 객체를 직렬화 할 수 있다.
- Java 직렬화보다 10배 이상 성능이 좋음.
- Job 초기화시
spark.serializer=org.apache.Spark.Serializer.Kryo5erializer
를 설정 - Spark 2.0.0부터 단순 데이터타입, 배열, 문자열 데이터 타입의 RDD 셔플링시 내부적으로 Kryo Serializer 사용
1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
conf.registerKryoClasses(Array(
classOf[MyClass1],
classOf[MyCIass2]
))
val sc = new SparkContext(conf)
Comments powered by Disqus.