对于操作『函数对象』来说,使用 Python 装饰器是一种非常优雅,非常 Pythonic 的一个方式。而在这篇文章中,对于任何一个普通的函数,只需要在函数定义前加一个装饰器调用,即可使得这一函数被调用时自动加入特定的任务队列,成为异步调用,而不会阻塞主线程。
实现 Producer/Consumer 型任务队列
# -*- coding: utf-8 -*-
from Queue import Queue
from threading import Thread
from collections import namedtuple
Job = namedtuple('Job', ('func', 'args', 'kwargs', 'result_obj'))
首先,我们先声明了一个 namedtuple ,其中包含以下几个元素:
func
:需要加入任务队列的目标函数对象args
和kwargs
:目标函数对象的参数result_obj
:用于保存目标函数返回值的一个字典, 装饰器函数将目标函数加入任务队列之后,会直接返回一个{'result': None, 'done': False}
的字典。等到目标函数异步执行完成之后, Worker 会用返回值替换这一字典的result
,并设置done
为True
class Worker(Thread):
def __init__(self, queue):
"""
:type queue: Queue
"""
super(Worker, self).__init__()
self.queue = queue
self._stopped = False
self._stop_done = False
def run(self):
while True:
if self._stopped is True:
break
task = self.queue.get()
result = task.func(*task.args, **task.kwargs)
task.result_obj['result'] = result
task.result_obj['done'] = True
self._stop_done = True
def stop(self):
self._stopped = True
Worker 和普通的 Consumer 定义基本没有区别,唯一要注意的地方是,记得要用函数返回值,替换掉任务返回值字典的'result'
,并修改任务状态,即
task.result_obj['result'] = result
task.result_obj['done'] = True
class QueueHandler(object):
def __init__(self, workers=5):
self.job_queue = Queue()
self.workers = [Worker(self.job_queue) for i in xrange(workers)]
for worker in self.workers:
worker.setDaemon(True)
worker.start()
def queue_up(self, func):
def _queue_up(*args, **kwargs):
tmp_obj = {'result': None, 'done': False}
self. job_queue.put(
Job(
func = func,
args = args,
kwargs = kwargs,
result_obj=tmp_obj
)
)
return tmp_obj
return _queue_up
同样是比较标准的 Producer ,其中queue_up
函数就是这次的主角了。对于目标函数,先构建一个返回值字典,然后将目标函数和参数加入任务队列,最后返回这一返回值字典。非常简单的结构,但是使用起来非常的方便。
实际测试
这里我构造了一个用来测试的代码
import time, random
testQueue = QueueHandler()
class TestClass(object):
def __init__(self):
self.some_value = 10
self.some_str = "Empty"
@testQueue.queue_up
def some_operation(self, jobid, some_para, another_para='blablabla'):
time.sleep(random.randrange(1,5))
self.some_value = some_para
self.some_str = another_para
print "ID: {} ---Now, value: {}, str: {}\n".format(jobid, self.some_value, self.some_str)
return jobid+100000
if __name__ == '__main__':
testClass = TestClass()
rs = []
for i in range(10):
rs.append(
testClass.some_operation(i, i+1000, another_para='{0}'.format('-'*i))
)
for i in range(10):
time.sleep(1)
print rs
读者可以尝试删除掉some_operation
函数前面的装饰器,对比有无这个装饰器对程序运行的影响。
可以看到,对于需要测试的some_operation
函数,我仅仅是在其定义前加上了一个装饰器的调用,就使得这个函数的执行不会阻塞主线程了。并且对于异步调用函数的返回值,也很好的进行了保留传递。