使用 Python 装饰器来将普通函数加入任务队列

2016/7/14 23:41 下午 posted in  Python

对于操作『函数对象』来说,使用 Python 装饰器是一种非常优雅,非常 Pythonic 的一个方式。而在这篇文章中,对于任何一个普通的函数,只需要在函数定义前加一个装饰器调用,即可使得这一函数被调用时自动加入特定的任务队列,成为异步调用,而不会阻塞主线程。

实现 Producer/Consumer 型任务队列

# -*- coding: utf-8 -*-

from Queue import Queue
from threading import Thread
from collections import namedtuple

Job = namedtuple('Job', ('func', 'args', 'kwargs', 'result_obj'))

首先,我们先声明了一个 namedtuple ,其中包含以下几个元素:

  1. func :需要加入任务队列的目标函数对象
  2. argskwargs :目标函数对象的参数
  3. result_obj :用于保存目标函数返回值的一个字典, 装饰器函数将目标函数加入任务队列之后,会直接返回一个{'result': None, 'done': False}的字典。等到目标函数异步执行完成之后, Worker 会用返回值替换这一字典的result,并设置doneTrue
class Worker(Thread):

    def __init__(self, queue):
        """
        :type queue: Queue
        """
        super(Worker, self).__init__()
        self.queue = queue
        self._stopped = False
        self._stop_done = False

    def run(self):
        while True:
            if self._stopped is True:
                break

            task = self.queue.get()
            result = task.func(*task.args, **task.kwargs)
            task.result_obj['result'] = result
            task.result_obj['done'] = True

        self._stop_done = True

    def stop(self):
        self._stopped = True
        

Worker 和普通的 Consumer 定义基本没有区别,唯一要注意的地方是,记得要用函数返回值,替换掉任务返回值字典的'result',并修改任务状态,即

task.result_obj['result'] = result
task.result_obj['done'] = True
        
class QueueHandler(object):

    def __init__(self, workers=5):
        self.job_queue = Queue()
        self.workers = [Worker(self.job_queue) for i in xrange(workers)]
        for worker in self.workers:
            worker.setDaemon(True)
            worker.start()

    def queue_up(self, func):

        def _queue_up(*args, **kwargs):
            tmp_obj = {'result': None, 'done': False}

            self. job_queue.put(
                Job(
                    func = func,
                    args = args,
                    kwargs = kwargs,
                    result_obj=tmp_obj
                )
            )
            return tmp_obj
        return _queue_up

同样是比较标准的 Producer ,其中queue_up函数就是这次的主角了。对于目标函数,先构建一个返回值字典,然后将目标函数和参数加入任务队列,最后返回这一返回值字典。非常简单的结构,但是使用起来非常的方便。

实际测试

这里我构造了一个用来测试的代码

import time, random

testQueue = QueueHandler()


class TestClass(object):

    def __init__(self):
        self.some_value = 10
        self.some_str = "Empty"

    @testQueue.queue_up
    def some_operation(self, jobid, some_para, another_para='blablabla'):
        time.sleep(random.randrange(1,5))
        self.some_value = some_para
        self.some_str = another_para
        print "ID: {} ---Now, value: {}, str: {}\n".format(jobid, self.some_value, self.some_str)
        return jobid+100000


if __name__ == '__main__':
    testClass = TestClass()
    rs = []
    for i in range(10):
        rs.append(
            testClass.some_operation(i, i+1000, another_para='{0}'.format('-'*i))
        )
    for i in range(10):
        time.sleep(1)
        print rs

读者可以尝试删除掉some_operation函数前面的装饰器,对比有无这个装饰器对程序运行的影响。

可以看到,对于需要测试的some_operation函数,我仅仅是在其定义前加上了一个装饰器的调用,就使得这个函数的执行不会阻塞主线程了。并且对于异步调用函数的返回值,也很好的进行了保留传递。