技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 其他 --> hadoop笔记 (2):pipes例子分析 (1)

hadoop笔记 (2):pipes例子分析 (1)

浏览:1382次  出处信息

Pipes是hadoop提供的c++接口,但是在官网上找不到pipes的文档,只能从例子开始一点点摸索。实验环境是debian 6 amd64,hadoop 1.0.3。hadoop的安装目录是$HOME/hadoop,安装和配置过程在上一篇安装笔记中有提到。

为了少敲些字符,给hadoop命令做了一个alias:

alias hadoop='$HOME/hadoop/bin/hadoop'

单词统计程序

下面的程序是对hadoop 1.0.3自带的单词统计程序(src/examples/pipes/impl/wordcount-simple.cc)的一个修改版:

wordcount.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <string>
#include <vector>
using namespace std;
 
#include <hadoop/Pipes.hh>
#include <hadoop/StringUtils.hh>
#include <hadoop/TemplateFactory.hh>
 
class WordCountMapper : public HadoopPipes::Mapper {
 
    public:
 
        WordCountMapper(HadoopPipes::TaskContext& context) {}
 
        void map(HadoopPipes::MapContext& context)
        {
            vector<string> words = HadoopUtils::splitString(context.getInputValue(), " ");
            for(unsigned int i = 0; i < words.size(); ++i)
                context.emit(words[i], "1");
        }
};
 
class WordCountReducer : public HadoopPipes::Reducer {
 
    public:
 
        WordCountReducer(HadoopPipes::TaskContext& context) {}
 
        void reduce(HadoopPipes::ReduceContext& context)
        {
            int sum = 0;
 
            while (context.nextValue())
                sum += HadoopUtils::toInt(context.getInputValue());
 
            context.emit(context.getInputKey(), HadoopUtils::toString(sum));
        }
};
 
int main(void)
{
    return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>());
}

先看看编译程序的Makefile:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CXX := g++ 
CXXFLAGS := -g -Wall
INCLUDE := -I$(HOME)/hadoop/c++/Linux-amd64-64/include
LIBS := -L$(HOME)/hadoop/c++/Linux-amd64-64/lib -lhadooppipes -lhadooputils -lpthread -lcrypto
 
TARGET := wordcount
 
.PHONY: all clean
 
all: $(TARGET)
 
wordcount: wordcount.o
        $(CXX) $(CXXFLAGS) $^ -o $@ $(LIBS)
 
.cpp.o:
        $(CXX) $(CXXFLAGS) -c $< $(INCLUDE)
 
clean:
        rm -f $(TARGET) *.o

链接时要加上-lcrypto,安装libssl-dev,如果不加这个就会报错:”undefined reference to `EVP_*’(undefined reference to `HMAC_*’或undefined reference to `BIO_*’)”。

编译完后把生成的可执行文件wordcount上传到hdfs中:

hadoop dfs -put wordcount wordcount

然后准备测试文件wordcount-input(随便输入一些单词),并且把测试文件也上传到hdfs中:

hadoop dfs -put wordcount-input wordcount-input

上传完后看看是否上传成功:

hadoop@debian:wordcount$ hadoop dfs -ls
Found 2 items
-rw-r--r--   1 hadoop supergroup     261399 2012-08-12 10:56 /user/hadoop/wordcount
-rw-r--r--   1 hadoop supergroup         42 2012-08-12 10:56 /user/hadoop/wordcount-input

一切准备好之后就可以运行程序了:

hadoop pipes \
           -D hadoop.pipes.java.recordreader=true \
           -D hadoop.pipes.java.recordwriter=true \
           -input wordcount-input -output wordcount-output \
           -program wordcount

在运行过程中hadoop会输出进度和其它的一些相关信息。运行完的结果在hdfs的wordcount-output目录下:

hadoop@debian:wordcount$ hadoop dfs -ls wordcount-output
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2012-08-12 10:58 /user/hadoop/wordcount-output/_SUCCESS
drwx------   - hadoop supergroup          0 2012-08-12 10:57 /user/hadoop/wordcount-output/_logs
-rw-r--r--   1 hadoop supergroup         33 2012-08-12 10:57 /user/hadoop/wordcount-output/part-00000

最后查看运行结果:

hadoop dfs -cat wordcount-output/part-00000

和java版本类似,pipes要求至少有mapper和reducer两个类:WordCountMapper和WordCountReducer,分别继承HadoopPipes::Mapper和HadoopPipes::Reducer,并分别实现其中的纯虚函数map()和reduce()。具体需要实现的接口可以参考头文件(c++/Linux-amd64-64/include/hadoop/Pipes.hh)。

在c++/Linux-amd64-64/include/hadoop/TemplateFactory.hh中看到,除了必须实现的mapper和reducer类外,还可以指定partitioner,combiner,reader和writer。如果不指定的话就使用默认的,在执行程序的命令中指定”hadoop.pipes.java.recordreader”和”hadoop.pipes.java.recordwriter”为true表示使用hadoop默认的reader和writer。为了简化讨论,在这个简单的程序中可以认为只有五个步骤:reader -> mapper -> shuffle -> reducer -> writer。

程序使用默认的RecordReader为LineRecordReader,即把输入文件按行拆分成{行偏移量, 内容}这样的{key, value},然后传递给map()函数。

在WordCountMapper的map()中,使用context.getInputKey()可以获取输入的key,由于程序中不关心行号,因此并没有使用;context.getInputValue()就是文件中某一行的内容。words保存了按照空格拆分后的所有单词,并通过context.emit()函数返回每个单词及其出现的次数给下一阶段。

shuffle阶段是由hadoop替我们完成的。hadoop会把map()输出的{单词, 出现次数}这样的key/value对进行排序和合并,并且把结果传给reducer。

WordCountReducer中的reducer()函数收到了经过排序和合并后的结果:{key: value1, value2, …}。每个reducer收到的内容都是一个key以及和这个key相关的values,在这个程序中key就是一个单词,values就是每个单词出现的次数,因此程序中通过context.nextValue()获取所有出现次数并累加,最后通过context.emit()把统计得到的{单词, 出现次数}结果传给writer。

程序使用默认的RecordWriter为LineRecordWriter,输出格式为”key\tvalue\n”,因此最终写到文件的结果是每个单词及其出现次数作为单独一行。

参考资料

[1] Tom White. Hadoop: The Definitive Guide.
[2] Hadoop pipes编程

建议继续学习:

  1. Facebook的实时Hadoop系统    (阅读:10585)
  2. hadoop rpc机制 && 将avro引入hadoop rpc机制初探    (阅读:5105)
  3. Hadoop的map/reduce作业输入非UTF-8编码数据的处理原理    (阅读:4602)
  4. Hadoop超级安装手册    (阅读:3981)
  5. Hadoop集群间Hadoop方案探讨    (阅读:3747)
  6. 百度是如何使用hadoop的    (阅读:3921)
  7. 使用hadoop进行大规模数据的全局排序    (阅读:3453)
  8. Hadoop安装端口已经被占用问题的解决方法    (阅读:2965)
  9. Hadoop现有测试框架探幽    (阅读:2798)
  10. 分布式计算平台Hadoop 发展现状乱而稳定的解读    (阅读:2804)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2024 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1