[스파크 완벽 가이드] 5. 구조적 API 기본 연산
5.1 스키마
- 스키마는 DataFrame의 컬럼명과 데이터 타입을 정의(데이터소스로부터 얻거나 직접 정의)
- 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체
- 이름, 데이터 타입, 컬럼의 nullable 여부를 가짐.
1
2
3
4
5
6
7
8
9
10
11
12
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema = StructType(Array(
StructField("DEST_COUNTRY_NAME", StringType, true),
StructField("ORIGIN_COUNTRY_NAME", StringType, true),
StructField("count", LongType, false,
Metadata.fromJson("{\"hello\":\"world\"}"))
))
val df = spark.read.format("json").schema(myManualSchema)
.load("/data/flight-data/json/2015-summary.json")
- 프로그래밍 언어의 데이터 타입을 스파크 데이터 타입으로 설정할 수 없다는점에 유의
5.2 컬럼과 표현식
- 사용자는 표현식으로 DataFrame의 컬럼을 선택, 조작, 제거할 수 있음.
- DataFrame을 통해 컬럼에 접근하여야 하고, 수정하려면 DataFrame의 트랜스포메이션을 사용해야 함.
5.2.1 컬럼
col()
,colum()
을 사용하는것이 가장 간단
1
2
3
import org.apache.spark.sql.functions.{col, column}
col("someColumnName")
column("someColumnName")
명시적 컬럼 참조
- DataFrame의 컬럼은 col 메서드로 참조하고, 이는 데이터 JOIN시 유용하게 사용할 수 있음.
5.2.2 표현식
- 표현식은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션의 집합을 의미.
expr("someCol")
,col("someCol")
표현식으로 컬럼 표현
- 트랜스포메이션을 수행하려면 반드시 컬럼 참조를 사용해야 함.
1
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
DataFrame 컬럼에 접근하기
1
2
spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
.columns
5.3 레코드와 로우
- DataFrame의 각 로우는 하나의 레코드이며, 스파크에서는 Row 객체로 표현된다.
1
df.first()
5.3.1 로우 생성
- Row 객체는 스키마 정보를 가지고 있지 않음.
- Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 함.
1
2
3
4
5
6
7
8
9
import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)
// 스칼라 버전
myRow(0) // type Any
myRow(0).asInstanceOf[String] // String
myRow.getString(0) // String
myRow.getInt(2) // Int
5.4 DataFrame의 트랜스포메이션
- 로우나 컬럼 추가/제거, 로우->컬럼 변환, 로우 순서 변경 등 처리
5.4.1 DataFrame 생성
- 원시 데이터소스에서 DataFrame을 생성할 수 있음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()
5.4.2 select
, selectExpr
- 데이터 테이블에 SQL을 사용한것처럼 DataFrame에서도 사용할 수 있음.
1
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
1
2
3
SELECT DEST_COUNTRY_NAME, 0RIGIN_COUNTRY_NAME
FROM dfTable
LIMIT 2
selectExpr
- 새로운 DataFrame을 생성하는 복잡한 표현식으로 간단하게 만들어주는 도구
- 모든 유효한 비집계형 SQL 구문을 지정할 수 있음.
1
2
3
4
df.selectExpr(
"*", // include all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
.show(2)
1
2
3
SELECT *, (DEST=COUNTRY NAME = 0RIGIN COUNTRY_NAME) as withinCountry
FROM dfTable
LIMIT2
5.4.3 스파크 데이터 타입으로 변환하기
- 때로는 새로운 컬럼이 아닌 명시적인 값을 스파크에 전달해야 함.(ex. 상수)
- 리터럴을 사용할 수 있음.
1
2
import org.apache.spark.sql.functions.lit
df.select(expr("*"), lit(1).as("One")).show(2)
1
2
3
SELECT *, 1 as One
FROM dfTable
LIMIT 2
5.4.4 컬럼 추가하기
- DataFrame의 withColumn 메서드롤 사용해서 신규 컬럼을 추가할 수 있음.
1
df.withColumn("numberOne", lit(1)).show(2)
1
2
3
SELECT *, 1 as numberOne
FROM dfTable
LIMIT 2
5.4.5 컬럼명 변경하기
withColumn
대신withColumnRenamed
메서드로 컬럼명을 변경할 수 있음.
1
2
3
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
// ... dest, ORIGIN-COUNTRY_NAME, count
5.4.6 예약 문자와 키워드
- 공백이나 하이픈(-) 같은 예약 문자는 컬럼명에 사용할 수 없음.
- 이를 사용하기 위해서는 백틱(`) 문자을 이용해 escaping해야 함.
1
2
3
4
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")
.show(2)
1
2
3
SELECT `This Long Column-Name`, `This Long CoIumn-Name` as `new-col`
FROM dfTableLong
LIMIT 2
- 표현식 대신 문자열을 사용해서 명시적으로 컬럼을 참조하면 리터럴로 해석되기 떄문에 예약 문자가 포함된 컬럼을 참조할 수 있음.
5.4.7 대소문자 구분
- 기본적으로 스파크에서는 대소문자를 구별하지 않으나, 설정을 통해 구분하게 할 수 있음.
1
set spark.sql.caseSensitive true
5.4.8 컬럼 제거하기
1
2
3
4
df.drop("ORIGIN_COUNTRY_NAME").columns
// 다수의 컬럼 한번에 제거
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")
5.4.9 컬럼의 데이터 타입 변경
1
df.withColumn("count2", col("count").cast("long"))
1
2
5ELECT *, cast(count as string) AS count2
FROM dfTable
5.4.10 로우 필터링하기
- true / false을 판별하는 표현식을 만들어 필터링을 할 수 있음.
where
또는filter
메소드롤 이용해서 필터링(같은 동작)
1
2
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
1
2
3
4
SELECT *
FROM dfTable
WHERE count < 2
LIMIT 2
여러 필터 적용
1
2
3
4
df
.where(col("count") < 2)
.where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
.show(2)
1
2
3
4
5
SELECT *
FROM dfTable
WHERE count < 2
AND 0RIGIN_COUNTRY_NAME != 'Croatia'
LIMIT 2
5.4.11 고유한 로우 얻기
1
2
3
4
df
.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")
.distinct()
.count()
1
2
SELECT COUNT(DISTINCT(0RIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME))
FROM dfTable
5.4.12 무작위 샘플 만들기
- sample 메서드를 이용해서 무작위 샘플 데이터를 얻을수 있음.
- 표본 데이터 추출 비율, 복원 추출 또는 비복원 추출의 사용 여부를 지정할 수 있음.
1
2
3
4
val seed = 5
val withReplacement = false
val fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
5.4.13 임의 분할하기
- 원본 DataFrame의 임의 크기로 분할할때 유용하다.
1
2
3
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count() // False
5.4.14 로우 합치기와 추가하기
- DataFrame은 불변성이기 때문에 레코드 추가하는 작업은 구조적으로 불가능.
- 레코드를 추가하려면 원본 DF에 새로운 DF를 통합해야 함.
- 반드시 동일한 스키마와 컬럼수를 가져야 함.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.spark.sql.Row
val schema = df.schema
val newRows = Seq(
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
)
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)
.where("count = 1")
.where($"ORIGIN_COUNTRY_NAME" =!= "United States")
.show() // get all of them and we'll see our new rows at the end
- 컬럼 표현식과 문자열을 비교할떄
=!=
연산자를 사용하면 컬럼의 실제 값을 비교 대상 문자열과 비교함.=!=
,===
5.4.15 로우 정렬하기
sort
와orderBy
메소드를 사용해 최대값 혹은 최솟값이 상단에 위치하도록 정렬할 수 있음.
1
2
3
4
5
6
7
8
import org.apache.5park.Sql.functions.{desc, asc}
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)
asc_nulls_first
,desc_null_first
와 같은 메소드를 사용하여 렬된 DataFrame에서 null 값 표시 기준을 지정할 수 있음.
5.4.16 로우 수 제한하기
- 추출할 로우수 제한을 하는 경우 사용
1
df.limit(5).show()
5.4.17 repartition와 coalesce
- 자주 필터링하는 컬럼을 기준으로 데디터를 분할하여 최적화할 수 있다.
- 파티셔닝 스키마와 파티션 수를 포함한 물리적인 데이터 구성 제어
- repartition을 하면 무조건 전체 데이터 셔플이 발생함. 파티션 수가 현재보다 많거나 컬럼을 기준으로 파티션을 만들어야 하는 경우에만 사용
1
2
3
4
5
6
7
8
9
df.rdd.getNumPartitions // 1
df.repartition(5)
// 특정 컬럼을 기준으로 자주 필터링한다면 이를 기준으로 파티션을 재분배하는것이 좋음.
df.repartition(col("DEST_COUNTRY_NAME"))
// 선택적으로 파티션 수를 지정할 수 있음.
df.repartition(5, col("DEST_COUNTRY_NAME"))
coalesec
- 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우에 사용
1
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
5.4.18 드라이버로 로우 데이터 수집하기
- 로컬환경에서 데이터를 다루려면 드라이버로 데이터를 수집해야 함.
1
2
3
4
5
6
val collectDF = df.limit(10)
collectDF.take(5) // take works with an Integer count
collectDF.show() // this prints it out nicely
collectDF.show(5, false)
collectDF.collect()
전체 데이터셋에 대한 반복 처리를 위해 드라이버로 로우를 모으는 다른 방법
1
collectDF.toLocalIterator()
- 드라이버로 모든 데이터 컬렉션을 수집하는 경우 매우 큰 비용(CPU, 메모리, 네트워크 등)이 발생하므로 데이터와 필요에 따라 적절히 선택되어야 한다.
Comments powered by Disqus.