Home [스파크 완벽 가이드] 13. RDD 고급 개념
Post
Cancel

[스파크 완벽 가이드] 13. RDD 고급 개념

[스파크 완벽 가이드] 13. RDD 고급 개념

핵심 주제

집계와 키-값 형태의 RDD 사용자 정의 파티셔닝 RDD조인

13.1 키-값 형태의 기초(키-값 형태의 RDD)

  • 데이터를 키-값 형태로 다룰수 있는 메소드
  • ~byKey 류는 PairRDD 만 사용 가능 (ex. Pair(“s”, “Scala”))
1
words.map(word => (word.toLowerCase, 1))

13.1.1 keyBy

  • 현재 값으로부터 키 생성
1
val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)

13.1.2 값 매핑하기

mapValues

  • 스파크에서는 튜플인 경우 첫번째 요소를 키, 두번쨰 요소를 값으로 추정
  • 튜플에서 값만 추출
1
2
3
4
5
6
7
8
9
10
11
12
13
14
keyword.mapValues(word => word.toUpperCase).collect( )

[
	('s', 'SPARK'),
	('t', 'THE'),
	('d', 'DEFINITIVE'),
	('g', ,'GUIDE'),
	(':' ':'),
	('b', 'BIG'),
	('d', 'DATA'),
	('p', 'PROCESSING'),
	('m', 'MADE'),
	('s', 'SIMPLE')
]

flatMapValues

1
keyword.flatMapValues(word =>word.toUpperCase).collect()

스크린샷 2023-01-01 오후 5 47 31

13.1.3 키와 값 추출하기

  • keys , values
1
2
keyword.keys.collect()
keyWord.values.collect()

13.1.4 lookup

  • 특정 키에 관한 결과 검색
1
2
3
keyword.lookup("s")

// 키가 "s"인 Spark와 Simpe 반환

13.1.5 sampleByKey

  • 근사치나 정확도를 이용해서 키 기반 RDD 샘플 생성
  • 간단한 무작위 샘플링을 사용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import scala.util.Random

val distinctchars = words.flatMap(word => word.toLowerCase.toSeq).distinct
.collect()

// sampleByKey
val sampleMap = distinctChars.map(c => (c, new Random().nextDouble())).toMap
words
	.map(word => (word.toLowerCase.toSeq(0), word))
	.sampleByKey(true, sampleMap, 6L)
	.collect()

// sampleByKeyExact
val sampleByKeyExact = words
	.map(word => (word.toLowerCase.toSeq(0), word))
	.sampleByKeyExact(true, sampleMap, 6L)
	.collect()
  • 99.99% 신뢰도를 가진 모든 키 값에 대해서 RDD를 추가로 처리하므로 math.ceil (numltems * samplingRate)의 합과 완전히 동일한 크기의 샘플 데이터를 생성

13. 2 집계

  • RDD 또는 PairRDD를 사용해 집계 수행
1
2
3
4
5
6
7
val chars = words.flatMap(word => word.toLowerCase.toSeq)
val KVcharacters = chars.map(letter => (letter, 1))

def maxFunc(left:Int, right:Int) = math.max(left, right)
def addFunc(left:Int, right:Int) = left + right

val nums = sc.parallelize(1 to 30, 5)

13.2.1 countByKey

  • 각 키의 아이템 수를 구하고 맵으로 결과 반환
    • Scala 또는 java에서는 timeout과 신뢰도를 인수로 지정해 근사치를 구할 수 있음.
1
2
3
4
5
val timeout = 1000L // 일리세컨드 단위
val confidence = 0.95

KVcharacters.countByKey()
KVcharacters.countByKeyApprox(timeout, confidence)

13.2.2 집계 연산 구현 방식 이해

  • 키-값 형태의 PairRDD 생성하는 몇가지 방식

groupByKey

1
2
3
4
KVcharacters
	.groupByKey()
	.map(row => (row._1, row._2.reduce(addFunc)))
	.count()
  • 사용시 주의사항
    • 모든 익스큐터에서 함수를 적용하기 전에 해당 키와 관련된 모든 값을 메모리에 올려야하는 문제
      • 심각하게 치우쳐진 키가 있다면 일부 파티션에서 OOM이 발생할 수 있음.
    • 각 키에 대한 값의 크기가 일정하고, 익스큐터에 할당된 메모리에서 처리 가능할 정도의 크기 인 경우에만 groupByKey 사용

reduceByKey

  • 각 파티션에서 리듀스 작업을 수행하기 떄문에 훨씬 안정적이고, 모든 값을 메모리에 유지할 필요가 없음.
  • 최종 리듀스 과정을 제외한 모든 작업은 개별 워커에서 처리하므로 연산 수행 속도도 향상될 수 있음.
1
2
3
4
5
6
KVcharacters
	.reduceByKey(addFunc)
	.collect()

// key별 그룹 RDD Array 반환
// Array((d,4), (p,3), (t,3), (b,1), (h,1), (n,2), (a,4), (i,7), (k,1), (u,1), (o,1), (g,3), (m,2), (c,1))
  • 정렬되어 있지 않기 떄문에 작업 부하를 줄일수는 있으나, 순서가 중요한 경우 적합하지 않음.

13.2.3 기타 집계 메서드

  • 대부분은 구조적 API를 사용하면 간단히 집계를 수행할수 있음.
  • 고급 집계 함수를 사용해 클러스터 노드에서 수행하는 집계를 구체적이고 세밀하게 제어 가능

aggregate

1
2
3
4
5
nums.aggregate(0)(maxFunc, addFunc)

// nulll 또는 집계의 시작값 필요
// 첫번쨰 함수 : 파티션 내에서 수행
// 두번째 함수 : 모든 파티션에 걸쳐 수행
  • 드라이버에서 최종 집계를 수행하므로 성능에 영향이 있음.
    • executor의 결과가 너무 크면 OOM 발생할수 있음.

treeAggreate

1
2
val depth = 3
nums.treeAggregate(0)(maxFunc, addFunc, depth)
  • treeAggregate 를 이용하면 처리과정은 다르지만 같은 결과를 얻을 수 있음.
    • executor끼리 트리를 형성해 집계 처리의 일부 하위 과정을 push down 방식으로 먼저 수행

aggregateByKey

  • aggregate 함수와 동일하지만 파티션 대신 키를 기준으로 연산 수행
1
2
3
KVcharacters
	.aggregateByKey(0)(addFunc, maxFunC)
	.collect()

combineByKey

  • 집계 함수 대신 comniner를 사용
    • key를 기준으로 연산을 수행하고, 파라미터로 사용된 함수에 따라 값을 병합
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val valToCombiner = (value:Int) => List(value)
val mergeValuesFunc = (vals: List[Int], valToAppend: Int) => valToAppend :: vals
val mergeCombinerFunc = (vals1: List[Int], vals2:List[Int]) => vals1 ::: vals2
// 함수형 변수 정의도 가능
val outputPartitions = 6

val result = KVcharacters
	.combineByKey(
		valToCombiner,
		mergeValuesFunc,
		mergeCombinerFunc,
		outputPartitions
	)
	.collect()

foldByKey

  • 결합 함수와 항등원(neutral)인 제로값을 이용해 각 키의 값을 병합
1
2
3
KVcharacters
	.foldByKey(0)(addFunc)
	.collect()

13.3 cogroup

  • RDD에 대한 그룹 기반의 조인을 수행
    • 스칼라 : 3개, 파이썬 2개의 키-값 형태의 RDD를 그룹화할 수 있음.
    • 출력 파티션 수나 클러스터에 데이터 분산 방식 제어를 위한 사용자 정의 파라미터 제공
1
2
3
4
5
6
7
8
9
10
11
12
13
import scala.util.Random

val distinctChars = words
	.flatMap(word => word.toLowerCase.toSeq)
	.distinct

val charRDD = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD2 = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD3 = distinctChars.map(c => (c, new Random().nextDouble()))

charRDD
	.cogroup(charRDD2, charRDD3)
	.take(5)

스크린샷 2023-01-01 오후 5 49 33

13.4 JOIN

  • 구조적 API와 동일한 조인 방식을 가지고 있지만, RDD를 사용하면 사용자가 많은 부분을 관여해야 함.
    • 출력 파티션 수나 사용자 정의 파티션 함수 파라미터 사용

13.4.1 INNER JOIN

1
2
3
4
5
val keyedChars = distinctChars.map(C => (C, new Random().nextDouble()))
val outputPartitions = 10

KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()
  • 이 외의 조인들도 동일한 기본 형식을 따름

13.4.2 zip

  • 실제로 JOIN은 아니고 두 개의 RDD를 결합하는 방식
    • 동일한 길이의 2개의 RDD를 zipper를 잠그듯이 연결하여 PairRDD 생성
    • 요소와 파티션수가 일치해야 함.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val numRange = sc.parallelize(0 to 9, 2)
words.zip(numRange).collect()

[
	('Spark', 0),
	('The', 1),
	('Definitive', 2),
	('Guide', 3),
	(':', 4),
	('Big', 5),
	('Data', 6),
	('ProceSsing', 7),
	('Made', 8),
	('Simple', 9)
]

13.5 파티션 제어하기

  • RDD를 사용하면 데이터가 클러스터 전체에 물리적으로 정확히 분산되는 방식으로 정의 할 수 있음.
  • 기본적으로 구조적 API와 동일

구조적 API와 차이점

  • 파티션 함수를 파라미터로 사용할 수 있음.
    • 사용자 지정 Partitioner

13.5.1 coalesce

  • 파티션을 재분배할 떄 발생하는 데이터 셔플을 방지하기 위해 동일한 워커에 존재하는 파티션을 합치는 메서드
    • 2개의 파티션으로 구성된 words RDD를 셔플링 없이 하나의 파티션으로 합친다.
1
words.coalesce(1).getNumPartitions // 값은1

13.5.2 repartition

  • 파티션 수를 늘리거나 줄일 때 사용(노드간 셔플 발생할 수 있음)
    • 파티션 수를 늘리면 필터 map or filter 타입의 연산을 수행할때 병렬 처리 수준을 높일 수 있음
1
words.repartition(10) // 10개의 파티션이 생성됩니다

13.5.3 repartitionAndSortWithinPartitions

  • 파티션을 재분배할 수 있고, 재분배된 결과 파티션의 정렬 방식을 지정
    • partition연산을 할때 key에대해서 sorting

https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html#repartitionAndSortWithinPartitions-org.apache.spark.Partitioner

  • 문서는 보기 좋지 않아보임.
  • 셔플링 단계에서 정렬 작업을 함께 수행하기 때문에 repartition을 호출한 후 직접 정렬하는 것보다 성능이 더 좋음

13.5.4 사용자 정의 파티셔닝

  • RDD를 사용하는 가장 큰 이유 중 하나
    • 구조적 API에서는 사용자 정의 파티셔너를 사용할 수 없음.
    • 저수준 API의 세부적인 구현 방식
  • 데이터 치우짐 문제를 피하고자 클러스터 전체에 데이터를 균등하게 분배하는 목적으로 사용될 수 있음.
  • 구조적 API로 RDD를 얻고, 사용자 정의 파티셔너를 적용한 뒤 다시 DataFrame 또는 Dataset으로 변환하여 사용
  • 구조적 API와 RDD 장점을 모두 활용 가능
  • Partitioner를 확장한 클래스를 직접 구현해야 하므로 문제에 대한 업무 지식을 충분히 가진 경우에만 사용
    • 단일 값이나 다수 값(다수 컬럼)을 파티셔닝해야 한다면 DataFrame API를 사용하는 것이 좋음.
1
2
3
4
5
6
val df = spark.read
	.option("header", "true")
	.option("inferSchema", "true")
	.csv("/data/retail-data/all/")

val rdd = df.coalesce(10).rdd

HashPartitioner, RangePartitoner

  • RDD API에서만 사용할 수 있는 내장형
  • 이산형과 연속형 값을 다룰 때 사용
1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.HashPartitioner

rdd
	.map(r => r(6))
	.take(5)
	.foreach(println)

val keyedRDD = rdd
	.keyBy(row => row(6).asInstanceOf[Int].toDouble)

keyedRDD
	.partitionBy(newHashPartitioner(10))
	.take(10)
  • 이는 유용하지만 매우 기초적인 기능을 제공.
    • 매우 큰 데이터나 심각하게 치우친 키를 다뤄야 한다면 고급 파티셔닝 기능을 사용해야 함.

키 치우침 현상

  • 특정 키가 다른 키들에 비해 아주 많은 데이터를 가지는 현상
  • 병렬성을 개선하고 실행과정에서 OOM을 방지하기 위해서는 키를 최대한 분할해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Class DomainPartitioner extends Partitioner {
	def numPartitions = 3
	def getPartition(key: Any): Int = {
		val customerld = key.aslnstanceOf[Double].tolnt

		if (customerld == 17850.0 || customerld == 12583.0) {
			return 0
		} else {
			return new java.util.Random().nextlnt(2) + 1
		}
	}
}

keyedRDD
	.partitionBy(new DomainPartitioner)
	.map(_._1)
	.glom()
	.map(_.toSet.toSeq.length)
	.take(5)
  • 사용자 정의 키 분배 로직은 RDD 수준에서만 사용 가능.
    • 임의의 로직을 사용해 물리적인 방식으로 클러스터에 데이터를 분배하는 강력한 방법

13.6 사용자 정의 직렬화

Kryo 직렬화

  • 병렬화 대상인 모든 객체나 함수는 직렬화할 수 있어야 한다.
1
2
3
4
5
6
7
8
9
10
class SomeClass extends Serializable {
	var someValue = 0
	def setSomeValue(i:Int) = {
		somevalue= i
		this
	}
}

sc.parallelize(1 to 10)
	.map(num => new SomeClass().setSomeValue(num))
  • 기본 직렬화는 매우 느리다.
  • Kryo를 사용해 빠르게 객체를 직렬화 할 수 있다.
    • Java 직렬화보다 10배 이상 성능이 좋음.
    • Job 초기화시 spark.serializer=org.apache.Spark.Serializer.Kryo5erializer 를 설정
    • Spark 2.0.0부터 단순 데이터타입, 배열, 문자열 데이터 타입의 RDD 셔플링시 내부적으로 Kryo Serializer 사용
1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf()
	.setMaster(...)
	.setAppName(...)

conf.registerKryoClasses(Array(
	classOf[MyClass1],
	classOf[MyCIass2]
))

val sc = new SparkContext(conf)

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 12. RDD

[스파크 완벽 가이드] 14. 분산형 공유 변수

Comments powered by Disqus.