IT技术博客大学习 共学习 共进步

Java Worker 设计模式

淘宝网综合业务平台团队博客 2012-09-20 13:42:18 累计浏览 3,664 次
本机暂存

Worker模式

想解决的问题

异步执行一些任务,有返回或无返回结果

使用动机

有些时候想执行一些异步任务,如异步网络通信、daemon任务,但又不想去管理这任务的生命周。这个时候可以使用Worker模式,它会帮您管理与执行任务,并能非常方便地获取结果

结构

很多人可能为觉得这与executor很像,但executor是多线程的,它的作用更像是一个规划中心。而Worker则只是个搬运工,它自己本身只有一个线程的。每个worker有自己的任务处理逻辑,为了实现这个目的,有两种方式

    1. 建立一个抽象的AbstractWorker,不同逻辑的worker对其进行不同的实现;

    2. 对worker新增一个TaskProcessor不同的任务传入不同的processor即可。

第二种方式worker的角色可以很方便地改变,而且可以随时更换processor,可以理解成可”刷机”的worker

     ^ ^。这里我们使用第二种方式来介绍此模式的整体结构。原图已失效

针对上图,详细介绍一下几个角色:

  • ConfigurableWorker:顾名思义这个就是真正干活的worker了。要实现自我生命周期管理,需要实现Runable,这样其才能以单独的线程运行,需要注意的是work最好以daemon线程的方式运行。worker里面还包括几个其它成员:taskQueue,一个阻塞性质的queue,一般BlockingArrayList就可以了,这样任务是FIFO(先进先出)的,如果要考虑任务的优先级,则可以考虑使用PriorityBlockingQueue;listeners,根据事件进行划分的事件监听者,以便于当一个任务完成的时候进行处理,需要注意的是,为了较高效地进行listener遍历,这里我推荐使用CopyOnWriteArrayList,免得每次都复制。其对应的方法有addlistener、addTask等配套方法,这个都不多说了,更详细的可以看后面的示例代码。
  • WorkerTask:实际上这是一个抽象的工内容,其包括基本的id与,task的ID是Worker生成的,相当于递wtte后的一个执回,当数据执行完了的时候需要使用这个id来取结果。而后面真正实现的实体task则包含任务处理时需要的数据。
  • Processor:为了实现可”刷机”的worker,我们将处理逻辑与worker分开来,processor的本职工作很简单,只需要加工传入的task数据即可,加工完成后触发fireEvent(WorkerEvent.TASK_COMPLETE)事件,之后通过Future的get即可得到最终的数据。
  • 另外再说一点,对于addTask,可以有一个overload的方法,即在输入task的同时,传入一个RejectPolice,这样可以在size过大的时候做出拒绝操作,有效避免被撑死。

    适用性/问题

    这种设计能自动处理任务,并能根据任务的优先级自动调节任务的执行顺序,一个完全独立的thread,你完全可以将其理解成一专门负责干某种活的”机器人”。它可以用于处理一些定时、请求量固定均匀且对实时性要求不是太高的任务,如日志记录,数据分析等。当然,如果想提高任务处理的数据,可以生成多个worker,就相当于雇佣更多的人来为你干活,非常直观的。当然这样一来,谁来维护这worker便成了一个问题,另外就目前这种设计下worker之间是没有通信与协同的,这些都是改进点。

    那么对于多个worker,有什么组织方式呢?这里我介绍三种,算是抛砖引玉:

    流水线式worker(assembly-line worker)

    就像生产车间上的流水线工人一样,将任务切分成几个小块,每个worker负责自己的一部分,以提高整体的生产、产出效率,如下图:

        原图已失效

    假设完成任务 t 需要的时间为:W(t)=n,那么将任务分解成m份,流水线式的执行,每小份需要的时间便为 W(t/m)=n/m,那么执行1000条任务的时间,单个为1000n,流水线长度为L,则用这种方式所用的时间为(1000-1)*(m-L+1)*n/m+n

         其中L

    多级反馈队列(Multilevel Feedback Queue)

        这是一个有Q1、Q2…Qn个多重流水线方式,从高到低分别代码不同的优先级,高优先级的worker要多于低优先级的,一般是2的倍数,即Q4有16个worker、Q3有8个,后面类推。任务根据预先估计好的优先级进入,如果任务在某步的执行过长,直接踢到下一级,让出最快的资源。如下图所示:

        原图已失效

    显然这种方式的好处就在于可以动态地调整任务的优级,及时做出反应。当然,为了实现更好的高度,我们可以在低级里增加一个阀值,使得放偶然放入低级的task可以有复活的机会^

         ^。

    MapReduce式

        流水线虽然有一定的并行性,但总体来说仍然是串行的,因为只要有一个节点出了问题,那都是致命的错误。MapReduce是Google率先实现的一个分布式算法,有非常好的并行执行效率。

        原图已失效

        如上图所示,只要我们将Map与Reduce都改成Worker就行了,如MapWorker与ReduceWorker。这样,可以看见,Map的过程是完全并行的,当然这样就需要在Map与Reduce上的分配与数据组合上稍稍下一点功夫了。

    样例实现

        这里我们实现一个PageURLMiningWorker,对给定的URL,打开页面后,采取所有的URL,并反回结果进行汇总输出。由于时间有限,这里我只实现了单worker与MapReduce worker集两种方式,有兴趣的同学可以实现其它类型,如多级反馈队列。注意!我这里只是向大家展示这种设计模式,URL

         抓取的效率不在本次考虑之列。

        所有的代码可以在这里获取:https://github.com/sefler1987/javaworker

    单Worker实现样例

  • package com.alibaba.taobao.main;
  • import java.util.Arrays;
  • import java.util.List;
  • import java.util.concurrent.ConcurrentHashMap;
  • import java.util.concurrent.ConcurrentSkipListSet;
  • import java.util.concurrent.TimeUnit;
  • import com.alibaba.taobao.worker.ConfigurableWorker;
  • import com.alibaba.taobao.worker.SimpleURLComparator;
  • import com.alibaba.taobao.worker.WorkerEvent;
  • import com.alibaba.taobao.worker.WorkerListener;
  • import com.alibaba.taobao.worker.WorkerTask;
  • import com.alibaba.taobao.worker.linear.PageURLMiningProcessor;
  • import com.alibaba.taobao.worker.linear.PageURLMiningTask;
  • /**
  •  * Linear version of page URL mining. It’s slow but simple.
  •  * Average time cost for 1000 URLs is: 3800ms
  •  *
  •  * @author xuanyin.zy E-mail:xuanyin.zy@taobao.com
  •  * @since Sep 16, 2012 5:35:40 PM
  •  */
  • public class LinearURLMiningMain implements WorkerListener {
  •     private static final String EMPTY_STRING = ”";
  •     private static final int URL_SIZE_TO_MINE = 10000;
  •     private static ConcurrentHashMap> taskID2TaskMap = new ConcurrentHashMap>();
  •     private static ConcurrentSkipListSet foundURLs = new ConcurrentSkipListSet(new SimpleURLComparator());
  •     public static void main(String[] args) throws InterruptedException {
  •         long startTime = System.currentTimeMillis();
  •         ConfigurableWorker worker = new ConfigurableWorker(“W001″);
  •         worker.setTaskProcessor(new PageURLMiningProcessor());
  •         addTask2Worker(worker, new PageURLMiningTask(“http://www.taobao.com”));
  •         addTask2Worker(worker, new PageURLMiningTask(“http://www.xinhuanet.com”));
  •         addTask2Worker(worker, new PageURLMiningTask(“http://www.zol.com.cn”));
  •         addTask2Worker(worker, new PageURLMiningTask(“http://www.163.com”));
  •         LinearURLMiningMain mainListener = new LinearURLMiningMain();
  •         worker.addListener(mainListener);
  •         worker.start();
  •         String targetURL = EMPTY_STRING;
  •         while (foundURLs.size() < URL_SIZE_TO_MINE) {
  •             targetURL = foundURLs.pollFirst();
  •             if (targetURL == null) {
  •                 TimeUnit.MILLISECONDS.sleep(50);
  •                 continue;
  •             }
  •             PageURLMiningTask task = new PageURLMiningTask(targetURL);
  •             taskID2TaskMap.putIfAbsent(worker.addTask(task), task);
  •             TimeUnit.MILLISECONDS.sleep(100);
  •         }
  •         worker.stop();
  •         for (String string : foundURLs) {
  •             System.out.println(string);
  •         }
  •         System.out.println(“Time Cost: ” + (System.currentTimeMillis() - startTime) + ”ms”);
  •     }
  •     private static void addTask2Worker(ConfigurableWorker mapWorker_1, PageURLMiningTask task) {
  •         String taskID = mapWorker_1.addTask(task);
  •         taskID2TaskMap.put(taskID, task);
  •     }
  •     @Override
  •     public List intrests() {
  •         return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
  •     }
  •     @Override
  •     public void onEvent(WorkerEvent event, Object… args) {
  •         if (WorkerEvent.TASK_FAILED == event) {
  •             System.err.println(“Error while extracting URLs”);
  •             return;
  •         }
  •         if (WorkerEvent.TASK_COMPLETE != event)
  •             return;
  •         PageURLMiningTask task = (PageURLMiningTask) args[0];
  •         if (!taskID2TaskMap.containsKey(task.getTaskID()))
  •             return;
  •         foundURLs.addAll(task.getMinedURLs());
  •         System.out.println(“Found URL size: ” + foundURLs.size());
  •         taskID2TaskMap.remove(task.getTaskID());
  •     }
  • }
  • MapReduce实现样例

  • package com.alibaba.taobao.main;
  • import java.util.ArrayList;
  • import java.util.Arrays;
  • import java.util.List;
  • import java.util.concurrent.ConcurrentHashMap;
  • import java.util.concurrent.ConcurrentSkipListSet;
  • import java.util.concurrent.TimeUnit;
  • import com.alibaba.taobao.worker.ConfigurableWorker;
  • import com.alibaba.taobao.worker.SimpleURLComparator;
  • import com.alibaba.taobao.worker.WorkerEvent;
  • import com.alibaba.taobao.worker.WorkerListener;
  • import com.alibaba.taobao.worker.WorkerTask;
  • import com.alibaba.taobao.worker.mapreduce.Map2ReduceConnector;
  • import com.alibaba.taobao.worker.mapreduce.MapReducePageURLMiningTask;
  • import com.alibaba.taobao.worker.mapreduce.PageContentFetchProcessor;
  • import com.alibaba.taobao.worker.mapreduce.URLMatchingProcessor;
  • /**
  •  * MapReduce version of page URL mining. It’s very powerful.
  •  *
  •  * @author xuanyin.zy E-mail:xuanyin.zy@taobao.com
  •  * @since Sep 16, 2012 5:35:40 PM
  •  */
  • public class MapReduceURLMiningMain implements WorkerListener {
  •     private static final String EMPTY_STRING = ”";
  •     private static final int URL_SIZE_TO_MINE = 10000;
  •     private static ConcurrentHashMap> taskID2TaskMap = new ConcurrentHashMap>();
  •     private static ConcurrentSkipListSet foundURLs = new ConcurrentSkipListSet(new SimpleURLComparator());
  •     public static void main(String[] args) throws InterruptedException {
  •         long startTime = System.currentTimeMillis();
  •         // four mapers
  •         List mappers = new ArrayList(4);
  •         ConfigurableWorker mapWorker_1 = new ConfigurableWorker(“W_M1″);
  •         ConfigurableWorker mapWorker_2 = new ConfigurableWorker(“W_M2″);
  •         ConfigurableWorker mapWorker_3 = new ConfigurableWorker(“W_M3″);
  •         ConfigurableWorker mapWorker_4 = new ConfigurableWorker(“W_M4″);
  •         mapWorker_1.setTaskProcessor(new PageContentFetchProcessor());
  •         mapWorker_2.setTaskProcessor(new PageContentFetchProcessor());
  •         mapWorker_3.setTaskProcessor(new PageContentFetchProcessor());
  •         mapWorker_4.setTaskProcessor(new PageContentFetchProcessor());
  •         mappers.add(mapWorker_1);
  •         mappers.add(mapWorker_2);
  •         mappers.add(mapWorker_3);
  •         mappers.add(mapWorker_4);
  •         // one reducer
  •         ConfigurableWorker reduceWorker_1 = new ConfigurableWorker(“W_R1″);
  •         reduceWorker_1.setTaskProcessor(new URLMatchingProcessor());
  •         // bind reducer to final result class
  •         MapReduceURLMiningMain main = new MapReduceURLMiningMain();
  •         reduceWorker_1.addListener(main);
  •         // initiate tasks
  •         addTask2Worker(mapWorker_1, new MapReducePageURLMiningTask(“http://www.taobao.com”));
  •         addTask2Worker(mapWorker_2, new MapReducePageURLMiningTask(“http://www.xinhuanet.com”));
  •         addTask2Worker(mapWorker_3, new MapReducePageURLMiningTask(“http://www.zol.com.cn”));
  •         addTask2Worker(mapWorker_4, new MapReducePageURLMiningTask(“http://www.sina.com.cn/”));
  •         // bind mapper to reduer
  •         Map2ReduceConnector connector = new Map2ReduceConnector(Arrays.asList(reduceWorker_1));
  •         mapWorker_1.addListener(connector);
  •         mapWorker_2.addListener(connector);
  •         mapWorker_3.addListener(connector);
  •         mapWorker_4.addListener(connector);
  •         // start all
  •         mapWorker_1.start();
  •         mapWorker_2.start();
  •         mapWorker_3.start();
  •         mapWorker_4.start();
  •         reduceWorker_1.start();
  •         String targetURL = EMPTY_STRING;
  •         int lastIndex = 0;
  •         while (foundURLs.size() < URL_SIZE_TO_MINE) {
  •             targetURL = foundURLs.pollFirst();
  •             if (targetURL == null) {
  •                 TimeUnit.MILLISECONDS.sleep(50);
  •                 continue;
  •             }
  •             lastIndex = ++lastIndex % mappers.size();
  •             MapReducePageURLMiningTask task = new MapReducePageURLMiningTask(targetURL);
  •             taskID2TaskMap.putIfAbsent(mappers.get(lastIndex).addTask(task), task);
  •             TimeUnit.MILLISECONDS.sleep(100);
  •         }
  •         // stop all
  •         mapWorker_1.stop();
  •         mapWorker_2.stop();
  •         mapWorker_3.stop();
  •         mapWorker_4.stop();
  •         reduceWorker_1.stop();
  •         for (String string : foundURLs) {
  •             System.out.println(string);
  •         }
  •         System.out.println(“Time Cost: ” + (System.currentTimeMillis() - startTime) + ”ms”);
  •     }
  •     private static void addTask2Worker(ConfigurableWorker mapWorker_1, MapReducePageURLMiningTask task) {
  •         String taskID = mapWorker_1.addTask(task);
  •         taskID2TaskMap.put(taskID, task);
  •     }
  •     @Override
  •     public List intrests() {
  •         return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
  •     }
  •     @Override
  •     public void onEvent(WorkerEvent event, Object… args) {
  •         if (WorkerEvent.TASK_FAILED == event) {
  •             System.err.println(“Error while extracting URLs”);
  •             return;
  •         }
  •         if (WorkerEvent.TASK_COMPLETE != event)
  •             return;
  •         MapReducePageURLMiningTask task = (MapReducePageURLMiningTask) args[0];
  •         if (!taskID2TaskMap.containsKey(task.getTaskID()))
  •             return;
  •         foundURLs.addAll(task.getMinedURLs());
  •         System.out.println(“Found URL size: ” + foundURLs.size());
  •         taskID2TaskMap.remove(task.getTaskID());
  •     }
  • }
  • 结果对比

    Y轴为抓取X轴URL个数所用的时间

    原图已失效

    总结

        我们可以看到,worker模式组合是非常灵活的,它真的就像一个活生生的工人,任你调配。使用worker,我们可以更方便地实现更复杂的结构。

    同分类推荐文章

    1. Day for Night (2026-05-29 17:57:14)
    2. Your Prototype Is Not Being Honest With Your Users (And Here’s How To Fix It) (2026-05-25 20:00:00)
    3. Fixed-Height Cards: More Fragile Than They Look (2026-05-04 22:01:36)

    查看更多 设计 文章 →

    建议继续学习

    1. SmartSprites - 命令行形式的CSS Sprites生成器 (累计阅读 123,800)
    2. Java开发岗位面试题归类汇总 (累计阅读 21,941)
    3. android 开发入门 (累计阅读 19,421)
    4. 我的PHP,Python和Ruby之路 (累计阅读 13,061)
    5. HashMap解决hash冲突的方法 (累计阅读 12,542)
    6. 一个大二学生有关如何成为一名软件工程师的疑问及答复 (累计阅读 9,100)
    7. 最常被程序员们谎称读过的计算机书籍 (累计阅读 9,082)
    8. Java程序员应该知道的10个eclipse调试技巧 (累计阅读 7,900)
    9. 如何让员工忠于公司? (累计阅读 7,860)
    10. Java技术路线 (累计阅读 7,662)