Apache Spark Tutorial 빅데이터 분산 컴퓨팅 박영택
Introduction Spark Spark Context Spark 에서는 driver 와 executors 사이에 통신이 일어남 driver 는 Spark job 을 가지고 있으며, 이를 실행하기 위해서는 executors 에게 일(Tasks)을 나 누어 줘야 함 executors 에서 작업이 끝나면 다시 driver 에게 결과를 리턴 Mater
Introduction Spark Spark Context Spark Application 을 시작할 때, SparkContext 생성으로 부터 시작 Master node 에서는 동작 가능한 cores 을 사용자의 Spark Application 전용으로 할당 사용자는 보통 Spark Context 를 사용하여 RDD(Resilient Distributed Datasets) 생성에 사용
SparkContext 기본적인 SparkContext 시작 방법 from pyspark import SparkConf, SparkContext conf = (SparkConf() .setMaster("local") .setAppName("My app") .set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf) local mode 사용 사용자 Application 이름 executor 메모리 할당 최종 SparkContext 를 sc 변수로 할당 실습 환경에서는 기본적으로 ”sc” 변수로 SparkContext 할당되어 있음
SparkContext
SparkContext Application 이름 바꾸기
파일로 부터 RDD 생성 sample.txt 파일을 4개의 partitions 으로 생성 RDD 함수 중, glom() 함수는 파티션 별로 데이터 확인 가능 <sample.txt>
collect 함수 분산되어 있는 데이터를 모두 모아서 driver 로 전달 리턴되어 오는 값은 더이상 RDD 형태가 아님(python 경우, list, tuple 등)
Partition 개수 변경 repartition(n) 함수 getNumPartitions() 함수 n 에 변경하고 싶은 partition 개수를 전달 getNumPartitions() 함수 RDD 변수가 몇개의 partitions 으로 나누어져 있는지 알고 싶을 때.
xrange, range 차이점(Python 내용) xrange 함수는 lazy evaluation 으로서 선언된 변수가 사용되는 시점에 메모리에 값을 할당 range 함수는 변수 선언 즉시 메모리에 값 할당
객체로 부터 RDD 생성 Python 객체로 부터 RDD 생성 File 이 아닌 메모리 공간에 할당된 객체로 부터 RDD 생성 가능
RDD lineage 와 type 확인 RDD 의 lineage 확인 RDD 의 type 확인
map 함수 map 함수에 sub 함수를 전달함으로서 각 partition의 모든 데이터에 대해 sub 함수 적용 Spark 에서 map 함수 적용 과정 sub partition #1 partition #2 partition #3 partition #4 RDD1 RDD2
map 함수 map 함수에 sub 함수를 전달함으로서 모든 데이터에 대한 sub 함수 적용
flatMap 함수 list of list 는 list 로 풀어서 생성 tuple 경우 key, value 를 모두 분해하여 list 로 생성
filter 함수 적용 filter 함수에는 true / false 를 리턴하는 함수를 넘겨야 하며, 모든 데 이터에 대해 true 인 값들만 유지
filter 함수 적용 filter 함수에는 true / false 를 리턴하는 함수를 넘겨야 하며, 모든 데 이터에 대해 true 인 값들만 유지
WordCount first() : 첫 번째 partition의 첫 번째 Element take(n) : 첫 번째 partition부터 n 개수만큼 리턴 n 이 첫 번째 partition이 가진 elements 개수보다 많으면 2번째 partition 으로 부터 elements 가져옴 takeOrdered(n) : 오름차순으로 n 개 리턴 tuple 일 경우, key 값 기준으로 오름차순 top(n) : 내림차순으로 정렬하여 n 개 리턴 tuple 일 경우, key 값 기준으로 내림차순
takeSample 함수 withReplacement : 중복 허용 여부 num : sample 개수 seed : 값을 줄 경우, 항상 같은 samples 추출
reduceByKey, GroupByKey 같은 node 의 같은 key 값 기준으로 values 를 미리 병 합 shuffling 할때, 네트워크의 부하를 줄여줌 groupByKey 특별한 작업 없이 모든 pair 데이터들이 key 값을 기준으로 shuffling 일어남 네트워크의 부하가 많이 생김 하나의 key 값에 많은 데이터가 몰릴 경우 out of memory 발생 가능
groupByKey groupByKey
groupByKey groupByKey / groupByKey with sum(value)
groupByKey groupByKey with map / groupByKey with mapValues
groupByKey groupByKey , mapValues
reduceByKey reduceByKey
countByValue countByValue
Map : python & pySpark map : python & pySpark
pySpark map & flatMap pySpark map & flatMap Result of pyspark map Result of pyspark flatmap
pySpark flatMap reduceByKey
wordCount : pySpark flatMap reduceByKey sortByKey
reduce 함수 hodoop의 reduce 와 유사하며, reduce 함수의 파라매터로 function 을 전달 reduce 에 전달할 function은 항상 associative 하고 commutative 해야 함 associative : a + (b + c) = (a + b) + c e.g. 2 + (3 + 4) = (2 + 3) + 4 commutative : a + b = b + a e.g. 2 + 3 = 3 + 2 partition 변경에 따라 값이 다름!!
reduceByKey, GroupByKey 예제 shuffling 전에 하는 특별한 작업이 없기 때문 reduceByKey 는 shuffling 하기 전의 작업을 위한 함수 f 를 전달 shullfing 전에 f 함수를 시행
reduceByKey, GroupByKey 사용한 WordCount : dictionary type
key 를 tuple의 value 로 인식하여 정렬하라는 의미 WordCount key 를 tuple의 value 로 인식하여 정렬하라는 의미 1 2 3 4 <sample.txt> * reduceByKey 함수는 22p 에 설명
join 함수 특정 delimiter 를 사용하여 Sequence 타입(list, tuple, etc) 의 elements 을 연결 element의 타입이 string 형태만 가능
mapPartitions, mapPartitionsWithIndex 함수 자신의 Partition 안에서만 f 함수 연산 mapPartitionsWithIndex 함수 index 가 자동으로 추가된 tuple(index, Sequence type) 형태의 데이터를 자신의 Partition 에서만 f 함수 연산 sequence 타입의 객체 int, sequence 타입
End !