IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

hadoop笔记 (2):pipes例子分析 (1)

ou的笔记ou的笔记 2012-09-17 23:15:39 累计浏览 2,043 次
本机暂存

Pipes是hadoop提供的c++接口,但是在官网上找不到pipes的文档,只能从例子开始一点点摸索。实验环境是debian 6 amd64,hadoop 1.0.3。hadoop的安装目录是$HOME/hadoop,安装和配置过程在上一篇安装笔记中有提到。

为了少敲些字符,给hadoop命令做了一个alias:

alias hadoop='$HOME/hadoop/bin/hadoop'

单词统计程序

下面的程序是对hadoop 1.0.3自带的单词统计程序(src/examples/pipes/impl/wordcount-simple.cc)的一个修改版:

wordcount.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <string>
#include <vector>
using namespace std;
 
#include <hadoop/Pipes.hh>
#include <hadoop/StringUtils.hh>
#include <hadoop/TemplateFactory.hh>
 
class WordCountMapper : public HadoopPipes::Mapper {
 
    public:
 
        WordCountMapper(HadoopPipes::TaskContext& context) {}
 
        void map(HadoopPipes::MapContext& context)
        {
            vector<string> words = HadoopUtils::splitString(context.getInputValue(), " ");
            for(unsigned int i = 0; i < words.size(); ++i)
                context.emit(words[i], "1");
        }
};
 
class WordCountReducer : public HadoopPipes::Reducer {
 
    public:
 
        WordCountReducer(HadoopPipes::TaskContext& context) {}
 
        void reduce(HadoopPipes::ReduceContext& context)
        {
            int sum = 0;
 
            while (context.nextValue())
                sum += HadoopUtils::toInt(context.getInputValue());
 
            context.emit(context.getInputKey(), HadoopUtils::toString(sum));
        }
};
 
int main(void)
{
    return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>());
}

先看看编译程序的Makefile:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CXX := g++ 
CXXFLAGS := -g -Wall
INCLUDE := -I$(HOME)/hadoop/c++/Linux-amd64-64/include
LIBS := -L$(HOME)/hadoop/c++/Linux-amd64-64/lib -lhadooppipes -lhadooputils -lpthread -lcrypto
 
TARGET := wordcount
 
.PHONY: all clean
 
all: $(TARGET)
 
wordcount: wordcount.o
        $(CXX) $(CXXFLAGS) $^ -o $@ $(LIBS)
 
.cpp.o:
        $(CXX) $(CXXFLAGS) -c $< $(INCLUDE)
 
clean:
        rm -f $(TARGET) *.o

链接时要加上-lcrypto,安装libssl-dev,如果不加这个就会报错:”undefined reference to `EVP_*’(undefined reference to `HMAC_*’或undefined reference to `BIO_*’)”。

编译完后把生成的可执行文件wordcount上传到hdfs中:

hadoop dfs -put wordcount wordcount

然后准备测试文件wordcount-input(随便输入一些单词),并且把测试文件也上传到hdfs中:

hadoop dfs -put wordcount-input wordcount-input

上传完后看看是否上传成功:

hadoop@debian:wordcount$ hadoop dfs -ls
Found 2 items
-rw-r--r--   1 hadoop supergroup     261399 2012-08-12 10:56 /user/hadoop/wordcount
-rw-r--r--   1 hadoop supergroup         42 2012-08-12 10:56 /user/hadoop/wordcount-input

一切准备好之后就可以运行程序了:

hadoop pipes \
           -D hadoop.pipes.java.recordreader=true \
           -D hadoop.pipes.java.recordwriter=true \
           -input wordcount-input -output wordcount-output \
           -program wordcount

在运行过程中hadoop会输出进度和其它的一些相关信息。运行完的结果在hdfs的wordcount-output目录下:

hadoop@debian:wordcount$ hadoop dfs -ls wordcount-output
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2012-08-12 10:58 /user/hadoop/wordcount-output/_SUCCESS
drwx------   - hadoop supergroup          0 2012-08-12 10:57 /user/hadoop/wordcount-output/_logs
-rw-r--r--   1 hadoop supergroup         33 2012-08-12 10:57 /user/hadoop/wordcount-output/part-00000

最后查看运行结果:

hadoop dfs -cat wordcount-output/part-00000

和java版本类似,pipes要求至少有mapper和reducer两个类:WordCountMapper和WordCountReducer,分别继承HadoopPipes::Mapper和HadoopPipes::Reducer,并分别实现其中的纯虚函数map()和reduce()。具体需要实现的接口可以参考头文件(c++/Linux-amd64-64/include/hadoop/Pipes.hh)。

在c++/Linux-amd64-64/include/hadoop/TemplateFactory.hh中看到,除了必须实现的mapper和reducer类外,还可以指定partitioner,combiner,reader和writer。如果不指定的话就使用默认的,在执行程序的命令中指定”hadoop.pipes.java.recordreader”和”hadoop.pipes.java.recordwriter”为true表示使用hadoop默认的reader和writer。为了简化讨论,在这个简单的程序中可以认为只有五个步骤:reader -> mapper -> shuffle -> reducer -> writer。

程序使用默认的RecordReader为LineRecordReader,即把输入文件按行拆分成{行偏移量, 内容}这样的{key, value},然后传递给map()函数。

在WordCountMapper的map()中,使用context.getInputKey()可以获取输入的key,由于程序中不关心行号,因此并没有使用;context.getInputValue()就是文件中某一行的内容。words保存了按照空格拆分后的所有单词,并通过context.emit()函数返回每个单词及其出现的次数给下一阶段。

shuffle阶段是由hadoop替我们完成的。hadoop会把map()输出的{单词, 出现次数}这样的key/value对进行排序和合并,并且把结果传给reducer。

WordCountReducer中的reducer()函数收到了经过排序和合并后的结果:{key: value1, value2, …}。每个reducer收到的内容都是一个key以及和这个key相关的values,在这个程序中key就是一个单词,values就是每个单词出现的次数,因此程序中通过context.nextValue()获取所有出现次数并累加,最后通过context.emit()把统计得到的{单词, 出现次数}结果传给writer。

程序使用默认的RecordWriter为LineRecordWriter,输出格式为”key\tvalue\n”,因此最终写到文件的结果是每个单词及其出现次数作为单独一行。

参考资料

[1] Tom White. Hadoop: The Definitive Guide.
[2] Hadoop pipes编程

同分类推荐文章

  1. 等了十年的 Go 链式管道,终于来了:seq 让你像写 Scala 一样写 Go (2026-06-25 18:38:18)
  2. Go 实验特性详解 (2026-06-21 10:05:27)
  3. amd64 微架构级别对 Go 程序性能提升多少? (2026-06-21 09:38:49)

查看更多 后端 文章 →

建议继续学习

  1. HFile存储格式 (累计阅读 15,977)
  2. Zookeeper工作原理 (累计阅读 12,203)
  3. Facebook的实时Hadoop系统 (累计阅读 11,497)
  4. 海量数据面试题举例 (累计阅读 11,115)
  5. 如何学好C++语言 (累计阅读 10,450)
  6. Emacs配置C/C++-mode的代码智能提示和自动补全 (累计阅读 10,414)
  7. colortail,让 tail 命令绚丽起来 (累计阅读 10,260)
  8. 在C++中实现foreach循环,比for_each更简洁! (累计阅读 9,500)
  9. 几个内存相关面试题(c/c++) (累计阅读 9,447)
  10. 关于使用STL的红黑树map还是hashmap的问题 (累计阅读 8,877)