Wednesday, February 18, 2009

[Python] A Simple Python Threading Example

Part 1: The Simplest Usage


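Python makes basic threading very easy. The simplest usage takes only the following steps:
  1. Define a class (such as Job in the example below) that inherits from threading.Thread.
  2. Implement run (the code the thread should execute) in that class, plus __init__ if the thread needs any setup. If you do define __init__, it must call Thread.__init__. Add other methods and attributes as needed.
  3. Call start to launch the thread.
  4. (Optional) Call join to block the calling thread until the thread has finished.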

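If it is not clear what join does, consider the following example:
import os
import time
from threading import Thread

class Job(Thread):
    def __init__(self, cmd):
        Thread.__init__(self)
        self.cmd = cmd
        # record the creation time so later messages can report elapsed time
        self.starttime = time.time()

    def run(self):
        # run the shell command, e.g. "sleep 1"
        os.popen(self.cmd)
        print "time %.2fs, job %s is done"%(time.time()-self.starttime,self.cmd)

# start four jobs that sleep for 1, 2, 3 and 4 seconds
jobs = []
for i in range(1, 5):
    j = Job("sleep %d"%i)
    jobs.append(j)
    j.start()

# wait for each job in the order it was started
for j in jobs:
    j.join()
    print "time %.2fs, job %s returned"%(time.time() - j.starttime, j.cmd)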

Running it gives:
time 1.00s, job sleep 1 is done
time 1.00s, job sleep 1 returned
time 2.00s, job sleep 2 is done
time 2.00s, job sleep 2 returned
time 3.00s, job sleep 3 is done
time 3.00s, job sleep 3 returned
time 4.00s, job sleep 4 is done
time 4.00s, job sleep 4 returned

Now change the last loop so that it iterates in reverse order (as below), leaving everything else unchanged:
for j in reversed(jobs):
    j.join()
    print "time %.2fs, job %s returned"%(time.time() - j.starttime, j.cmd)
The result:
time 1.00s, job sleep 1 is done
time 2.00s, job sleep 2 is done
time 3.00s, job sleep 3 is done
time 4.00s, job sleep 4 is done
time 4.00s, job sleep 4 returned
time 4.01s, job sleep 3 returned
time 4.01s, job sleep 2 returned
time 4.01s, job sleep 1 returned
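So calling join simply checks whether that thread has already finished. If it has, join returns immediately; otherwise it blocks until the thread finishes. That is why, in the reversed run, the join on "sleep 4" takes 4 seconds and the remaining three joins return almost at once.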
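The reversed run above can be reproduced on a smaller scale by timing each join call. The following is only a sketch under a few assumptions: it is written for Python 3 (the listings in this post use Python 2 syntax), and Sleeper and join_durations are hypothetical names; Sleeper stands in for Job and sleeps in-process instead of running a shell command.

```python
import time
from threading import Thread


class Sleeper(Thread):
    """Hypothetical stand-in for Job: sleeps in-process for `seconds`."""

    def __init__(self, seconds):
        Thread.__init__(self)
        self.seconds = seconds

    def run(self):
        time.sleep(self.seconds)


def join_durations(delays):
    """Start one Sleeper per delay, join them in reverse start order,
    and return how long each join call blocked, in seconds."""
    threads = [Sleeper(d) for d in delays]
    for t in threads:
        t.start()
    durations = []
    for t in reversed(threads):
        before = time.time()
        t.join()  # blocks only while this particular thread is still running
        durations.append(time.time() - before)
    return durations


if __name__ == "__main__":
    for blocked in join_durations([0.1, 0.2, 0.3]):
        print("join blocked for %.2fs" % blocked)
```

The first join waits for the longest sleeper; by then the shorter ones have already finished, so their joins return almost immediately.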
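One more variation: pass a timeout argument to join. timeout specifies how long (in seconds) join may block before it gives up and returns. If the argument is omitted (as in the examples so far), join blocks until the thread finishes: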
for j in jobs:
    j.join(0.5)
    print "time %.2fs, job %s returned"%(time.time() - j.starttime, j.cmd)

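We get the following (each join now returns after at most 0.5 seconds, so "returned" can be printed before the job is actually done):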
time 0.52s, job sleep 1 returned
time 1.00s, job sleep 1 is done
time 1.02s, job sleep 2 returned
time 1.52s, job sleep 3 returned
time 2.00s, job sleep 2 is done
time 2.02s, job sleep 4 returned
time 3.00s, job sleep 3 is done
time 4.00s, job sleep 4 is done
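Because join returns None whether or not the timeout expired, the "returned" lines above do not say whether the job had finished. One way to tell is to call is_alive() right after join. This is a sketch under assumptions: it targets Python 3 rather than the Python 2 used in this post, and Sleeper and join_with_timeout are hypothetical names introduced for illustration.

```python
import time
from threading import Thread


class Sleeper(Thread):
    """Hypothetical stand-in for Job: sleeps in-process for `seconds`."""

    def __init__(self, seconds):
        Thread.__init__(self)
        self.seconds = seconds

    def run(self):
        time.sleep(self.seconds)


def join_with_timeout(seconds, timeout):
    """Return True if join(timeout) gave up while the thread was still running."""
    t = Sleeper(seconds)
    t.start()
    t.join(timeout)            # waits at most `timeout` seconds
    timed_out = t.is_alive()   # still alive means the timeout expired first
    t.join()                   # now wait for real so no thread is left behind
    return timed_out


if __name__ == "__main__":
    print("long job timed out:", join_with_timeout(0.3, 0.05))
    print("short job timed out:", join_with_timeout(0.05, 0.5))
```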

Part 2: An Example

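In this example we use a fixed number of threads to work through a batch of tasks. It is like having 1000 mines to dig but only 9 peasants taking turns to dig them.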

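Here we define a Jobs class (Steve Jobs?) and a Worker class. Jobs inherits from Queue to maintain a task queue. To add a new task, call newjob. Its arguments define the task: a name (name), a function to call (func), and that function's arguments (args, kargs). Each Worker instance corresponds to one thread. When the thread starts or becomes idle, it takes a task from the queue and executes it. Once that task is done it moves on to the next one, and so on until every task is complete (the queue is empty).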
import Queue
import threading
from time import time

class Jobs(Queue.Queue):
    def __init__(self):
        Queue.Queue.__init__(self)

    def newjob(self, name, func, *args, **kargs):
        # a job is a name plus a callable and its arguments
        if not callable(func):
            raise RuntimeError(str(func) + ' not a callable object')
        self.put((name, func, args, kargs))

    def do(self, numthreads = 1, debug = True):
        # start numthreads workers, then wait for all of them to finish
        threads = []
        for i in range(numthreads):
            t = Worker("thread%d"%i, self, debug)
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

class Worker(threading.Thread):
    def __init__(self, name, jobs, debug = False):
        threading.Thread.__init__(self)
        self.name = name
        self.jobs = jobs
        self.debug = debug

    def run(self):
        while not self.jobs.empty():
            try:
                # another worker may have taken the last job in the meantime,
                # so wait at most 1 second instead of blocking forever
                job = self.jobs.get(True, 1)
                (name, func, args, kargs) = job
            except Queue.Empty:
                break
            stime = time()
            func(*args, **kargs)
            if self.debug:
                print "%s is done by %s, fin in %.2f s"%(name, self.name, time()-stime)
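The post does not show Jobs and Worker in use, so here is a sketch of how they might be driven. It rests on several assumptions: it is a Python 3 port (the module is named queue there, and the debug printing is left out), and run_squares, record_square, and the lock around the shared list are hypothetical additions for illustration only.

```python
import queue
import threading


class Jobs(queue.Queue):
    """Task queue, ported from the listing above to Python 3."""

    def newjob(self, name, func, *args, **kargs):
        if not callable(func):
            raise RuntimeError(str(func) + " not a callable object")
        self.put((name, func, args, kargs))

    def do(self, numthreads=1):
        # start the workers, then wait until every one of them has exited
        threads = [Worker("thread%d" % i, self) for i in range(numthreads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()


class Worker(threading.Thread):
    def __init__(self, name, jobs):
        threading.Thread.__init__(self, name=name)
        self.jobs = jobs

    def run(self):
        while not self.jobs.empty():
            try:
                # another worker may grab the last job first, so do not wait forever
                name, func, args, kargs = self.jobs.get(True, 1)
            except queue.Empty:
                break
            func(*args, **kargs)


def run_squares(count, numthreads):
    """Hypothetical usage: square the numbers 0..count-1 on numthreads workers."""
    results = []
    lock = threading.Lock()

    def record_square(n):
        with lock:  # guard the shared list against concurrent appends
            results.append(n * n)

    jobs = Jobs()
    for n in range(count):
        jobs.newjob("square%d" % n, record_square, n)
    jobs.do(numthreads)
    return sorted(results)


if __name__ == "__main__":
    print(run_squares(10, 3))
```

All jobs are queued before do is called, so a worker that finds the queue empty can safely exit. If jobs were added while workers were running, this exit condition would need rethinking.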
