Python的threading和multiprocessing模块初探(3)

该实例假想的任务是:一个主进程会启动多个子进程分别处理不同的任务,各个子进程可能又有自己的线程用于不同的IO处理(前面说过,线程在IO方面还是不错的),要实现的功能是,对这些子进程发送信号,能被正确的处理,例如发生SIGTERM,子进程能通知其线程收工,然后“优雅”的退出。现在要解决的问题有:(1)在子类化的Process对象中如何捕捉信号;(2)如何“优雅的退出”。下面分别说明。

1、子类化Process并捕捉信号

如果是使用第一种进程创建方式(传入函数),那么捕捉信号很容易,假设给进程运行的函数叫func,代码示例如下:

#!/usr/bin/Python
#-*-coding:utf-8-*-
import multiprocessing, signal,time

def handler(signum, frame):
    print 'signal', signum;

def run():
    signal.signal(signal.SIGTERM, handler);
    signal.signal(signal.SIGINT, handler);
    i = 0;
    while i<10000:
        print 'running';
        time.sleep(2);
        i += 1;

if __name__ == '__main__':

p = multiprocessing.Process(target=run);
    p.start();
    #p.join();
    print p.pid;
    print 'master gone';

这段代码是在第一种创建方式的基础上修改而来的,增加了两行signal.signal(...)调用,这是说这个函数要捕捉SIGTERM和SIGINT两个信号,另外增加了一个handler函数,该函数用于捕捉到信号时进行相应的处理,我们这里只是简单的打印出信号值。注意p.join()被注释掉了,这里跟线程的情况有点区别,新的进程启动后就开始运行了,主进程也不用等待它运行完,可以该干嘛干嘛去。这段代码运行后会打印出子进程的进程id,根据这个id,在另一个终端输入kill -TERM id,会发现刚才的终端打印出了"signal 15"。

但是使用传入函数的方式有一点不好的是封装性太差,如果功能稍微复杂点,将会有很多的全局变量暴露在外,最好还是将功能封装成类,那么使用类又怎么注册信号相应函数呢?上面的例子貌似只能使用一个全局的函数,手册也没有给出在类中处理信号的例子,其实解决方法大同小异,也很容易,这个帖子给了我灵感:

class Master(multiprocessing.Process):
    def __init__(self):
        super(Master,self).__init__();
        signal.signal(signal.SIGTERM, self.handler);    #注册信号处理函数
        self.live = 1;
    #信号处理函数
    def handler(self, signum, frame):
        print 'signal:',signum;
        self.live = 0;

def run(self):
        print 'PID:',self.pid;
        while self.live:
            print 'living...'
            time.sleep(2); 

方法很直观,首先在构造函数中注册信号处理函数,然后定义了一个方法handler作为处理函数。这个进程类会每隔2秒打印一个“living...”,当接收到SIGTERM后,改变self.live的值,run方法的循环检测到这个值为0后就结束了,进程也结束了。

2、让进程优雅的退出

下面放出这次的假想任务的全部代码,我在主进程中启动了一个子进程(通过子类化Process类),然后子进程启动后又产生两个子线程,用来模拟“生产者-消费者”模型,两个线程通过一个队列进行交流,为了互斥访问这个队列,自然要加一把锁(condition对象跟Lock对象差不多,不过多了等待和通知的功能);生产者每次产生一个随机数并扔进队列,然后休息一个随机时间,消费者每次从队列取一个数;而子进程中的主线程要负责接收信号,以便让整个过程优雅的结束。代码如下:

#!/usr/bin/python
#-*-coding:utf-8-*-
import time, multiprocessing, signal, threading, random, time, Queue

class Master(multiprocessing.Process):
    def __init__(self):
        super(Master,self).__init__();
        signal.signal(signal.SIGTERM, self.handler);
 #这个变量要传入线程用于控制线程运行,为什么用dict?充分利用线程间共享资源的特点
 #因为可变对象按引用传递,标量是传值的,不信写成self.live = true试试
        self.live = {'stat':True};                 

def handler(self, signum, frame):
        print 'signal:',signum;
        self.live['stat'] = 0;                                  #置这个变量为0,通知子线程可以“收工”了

def run(self):
        print 'PID:',self.pid;   
        cond = threading.Condition(threading.Lock());            #创建一个condition对象,用于子线程交互
        q = Queue.Queue();                                      #一个队列
        sender = Sender(cond, self.live, q);                    #传入共享资源
        geter = Geter(cond, self.live, q);
        sender.start();                                          #启动线程
        geter.start();
        signal.pause();                                          #主线程睡眠并等待信号
        while threading.activeCount()-1:                        #主线程收到信号并被唤醒后,检查还有多少线程活着(除掉自己)
            time.sleep(2);                                      #再睡眠等待,确保子线程都安全的结束
            print 'checking live', threading.activeCount();
        print 'mater gone';

class Sender(threading.Thread):
    def __init__(self, cond, live, queue):
        super(Sender, self).__init__(name='sender');
        self.cond = cond;
        self.queue = queue;
        self.live = live

def run(self):
        cond = self.cond;
        while self.live['stat']:                                #检查这个进程内的“全局”变量,为真就继续运行
            cond.acquire();                                      #获得锁,以便控制队列
            i = random.randint(0,100);
            self.queue.put(i,False);
            if not self.queue.full():
                print 'sender add:',i;
            cond.notify();                                      #唤醒等待锁的其他线程
            cond.release();                                      #释放锁
            time.sleep(random.randint(1,3));
        print 'sender done'

class Geter(threading.Thread):
    def __init__(self, cond, live, queue):
        super(Geter, self).__init__(name='geter');
        self.cond = cond;
        self.queue = queue;
        self.live = live

def run(self):
        cond = self.cond;
        while self.live['stat']:
            cond.acquire();
            if not self.queue.empty():
                i = self.queue.get();
                print 'geter get:',i;
            cond.wait(3);
            cond.release();
            time.sleep(random.randint(1,3));
        print 'geter done'

if __name__ == '__main__':

master = Master();
    master.start();                                              #启动子进程

需要注意的地方是,在Master的run方法中sender.start()和geter.start()之后,按常理应该接着调用sender.join()和geter.join(),让主线程等待子线程结束,前面说的join的陷阱就在这里,join将主线程阻塞(blocking)住了,主线程无法再捕捉信号,刚开始研究这块时还以为信号处理函数写错了。网上讨论比较少,这里说的比较清楚,

参考:

《python核心编程》

《python manual》

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/c7a55b5abc33828c108b33cc742d2a6f.html