These files are a subset of the python-2.7.2.tgz distribution from python.org. Changed files from PyMod-2.7.2 have been copied into the corresponding directories of this tree, replacing the original files in the distribution. Signed-off-by: daryl.mcdaniel@intel.com git-svn-id: https://edk2.svn.sourceforge.net/svnroot/edk2/trunk/edk2@13197 6f19259b-4bc3-4df7-8a09-765794883524
		
			
				
	
	
		
			494 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			494 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| from test import test_support
 | |
| from contextlib import closing
 | |
| import gc
 | |
| import pickle
 | |
| import select
 | |
| import signal
 | |
| import subprocess
 | |
| import traceback
 | |
| import sys, os, time, errno
 | |
| 
 | |
| if sys.platform in ('os2', 'riscos'):
 | |
|     raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
 | |
| 
 | |
| 
 | |
| class HandlerBCalled(Exception):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| def exit_subprocess():
 | |
|     """Use os._exit(0) to exit the current subprocess.
 | |
| 
 | |
|     Otherwise, the test catches the SystemExit and continues executing
 | |
|     in parallel with the original test, so you wind up with an
 | |
|     exponential number of tests running concurrently.
 | |
|     """
 | |
|     os._exit(0)
 | |
| 
 | |
| 
 | |
| def ignoring_eintr(__func, *args, **kwargs):
 | |
|     try:
 | |
|         return __func(*args, **kwargs)
 | |
|     except EnvironmentError as e:
 | |
|         if e.errno != errno.EINTR:
 | |
|             raise
 | |
|         return None
 | |
| 
 | |
| 
 | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
 | |
| class InterProcessSignalTests(unittest.TestCase):
 | |
|     MAX_DURATION = 20   # Entire test should last at most 20 sec.
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.using_gc = gc.isenabled()
 | |
|         gc.disable()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         if self.using_gc:
 | |
|             gc.enable()
 | |
| 
 | |
|     def format_frame(self, frame, limit=None):
 | |
|         return ''.join(traceback.format_stack(frame, limit=limit))
 | |
| 
 | |
|     def handlerA(self, signum, frame):
 | |
|         self.a_called = True
 | |
|         if test_support.verbose:
 | |
|             print "handlerA invoked from signal %s at:\n%s" % (
 | |
|                 signum, self.format_frame(frame, limit=1))
 | |
| 
 | |
|     def handlerB(self, signum, frame):
 | |
|         self.b_called = True
 | |
|         if test_support.verbose:
 | |
|             print "handlerB invoked from signal %s at:\n%s" % (
 | |
|                 signum, self.format_frame(frame, limit=1))
 | |
|         raise HandlerBCalled(signum, self.format_frame(frame))
 | |
| 
 | |
|     def wait(self, child):
 | |
|         """Wait for child to finish, ignoring EINTR."""
 | |
|         while True:
 | |
|             try:
 | |
|                 child.wait()
 | |
|                 return
 | |
|             except OSError as e:
 | |
|                 if e.errno != errno.EINTR:
 | |
|                     raise
 | |
| 
 | |
|     def run_test(self):
 | |
|         # Install handlers. This function runs in a sub-process, so we
 | |
|         # don't worry about re-setting the default handlers.
 | |
|         signal.signal(signal.SIGHUP, self.handlerA)
 | |
|         signal.signal(signal.SIGUSR1, self.handlerB)
 | |
|         signal.signal(signal.SIGUSR2, signal.SIG_IGN)
 | |
|         signal.signal(signal.SIGALRM, signal.default_int_handler)
 | |
| 
 | |
|         # Variables the signals will modify:
 | |
|         self.a_called = False
 | |
|         self.b_called = False
 | |
| 
 | |
|         # Let the sub-processes know who to send signals to.
 | |
|         pid = os.getpid()
 | |
|         if test_support.verbose:
 | |
|             print "test runner's pid is", pid
 | |
| 
 | |
|         child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
 | |
|         if child:
 | |
|             self.wait(child)
 | |
|             if not self.a_called:
 | |
|                 time.sleep(1)  # Give the signal time to be delivered.
 | |
|         self.assertTrue(self.a_called)
 | |
|         self.assertFalse(self.b_called)
 | |
|         self.a_called = False
 | |
| 
 | |
|         # Make sure the signal isn't delivered while the previous
 | |
|         # Popen object is being destroyed, because __del__ swallows
 | |
|         # exceptions.
 | |
|         del child
 | |
|         try:
 | |
|             child = subprocess.Popen(['kill', '-USR1', str(pid)])
 | |
|             # This wait should be interrupted by the signal's exception.
 | |
|             self.wait(child)
 | |
|             time.sleep(1)  # Give the signal time to be delivered.
 | |
|             self.fail('HandlerBCalled exception not thrown')
 | |
|         except HandlerBCalled:
 | |
|             self.assertTrue(self.b_called)
 | |
|             self.assertFalse(self.a_called)
 | |
|             if test_support.verbose:
 | |
|                 print "HandlerBCalled exception caught"
 | |
| 
 | |
|         child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
 | |
|         if child:
 | |
|             self.wait(child)  # Nothing should happen.
 | |
| 
 | |
|         try:
 | |
|             signal.alarm(1)
 | |
|             # The race condition in pause doesn't matter in this case,
 | |
|             # since alarm is going to raise a KeyboardException, which
 | |
|             # will skip the call.
 | |
|             signal.pause()
 | |
|             # But if another signal arrives before the alarm, pause
 | |
|             # may return early.
 | |
|             time.sleep(1)
 | |
|         except KeyboardInterrupt:
 | |
|             if test_support.verbose:
 | |
|                 print "KeyboardInterrupt (the alarm() went off)"
 | |
|         except:
 | |
|             self.fail("Some other exception woke us from pause: %s" %
 | |
|                       traceback.format_exc())
 | |
|         else:
 | |
|             self.fail("pause returned of its own accord, and the signal"
 | |
|                       " didn't arrive after another second.")
 | |
| 
 | |
|     # Issue 3864. Unknown if this affects earlier versions of freebsd also.
 | |
|     @unittest.skipIf(sys.platform=='freebsd6',
 | |
|         'inter process signals not reliable (do not mix well with threading) '
 | |
|         'on freebsd6')
 | |
|     def test_main(self):
 | |
|         # This function spawns a child process to insulate the main
 | |
|         # test-running process from all the signals. It then
 | |
|         # communicates with that child process over a pipe and
 | |
|         # re-raises information about any exceptions the child
 | |
|         # throws. The real work happens in self.run_test().
 | |
|         os_done_r, os_done_w = os.pipe()
 | |
|         with closing(os.fdopen(os_done_r)) as done_r, \
 | |
|              closing(os.fdopen(os_done_w, 'w')) as done_w:
 | |
|             child = os.fork()
 | |
|             if child == 0:
 | |
|                 # In the child process; run the test and report results
 | |
|                 # through the pipe.
 | |
|                 try:
 | |
|                     done_r.close()
 | |
|                     # Have to close done_w again here because
 | |
|                     # exit_subprocess() will skip the enclosing with block.
 | |
|                     with closing(done_w):
 | |
|                         try:
 | |
|                             self.run_test()
 | |
|                         except:
 | |
|                             pickle.dump(traceback.format_exc(), done_w)
 | |
|                         else:
 | |
|                             pickle.dump(None, done_w)
 | |
|                 except:
 | |
|                     print 'Uh oh, raised from pickle.'
 | |
|                     traceback.print_exc()
 | |
|                 finally:
 | |
|                     exit_subprocess()
 | |
| 
 | |
|             done_w.close()
 | |
|             # Block for up to MAX_DURATION seconds for the test to finish.
 | |
|             r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
 | |
|             if done_r in r:
 | |
|                 tb = pickle.load(done_r)
 | |
|                 if tb:
 | |
|                     self.fail(tb)
 | |
|             else:
 | |
|                 os.kill(child, signal.SIGKILL)
 | |
|                 self.fail('Test deadlocked after %d seconds.' %
 | |
|                           self.MAX_DURATION)
 | |
| 
 | |
| 
 | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
 | |
| class BasicSignalTests(unittest.TestCase):
 | |
|     def trivial_signal_handler(self, *args):
 | |
|         pass
 | |
| 
 | |
|     def test_out_of_range_signal_number_raises_error(self):
 | |
|         self.assertRaises(ValueError, signal.getsignal, 4242)
 | |
| 
 | |
|         self.assertRaises(ValueError, signal.signal, 4242,
 | |
|                           self.trivial_signal_handler)
 | |
| 
 | |
|     def test_setting_signal_handler_to_none_raises_error(self):
 | |
|         self.assertRaises(TypeError, signal.signal,
 | |
|                           signal.SIGUSR1, None)
 | |
| 
 | |
|     def test_getsignal(self):
 | |
|         hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
 | |
|         self.assertEqual(signal.getsignal(signal.SIGHUP),
 | |
|                          self.trivial_signal_handler)
 | |
|         signal.signal(signal.SIGHUP, hup)
 | |
|         self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
 | |
| 
 | |
| 
 | |
| @unittest.skipUnless(sys.platform == "win32", "Windows specific")
 | |
| class WindowsSignalTests(unittest.TestCase):
 | |
|     def test_issue9324(self):
 | |
|         # Updated for issue #10003, adding SIGBREAK
 | |
|         handler = lambda x, y: None
 | |
|         for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
 | |
|                     signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
 | |
|                     signal.SIGTERM):
 | |
|             # Set and then reset a handler for signals that work on windows
 | |
|             signal.signal(sig, signal.signal(sig, handler))
 | |
| 
 | |
|         with self.assertRaises(ValueError):
 | |
|             signal.signal(-1, handler)
 | |
| 
 | |
|         with self.assertRaises(ValueError):
 | |
|             signal.signal(7, handler)
 | |
| 
 | |
| 
 | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
 | |
| class WakeupSignalTests(unittest.TestCase):
 | |
|     TIMEOUT_FULL = 10
 | |
|     TIMEOUT_HALF = 5
 | |
| 
 | |
|     def test_wakeup_fd_early(self):
 | |
|         import select
 | |
| 
 | |
|         signal.alarm(1)
 | |
|         before_time = time.time()
 | |
|         # We attempt to get a signal during the sleep,
 | |
|         # before select is called
 | |
|         time.sleep(self.TIMEOUT_FULL)
 | |
|         mid_time = time.time()
 | |
|         self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
 | |
|         select.select([self.read], [], [], self.TIMEOUT_FULL)
 | |
|         after_time = time.time()
 | |
|         self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
 | |
| 
 | |
|     def test_wakeup_fd_during(self):
 | |
|         import select
 | |
| 
 | |
|         signal.alarm(1)
 | |
|         before_time = time.time()
 | |
|         # We attempt to get a signal during the select call
 | |
|         self.assertRaises(select.error, select.select,
 | |
|             [self.read], [], [], self.TIMEOUT_FULL)
 | |
|         after_time = time.time()
 | |
|         self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
 | |
| 
 | |
|     def setUp(self):
 | |
|         import fcntl
 | |
| 
 | |
|         self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
 | |
|         self.read, self.write = os.pipe()
 | |
|         flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
 | |
|         flags = flags | os.O_NONBLOCK
 | |
|         fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
 | |
|         self.old_wakeup = signal.set_wakeup_fd(self.write)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         signal.set_wakeup_fd(self.old_wakeup)
 | |
|         os.close(self.read)
 | |
|         os.close(self.write)
 | |
|         signal.signal(signal.SIGALRM, self.alrm)
 | |
| 
 | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
 | |
| class SiginterruptTest(unittest.TestCase):
 | |
| 
 | |
|     def setUp(self):
 | |
|         """Install a no-op signal handler that can be set to allow
 | |
|         interrupts or not, and arrange for the original signal handler to be
 | |
|         re-installed when the test is finished.
 | |
|         """
 | |
|         self.signum = signal.SIGUSR1
 | |
|         oldhandler = signal.signal(self.signum, lambda x,y: None)
 | |
|         self.addCleanup(signal.signal, self.signum, oldhandler)
 | |
| 
 | |
|     def readpipe_interrupted(self):
 | |
|         """Perform a read during which a signal will arrive.  Return True if the
 | |
|         read is interrupted by the signal and raises an exception.  Return False
 | |
|         if it returns normally.
 | |
|         """
 | |
|         # Create a pipe that can be used for the read.  Also clean it up
 | |
|         # when the test is over, since nothing else will (but see below for
 | |
|         # the write end).
 | |
|         r, w = os.pipe()
 | |
|         self.addCleanup(os.close, r)
 | |
| 
 | |
|         # Create another process which can send a signal to this one to try
 | |
|         # to interrupt the read.
 | |
|         ppid = os.getpid()
 | |
|         pid = os.fork()
 | |
| 
 | |
|         if pid == 0:
 | |
|             # Child code: sleep to give the parent enough time to enter the
 | |
|             # read() call (there's a race here, but it's really tricky to
 | |
|             # eliminate it); then signal the parent process.  Also, sleep
 | |
|             # again to make it likely that the signal is delivered to the
 | |
|             # parent process before the child exits.  If the child exits
 | |
|             # first, the write end of the pipe will be closed and the test
 | |
|             # is invalid.
 | |
|             try:
 | |
|                 time.sleep(0.2)
 | |
|                 os.kill(ppid, self.signum)
 | |
|                 time.sleep(0.2)
 | |
|             finally:
 | |
|                 # No matter what, just exit as fast as possible now.
 | |
|                 exit_subprocess()
 | |
|         else:
 | |
|             # Parent code.
 | |
|             # Make sure the child is eventually reaped, else it'll be a
 | |
|             # zombie for the rest of the test suite run.
 | |
|             self.addCleanup(os.waitpid, pid, 0)
 | |
| 
 | |
|             # Close the write end of the pipe.  The child has a copy, so
 | |
|             # it's not really closed until the child exits.  We need it to
 | |
|             # close when the child exits so that in the non-interrupt case
 | |
|             # the read eventually completes, otherwise we could just close
 | |
|             # it *after* the test.
 | |
|             os.close(w)
 | |
| 
 | |
|             # Try the read and report whether it is interrupted or not to
 | |
|             # the caller.
 | |
|             try:
 | |
|                 d = os.read(r, 1)
 | |
|                 return False
 | |
|             except OSError, err:
 | |
|                 if err.errno != errno.EINTR:
 | |
|                     raise
 | |
|                 return True
 | |
| 
 | |
|     def test_without_siginterrupt(self):
 | |
|         """If a signal handler is installed and siginterrupt is not called
 | |
|         at all, when that signal arrives, it interrupts a syscall that's in
 | |
|         progress.
 | |
|         """
 | |
|         i = self.readpipe_interrupted()
 | |
|         self.assertTrue(i)
 | |
|         # Arrival of the signal shouldn't have changed anything.
 | |
|         i = self.readpipe_interrupted()
 | |
|         self.assertTrue(i)
 | |
| 
 | |
|     def test_siginterrupt_on(self):
 | |
|         """If a signal handler is installed and siginterrupt is called with
 | |
|         a true value for the second argument, when that signal arrives, it
 | |
|         interrupts a syscall that's in progress.
 | |
|         """
 | |
|         signal.siginterrupt(self.signum, 1)
 | |
|         i = self.readpipe_interrupted()
 | |
|         self.assertTrue(i)
 | |
|         # Arrival of the signal shouldn't have changed anything.
 | |
|         i = self.readpipe_interrupted()
 | |
|         self.assertTrue(i)
 | |
| 
 | |
|     def test_siginterrupt_off(self):
 | |
|         """If a signal handler is installed and siginterrupt is called with
 | |
|         a false value for the second argument, when that signal arrives, it
 | |
|         does not interrupt a syscall that's in progress.
 | |
|         """
 | |
|         signal.siginterrupt(self.signum, 0)
 | |
|         i = self.readpipe_interrupted()
 | |
|         self.assertFalse(i)
 | |
|         # Arrival of the signal shouldn't have changed anything.
 | |
|         i = self.readpipe_interrupted()
 | |
|         self.assertFalse(i)
 | |
| 
 | |
| 
 | |
| @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
 | |
| class ItimerTest(unittest.TestCase):
 | |
|     def setUp(self):
 | |
|         self.hndl_called = False
 | |
|         self.hndl_count = 0
 | |
|         self.itimer = None
 | |
|         self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         signal.signal(signal.SIGALRM, self.old_alarm)
 | |
|         if self.itimer is not None: # test_itimer_exc doesn't change this attr
 | |
|             # just ensure that itimer is stopped
 | |
|             signal.setitimer(self.itimer, 0)
 | |
| 
 | |
|     def sig_alrm(self, *args):
 | |
|         self.hndl_called = True
 | |
|         if test_support.verbose:
 | |
|             print("SIGALRM handler invoked", args)
 | |
| 
 | |
|     def sig_vtalrm(self, *args):
 | |
|         self.hndl_called = True
 | |
| 
 | |
|         if self.hndl_count > 3:
 | |
|             # it shouldn't be here, because it should have been disabled.
 | |
|             raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
 | |
|                 "timer.")
 | |
|         elif self.hndl_count == 3:
 | |
|             # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
 | |
|             signal.setitimer(signal.ITIMER_VIRTUAL, 0)
 | |
|             if test_support.verbose:
 | |
|                 print("last SIGVTALRM handler call")
 | |
| 
 | |
|         self.hndl_count += 1
 | |
| 
 | |
|         if test_support.verbose:
 | |
|             print("SIGVTALRM handler invoked", args)
 | |
| 
 | |
|     def sig_prof(self, *args):
 | |
|         self.hndl_called = True
 | |
|         signal.setitimer(signal.ITIMER_PROF, 0)
 | |
| 
 | |
|         if test_support.verbose:
 | |
|             print("SIGPROF handler invoked", args)
 | |
| 
 | |
|     def test_itimer_exc(self):
 | |
|         # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
 | |
|         # defines it ?
 | |
|         self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
 | |
|         # Negative times are treated as zero on some platforms.
 | |
|         if 0:
 | |
|             self.assertRaises(signal.ItimerError,
 | |
|                               signal.setitimer, signal.ITIMER_REAL, -1)
 | |
| 
 | |
|     def test_itimer_real(self):
 | |
|         self.itimer = signal.ITIMER_REAL
 | |
|         signal.setitimer(self.itimer, 1.0)
 | |
|         if test_support.verbose:
 | |
|             print("\ncall pause()...")
 | |
|         signal.pause()
 | |
| 
 | |
|         self.assertEqual(self.hndl_called, True)
 | |
| 
 | |
|     # Issue 3864. Unknown if this affects earlier versions of freebsd also.
 | |
|     @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
 | |
|         'itimer not reliable (does not mix well with threading) on some BSDs.')
 | |
|     def test_itimer_virtual(self):
 | |
|         self.itimer = signal.ITIMER_VIRTUAL
 | |
|         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
 | |
|         signal.setitimer(self.itimer, 0.3, 0.2)
 | |
| 
 | |
|         start_time = time.time()
 | |
|         while time.time() - start_time < 60.0:
 | |
|             # use up some virtual time by doing real work
 | |
|             _ = pow(12345, 67890, 10000019)
 | |
|             if signal.getitimer(self.itimer) == (0.0, 0.0):
 | |
|                 break # sig_vtalrm handler stopped this itimer
 | |
|         else: # Issue 8424
 | |
|             self.skipTest("timeout: likely cause: machine too slow or load too "
 | |
|                           "high")
 | |
| 
 | |
|         # virtual itimer should be (0.0, 0.0) now
 | |
|         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
 | |
|         # and the handler should have been called
 | |
|         self.assertEqual(self.hndl_called, True)
 | |
| 
 | |
|     # Issue 3864. Unknown if this affects earlier versions of freebsd also.
 | |
|     @unittest.skipIf(sys.platform=='freebsd6',
 | |
|         'itimer not reliable (does not mix well with threading) on freebsd6')
 | |
|     def test_itimer_prof(self):
 | |
|         self.itimer = signal.ITIMER_PROF
 | |
|         signal.signal(signal.SIGPROF, self.sig_prof)
 | |
|         signal.setitimer(self.itimer, 0.2, 0.2)
 | |
| 
 | |
|         start_time = time.time()
 | |
|         while time.time() - start_time < 60.0:
 | |
|             # do some work
 | |
|             _ = pow(12345, 67890, 10000019)
 | |
|             if signal.getitimer(self.itimer) == (0.0, 0.0):
 | |
|                 break # sig_prof handler stopped this itimer
 | |
|         else: # Issue 8424
 | |
|             self.skipTest("timeout: likely cause: machine too slow or load too "
 | |
|                           "high")
 | |
| 
 | |
|         # profiling itimer should be (0.0, 0.0) now
 | |
|         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
 | |
|         # and the handler should have been called
 | |
|         self.assertEqual(self.hndl_called, True)
 | |
| 
 | |
| def test_main():
 | |
|     test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
 | |
|                               WakeupSignalTests, SiginterruptTest,
 | |
|                               ItimerTest, WindowsSignalTests)
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     test_main()
 |