title用法 - python线程池threadpool




如何在Python中使用线程? (11)

我想了解Python中的线程。 我已经看过文档和示例,但坦率地说,很多示例都过于复杂,我无法理解它们。

你如何清楚地显示被划分为多线程的任务?


Alex Martelli的回答对我有所帮助,但这里是我认为更有用的修改版(至少对我而言)。

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

Python 3具有启动并行任务的功能 。 这使我们的工作更轻松。

它用于线程池进程池

以下给出一个见解:

ThreadPoolExecutor示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

上述解决方案实际上并没有在我的GNU / Linux服务器上使用多个内核(我没有管理员权限)。 他们只是在一个核心上运行。 我使用较低级别的os.fork界面来产生多个进程。 这是为我工作的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

使用炽热的新的concurrent.futures模块

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

执行者的方法似乎对所有那些以前用Java弄脏手的人都很熟悉。

另外还有一点要注意:为了让宇宙保持健康,如果不使用上下文,那么不要忘记关闭池/执行程序(这对于你来说太棒了)


在这里,参数是一个参数的元组; 使用空元组来调用函数而不传递任何参数。 kwargs是关键字参数的可选字典。

#!/usr/bin/python

import thread
import time

# Define a function for the thread
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print "%s: %s" % ( threadName, time.ctime(time.time()) )

# Create two threads as follows
try:
   thread.start_new_thread( print_time, ("Thread-1", 2, ) )
   thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
   print "Error: unable to start thread"

while 1:
   pass

当上面的代码被执行时,它会产生以下结果 -

Thread-1: Thu Jan 22 15:42:17 2009
Thread-1: Thu Jan 22 15:42:19 2009
Thread-2: Thu Jan 22 15:42:19 2009
Thread-1: Thu Jan 22 15:42:21 2009
Thread-2: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:25 2009
Thread-2: Thu Jan 22 15:42:27 2009
Thread-2: Thu Jan 22 15:42:31 2009
Thread-2: Thu Jan 22 15:42:35 2009

多线程与简单的例子,这将是有益的。 你可以运行它,并很容易理解多线程如何在python中工作。 我使用lock来防止访问其他线程,直到以前的线程完成他们的工作。 通过使用

tLock = threading.BoundedSemaphore(value = 4)

这行代码可以允许一次处理多个进程,并保持稍后或完成先前进程之后运行的其余线程。

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()

对我来说,线程的完美例子是监视异步事件。 看看这个代码。

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

您可以通过打开IPython会话来玩这个代码,并执行如下操作:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

等几分钟

>>>a[0] = 2
Mon = 2

我发现这非常有用:创建与核心一样多的线程并让它们执行(大量)任务(在这种情况下,调用一个shell程序):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done

给定一个函数f ,就像这样:

import threading
threading.Thread(target=f).start()

将参数传递给f

threading.Thread(target=f, args=(a,b,c)).start()

自从2010年提出这个问题以来,已经真正简化了如何用mappool进行python简单的多线程处理。

下面的代码来自一篇文章/博客文章,您一定要检查(无隶属关系) - 并行性在一行中:日常线程任务的更好模型 。 我将在下面总结 - 它最终只是几行代码:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

以下是多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

Map是一个很酷的小函数,也是将并行机制轻松注入Python代码的关键。 对于那些不熟悉的人来说,地图可以从Lisp等功能语言中解脱出来。 它是一个将另一个函数映射到一个序列上的函数。

Map处理对我们的迭代,应用函数,并将所有结果存储在最后的便捷列表中。

履行

并行版本的map函数由两个库提供:多处理,以及它的已知但同样很棒的步骤子:multiprocessing.dummy。

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
  ]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

时间结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数 (仅在Python 3.3和更高版本中工作):( source ):

要传递多个数组:

results = pool.starmap(function, zip(list_a, list_b))

或者传递一个常量和一个数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是早期版本的Python,则可以通过此解决方法传递多个参数。

(感谢user136036提供有用的评论)


这里有一个简单的例子:你需要尝试一些替代URL并返回第一个URL的内容来回应。

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

这是线程用作简单优化的一种情况:每个子线程正在等待URL解析和响应,以便将其内容放入队列; 每个线程都是一个守护进程(如果主线程结束,将不会保留进程 - 这比通常更常见); 主线程启动所有的子线程,在队列上等待,直到其中一个进行了put ,然后发出结果并终止(由于它们是守护进程线程,它会取消可能仍在运行的任何子线程)。

在Python中正确使用线程总是与I / O操作相连接(因为CPython不会使用多个内核来运行CPU绑定的任务,但线程的唯一原因并不是阻止进程,而是等待某些I / O )。 顺便提一下,队列几乎总是将工作排除在外并且/或者收集工作结果的最佳方式,而且它们本质上是线程安全的,因此它们可以让您免于担心锁,条件,事件,信号量以及其他跨系统问题,线程协调/通信概念。





multithreading