技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> Linux --> 协程与多任务调度

协程与多任务调度

浏览:1444次  出处信息

   在计算机科学中,多任务(multitasking)是指在同一个时间段内运行多个任务,现代计算机作为一个复杂的系统,运行的任务往往不止一个,所以多任务调度对于计算机来说尤为重要。现阶段多任务调度主要分为抢占式多任务和协作式多任务,抢占式多任务由操作系统决定进程的调度方案,而协作式多任务是当前任务主动放弃执行后,下一个任务继续进行。由于协作式任务管理受恶意程序的威胁更大,现阶段几乎所有的计算机都采用抢占式多任务管理。

现阶段,主要靠多进程或多线程的方式来实现多任务:

#include <stdio.h>
#include <unistd.h>
 
int main()
{
    pid_t pid;
    pid = fork();
 
    if(pid < 0){
        printf("Fork Error!\n");
    }else if (pid > 0){
        printf("This is the parent Process! Process Id is %d, Child id is %d\n",getpid(),pid);
        int i = 0;
        while(i < 10){
            printf("This is parent Process output of i %d!\n",i);
            i++;
        }
    }else if (pid == 0){
        printf("This is the child Process! Process Id is %d, parent id is %d\n",getpid(),getppid());
        int j = 0;
        while(j < 10){
            printf("This is child Process output of j %d\n",j);
            j++;
        }
    }
    return 0;
}

   

   在《协程与yield》中,我们说到了协程是一种比进程和线程更加轻量级的解决方案,也通过yield实现了协程,但最大的疑问是没有提供像进程或线程类的任务调度,没有体现出协程的优势,下面我们来实现一个简单的协程和协作式的多任务调度。

   首先我们需要对任务(Task)进行包装:

class Task():
    def __init__(self,taskid,coroutine):
        self.__taskId = taskid
        self.__coroutine = coroutine
        self.__sendValue = ''
        self.__beforeFirstYield = True
        self.isFinished = False
 
    def getTaskId(self):
        return self.__taskId
 
    def setValue(self,value):
        self.__sendValue == value
 
    def run(self):
        if(self.__beforeFirstYield):
            self.__beforeFirstYield = False
            return self.__coroutine.next()
        else:
            try:
                retval = self.__coroutine.send(self.__sendValue)
                return retval
            except StopIteration:
                self.isFinished = True
                return ""

   这里的“任务”类似系统的进程,有ID,有发送给用户程序的消息sendValue.

   接下来需要一个任务调度器,专门用来管理任务:

from Queue import Queue
class Scheduler():
    def __init__(self):
        self.taskQueue = Queue()
        self.maxTaskId = 0
        self.taskMap = dict()
 
    def scheduler(self,task):
        self.taskQueue.put(task)
 
    def newTask(self,coroutine):
        self.maxTaskId+=1
        task = Task(self.maxTaskId,coroutine)
        self.taskMap[self.maxTaskId] = task
        self.scheduler(task)
        return self.maxTaskId
 
        def KillTask(self,taskid):
        if  not taskid in self.taskMap:
            return False
        i = 0
        while i < self.taskQueue.qsize():
            tmp = self.taskQueue.get()
            if tmp == self.taskMap[taskid]:
                del self.taskMap[taskid]
                break
            else:
                self.scheduler(tmp)
            i+=1
        return True
 
    def run(self):
        while not self.taskQueue.empty():
            task = self.taskQueue.get()
            retval = task.run()
            if task.isFinished:
                tid = task.getTaskId()
                del self.taskMap[tid]
            else:
                self.scheduler(task)

   任务调度器是系统最核心的功能,相当于Linux中的init程序,用来管理所有的系统任务。其它任务通过注册到任务调度器来实现其功能:

def task1():
    i = 0
    while i < 10:
        print "This is task 1 i is %s"%i
        i+=1
        yield
 
def task2():
    i = 0
    while i < 10:
        print "This is task 2 i is %s"%i
        i+=1
        yield
 
sch = Scheduler()
sch.newTask(task1())
sch.newTask(task2())
sch.run()

   其结果输出如下,可以看出任务一和任务二确实是交替执行,实现了任务调度的功能

This is task 1 i is 0
This is task 2 i is 0
This is task 1 i is 1
This is task 2 i is 1
This is task 1 i is 2
This is task 2 i is 2
This is task 1 i is 3
This is task 2 i is 3
This is task 1 i is 4
This is task 2 i is 4
This is task 1 i is 5
This is task 2 i is 5
This is task 1 i is 6
This is task 2 i is 6
This is task 1 i is 7
This is task 2 i is 7
This is task 1 i is 8
This is task 2 i is 8
This is task 1 i is 9
This is task 2 i is 9

   上面我们实现多个任务的调度,它们能够很好的交替运行,yield在这里实现上提供类一个类似中断的功能,一旦系统出现yield,调度器会自动调用另外的任务继续运行。

   然而,在上面的例子中,一但我们把任务提交给调度器,对程序就没有了控制权,必须要等到任务运行结束。我们需要对任务有必要的控制权,如获取任务ID,结束任务,复制任务等等,这里需要用到和调度器的通信,这里就用到了yield的进行传值。类似Linux一样,我们可以给任务提供一些函数接口,任务通过yield把需要调用的函数传给调度器,调度器返回结果给任务,如下:

def task3():
    pid = yield getpid()
    print "This taskid is %d"%pid
    i = 0
    while i < 10:
        print "This is task 3 i is %d"%i
        yield

   要实现上面的调用,可以添加一个系统调用类:

class SysCall():
    def __init__(self,callback):
        self.__callback= callback
 
    def __call__(self,task,schedular):
        if not isinstance(task,Task):
            raise TypeError(task.__name__+" is not instance of Task")
        self.__callback(task,schedular)

   然后对Scheduler类的run方法作出更改

def run(self):
    while not self.taskQueue.empty():
            task = self.taskQueue.get()
            retval = task.run()
             
            if isinstance(retval,SysCall):
                retval(task,self)
                continue
 
            if task.isFinished:
                tid = task.getTaskId()
                del self.taskMap[tid]
            else:
                self.scheduler(task)

   然后添加供任务使用的接口函数:

def getpid():
    def tmp(task,schedular):
        task.setValue(task.getTaskId())
        schedular.scheduler(task)
    return SysCall(tmp)
def Killpid():
    def tmp(task,scheduler):
        task.setValue(scheduler.KillTask(taskid))
    return SysCall(tmp)
def fork():
    pass

   这里实现SysCall的主要目的是方便调度器对传递过去的函数类型进行控制,为了系统安全考虑,防止用户提交危险函数破坏系统,不属于SysCall类的函数一律不以运行。

   至此,我们实现了一个完整协程任务调度器,而不是利用yield进行简单的数据传递,yield是如此好用,以至于很多语言都逐渐加入对其的支持,如PHP5.5开始加入yield,
javascript 6(ECMAScript 6)也加入了对其的支持,虽然其使用起来有一些区别,但是原理是相通的,深入理解协程和yield,对于理解任务调度,系统原理意义重大。

文件下载

   coroutine.py

参考资料

  1. 在PHP中使用协程实现多任务调度

建议继续学习:

  1. 编程珠玑番外篇-Q 协程的历史,现在和未来    (阅读:3828)
  2. 一个“蝇量级” C 语言协程库    (阅读:3111)
  3. 在PHP中使用协程实现多任务调度    (阅读:2346)
  4. 协程并发模型及使用感受    (阅读:1464)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1