IT技术博客大学习 共学习 共进步

使用 Perl 中的 Gearman来实现 MapReduce

扶凯 2011-07-31 12:49:52 浏览 3,862 次

这是我对这个 PPT 中的内容译了一下,简单总结一下,放过来做一个例子,这个拿出是来用指导使用 Gearman 来实现 MapReduce 的。下次我再写一个国外有关 MySQL 的 MapReduce 的应用的文章.真希望国内也有人能好好分享这些,很多大公司都用着开源技术不好意思讲出来。
原 ppt 的地址是:http://www.slideshare.net/jamespitts/gearman

MapReduce的是什么?
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(化简)".
简单来讲,就是给大量的输入的 key/value 的键值对 reduced 成少量的key/value 的键值对。

简单实例
从两家公司处理通过 MapReduce 处理申报文件。

实例功能介绍

  1. 主程序进行文档的分类分割. 分配给各自的 Mapper worker 来处理。
  2. Mapper worker 遍历分配的内容。 Map 的功能会对每一个键进行提取,以 key/value 来存为中间数据来存储。
  3. 当 map 完了分配的内容。Master 的程序通过 Gearman 分配给 Reducer worker 来处理输入进来的中间数据。
  4. 由 Reducer worker 来遍历分配的中间数据,通过本身的 Reduce 功能接收 key 和设置这个的值,然后返回值。Work 会给唯一的键/值对做为输出的数据。

实例架构
有二个 Gearman Job 服务器运行着。分别用来处理调度 map 和 reduce 的可用的 worker .

Gearman::XS::Client

Master 的 Gearman client 是用来控制MapReduce 的管道的。

Gearman::XS::Worker

Mappers Reducers
Mapper 和 Reducer workers 是 Perl 的程序,用来读和处理接收到的任务。

The Master Process
(1. 这个程序是用来设置 Mapper 和 Reducer 的 Gearman 的 clients.
连接 mapper client 到 gearman server 的 4730 的端口。
连接 reducer client 到 gearman server 的 4731 的端口。
产生一个唯一的reducer id 给每个 reducer 的 Worker.这是用来后来做输出识别的.

use strict;
use warnings;
 
use Gearman::XS qw(:constants);
use Gearman::XS::Client;
use FreezeThaw qw(freeze thaw cmpStr safeFreeze cmpStrHard);
 
# set up the mapper
print "Connect to the Mapper gearman servers.\n";
my $mapper = new Gearman::XS::Client;
if ($mapper->add_server('localhost', 4730) != GEARMAN_SUCCESS) {
        printf(STDERR "%s\n", $mapper->error());
        exit(1);
}
 
# set up the reducer, generate a sortable unique id
print "Connect to the Reducer gearman servers.\n";
my $reducer = new Gearman::XS::Client;
if ($reducer->add_server('localhost', 4731) != GEARMAN_SUCCESS) {
        printf(STDERR "%s\n", $reducer->error());
        exit(1);
}
my $reducer_id = time . '_' . join "", map { ("a".."z", 0..9)[rand 36] } (1..4);

(2. 以公司 ID 来 split 来分配任务提交各自的 Mapper 任务
根据公司 ID 来进行简单的数组 split。 jobs 哈希是定义用来存 mappers 和 reducers 的二者的句柄。
从分割的任务通过 gearman 的 background 提交成 mapper job 给gearman.
这的 job handle 是 gearman 生成的唯一的 ID ,拿这个来做标识存到 jobs 的 hash 来保存任务信息。
对 frozen 过的数据进行 split 的操作。

# submit jobs with each split to the mappers
my ($ret, $job_handle);
my $jobs = {};
foreach my $split (@splits) {
        # submit a mapper job to be performed by gearman workers
        ($ret, $job_handle) = $mapper->do_background( 'mapper',
                freeze ({ # workload
                        'split' => int($split)
                })
        );
 
        # add this to the jobs to be monitored
        if ($ret == GEARMAN_SUCCESS) {
                print "> Begin mapping $split with job_handle=$job_handle.\n";
                $jobs->{$job_handle} = {
                        mapper => 1, split => int($split), gearman_client => $mapper
                };
 
        } else {
                printf(STDERR "%s\n", $mapper->error()) and die;
        }
 
        # sleep for a tenth of a sec
        select(undef, undef, undef, 0.10);
}

(3. 任务监控:当 Mapper 的任务完成时,提交新的 Reduce 任务.
通过不断的循环在内部监控 Mapper 和 Reducer 任务进度。
这些任务的状态是通过上面存起来的 job handle 来查询的。
如果任务完成,并且这个任务是 mapper 的任务,就立即开始 reducer 的任务(detailed in the next slide).
所有的任务完成后删除 jobs 中的任务 hash 。

while (1) {   
    # stop if there are no more jobs
    last unless (keys %$jobs);
 
    # check each job, run reducer when a mapper is done
    foreach $job_handle (sort keys %$jobs) {
 
        # get the job status from this job's gearman client
        ($return_value, $is_status_known, $running_status, $status_num, $status_denom) = 
                $jobs->{$job_handle}->{gearman_client}->job_status($job_handle);
 
        # this job is done
        unless ($running_status) {
            # this is a complete mapper job... run its reducer
            if ($jobs->{$job_handle}->{mapper}) {
                 ....
            }
            delete $jobs->{$job_handle};
         }
           # sleep for a tenth of a sec
           select(undef, undef, undef, 0.10);
      }
}

(4. 提交 Reducer 任务的细节
以 gearman 的 background 来提交 reducer 的任务。
在这也是 gearman 对提交的任务也是生成唯一的 ID 存 job handle,在次记录放到 jobs 的 hash 中。
给 split 和 reducer_id 在 frozen 后传给 job.

if ($jobs->{$job_handle}->{mapper}) {
 
    warn("< Done mapping " . $jobs->{$job_handle}->{split} . " with job_handle=$job_handle.\n");
 
    # submit a reducer job to be performed by gearman workers
    ($ret, $re_job_handle) = $reducer->do_background( 'reducer',
            freeze ({ # workload
                'split' => int($jobs->{$job_handle}->{split}), 
                'reducer_id' => $reducer_id 
                })  
            );  
 
    # add this to the jobs to be monitored
    if ($ret == GEARMAN_SUCCESS) {
        print ">> Begin reducing " . $jobs->{$job_handle}->{split} . " with job_handle=$re_job_handle\n";
        $jobs->{$re_job_handle} = { reducer => 1, split => $jobs->{$job_handle}->{split}, gearman_client => $reducer };
    } else {
        printf(STDERR "%s\n", $mapper->error()) and die;
    }   
 
}

处理完就如下图显示

 这个程序的 Demo 的下载地址:http://annarbor.pm.org/meetings/downloads/20101208/demo_gearman_mapreduce-1.0.tar.gz

建议继续学习

  1. 进程运行于不同的 CPU 核 (阅读 5,821)
  2. Gearman Server 使用 MySQL UDFs 来管理和保持队列 (阅读 5,760)
  3. 利用开源的Gearman框架构建分布式图片处理平台[原创] (阅读 5,223)
  4. 利用Gearman来实现远程监控与管理 (阅读 4,101)
  5. Gearman分布式远程过程处理框架 (阅读 4,040)
  6. 通过eclipse调试MapReduce任务 (阅读 3,861)
  7. 使用 Gearman 实现分布式处理 (阅读 3,580)
  8. 《big data glossary》之MapReduce (阅读 3,580)
  9. Gearman for MySQL (阅读 3,441)
  10. 基于glusterfs和gearman的离线任务运算分布式化方案介绍 (阅读 3,122)