Python Map Reduce – Using Hadoop with Docker container

Bài viết này mình sẽ giới thiệu cách submit MapReduce job trên Hadoop. Ở các bài viết trước mình đã có giới thiệu cách dùng docker-compose để chạy các Hadoop container.

1. Code MapReduce bằng Python

Mình sẽ không giới thiệu MapReduce là gì và cách code nó như thế nào chi tiết các bạn xem hướng dẫn qua bài viết này trên Medium. Sau khi đọc xong chúng ta sẽ có file code và dataset.

mapper.py

#!/usr/bin/python

# The Mapper
import sys
import csv

# Set local variables
iteration = 0

currentCountry = None
previousCountry = None
currentFx = None
previousFx = None
percentChange = None
currentKey = None

fxMap = []

# print "Starting mapper.py"

infile = sys.stdin

next(infile) # skip first line of input file

for line in infile:

    line = line.strip()
    line = line.split(',', 2)

    try:
        # Get data from line
        currentCountry = line[1].rstrip()
        if len(line[2]) == 0:
            continue
        currentFx = float(line[2])

        if currentCountry != previousCountry:
            previousCountry = currentCountry
            previousFx = currentFx
            previousLine = line
            continue

        # If country same as previous, add to map
        elif currentCountry == previousCountry:
            percentChange = ((currentFx - previousFx) / previousFx) * 100.00
            percentChange = round(percentChange, 2)
            percentChange = percentChange

            currentKey = "%s: %6.2f%%" % (currentCountry, percentChange)

            # Set the array with tuple keys
            fxMap.append(tuple([currentKey, 1]))

        # Update Values
        previousCountry = currentCountry
        previousFx = currentFx
        previousLine = line

        # Uncomment if you want to see the output
#         if iteration % 50000 == 0:
#             print "Current iteration is %d" % iteration
#         iteration += 1

    # Handle unexpected errors
    except Exception as e:
        template = "An exception of type {0} occurred. Arguments:\n{1!r}"
        message = template.format(type(e).__name__, e.args)
        print "currentFx: %.2f previousFx: %.2f" % (currentFx, previousFx)
        print message
        sys.exit(0)

#
# print "mapper.py has completed with %d iterations" % (iteration - 1)

# Show the returned values
for i in sorted(fxMap):
    print "%-20s - %d" % (i[0], i[1])

reduce.py

#!/usr/bin/python

# The reducer

from operator import itemgetter
import sys

current_key = None
current_count = 0
key = None

# Import the mapped FX data data
for line in sys.stdin:

    # parse the input we got from mapper.py
    key, count = line.split('- ', 1)
    key = key.strip()

    try:
        count = int(count)
    except ValueError:
        continue

    if current_key == key:
        current_count += count
    else:
        if current_key:
            print '%s\t%s' % (current_key, current_count)
        current_count = count
        current_key = key

# do not forget to output the last word if needed!
if current_key == key:
    print '%s\t%s' % (current_key, current_count)

2. Submit code

2.1. Khởi chạy Hadoop container

Đầu tiên các bạn phải start các service cơ bản của Hadoop: namenode, datanode, resourcemanager, nodemanager, historyserver bằng docker-compose

version: "3"

services:
  namenode:
    image: nvtienanh/hadoop-namenode:${HADOOP_TAG}
    container_name: namenode
    hostname: namenode
    volumes:
      - ./data/namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    ports:
      - 9870:9870
    env_file:
      - ./hadoop.env
    networks:
      - hadoop-net

  datanode:
    image: nvtienanh/hadoop-datanode:${HADOOP_TAG}
    container_name: datanode
    hostname: datanode
    volumes:
      - ./data/datanode:/hadoop/dfs/name
    environment:
      SERVICE_PRECONDITION: "namenode:9870"
    ports:
      - 9864:9864
    env_file:
      - ./hadoop.env
    networks:
      - hadoop-net
  
  resourcemanager:
    image: nvtienanh/hadoop-resourcemanager:${HADOOP_TAG}
    container_name: resourcemanager
    hostname: resourcemanager
    environment:
      SERVICE_PRECONDITION: "namenode:9870 datanode:9864"
    ports:
      - 8088:8088
    env_file:
      - ./hadoop.env
    networks:
      - hadoop-net

  nodemanager:
    image: nvtienanh/hadoop-nodemanager:${HADOOP_TAG}
    container_name: nodemanager
    hostname: nodemanager
    environment:
      SERVICE_PRECONDITION: "namenode:9870 datanode:9864 resourcemanager:8088"
    ports:
      - 8042:8042
    env_file:
      - ./hadoop.env
    networks:
      - hadoop-net
  
  historyserver:
    image: nvtienanh/hadoop-historyserver:${HADOOP_TAG}
    container_name: historyserver
    hostname: historyserver
    environment:
      SERVICE_PRECONDITION: "namenode:9870 datanode:9864 resourcemanager:8088"
    volumes:
      - ./data/historyserver:/hadoop/yarn/timeline
    ports:
      - 8188:8188
    env_file:
      - ./hadoop.env
    networks:
      - hadoop-net

networks:
  hadoop-net:
    external:
      name: hadoop-net

2.2. Submit code bằng Hadoop stream

Các bạn download hadoop-submit tại: https://github.com/nvtienanh/docker-hadoop/tree/master/submit, sau đó mở terminal và cd đến thư mục làm việc submit. Build hadoop-submit image

docker build -t nvtienanh/hadoop-submit:3.2.0-debian .

Sau đó mở file run.sh và xem qua, đây là các lệnh sẽ thực thi khi các bạn chạy docker image: hadoop-submit

#!/bin/bash

hadoop fs -mkdir -p hdfs://namenode:9000/user/nvtienanh/input-data

hadoop fs -copyFromLocal /opt/hadoop/applications/app/input-data hdfs://namenode:9000/user/nvtienanh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-$HADOOP_VERSION.jar \
    -file /opt/hadoop/applications/app/src/mapper.py \
    -mapper /opt/hadoop/applications/app/src/mapper.py \
    -file /opt/hadoop/applications/app/src/reducer.py  \
    -reducer /opt/hadoop/applications/app/src/reducer.py  \
    -input hdfs://namenode:9000/user/nvtienanh/input-data/daily.csv \
    -output hdfs://namenode:9000/user/nvtienanh/output-data

hadoop fs -cat hdfs://namenode:9000/user/nvtienanh/output-data/*

Bước cuối cùng là chạy docker container cho chứa code MapReduce:

docker run --network hadoop-net --env-file hadoop.env nvtienanh/hadoop-submit:3.2.0-debian

Chi tiết các bạn có thể xem video mình chạy dưới dây, hy vọng bài viết sẽ hữu ích cho mọi người.

Lời kết

Đây là demo trong quá trình mình tìm hiểu về Big Data, kết quả khá sơ khai. Mọi góp ý trao đổi các bạn có thể để lại ở phần bình luận.

Xem thêm

Bình luận