Home [스파크 완벽 가이드] 10. 스파크 SQL
Post
Cancel

[스파크 완벽 가이드] 10. 스파크 SQL

[스파크 완벽 가이드] 10. 스파크 SQL

10.1 SQL이란

  • SQL 또는 구조적 질의 언어(Structured Query Langauge)는 데이터에 대한 관계형 연산을 표현하기 위한 도메인 특화언어
  • 스파크에서는 NoSQL DB에서도 쉽게 사용할 수 있는 변형된 자체 SQL을 제공합니다.
  • 스파크에서는 ANSI SQL 2003의 일부도 구현함.

10.2 빅데이터와 SQL : 아파치 하이브

  • 스파크가 등장하기 전에는 Hive가 빅데이터 SQL 접근 계층에서 사실상 표준이었음.
  • 현재는 많은 사용자가 스파크 SQL을 사용함.

10.3 빅데이터와 SQL : 스파크 SQL

  • 스파크 2.0 버전에는 하이브를 지원할 수 있는 상위 호환 기능으로 ANSI-SQL과 HiveSL을 모두 지원하는 자체 개발된 SQL 파서가 포함되어 있음.
  • 기존 하이브 기반 파이프라인에 비해 엄청난 성능 개선(4.5~6배 CPU 성능 개선, 3~4배의 자원 예약 개선, 최대 5배의 지연 시간 감소)를 만들어 냈음.

10.3.1 스파크와 하이브의 관계

  • 스파크 SQL은 하이브 메타스토어를 사용하므로 하이브와 잘 연동할 수 있다.
    • 하이브 메타스토어는 여러 세션에서 사용할 테이블 정보를 보관.
    • 스파크 SQL은 메타스토어에 접속하여 조회할 파일 수를 최소화 하기 위해 참조함.

하이브 메타스토어

  • 스파크에서 spark.sql.hive.metastore.version 설정을 통해 접근 가능.
  • HiveMetastoreClient 초기화 방식 변경을 위해서는 spark.sql.hive.metastore.jars을 변경해서 처리 가능

10.4 스파크 SQL 쿼리 실행 방법

10.4.1 스파크 SQL CLI

  • 로컬 환경의 CLI에서 스파크 SQL 쿼리를 해볼수 있는 편리한 도구
1
./bin/spark-sql

10.4.2 스파크의 프로그래밍 SQL 인터페이스

  • 서버를 설정해 SQl을 사용할수 있지만, 스파크에서 지원하는 언어 APi로 비정형 SQL을 실행할 수 있음.
  • SparkSession 객체의 sql 메서드를 사용.
1
2
3
4
5
6
7
8
spark.sql("SELECT 1 + 1").show()

spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
""")
  .where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")
  .count() // SQL => DF
  • 쿼리 명령을 수행하면 프로그래밍 방식으로 평가할 수 있는 DataFrame을 반환함.
  • 스파크SQL도 마찬가지로 즉시 실행되는것이 아니라 지연처리됨.
  • SQL과 DataFrame은 완벽히 연동 가능함.
    • DataFrame을 생성하고 SQl을 사용해 처리하고 그 결과를 다시 DataFrame으로 받을수 있음.

10.4.3 스파크 SQL, 쓰리프트 JDBC/ODBC 서버

  • 스파크는 자바 DB 연결(JDBC) 인터페이스를 제공함.
  • 비즈니스 분석가가 태블로 같은 BI 소프트웨어를 이용해 스파크에 접속하는 혀애가 대표적인 케이스
  • 쓰리프트 JDBC/oDBC 서버는 하이브 1.2.1 버전의 HiveServer2에 맞추어 구현되어있음.

10.5 카탈로그

  • 스파크 SQL에서 가장 높은 추상화 단계는 카탈로그입니다.
  • 테이블에 저장된 데이터의 메타데이터 뿐만 아니라 DB, Table, Function, View에 대한 정보를 추상화
  • org.apache.spark.sql.catelog.Catalog 패키지로 사용

10.6 테이블

  • 스파크 SQL을 사용해 작업을 수행하려면 먼저 테이블을 정의해야함.
  • DataFrame과 논리적으로 동일함.
  • 조인, 필터링, 집계 등 여러 데이터 변환 작업을 수행할 수 있다.
  • DatFrame은 프로그래밍 언어 레벨에서 정의할수 있으나, 테이블은 데이터베이스상에 정의해야 함.

10.6.1 스파크 관리형 테이블

  • 관리형 테이블과 외부 테이블 개념을 기억해야 함.

외부 테이블

  • 디스크에 저장된 파일을 용해서 테이블을 정의하는 경우 해당됨.

관리형 테이블

  • DataFrame의 saveAsTable 메서드를 이용해서 스파크가 관련된 모든 정보를 추적할 수 있는 관리형 테이블을 만들수 있음.

10.6.2 테이블 생성하기

  • 다양한 데이터 소스를 사용해 테이블을 생성할 수 있음.
1
2
3
CREATE TABLE flights (
  DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path '/data/flight-data/json/2015-summary.json')

USINGSTORED AS 구문

  • USING 구문은 매우 중요함. 포맷을 지정하지 않으면 기본적으로 하이브 SerDe설정을 사용한다.
    • 하이브의 SerDe는 스파크의 자체 직렬화보다 훨씬 느리므로 테이블을 사용하는 Reader, Writer성능에 영향을 미칠 수 있음.
  • 하이브 사용자는 STORED AS 구문을 통해 하이브 테이블을 생성할 수 있음.

COMMENT

1
2
3
4
5
CREATE TABLE flights_csv (
  DEST_COUNTRY_NAME STRING,
  ORIGIN_COUNTRY_NAME STRING COMMENT "remember, the US will be most prevalent",
  count LONG)
USING csv OPTIONS (header true, path '/data/flight-data/csv/2015-summary.csv')

CREATE TABLE FROM SELECT

1
CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights

테이블이 없는 경우에만 생성

1
CREATE TABLE IF NOT EXISTS flights_from_select AS SELECT * FROM flights

파티셔닝된 데이터셋을 저장해 데이터 레이아웃을 제어

1
2
CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 5

10.6.3 외부 테이블 생성하기

  • 스파크 SQL은 초기 빅데이터 SQL시스템 중 하나인 하이브 SQl과 완벽하게 호환됨.
  • 기존 하이브 쿼리문을 스파크 SQL로 변환해야 하는 경우 대부분 문제없이 바로 사용할수 있음.
1
2
3
CREATE EXTERNAL TABLE hive_flights (
  DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/data/flight-data-hive/'

SELECT로부터 생성

1
2
3
CREATE EXTERNAL TABLE hive_flights_2
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/flight-data-hive/' AS SELECT * FROM flights

10.6.4 테이블에 데이터 삽입하기

  • 표준 SQL 문법을 따름.
1
2
3
4
INSERT INTO flights_from_select
    SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count
    FROM flights
    LIMIT 20

특정 파티션에만 저장하고 싶은 경우

  • 파티션 명세 추가하여 가능.
  • 쓰기 연산은 파티셔닝 스키마에 맞게 데이터를 저장
1
2
3
4
INSERT INTO partitioned_flights
  PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
  SELECT count, ORIGIN_COUNTRY_NAME FROM flights
  WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12

10.6.5 테이블 메타데이터 확인

1
2
3
4
DESCRIBE TABLE flights_csv

// 파티셔닝 스키마 정보 확인
SHOW PARTITIONS partitioned_flights

10.6.6 테이블 메타데이터 갱신하기

  • 테이블 메타데이터를 유지하는 것은 가장 최신의 데이터셋을 읽고 있다는 것을 보장할 수 있는 중요한 작업.
  • 테이블 메타데이터를 갱신할 수 있는 2가지 명령이 있다.
1
2
3
4
5
-- 테이블과 관련된 모든 캐싱된 항목(기본적으로 파일) 갱신
REFRESH table partitioned_flights

-- 카탈로그에서 관리하는 테이블의 파티션 정보를 새로고침.
MSCK REPAIR TABLE partitioned_flights

10.6.7 테이블 제거하기

  • 테이블은 삭제(DELETE)할 수 없음.
  • 오로지 제거(DROP)맘ㄴ 가능.
  • 관리형 테이블의 경우 제거하면 데이터와 테이블 정의 모두 제거 됨.
1
2
3
4
DROP TABLE flights_csv;

-- 존재하는 경우에만 제거
DROP TABLE IF EXISTS flights_csv;

외부 테이블 제거

  • 외부 테이블을 제거하면 데이터는 삭제되지 않지만, 외부 테이블명을 통해 데이터를 조회할 수 없음.

10.6.8 테이블 캐싱하기

  • DataFrame에서처럼 테이블을 캐시하거나 캐시에서 제거할 수 있다.
1
2
3
4
5
-- 캐시
CACHE TABLE flights

-- 캐시 제거
UNCACHE TABLE FLIGHTS

10.7 뷰

  • 뷰는 기준 테이블에서 여러 트랜스포메이션 작업을 지정하여 정의
  • 기본적으로 뷰는 단순 쿼리 실행 계획일 뿐이다.
  • 뷰를 사용하면 쿼리 로직을 체계화하거나 재사용하기 편하게 만들수 있다.

10.7.1 뷰 생성하기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 테이블처럼 데이터베이스에 등록한 뷰 생성
CREATE VIEW just_usa_view AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'

-- 데이터베이스에 등록되지 않고, 현재 세션에서만 사용할 수 있는 임시 뷰 생성
CREATE TEMP VIEW just_usa_view_temp AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'

--  전역적 임시 뷰(데이터베이스에 관계 없이 상관없이 사용 가능, 전체 스파크 애플리케이션에서 볼수 있으나, 세션이 종료되면 사라짐)
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'

-- 생성된 뷰를 덮어쓸수 있음
CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
  SELECT * FROM flights WHERE dest_country_name = 'United States'

  • 뷰는 실질적으로 트랜스포메이션이기때문에 쿼리가 실행될 때만 뷰에 정의된 트랜스포메이션이 수행됨.
    • 테이블의 데이터를 실제로 조회하는 경우에만 필터를 적용.
    • 새로운 DataFrame을 만드는것과 동일
1
2
3
4
val flights = spark.read.format("json")
  .load("/data/flight-data/json/2015-summary.json")
val just_usa_df = flights.where("dest_country_name = 'United States'")
just_usa_df.selectExpr("*").explain
  • 내부적으로는 동일한 매커니즘으로 동작하므로 DataFrame이나 SQL 중 편한 방법을 선택해서 사용하면 됨.

###10.7.2 뷰 제거하기

  • 테이블을 제거하는 것과 동일한 방식으로 뷰 제거 가능.
1
DROP VIEW IF EXISTS just_usa_view;

10.8 데이터 베이스

  • 데이터베이스는 여러 테이블을 조직화하기 위한 도구이다.
  • 스파크에서 실행하는 모든 SQL 명령은 사용중인 데이터베이스 범위 내에서 실행.
1
2
-- 전체 데이터베이스 목록 조회
SHOW DATABASES

10.8.1 데이터베이스 생성하기

1
CREATE DATABASE some_db

10.8.2 데이터베이스 설정하기

1
2
3
4
5
-- 현재 사용하는 데이터베이스 변경
USE some_db

--  현재 사용중인 데이터베이스 확인
SELECT current_database()

10.8.3 데이터베이스 제거하기

1
DROP DATABASE IF EXISTS some_db;

10.9 select 구문

  • 스파크 쿼리는 ANSI SQL 요건을 충족한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SELECT [ALL|DISTINCT] named_expression[, named_expression, ...]
    FROM relation[, relation, ...]
    [lateral_view[, lateral_view, ...]]
    [WHERE boolean_expression]
    [aggregation [HAVING boolean_expression]]
    [ORDER BY sort_expressions]
    [CLUSTER BY expressions]
    [DISTRIBUTE BY expressions]
    [SORT BY sort_expressions]
    [WINDOW named_window[, WINDOW named_window, ...]]
    [LIMIT num_rows]

named_expression:
    : expression [AS alias]

relation:
    | join_relation
    | (table_name|query|relation) [sample] [AS alias]
    : VALUES (expressions)[, (expressions), ...]
          [AS (column_name[, column_name, ...])]

expressions:
    : expression[, expression, ...]

sort_expressions:
    : expression [ASC|DESC][, expression [ASC|DESC], ...]

10.9.1 case..when..then 구문

  • SQL 쿼리으 값을 조건에 맞게 변경해야 하는 경우 위 구문을 사용해 조건에 맞는 처리를 할수 있다.
1
2
3
4
5
SELECT
  CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
       WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
       ELSE -1 END
FROM partitioned_flights

10.10 고급 주제

  • SQL 구문은 조작, 정의, 제어와 관련된 명령을 정의할 수 있다.

10.10.1 복합 데이터 타입

  • 복합 데이터 타입은 표준 SQL과는 거리가 있는 스파크의 매우 강력한 기능이다.
  • 이를 SQL에서 어떻게 적절하게 처리하는지 이해해야 한다.
  • 스파크 SQL에는 구조체, 리스트, 맵 3가지 핵심 복합 데이터 타입이 존재함

구조체

  • 구조체는 맵에 더 가까우며 스파크에서 중첩 데이터를 생성하거나 쿼리하는 방법을 제공함.
  • 여러 컬럼이나 표현식으로 괄호로 묶기만 하면 된다.
1
2
3
4
5
6
7
8
CREATE VIEW IF NOT EXISTS nested_data AS
  SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights

SELECT * FROM nested_data

SELECT country.DEST_COUNTRY_NAME, count FROM nested_data

SELECT country.*, count FROM nested_data
  • 구조체의 이름과 모든 하위 컬럼을 지정해 모든 값을 조회할 수 있다.

리스트

  • 일반적인 프로그래밍 언어의 리스트와 유사함.
  • 값의 배열이나 리스트는 여러가지 방법으로 생성 가능.
1
2
3
4
5
6
7
8
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
  collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME

SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights

SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME
  • explode 함수를 사용해 배열을 다시 여러 로우로 변환할 수 있다.
1
2
3
4
5
6
CREATE OR REPLACE TEMP VIEW flights_agg AS
  SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
  FROM flights GROUP BY DEST_COUNTRY_NAME


SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg

10.10.2 함수

  • 스파크 SQL은 복합 데이터 타입 외에도 다양한 고급 함수를 제공한다.
  • DataFrame 함수 문서에서 모든 함수를 찾아볼수 있다.
  • SQL로도 제공하는 전체 함수 목록 확인 가능.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 전체
SHOW FUNCTIONS

-- 내장 시스템 함수
SHOW SYSTEM FUNCTIONS

-- 누군가가 스파크 환경에 공개한 함수
SHOW USER FUNCTIONS

-- s로 시작하는 모든 함수
SHOW FUNCTIONS "s*";

-- LIKE 키워드를 사용해 함수 검색
SHOW FUNCTIONS LIKE "collect*";

사용자 정의 함수

  • 스파크는 사용자 정의 함수를 정의하고 분산 환경에서 사용할수 있는 기능을 제공함.
  • 특정 언어를 사용해 함수를 개발하고 등록하여 사용할 수 있다.
1
2
def power3(number:Double):Double = number * number * number
spark.udf.register("power3", power3(_:Double):Double)

10.10.3 서브쿼리

  • 서브쿼리르 사용하면 쿼리 안에 쿼리를 지정할 수 있다.
  • SQL에서 정교하 로직을 명시.
  • 상호연관 서브쿼리(correlated subquery) : 서브쿼리의 정보를 보완하기 위해 쿼리의 외부 범위에 있는 일부 정보를 사용
  • 비상호연관 서브쿼리(uncorrelated subquery) : 외부 범위에 있는 정보를 사용하지 않는 서브쿼리
  • 조건절 서브쿼리(predicate subquery) : 값에 따라 필터링할 수 있는 서브 쿼리

비상호연관 서브쿼리(uncorrelated subquery)

스크린샷 2023-01-01 오후 3 53 44

1
2
3
4
5
6
7
8
9
10
11
SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5

SELECT * FROM flights
WHERE origin_country_name IN (
     SELECT dest_country_name
     FROM flights
     GROUP BY dest_country_name
     ORDER BY sum(count) DESC
     LIMIT 5
)

상호연관 서브쿼리(correlated subquery)

1
2
3
4
5
6
7
8
9
10
11
SELECT * FROM flights f1
WHERE EXISTS (
     SELECT 1
     FROM flights f2
     WHERE f1.dest_country_name = f2.origin_country_name
)
AND EXISTS (
     SELECT 1
     FROM flights f2
     WHERE f2.dest_country_name = f1.origin_country_name
)

비상호연관 스칼라 쿼리

  • 기존에 없던 일부 부가 정보를 가져올 수 있다.
1
2
SELECT *, (SELECT max(count) FROM flights) AS maximum
FROM flights

10.11 다양한 기능

  • SQL 코드의 성능 최적화나 디버깅이 필요한 경우 관련될 수 있는 부분들

10.11.1 설정

  • 애플리케이션과 관련된 몇가지 환경 설정값들이 있다.
  • 셔플 파티션 수를 조정하는 것처럼 애플리케이션 초기화 시점 혹은 애플리케이션 실행 시점에 설정 가능.

스크린샷 2023-01-01 오후 3 57 02

스크린샷 2023-01-01 오후 3 57 14

10.11.2 SQL에 설정값 지정하기

  • SQL을 사용해 환경을 설정하는 경우 스파크 SQL과 관련된 설정만 가능하다는 것을 참고해야 한다.
1
2
-- 셔플 파티션 수를 지정하는 방법
SET spark.sql.shuffle.partitions=20

Reference

This post is licensed under CC BY 4.0 by the author.

[스파크 완벽 가이드] 9. 데이터소스

[스파크 완벽 가이드] 11. Dataset

Comments powered by Disqus.