hadoop笔记 (2):pipes例子分析 (1)
Pipes是hadoop提供的c++接口,但是在官网上找不到pipes的文档,只能从例子开始一点点摸索。实验环境是debian 6 amd64,hadoop 1.0.3。hadoop的安装目录是$HOME/hadoop,安装和配置过程在上一篇安装笔记中有提到。
为了少敲些字符,给hadoop命令做了一个alias:
alias hadoop='$HOME/hadoop/bin/hadoop'
单词统计程序
下面的程序是对hadoop 1.0.3自带的单词统计程序(src/examples/pipes/impl/wordcount-simple.cc)的一个修改版:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
#include <string> #include <vector> using namespace std; #include <hadoop/Pipes.hh> #include <hadoop/StringUtils.hh> #include <hadoop/TemplateFactory.hh> class WordCountMapper : public HadoopPipes::Mapper { public: WordCountMapper(HadoopPipes::TaskContext& context) {} void map(HadoopPipes::MapContext& context) { vector<string> words = HadoopUtils::splitString(context.getInputValue(), " "); for(unsigned int i = 0; i < words.size(); ++i) context.emit(words[i], "1"); } }; class WordCountReducer : public HadoopPipes::Reducer { public: WordCountReducer(HadoopPipes::TaskContext& context) {} void reduce(HadoopPipes::ReduceContext& context) { int sum = 0; while (context.nextValue()) sum += HadoopUtils::toInt(context.getInputValue()); context.emit(context.getInputKey(), HadoopUtils::toString(sum)); } }; int main(void) { return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>()); } |
先看看编译程序的Makefile:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
CXX := g++ CXXFLAGS := -g -Wall INCLUDE := -I$(HOME)/hadoop/c++/Linux-amd64-64/include LIBS := -L$(HOME)/hadoop/c++/Linux-amd64-64/lib -lhadooppipes -lhadooputils -lpthread -lcrypto TARGET := wordcount .PHONY: all clean all: $(TARGET) wordcount: wordcount.o $(CXX) $(CXXFLAGS) $^ -o $@ $(LIBS) .cpp.o: $(CXX) $(CXXFLAGS) -c $< $(INCLUDE) clean: rm -f $(TARGET) *.o |
链接时要加上-lcrypto,安装libssl-dev,如果不加这个就会报错:”undefined reference to `EVP_*’(undefined reference to `HMAC_*’或undefined reference to `BIO_*’)”。
编译完后把生成的可执行文件wordcount上传到hdfs中:
hadoop dfs -put wordcount wordcount
然后准备测试文件wordcount-input(随便输入一些单词),并且把测试文件也上传到hdfs中:
hadoop dfs -put wordcount-input wordcount-input
上传完后看看是否上传成功:
hadoop@debian:wordcount$ hadoop dfs -ls Found 2 items -rw-r--r-- 1 hadoop supergroup 261399 2012-08-12 10:56 /user/hadoop/wordcount -rw-r--r-- 1 hadoop supergroup 42 2012-08-12 10:56 /user/hadoop/wordcount-input
一切准备好之后就可以运行程序了:
hadoop pipes \ -D hadoop.pipes.java.recordreader=true \ -D hadoop.pipes.java.recordwriter=true \ -input wordcount-input -output wordcount-output \ -program wordcount
在运行过程中hadoop会输出进度和其它的一些相关信息。运行完的结果在hdfs的wordcount-output目录下:
hadoop@debian:wordcount$ hadoop dfs -ls wordcount-output Found 3 items -rw-r--r-- 1 hadoop supergroup 0 2012-08-12 10:58 /user/hadoop/wordcount-output/_SUCCESS drwx------ - hadoop supergroup 0 2012-08-12 10:57 /user/hadoop/wordcount-output/_logs -rw-r--r-- 1 hadoop supergroup 33 2012-08-12 10:57 /user/hadoop/wordcount-output/part-00000
最后查看运行结果:
hadoop dfs -cat wordcount-output/part-00000
和java版本类似,pipes要求至少有mapper和reducer两个类:WordCountMapper和WordCountReducer,分别继承HadoopPipes::Mapper和HadoopPipes::Reducer,并分别实现其中的纯虚函数map()和reduce()。具体需要实现的接口可以参考头文件(c++/Linux-amd64-64/include/hadoop/Pipes.hh)。
在c++/Linux-amd64-64/include/hadoop/TemplateFactory.hh中看到,除了必须实现的mapper和reducer类外,还可以指定partitioner,combiner,reader和writer。如果不指定的话就使用默认的,在执行程序的命令中指定”hadoop.pipes.java.recordreader”和”hadoop.pipes.java.recordwriter”为true表示使用hadoop默认的reader和writer。为了简化讨论,在这个简单的程序中可以认为只有五个步骤:reader -> mapper -> shuffle -> reducer -> writer。
程序使用默认的RecordReader为LineRecordReader,即把输入文件按行拆分成{行偏移量, 内容}这样的{key, value},然后传递给map()函数。
在WordCountMapper的map()中,使用context.getInputKey()可以获取输入的key,由于程序中不关心行号,因此并没有使用;context.getInputValue()就是文件中某一行的内容。words保存了按照空格拆分后的所有单词,并通过context.emit()函数返回每个单词及其出现的次数给下一阶段。
shuffle阶段是由hadoop替我们完成的。hadoop会把map()输出的{单词, 出现次数}这样的key/value对进行排序和合并,并且把结果传给reducer。
WordCountReducer中的reducer()函数收到了经过排序和合并后的结果:{key: value1, value2, …}。每个reducer收到的内容都是一个key以及和这个key相关的values,在这个程序中key就是一个单词,values就是每个单词出现的次数,因此程序中通过context.nextValue()获取所有出现次数并累加,最后通过context.emit()把统计得到的{单词, 出现次数}结果传给writer。
程序使用默认的RecordWriter为LineRecordWriter,输出格式为”key\tvalue\n”,因此最终写到文件的结果是每个单词及其出现次数作为单独一行。
参考资料
[1] Tom White. Hadoop: The Definitive Guide.
[2] Hadoop pipes编程
建议继续学习:
- Facebook的实时Hadoop系统 (阅读:10577)
- hadoop rpc机制 && 将avro引入hadoop rpc机制初探 (阅读:5097)
- Hadoop的map/reduce作业输入非UTF-8编码数据的处理原理 (阅读:4578)
- Hadoop超级安装手册 (阅读:3975)
- Hadoop集群间Hadoop方案探讨 (阅读:3739)
- 百度是如何使用hadoop的 (阅读:3878)
- 使用hadoop进行大规模数据的全局排序 (阅读:3443)
- Hadoop安装端口已经被占用问题的解决方法 (阅读:2956)
- Hadoop现有测试框架探幽 (阅读:2792)
- 分布式计算平台Hadoop 发展现状乱而稳定的解读 (阅读:2800)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:ou 来源: ou的笔记ou的笔记
- 标签: hadoop pipes
- 发布时间:2012-09-17 23:15:39
- [66] Oracle MTS模式下 进程地址与会话信
- [65] Go Reflect 性能
- [64] 如何拿下简短的域名
- [60] android 开发入门
- [59] 图书馆的世界纪录
- [59] 【社会化设计】自我(self)部分――欢迎区
- [58] IOS安全–浅谈关于IOS加固的几种方法
- [54] 视觉调整-设计师 vs. 逻辑
- [48] 界面设计速成
- [48] 读书笔记-壹百度:百度十年千倍的29条法则