技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 系统架构 --> 使用 Perl 中的 Gearman来实现 MapReduce

使用 Perl 中的 Gearman来实现 MapReduce

浏览:3061次  出处信息

这是我对这个 PPT 中的内容译了一下,简单总结一下,放过来做一个例子,这个拿出是来用指导使用 Gearman 来实现 MapReduce 的。下次我再写一个国外有关 MySQL 的 MapReduce 的应用的文章.真希望国内也有人能好好分享这些,很多大公司都用着开源技术不好意思讲出来。
原 ppt 的地址是:http://www.slideshare.net/jamespitts/gearman

MapReduce的是什么?
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(化简)".
简单来讲,就是给大量的输入的 key/value 的键值对 reduced 成少量的key/value 的键值对。

简单实例
从两家公司处理通过 MapReduce 处理申报文件。

实例功能介绍

  1. 主程序进行文档的分类分割. 分配给各自的 Mapper worker 来处理。
  2. Mapper worker 遍历分配的内容。 Map 的功能会对每一个键进行提取,以 key/value 来存为中间数据来存储。
  3. 当 map 完了分配的内容。Master 的程序通过 Gearman 分配给 Reducer worker 来处理输入进来的中间数据。
  4. 由 Reducer worker 来遍历分配的中间数据,通过本身的 Reduce 功能接收 key 和设置这个的值,然后返回值。Work 会给唯一的键/值对做为输出的数据。

 

实例架构
有二个 Gearman Job 服务器运行着。分别用来处理调度 map 和 reduce 的可用的 worker .

Gearman::XS::Client

Master 的 Gearman client 是用来控制MapReduce 的管道的。

 

Gearman::XS::Worker

Mappers Reducers
Mapper 和 Reducer workers 是 Perl 的程序,用来读和处理接收到的任务。

The Master Process
(1. 这个程序是用来设置 Mapper 和 Reducer 的 Gearman 的 clients.
连接 mapper client 到 gearman server 的 4730 的端口。
连接 reducer client 到 gearman server 的 4731 的端口。
产生一个唯一的reducer id 给每个 reducer 的 Worker.这是用来后来做输出识别的.

use strict;
use warnings;
 
use Gearman::XS qw(:constants);
use Gearman::XS::Client;
use FreezeThaw qw(freeze thaw cmpStr safeFreeze cmpStrHard);
 
# set up the mapper
print "Connect to the Mapper gearman servers.\n";
my $mapper = new Gearman::XS::Client;
if ($mapper->add_server('localhost', 4730) != GEARMAN_SUCCESS) {
        printf(STDERR "%s\n", $mapper->error());
        exit(1);
}
 
# set up the reducer, generate a sortable unique id
print "Connect to the Reducer gearman servers.\n";
my $reducer = new Gearman::XS::Client;
if ($reducer->add_server('localhost', 4731) != GEARMAN_SUCCESS) {
        printf(STDERR "%s\n", $reducer->error());
        exit(1);
}
my $reducer_id = time . '_' . join "", map { ("a".."z", 0..9)[rand 36] } (1..4);

 

(2. 以公司 ID 来 split 来分配任务提交各自的 Mapper 任务
根据公司 ID 来进行简单的数组 split。 jobs 哈希是定义用来存 mappers 和 reducers 的二者的句柄。
从分割的任务通过 gearman 的 background 提交成 mapper job 给gearman.
这的 job handle 是 gearman 生成的唯一的 ID ,拿这个来做标识存到 jobs 的 hash 来保存任务信息。
对 frozen 过的数据进行 split 的操作。

# submit jobs with each split to the mappers
my ($ret, $job_handle);
my $jobs = {};
foreach my $split (@splits) {
        # submit a mapper job to be performed by gearman workers
        ($ret, $job_handle) = $mapper->do_background( 'mapper',
                freeze ({ # workload
                        'split' => int($split)
                })
        );
 
        # add this to the jobs to be monitored
        if ($ret == GEARMAN_SUCCESS) {
                print "> Begin mapping $split with job_handle=$job_handle.\n";
                $jobs->{$job_handle} = {
                        mapper => 1, split => int($split), gearman_client => $mapper
                };
 
        } else {
                printf(STDERR "%s\n", $mapper->error()) and die;
        }
 
        # sleep for a tenth of a sec
        select(undef, undef, undef, 0.10);
}

(3. 任务监控:当 Mapper 的任务完成时,提交新的 Reduce 任务.
通过不断的循环在内部监控 Mapper 和 Reducer 任务进度。
这些任务的状态是通过上面存起来的 job handle 来查询的。
如果任务完成,并且这个任务是 mapper 的任务,就立即开始 reducer 的任务(detailed in the next slide).
所有的任务完成后删除 jobs 中的任务 hash 。

while (1) {   
    # stop if there are no more jobs
    last unless (keys %$jobs);
 
    # check each job, run reducer when a mapper is done
    foreach $job_handle (sort keys %$jobs) {
 
        # get the job status from this job's gearman client
        ($return_value, $is_status_known, $running_status, $status_num, $status_denom) = 
                $jobs->{$job_handle}->{gearman_client}->job_status($job_handle);
 
        # this job is done
        unless ($running_status) {
            # this is a complete mapper job... run its reducer
            if ($jobs->{$job_handle}->{mapper}) {
                 ....delete $jobs->{$job_handle};# sleep for a tenth of a sec
           select(undef, undef, undef, 0.10);
      }
}

(4. 提交 Reducer 任务的细节
以 gearman 的 background 来提交 reducer 的任务。
在这也是 gearman 对提交的任务也是生成唯一的 ID 存 job handle,在次记录放到 jobs 的 hash 中。
给 split 和 reducer_id 在 frozen 后传给 job.

if ($jobs->{$job_handle}->{mapper}) {
 
    warn("< Done mapping " . $jobs->{$job_handle}->{split} . " with job_handle=$job_handle.\n");
 
    # submit a reducer job to be performed by gearman workers
    ($ret, $re_job_handle) = $reducer->do_background( 'reducer',
            freeze ({ # workload
                'split' => int($jobs->{$job_handle}->{split}), 
                'reducer_id' => $reducer_id 
                })  
            );  
 
    # add this to the jobs to be monitored
    if ($ret == GEARMAN_SUCCESS) {
        print ">> Begin reducing " . $jobs->{$job_handle}->{split} . " with job_handle=$re_job_handle\n";
        $jobs->{$re_job_handle} = { reducer => 1, split => $jobs->{$job_handle}->{split}, gearman_client => $reducer };
    } else {
        printf(STDERR "%s\n", $mapper->error()) and die;
    }   
 
}

 

处理完就如下图显示

 这个程序的 Demo 的下载地址:http://annarbor.pm.org/meetings/downloads/20101208/demo_gearman_mapreduce-1.0.tar.gz

建议继续学习:

  1. Gearman Server 使用 MySQL UDFs 来管理和保持队列    (阅读:4865)
  2. 进程运行于不同的 CPU 核    (阅读:4414)
  3. 利用开源的Gearman框架构建分布式图片处理平台[原创]    (阅读:4259)
  4. Gearman分布式远程过程处理框架    (阅读:3330)
  5. 利用Gearman来实现远程监控与管理    (阅读:3030)
  6. 通过eclipse调试MapReduce任务    (阅读:2956)
  7. 使用 Gearman 实现分布式处理    (阅读:2807)
  8. Gearman for MySQL    (阅读:2600)
  9. 《big data glossary》之MapReduce    (阅读:2604)
  10. 基于glusterfs和gearman的离线任务运算分布式化方案介绍    (阅读:2334)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2024 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1