Pipes是hadoop提供的c++接口,但是在官网上找不到pipes的文档,只能从例子开始一点点摸索。实验环境是debian 6 amd64,hadoop 1.0.3。hadoop的安装目录是$HOME/hadoop,安装和配置过程在上一篇安装笔记中有提到。
为了少敲些字符,给hadoop命令做了一个alias:
alias hadoop='$HOME/hadoop/bin/hadoop'单词统计程序
下面的程序是对hadoop 1.0.3自带的单词统计程序(src/examples/pipes/impl/wordcount-simple.cc)的一个修改版:
|
|
先看看编译程序的Makefile:
|
|
链接时要加上-lcrypto,安装libssl-dev,如果不加这个就会报错:”undefined reference to `EVP_*’(undefined reference to `HMAC_*’或undefined reference to `BIO_*’)”。
编译完后把生成的可执行文件wordcount上传到hdfs中:
hadoop dfs -put wordcount wordcount然后准备测试文件wordcount-input(随便输入一些单词),并且把测试文件也上传到hdfs中:
hadoop dfs -put wordcount-input wordcount-input上传完后看看是否上传成功:
hadoop@debian:wordcount$ hadoop dfs -ls
Found 2 items
-rw-r--r-- 1 hadoop supergroup 261399 2012-08-12 10:56 /user/hadoop/wordcount
-rw-r--r-- 1 hadoop supergroup 42 2012-08-12 10:56 /user/hadoop/wordcount-input
一切准备好之后就可以运行程序了:
hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input wordcount-input -output wordcount-output \
-program wordcount在运行过程中hadoop会输出进度和其它的一些相关信息。运行完的结果在hdfs的wordcount-output目录下:
hadoop@debian:wordcount$ hadoop dfs -ls wordcount-output
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2012-08-12 10:58 /user/hadoop/wordcount-output/_SUCCESS
drwx------ - hadoop supergroup 0 2012-08-12 10:57 /user/hadoop/wordcount-output/_logs
-rw-r--r-- 1 hadoop supergroup 33 2012-08-12 10:57 /user/hadoop/wordcount-output/part-00000最后查看运行结果:
hadoop dfs -cat wordcount-output/part-00000和java版本类似,pipes要求至少有mapper和reducer两个类:WordCountMapper和WordCountReducer,分别继承HadoopPipes::Mapper和HadoopPipes::Reducer,并分别实现其中的纯虚函数map()和reduce()。具体需要实现的接口可以参考头文件(c++/Linux-amd64-64/include/hadoop/Pipes.hh)。
在c++/Linux-amd64-64/include/hadoop/TemplateFactory.hh中看到,除了必须实现的mapper和reducer类外,还可以指定partitioner,combiner,reader和writer。如果不指定的话就使用默认的,在执行程序的命令中指定”hadoop.pipes.java.recordreader”和”hadoop.pipes.java.recordwriter”为true表示使用hadoop默认的reader和writer。为了简化讨论,在这个简单的程序中可以认为只有五个步骤:reader -> mapper -> shuffle -> reducer -> writer。
程序使用默认的RecordReader为LineRecordReader,即把输入文件按行拆分成{行偏移量, 内容}这样的{key, value},然后传递给map()函数。
在WordCountMapper的map()中,使用context.getInputKey()可以获取输入的key,由于程序中不关心行号,因此并没有使用;context.getInputValue()就是文件中某一行的内容。words保存了按照空格拆分后的所有单词,并通过context.emit()函数返回每个单词及其出现的次数给下一阶段。
shuffle阶段是由hadoop替我们完成的。hadoop会把map()输出的{单词, 出现次数}这样的key/value对进行排序和合并,并且把结果传给reducer。
WordCountReducer中的reducer()函数收到了经过排序和合并后的结果:{key: value1, value2, …}。每个reducer收到的内容都是一个key以及和这个key相关的values,在这个程序中key就是一个单词,values就是每个单词出现的次数,因此程序中通过context.nextValue()获取所有出现次数并累加,最后通过context.emit()把统计得到的{单词, 出现次数}结果传给writer。
程序使用默认的RecordWriter为LineRecordWriter,输出格式为”key\tvalue\n”,因此最终写到文件的结果是每个单词及其出现次数作为单独一行。
参考资料
[1] Tom White. Hadoop: The Definitive Guide.
[2] Hadoop pipes编程