違い - python sharedmemory




どのようにマルチプロセッシングモジュールによって作成されたゾンビプロセスを殺すか? (2)

active_childrenを使用します。 multiprocessing.active_children

私はmultiprocessingモジュールの新機能です。 そして、私はちょうど次のように作成しようとしました。私は、RabbitMQからメッセージを取得し、それを内部キュー( multiprocessing.Queueキュー)に渡すというプロセスを1つ持っています。 そして、私がしたいのは、新しいメッセージが来たときにプロセスを起動させることです。それは動作しますが、ジョブが終了すると、ゾンビプロセスが親プロセスで終了しなくなります。 ここに私のコードです:

主なプロセス:

 #!/usr/bin/env python

 import multiprocessing
 import logging
 import consumer
 import producer
 import worker
 import time
 import base

 conf = base.get_settings()
 logger = base.logger(identity='launcher')

 request_order_q = multiprocessing.Queue()
 result_order_q = multiprocessing.Queue()

 request_status_q = multiprocessing.Queue()
 result_status_q = multiprocessing.Queue()

 CONSUMER_KEYS = [{'queue':'product.order',
                   'routing_key':'product.order',
                   'internal_q':request_order_q}]
 #                 {'queue':'product.status',
 #                  'routing_key':'product.status',
 #                  'internal_q':request_status_q}]

 def main():
     # Launch consumers
     for key in CONSUMER_KEYS:
         cons = consumer.RabbitConsumer(rabbit_q=key['queue'],
                                        routing_key=key['routing_key'],
                                        internal_q=key['internal_q'])
         cons.start()

     # Check reques_order_q if not empty spaw a process and process message
     while True:
         time.sleep(0.5)
         if not request_order_q.empty():
             handler = worker.Worker(request_order_q.get())
             logger.info('Launching Worker')
             handler.start()

 if __name__ == "__main__":
     main()

そしてここに私の労働者がいます:

 import multiprocessing
 import sys 
 import time
 import base

 conf = base.get_settings()
 logger = base.logger(identity='worker')

 class Worker(multiprocessing.Process):

     def __init__(self, msg):
         super(Worker, self).__init__()
         self.msg = msg 
         self.daemon = True

     def run(self):
         logger.info('%s' % self.msg)
         time.sleep(10)
         sys.exit(1)

すべてのメッセージが処理された後、私はps auxコマンドでプロセスを見ることができます。 しかし、私は本当に彼らが終了した後に終了するようにしたいと思います。 ありがとう。


multiprocessing.active_childrenを使用する方がProcess.joinよりもProcess.joinます。 active_children関数は、 active_childrenの最後の呼び出し以降に作成されたゾンビをすべてactive_childrenます。 メソッドjoinは、選択されたプロセスを待機します。 その間、他のプロセスは終了しゾンビになりますが、待っているメソッドが結合されるまで、親プロセスは通知しません。 これを実際に見るには:

import multiprocessing as mp
import time


def main():
    n = 3
    c = list()
    for i in xrange(n):
        d = dict(i=i)
        p = mp.Process(target=count, kwargs=d)
        p.start()
        c.append(p)
    for p in reversed(c):
        p.join()
        print('joined')


def count(i):
    print('{i} going to sleep'.format(i=i))
    time.sleep(i * 10)
    print('{i} woke up'.format(i=i))


if __name__ == '__main__':
    main()

上記は、それぞれ10秒ずつ終了する3つのプロセスを作成します。 コードは次のように最後のプロセスが最初に結合されるため、以前終了した残りの2つは20秒間ゾンビになります。 あなたはそれらを見ることができます:

ps aux | grep Z

プロセスが終了するシーケンスで待っている場合は、ゾンビはありません。 このケースを見るにはreversedしてください。 しかし、実際のアプリケーションでは、子プロセスが終了する順序はめったにないため、 joinを使用joinといくつかのゾンビが発生します。

代わりのactive_childrenはゾンビを残しません。 上の例でfor p in reversed(c):のループfor p in reversed(c):ように置き換えます。with:

while True:
    time.sleep(1)
    if not mp.active_children():
        break

何が起こるか見る。





multithreading