[스파크 완벽 가이드] 16. 스파크 애플리케이션 개발하기
16.1 스파크 애플리케이션 작성하기
- 스파크 애플리케이션은 스파크 클러스터와 사용자 코드 2가지 조합으로 구성된다.
16.1.1 간단한 스칼라 기반 앱
- 스칼라는 스파크의 기본 언어이기 때문에 이를 개발하는 가장 적합한 방법이라 볼 수 있다.
- 다만 실무에서 airflow를 이용해 스케줄링이 가능한 파이프라인 구축시에는 python을 가장 많이 사용하는듯 하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| object DataFrameExample extends Serializable {
def main(args: Array[String]) = {
val spark = SparkSession
.builder()
.appName("Databricks Spark Example")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.getOrCreate()
import spark.implicits._
spark.udf.register("pointlessUDF", DFUtils.pointlessUDF(_:String):String)
val authors = Seq("bill,databricks", "matei,databricks")
val authorsDF = spark
.sparkContext
.parallelize(authors)
.toDF("raw")
.selectExpr("split(raw, ',') as values")
.selectExpr("pointlessUDF(values[0]) as name", "values[1] as company")
.show()
}
}
|
- 위와 같은 예제 코드가 있다고했을떄 이를 실행하기 위해 빌드를 우선해야 한다.
- JVM 언어를 빌드하기 위한 도구는 gradle, maven, sbt 등 다양한 방법이 있으나 여기서는 생략하도록 한다.
애플리케이션 실행하기
- 빌드된 jar파일을
spark-submit
에 파라미터로 지정하여 앱을 실행할 수 있다.
1
| $SPARK_HOME/bin/spark-submit --class com.databricks.example.DataFrameExample --master local target/scala-2.11/example_2.11-0.1-SNAPSHOT.jar "hello"
|
16.1.2 파이썬 애플리케이션 작성하기
- PySpark 애플리케이션을 작성하는 방법은 일반 파이썬 애플리케이션이나 패키지를 작성하는 방법과 거의 비슷하다.
- 파이썬에서는 빌드 개념이 없고, 일반 스크립트를 작성하는것에 지나지 않는다.
1
2
3
4
5
6
7
8
9
10
11
| from __future__ import print_function
if __name__ == '__main__':
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
print(spark.range(5000).where("id > 500").selectExpr("sum(id)").collect())
|
- 위 코드가 실행되면 애플리케이션에서 활용할 수 있는 SparkSession 객체가 생성된다.
- 권장 방법은 모든 파이썬 클래스에서 SparkSession 객체를 생성하는 것보다 런타임 환경에서는 변수를 생성해 파이썬 클래스에 전달하는 방식을 사용하는것이 더 좋다.
애플리케이션 실행하기
1
| $SPARK_HOME/bin/spark-submit -master local pyspark/template/amin.py
|
16.1.3 자바 애플리케이션 작성하기
16.2 스파크 애플리케이션 테스트
- 약간 지루하지만 아주 중요한 주제인 테스트 방법을 알아보겠습니다.
- 스파크 앱을 테스트하려면 작성시 몇가지 핵심 원칙과 구성 전략을 고려해야 한다.
16.2.1 전략적 원칙
- 데이터 파이프라인과 앱에 대한 테스트 코드 개발은 실제 애플리케이션 개발만큼이나 중요하다.
- 테스트 코드는 미래에 발생할 수 있는 데이터, 로직 그리고 결과 변환에 유연하게 대처할 수 있게 도와준다.
입력 데이터에 대한 유연성
- 데이터 파이프라인은 다양한 유형의 입력 데이터에 유연하게 대처할 수 있어야 한다.
- 비즈니스 요구사항이 변하면 데이터도 변한다.
- 스파크 앱과 파이프라인은 입력 데이터 중 일부가 변하더라도 유연하게 대처할 수 있어야 한다.
- 입력 데이터 변화로 발생할 수 있는 다양한 예외 상항을 테스트하는 코드를 작성해야 한다.
비즈니스 로직 변경에 대한 유연성
- 예상했던 원형 데이터의 형태가 실제 원형 데이터와 같은지 확인하고 싶은 경우, 원하는 결과를 얻을 수 있도록 실제와 유사한 데이터를 사용해 비즈니스 로직을 꼼꼼하게 테스트해야 한다.
- 이 유형의 테스트에서는 스파크의 고유 기능에 대한 단위 테스트를 작성하지 말고, 비즈니스 로직을 테스트해서 복잡한 비즈니스 파이프라인이 의도한 대로 동작하는지를 검증해야 한다.
결과의 유연성과 원자성
- 입력 데이터 및 비즈니스 로직의 테스트가 완료되었다면 결과가 의도한 대로 반환되느지 확인해야 한다.
- 데이터가 얼마나 자주 갱신되는지, 데이터가 완벽한지(늦게 들어온 데이터가 없는지), 마지막 순간에 데이터가 변경되지 않았는지 등을 이해할 수 있도록 만들어야 한다.
16.2.2 테스트 코드 작성시 고려사항
- 적절한 단위 테스트를 작성해 입력 데이터나 구조가 변경되어도 비즈니스 로직이 정상적으로 동작하는지 확인 해야 한다.
- 단위 테스트를 하면 스키마 변경되는 상황에 쉽게 대응할 수 있다.
SparkSession 관리하기
- 스파크 로컬 모드 덕분에 단위 테스트용 프레임워크로 비교적 쉽게 스파크 코드를 테스트할 수 있다.
- 단위 테스트에서는 의존성 주입 방식으로 SparkSession이 관리하도록 만들어주면 된다.
- 런타임에 적절히 SparkSession을 전달해주는 방식으로 코드 작성
테스트 코드용 스파크 API 선정하기
- SQL, DataFrame, Dataset 다양한 API 중 유지보수성, 테스트 용이성 측면에서 적절히 검토한 후 팀의 니즈에 맞게 선정하여야 한다.
- API 유형에 관계없이 함수의 입/출력(함수 시그니처)을 정의하면 추후 이런 변경에 유연하게 대응할 수 있다.
16.2.3 단위 테스트 프레임워크에 연결하기
- 테스트마다 SparkSession을 생성하고 제거하도록 설정하는 것이 좋다.
- JUnit의
@Before
, @After
애노테이션과 같은 것을 활용
16.2.4 데이터소스 연결하기
- 테스트 코드에서는 운영 환경의 데이터소스에 접속하지 않도록 해야 한다.
- 로컬 환경에 Docker를 이용해 별도 데이터 소스를 연결해서 처리할수 있도록 하거나 h2 in-memory db 같은것을 활용할 수 있음
16.3 개발 프로세스
- 스파크 애플리케이션의 개발에서도 기존에 삳용하던 개발 흐름과 유사하다.
- 대화형 노트북이나 유사한 환경에 치고화된 작업 공간을 마련한다.(Zeppline)
- 그리고 핵심 컴포넌트와 알고리즘을 만든 후에 영구적인 영역으로 코드를 옮긴다.
- 로컬 머신에서
spark-shell
과 같은 것을 사용할 수도 있다.
16.4 애플리케이션 시작하기
spark-submit
명령을 실행하고, 잡 제출시 클라이언트 모드 혹은 클러스터 모드를 선택해서 실행할 수 있다.
spark-submit
도움말
배포 환경별 설정
16.4.1 애플리케이션 시작 예제
1
2
3
4
5
6
7
8
9
10
11
| /bin/spark-submit
--class org.apache.spark.examples.SparkPi
--master spark://207.184.161.138:7077
--executor-memory 206
--total-executor-cores 100
replace/with/path/to/examples.jar 1000
/bin/spark-submit
--master spark://207.184.161.138:7077
examples/src/main/python/pi.py
|
- master 옵션의 값을 local이나
local[*]
로 변경하면 애플리케이션 로컬 모드를 실행할 수 있다.
16.5 애플리케이션 환경 설정하기
애플리케이션 속성 런타임 환경 셔플 동작 방식 스파크 UI 압축과 직렬화 메모리 관리 처리 방식 네트워크 설정 스케줄링 동적할당 보안 암호화 스파크 SQL 스파크 스트리밍 SparkR
설정 방식
- 스파크 속성은 대부분 애플리케이션 파라미터를 제어하며, SparkConf 객체를 사용해 스파크 속성을 설정할 수 있다.
- 자바 시스템 Property
- 하드코딩된 환경 설정 파일
16.5.1 SparkConf
1
2
3
4
| import org.apache.spark.SparkConf
val conf = new SparkConf().setMaster("local[2]").setAppName("DefinitiveGuide")
.set("some.conf", "to.some.value")
|
- SparkConf 객체는 개별 스파크 애플리케이션에 대한 스파크 속성값을 구성하는 용도로 사용한다.
- 애플리케이션의 동작 방식과 클러스터 구성 방식을 제어한다.
- 명령행 파라미터를 통해 런타임에 구성할 수 있다.
1
| /bin/spark.submit --name "DefinitiveGuide" --master local[4]
|
시간 주기 형태 속성값(포맷)
25ms, 5s, 10m or 10min, 3h, 5d, 1y
16.5.2 애플리케이션 속성
16.5.3 런타임 속성
- 드물지만 애플리케이션의 런타임 환경을 설정해야 할수도 있다.
- 관련 속성을 사용해 드라이버와 익스큐터를 위한 추가 클래스트패스와 파이썬패스, 파이썬 워커 설정, 다양한 로그 관련 속성을 정의할 수 있다.
- 자세한 내용은 공식 문서 참고
16.5.4 실행 속성
- 실행 속성과 관련된 설정값은 실제 처리를 더욱 세밀하게 제어할 수 있기 때문에 자주 사용된다.
spark.executor.cores
(익스큐터의 코어수)spark.files.maxPartitionsBytes
(파일 읽기 시 파티션의 최대크기)
16.5.5 메로리 관리 설정
- 앱 최적화를 위해 메모리 옵션을 수동으로 관리해야 하는 경우도 있다.
- 대다수의 메모리 설정은 메모리 자동 관리 기능의 추가로 인해 스파크 2.x 버전에서는 제거된 예전 개념이기는 하다.
- 세밀한 제어를 위한 설정이기 때문에 실제로 사용자는 이를 신경쓰지 않아도 된다.
16.5.6 셔플 동작방식 설정
- 셔플 동작 방식을 제어하기 위한 고급 설정이 존재한다는 점까지만 참고하라.
16.5.7 환경변수
- 스파크가 설치된 디렉터리의
conf/spark-env.sh
파일에서 읽은 환경변수로 특정 스파크 설정을 구성할 수 있다.
- 위 목록 외에도 각 머신이 사용할 코어 수나 최대 메모리 크기 같은 스파크 스탠드얼론 클러스터 설정과 관련된 옵션 등이 있다.
16.5.8 애플리케이션에서 잡 스케줄링
- 별도의 스레드를 사용해 여러 잡을 동시에 실행할 수 있다.
- 스파크의 스케줄러는 스레드 안정성을 보장하고, 여러 요청을 동시에 처리할 수 있는 애플리케이션으로 만들어 줄수 있다.
- 여러 스파크 잡이 자원을 공평하게 나눠 쓰도록 구성할 수도 있다.
페어 스케줄러
- SparkContext 설정시
spark.scheduler.mode
속성을 FAIR
로 지정해서 사용할 수 있다. - 여러 개의 잡을 풀로 그룹화하는 방식도 지원한다.
- 개별 풀에 다른 스케줄링 옵션이나 가중치를 설정할 수 있다.
- 더 중요한 스파크 잡을 할당할 수 있도록 우선순위가 높은 풀을 만들수 있음.
- 하둡의 페어 스케줄러 모델을 본떠서 만들었음.
Reference
Comments powered by Disqus.