блоки - отправка ввода в подпроцесс python


Answers

Работа с большими файлами

Два принципа должны применяться единообразно при работе с большими файлами в Python.

  1. Поскольку любая процедура IO может блокироваться, мы должны поддерживать каждую стадию конвейера в другом потоке или процессе. В этом примере мы используем потоки, но подпроцессы позволят вам избежать GIL.
  2. Мы должны использовать инкрементные чтения и записи, чтобы мы не дождались EOF прежде чем начать продвигаться вперед.

Альтернативой является использование неблокирующего IO, хотя это громоздко в стандартном Python. См. Gevent для легкой библиотеки потоков, которая реализует синхронный IO API с помощью неблокирующих примитивов.

Пример кода

Мы построим глупый трубопровод, который грубо

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

где каждый этап в фигурных скобках {} реализован в Python, а остальные используют стандартные внешние программы. TL; DR: Посмотрите на это .

Мы начинаем с ожидаемого импорта.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Питоновые этапы трубопровода

Все, кроме последнего этапа, на котором выполняется Python, должны идти в потоке, чтобы он IO не блокировал другие. Вместо этого они могли работать в подпроцессах Python, если вы хотели, чтобы они фактически выполнялись параллельно (избегайте GIL).

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Каждый из них должен быть помещен в собственный поток, который мы будем использовать, используя эту функцию удобства.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Создать трубопровод

Создайте внешние этапы, используя Popen и этапы Python, используя spawn . Аргумент bufsize=-1 говорит, что используется буферизация по умолчанию системы (обычно 4 kiB). Это обычно быстрее, чем стандартная (небуферизованная) или буферизация строк, но вам нужна буферизация строк, если вы хотите визуально контролировать вывод без лаг.

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Управляйте трубопроводом

Собранные, как указано выше, все буферы в конвейере будут заполняться, но поскольку никто не читает с конца ( grepz.stdout ), все они будут блокироваться. Мы могли бы прочитать все это в одном вызове grepz.stdout.read() , но это будет использовать большую память для больших файлов. Вместо этого мы читаем постепенно .

for line in grepz.stdout:
    sys.stdout.write(line.lower())

Нити и процессы очищаются после достижения EOF . Мы можем явно очистить

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 и ранее

Внутренне, subprocess.Popen вызывает fork , настраивает дескрипторы файла pipe и вызывает exec . В дочернем процессе из fork есть копии всех дескрипторов файлов в родительском процессе, и оба экземпляра должны быть закрыты до того, как соответствующий читатель получит EOF . Это можно устранить путем ручного закрытия труб (либо с помощью close_fds=True либо с помощью подходящего аргумента preexec_fn для subprocess.Popen ) или путем установки флага FD_CLOEXEC чтобы exec автоматически закрыл дескриптор файла. Этот флаг устанавливается автоматически в Python-2.7 и более поздних версиях, см. issue12786 . Мы можем получить поведение Python-2.7 в более ранних версиях Python, позвонив

p._set_cloexec_flags(p.stdin)

перед передачей p.stdin в качестве аргумента для последующего subprocess.Popen . subprocess.Popen .

Question

Я тестирую подпроцессы конвейеров с помощью python. Я знаю, что я могу делать то, что программы ниже делают в python напрямую, но это не главное. Я просто хочу проверить конвейер, чтобы я знал, как его использовать.

Моя система - Linux Ubuntu 9.04 с по умолчанию python 2.6.

Я начал с этого примера документации .

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

Это работает, но поскольку stdin p1 не перенаправляется, я должен набирать материал в терминале для подачи канала. Когда я набираю ^D закрываю stdin, я получаю вывод, который я хочу.

Тем не менее, я хочу отправить данные в трубу, используя строковую переменную python. Сначала я пробовал писать на stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

Не работает. Я попытался использовать p2.stdout.read() вместо последней строки, но также блокирует. Я добавил p1.stdin.flush() и p1.stdin.close() но это тоже не сработало. Затем я перешел на общение:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

Так что это еще не так.

Я заметил, что выполнение одного процесса (например, p1 выше, удаление p2 ) отлично работает. И работает файл дескриптор для p1 ( stdin=open(...) ). Таким образом, проблема заключается в следующем:

Можно ли передавать данные в конвейер из двух или более подпроцессов в python без блокировки? Почему нет?

Я знаю, что могу запустить оболочку и запустить конвейер в оболочке, но это не то, что я хочу.

ОБНОВЛЕНИЕ 1 : Ниже ниже намека Аарона Дигуллы я теперь пытаюсь использовать потоки, чтобы заставить его работать.

Сначала я попытался запустить p1.communicate в потоке.

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Ладно, не получилось. Пробовал другие комбинации, такие как изменение его на .write() а также p2.read() . Ничего. Теперь давайте попробуем противоположный подход:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

код где-то блокируется. Либо в порожденной нити, либо в основном потоке, либо в обоих. Так что это не сработало. Если вы знаете, как заставить его работать, это облегчится, если вы сможете предоставить рабочий код. Я пытаюсь здесь.

ОБНОВЛЕНИЕ 2

Paul Du Bois ответил ниже с некоторой информацией, поэтому я сделал больше тестов. Я прочитал весь модуль subprocess.py и понял, как он работает. Поэтому я попытался применить именно это к коду.

Я нахожусь в linux, но, поскольку я тестировал потоки, мой первый подход заключался в том, чтобы реплицировать точный код потокового окна, наблюдаемый в методе subprocess.py communicate() , но для двух процессов вместо одного. Вот весь список того, что я пробовал:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Что ж. Это не сработало. Даже после p1.stdin.close() p2.stdout.read() все еще блокирует.

Затем я попробовал код posix на subprocess.py :

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

Также блокирует select.select() . Распространяя print , я узнал об этом:

  • Чтение работает. Код читается много раз во время выполнения.
  • Письмо также работает. Данные записываются в p1.stdin .
  • В конце numwrites p1.stdin.close() .
  • Когда select() начинает блокировку, только to_read имеет что-то, p2.stdout . to_write уже пуст.
  • os.read() всегда возвращает что-то, поэтому p2.stdout.close() никогда не вызывается.

Вывод из обоих тестов : Закрытие stdin первого процесса на конвейере ( grep в примере) не заставляет его сбросить свой буферный вывод в следующий и умереть.

Нет способа заставить его работать?

PS: Я не хочу использовать временный файл, я уже тестировал файлы и знаю, что он работает. И я не хочу использовать окна.




Предлагаемое решение Nosklo быстро сломается, если на приемный конец трубы будет записано слишком много данных:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

Если этот скрипт не зависает на вашем компьютере, просто увеличьте «20000» на то, что превышает размер буферов вашей операционной системы.

Это связано с тем, что операционная система выполняет буферизацию ввода на «grep», но как только этот буфер заполнен, вызов p1.stdin.write будет блокироваться до тех пор, пока что-то не прочитает с p2.stdout . В игрушечных сценариях вы можете найти способ записи / чтения из трубы в том же процессе, но при нормальном использовании необходимо писать из одного потока / процесса и читать из отдельного потока / процесса. Это справедливо для subprocess.popen, os.pipe, os.popen * и т. Д.

Еще одним поворотным моментом является то, что иногда вы хотите продолжать подавать трубу с элементами, полученными из более раннего выхода того же самого трубопровода. Решение состоит в том, чтобы сделать как податчик труб, так и считыватель труб асинхронным по отношению к мужской программе и реализовать две очереди: одну между основной программой и податчиком трубопровода и одну между основной программой и считывателем каналов. http://www.darkarchive.org/w/Pub/PythonInteract .

Подпроцесс - хорошая модель удобства, но поскольку он скрывает детали вызовов os.popen и os.fork, которые он делает под капотом, иногда бывает сложнее иметь дело с более низкими вызовами, которые он использует. По этой причине подпроцесс не является хорошим способом узнать, как работают межпроцессные каналы.




Вот пример использования Popen вместе с os.fork для выполнения того же самого. Вместо того, чтобы использовать close_fdsего, он просто закрывает трубы в нужном месте. Гораздо проще, чем пытаться использовать select.select, и в полной мере использует системные буферы.

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()



Отвечая на утверждение nosklo (см. Другие комментарии к этому вопросу), что это невозможно сделать без close_fds=True :

close_fds=True необходимо только в том случае, если вы оставили другие дескрипторы файлов открытыми. При открытии нескольких дочерних процессов всегда полезно отслеживать открытые файлы, которые могут наследоваться, и явно закрывать любые, которые не нужны:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds умолчанию False потому что подпроцесс предпочитает доверять вызывающей программе, чтобы знать, что она делает с открытыми файловыми дескрипторами, и просто предоставить вызывающему абоненту простой способ закрыть их все, если это то, что он хочет сделать.

Но реальная проблема заключается в том, что буферы для труб будут укусить вас за все, кроме игрушек. Как я уже сказал в своих других ответах на этот вопрос, эмпирическое правило заключается в том, чтобы не открывать читателя и автора в том же процессе / потоке. Любой, кто хочет использовать модуль подпроцесса для двусторонней связи, будет хорошо изучать сначала os.pipe и os.fork. На самом деле это не так сложно использовать, если у вас есть pypi.python.org/pypi/pipeline/0.1 для просмотра.




В одном из комментариев выше я бросил вызов nosklo, чтобы либо опубликовать некоторый код, чтобы поддержать его утверждения о select.select или повысить мои ответы, которые он ранее проголосовал. Он ответил следующим кодом:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

Одна из проблем с этим скриптом заключается в том, что он во-вторых угадывает размер / характер буферов системных труб. Сценарий будет испытывать меньше сбоев, если он сможет удалить магические числа, такие как 1024.

Большая проблема заключается в том, что этот код скрипта работает только с правильной комбинацией ввода данных и внешних программ. grep и сокращают работу с линиями, поэтому их внутренние буферы ведут себя по-другому. Если мы используем более общую команду типа «cat» и записываем меньшие биты данных в трубу, состояние фатальной гонки будет появляться чаще:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p1.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

В этом случае проявятся два разных результата:

write, write, close file, read -> success
write, read -> hang

Поэтому снова я призываю nosklo к любому сообщению, показывающему использование select.select для обработки произвольной буферизации ввода и буфера из одного потока или для повышения моих ответов.

Итог: не пытайтесь манипулировать обоими концами трубы из одного потока. Это просто не стоит. См. pypi.python.org/pypi/pipeline/0.1 для хорошего низкоуровневого примера того, как это сделать правильно.




Links