Download presentation
Presentation is loading. Please wait.
1
Parallel Programming with Spark
빅데이터 분산컴퓨팅 박 영 택
2
Parallel Programming with Spark
How RDDs are distributed across a cluster How Spark executes RDD operations in parallel rdd가 어떻게 클러스터에 분산되는가, 그리고 스파크에서 rdd가 어떻게 병렬처리 되는 것 인가에 대해서 살펴보겠습니다.
3
Chapter Topics RDD Partitions Partitioning of FileTbased RDDs Executing Parallel Operations Stages and Tasks Conclusion HandsTOn Exercise: Viewing Stages and Tasks in the Spark Application UI
4
Spark Cluster Review Executor Task Worker (Slave) Nodes
Cluster Master Node HDFS Master Node Executor Task 전에 설명 했다시피 스파크의 클러스터 구조는 마스터 노드, 워커 노드, hdfs마스터노드 형식으로 구성되어 있습니다. Executor
5
RDDs on a Cluster Resilient Distributed Datasets
Data is partitioned across worker nodes Partitioning is done automatically by Spark Optionally, you can control how many partitions are created RDD 1 Executor rdd_1_0 Executor rdd_1_1 Executor rdd_1_2 RDD는 분산되어 각 워커노드에 배정됩니다. Excutor에 task가 할당되면 결과를 driver로 전송하게 되고 메모리에 저장된 RDD를 제공합니다.
6
Chapter Topics RDD Partitions Partitioning of File based RDDs Executing Parallel Operations Stages and Tasks Conclusion HandsTOn Exercise: Viewing Stages and Tasks in the Spark Application UI
7
File Partitioning: Single Files
Partitions from single files – Partitions based on size sc.textFile("myfile",3) RDD You can optionally specify a minimum number of partitions textFile(file, minPartitions) More partitions = more parallelization Executor myfile Executor 싱글 파일을 파티션으로 나눌 때 아규먼트에 파티션 개수를 나눠줄 수 있습니다. Executor
8
Example: Count JPGs Requests per File
Note: Works with s mall files that each fit in a single partition Count (0,17) (1,13) (2,20) …
9
Chapter Topics RDD Partitions Partitioning of File based RDDs Executing Parallel Operations Stages and Tasks Conclusion HandsTOn Exercise: Viewing Stages and Tasks in the Spark Application UI
10
Parallel Operations on Partitions
RDD operations are executed in parallel on each partition When possible, tasks execute on the worker nodes where the data is in memory Some operations preserve partitioning e.g., map, flatMap, filter Some operations repartition e.g., reduce, sort, group RDD작업은 각 파티션에서 병렬로 실행됩니다. 일부 작업은 파티션을 보존하고 일부 작업은 리파티셔닝을 합니다. 파티션을 보존하는 function은 map, flatmap,filter 등 셔플이 일어나지 않는 작업이며 리파티셔닝을 하는 function으로는 reduce, sort, group 등 셔플이 일어나는 작업입니다.
11
Example: Average Word Length by Letter (1)
> avglens = sc.textFile(file) RDD HDFS: mydata
12
Example: Average Word Length by Letter (2)
> avglens = sc.textFile(file) \ .flatMap(lambda line: line.split()) RDD RDD HDFS: mydata
13
Example: Average Word Length by Letter (3)
> avglens = sc.textFile(file) \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word[0],len(word))) RDD RDD RDD HDFS: mydata
14
Example: Average Word Length by Letter (4)
> avglens = sc.textFile(file) \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word[0],len(word))) \ .groupByKey() RDD RDD RDD RDD HDFS: mydata
15
Example: Average Word Length by Letter (5)
> avglens = sc.textFile(file) \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word[0],len(word))) \ .groupByKey() \ .map(lambda (k, values): \ (k, sum(values)/len(values))) RDD RDD RDD RDD RDD HDFS: mydata
16
Chapter Topics RDD Partitions Partitioning of File based RDDs Executing Parallel Operations Stages and Tasks Conclusion HandsTOn Exercise: Viewing Stages and Tasks in the Spark Application UI
17
Spark Execution: Stages (1)
> avglens = sc.textFile(file) \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word[0],len(word))) \ .groupByKey() \ .map(lambda (k, values): \ (k, sum(values)/len(values))) > avglens.count() Stage 1 Stage 2 RDD RDD RDD RDD RDD 리파티셔닝이 일어나게되면 스테이지가 바뀌게 되는데
18
Spark Execution: Stages (2)
> avglens = sc.textFile(file) \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word[0],len(word))) \ .groupByKey() \ .map(lambda (k, values): \ (k, sum(values)/len(values))) > avglens.count() Stage 1 Stage 2 Task 1 Task 4 Task 2 Task 5 Task 3
19
Spark Execution: Stages (3)
> avglens = sc.textFile(file) \ .flatMap(lambda line: line.split()) \ .map(lambda word: (word[0],len(word))) \ .groupByKey() \ .map(lambda (k, values): \ (k, sum(values)/len(values))) > avglens.count() Stage 1 Stage 2 Task 1 Task 4 이 형태로 스테이지 별로 분리됩니다. Task 2 Task 5 Task 3
20
Summary of Spark Terminology
Job – a set of tasks executed as a result of an action Stage – a set of tasks in a job that can be executed in parallel Task – an individual unit of work sent to one executor Job Task Stage RDD RDD RDD RDD RDD Job은 결과로서 실행될 태스크의 집합을 말하는 것이고 Stage는 간단히 말해서 병렬로 실행될 수 있는 작업의 태스크 집합이라고 보시면 됩니다. Task 는 RDD 파티션마다 한 세트입니다. Stage
21
Spark Execution: Task Scheduling (1)
Stage 1 Stage 2 Task 1 Task 4 Task 2 Task 5 Executor HDFS Block 1 Task 3 Executor HDFS Block 2 Master Node Spark Master Executor HDFS Block 3
22
Spark Execution: Task Scheduling (2)
Stage 1 Stage 2 Task 4 Task 5 Executor Task 1 HDFS Block 1 Executor HDFS Block 2 Master Node Task 2 Spark Master Executor HDFS Block 3 Task 3
23
Spark Execution: Task Scheduling (3)
Stage 2 Task 4 Task 5 Executor HDFS Block 1 Executor HDFS Block 2 Master Node Spark Master Executor HDFS Block 3
24
Spark Execution: Task Scheduling (3)
Stage 2 Executor HDFS Block 1 Task 4 Executor HDFS Block 2 Master Node Spark Master Executor Task 5 HDFS Block 3
25
Caching and Persistence
빅데이터 분산컴퓨팅 박 영 택
26
Caching and Persistence
How Spark uses an RDD’s lineage in operations How to persist RDDs to improve performance 스파크가 RDD’S 리니지를 작업에 어떻게 사용하나 성능을 향상시키기 위해 RDDs을 유지하는 방법
27
Chapter Topics RDD Lineage Caching Overview Distributed Persistence
28
Each transformation operation creates a new child RDD
Lineage Example (1) Each transformation operation creates a new child RDD File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one.
29
Each transformation operation creates a new child RDD
Lineage Example (2) Each transformation operation creates a new child RDD File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. MappedRDD[1] (mydata) > mydata = sc.textFile("purplecow.txt")
30
.filter(lambda s:s.startswith('I'))
Lineage Example (3) Each transformation operation creates a new child RDD File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. MappedRDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd = mydata.map(lambda s: s.upper())\ .filter(lambda s:s.startswith('I')) MappedRDD[2] FilteredRDD[3]: (myrdd)
31
.filter(lambda s:s.startswith('I'))
Lineage Example (4) Spark keeps track of the parent RDD for each new RDD Child RDDs depend on their parents File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. MappedRDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd = mydata.map(lambda s: s.upper())\ .filter(lambda s:s.startswith('I')) MappedRDD[2] FilteredRDD[3]: (myrdd)
32
.filter(lambda s:s.startswith('I'))
Lineage Example (5) Action operations execute the parent transformations > mydata = sc.textFile("purplecow.txt") > myrdd = mydata.map(lambda s: s.upper())\ .filter(lambda s:s.startswith('I')) > myrdd.count() 3
33
.filter(lambda s:s.startswith('I'))
Lineage Example (6) Each action re’executes the lineage transformations starting with the base File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. MappedRDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd = mydata.map(lambda s: s.upper())\ .filter(lambda s:s.startswith('I')) > myrdd.count() 3 > myrdd.count() MappedRDD[2] FilteredRDD[3]: (myrdd)
34
.filter(lambda s:s.startswith('I'))
Lineage Example (7) Each action re’executes the lineage transformations starting with the base > mydata = sc.textFile("purplecow.txt") > myrdd = mydata.map(lambda s: s.upper())\ .filter(lambda s:s.startswith('I')) > myrdd.count() 3 > myrdd.count() 3
35
Distributed Persistence
Chapter Topics RDD Lineage Caching Overview Distributed Persistence *스파크에서는 RDD 를 저장해놓고 사용하는 기능으로 persist()와 cache() 라는 두 가지 오퍼레이션을 지원합니다.
36
Caching is a suggestion to Spark
스파크의 기본 cache()연산은 메모리에 데이터를 저장합니다. 즉, 새로운 RDD 파티션을 저장할 메모리가 모자란다면 오래된 것들은 삭제되고, 나중에 삭제된 데이터가 다시 필요해지면 RDD는 재 연산을 하게 됩니다.
37
Caching an RDD saves the data in memory
File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. 캐싱 할 때 RDD가 메모리에 어떻게 저장되는지 설명 드리겠습니다.
38
Caching an RDD saves the data in memory
File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. RDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd1 = mydata.map(lambda s: s.upper()) RDD[2] (myrdd) myrdd2.count()
39
Caching an RDD saves the data in memory
File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. RDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd1 = mydata.map(lambda s: s.upper()) > myrdd1.cache() RDD[2] (myrdd1)
40
Caching an RDD saves the data in memory
File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. RDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd1 = mydata.map(lambda s: s.upper()) > myrdd1.cache() > myrdd2 = myrdd1.filter(lambda \ s:s.startswith('I')) RDD[2] (myrdd1) RDD[3] (myrdd2)
41
Caching an RDD saves the data in memory
> mydata = sc.textFile("purplecow.txt") > myrdd1 = mydata.map(lambda s: s.upper()) > myrdd1.cache() > myrdd2 = myrdd1.filter(lambda \ s:s.startswith('I')) > myrdd2.count() 3
42
Subsequent operations use saved data
Caching Subsequent operations use saved data File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. RDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd1 = mydata.map(lambda s: s.upper()) > myrdd1.cache() > myrdd2 = myrdd1.filter(lambda \ s:s.startswith('I')) > myrdd2.count() 3 > RDD[2] (myrdd1) I'VE NEVER SEEN A PURPLE COW. I NEVER HOPE TO SEE ONE; BUT I CAN TELL YOU, ANYHOW, I'D RATHER SEE THAN BE ONE. myrdd2.count()
43
Subsequent operations use saved data
Caching Subsequent operations use saved data File: purplecow.txt I've never seen a purple cow. I never hope to see one; But I can tell you, anyhow, I'd rather see than be one. RDD[1] (mydata) > mydata = sc.textFile("purplecow.txt") > myrdd1 = mydata.map(lambda s: s.upper()) > myrdd1.cache() > myrdd2 = myrdd1.filter(lambda \ s:s.startswith('I')) > myrdd2.count() 3 > 3 RDD[2] (myrdd1) I'VE NEVER SEEN A PURPLE COW. I NEVER HOPE TO SEE ONE; BUT I CAN TELL YOU, ANYHOW, I'D RATHER SEE THAN BE ONE. RDD[3] (myrdd2) I NEVER HOPE TO SEE ONE; I'D RATHER SEE THAN BE ONE. 캐 myrdd2.count()
44
Chapter Topics RDD Lineage Caching Overview Distributed Persistence
45
The cache method stores data in memory only
Persistence Levels (1) The cache method stores data in memory only The persist method offers other options called Storage Levels Storage location – where is the data stored? –MEMORY_ONLY (default) – same as cache –MEMORY_AND_DISK – Store partitions on disk if they do not fit in memory – Called spilling –DISK_ONLY – Store all partitions on disk Persistence 는 cache와는 다르게 파라미터안에 옵션을 줄 수 있습니다. MEMORY_ONLY(default) = cache와 같이 메모리에 데이터를 저장합니다. 즉, 새로운 RDD 파티션을 저장할 메모리가 모자란다면 오래된 것들은 간단히 삭제될 수 있고, 나중에 삭제된 데이터가 다시 필요해지면 RDD는 재연산을 하게됩니다. MEMORY_AND_DISK 레벨을 적용하면 cache처럼 메모리를 삭제 하는 게 아닌 RDD파티션을 디스크로 내리게 되고 나중에 다시 필요해지면 간단하게 로컬 저장장치에서 메모리로 읽어 들일 수 있게된다. 이는 블록을 재연산 하는 것보다는 비용이 적게 들고 어느 정도 성능에 대한 예측도 가능해 집니다. 특히 RDD파티션을 재연산 하는 비용이 매우 비싼 경우에 이런 방식이 유용합니다. Disk_ONLY = MEMORY에 올리는 것이 아니라 DISK 에 올리는 것입니다.
46
Serialization – you can choose to serialize the data in memory
Persistence Levels (2) Serialization – you can choose to serialize the data in memory - MEMORY_ONLY_SER and MEMORY_AND_DISK_SER Much more space efficient Less time efficient > from pyspark import StorageLevel > myrdd.persist(StorageLevel.DISK_ONLY) 셔플 작업동안 이루어지며 잠재적으로 매우 클 가능성이 있는 데이터들이 전송되므로 디스크에 데이터를 쓸 때 객체들을 직렬화해 바이너리 포맷으로 변환시켜야 합니다. Serialization옵션을 사용하면 바이너리 형태로 메모리에 저장해놓아 메모리 공간을 절약 할 수 있습니다.
47
To stop persisting and remove from memory and disk
Persistence Levels (1) To stop persisting and remove from memory and disk rdd.unpersist() persist() 를 사용하면 spark driver program 이 종료되더라도 memory 에 상주하게 된다. LRU 정책에 따라 memory 에 유지되는데, 이를 별도로 해제해주지 않으면 계속 상주하게 된다. 제거하기 위해서는 RDD method 인 unpersist() 를 호출해 주어야 한다
Similar presentations