Skip to content

Commit

Permalink
Support streams on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
Yann Diorcet committed Apr 16, 2018
1 parent 1e90dab commit ed2c187
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 15 deletions.
31 changes: 31 additions & 0 deletions circus/pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import io
import socket
from .winpopen import enable_overlapped, disable_overlapped


def make_pipe():
if os.name != 'nt':
a, b = os.pipe()
a, b = io.open(a, 'rb', -1), io.open(b, 'wb', -1)
return a, b
else:
disable_overlapped()
try:
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.bind(('127.0.0.1', 0))
serv.listen(1)

# need to save sockets in _rsock/_wsock so they don't get closed
_rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_rsock.connect(('127.0.0.1', serv.getsockname()[1]))

_wsock, addr = serv.accept()
serv.close()
_rsock_fd = _rsock.makefile('rb', 0)
_wsock_fd = _wsock.makefile('wb', 0)
_rsock.close()
_wsock.close()
return _rsock_fd, _wsock_fd
finally:
enable_overlapped()
29 changes: 20 additions & 9 deletions circus/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ObjectDict, replace_gnu_args, get_default_gid,
get_username_from_uid, IS_WINDOWS)
from circus import logger
from circus.pipe import make_pipe


_INFOLINE = ("%(pid)s %(cmdline)s %(username)s %(nice)s %(mem_info1)s "
Expand Down Expand Up @@ -211,9 +212,7 @@ def __init__(self, name, wid, cmd, args=None, working_dir=None,
self.gid = get_default_gid(self.uid)

if IS_WINDOWS:
if not self.use_fds and (self.pipe_stderr or self.pipe_stdout):
raise ValueError("On Windows, you can't close the fds if "
"you are redirecting stdout or stderr")
self.use_fds = True

if spawn:
self.spawn()
Expand Down Expand Up @@ -354,16 +353,28 @@ def preexec():
else:
preexec_fn = preexec

stdout = None
stderr = None
if self.pipe_stdout:
extra['stdout'] = subprocess.PIPE
stdout, extra['stdout'] = make_pipe()

if self.pipe_stderr:
extra['stderr'] = subprocess.PIPE
stderr, extra['stderr'] = make_pipe()

self._worker = Popen(args, cwd=self.working_dir,
shell=self.shell, preexec_fn=preexec_fn,
env=self.env, close_fds=not self.use_fds,
executable=self.executable, **extra)
try:
self._worker = Popen(args, cwd=self.working_dir,
shell=self.shell, preexec_fn=preexec_fn,
env=self.env, close_fds=not self.use_fds,
executable=self.executable, **extra)
finally:
if 'stdout' in extra:
extra['stdout'].close()

if 'stderr' in extra:
extra['stderr'].close()

self._worker.stderr = stderr
self._worker.stdout = stdout

# let go of sockets created only for self._worker to inherit
self._sockets = []
Expand Down
2 changes: 2 additions & 0 deletions circus/stream/file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def close(self):
def write_data(self, data):
# data to write on file
file_data = s(data['data'])
if os.linesep != '\n':
file_data = file_data.replace(os.linesep, '\n')

# If we want to prefix the stream with the current datetime
if self._time_format is not None:
Expand Down
7 changes: 6 additions & 1 deletion circus/stream/redirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ def __call__(self, fd, events):
self.redirector.remove_fd(fd)
return
try:
data = os.read(fd, self.redirector.buffer)
if hasattr(self.pipe, 'read1'):
data = self.pipe.read1(self.redirector.buffer)
else:
data = self.pipe.read(self.redirector.buffer)
if len(data) == 0:
self.redirector.remove_fd(fd)
else:
datamap = {'data': data, 'pid': self.process.pid,
'name': self.name}
self.redirector.redirect[self.name](datamap)
except ConnectionResetError:
self.redirector.remove_fd(fd)
except IOError as ex:
if ex.args[0] != errno.EAGAIN:
raise
Expand Down
6 changes: 1 addition & 5 deletions circus/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,6 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
" watcher" % self.numprocesses)

if IS_WINDOWS:
if self.stdout_stream or self.stderr_stream:
raise NotImplementedError("Streams are not supported"
" on Windows.")

if not copy_env and not env:
# Copy the env by default on Windows as we can't run any
# executable without some env variables
Expand Down Expand Up @@ -686,7 +682,7 @@ def spawn_process(self, recovery_wid=None):
# catch ValueError as well, as a misconfigured rlimit setting could
# lead to bad infinite retries here
except (OSError, ValueError) as e:
logger.warning('error in %r: %s', self.name, str(e))
logger.exception('error in %r: %s', self.name, str(e))

if process is None:
nb_tries += 1
Expand Down
212 changes: 212 additions & 0 deletions circus/winpopen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
from __future__ import print_function, division, absolute_import

import os

"""
Fix windows Popen for supporting socket as pipe
"""

if os.name == "nt":
import subprocess
import socket
import sys
import msvcrt
if sys.version_info < (3, 0):
import _subprocess
else:
import _winapi

SO_OPENTYPE = 0x7008
SO_SYNCHRONOUS_ALERT = 0x10
SO_SYNCHRONOUS_NONALERT = 0x20

PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
if sys.version_info >= (3, 0):
DEVNULL = subprocess.DEVNULL

class WindowsPopen(subprocess.Popen):
def __init__(self, *args, **kwargs):
super(WindowsPopen, self).__init__(*args, **kwargs)

if sys.version_info < (3, 0):
def _get_handles(self, stdin, stdout, stderr):
"""Construct and return tuple with IO objects:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
"""
to_close = set()
if stdin is None and stdout is None and stderr is None:
return (None, None, None, None, None, None), to_close

p2cread, p2cwrite = None, None
c2pread, c2pwrite = None, None
errread, errwrite = None, None

if stdin is None:
p2cread = _subprocess.GetStdHandle(_subprocess.STD_INPUT_HANDLE)
if p2cread is None:
p2cread, _ = _subprocess.CreatePipe(None, 0)
elif stdin == PIPE:
p2cread, p2cwrite = _subprocess.CreatePipe(None, 0)
elif isinstance(stdin, int):
p2cread = msvcrt.get_osfhandle(stdin)
else:
# Assuming file-like object
if not hasattr(stdin, '_sock'):
p2cread = msvcrt.get_osfhandle(stdin.fileno())
else:
p2cread = stdin.fileno()
p2cread = self._make_inheritable(p2cread)
# We just duplicated the handle, it has to be closed at the end
to_close.add(p2cread)
if stdin == PIPE:
to_close.add(p2cwrite)

if stdout is None:
c2pwrite = _subprocess.GetStdHandle(_subprocess.STD_OUTPUT_HANDLE)
if c2pwrite is None:
_, c2pwrite = _subprocess.CreatePipe(None, 0)
elif stdout == PIPE:
c2pread, c2pwrite = _subprocess.CreatePipe(None, 0)
elif isinstance(stdout, int):
c2pwrite = msvcrt.get_osfhandle(stdout)
else:
# Assuming file-like object
if not hasattr(stdout, '_sock'):
c2pwrite = msvcrt.get_osfhandle(stdout.fileno())
else:
c2pwrite = stdout.fileno()
c2pwrite = self._make_inheritable(c2pwrite)
# We just duplicated the handle, it has to be closed at the end
to_close.add(c2pwrite)
if stdout == PIPE:
to_close.add(c2pread)

if stderr is None:
errwrite = _subprocess.GetStdHandle(_subprocess.STD_ERROR_HANDLE)
if errwrite is None:
_, errwrite = _subprocess.CreatePipe(None, 0)
elif stderr == PIPE:
errread, errwrite = _subprocess.CreatePipe(None, 0)
elif stderr == STDOUT:
errwrite = c2pwrite
elif isinstance(stderr, int):
errwrite = msvcrt.get_osfhandle(stderr)
else:
# Assuming file-like object
if not hasattr(stderr, '_sock'):
errwrite = msvcrt.get_osfhandle(stderr.fileno())
else:
errwrite = stderr.fileno()
errwrite = self._make_inheritable(errwrite)
# We just duplicated the handle, it has to be closed at the end
to_close.add(errwrite)
if stderr == PIPE:
to_close.add(errread)

return (p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite), to_close
else:
def _get_handles(self, stdin, stdout, stderr):
"""Construct and return tuple with IO objects:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
"""
if stdin is None and stdout is None and stderr is None:
return (-1, -1, -1, -1, -1, -1)

p2cread, p2cwrite = -1, -1
c2pread, c2pwrite = -1, -1
errread, errwrite = -1, -1

if stdin is None:
p2cread = _winapi.GetStdHandle(_winapi.STD_INPUT_HANDLE)
if p2cread is None:
p2cread, _ = _winapi.CreatePipe(None, 0)
p2cread = subprocess.Handle(p2cread)
_winapi.CloseHandle(_)
elif stdin == PIPE:
p2cread, p2cwrite = _winapi.CreatePipe(None, 0)
p2cread, p2cwrite = subprocess.Handle(p2cread), subprocess.Handle(p2cwrite)
elif stdin == DEVNULL:
p2cread = msvcrt.get_osfhandle(self._get_devnull())
elif isinstance(stdin, int):
p2cread = msvcrt.get_osfhandle(stdin)
else:
# Assuming file-like object
if not hasattr(stdin, '_sock'):
p2cread = msvcrt.get_osfhandle(stdin.fileno())
else:
p2cread = stdin.fileno()
p2cread = self._make_inheritable(p2cread)

if stdout is None:
c2pwrite = _winapi.GetStdHandle(_winapi.STD_OUTPUT_HANDLE)
if c2pwrite is None:
_, c2pwrite = _winapi.CreatePipe(None, 0)
c2pwrite = subprocess.Handle(c2pwrite)
_winapi.CloseHandle(_)
elif stdout == PIPE:
c2pread, c2pwrite = _winapi.CreatePipe(None, 0)
c2pread, c2pwrite = subprocess.Handle(c2pread), subprocess.Handle(c2pwrite)
elif stdout == DEVNULL:
c2pwrite = msvcrt.get_osfhandle(self._get_devnull())
elif isinstance(stdout, int):
c2pwrite = msvcrt.get_osfhandle(stdout)
else:
# Assuming file-like object
if not hasattr(stdout, '_sock'):
c2pwrite = msvcrt.get_osfhandle(stdout.fileno())
else:
c2pwrite = stdout.fileno()
c2pwrite = self._make_inheritable(c2pwrite)

if stderr is None:
errwrite = _winapi.GetStdHandle(_winapi.STD_ERROR_HANDLE)
if errwrite is None:
_, errwrite = _winapi.CreatePipe(None, 0)
errwrite = subprocess.Handle(errwrite)
_winapi.CloseHandle(_)
elif stderr == PIPE:
errread, errwrite = _winapi.CreatePipe(None, 0)
errread, errwrite = subprocess.Handle(errread), subprocess.Handle(errwrite)
elif stderr == STDOUT:
errwrite = c2pwrite
elif stderr == DEVNULL:
errwrite = msvcrt.get_osfhandle(self._get_devnull())
elif isinstance(stderr, int):
errwrite = msvcrt.get_osfhandle(stderr)
else:
# Assuming file-like object
if not hasattr(stderr, '_sock'):
errwrite = msvcrt.get_osfhandle(stderr.fileno())
else:
errwrite = stderr.fileno()
errwrite = self._make_inheritable(errwrite)

return (p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)

subprocess.Popen_old = subprocess.Popen
subprocess.Popen = WindowsPopen

def disable_overlapped():
# Enable socket to be non overlapped
try:
dummy = socket.socket(0xDEAD, socket.SOCK_STREAM) # After that python will not force WSA_FLAG_OVERLAPPED
except:
pass
dummy = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dummy.setsockopt(socket.SOL_SOCKET, SO_OPENTYPE, SO_SYNCHRONOUS_NONALERT)

def enable_overlapped():
dummy = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dummy.setsockopt(socket.SOL_SOCKET, SO_OPENTYPE, SO_SYNCHRONOUS_ALERT)

else:
def disable_overlapped():
pass

def enable_overlapped():
pass

0 comments on commit ed2c187

Please sign in to comment.