io 파이썬 title - 하위 프로세스에서 비 차단 읽기 .PIPE in python





13 Answers

나는 종종 비슷한 문제를 겪어왔다. 필자가 자주 작성하는 Python 프로그램은 명령 행 (stdin)에서 사용자 입력을 동시에 받아들이면서 일부 기본 기능을 실행할 수 있어야합니다. readline() 차단되고 시간 제한이 없으므로 사용자 입력 처리 기능을 다른 스레드에 넣기 만하면 문제가 해결되지 않습니다. 기본 기능이 완료되어 더 이상 사용자 입력을 기다릴 필요가 없으면 내 프로그램을 끝내기를 원하지만 readline() 이 줄을 기다리고있는 다른 스레드에서 여전히 차단되어 있기 때문에 프로그램을 종료 할 수 없습니다. 이 문제에서 발견 된 해결책은 fcntl 모듈을 사용하여 stdin을 비 차단 파일로 만드는 것입니다.

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

내 의견으로는 이것은 선택 또는 신호 모듈을 사용하여이 문제를 해결하는 것보다 조금 더 깨끗하지만 다시 UNIX에서만 작동합니다 ...

함수

하위 프로세스 모듈 을 사용하여 하위 프로세스 를 시작하고 출력 스트림 (stdout)에 연결합니다. stdout에서 비 차단 읽기를 실행할 수 있기를 원합니다. .readline을 호출하기 전에 .readline을 non-blocking으로 만들거나 스트림에 데이터가 있는지 확인하는 방법이 있습니까? 나는 이것을 휴대용 또는 적어도 Windows 및 Linux에서 작동시키고 싶습니다.

여기에 내가 지금하는 방법이있다. (데이터가 없으면 .readline 막는다.)

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()



asyncproc 모듈을 사용해보십시오. 예 :

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

이 모듈은 S.Lott이 제안한 모든 스레딩을 처리합니다.




select & read (1)를 사용하십시오.

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

readline () - like :

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a



면책 조항 : 이것은 토네이도에만 적용됩니다.

fd를 비 블로킹으로 설정 한 다음 ioloop을 사용하여 콜백을 등록하면됩니다. 나는 이것을 tornado_subprocess 라 불리는 달걀 속에 포장했고 PyPI를 통해 설치할 수 있습니다 :

easy_install tornado_subprocess

이제 다음과 같이 할 수 있습니다.

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

당신은 또한 그것을 RequestHandler와 함께 사용할 수있다.

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()



이 비 차단 읽기 버전은 특별한 모듈을 필요로 하지 않으며 대부분의 리눅스 배포판에서 즉시 사용할 수 있습니다.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())



여기에 부분 코드를 포함하여 가능한 빨리 하위 프로세스의 모든 출력을 잡는 데 사용되는 코드가 있습니다. 그것은 거의 동시에 정확한 시간에 stdout과 stderr를 펌핑합니다.

Python 2.7 Linux 및 Windows에서 제대로 작동하는지 테스트했습니다.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()



select 모듈은 다음 유용한 입력의 위치를 ​​결정하는 데 도움이됩니다.

그러나, 당신은 거의 항상 분리 된 스레드로 행복합니다. 하나는 블로킹으로 표준 입력을 읽지 만 다른 하나는 블로킹을 원하지 않는 곳이면 어디에서나 할 수 있습니다.




JF Sebastian의 대답과 다른 여러 출처에서 작업하면서 간단한 하위 프로세스 관리자를 구성했습니다. 요청을 비 차단 읽기뿐만 아니라 여러 프로세스를 병렬로 실행합니다. 어떤 OS 특정 호출도 사용하지 않으므로 어디에서든지 작동해야합니다.

그것은 pypi에서 사용할 수 있으므로 그냥 pip install shelljob . 예제와 전체 문서는 프로젝트 페이지 를 참조하십시오.




나는 최근에 비 블로킹 모드에서 스트림 (테일 서브 프로세스에서 실행)에서 한 라인을 읽어야하는 동일한 문제를 발견했다. 다음 문제를 피하려고했다. CPU를 태우지 않고 한 바이트 씩 스트림을 읽지 않는다. readline처럼) 등

여기 내 구현 https://gist.github.com/grubberr/5501e1a9760c3eab5e0a 그것은 Windows (설문 조사)를 지원하지 않습니다, EOF를 처리하지 않습니다,하지만 그것은 잘 작동합니다




필자의 경우 로깅 모듈을 사용하여 백그라운드 응용 프로그램의 출력을 포착하고 시간 스탬프, 색 등을 추가하여 보강했습니다.

나는 실제 I / O를하는 배경 스레드로 끝났다. 다음 코드는 POSIX 플랫폼에만 해당됩니다. 나는 필수가 아닌 부분을 제거했다.

누군가가 장시간 동안이 짐승을 사용하려고하면 열린 기술자를 관리하는 것을 고려하십시오. 제 경우에는 커다란 문제는 아니 었습니다.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)



실행중인 프로세스에서 stdout과 stderr를 모두 수집하려고했지만 내 생성물이 생성 된 위젯에서 출력을 렌더링하고 싶었 기 때문에 궁극적으로 동일합니다.

다른 스크립트를 실행하고 출력을 수집하는 것과 같은 일반적인 작업을 수행 할 필요가 없으므로 큐 또는 추가 스레드를 사용하여 제안 된 여러 가지 해결 방법을 사용하고 싶지 않았습니다.

제안 된 솔루션과 파이썬 문서를 읽은 후에 아래 구현과 함께 내 문제를 해결했습니다. 예, select함수 호출을 사용하고 있기 때문에 POSIX에서만 작동합니다 .

나는 문서가 혼란스럽고 구현이 그러한 일반적인 스크립팅 작업에 대해 어색하다는 것에 동의한다. 이전 버전의 파이썬에는 다른 기본값 Popen과 다른 설명이 포함되어있어 많은 혼란을 야기 한다고 생각합니다 . 이것은 파이썬 2.7.12와 3.5.2 모두에서 잘 작동하는 것으로 보인다.

핵심은 bufsize=1라인 버퍼링 을 설정 한 다음 universal_newlines=True설정시 기본값으로 보이는 바이너리 대신 텍스트 파일로 처리하는 것이 었습니다 bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG 및 VERBOSE는 단순히 출력을 터미널에 인쇄하는 매크로입니다.

이 솔루션은 IMHO 99.99 %가 여전히 블로킹 readline기능을 사용하므로 효과적 이므로 서브 프로세스가 훌륭하고 완전한 라인을 출력한다고 가정합니다.

필자는 Python을 처음 접했을 때 솔루션을 개선하기위한 피드백을 환영합니다.




또한 가 설명한 문제에 직면하여 , 및 다른 사람들이 " 차단"모드를 사용하여 바쁜 루프를 피하기 위해 "선택"을 사용하여 해결했습니다 . 가짜 표준 입력으로 더미 파이프를 사용합니다. 블록을 선택하고 stdin 또는 파이프가 준비 될 때까지 기다립니다. 키를 누르면 stdin은 select를 해제하고 키 값은 read (1)로 검색 할 수 있습니다. 다른 스레드가 파이프에 쓸 때 파이프는 select를 차단 해제하고 stdin의 필요성을 나타내는 표시로 사용할 수 있습니다. 다음은 참조 코드입니다.

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()



다음은 비 차단 읽기와 배경 쓰기를 지원하는 모듈입니다 :

https://pypi.python.org/pypi/python-nonblock

기능을 제공합니다.

nonblock_read 가능한 경우 스트림에서 데이터를 읽습니다. 그렇지 않으면 빈 문자열을 반환합니다 (스트림이 다른 쪽에서 닫히고 가능한 모든 데이터가 읽혀지면 None입니다)

또한 python-subprocess2 모듈을 고려해 볼 수도 있습니다.

https://pypi.python.org/pypi/python-subprocess2

서브 프로세스 모듈에 추가됩니다. 따라서 "subprocess.Popen"에서 반환 된 객체에는 runInBackground라는 메서드가 추가되었습니다. 이것은 스레드를 시작하고 메인 스레드를 차단하지 않고 stdout / stderr에 쓰여지는 것처럼 자동으로 채워지는 객체를 반환합니다.

즐겨!






Related