[스파크 완벽 가이드] 12. RDD
- 대부분의 상황에서는 구조적 API를 사용하는것이 좋음.
- 하지만 모든 비즈니스나 기술적 문제를 고수준 API(구조적 API)를 사용해 해결할수 있지는 않기 때문에는 저수준 API인 RDD를 제공한다.
12.1 저수준 API란?
- 분산 데이터 처리를 위한 RDD, 브로드캐스트 변수와 어큐뮬레이터처럼 분산형 공유 변수를 배포하고 다루는 API 이렇게 2가지가 있다.
12.1.1 저수준 API는 언제 사용할까?
- 고수준 API에서 제공하지 않는 기능이 필요한 경우(클러스터의 물리적 데이터 배치를 세밀하게 제어해야 하는 경우)
- RDD를 사용해 갭라된 기존 코드를 유지보수하는 경우
- 사용자가 정의한 공유 변수를 다뤄야 하는 경우(14장)
위와 같은 특정 상황이 아니라면 저수준 API는 사용하지 말도록 하자. 스파크 모든 워크로드가 저수준 기능을 사용하는 기초 형태로 컴파일되므로 이를 이해하는것은 중요하지만 이걸로 개발하는게 항상 좋은것을 의미하지 않는다.
12.1.2 저수준 API는 어떻게 사용할까?
- SparkContext는 저수준 API 기능을 사용하기 위한 진입점이다.
- SparkSession을 이용해 SparkContext에 접근할 수 있다.
1
spark.sparkContext
12.2 RDD 개요
- 스파크 1.x 버전의 핵심 API
- 2.x에서도 사용할 수 있지만 잘 사용핮 ㅣ않는다.
- 스파크에서 실행한 모든 DataFrame이나 Dataset 코드는 RDD로 컴파일 된다.
- RDD는 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
- 개발자가 강력한 제어권을 가질 수 있으나, 모든 값을 다루거나 값 사이의 상호작용 과정을 반드시 수동으로 정의해야 한다는 단점이 있다.
- 구조적 API에서 자동으로 데이터를 최적화하고 압축된 바이너리 포맷으로 저장하는걸 저수준 API에서는 동일한 공간 효율/성능을 얻기 위해 직접 포맷을 구현해 모든 연산과정에 사용해야 한다.
- 재정렬/집계 같은 최적화 기법도 직접 구현해야 함.
스파크의 구조적 API를 사용할것을 강력히 권고
12.2.1 RDD 유형
- RDD에 수많은 하위 클래스들이 존재함.
- RDD는 DataFrame API에서 최적화된 물리적 실행계획을 만드는데 대부분 사용됨.
- 사용자는 젠리기 RDD 타입, 키 기반의 집계가 가능한 키-값 RDD를 만들수 있다.
RDD 정의
- 파티션의 목록
- 각 조각을 연산하는 함수
- 다른 RDD와의 의존성 목록
- 부가적으로 키-값 RDD를 위한 Partitioner
- 부가적으로 각 조각을 연산하기 위한 기본 위치 목록(hdfs 파일 블록 위치)
자바/스칼라가 아닌 언어에서 RDD를 다루는 경우 주사항
- 파이썬에서 RDD를 다루는 경우 상당한 성능 저하가 발생할수 있음.
- 파이썬으로 RDD를 실행하는 것은 파이썬으로 만들어진 사용자 정의 함수를 사용해 로우마다 적용하는 것과 동일
12.2.2 RDD는 언제 사용할까?
- 정말 필요한 경우가 아니라면 수동으로 RDD를 생성해서는 안된다.
- 구조적 API가 제공하는 여러 최적화 기법을 사용할 수 없다.
- DataFrame은 RDD보다 더 효율적이고 안정적이며 표현력이 좋다.
- 물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝)에 세부적인 제어가 필요한 경우에 RDD를 사용하는것이 적합하다.
12.2.3 Dataset과 RDD의 케이스 클래스
- Dataset과 케이스 클래스를 사용해서 만들어진 RDD의 차이점은 구조적 API가 제공하는 풍부한 기능과 최적화 기법을 제공하느냐 아니냐가 가장 큰 차이점이다.
- Dataset을 사용하면 JVM 데이터 타입과 스파크 데이터 타입 중 어떤것을 쓸지 고민하지 않아도 된다.
- 어떤것을 사용하더라도 동일한 성능을 보장한다.
12.3 RDD 생성하기
12.3.1 DataFrame, Dataset으로 RDD 생성
1
2
3
4
5
6
// 스칼라 버전: converts a Dataset[Long] to RDD[Long]
spark.range(500).rdd
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
spark.range(10).rdd.toDF()
rdd
메서드는 Row 타입을 가진 RDD를 생성한다. 이를 이용해 상황에 따라 구조적 API / 저수준 API를 오고가게 만들수 있다.
12.3.2 로컬 컬렉션으로 RDD 생성하기
- SparkSession안에 있는 sparkContext의 parallelize 메서드를 호출해서 컬렉션 객체를 RDD로 만들수 있다.
- 단일 노드에 있는 컬렉션을 병렬 컬렉션으로 전환하고, 이때 파티션 수를 명시적으로 지정할 수 있다.
1
2
3
4
5
6
7
8
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"
.split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
// RDD에 이름 지정
words.setName("myWords")
words.name // myWords
12.3.3 데이터소스로 RDD 생성하기
- 데이터소스나 텍스트 파일을 이용해 RDD를 직접 생성할 수 있다.
- DataSource API를 사용하는것이 바람직하나, RDD에는 이런 개념이 없다.
1
2
3
spark.sparkContext.textFile("/some/path/withTextFiles")
spark.sparkContext.wholeTextFiles("/some/path/withTextFiles")
12.4 RDD 다루기
- RDD는 스파크 데이터 타입 대신 자바나 스칼라의 객체를 다룬다는 사실이 가장 큰 차이점이다.
- 연산을 단순화하는 헬퍼 메서드나 함수가 구조적 API에 비해 많이 부족하다.(사용자가 직접 정의해야 함)
12.5 트랜스포메이션
- 대부분의 RDD 트랜스포메이션은 구조적 API에서 사용할 수 있는 기능을 가지고 있다.
12.5.1 distincdt
- RDD에서 중복된 데이터 제거
1
2
words.distinct().count()
12.5.2 filter
- SQL의 where 조건절을 생성하는것과 비슷하다.
1
2
3
4
5
6
def startsWithS(individual:String) = {
individual.startsWith("S")
}
words.filter(word => startsWithS(word)).collect()
12.5.3 map과 flatMap
map
- 주어진 입력을 원하는 값으로 변환하는 함수를 명시하고, 레코드별로 적용할 수 있다.
1
2
3
4
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
// 3번째 반환값으로 필터링
words2.filter(record => record._3).take(5)
flatMap
- map의 확장 버전
- 단일 로우를 여러 로우로 변환해야 하는 경우
1
2
// word를 character 집합으로 반환
words.flatMap(word => word.toSeq).take(5)
12.5.4 sortBy
- RDD 정렬을 위해 sortBy 를 사용해야 한다.
1
words.sortBy(word => word.length() * -1).take(2)
12.5.5 randomSplit
- RDD를 임의로 분할하여 RDD 배열을 만들때 사용
1
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))
12.6 액션
- DataFrame과 Dataset에서 했던 것처럼 지정된 트랜스포메이션 연산을 시작하려면 액션을 사용해야 한다.
- 액션은 데이터를 드라이버로 모으거나 외부 데이터소스로 내보내는것을 의미
12.6.1 reduce
- RDD의 모든 값을 하나의 값으로 만들려면 reduce를 사용해야 항다.
1
2
3
4
5
6
7
8
9
10
11
spark.sparkContext.parallelize(1 to 20).reduce(_ + _) // 210
// 단어 집합에서 가장 긴 단어를 찾는 function
def wordLengthReducer(leftWord:String, rightWord:String): String = {
if (leftWord.length > rightWord.length)
return leftWord
else
return rightWord
}
words.reduce(wordLengthReducer)
- 파티션에 대한 리듀스 연산은 비결정적 특성을 가진다.
- 위 예제에서 길이가 10인 단어가 중복될 경우 실행할때마다 결과가 다를 수 있음.
- 길이가 10인
definitive
나processing
중 하나가 leftWord 변수에 할당될 수 있음.
12.6.2 count
- RDD의 전체 로우 수 반환
1
words.count()
countApprox
- 반환 결과는 조금 이상해보일 수 있지만 꽤 정교한 편
- count 함수의 근사치를 제한시간 내에 계산할 수 있음.(불완전한 결과를 반환할 수 있다.)
- 신뢰도(confidence)는 실제로 연산한 결과와의 오차율
1
2
3
val confidence = 0.95
val timeoutMilliseconds = 400
words.countApprox(timeoutMilliseconds, confidence)
countApproxDistinct
1
2
3
4
5
// 정확도 파라미터를 기준으로 카운팅을 추정함.(0.000017보다 커야함)
words.countApproxDistinct(0.05)
// 정밀도와 희소 정밀도를 받아서 카운팅 추정
words.countApproxDistinct(4, 10)
countByValue
- RDD의 개수를 구한다.
- 결과 데이터셋을 드라이버의 메모리로 읽어들여 처리함.
- 익스ㅠ터의 연산 결과가 드라이버 메모리에 모두 적재되므로 결과가 작은 경우에만 사용해야 한다.
1
words.countByValue()
countByValueApprox
- count 함수와 동일한 연산을 수행하지만 근사치를 계산한다.
1
2
// 신뢰도 95%
words.countByValueApprox(1000, 0.95)
12.6.3 first
- 데이터셋의 첫번째 값을 반환
1
words.first()
12.6.4 max / min
- 최댓값과 최솟값 반환
1
2
3
spark.sparkContext.parallelize(1 to 20).max()
spark.sparkContext.parallelize(1 to 20).min()
12.6.5 take
- RDD에서 가져올 값의 개수를 지정하여 조회
- 고정 크기의 임의 표본 데이터를 얻기 위한 메소드들도 제공함(takeSample)
1
2
3
4
5
6
7
words.take(5)
words.takeOrdered(5)
words.top(5)
val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)
12.7 파일 저장하기
- 데이터 처리 결과를 일반 텍스트 파일로 쓰는것을 의미
- RDD를 사용하면 일반적인 의미의 데이터소스에 저장할 수 없다.
12.7.1 saveAsTextFile
1
2
3
4
5
words.saveAsTextFile("file:/tmp/bookTitle")
// 압축 코덱 지정
import org.apache.hadoop.io.compress.BZip2Codec
words.saveAsTextFile("file:/tmp/bookTitleCompressed", classOf[BZip2Codec])
12.7.2 시퀀스 파일
- 시퀀스 파일은 바이너리 키-값 쌍으로 구성된 플랫 파일이며, 맵리듀스의 입출력 포맷으로 널리 사용되는 형태
saveAsObjectFile
메서드나 명시적인 키-값 쌍 데이터 저장방식을 이용해 시퀀스 파일을 만들수 있다.
1
words.saveAsObjectFile("/tmp/my/sequenceFilePath")
12.7.3 하둡 파일
- 하둡 파일 포맷을 사용하면 클래스, 출력 포맷, 하둡 설정 그리고 압축 방식을 지정할 수 있다.
- 하둡 에코시스템이나 기존의 맵리듀스 잡을 깊이 있게 다루는 경우가 아니라면 상관 없음.
12.8 캐싱
- RDD 캐싱에도 DataFrame이나 Dataset의 캐싱과 동일한 원칙이 적용된다.
- 기본적으로 캐시와 저장은 메모리에 있는 데이터만을 대상으로 하고,
setName
함수를 통해 캐시된 RDD에 이름을 지정할 수 있다.
1
2
3
4
words.cache()
// 저장소 수준 조회
words.getStorageLevel
12.9 체크포인팅
- DataFrame API에서 사용할 수 없는 기능 중 하나인 체크포인팅 개념을 제공한다.
- 체크포인팅이란 RDD를 디스크에 저장하는 방식이다.
- 메모리 대신 디스크에 저장한다는 점만 다르고 캐싱과 유사하다.
- 반복적인 연산 수행시 유용하게 사용할 수 있다.
1
2
spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")
words.checkpoint()
12.10 RDD를 시스템 명령으로 전송하기
pipe
메소드를 사용하면 파이핑 요소로 생성된 RDD를 외부 프로세스로 전달할 수 있다.- 외부 프로세스는 파티션마다 한 번씩 처리해 결과 RDD를 생성
1
words.pipe("wc -l").collect()
12.10.1 mapPartitons
- 스파크는 코드를 실행할때 파티션 단위로 동작한다.
- map 함수에서 반환하는 RDD의 진짜 형태가 MapPartionsRDD이다.
1
words.mapPartitions(part => Iterator[Int](1)).sum() // 2
- 파티션 그룹 전체 값을 단일 파티션으로 모으고 임의의 함수를 적용하고 제어할 수 있다.
mapPartitonsWithIndex
- mapPartions와 유사하지만, 인덱스(파티션 범위의 인덱스)와 파티션의 모든 아이템을 순회하는 이터레이터를 가진 함수를 인수로 지정하여 처리 가능.
1
2
3
4
5
6
// 인덱스 함수를 넣어 각 레코드가 속한 데이터셋이 어디인지 알아내는 디버깅용 함수 추가
def indexedFunc(partitionIndex:Int, withinPartIterator: Iterator[String]) = {
withinPartIterator.toList.map(
value => s"Partition: $partitionIndex => $value").iterator
}
words.mapPartitionsWithIndex(indexedFunc).collect()
12.10.2 foreachPartiton
- map과 달리 파티션의 모든 데이터를 순회만 하는 함수
1
2
3
4
5
6
7
8
9
10
11
words.foreachPartition { iter =>
import java.io._
import scala.util.Random
val randomFileName = new Random().nextInt()
val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
while (iter.hasNext) {
pw.write(iter.next())
}
pw.close()
}
12.10.3 glom
- glom 함수는 데이터셋의 모든 파티션을 배열로 변환하는 함수
- 데이터를 드라이버로 모으고 데이터가 존재하는 파티션의 배열이 필요한 경우 유용하다.
- 파티션이 크거나 파티션 수가 많다면 드라이버가 비정상 종료될 수 있기 때문에 사용시 주의 필요
1
2
3
4
// 입력된 단어를 2개에 파티션에 개별적으로 할당
spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom().collect()
// Array(Array(Hello), Array(World))
Comments powered by Disqus.