Distributed Computing (Apache Hadoop & Hive Review) 박 영 택 컴퓨터 학부
Traditional Large-Scale Computation 기존에는 processor에 의존하는 computation을 수행 상대적으로 적은 양의 데이터 많은 양의 복잡한 처리가 요구됨 초기 해결 방법 : 컴퓨터의 성능 향상 더 빠른 processor와 많은 memory 지속적으로 향상시키는 것에 한계점이 존재
분산 처리 시스템 해결 방법 : 더 많은 컴퓨터를 사용 분산 처리 시스템의 진화 하나의 작업에 여러 대의 machine을 사용 MPI(Message Passing Interface)
Apache Hadoop이란? 빅데이터를 저장, 처리, 분석할 수 있는 소프트웨어 프레임워크 Distributed : 수십만대의 컴퓨터에 자료 분산 저장 및 처리 Scalable : 용량이 증대되는 대로 컴퓨터 추가 Fault-tolerant : 하나이상의 컴퓨터가 고장나는 경우에도 시스템이 정상 동작 Open source : 공개 소프트웨어
Core Hadoop Concepts 애플리케이션을 High-level 코드로 작성 개발자에게 네트워크 프로그래밍, 의존성, low-level 인프라 구조에 대한 고려가 요구되지 않음 각 노드들은 가급적 최소한의 데이터를 주고 받음 개발자는 노드들 사이의 통신에 대한 코드 작성이 필요 없음 데이터는 여러 노드에 미리 분산되어 저장 데이터가 저장된 위치에서 연산을 수행 availability와 reliability를 향상 시키기 위해서 데이터는 여러 개의 복제본을 두고 사용
Hadoop : Basic Concepts File 시스템 (Data Storage) Hadoop Distributed File System (HDFS) Computation 방식 (Data Analytics) MapReduce 프로그램
Hadoop Components : HDFS HDFS(Hadoop Distributed File System)은 클러스터에 데이터를 저장하기 위해 사용 데이터는 block으로 분할되고, 클러스터의 여러 노드에 분산되어 저장 각 block은 기본적으로 64MB 또는 128MB의 크기 각 block은 여러 개의 replication을 생성하여 사용 기본적으로 3개의 replication block을 생성 replication block은 서로 다른 노드에 저장됨 이를 통해 reliability and availability 향상
Hadoop Components : MapReduce MapReduce는 Hadoop 클러스터의 데이터를 처리하기 위한 시스템 2개의 phase로 구성 : Map, Reduce Map과 Reduce 사이에는 shuffle과 sort라는 스테이지가 있음 각 Map task는 전체 데이터 셋에 대해서 별개의 부분에 대한 작업을 수 행 기본적으로 하나의 HDFS block을 대상으로 수행 모든 Map task가 종료되면, MapReduce 시스템은 intermediate 데이터를 Reduce phase를 수행 할 노드로 분산하여 전송
...... Hadoop Server roles Clients Secondary Job Tracker Name Node Data Analytics Jobs Map Reduce Data Storage Jobs HDFS Data Node & Task Tracker ...... Masters Slaves
Hadoop 의 구성요소 Client Master node Slave node 두가지 관점 Name Node 를 통해 정보를 받고 이후 직접적으로 Data Node 와 통신을 한다. Master node 물리적으로 Master Node 역할(Job Tracker, Name Node)을 하는 노드로서 , slave node 에 대한 정보와 실행을 할 Tasks 에 대한 관리를 담당 Slave node 물리적으로 Slave Node 역할(Data Node, Task Node)을 하는 노드로서, 실제로 데이터를 분산되어 가지고 있으며 Client 에서 요청이 오면 데이터를 전달하는 역할 및 담당 Task 를 수행하는 역할 두가지 관점 Data Analytics 관점 Job Tracker : 노드에 Task 를 할당하는 역할과 모든 Task 를 모니터링 하고 실패할 경우 Task 를 재실행 하는 역할 Task Tracker : Task 는 Map Task 와 Reduce Task 로 나눠질 수 있으며, Task 가 위치한 HDFS 의 데이터를 사용하여 MapReduce 수행. Data Storage 관점 Name Node : HDFS의 파일 및 디렉토리에 대한 메타 데이터(metadata)를 유지, 클라이언트로 부터 데이터 위치 요청이 오면 전달, 장비 손상 시 Secondary Node 로 대체 Data Node : 데이터를 HDFS 의 Block 단위로 구성, Fault Recovery 를 위해 default 로 3 copy 를 유지, Heartbeat 을 통하여 지속적으로 파일 위치 전달
hadoop fs Examples (1) 로컬 디스크의 foo.txt file을 HDFS의 사용자 디렉토리로 복사 위 명령어를 통해 /user/username/foo.txt 위치로 파일을 복사 HDFS의 사용자 경로의 하위 디렉토리 목록을 출력 HDFS의 root 경로의 하위 디렉토리 목록을 출력 ㅇ
hadoop fs Examples (2) HDFS의 경로 /user/fred/bar.txt의 내용을 출력 HDFS의 baz.txt file을 로컬 디렉토리로 복사
hadoop fs Examples (3) HDFS의 home 위치에 weblog 폴더 생성 HDFS의 폴더 삭제 $ hadoop fs –mkdir weblog $ hadoop fs –rm -r weblog
맵리듀스: 맵퍼(Mapper) 맵퍼는 Key/Value 쌍의 형태로 데이터를 읽는다. 맵퍼의 0개 또는 그 이상의 Key/Value 상을 출력한다 (pseudo-code): map(in_key, in_value) -> (inter_key, inter_value) list
맵퍼 예제: Explode Mapper 입력된 단어를 철자로 분리하여 출력(pseudo-code): Let map(k, v) = foreach char c in v: emit (k, c) (‘foo’, ‘bar’) -> (‘foo’, ‘b’), (‘foo’, ‘a’), (‘foo’, ‘r’) (‘baz’, ’other’) -> (‘baz’, ‘o’), (‘baz’, ‘t’), (‘baz’, ‘h’), (‘baz’, ‘e’), (‘baz’, ‘r’)
맵퍼 예제: Changing Keyspaces 맵퍼의 출력 key값이 입력데이터의 key값과 동일할 필요는 없다. 단어의 길이를 키로 출력(Pseudo-code): Let map(k, v) = emit(v.length(), v) (‘foo’, ‘bar’) -> (3, ‘bar’) (‘baz’, ‘other’) -> (5, ‘other’) (‘foo’, ’abracadabra’) -> (11, ‘abracadabra’)
맵리듀스 : 리듀서(The Reducer) 맵 단계가 끝나면, 중간 단계의 키 값을 기반으로 중간 값(intermediate values) 를 리스트 형태로 조합 리스트를 리듀서로 전달 하나의 리듀서나 여러개의 리듀서가 존재할 것이다. 잡(job)설정에서 이 부분은 정의되어 있다.(추후 설명) 중간 키와 연관되어 있는 모든 값은 같은 리듀서로 보내진다. 중간 키와 그 값들의 리스트들은 키 순서대로 정렬되어 리듀서로 보내진다. 이 단계는 ‘셔플(shuffle) 과 정렬(sort)’ 라고 알려져 있다. 리듀서의 output은 O 이거나 key/value 의 쌍이다. 이 결과들은 HDFS 에 저장된다. 실제로 리듀서는 보통 input키에 대해서 하나의 key/value 쌍으로 배출되어 쓰여진다.
리듀서 예제 : Sum Reducer 각 중간 키 값과 관련있는 모든 값들을 합 (pseudo-code) let reduce(k, vals) = sum = 0 foreach int i in vals: sum += i emit(k, sum) (’bar', [9, 3, -17, 44]) -> (’bar’. 39) (’foo', [123, 100, 77]) -> (’foo', 300)
리듀서 예제 : Identity Reducer Identity 리듀서 는 매우 흔하다.(pseudo-code) let reduce(k, vals) = foreach v in vals: emit(k, v) ('bar', [123, 100, 77]) -> ('bar', 123), ('bar', 100), ('bar', 77) ('foo', [9, 3, -17, 44]) -> ('foo', 9), ('foo', 3), ('foo', -17), ('foo', 44)
Hive Motivation Hive: Hadoop에서의 데이터웨어하우스 어플리케이션 쿼리언어는 SQL의 변형인 HiveQL을 사용함. Facebook에서 개발되어, 현재는 오픈소스 소프트웨어 Motivation 프로그래밍에 약하며 SQL-based Qieries에 익숙한 사람들을 위한 언어 필요 Unstructured Data Structured Data M/R model 위주의 사용법에 벗어나고자 함 MapReduce 함수의 다중 stage 필요한 경우 복잡(Job chaining) 자동화된 최적화 작업 필요 다른 데이터베이스와의 상호 연동성
Hive architecture (from the paper)
Hive 유저를 위한 High-Level 개요 Hive 는 Client Machine 에서 동작 HiveQL 질의문을 MapReduce job으로 변환 jobs 을 클러스터에 등록
Hive shell 사용법 Hive shell 에서 HiveQL 문장을 실행시킬 수 있음 MySQL Shell 의 interactive tool 과 유사 hive 명령어로 Hive shell 을 실행 Hive shell 은 “hive>” 프롬프트로 표현됨 각각의 명령어는 세미콜론 으로 문장을 끝냄 “quit” 를 사용하여 Hive shell 을 빠져나옴
Command Line 으로 Hive 접근 방법 HiveQL 코드가 들어있는 파일을 –f 옵션으로 실행 가능 -e 옵션을 통해 직접적인 HiveQL 실행 가능
Hive 의 Databases 와 Tables 접근 방법 USE 명령어를 통하여 현재 사용할 database 변경
Hive 의 Databases 와 Tables 접근 방법(계속) 현재 database 안에 있는 tables 들은 어떤것이 있는가? databases 에서 지정한 table을 포함하고 있는 databases 는?
Hive 의 Databases 와 Tables 접근 방법(계속) table 의 기본 구조를 보기 위한 DESCRIBE 명령 좀 더 세부적인 정보를 보기 위한 DESCRIBE FORMATTED 명령어
Hive 에서 Databases 생성 새로운 database 생성 기존의 database 가 있는지를 체크하면서 database 생성
Table 생성 예제 아래의 예제는 jobs 이름의 Table을 생성하는 예제 데이터는 text 포멧이며 하나의 line에는 comma-separated 형태의 4개 fields 를 가지고 있음. 위의 Table 과 일치하는 record 의 예제
Hive의 테이블 확인 ‘orders’ 테이블의 레코드 수 확인 hive> SELECT COUNT(*) FROM customers;
Hive의 기본 명령어 LIMIT은 질의문 결과의 레코드 수를 제한 ORDER BY는 질의문 결과를 특정 칼럼을 기준으로 정렬 hive> SELECT customer_fname, customer_lname FROM customers LIMIT 10; ORDER BY는 질의문 결과를 특정 칼럼을 기준으로 정렬 hive> SELECT customer_id, customer_fname FROM customers ORDER BY customer_id DESC LIMIT 10; WHERE는 특정 칼럼 값에 조건을 설정 hive> SELECT * FROM orders WHERE order_id=1287; hive> SELECT * FROM customers WHERE customer_state IN (‘CA’, ‘OR’, ‘WA’, ‘NV’, ‘AZ’);
Hive에서의 Joins Hive 에서 서로 다른 데이터간의 Join 은 빈번하게 발생 Inner joins Outer joins(left, right, and full) Cross joins( Hive 0.1 이상 버전) Left semi joins equality (‘=‘) 조건은 join 에서만 허용 Valid : customers.cust_id = orders.cust_id Invalid : customers.cust_id <> orders.cust_id Output의 각 records 는 각 table 의 지정된 key(e.g. customers.cust_id) 로 부터 찾은 데이터 최고의 질의응답 성능을 위해서는 가장 큰 테이블을 가장 나중에 질의하기
Hive 조인의 사용 예 각 고객별 주문 상황을 출력하기 위해 2개의 테이블 ‘customers’와 ‘orders’을 조인 Join hive> SELECT customer_lname, order_id, order_status FROM customers c JOIN orders o ON (c.customer_id = o.order_customer_id) LIMIT 10; orders
MySQL의 데이터를 HDFS로 가져오기 MYSQL 테이블을 HDFS 를 거쳐 Hive 테이블로 가져오기 Hive HDFS DATABASE Table Sqoop MYSQL DATABASE Table
MySQL의 데이터를 HDFS로 가져오기 테이블 ‘categories’를 HDFS로 가져오기 $ sqoop import \ --connect jdbc:mysql://localhost/retail_db \ --table categories --fields-terminated-by ‘\t’ \ --username root --password cloudera
Hive 테이블 생성 테이블 생성 hive> CREATE TABLE categories (category_id INT, category_department_id INT, category_name STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘ ’ 테이블 명 칼럼 명 칼럼의 데이터 타입 불러올 파일의 각 라인을 하나의 레코드로 구분 불러올 레코드를 tab으로 구분하여 칼럼에 저장
Hive 테이블에 데이터 불러오기 및 확인 $ LOAD DATA INPATH ‘categories/part-m-00000’ INTO TABLE categories; $ SELECT * FROM categories LIMIT 10;
MySQL의 데이터를 직접 Hive로 가져오기 MYSQL 테이블을 HDFS 를 거치지 않고 직접 Hive 테이블로 가져오기 HDFS Hive DATABASE Table Sqoop MYSQL DATABASE Table
MySQL의 데이터를 Hive로 가져오기 ‘retail_db’의 ‘customers’ 테이블을 Hive로 가져오기 hive> sqoop import --connect jdbc:mysql://localhost/retail_db --username root --password cloudera --table customers --hive-database retail_db --hive-import
EXPLODE 를 사용하여 Array 를 Records 변환 (step #2) EXPLODE 함수는 array 의 각 element마다 하나의 record 생성 SPLIT 과 같은 함수는 table generating function 임 table generating function 을 EXPLODE 의 파라매터로 사용할때는 alias 필요(e.g. AS x)
n-grams (step #2) n-gram 은 단어의 조합(n = number of words) Bigram 은 연속된 두 단어의 조합(n = 2) n-gram frequency analysis 는 많은 application에서 중요한 과정 검색엔진과 같은 application 에서 검색 결과의 spelling 교정에 사용 웹 페이지에서 가장 중요한 topics 찾는데 사용 social media message등에서 트랜드 topics 검색에 사용
HIVE 에서의 n-grams(step #2) Hive 에서는 n-grams 를 계산하기 위한 NGRAMS 함수 제공 NGRAMS 함수는 3개의 파라매터 필요 String 타입의 Array of array 형태, 각 element는 word (e.g. [[“is”, “great”]]) n-gram 에서의 n 숫자 (number of words) 결과 값의 출력 개수(top-N, based on frequency) Output은 2개의 속성을 가진 STRUCT 구조의 array 리턴 ngram : n-gram 자체 값(an array of words) estfrequency : n-gram 의 각 값이 몇번 나타났는지에 대한 count 값
Sentences 를 Words 로 변경(step #2) HIVE 의 SENTENCES 함수는 sentences 를 array of words 로 변환 Input 값은 하나 이상의 sentences 가 될 수 있음 2개의 Sentences 를 Input 값으로 받을 경우, 2-dimensional 로 변환
Calculating n-grams in HIVE(step #2) NGRAMS 함수는 SENTENCES 함수와 자주 함께 사용 아래의 예제에서는 normalize case 를 위해 LOWER 사용 그리고 EXPLODE 함수를 사용하여 array 형태의 결과를 row 형태로 변환