这是我对这个 PPT 中的内容译了一下,简单总结一下,放过来做一个例子,这个拿出是来用指导使用 Gearman 来实现 MapReduce 的。下次我再写一个国外有关 MySQL 的 MapReduce 的应用的文章.真希望国内也有人能好好分享这些,很多大公司都用着开源技术不好意思讲出来。
原 ppt 的地址是:http://www.slideshare.net/jamespitts/gearman
MapReduce的是什么?
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(化简)".
简单来讲,就是给大量的输入的 key/value 的键值对 reduced 成少量的key/value 的键值对。
简单实例
从两家公司处理通过 MapReduce 处理申报文件。
实例功能介绍
- 主程序进行文档的分类分割. 分配给各自的 Mapper worker 来处理。
- Mapper worker 遍历分配的内容。 Map 的功能会对每一个键进行提取,以 key/value 来存为中间数据来存储。
- 当 map 完了分配的内容。Master 的程序通过 Gearman 分配给 Reducer worker 来处理输入进来的中间数据。
- 由 Reducer worker 来遍历分配的中间数据,通过本身的 Reduce 功能接收 key 和设置这个的值,然后返回值。Work 会给唯一的键/值对做为输出的数据。
实例架构
有二个 Gearman Job 服务器运行着。分别用来处理调度 map 和 reduce 的可用的 worker .
Gearman::XS::Client
Master 的 Gearman client 是用来控制MapReduce 的管道的。
Gearman::XS::Worker
Mappers Reducers
Mapper 和 Reducer workers 是 Perl 的程序,用来读和处理接收到的任务。
The Master Process
(1. 这个程序是用来设置 Mapper 和 Reducer 的 Gearman 的 clients.
连接 mapper client 到 gearman server 的 4730 的端口。
连接 reducer client 到 gearman server 的 4731 的端口。
产生一个唯一的reducer id 给每个 reducer 的 Worker.这是用来后来做输出识别的.
use strict;
use warnings;
use Gearman::XS qw(:constants);
use Gearman::XS::Client;
use FreezeThaw qw(freeze thaw cmpStr safeFreeze cmpStrHard);
# set up the mapper
print "Connect to the Mapper gearman servers.\n";
my $mapper = new Gearman::XS::Client;
if ($mapper->add_server('localhost', 4730) != GEARMAN_SUCCESS) {
printf(STDERR "%s\n", $mapper->error());
exit(1);
}
# set up the reducer, generate a sortable unique id
print "Connect to the Reducer gearman servers.\n";
my $reducer = new Gearman::XS::Client;
if ($reducer->add_server('localhost', 4731) != GEARMAN_SUCCESS) {
printf(STDERR "%s\n", $reducer->error());
exit(1);
}
my $reducer_id = time . '_' . join "", map { ("a".."z", 0..9)[rand 36] } (1..4);(2. 以公司 ID 来 split 来分配任务提交各自的 Mapper 任务
根据公司 ID 来进行简单的数组 split。 jobs 哈希是定义用来存 mappers 和 reducers 的二者的句柄。
从分割的任务通过 gearman 的 background 提交成 mapper job 给gearman.
这的 job handle 是 gearman 生成的唯一的 ID ,拿这个来做标识存到 jobs 的 hash 来保存任务信息。
对 frozen 过的数据进行 split 的操作。
# submit jobs with each split to the mappers
my ($ret, $job_handle);
my $jobs = {};
foreach my $split (@splits) {
# submit a mapper job to be performed by gearman workers
($ret, $job_handle) = $mapper->do_background( 'mapper',
freeze ({ # workload
'split' => int($split)
})
);
# add this to the jobs to be monitored
if ($ret == GEARMAN_SUCCESS) {
print "> Begin mapping $split with job_handle=$job_handle.\n";
$jobs->{$job_handle} = {
mapper => 1, split => int($split), gearman_client => $mapper
};
} else {
printf(STDERR "%s\n", $mapper->error()) and die;
}
# sleep for a tenth of a sec
select(undef, undef, undef, 0.10);
}(3. 任务监控:当 Mapper 的任务完成时,提交新的 Reduce 任务.
通过不断的循环在内部监控 Mapper 和 Reducer 任务进度。
这些任务的状态是通过上面存起来的 job handle 来查询的。
如果任务完成,并且这个任务是 mapper 的任务,就立即开始 reducer 的任务(detailed in the next slide).
所有的任务完成后删除 jobs 中的任务 hash 。
while (1) {
# stop if there are no more jobs
last unless (keys %$jobs);
# check each job, run reducer when a mapper is done
foreach $job_handle (sort keys %$jobs) {
# get the job status from this job's gearman client
($return_value, $is_status_known, $running_status, $status_num, $status_denom) =
$jobs->{$job_handle}->{gearman_client}->job_status($job_handle);
# this job is done
unless ($running_status) {
# this is a complete mapper job... run its reducer
if ($jobs->{$job_handle}->{mapper}) {
....
}
delete $jobs->{$job_handle};
}
# sleep for a tenth of a sec
select(undef, undef, undef, 0.10);
}
}(4. 提交 Reducer 任务的细节
以 gearman 的 background 来提交 reducer 的任务。
在这也是 gearman 对提交的任务也是生成唯一的 ID 存 job handle,在次记录放到 jobs 的 hash 中。
给 split 和 reducer_id 在 frozen 后传给 job.
if ($jobs->{$job_handle}->{mapper}) {
warn("< Done mapping " . $jobs->{$job_handle}->{split} . " with job_handle=$job_handle.\n");
# submit a reducer job to be performed by gearman workers
($ret, $re_job_handle) = $reducer->do_background( 'reducer',
freeze ({ # workload
'split' => int($jobs->{$job_handle}->{split}),
'reducer_id' => $reducer_id
})
);
# add this to the jobs to be monitored
if ($ret == GEARMAN_SUCCESS) {
print ">> Begin reducing " . $jobs->{$job_handle}->{split} . " with job_handle=$re_job_handle\n";
$jobs->{$re_job_handle} = { reducer => 1, split => $jobs->{$job_handle}->{split}, gearman_client => $reducer };
} else {
printf(STDERR "%s\n", $mapper->error()) and die;
}
}处理完就如下图显示
这个程序的 Demo 的下载地址:http://annarbor.pm.org/meetings/downloads/20101208/demo_gearman_mapreduce-1.0.tar.gz