Home [스파크 완벽 가이드] 8. 조인
Post
Cancel

[스파크 완벽 가이드] 8. 조인

[스파크 완벽 가이드] 8. 조인

8.1 조인 표현식

  • 스파크는 왼쪽과 오른쪽 데이터셋에 있는 하나 이상의 키값을 비교하고 왼쪽 데이터셋과 오른쪽 데이터셋의 결합 여부를 결정하는 조인 표현식의 평가 결과에 따라 2개의 데이터셋을 조인합니다.
  • 일반적인 RDB의 JOIN과 유사한 방식

8.2 조인 타입

  • 내부 조인(inner join) : 왼쪽과 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 외부 조인(outer join) : 왼쪽이나 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 왼쪽 외부 조인(left outer join) : 왼쪽 데이터셋에 키가 있는 로우를 유지
  • 오른쪽 외부 조인(right outer join) : 오른쪽 데이터셋에 키가 있는 로우를 유지
  • 왼쪽 세미 조인(left semi join) : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우 키가 일치하는 왼쪽 데이터셋만 유지
  • 왼쪽 안티 조인(left anti join) : 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않은 왼쪽 데이터셋만 유지
  • 자연 조인(natural join) : 두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적(implicit)으로 결합하는 조인을 수행
  • 교차 조인(cross join) 또는 카테시안 조인(cartesian join) : 왼쪽 데이터셋의 모든 로우와 오른쪽 데이터셋의 모든 로우 조합을 표시

base data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val person = Seq(
    (0, "Bill Chambers", 0, Seq(100)),
    (1, "Matei Zaharia", 1, Seq(500, 250, 100)),
    (2, "Michael Armbrust", 1, Seq(250, 100)))
  .toDF("id", "name", "graduate_program", "spark_status")

val graduateProgram = Seq(
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley"))
  .toDF("id", "degree", "department", "school")

val sparkStatus = Seq(
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor"))
  .toDF("id", "status")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

8.3 내부 조인(INNER JOIN)

  • 테이블에 존재하는 키를 평가하고, 결과가 참(true)인 로우만 결합
1
2
3
4
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")
val wrongJoinExpression = person.col("name") === graduateProgram.col("school")

person.join(graduateProgram, joinExpression).show()
1
2
3
SELECT *
FROM person
JOIN graduateProgram ON person.graduate_program = graduateProgram.id

스크린샷 2022-11-20 오후 4 49 12

joinType 지정

1
2
3
4
var joinType = "inner"

// 3번쨰 파라미터로 joinType을 명시해줄수 있다.
person.join(graduateProgram, joinExpression, joinType).show()

8.4 외부 조인(FULL OUTER JOIN)

  • Outer Join은 DataFrame이나 테이블에 존재하는 키를 평가하여 참이나 거짓으로 평가한 로우를 조인하고, 일치하는 로우가 없다면 해당 위치를 null로 채워주는 조인 방식
1
2
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()
1
2
3
SELECT *
FROM person
FULL OUTER JOIN graduateProgram ON graduate_program = graduateProgram.id

스크린샷 2022-11-20 오후 4 51 38

8.5 왼쪽 외부 조인(LEFT OUTER JOIN)

  • 왼쪽 DataFrame의 모든 로우를 표시하고, 이와 일치하는 오른쪽 DataFrame 로우를 함꼐 표시해주는 조인 방식
    • 오른쪽 DataFrame에 일치하는 로우가 없다면 해당 위치는 null로 채워짐
1
2
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()
1
2
3
SELECT *
FROM graduateProgram
LEFT OUTER JOIN person ON person.graduate_program = graduateProgram.id

스크린샷 2022-11-20 오후 4 53 29

8.6 오른쪽 외부 조인(RIGHT OUTER JOIN)

  • LEFT OUTER JOIN과 드라이빙 테이블이 왼쪽이냐 오른쪽이냐만 차이가 있고 동일.

스크린샷 2022-11-20 오후 4 54 40

8.7 왼쪽 세미 조인

  • 왼쪾 DataFrame의 어떤 값도 포함하지 않기 떄문에 조금 다르지만, 오른쪽 DataFrame의 존재여부에 따라 결과가 달라질수 있는 조인 타입
  • 결과 데이터에는 왼쪽 DataFrame만 표시됨.
1
2
3
4
5
6
7
8
9
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()

val gradProgram2 = graduateProgram.union(Seq(
    (0, "Masters", "Duplicated Row", "Duplicated School")).toDF())

gradProgram2.createOrReplaceTempView("gradProgram2")

gradProgram2.join(person, joinExpression, joinType).show()
1
2
3
SELECT *
FROM gradProgram2
LEFT SEMI JOIN person ON gradProgram2.id = person.graduate_program

스크린샷 2022-11-20 오후 5 06 28

8.8 왼쪽 안티 조인(LEFT ANTI JOIN)

  • 세미 조인의 반대 개념
  • 오른쪽 DataFrame의 어떤 값과도 일치되지 않은 왼쪽 DataFrame만 표시
1
2
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()
1
2
3
SELECT *
FROM graduateProgram
LEFT ANTI JOIN person ON graduateProgram.id = person.graduate_program

스크린샷 2022-11-20 오후 5 11 11

8.9 자연 조인(NATURAL JOIN)

  • 조인하려는 컬럼을 암시적으로 추정하여 JOIN하는 방식
  • 별도로 조인 키를 지정해주지 않고 동일한 컬럼명으로 참조되기 때문에 의도치 않은 결과를 반환할수 있어서 사용시 주의 필요.
1
2
3
SELECT *
FROM graduateProgram
NATURAL JOIN person

8.10 교차 조인(CROSS JOIN) 또는 카테시안 조인(CARTESIAN JOIN)

  • 교차 조인은 조건절을 기술하지 않은 내부 조인
  • DataFrame의 모든 로우를 오른쪽 모든 로우와 결합
  • 교차 조인을 하게 되면 엄청나게 많은 수의 로우가 생성될 수 있음.
    • 1,000 X 1,000의 교차 조인 결과는 1,000,000개의 로우가 생성됨.
1
2
3
4
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()

person.crossJoin(graduateProgram).show()
1
2
3
SELECT *
FROM graduateProgram
CROSS JOIN person ON graduateProgram.id = person.graduate_program

스크린샷 2022-11-20 오후 5 15 13

8.11 조인 사용시 문제점

8.11.1 복합 데이터 타입의 조인

  • 복합 데이터 타입의 조인이 어려워보일수 있지만, 실제로 블리언을 반환하는 모든 표현식을 이용해서 조인을 할수 있다.
1
2
3
4
import org.apache.spark.sql.functions.expr

person.withColumnRenamed("id", "personId")
  .join(sparkStatus, expr("array_contains(spark_status, id)")).show()
1
2
3
4
5
6
SELECT *
FROM (
    SELECT id as personId, name, graduate_program, spark_status
    FROM person
)
INNER JOIN sparkStatus ON array_contains(spark_status, id)

스크린샷 2022-11-20 오후 5 17 13

8.11.2 중복 컬럼명 처리

  • JOIN을 수행할때 가장 까다로운 것중 하나는 결과 DataFrame에서 중복된 컬럼명을 다루는 것이다.
  • 각 컬럼은 스파크 SQL엔진인 카탈리스트 내에 고유 ID를 가지고 있으나, 이는 카탈리스트 내부에서만 관리되고, 직접 참조할 수 없다.
    • 따라서 중복된 컬럼명이 존재하는 DataFrame을 사용할 떄는 특정 컬럼을 참조할 수 없어서 별도 처리가 필요하다.

스크린샷 2022-11-20 오후 5 22 02

해결방법1 : 다른 조인 표현식 사용

  • 불리언 형태의 조인 표현식을 문자열이나 시퀀스 형태로 변경한다.
  • 조인할때 두 컬럼 중 하나가 자동으로 제거 됨.
1
2
3
val gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")

person.join(gradProgramDupe,"graduate_program").select("graduate_program").show()

해결방법2 : 조인 후 컬럼 제거

1
2
3
4
5
person.join(gradProgramDupe, joinExpr).drop(person.col("graduate_program"))
  .select("graduate_program").show()

val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr).drop(graduateProgram.col("id")).show()

해결 방법3 : 조인 전 컬럼명 변경(가장 좋은 방법이라고 생각됨)

1
2
3
val gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
val joinExpr = person.col("graduate_program") === gradProgram3.col("grad_id")
person.join(gradProgram3, joinExpr).show()

8.12 스파크의 조인 수행 방식

  • 스파크의 조인 수행 방식을 이해하기 위해서는 두 가지 핵심 전략을 이해해야 한다.

핵심 전략

  • 노드간 네트워크 통신 전략
  • 노드별 연산 전략

8.12.1 네트워크 통신 전략

  • 스파크는 조인시 2가지 클러스터 통신 방식을 활용한다.
    • 셔플 조인(shuffle join), 브로드캐스트 조인(broadcast join)

큰 테이블과 큰 테이블 조인

  • 하나의 큰 테이블을 다른 큰 테이블과 조인하면 아래와 같은 셔플 조인이 발생합니다.

스크린샷 2022-11-20 오후 5 27 50

  • 셔플 조인은 전체 노드 간 통신이 발생하는데, 사용된 특정 키나 키 집합이 어떤 노드에 있느냐에 따라 해당 노드와 데이터를 공유해야 합니다.
  • 이런 방식 떄문에 네트워크가 복잡해지고, 많은 자원을 사용해야 합니다.
  • 이런 데이터가 자주 사용된다면 네트워크 비용을 줄이기 위해 1차적으로 비정규화된 데이터셋을 하나 만들어서 프로세싱하는것도 방법이 될수 있다.

큰 테이블과 작은 테이블 조인

  • 작은 테이블이 단일 워커 노드의 메모리 크기에 적합할 정도로 충분히 작다면 조인 연산을 최적화 할수 있습니다.
  • 이 경우 작은 DataFrame을 클러스터 전체 워커노드에 복제하는 브로드캐스트 조인을 수행한다면 효율적으로 처리가 간으합니다.

스크린샷 2022-11-20 오후 5 31 04

  • 최초 전체 워커노드에 데이터를 복제하는 과정에서는 I/O가 발생하겠지만 그이후로는 추가적인 네트워크 비용이 들기 않기 때문에 작업 전체적인 리소스 효율을 증대시킬수 있음.
broadcast join 수행하지 않는 경우
1
2
3
val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr).explain()

스크린샷 2022-11-20 오후 5 32 39

broadcast join hint
  • 아래와 같이 힌트를 주어 broadcast join을 수행할 수 있습니다.
    • 다만, 강제성이 있는것은 아니라서 옵티마이저 판단에 의해 무시될 수 있습니다.
1
2
3
4
import org.apache.spark.sql.functions.broadcast

val joinExpr = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpr).explain()

아주 작은 테이블 사이의 조인

  • 이 경우는 스파크 옵티마이저가 조인 방식을 결정하도록 내버려 두는게 가장 좋은 방법

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 7. 집계 연산

[스파크 완벽 가이드] 9. 데이터소스

Comments powered by Disqus.