Hadoop Scripts in Python
I read that Hadoop supports scripts written in languages other than Java, such as Python. Since I’m a fan of Python, I wanted to prove this out.
It was my good fortune to find an excellent post by Michael Noll that walked me through the entire process of scripting in Python for Hadoop. The post worked as written for me on Hadoop 2.2.0.
How Hadoop processes scripts from other languages (stdin/stdout)
In order to accommodate scripts from other languages, Hadoop builds the whole pipeline around standard input (stdin) and standard output (stdout). As Hadoop reads lines from the input, it writes them to the mapper script’s stdin; the mapper processes each line and writes its results to stdout. Hadoop then sorts the mapper output by key and feeds it to the reducer’s stdin, and the reducer in turn writes its results to stdout. Finally, Hadoop consumes the reducer output to write the results. Hopefully this is easy to see in the scripts below.
It should be possible to write similar scripts in any language that can read stdin and write to stdout. There may be some performance implications when choosing another language for processing, but from a developer productivity perspective, a language like Python can allow for more rapid development. If your job is CPU bound and would benefit from every available processor, it may be worth the extra effort to write it in Java.
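To make that contract concrete, here is a minimal sketch of a streaming script that honors it. The script name and the line-counting behavior are hypothetical, chosen only to show the shape of the protocol:

#!/usr/bin/env python
# minimal_mapper.py - a sketch of the streaming contract:
# read one line at a time from stdin, write key<TAB>value pairs to stdout
import sys

for line in sys.stdin:
    # remove leading/trailing whitespace, including the newline
    line = line.strip()
    # emit the whole line as the key with a count of 1 as the value
    print '%s\t%s' % (line, 1)

Hadoop supplies the lines on stdin and captures whatever the script prints; a plain shell pipe can do the same thing, which is handy for local testing (more on that below).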
Revisit Tomcat Logs in Python
I’ll try to add value by revisiting my original example of analyzing Apache Tomcat (Java) logs in Hadoop, this time using Python.
mapper.py
#!/usr/bin/env python

import sys
import re

log_pattern = r"""([0-9]{4}-[0-9]{2}-[0-9]{2}\s[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})\s*([a-zA-Z]+)\s*([a-zA-Z0-9.]+)\s*-\s*(.+)$"""
log_pattern_re = re.compile(log_pattern)

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into segments; search() returns None for lines
    # that don't match the pattern, making groups() raise
    try:
        segments = log_pattern_re.search(line).groups()
    except AttributeError:
        segments = None
    if segments is not None and segments[1] == "ERROR":
        # tab delimited, keyed by the log timestamp (segments[0])
        print '%s\t%s' % (segments[0], 1)
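For reference, the regular expression above expects standard Tomcat/log4j-style lines: a timestamp, a log level, a logger name, a dash, and the message. Here is a hypothetical input line (not taken from my actual logs) and the tab-delimited record the mapper would emit for it:

# hypothetical input line:
2013-10-25 06:07:37,108 ERROR com.example.MyServlet - Connection refused
# corresponding mapper output (timestamp, tab, count):
2013-10-25 06:07:37,108	1

Lines at other levels (INFO, WARN, etc.) still match the pattern, but the check on segments[1] filters them out.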
reducer.py
#!/usr/bin/env python

import sys

current_word = None
current_count = 0
key = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    key, value = line.split('\t', 1)

    # convert value (currently a string) to int
    try:
        value = int(value)
    except ValueError:
        # value was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key before it is passed to the reducer
    if current_word == key:
        current_count += value
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = value
        current_word = key

# do not forget to output the last key if needed!
if current_word == key:
    print '%s\t%s' % (current_word, current_count)
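Before handing these to Hadoop, it’s worth testing them locally. A shell pipeline can stand in for the framework, with sort simulating the sort phase between map and reduce. The log file name below is a placeholder (substitute your own), and note that both scripts must be executable:

chmod +x mapper.py reducer.py
cat catalina.out | ./mapper.py | sort | ./reducer.py

If that produces sensible timestamp/count pairs, the scripts should behave the same way under Hadoop.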
Running a streaming Hadoop job using Python
[watrous@myhost ~]$ hadoop jar ./hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -file /home/watrous/mapper.py -mapper /home/watrous/mapper.py -file /home/watrous/reducer.py -reducer /home/watrous/reducer.py -input /user/watrous/log_myhost.houston.hp.com/* -output /user/watrous/python_output_myhost.houston.hp.com
packageJobJar: [/home/watrous/mapper.py, /home/watrous/reducer.py] [] /tmp/streamjob6730226310886003770.jar tmpDir=null
13/11/18 19:41:09 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
...
13/11/18 19:41:10 INFO mapred.LocalDistributedCacheManager: Localized file:/home/watrous/mapper.py as file:/tmp/hadoop-watrous/mapred/local/1384803670534/mapper.py
13/11/18 19:41:10 INFO mapred.LocalDistributedCacheManager: Localized file:/home/watrous/reducer.py as file:/tmp/hadoop-watrous/mapred/local/1384803670535/reducer.py
...
13/11/18 19:41:11 INFO streaming.PipeMapRed: PipeMapRed exec [/home/watrous/./mapper.py]
...
13/11/18 19:43:35 INFO streaming.PipeMapRed: PipeMapRed exec [/home/watrous/./reducer.py]
...
13/11/18 19:43:36 INFO mapreduce.Job:  map 100% reduce 100%
13/11/18 19:43:36 INFO mapreduce.Job: Job job_local848014294_0001 completed successfully
13/11/18 19:43:36 INFO mapreduce.Job: Counters: 32
        File System Counters
                FILE: Number of bytes read=291759
                FILE: Number of bytes written=3394292
...
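Stripping away the specifics, the invocation has this general shape (all paths below are placeholders): -file ships a local script out to every node with the job, -mapper and -reducer name the executables to run for each phase, and -input and -output refer to HDFS paths. One caveat worth knowing: the output directory must not already exist, or the job will fail.

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
  -file /local/path/to/mapper.py -mapper /local/path/to/mapper.py \
  -file /local/path/to/reducer.py -reducer /local/path/to/reducer.py \
  -input /hdfs/input/dir/* \
  -output /hdfs/output/dir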
After the MapReduce job completes successfully, confirm that the results are present and view them.
[watrous@myhost ~]$ hdfs dfs -ls -R /user/watrous/python_output_myhost.houston.hp.com/
13/11/18 19:45:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   3 watrous supergroup          0 2013-11-18 19:43 /user/watrous/python_output_myhost.houston.hp.com/_SUCCESS
-rw-r--r--   3 watrous supergroup      10813 2013-11-18 19:43 /user/watrous/python_output_myhost.houston.hp.com/part-00000
[watrous@myhost ~]$ hdfs dfs -cat /user/watrous/python_output_myhost.houston.hp.com/part-00000 | more
13/11/18 19:45:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-10-25 06:07:37,108	1
2013-10-25 06:07:37,366	1
2013-10-27 06:08:35,770	1
2013-10-27 06:08:35,777	1
2013-10-27 06:08:35,783	1
2013-10-27 06:09:33,748	11
2013-10-27 06:09:33,749	25
2013-10-27 06:09:33,750	26
2013-10-27 06:09:33,751	14
2013-10-27 07:07:58,383	5