Home [스파크 완벽 가이드] 11. Dataset
Post
Cancel

[스파크 완벽 가이드] 11. Dataset

[스파크 완벽 가이드] 11. Dataset

  • Dataset은 구조적 API의 기본 데이터 타입이다.
  • DataFrame은 Row 타입의 Dataset ( DataFrame == Dataset[Row] )
  • Dataset을 사용하면 Row 타입 대신 사용자가 정의한 데이터 타입을 분산 방식으로 다룰 수 있다.

11.1 Dataset을 사용할 시기

  • Dataset을 사용하면 성능이 떨어진다는데 사용할 필요가 있을까?

Dataset을 사용해야 하는 두가지 이유

  • DataFrame 기능만으로 수행할 연산을 표현할 수 없는 경우
  • 성능 저하를 감수하더라도 타입 안정성(type-safe)을 가진 데이터 타입을 사용하고 싶은 경우

구조적 API를 사용해 표현할수 없는 몇가지 작업

  • 비즈니스 로직을 SQL이나 DataFrame 대신 단일 함수로 인코딩해야 하는 경우
  • 두 문자열을 사용해 뺄셈 연산을 하는 것처럼 타입이 유효하지 않을떄 런타임 에러가 아닌 컴파일 단계에서 에러를 탐지하기 좋은 케이스
  • 단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 재사용하려는 경우
    • 로컬과 분산환경의 워크로드 재사용 가능
  • 경우에 따라서 더 적합한 워크로드를 만들기 위해 DataFrame과 Dataset을 동시에 사용해야 하는 경우가 있음.
    • 성능과 타입 안정성 중 하나는 반드시 희생되어야 하는 트레이드오프 관계

11.2 Dataset 생성

  • 정의할 스키마를 미리 알고 있어야 한다.

11.2.1 자바: Encoders

  • 데이터 타입 클래스를 정으한 다음에 DataFrame(Dataset 타입)에 지정해 인코딩
1
2
3
4
5
6
7
8
9
10
 import org.apache.spark.Sql.Encoders;

public class FIight implements Serializable{
   String DEST COUNTRY=NAME;
   String 0RIGIN COUNTRY=NAME;
   Long DEST COUNTRY-NAME;
}

Dataset<FIight> flights = spark.read .parquet("/data/flight-data/parquet/2010-summary.parquet/")
    .as(Encoders.bean(FIight.class));

11.2.2 슴칼라: 케이스 클래스

  • 스칼라에서 Dataset을 생성하려면 스칼라 case class 구문을 사용해 데이터 타입을 정의해야 한다.

케이스 클래스의 특징

  • 불변성
  • 패턴 매칭으로 분해 가능
  • 참조값 대신 클래스 구조를 기반으로 비교
  • 사용하기 쉽고 다루기 편함.

케이스 클래스의 장점

  • 불변성이므로 객체들이 언제 어디서 변경되었는지 추적할 필요가 없다.
  • 값으로 비교하면 인스턴스를 마치 원시(primitive) 데이터 타입의 값처럼 비교할 수 있다. 그러므로 클래스 인스턴스가 값으로 비교되는지, 참조로 비교되는지 불확실해하지 않아도 됨.
  • 패턴 매칭은 로직 분기를 단수화해 버그를 줄이고 가독성을 좋게 한다.
1
2
3
4
5
6
7
case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)

val flightsDF = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]

11.3 액션

  • Dataset과 DataFrame에 collect, take, count와 같은 액션을 적용할 수 있다는 사실이 중요하다.
1
flights.show(2)

스크린샷 2023-01-01 오후 5 20 55

11.4 트랜스포메이션

  • Dataset의 트랜스포메이션은 DataFrame과 동일하다.
  • Dataset을 사용하면 원형의 JVM 데이터 타입을 다루기 때문에 DataFrame만 사용해서 트랜스포메이션을 수행하는 것보다 좀 더 복잡하고 강력한 데이터 타입으로 트랜스포메이션을 사용할 수 있다.

11.4.1 필터링

1
2
3
4
5
6
7
8
// Flight 클래스를 파라미터로 사용해 Bollean 값을 반환하는 함수
def originIsDestination(flight_row: Flight): Boolean = {
  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}

flights.filter(flight_row => originIsDestination(flight_row)).first()

flights.collect().filter(flight_row => originIsDestination(flight_row))

11.4.2 맵핑

  • 특정 값을 다른 값으로 맵핑
  • JVM 데이터 타입을 알고 있기 때문에 컴파일 타임에 데이터 타입 유효성 검사를 할수 있다.
1
2
3
val destinations = flights.map(f => f.DEST_COUNTRY_NAME)

val localDestinations = destinations.take(5)

11.5 조인

  • DataFrame에서와 마찬가지로 Dataset에도 동일하게 적용된다.
  • Dataset에서는 joinWith와 같은 정교한 메소드를 제공함.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
case class FlightMetadata(count: BigInt, randomData: BigInt)

val flightsMeta = spark.range(500).map(x => (x, scala.util.Random.nextLong))
  .withColumnRenamed("_1", "count").withColumnRenamed("_2", "randomData")
  .as[FlightMetadata]

val flights2 = flights
  .joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))

flights2.selectExpr("_1.DEST_COUNTRY_NAME")

flights2.take(2)

val flights2 = flights.join(flightsMeta, Seq("count"))

val flights2 = flights.join(flightsMeta.toDF(), Seq("count"))
  • 일반 조인도 아주 잘 동작하나, DataFrame을 반환하므로 JVM 데이터 타입 정보를 잃게 된다.
  • 이 정보를 다시 얻으려면 Dataset을 정의해야 한다.
  • DataFrame과 Dataset을 조인하는것에는 아무런 문제가 되지 않음.

11.6 그룹화와 집계

  • Dataset을 가지고도 groupBy, rollup, cube 메서드를 모두 사용할 수 있다.
  • Dataset 대신 DataFrame을 반환하므로 데이터 타입 정보를 잃게 됨.
1
2
3
4
5
flights.groupBy("DEST_COUNTRY_NAME").count()

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()

flights.groupByKey(x => x.DEST_COUNTRY_NAME).count().explain

스크린샷 2023-01-01 오후 5 30 09

  • Dataset의 키를 이용해 그룹화를 수행한 다음 결과를 키-값 형태로 전달해 원시 객체 형태로 그룹화된 데이터를 다룰수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
def grpSum(countryName:String, values: Iterator[Flight]) = {
  values.dropWhile(_.count < 5).map(x => (countryName, x))
}

flights.groupByKey(x => x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(5)

def grpSum2(f:Flight):Integer = {
  1
}

flights.groupByKey(x => x.DEST_COUNTRY_NAME).mapValues(grpSum2).count().take(5)

스크린샷 2023-01-01 오후 5 31 11

1
2
3
4
5
6
7
// 새로운 처리방법을 생성해 그룹을 축소(reduce)하는 방법 정의 가능
def sum2(left:Flight, right:Flight) = {
  Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}

flights.groupByKey(x => x.DEST_COUNTRY_NAME).reduceGroups((l, r) => sum2(l, r))
  .take(5)
  • gorupByKey 메서드는 동일한 결과를 반환하지만, 데이터 스캔 직후에 집계를 수행하는 groupBy에 비해 더 비싼 처리를 한다.
1
flights.groupBy("DEST_COUNTRY_NAME").count().explain

스크린샷 2023-01-01 오후 5 33 12

  • 사용자가 정의한 인코딩으로 세밀한 처리가 필요한 경우에만 Dataset의 groupByKey 메서드를 사용하는것이 좋음.
  • Dataset은 빅데이터 처리 파이프라인의 처음과 끝 작업에서 주로 사용하는것이 좋다.

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 10. 스파크 SQL

[스파크 완벽 가이드] 12. RDD

Comments powered by Disqus.