multithreading ejemplos de - ¿Cómo utilizar hilos en Python?





8 Answers

Aquí hay un ejemplo simple: necesita probar algunas URL alternativas y devolver el contenido de la primera para responder.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

Este es un caso en el que los subprocesos se utilizan como una simple optimización: cada subhilo está a la espera de que una URL se resuelva y responda, para poner su contenido en la cola; cada subproceso es un demonio (no mantendrá el proceso si el subproceso principal finaliza, eso es más común que no); el subproceso principal inicia todos los subprocesos, se pone en la cola para esperar hasta que uno de ellos haga un put , luego emite los resultados y termina (lo que elimina cualquier subproceso que aún esté en ejecución, ya que son subprocesos de daemon).

El uso correcto de los subprocesos en Python está invariablemente conectado a las operaciones de E / S (ya que CPython no usa múltiples núcleos para ejecutar tareas vinculadas a la CPU de todos modos, la única razón para los subprocesos no es el bloqueo del proceso, mientras que hay una espera para algunas E / S ). Las colas son casi invariablemente la mejor manera de agrupar el trabajo en subprocesos y / o recopilar los resultados del trabajo, por cierto, y son intrínsecamente seguras para subprocesos, por lo que evitan que se preocupe por los bloqueos, las condiciones, los eventos, los semáforos y otras interrelaciones. Conceptos de coordinación / comunicación de hilos.

comunicacion entre como

Estoy tratando de entender los hilos en Python. He mirado la documentación y los ejemplos, pero francamente, muchos ejemplos son demasiado sofisticados y tengo problemas para entenderlos.

¿Cómo se muestran claramente las tareas que se dividen para subprocesos múltiples?




Como mencionaron otros, CPython puede usar hilos solo para I \ O espera debido a GIL. Si desea beneficiarse de varios núcleos para tareas vinculadas a la CPU, utilice el multiprocessing :

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()



La respuesta de Alex Martelli me ayudó, sin embargo, aquí está la versión modificada que pensé que era más útil (al menos para mí).

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()



Para mí, el ejemplo perfecto para Threading es monitorear eventos asíncronos. Mira este código.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Puedes jugar con este código abriendo una sesión de IPython y haciendo algo como:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Espera unos minutos

>>>a[0] = 2
Mon = 2



Python 3 tiene la facilidad de lanzar tareas paralelas . Esto hace nuestro trabajo más fácil.

Tiene para agrupación de hilos y agrupación de procesos .

Lo siguiente da una idea:

Ejemplo de ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()



La mayoría de las documentaciones y tutoriales utilizan el módulo de Threading y Queue Python, que pueden parecer abrumadores para los principiantes.

Quizás considere el módulo concurrent.futures.ThreadPoolExecutor de python 3. Combinado with cláusula y la comprensión de la lista, podría ser un verdadero encanto.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of urls to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads 
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results 
        rs = f.result()



Hilo múltiple con un ejemplo simple que será útil. Puede ejecutarlo y entender fácilmente cómo funciona el subproceso múltiple en Python. Utilicé el bloqueo para impedir acceder a otros hilos hasta que los hilos anteriores terminaron su trabajo. Por el uso de

tLock = threading.BoundedSemaphore (value = 4)

En esta línea de código, puede permitir la cantidad de procesos a la vez y mantener el resto del hilo que se ejecutará más tarde o después de que hayan finalizado los procesos anteriores.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()



Ninguna de las soluciones anteriores realmente usó múltiples núcleos en mi servidor GNU / Linux (donde no tengo derechos de administrador). Simplemente corrieron en un solo núcleo. Utilicé la interfaz os.fork nivel inferior para generar múltiples procesos. Este es el código que me funcionó:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break



Related