Home [스파크 완벽 가이드] 3. 스파크 기능 둘러보기
Post
Cancel

[스파크 완벽 가이드] 3. 스파크 기능 둘러보기

[스파크 완벽 가이드] 3. 스파크 기능 둘러보기

3.1 스파크의 기능

스크린샷 2022-10-30 오후 5 01 09

3.2 Dataset : 타입 안정성을 제공하는 구조적 API

  • 자바와 스칼라의 정적 데이터 타입 코드를 지원하기 위해 고안된 구조적 API
  • 이는 타입 안정성을 지원하고, 동적 타입 언어인 파이썬과 R에서는 사용할수 없음.
1
2
3
4
5
6
7
8
9
10
11
12
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME : String, count: BigInt) {
   val flightsDF = spark.read
       .parquet("/data/flight-data/parquet/2010-summray.parquet/")

   val flights = flightsDF.as[Flight]

   flights
       .filter(flightRow => flightRow.0RIGIN_COUNTRY_NAME !=
  ''Canada'')
       .map(flightRow => flightRow)
       .take(5)
}

3.3 구조적 스트리밍

  • 구조적 스트리밍은 안정화된 스트림 처리용 고수준 API
  • 구조적 APi로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행할 수 있음.

스트리밍이 아닌 일반적인 read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val staticDataFrame = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema

import org.apache.spark.sql.functions.{window, column, desc, col}
staticDataFrame
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")
  .show(5)

스트리밍 방식으로 처리

  • read 대신 readStream을 사용한 것이 가장 큰 차이점.
  • 아래 예제와 같은 방식을 운영 환경에 적용하는것은 추천하지 않음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val streamingDataFrame = spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("/data/retail-data/by-day/*.csv")

//  총 판매금액 계산
val purchaseByCustomerPerHour = streamingDataFrame
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")


purchaseByCustomerPerHour.writeStream
    .format("memory") // memory = store in-memory table
    .queryName("customer_purchases") // the name of the in-memory table
    .outputMode("complete") // complete = all the counts should be in the table
    .start()
  • 스트림이 시작되면 실행 결과가 어떠한 형태로 인메모리 테이블에 기록되는지 확인할수 있음.

3.4 머신러닝과 고급 분석

  • 스파크는 내장된 MLlib를 사용해 대규모 머신러닝을 수행할 수 있음.
  • 대용량 데이터를 대상으로 preprocessing, munging(rawdata를 접근/분석이 쉽도록 가공하는 행위) , model training, prediction 등을 할수 있음.
  • classfication, regression, clustering, deep learning 등 정교한 API 제공

3.5 저수준 API

  • 스파크는 RDD를 통해 자바와 파이썬 객체를 다루는 데 필요한 다양한 기능을 제공함.
  • 스파크의 거의 모든 기능은 RDD를 기반으로 만들어졌음.
  • 파티션 제어 등 DataFrame보다 더 세밀한 제어가 가능함.
  • RDD는 toDF() 와 같은 API를 통해 손쉽게 DataFrame으롭 변환할 수 있음.
  • 기본적으로 사용이 권장되지는 않으나, 비정형 데이터나 정제되지 않은 rawdata를 처리해야 한다면 사용할 일이 있을수도 있음.
1
2
spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF()

3.6 SparkR

  • R 언어를 사용하기 위한 기능을 제공함.

3.7 스파크 에코시스템과 패키지

  • 스파크의 최고 장점은 커뮤니티가 만들어낸 패키지 에코시스템과 다양한 기능.
  • 개발자 누구나 자신이 개발한 패키지를 공개할수 있는 spark-packages.org 저장소가 있음.

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 2. 스파크 간단히 살펴보기

[스파크 완벽 가이드] 4. 구조적 API 개요

Comments powered by Disqus.