Daniel Watrous on Software Engineering

A Collection of Software Problems and Solutions

Hadoop Scripts in Python

I read that Hadoop supports scripts written in languages other than Java, such as Python. Since I’m a fan of Python, I wanted to prove this out.

It was my good fortune to find an excellent post by Michael Noll that walked me through the entire process of scripting in Python for Hadoop. It worked as written for me in Hadoop 2.2.0.

How Hadoop processes scripts from other languages (stdin/stdout)

To accommodate scripts written in other languages, Hadoop streaming relies on standard input (stdin) and standard output (stdout). Hadoop reads the input and writes each line to the mapper's stdin, so the mapper script simply reads stdin and processes one line at a time. The mapper writes its results to stdout as tab-separated key/value lines. Hadoop sorts those lines by key and feeds them to the reducer's stdin, and the reducer in turn writes its results to stdout. Hadoop then consumes the reducer output and writes the final results. Hopefully this is easy to see in the scripts below.
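The flow described above can be mimicked in plain Python without a cluster. This is only a rough sketch: the names `example_mapper`, `example_reducer` and `simulate_streaming` are made up for illustration, the stages are toy word counters rather than the log scripts, and the in-memory sort stands in for the sort Hadoop performs between the map and reduce stages.

```python
def example_mapper(lines):
    """Toy mapper: emit 'word<TAB>1' for every word on every input line."""
    for line in lines:
        for word in line.split():
            yield '%s\t%d' % (word, 1)


def example_reducer(lines):
    """Toy reducer: sum the counts of consecutive lines that share a key."""
    current_key = None
    total = 0
    for line in lines:
        key, value = line.split('\t', 1)
        if key != current_key:
            # key changed, so the previous key is complete
            if current_key is not None:
                yield '%s\t%d' % (current_key, total)
            current_key = key
            total = 0
        total += int(value)
    # emit the final key, if any input was seen
    if current_key is not None:
        yield '%s\t%d' % (current_key, total)


def simulate_streaming(input_lines, mapper, reducer):
    """Mimic the map -> sort -> reduce flow on an in-memory list of lines."""
    mapped = list(mapper(input_lines))
    # Stand-in for Hadoop sorting the mapper output by key.
    shuffled = sorted(mapped, key=lambda out: out.split('\t', 1)[0])
    return list(reducer(shuffled))
```

Calling `simulate_streaming(['b a', 'a'], example_mapper, example_reducer)` runs both stages in order. The reducer only has to compare each key with the previous one because the sort guarantees that identical keys arrive together.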

It should be possible to write similar scripts in any language that can read stdin and write to stdout. There may be some performance implications when choosing another language for processing, but from a developer productivity perspective, a language like Python can allow for more rapid development. If a job is CPU bound and would benefit from multiple processors, it may be worth the extra effort to write it in Java.

Revisit Tomcat Logs in Python

I’ll try to add value by revisiting my original example of analyzing Apache Tomcat (Java) logs in Hadoop, this time using Python.

mapper.py

#!/usr/bin/env python
 
import sys
import re
 
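# groups: timestamp, log level, logger name, message
# the millisecond separator is matched explicitly as a comma or a period
log_pattern = r"""([0-9]{4}-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}[,.][0-9]{3})\s*([a-zA-Z]+)\s*([a-zA-Z0-9.]+)\s*-\s*(.+)$"""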
log_pattern_re = re.compile(log_pattern)
 
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into segments; skip lines that do not match
    # the log pattern (for example, stack trace lines)
    match = log_pattern_re.search(line)
    if match is None:
        continue
    segments = match.groups()
 
    if segments[1] == "ERROR":
        # tab delimited, keyed by log timestamp (segments[0])
        print('%s\t%s' % (segments[0], 1))
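To see what the mapper's regular expression captures, here is a small sketch that applies an equivalent pattern to a single line. The sample log line and the helper name `parse_log_line` are made up for illustration, and calling the third group the "logger" is my assumption about the log layout. The pattern mirrors the mapper's, with the millisecond separator spelled out as a comma or a period.

```python
import re

# Same four groups as the mapper: timestamp, level, logger (assumed), message.
LOG_PATTERN = re.compile(
    r"([0-9]{4}-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}[,.][0-9]{3})"
    r"\s*([a-zA-Z]+)\s*([a-zA-Z0-9.]+)\s*-\s*(.+)$"
)


def parse_log_line(line):
    """Return the four captured groups as a tuple, or None if no match."""
    match = LOG_PATTERN.search(line.strip())
    return match.groups() if match else None


# Hypothetical log line, not taken from the real Tomcat logs.
sample = "2013-10-25 06:07:37,108 ERROR com.example.Widget - Something failed"
fields = parse_log_line(sample)
# fields[0] is the timestamp the mapper uses as its key,
# fields[1] is the level the mapper compares against "ERROR".
```

Lines that do not start with a timestamp in this layout, such as the continuation lines of a stack trace, return `None` and would be skipped by the mapper.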

reducer.py

#!/usr/bin/env python
 
import sys
 
current_word = None
current_count = 0
key = None
 
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
 
    # parse the input we got from mapper.py and
    # convert value (currently a string) to int
    try:
        key, value = line.split('\t', 1)
        value = int(value)
    except ValueError:
        # the line had no tab or value was not a number,
        # so silently ignore/discard this line
        continue
 
    # this comparison only works because Hadoop sorts map output
    # by key before it is passed to the reducer
    if current_word == key:
        current_count += value
    else:
        if current_word is not None:
            # key changed: write the finished result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = value
        current_word = key
 
# do not forget to output the last key, if any input was seen
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))
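The same aggregation can also be written with `itertools.groupby` from the standard library, which groups consecutive items that share a key. This is an alternative sketch, not the script used for the run below, and the function names `parse_pairs` and `reduce_counts` are made up for illustration. Like reducer.py, it relies on the input already being sorted by key.

```python
from itertools import groupby


def parse_pairs(lines):
    """Yield (key, count) tuples, skipping malformed lines."""
    for line in lines:
        key, sep, value = line.rstrip('\n').partition('\t')
        if not sep:
            # no tab on this line, so there is no key/value pair
            continue
        try:
            yield key, int(value)
        except ValueError:
            # count was not a number; discard the line
            continue


def reduce_counts(lines):
    """Sum the counts of consecutive pairs that share a key."""
    pairs = parse_pairs(lines)
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield key, sum(count for _, count in group)
```

To use it as a streaming reducer, one could loop over `reduce_counts(sys.stdin)` and print each key and total separated by a tab. The explicit bookkeeping of the current key disappears because `groupby` starts a new group whenever the key changes.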

Running a streaming Hadoop job using Python

[watrous@myhost ~]$ hadoop jar ./hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -file /home/watrous/mapper.py -mapper /home/watrous/mapper.py -file /home/watrous/reducer.py -reducer /home/watrous/reducer.py -input /user/watrous/log_myhost.houston.hp.com/* -output /user/watrous/python_output_myhost.houston.hp.com
packageJobJar: [/home/watrous/mapper.py, /home/watrous/reducer.py] [] /tmp/streamjob6730226310886003770.jar tmpDir=null
13/11/18 19:41:09 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
...
13/11/18 19:41:10 INFO mapred.LocalDistributedCacheManager: Localized file:/home/watrous/mapper.py as file:/tmp/hadoop-watrous/mapred/local/1384803670534/mapper.py
13/11/18 19:41:10 INFO mapred.LocalDistributedCacheManager: Localized file:/home/watrous/reducer.py as file:/tmp/hadoop-watrous/mapred/local/1384803670535/reducer.py
...
13/11/18 19:41:11 INFO streaming.PipeMapRed: PipeMapRed exec [/home/watrous/./mapper.py]
...
13/11/18 19:43:35 INFO streaming.PipeMapRed: PipeMapRed exec [/home/watrous/./reducer.py]
...
13/11/18 19:43:36 INFO mapreduce.Job:  map 100% reduce 100%
13/11/18 19:43:36 INFO mapreduce.Job: Job job_local848014294_0001 completed successfully
13/11/18 19:43:36 INFO mapreduce.Job: Counters: 32
        File System Counters
                FILE: Number of bytes read=291759
                FILE: Number of bytes written=3394292
...

After the MapReduce job completes successfully, confirm that the output files are present and view the results.

[watrous@myhost ~]$ hdfs dfs -ls -R /user/watrous/python_output_myhost.houston.hp.com/
13/11/18 19:45:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   3 watrous supergroup          0 2013-11-18 19:43 /user/watrous/python_output_myhost.houston.hp.com/_SUCCESS
-rw-r--r--   3 watrous supergroup      10813 2013-11-18 19:43 /user/watrous/python_output_myhost.houston.hp.com/part-00000
[watrous@myhost ~]$ hdfs dfs -cat /user/watrous/python_output_myhost.houston.hp.com/part-00000 | more
13/11/18 19:45:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-10-25 06:07:37,108 1
2013-10-25 06:07:37,366 1
2013-10-27 06:08:35,770 1
2013-10-27 06:08:35,777 1
2013-10-27 06:08:35,783 1
2013-10-27 06:09:33,748 11
2013-10-27 06:09:33,749 25
2013-10-27 06:09:33,750 26
2013-10-27 06:09:33,751 14
2013-10-27 07:07:58,383 5
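Because the mapper keys on the full timestamp, the counts above are per millisecond. If a per-day total is more useful, the output can be rolled up after the fact. The sketch below assumes each result line ends with a whitespace-separated count, as in the listing above, and the function name `errors_per_day` is made up for illustration.

```python
from collections import OrderedDict


def errors_per_day(result_lines):
    """Sum the per-timestamp counts into one total per calendar day."""
    totals = OrderedDict()
    for line in result_lines:
        # split off the trailing count; works for tab or space separators
        parts = line.rsplit(None, 1)
        if len(parts) != 2:
            continue
        timestamp, count = parts
        try:
            count = int(count)
        except ValueError:
            # not a result line (for example, a log warning); skip it
            continue
        day = timestamp.split()[0]
        totals[day] = totals.get(day, 0) + count
    return totals
```

The function accepts any iterable of lines, so it could be fed the text of part-00000 read from a file or from stdin. Changing the mapper to emit only the date portion of the timestamp as its key would achieve the same result inside the job itself.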
