这是搭建hadoop环境后的第一个MapReduce程序;
基于python的脚本;
1 map.py文件,把文本的内容划分成单词:
#!/bin/pythonimport sysfor line in sys.stdin:
data_list = line.strip().split()
for i in range(0, len(data_list)):
print data_list
2 reduce文件,把统计单词出现的次数;
#!/bin/pythonimport sys
word_dict = {}for line in sys.stdin:
v = line.strip() if word_dict.has_key(v):
word_dict[v] += 1
else:
word_dict[v] = 1for key in word_dict: print key + "\t" + str(word_dict[key])
3 调用脚本:指定输出目录 OUTPUT ;
调用支持多语言的streaming的编程环境,参数-input是输入的log文件,为了用mapreduce模式统计这个文件每个单词出现的次数;-output是输出路径;-mapper是mapper编译 此处是python语言;-reducer是reduce编译语法;-file是mapper文件路径和reduce文件路径;-numReduceTaskers 是使用的子tasker数目,这里是3,代表分成了3了tasker分布式的处理计数任务;
#!/bin/bashOUTPUT=/home/apm3/outdir
hadoop fs -rmr $OUTPUThadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-input /opt/mapr/logs/warden.log \
-output $OUTPUT \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file map.py \
-file reduce.py \
-numReduceTasks 3
bash -x start.sh 会在输出路径中生成三个输出文件,及三分ReduceTasks 输出的结果;(MapReduce 模式主要做了shuffle和sort任务,shuffle是按照hashkey分配单词到子tasker中,而sort是排序的功能。)
来源:中国大数据 |