@ -5,8 +5,9 @@ from test.support.import_helper import import_module
import_module ( ' termios ' )
import errno
import pty
import os
import pty
import tty
import sys
import select
import signal
@ -123,12 +124,6 @@ class PtyTest(unittest.TestCase):
@staticmethod
def handle_sighup ( signum , frame ) :
# bpo-38547: if the process is the session leader, os.close(master_fd)
# of "master_fd, slave_name = pty.master_open()" raises SIGHUP
# signal: just ignore the signal.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.
pass
@expectedFailureIfStdinIsTTY
@ -190,13 +185,6 @@ class PtyTest(unittest.TestCase):
self . assertEqual ( _get_term_winsz ( slave_fd ) , new_stdin_winsz ,
" openpty() failed to set slave window size " )
# Solaris requires reading the fd before anything is returned.
# My guess is that since we open and close the slave fd
# in master_open(), we need to read the EOF.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.
# Ensure the fd is non-blocking in case there's nothing to read.
blocking = os . get_blocking ( master_fd )
try :
@ -324,22 +312,40 @@ class PtyTest(unittest.TestCase):
self . assertEqual ( data , b " " )
def test_spawn_doesnt_hang ( self ) :
pty . spawn ( [ sys . executable , ' -c ' , ' print( " hi there " ) ' ] )
class SmallPtyTests ( unittest . TestCase ) :
""" These tests don ' t spawn children or hang. """
def setUp ( self ) :
self . orig_stdin_fileno = pty . STDIN_FILENO
self . orig_stdout_fileno = pty . STDOUT_FILENO
self . orig_pty_close = pty . close
self . orig_pty__copy = pty . _copy
self . orig_pty_fork = pty . fork
self . orig_pty_select = pty . select
self . orig_pty_setraw = pty . setraw
self . orig_pty_tcgetattr = pty . tcgetattr
self . orig_pty_tcsetattr = pty . tcsetattr
self . orig_pty_waitpid = pty . waitpid
self . fds = [ ] # A list of file descriptors to close.
self . files = [ ]
self . select_rfds_lengths = [ ]
self . select_rfds_results = [ ]
self . tcsetattr_mode_setting = None
def tearDown ( self ) :
pty . STDIN_FILENO = self . orig_stdin_fileno
pty . STDOUT_FILENO = self . orig_stdout_fileno
pty . close = self . orig_pty_close
pty . _copy = self . orig_pty__copy
pty . fork = self . orig_pty_fork
pty . select = self . orig_pty_select
pty . setraw = self . orig_pty_setraw
pty . tcgetattr = self . orig_pty_tcgetattr
pty . tcsetattr = self . orig_pty_tcsetattr
pty . waitpid = self . orig_pty_waitpid
for file in self . files :
try :
file . close ( )
@ -367,6 +373,14 @@ class SmallPtyTests(unittest.TestCase):
self . assertEqual ( self . select_rfds_lengths . pop ( 0 ) , len ( rfds ) )
return self . select_rfds_results . pop ( 0 ) , [ ] , [ ]
def _make_mock_fork ( self , pid ) :
def mock_fork ( ) :
return ( pid , 12 )
return mock_fork
def _mock_tcsetattr ( self , fileno , opt , mode ) :
self . tcsetattr_mode_setting = mode
def test__copy_to_each ( self ) :
""" Test the normal data case on both master_fd and stdin. """
read_from_stdout_fd , mock_stdout_fd = self . _pipe ( )
@ -407,7 +421,6 @@ class SmallPtyTests(unittest.TestCase):
socketpair [ 1 ] . close ( )
os . close ( write_to_stdin_fd )
# Expect two select calls, the last one will cause IndexError
pty . select = self . _mock_select
self . select_rfds_lengths . append ( 2 )
self . select_rfds_results . append ( [ mock_stdin_fd , masters [ 0 ] ] )
@ -415,12 +428,34 @@ class SmallPtyTests(unittest.TestCase):
# both encountered an EOF before the second select call.
self . select_rfds_lengths . append ( 0 )
with self . assertRaises ( IndexError ) :
pty . _copy ( masters [ 0 ] )
# We expect the function to return without error.
self . assertEqual ( pty . _copy ( masters [ 0 ] ) , None )
def test__restore_tty_mode_normal_return ( self ) :
""" Test that spawn resets the tty mode no when _copy returns normally. """
# PID 1 is returned from mocked fork to run the parent branch
# of code
pty . fork = self . _make_mock_fork ( 1 )
status_sentinel = object ( )
pty . waitpid = lambda _1 , _2 : [ None , status_sentinel ]
pty . close = lambda _ : None
pty . _copy = lambda _1 , _2 , _3 : None
mode_sentinel = object ( )
pty . tcgetattr = lambda fd : mode_sentinel
pty . tcsetattr = self . _mock_tcsetattr
pty . setraw = lambda _ : None
self . assertEqual ( pty . spawn ( [ ] ) , status_sentinel , " pty.waitpid process status not returned by pty.spawn " )
self . assertEqual ( self . tcsetattr_mode_setting , mode_sentinel , " pty.tcsetattr not called with original mode value " )
def tearDownModule ( ) :
reap_children ( )
if __name__ == " __main__ " :
unittest . main ( )