Skip to content

Commit

Permalink
pythongh-123856: Fix PyREPL failure when a keyboard interrupt is trig…
Browse files Browse the repository at this point in the history
…gered after using a history search (pythonGH-124396)

(cherry picked from commit c1600c7)

Co-authored-by: Emily Morehouse <emily@cuttlesoft.com>
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
  • Loading branch information
2 people authored and miss-islington committed Sep 25, 2024
1 parent 2f25d85 commit da09f56
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 90 deletions.
11 changes: 6 additions & 5 deletions Lib/_pyrepl/simple_interact.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import _sitebuiltins
import linecache
import functools
import os
import sys
import code

Expand All @@ -50,7 +51,9 @@ def check() -> str:
try:
_get_reader()
except _error as e:
return str(e) or repr(e) or "unknown error"
if term := os.environ.get("TERM", ""):
term = f"; TERM={term}"
return str(str(e) or repr(e) or "unknown error") + term
return ""


Expand Down Expand Up @@ -159,10 +162,8 @@ def maybe_run_command(statement: str) -> bool:
input_n += 1
except KeyboardInterrupt:
r = _get_reader()
if r.last_command and 'isearch' in r.last_command.__name__:
r.isearch_direction = ''
r.console.forgetinput()
r.pop_input_trans()
if r.input_trans is r.isearch_trans:
r.do_cmd(("isearch-end", [""]))
r.pos = len(r.get_unicode())
r.dirty = True
r.refresh()
Expand Down
192 changes: 107 additions & 85 deletions Lib/test/test_pyrepl/test_pyrepl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import subprocess
import sys
import tempfile
from unittest import TestCase, skipUnless
from unittest import TestCase, skipUnless, skipIf
from unittest.mock import patch
from test.support import force_not_colorized
from test.support import SHORT_TIMEOUT
Expand All @@ -35,6 +35,94 @@
except ImportError:
pty = None


class ReplTestCase(TestCase):
def run_repl(
self,
repl_input: str | list[str],
env: dict | None = None,
*,
cmdline_args: list[str] | None = None,
cwd: str | None = None,
) -> tuple[str, int]:
temp_dir = None
if cwd is None:
temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
cwd = temp_dir.name
try:
return self._run_repl(
repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd
)
finally:
if temp_dir is not None:
temp_dir.cleanup()

def _run_repl(
self,
repl_input: str | list[str],
*,
env: dict | None,
cmdline_args: list[str] | None,
cwd: str,
) -> tuple[str, int]:
assert pty
master_fd, slave_fd = pty.openpty()
cmd = [sys.executable, "-i", "-u"]
if env is None:
cmd.append("-I")
elif "PYTHON_HISTORY" not in env:
env["PYTHON_HISTORY"] = os.path.join(cwd, ".regrtest_history")
if cmdline_args is not None:
cmd.extend(cmdline_args)

try:
import termios
except ModuleNotFoundError:
pass
else:
term_attr = termios.tcgetattr(slave_fd)
term_attr[6][termios.VREPRINT] = 0 # pass through CTRL-R
term_attr[6][termios.VINTR] = 0 # pass through CTRL-C
termios.tcsetattr(slave_fd, termios.TCSANOW, term_attr)

process = subprocess.Popen(
cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
cwd=cwd,
text=True,
close_fds=True,
env=env if env else os.environ,
)
os.close(slave_fd)
if isinstance(repl_input, list):
repl_input = "\n".join(repl_input) + "\n"
os.write(master_fd, repl_input.encode("utf-8"))

output = []
while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
try:
data = os.read(master_fd, 1024).decode("utf-8")
if not data:
break
except OSError:
break
output.append(data)
else:
os.close(master_fd)
process.kill()
self.fail(f"Timeout while waiting for output, got: {''.join(output)}")

os.close(master_fd)
try:
exit_code = process.wait(timeout=SHORT_TIMEOUT)
except subprocess.TimeoutExpired:
process.kill()
exit_code = process.wait()
return "".join(output), exit_code


class TestCursorPosition(TestCase):
def prepare_reader(self, events):
console = FakeConsole(events)
Expand Down Expand Up @@ -968,7 +1056,20 @@ def test_bracketed_paste_single_line(self):


@skipUnless(pty, "requires pty")
class TestMain(TestCase):
class TestDumbTerminal(ReplTestCase):
def test_dumb_terminal_exits_cleanly(self):
env = os.environ.copy()
env.update({"TERM": "dumb"})
output, exit_code = self.run_repl("exit()\n", env=env)
self.assertEqual(exit_code, 0)
self.assertIn("warning: can't use pyrepl", output)
self.assertNotIn("Exception", output)
self.assertNotIn("Traceback", output)


@skipUnless(pty, "requires pty")
@skipIf((os.environ.get("TERM") or "dumb") == "dumb", "can't use pyrepl in dumb terminal")
class TestMain(ReplTestCase):
def setUp(self):
# Cleanup from PYTHON* variables to isolate from local
# user settings, see #121359. Such variables should be
Expand Down Expand Up @@ -1078,15 +1179,6 @@ def test_inspect_keeps_globals_from_inspected_module(self):
}
self._run_repl_globals_test(expectations, as_module=True)

def test_dumb_terminal_exits_cleanly(self):
env = os.environ.copy()
env.update({"TERM": "dumb"})
output, exit_code = self.run_repl("exit()\n", env=env)
self.assertEqual(exit_code, 0)
self.assertIn("warning: can't use pyrepl", output)
self.assertNotIn("Exception", output)
self.assertNotIn("Traceback", output)

@force_not_colorized
def test_python_basic_repl(self):
env = os.environ.copy()
Expand Down Expand Up @@ -1209,80 +1301,6 @@ def test_proper_tracebacklimit(self):
self.assertIn("in x3", output)
self.assertIn("in <module>", output)

def run_repl(
self,
repl_input: str | list[str],
env: dict | None = None,
*,
cmdline_args: list[str] | None = None,
cwd: str | None = None,
) -> tuple[str, int]:
temp_dir = None
if cwd is None:
temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
cwd = temp_dir.name
try:
return self._run_repl(
repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd
)
finally:
if temp_dir is not None:
temp_dir.cleanup()

def _run_repl(
self,
repl_input: str | list[str],
*,
env: dict | None,
cmdline_args: list[str] | None,
cwd: str,
) -> tuple[str, int]:
assert pty
master_fd, slave_fd = pty.openpty()
cmd = [sys.executable, "-i", "-u"]
if env is None:
cmd.append("-I")
elif "PYTHON_HISTORY" not in env:
env["PYTHON_HISTORY"] = os.path.join(cwd, ".regrtest_history")
if cmdline_args is not None:
cmd.extend(cmdline_args)
process = subprocess.Popen(
cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
cwd=cwd,
text=True,
close_fds=True,
env=env if env else os.environ,
)
os.close(slave_fd)
if isinstance(repl_input, list):
repl_input = "\n".join(repl_input) + "\n"
os.write(master_fd, repl_input.encode("utf-8"))

output = []
while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
try:
data = os.read(master_fd, 1024).decode("utf-8")
if not data:
break
except OSError:
break
output.append(data)
else:
os.close(master_fd)
process.kill()
self.fail(f"Timeout while waiting for output, got: {''.join(output)}")

os.close(master_fd)
try:
exit_code = process.wait(timeout=SHORT_TIMEOUT)
except subprocess.TimeoutExpired:
process.kill()
exit_code = process.wait()
return "".join(output), exit_code

def test_readline_history_file(self):
# skip, if readline module is not available
readline = import_module('readline')
Expand All @@ -1305,3 +1323,7 @@ def test_readline_history_file(self):
output, exit_code = self.run_repl("exit\n", env=env)
self.assertEqual(exit_code, 0)
self.assertNotIn("\\040", pathlib.Path(hfile.name).read_text())

def test_keyboard_interrupt_after_isearch(self):
output, exit_code = self.run_repl(["\x12", "\x03", "exit"])
self.assertEqual(exit_code, 0)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix PyREPL failure when a keyboard interrupt is triggered after using a
history search

0 comments on commit da09f56

Please sign in to comment.