IT技术博客大学习 共学习 共进步

hadoop笔记 (2):pipes例子分析 (1)

ou的笔记ou的笔记 2012-09-17 23:15:39 浏览 1,962 次

Pipes是hadoop提供的c++接口,但是在官网上找不到pipes的文档,只能从例子开始一点点摸索。实验环境是debian 6 amd64,hadoop 1.0.3。hadoop的安装目录是$HOME/hadoop,安装和配置过程在上一篇安装笔记中有提到。

为了少敲些字符,给hadoop命令做了一个alias:

alias hadoop='$HOME/hadoop/bin/hadoop'

单词统计程序

下面的程序是对hadoop 1.0.3自带的单词统计程序(src/examples/pipes/impl/wordcount-simple.cc)的一个修改版:

wordcount.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <string>
#include <vector>
using namespace std;
 
#include <hadoop/Pipes.hh>
#include <hadoop/StringUtils.hh>
#include <hadoop/TemplateFactory.hh>
 
class WordCountMapper : public HadoopPipes::Mapper {
 
    public:
 
        WordCountMapper(HadoopPipes::TaskContext& context) {}
 
        void map(HadoopPipes::MapContext& context)
        {
            vector<string> words = HadoopUtils::splitString(context.getInputValue(), " ");
            for(unsigned int i = 0; i < words.size(); ++i)
                context.emit(words[i], "1");
        }
};
 
class WordCountReducer : public HadoopPipes::Reducer {
 
    public:
 
        WordCountReducer(HadoopPipes::TaskContext& context) {}
 
        void reduce(HadoopPipes::ReduceContext& context)
        {
            int sum = 0;
 
            while (context.nextValue())
                sum += HadoopUtils::toInt(context.getInputValue());
 
            context.emit(context.getInputKey(), HadoopUtils::toString(sum));
        }
};
 
int main(void)
{
    return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>());
}

先看看编译程序的Makefile:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CXX := g++ 
CXXFLAGS := -g -Wall
INCLUDE := -I$(HOME)/hadoop/c++/Linux-amd64-64/include
LIBS := -L$(HOME)/hadoop/c++/Linux-amd64-64/lib -lhadooppipes -lhadooputils -lpthread -lcrypto
 
TARGET := wordcount
 
.PHONY: all clean
 
all: $(TARGET)
 
wordcount: wordcount.o
        $(CXX) $(CXXFLAGS) $^ -o $@ $(LIBS)
 
.cpp.o:
        $(CXX) $(CXXFLAGS) -c $< $(INCLUDE)
 
clean:
        rm -f $(TARGET) *.o

链接时要加上-lcrypto,安装libssl-dev,如果不加这个就会报错:”undefined reference to `EVP_*’(undefined reference to `HMAC_*’或undefined reference to `BIO_*’)”。

编译完后把生成的可执行文件wordcount上传到hdfs中:

hadoop dfs -put wordcount wordcount

然后准备测试文件wordcount-input(随便输入一些单词),并且把测试文件也上传到hdfs中:

hadoop dfs -put wordcount-input wordcount-input

上传完后看看是否上传成功:

hadoop@debian:wordcount$ hadoop dfs -ls
Found 2 items
-rw-r--r--   1 hadoop supergroup     261399 2012-08-12 10:56 /user/hadoop/wordcount
-rw-r--r--   1 hadoop supergroup         42 2012-08-12 10:56 /user/hadoop/wordcount-input

一切准备好之后就可以运行程序了:

hadoop pipes \
           -D hadoop.pipes.java.recordreader=true \
           -D hadoop.pipes.java.recordwriter=true \
           -input wordcount-input -output wordcount-output \
           -program wordcount

在运行过程中hadoop会输出进度和其它的一些相关信息。运行完的结果在hdfs的wordcount-output目录下:

hadoop@debian:wordcount$ hadoop dfs -ls wordcount-output
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2012-08-12 10:58 /user/hadoop/wordcount-output/_SUCCESS
drwx------   - hadoop supergroup          0 2012-08-12 10:57 /user/hadoop/wordcount-output/_logs
-rw-r--r--   1 hadoop supergroup         33 2012-08-12 10:57 /user/hadoop/wordcount-output/part-00000

最后查看运行结果:

hadoop dfs -cat wordcount-output/part-00000

和java版本类似,pipes要求至少有mapper和reducer两个类:WordCountMapper和WordCountReducer,分别继承HadoopPipes::Mapper和HadoopPipes::Reducer,并分别实现其中的纯虚函数map()和reduce()。具体需要实现的接口可以参考头文件(c++/Linux-amd64-64/include/hadoop/Pipes.hh)。

在c++/Linux-amd64-64/include/hadoop/TemplateFactory.hh中看到,除了必须实现的mapper和reducer类外,还可以指定partitioner,combiner,reader和writer。如果不指定的话就使用默认的,在执行程序的命令中指定”hadoop.pipes.java.recordreader”和”hadoop.pipes.java.recordwriter”为true表示使用hadoop默认的reader和writer。为了简化讨论,在这个简单的程序中可以认为只有五个步骤:reader -> mapper -> shuffle -> reducer -> writer。

程序使用默认的RecordReader为LineRecordReader,即把输入文件按行拆分成{行偏移量, 内容}这样的{key, value},然后传递给map()函数。

在WordCountMapper的map()中,使用context.getInputKey()可以获取输入的key,由于程序中不关心行号,因此并没有使用;context.getInputValue()就是文件中某一行的内容。words保存了按照空格拆分后的所有单词,并通过context.emit()函数返回每个单词及其出现的次数给下一阶段。

shuffle阶段是由hadoop替我们完成的。hadoop会把map()输出的{单词, 出现次数}这样的key/value对进行排序和合并,并且把结果传给reducer。

WordCountReducer中的reducer()函数收到了经过排序和合并后的结果:{key: value1, value2, …}。每个reducer收到的内容都是一个key以及和这个key相关的values,在这个程序中key就是一个单词,values就是每个单词出现的次数,因此程序中通过context.nextValue()获取所有出现次数并累加,最后通过context.emit()把统计得到的{单词, 出现次数}结果传给writer。

程序使用默认的RecordWriter为LineRecordWriter,输出格式为”key\tvalue\n”,因此最终写到文件的结果是每个单词及其出现次数作为单独一行。

参考资料

[1] Tom White. Hadoop: The Definitive Guide.
[2] Hadoop pipes编程

建议继续学习

  1. Facebook的实时Hadoop系统 (阅读 11,404)
  2. hadoop rpc机制 && 将avro引入hadoop rpc机制初探 (阅读 6,083)
  3. Hadoop的map/reduce作业输入非UTF-8编码数据的处理原理 (阅读 5,546)
  4. 百度是如何使用hadoop的 (阅读 5,004)
  5. Hadoop超级安装手册 (阅读 4,662)
  6. Hadoop集群间Hadoop方案探讨 (阅读 4,444)
  7. 使用hadoop进行大规模数据的全局排序 (阅读 4,425)
  8. Hadoop安装端口已经被占用问题的解决方法 (阅读 3,882)
  9. Hadoop现有测试框架探幽 (阅读 3,804)
  10. 分布式计算平台Hadoop 发展现状乱而稳定的解读 (阅读 3,808)