使用 Perl 中的 Gearman来实现 MapReduce
这是我对这个 PPT 中的内容译了一下,简单总结一下,放过来做一个例子,这个拿出是来用指导使用 Gearman 来实现 MapReduce 的。下次我再写一个国外有关 MySQL 的 MapReduce 的应用的文章.真希望国内也有人能好好分享这些,很多大公司都用着开源技术不好意思讲出来。
原 ppt 的地址是:http://www.slideshare.net/jamespitts/gearman
MapReduce的是什么?
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(化简)".
简单来讲,就是给大量的输入的 key/value 的键值对 reduced 成少量的key/value 的键值对。
简单实例
从两家公司处理通过 MapReduce 处理申报文件。
实例功能介绍
- 主程序进行文档的分类分割. 分配给各自的 Mapper worker 来处理。
- Mapper worker 遍历分配的内容。 Map 的功能会对每一个键进行提取,以 key/value 来存为中间数据来存储。
- 当 map 完了分配的内容。Master 的程序通过 Gearman 分配给 Reducer worker 来处理输入进来的中间数据。
- 由 Reducer worker 来遍历分配的中间数据,通过本身的 Reduce 功能接收 key 和设置这个的值,然后返回值。Work 会给唯一的键/值对做为输出的数据。
实例架构
有二个 Gearman Job 服务器运行着。分别用来处理调度 map 和 reduce 的可用的 worker .
Gearman::XS::Client
Master 的 Gearman client 是用来控制MapReduce 的管道的。
Gearman::XS::Worker
Mappers Reducers
Mapper 和 Reducer workers 是 Perl 的程序,用来读和处理接收到的任务。
The Master Process
(1. 这个程序是用来设置 Mapper 和 Reducer 的 Gearman 的 clients.
连接 mapper client 到 gearman server 的 4730 的端口。
连接 reducer client 到 gearman server 的 4731 的端口。
产生一个唯一的reducer id 给每个 reducer 的 Worker.这是用来后来做输出识别的.
use strict; use warnings; use Gearman::XS qw(:constants); use Gearman::XS::Client; use FreezeThaw qw(freeze thaw cmpStr safeFreeze cmpStrHard); # set up the mapper print "Connect to the Mapper gearman servers.\n"; my $mapper = new Gearman::XS::Client; if ($mapper->add_server('localhost', 4730) != GEARMAN_SUCCESS) { printf(STDERR "%s\n", $mapper->error()); exit(1); } # set up the reducer, generate a sortable unique id print "Connect to the Reducer gearman servers.\n"; my $reducer = new Gearman::XS::Client; if ($reducer->add_server('localhost', 4731) != GEARMAN_SUCCESS) { printf(STDERR "%s\n", $reducer->error()); exit(1); } my $reducer_id = time . '_' . join "", map { ("a".."z", 0..9)[rand 36] } (1..4);
(2. 以公司 ID 来 split 来分配任务提交各自的 Mapper 任务
根据公司 ID 来进行简单的数组 split。 jobs 哈希是定义用来存 mappers 和 reducers 的二者的句柄。
从分割的任务通过 gearman 的 background 提交成 mapper job 给gearman.
这的 job handle 是 gearman 生成的唯一的 ID ,拿这个来做标识存到 jobs 的 hash 来保存任务信息。
对 frozen 过的数据进行 split 的操作。
# submit jobs with each split to the mappers my ($ret, $job_handle); my $jobs = {}; foreach my $split (@splits) { # submit a mapper job to be performed by gearman workers ($ret, $job_handle) = $mapper->do_background( 'mapper', freeze ({ # workload 'split' => int($split) }) ); # add this to the jobs to be monitored if ($ret == GEARMAN_SUCCESS) { print "> Begin mapping $split with job_handle=$job_handle.\n"; $jobs->{$job_handle} = { mapper => 1, split => int($split), gearman_client => $mapper }; } else { printf(STDERR "%s\n", $mapper->error()) and die; } # sleep for a tenth of a sec select(undef, undef, undef, 0.10); }
(3. 任务监控:当 Mapper 的任务完成时,提交新的 Reduce 任务.
通过不断的循环在内部监控 Mapper 和 Reducer 任务进度。
这些任务的状态是通过上面存起来的 job handle 来查询的。
如果任务完成,并且这个任务是 mapper 的任务,就立即开始 reducer 的任务(detailed in the next slide).
所有的任务完成后删除 jobs 中的任务 hash 。
while (1) { # stop if there are no more jobs last unless (keys %$jobs); # check each job, run reducer when a mapper is done foreach $job_handle (sort keys %$jobs) { # get the job status from this job's gearman client ($return_value, $is_status_known, $running_status, $status_num, $status_denom) = $jobs->{$job_handle}->{gearman_client}->job_status($job_handle); # this job is done unless ($running_status) { # this is a complete mapper job... run its reducer if ($jobs->{$job_handle}->{mapper}) { .... } delete $jobs->{$job_handle}; } # sleep for a tenth of a sec select(undef, undef, undef, 0.10); } }
(4. 提交 Reducer 任务的细节
以 gearman 的 background 来提交 reducer 的任务。
在这也是 gearman 对提交的任务也是生成唯一的 ID 存 job handle,在次记录放到 jobs 的 hash 中。
给 split 和 reducer_id 在 frozen 后传给 job.
if ($jobs->{$job_handle}->{mapper}) { warn("< Done mapping " . $jobs->{$job_handle}->{split} . " with job_handle=$job_handle.\n"); # submit a reducer job to be performed by gearman workers ($ret, $re_job_handle) = $reducer->do_background( 'reducer', freeze ({ # workload 'split' => int($jobs->{$job_handle}->{split}), 'reducer_id' => $reducer_id }) ); # add this to the jobs to be monitored if ($ret == GEARMAN_SUCCESS) { print ">> Begin reducing " . $jobs->{$job_handle}->{split} . " with job_handle=$re_job_handle\n"; $jobs->{$re_job_handle} = { reducer => 1, split => $jobs->{$job_handle}->{split}, gearman_client => $reducer }; } else { printf(STDERR "%s\n", $mapper->error()) and die; } }
处理完就如下图显示
这个程序的 Demo 的下载地址:http://annarbor.pm.org/meetings/downloads/20101208/demo_gearman_mapreduce-1.0.tar.gz
建议继续学习:
- Gearman Server 使用 MySQL UDFs 来管理和保持队列 (阅读:4855)
- 进程运行于不同的 CPU 核 (阅读:4383)
- 利用开源的Gearman框架构建分布式图片处理平台[原创] (阅读:4247)
- Gearman分布式远程过程处理框架 (阅读:3325)
- 利用Gearman来实现远程监控与管理 (阅读:3020)
- 通过eclipse调试MapReduce任务 (阅读:2941)
- 使用 Gearman 实现分布式处理 (阅读:2799)
- Gearman for MySQL (阅读:2592)
- 《big data glossary》之MapReduce (阅读:2588)
- 基于glusterfs和gearman的离线任务运算分布式化方案介绍 (阅读:2321)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:扶凯 来源: 扶凯
- 标签: Gearman MapReduce
- 发布时间:2011-07-31 12:49:52
- [70] Go Reflect 性能
- [68] 如何拿下简短的域名
- [65] Oracle MTS模式下 进程地址与会话信
- [63] 图书馆的世界纪录
- [62] IOS安全–浅谈关于IOS加固的几种方法
- [61] 【社会化设计】自我(self)部分――欢迎区
- [59] android 开发入门
- [54] 视觉调整-设计师 vs. 逻辑
- [49] 界面设计速成
- [48] 读书笔记-壹百度:百度十年千倍的29条法则