하위 프로세스에서 비 차단 읽기 .PIPE in python


Answers

나는 종종 비슷한 문제를 겪어왔다. 필자가 자주 작성하는 Python 프로그램은 명령 행 (stdin)에서 사용자 입력을 동시에 받아들이면서 일부 기본 기능을 실행할 수 있어야합니다. readline() 차단되고 시간 제한이 없으므로 사용자 입력 처리 기능을 다른 스레드에 넣기 만하면 문제가 해결되지 않습니다. 기본 기능이 완료되어 더 이상 사용자 입력을 기다릴 필요가 없으면 내 프로그램을 끝내기를 원하지만 readline() 이 줄을 기다리고있는 다른 스레드에서 여전히 차단되어 있기 때문에 프로그램을 종료 할 수 없습니다. 이 문제에서 발견 된 해결책은 fcntl 모듈을 사용하여 stdin을 비 차단 파일로 만드는 것입니다.

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

내 의견으로는 이것은 선택 또는 신호 모듈을 사용하여이 문제를 해결하는 것보다 조금 더 깨끗하지만 다시 UNIX에서만 작동합니다 ...

Question

하위 프로세스 모듈 을 사용하여 하위 프로세스 를 시작하고 출력 스트림 (stdout)에 연결합니다. stdout에서 비 차단 읽기를 실행할 수 있기를 원합니다. .readline을 호출하기 전에 .readline을 non-blocking으로 만들거나 스트림에 데이터가 있는지 확인하는 방법이 있습니까? 나는 이것을 휴대용 또는 적어도 Windows 및 Linux에서 작동시키고 싶습니다.

여기에 내가 지금하는 방법이있다. (데이터가 없으면 .readline 막는다.)

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()



In my case I needed a logging module that catches the output from the background applications and augments it(adding time-stamps, colors, etc.).

I ended up with a background thread that does the actual I/O. Following code is only for POSIX platforms. I stripped non-essential parts.

If someone is going to use this beast for long runs consider managing open descriptors. In my case it was not a big problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)



면책 조항 : 이것은 토네이도에만 적용됩니다.

fd를 비 블로킹으로 설정 한 다음 ioloop을 사용하여 콜백을 등록하면됩니다. 나는 이것을 tornado_subprocess 라 불리는 달걀 속에 포장했고 PyPI를 통해 설치할 수 있습니다 :

easy_install tornado_subprocess

이제 다음과 같이 할 수 있습니다.

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

당신은 또한 그것을 RequestHandler와 함께 사용할 수있다.

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()



select 모듈은 다음 유용한 입력의 위치를 ​​결정하는 데 도움이됩니다.

그러나, 당신은 거의 항상 분리 된 스레드로 행복합니다. 하나는 블로킹으로 표준 입력을 읽지 만 다른 하나는 블로킹을 원하지 않는 곳이면 어디에서나 할 수 있습니다.




JF Sebastian의 대답과 다른 여러 출처에서 작업하면서 간단한 하위 프로세스 관리자를 구성했습니다. 요청을 비 차단 읽기뿐만 아니라 여러 프로세스를 병렬로 실행합니다. 어떤 OS 특정 호출도 사용하지 않으므로 어디에서든지 작동해야합니다.

그것은 pypi에서 사용할 수 있으므로 그냥 pip install shelljob . 예제와 전체 문서는 프로젝트 페이지 를 참조하십시오.




일부 subprocess.Popen stdout 읽을이 문제를 추가합니다. 다음은 비 차단 읽기 솔루션입니다.

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'



asyncproc 모듈을 사용해보십시오. 예 :

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

이 모듈은 S.Lott이 제안한 모든 스레딩을 처리합니다.




I also faced the problem described by and solved it by using "select" as , and others did but in a blocking mode to avoid a busy loop. It uses a dummy Pipe as a fake stdin. The select blocks and wait for either stdin or the pipe to be ready. When a key is pressed stdin unblocks the select and the key value can be retrieved with read(1). When a different thread writes to the pipe then the pipe unblocks the select and it can be taken as an indication that the need for stdin is over. Here is some reference code:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()



나는 최근에 비 블로킹 모드에서 스트림 (테일 서브 프로세스에서 실행)에서 한 라인을 읽어야하는 동일한 문제를 발견했다. 다음 문제를 피하려고했다. CPU를 태우지 않고 한 바이트 씩 스트림을 읽지 않는다. readline처럼) 등

여기 내 구현 https://gist.github.com/grubberr/5501e1a9760c3eab5e0a 그것은 Windows (설문 조사)를 지원하지 않습니다, EOF를 처리하지 않습니다,하지만 그것은 잘 작동합니다




My problem is a bit different as I wanted to collect both stdout and stderr from a running process, but ultimately the same since I wanted to render the output in a widget as its generated.

I did not want to resort to many of the proposed workarounds using Queues or additional Threads as they should not be necessary to perform such a common task as running another script and collecting its output.

After reading the proposed solutions and python docs I resolved my issue with the implementation below. Yes it only works for POSIX as I'm using the select function call.

I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of python have different defaults for Popen and different explanations so that created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.

The key was to set bufsize=1 for line buffering and then universal_newlines=True to process as a text file instead of a binary which seems to become the default when setting bufsize=1 .

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.

This solution is IMHO 99.99% effective as it still uses the blocking readline function, so we assume the sub process is nice and outputs complete lines.

I welcome feedback to improve the solution as I am still new to Python.




select & read (1)를 사용하십시오.

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

readline () - like :

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a



Windows 및 Unix에서 비 차단 파이프를 설정할 수 있으므로이 대답을 여기에 추가하십시오.

모든 ctypes 세부 정보는 @ techtonik의 대답 덕분입니다.

유닉스와 윈도우즈 시스템 모두에서 약간 수정 된 버전이있다.

  • Python3과 호환됩니다 (사소한 변경이 필요함) .
  • posix 버전을 포함하며 둘 중 하나에 사용할 예외를 정의합니다.

이 방법을 사용하면 Unix 및 Windows 코드에서 동일한 기능과 예외를 사용할 수 있습니다.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

불완전한 데이터를 읽지 않기 위해 필자는 독자적으로 생성 된 readline generator (각 행의 바이트 문자열을 반환)를 작성했습니다.

그 발전기 그래서 예를 들면 ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)



Links