io 파이썬 title - 하위 프로세스에서 비 차단 읽기 .PIPE in python
나는 종종 비슷한 문제를 겪어왔다. 필자가 자주 작성하는 Python 프로그램은 명령 행 (stdin)에서 사용자 입력을 동시에 받아들이면서 일부 기본 기능을 실행할 수 있어야합니다. readline()
차단되고 시간 제한이 없으므로 사용자 입력 처리 기능을 다른 스레드에 넣기 만하면 문제가 해결되지 않습니다. 기본 기능이 완료되어 더 이상 사용자 입력을 기다릴 필요가 없으면 내 프로그램을 끝내기를 원하지만 readline()
이 줄을 기다리고있는 다른 스레드에서 여전히 차단되어 있기 때문에 프로그램을 종료 할 수 없습니다. 이 문제에서 발견 된 해결책은 fcntl 모듈을 사용하여 stdin을 비 차단 파일로 만드는 것입니다.
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
내 의견으로는 이것은 선택 또는 신호 모듈을 사용하여이 문제를 해결하는 것보다 조금 더 깨끗하지만 다시 UNIX에서만 작동합니다 ...
하위 프로세스 모듈 을 사용하여 하위 프로세스 를 시작하고 출력 스트림 (stdout)에 연결합니다. stdout에서 비 차단 읽기를 실행할 수 있기를 원합니다. .readline을 호출하기 전에 .readline을 non-blocking으로 만들거나 스트림에 데이터가 있는지 확인하는 방법이 있습니까? 나는 이것을 휴대용 또는 적어도 Windows 및 Linux에서 작동시키고 싶습니다.
여기에 내가 지금하는 방법이있다. (데이터가 없으면 .readline
막는다.)
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
asyncproc 모듈을 사용해보십시오. 예 :
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
이 모듈은 S.Lott이 제안한 모든 스레딩을 처리합니다.
select & read (1)를 사용하십시오.
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
readline () - like :
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
면책 조항 : 이것은 토네이도에만 적용됩니다.
fd를 비 블로킹으로 설정 한 다음 ioloop을 사용하여 콜백을 등록하면됩니다. 나는 이것을 tornado_subprocess 라 불리는 달걀 속에 포장했고 PyPI를 통해 설치할 수 있습니다 :
easy_install tornado_subprocess
이제 다음과 같이 할 수 있습니다.
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
당신은 또한 그것을 RequestHandler와 함께 사용할 수있다.
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
이 비 차단 읽기 버전은 특별한 모듈을 필요로 하지 않으며 대부분의 리눅스 배포판에서 즉시 사용할 수 있습니다.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
여기에 부분 코드를 포함하여 가능한 빨리 하위 프로세스의 모든 출력을 잡는 데 사용되는 코드가 있습니다. 그것은 거의 동시에 정확한 시간에 stdout과 stderr를 펌핑합니다.
Python 2.7 Linux 및 Windows에서 제대로 작동하는지 테스트했습니다.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
select 모듈은 다음 유용한 입력의 위치를 결정하는 데 도움이됩니다.
그러나, 당신은 거의 항상 분리 된 스레드로 행복합니다. 하나는 블로킹으로 표준 입력을 읽지 만 다른 하나는 블로킹을 원하지 않는 곳이면 어디에서나 할 수 있습니다.
JF Sebastian의 대답과 다른 여러 출처에서 작업하면서 간단한 하위 프로세스 관리자를 구성했습니다. 요청을 비 차단 읽기뿐만 아니라 여러 프로세스를 병렬로 실행합니다. 어떤 OS 특정 호출도 사용하지 않으므로 어디에서든지 작동해야합니다.
그것은 pypi에서 사용할 수 있으므로 그냥 pip install shelljob
. 예제와 전체 문서는 프로젝트 페이지 를 참조하십시오.
나는 최근에 비 블로킹 모드에서 스트림 (테일 서브 프로세스에서 실행)에서 한 라인을 읽어야하는 동일한 문제를 발견했다. 다음 문제를 피하려고했다. CPU를 태우지 않고 한 바이트 씩 스트림을 읽지 않는다. readline처럼) 등
여기 내 구현 https://gist.github.com/grubberr/5501e1a9760c3eab5e0a 그것은 Windows (설문 조사)를 지원하지 않습니다, EOF를 처리하지 않습니다,하지만 그것은 잘 작동합니다
필자의 경우 로깅 모듈을 사용하여 백그라운드 응용 프로그램의 출력을 포착하고 시간 스탬프, 색 등을 추가하여 보강했습니다.
나는 실제 I / O를하는 배경 스레드로 끝났다. 다음 코드는 POSIX 플랫폼에만 해당됩니다. 나는 필수가 아닌 부분을 제거했다.
누군가가 장시간 동안이 짐승을 사용하려고하면 열린 기술자를 관리하는 것을 고려하십시오. 제 경우에는 커다란 문제는 아니 었습니다.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
실행중인 프로세스에서 stdout과 stderr를 모두 수집하려고했지만 내 생성물이 생성 된 위젯에서 출력을 렌더링하고 싶었 기 때문에 궁극적으로 동일합니다.
다른 스크립트를 실행하고 출력을 수집하는 것과 같은 일반적인 작업을 수행 할 필요가 없으므로 큐 또는 추가 스레드를 사용하여 제안 된 여러 가지 해결 방법을 사용하고 싶지 않았습니다.
제안 된 솔루션과 파이썬 문서를 읽은 후에 아래 구현과 함께 내 문제를 해결했습니다. 예, select
함수 호출을 사용하고 있기 때문에 POSIX에서만 작동합니다 .
나는 문서가 혼란스럽고 구현이 그러한 일반적인 스크립팅 작업에 대해 어색하다는 것에 동의한다. 이전 버전의 파이썬에는 다른 기본값 Popen
과 다른 설명이 포함되어있어 많은 혼란을 야기 한다고 생각합니다 . 이것은 파이썬 2.7.12와 3.5.2 모두에서 잘 작동하는 것으로 보인다.
핵심은 bufsize=1
라인 버퍼링 을 설정 한 다음 universal_newlines=True
설정시 기본값으로 보이는 바이너리 대신 텍스트 파일로 처리하는 것이 었습니다 bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG 및 VERBOSE는 단순히 출력을 터미널에 인쇄하는 매크로입니다.
이 솔루션은 IMHO 99.99 %가 여전히 블로킹 readline
기능을 사용하므로 효과적 이므로 서브 프로세스가 훌륭하고 완전한 라인을 출력한다고 가정합니다.
필자는 Python을 처음 접했을 때 솔루션을 개선하기위한 피드백을 환영합니다.
또한 가 설명한 문제에 직면하여 , 및 다른 사람들이 " 차단"모드를 사용하여 바쁜 루프를 피하기 위해 "선택"을 사용하여 해결했습니다 . 가짜 표준 입력으로 더미 파이프를 사용합니다. 블록을 선택하고 stdin 또는 파이프가 준비 될 때까지 기다립니다. 키를 누르면 stdin은 select를 해제하고 키 값은 read (1)로 검색 할 수 있습니다. 다른 스레드가 파이프에 쓸 때 파이프는 select를 차단 해제하고 stdin의 필요성을 나타내는 표시로 사용할 수 있습니다. 다음은 참조 코드입니다.
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
다음은 비 차단 읽기와 배경 쓰기를 지원하는 모듈입니다 :
https://pypi.python.org/pypi/python-nonblock
기능을 제공합니다.
nonblock_read 가능한 경우 스트림에서 데이터를 읽습니다. 그렇지 않으면 빈 문자열을 반환합니다 (스트림이 다른 쪽에서 닫히고 가능한 모든 데이터가 읽혀지면 None입니다)
또한 python-subprocess2 모듈을 고려해 볼 수도 있습니다.
https://pypi.python.org/pypi/python-subprocess2
서브 프로세스 모듈에 추가됩니다. 따라서 "subprocess.Popen"에서 반환 된 객체에는 runInBackground라는 메서드가 추가되었습니다. 이것은 스레드를 시작하고 메인 스레드를 차단하지 않고 stdout / stderr에 쓰여지는 것처럼 자동으로 채워지는 객체를 반환합니다.
즐겨!