[스파크 완벽 가이드] 11. Dataset
- Dataset은 구조적 API의 기본 데이터 타입이다.
- DataFrame은 Row 타입의 Dataset (
DataFrame == Dataset[Row]
) - Dataset을 사용하면 Row 타입 대신 사용자가 정의한 데이터 타입을 분산 방식으로 다룰 수 있다.
11.1 Dataset을 사용할 시기
Dataset을 사용하면 성능이 떨어진다는데 사용할 필요가 있을까?
Dataset을 사용해야 하는 두가지 이유
- DataFrame 기능만으로 수행할 연산을 표현할 수 없는 경우
- 성능 저하를 감수하더라도 타입 안정성(type-safe)을 가진 데이터 타입을 사용하고 싶은 경우
구조적 API를 사용해 표현할수 없는 몇가지 작업
- 비즈니스 로직을 SQL이나 DataFrame 대신 단일 함수로 인코딩해야 하는 경우
- 두 문자열을 사용해 뺄셈 연산을 하는 것처럼 타입이 유효하지 않을떄 런타임 에러가 아닌 컴파일 단계에서 에러를 탐지하기 좋은 케이스
- 단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 재사용하려는 경우
- 로컬과 분산환경의 워크로드 재사용 가능
- 경우에 따라서 더 적합한 워크로드를 만들기 위해 DataFrame과 Dataset을 동시에 사용해야 하는 경우가 있음.
- 성능과 타입 안정성 중 하나는 반드시 희생되어야 하는 트레이드오프 관계
11.2 Dataset 생성
- 정의할 스키마를 미리 알고 있어야 한다.
11.2.1 자바: Encoders
- 데이터 타입 클래스를 정으한 다음에 DataFrame(Dataset
타입)에 지정해 인코딩
1
2
3
4
5
6
7
8
9
10
import org.apache.spark.Sql.Encoders;
public class FIight implements Serializable{
String DEST COUNTRY=NAME;
String 0RIGIN COUNTRY=NAME;
Long DEST COUNTRY-NAME;
}
Dataset<FIight> flights = spark.read .parquet("/data/flight-data/parquet/2010-summary.parquet/")
.as(Encoders.bean(FIight.class));
11.2.2 슴칼라: 케이스 클래스
- 스칼라에서 Dataset을 생성하려면 스칼라 case class 구문을 사용해 데이터 타입을 정의해야 한다.
케이스 클래스의 특징
- 불변성
- 패턴 매칭으로 분해 가능
- 참조값 대신 클래스 구조를 기반으로 비교
- 사용하기 쉽고 다루기 편함.
케이스 클래스의 장점
- 불변성이므로 객체들이 언제 어디서 변경되었는지 추적할 필요가 없다.
- 값으로 비교하면 인스턴스를 마치 원시(primitive) 데이터 타입의 값처럼 비교할 수 있다. 그러므로 클래스 인스턴스가 값으로 비교되는지, 참조로 비교되는지 불확실해하지 않아도 됨.
- 패턴 매칭은 로직 분기를 단수화해 버그를 줄이고 가독성을 좋게 한다.
1
2
3
4
5
6
7
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
11.3 액션
- Dataset과 DataFrame에 collect, take, count와 같은 액션을 적용할 수 있다는 사실이 중요하다.
1
flights.show(2)
11.4 트랜스포메이션
- Dataset의 트랜스포메이션은 DataFrame과 동일하다.
- Dataset을 사용하면 원형의 JVM 데이터 타입을 다루기 때문에 DataFrame만 사용해서 트랜스포메이션을 수행하는 것보다 좀 더 복잡하고 강력한 데이터 타입으로 트랜스포메이션을 사용할 수 있다.
11.4.1 필터링
1
2
3
4
5
6
7
8
// Flight 클래스를 파라미터로 사용해 Bollean 값을 반환하는 함수
def originIsDestination(flight_row: Flight): Boolean = {
return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
flights.filter(flight_row => originIsDestination(flight_row)).first()
flights.collect().filter(flight_row => originIsDestination(flight_row))
11.4.2 맵핑
- 특정 값을 다른 값으로 맵핑
- JVM 데이터 타입을 알고 있기 때문에 컴파일 타임에 데이터 타입 유효성 검사를 할수 있다.
1
2
3
val destinations = flights.map(f => f.DEST_COUNTRY_NAME)
val localDestinations = destinations.take(5)
11.5 조인
- DataFrame에서와 마찬가지로 Dataset에도 동일하게 적용된다.
- Dataset에서는
joinWith
와 같은 정교한 메소드를 제공함.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
case class FlightMetadata(count: BigInt, randomData: BigInt)
val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
.withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
.as[FlightMetadata]
val flights2 = flights
.joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))
flights2.selectExpr("_1.DEST_COUNTRY_NAME")
flights2.take(2)
val flights2 = flights.join(flightsMeta, Seq("count"))
val flights2 = flights.join(flightsMeta.toDF(), Seq("count"))
- 일반 조인도 아주 잘 동작하나, DataFrame을 반환하므로 JVM 데이터 타입 정보를 잃게 된다.
- 이 정보를 다시 얻으려면 Dataset을 정의해야 한다.
- DataFrame과 Dataset을 조인하는것에는 아무런 문제가 되지 않음.
11.6 그룹화와 집계
- Dataset을 가지고도 groupBy, rollup, cube 메서드를 모두 사용할 수 있다.
- Dataset 대신 DataFrame을 반환하므로 데이터 타입 정보를 잃게 됨.
1
2
3
4
5
flights.groupBy("DEST_COUNTRY_NAME").count()
flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()
flights.groupByKey(x => x.DEST_COUNTRY_NAME).count().explain
- Dataset의 키를 이용해 그룹화를 수행한 다음 결과를 키-값 형태로 전달해 원시 객체 형태로 그룹화된 데이터를 다룰수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
def grpSum(countryName:String, values: Iterator[Flight]) = {
values.dropWhile(_.count < 5).map(x => (countryName, x))
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(5)
def grpSum2(f:Flight):Integer = {
1
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).mapValues(grpSum2).count().take(5)
1
2
3
4
5
6
7
// 새로운 처리방법을 생성해 그룹을 축소(reduce)하는 방법 정의 가능
def sum2(left:Flight, right:Flight) = {
Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}
flights.groupByKey(x => x.DEST_COUNTRY_NAME).reduceGroups((l, r) => sum2(l, r))
.take(5)
- gorupByKey 메서드는 동일한 결과를 반환하지만, 데이터 스캔 직후에 집계를 수행하는 groupBy에 비해 더 비싼 처리를 한다.
1
flights.groupBy("DEST_COUNTRY_NAME").count().explain
- 사용자가 정의한 인코딩으로 세밀한 처리가 필요한 경우에만 Dataset의 groupByKey 메서드를 사용하는것이 좋음.
- Dataset은 빅데이터 처리 파이프라인의 처음과 끝 작업에서 주로 사용하는것이 좋다.
Comments powered by Disqus.