These files are a subset of the python-2.7.2.tgz distribution from python.org. Changed files from PyMod-2.7.2 have been copied into the corresponding directories of this tree, replacing the original files in the distribution. Signed-off-by: daryl.mcdaniel@intel.com git-svn-id: https://edk2.svn.sourceforge.net/svnroot/edk2/trunk/edk2@13197 6f19259b-4bc3-4df7-8a09-765794883524
		
			
				
	
	
		
			1234 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1234 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Supporting definitions for the Python regression tests."""
 | 
						|
 | 
						|
# This module only works when loaded under its package-qualified name;
# fail fast on any other import path.
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
 | 
						|
 | 
						|
import contextlib
 | 
						|
import errno
 | 
						|
import functools
 | 
						|
import gc
 | 
						|
import socket
 | 
						|
import sys
 | 
						|
import os
 | 
						|
import platform
 | 
						|
import shutil
 | 
						|
import warnings
 | 
						|
import unittest
 | 
						|
import importlib
 | 
						|
import UserDict
 | 
						|
import re
 | 
						|
import time
 | 
						|
try:
 | 
						|
    import thread
 | 
						|
except ImportError:
 | 
						|
    thread = None
 | 
						|
 | 
						|
# Names exported by "from test.test_support import *".
__all__ = [
    "Error", "TestFailed", "ResourceDenied", "import_module", "verbose",
    "use_resources", "max_memuse", "record_original_stdout",
    "get_original_stdout", "unload", "unlink", "rmtree", "forget",
    "is_resource_enabled", "requires", "find_unused_port", "bind_port",
    "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
    "SAVEDCWD", "temp_cwd", "findfile", "sortdict",
    "check_syntax_error", "open_urlresource", "check_warnings",
    "check_py3k_warnings", "CleanImport", "EnvironmentVarGuard",
    "captured_output", "captured_stdout", "TransientResource",
    "transient_internet", "run_with_locale", "set_memlimit",
    "bigmemtest", "bigaddrspacetest", "BasicTestRunner", "run_unittest",
    "run_doctest", "threading_setup", "threading_cleanup",
    "reap_children", "cpython_only", "check_impl_detail",
    "get_attribute", "py3k_bytes", "import_fresh_module",
]
 | 
						|
 | 
						|
 | 
						|
class Error(Exception):
    """Common ancestor of the exceptions defined for the regression tests."""
 | 
						|
 | 
						|
class TestFailed(Error):
    """Raised to report that a regression test failed."""
 | 
						|
 | 
						|
class ResourceDenied(unittest.SkipTest):
    """Skip signal for a test that asked for a resource that is not enabled.

    requires() raises this when the requested resource has not been
    enabled.  Having a dedicated SkipTest subclass lets expected skips be
    told apart from unexpected ones.
    """
 | 
						|
 | 
						|
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager that hides module/package DeprecationWarnings.

    While the block runs, DeprecationWarning messages matching
    ".+ (module|package)" are ignored; the previous warning filters are
    restored on exit.  With ignore set to False nothing is changed."""
    if not ignore:
        # Nothing to suppress: just run the body.
        yield
        return
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
 | 
						|
 | 
						|
 | 
						|
def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    name is passed unchanged to importlib.import_module().

    If deprecated is True, any module or package deprecation messages
    will be suppressed.

    Raises unittest.SkipTest, carrying the text of the ImportError, when
    the module cannot be imported."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        except ImportError as msg:
            # "except ... as" is accepted by 2.6+ as well as 3.x, unlike
            # the comma form.
            raise unittest.SkipTest(str(msg))
 | 
						|
 | 
						|
 | 
						|
def _save_and_remove_module(name, orig_modules):
    """Move a module and its submodules out of sys.modules.

    Every sys.modules entry called name, or starting with name + '.', is
    recorded in the orig_modules dict and then deleted from sys.modules.

    Raise ImportError if the module can't be imported."""
    if name not in sys.modules:
        # Import once purely to prove the module is importable, then drop
        # the entry that import just created.
        __import__(name)
        del sys.modules[name]
    submodule_prefix = name + '.'
    # Iterate over a snapshot because sys.modules is modified in the loop.
    for key in list(sys.modules):
        if key == name or key.startswith(submodule_prefix):
            orig_modules[key] = sys.modules.pop(key)
 | 
						|
 | 
						|
def _save_and_block_module(name, orig_modules):
    """Remember a module's sys.modules entry, then block the module.

    The existing entry, if any, is stored in the orig_modules dict and
    sys.modules[name] is set to None.

    Return True if the module was in sys.modules, False otherwise."""
    was_present = name in sys.modules
    if was_present:
        orig_modules[name] = sys.modules[name]
    sys.modules[name] = None
    return was_present
 | 
						|
 | 
						|
 | 
						|
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a new copy of a module, bypassing sys.modules.

    The cached entry for name (and its submodules) is set aside, the
    module is imported again, and afterwards the saved sys.modules entries
    are put back.

    Modules named in fresh are set aside too, so they are imported anew
    if the import needs them.  If the import raises ImportError, None is
    returned.

    Modules named in blocked cannot be imported while the fresh import
    takes place.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
    # checks to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # saved: entries to put back into sys.modules afterwards.
        # placeholders: blocked names that had no entry before, so their
        # blocking entry only needs deleting.
        saved = {}
        placeholders = []
        _save_and_remove_module(name, saved)
        try:
            for module_name in fresh:
                _save_and_remove_module(module_name, saved)
            for module_name in blocked:
                was_loaded = _save_and_block_module(module_name, saved)
                if not was_loaded:
                    placeholders.append(module_name)
            result = importlib.import_module(name)
        except ImportError:
            result = None
        finally:
            sys.modules.update(saved)
            for module_name in placeholders:
                del sys.modules[module_name]
        return result
 | 
						|
 | 
						|
 | 
						|
def get_attribute(obj, name):
    """Return getattr(obj, name), turning AttributeError into SkipTest.

    The skip message is built from obj.__name__ and name."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("module %s has no attribute %s" % (
            obj.__name__, name))
 | 
						|
 | 
						|
 | 
						|
# Module-level settings; regrtest.py overrides some of them.
verbose = 1              # regrtest.py sets this flag to 0
use_resources = None     # regrtest.py sets this flag to []
max_memuse = 0           # 0 disables the bigmem tests (they still run with
                         # small sizes, to make sure they work)
real_max_memuse = 0

# _original_stdout holds whatever stdout was when regrtest began: "the
# real" stdout, IDLE's emulation of stdout, or anything else.  The point
# is to keep some flavor of stdout the user can actually see.
_original_stdout = None
 | 
						|
def record_original_stdout(stdout):
    """Store stdout in the module-level _original_stdout."""
    global _original_stdout
    _original_stdout = stdout
 | 
						|
 | 
						|
def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none is set."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
 | 
						|
 | 
						|
def unload(name):
    """Drop name from sys.modules; do nothing if it is not there."""
    sys.modules.pop(name, None)
 | 
						|
 | 
						|
def unlink(filename):
    """Delete filename, silently ignoring any OSError."""
    try:
        os.unlink(filename)
    except OSError:
        # Best effort: failures are deliberately ignored.
        return
 | 
						|
 | 
						|
def rmtree(path):
    """Remove the directory tree at path, tolerating a missing tree.

    An OSError whose errno is ENOENT or ESRCH is swallowed; any other
    OSError raised by shutil.rmtree() is re-raised unchanged."""
    try:
        shutil.rmtree(path)
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
 | 
						|
 | 
						|
def forget(modname):
    '''"Forget" that a module was ever imported.

    The module is removed from sys.modules, and modname.pyc / modname.pyo
    are deleted from every directory on sys.path.'''
    unload(modname)
    for dirname in sys.path:
        # Each file is removed independently: a .pyo may exist even when
        # there is no .pyc.
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(dirname, modname + os.extsep + suffix))
 | 
						|
 | 
						|
def is_resource_enabled(resource):
    """Return True if resource is listed in use_resources.

    use_resources is None until regrtest.py sets it; in that state no
    resource counts as enabled."""
    if use_resources is None:
        return False
    return resource in use_resources
 | 
						|
 | 
						|
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not enabled.

    If the caller's module is __main__ the check is skipped and the
    function simply returns.  Otherwise it returns None when the resource
    is enabled and raises ResourceDenied(msg) when it is not; a default
    message naming the resource is used when msg is None."""
    # Frame 1 is the direct caller; code running as __main__ is treated
    # as if the resource was set.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
 | 
						|
 | 
						|
# Default host that bind_port() binds to.
HOST = 'localhost'
 | 
						|
 | 
						|
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket with the given family and type
    (default is AF_INET, SOCK_STREAM), and handing it to bind_port(), which
    binds it to that function's default host with the port set to 0,
    eliciting an unused ephemeral port from the OS.  The temporary socket is
    then closed, and the ephemeral port is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given how many more Unix developers than Windows
    ones there are in the Open Source world, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    try:
        return bind_port(tempsock)
    finally:
        # Close the probe socket even when bind_port() raises, so a failed
        # probe does not leak the descriptor.
        tempsock.close()
 | 
						|
 | 
						|
def bind_port(sock, host=HOST):
    """Bind sock to an OS-chosen free port on host and return that port.

    Binding to port 0 makes the OS pick an ephemeral port, which matters
    because many tests may be running simultaneously, especially in a
    buildbot environment.

    For an AF_INET / SOCK_STREAM socket, TestFailed is raised if
    SO_REUSEADDR or SO_REUSEPORT is already set on it: tests should
    *never* set these options on TCP/IP sockets.  The only case for
    setting them is testing multicasting via multiple UDP sockets.

    For such a socket, if the SO_EXCLUSIVEADDRUSE option is available
    (i.e. on Windows) it is set before binding.  This will prevent anyone
    else from bind()'ing to our host/port for the duration of the test.
    """
    is_tcp = (sock.family == socket.AF_INET and
              sock.type == socket.SOCK_STREAM)
    if is_tcp:
        if (hasattr(socket, 'SO_REUSEADDR') and
                sock.getsockopt(socket.SOL_SOCKET,
                                socket.SO_REUSEADDR) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        if (hasattr(socket, 'SO_REUSEPORT') and
                sock.getsockopt(socket.SOL_SOCKET,
                                socket.SO_REUSEPORT) == 1):
            raise TestFailed("tests should never set the SO_REUSEPORT "
                             "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
 | 
						|
 | 
						|
# Relative tolerance used by fcmp() when either operand is a float.
FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Compare x and y cmp()-style, treating nearly equal floats as equal.

    If either argument is a float, the two compare equal (0) when
    abs(x - y) <= (abs(x) + abs(y)) * FUZZ.  Tuples and lists of the same
    type are compared element by element with fcmp(), then by length.
    Everything else, including a fuzzy comparison that fails or cannot be
    computed, falls back to ordinary ordering.

    Returns a negative number, 0 or a positive number.
    """
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # Operands that do not support the arithmetic fall through to
            # the plain comparison below.  KeyboardInterrupt and SystemExit
            # are deliberately not caught here.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        # zip() stops at the shorter sequence; the length comparison below
        # decides the result when one is a prefix of the other.
        for left, right in zip(x, y):
            outcome = fcmp(left, right)
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
 | 
						|
 | 
						|
# have_unicode records whether the interpreter provides the unicode type.
try:
    unicode
except NameError:
    have_unicode = False
else:
    have_unicode = True

is_jython = sys.platform.startswith('java')

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # The Unicode names below are only defined when unicode is available.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNENCODABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        # Platform values 0 (win32s) and 1 (9x/ME) are excluded.
        if (hasattr(sys, "getwindowsversion") and
                sys.getwindowsversion()[3] >= 2):
            # Japanese characters (I think - from bug 846133)
            # The eval() argument is a constant literal, not external data.
            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # errors=ignore' mode - hence we get '?' characters rather than
                # the exception.  'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNENCODABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                print('WARNING: The filename %r CAN be encoded by the filesystem.  '
                      'Unicode filename tests may not be effective'
                      % TESTFN_UNENCODABLE)
        else:
            TESTFN_UNENCODABLE = None

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())

# Save the initial cwd
SAVEDCWD = os.getcwd()
 | 
						|
 | 
						|
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    The directory is called *name* and is created inside the current
    directory.  The value bound by the ``with`` statement is os.getcwd()
    as seen inside the block.  On exit the previous CWD is restored and
    the directory, if it was created here, is removed.

    If *quiet* is False (default) and the directory cannot be created or
    entered, the OSError propagates; a unicode *name* that cannot be
    encoded with the filesystem encoding raises SkipTest.  If *quiet* is
    True, only a RuntimeWarning is issued for the OSError and the
    original CWD is used.
    """
    if isinstance(name, unicode):
        try:
            name = name.encode(sys.getfilesystemencoding() or 'ascii')
        except UnicodeEncodeError:
            if not quiet:
                raise unittest.SkipTest('unable to encode the cwd name with '
                                        'the filesystem encoding.')
    original_dir = os.getcwd()
    try:
        os.mkdir(name)
        os.chdir(name)
    except OSError:
        created = False
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + name,
                      RuntimeWarning, stacklevel=3)
    else:
        created = True
    try:
        yield os.getcwd()
    finally:
        os.chdir(original_dir)
        if created:
            rmtree(name)
 | 
						|
 | 
						|
 | 
						|
def findfile(file, here=__file__, subdir=None):
    """Locate file next to *here* or on sys.path.

    An absolute path is returned as is.  Otherwise subdir, when given, is
    prepended, and the directory containing *here* followed by each
    sys.path entry is searched; the first existing candidate wins.  If
    nothing is found the (possibly subdir-prefixed) name is returned, which
    does not necessarily signal failure: it could still be a legitimate
    path."""
    if os.path.isabs(file):
        return file
    if subdir is not None:
        file = os.path.join(subdir, file)
    search_dirs = [os.path.dirname(here)] + sys.path
    for directory in search_dirs:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
 | 
						|
 | 
						|
def sortdict(dict):
    """Like repr(dict), but with the items in sorted order.

    Returns a string of the form "{k1: v1, k2: v2}" built from the repr()
    of each key and value.  (The parameter name shadows the builtin; it is
    kept so existing keyword callers continue to work.)"""
    # sorted() works whether items() returns a list or a view, and avoids
    # sorting a list in place.
    reprpairs = ["%r: %r" % pair for pair in sorted(dict.items())]
    return "{%s}" % ", ".join(reprpairs)
 | 
						|
 | 
						|
def make_bad_fd():
    """
    Return a file descriptor that is no longer valid.

    TESTFN is opened for writing and its descriptor number is noted; the
    file is then closed and deleted before the number is returned.
    """
    handle = open(TESTFN, "wb")
    try:
        return handle.fileno()
    finally:
        # Runs before the return completes, so the caller gets a closed fd.
        handle.close()
        unlink(TESTFN)
 | 
						|
 | 
						|
def check_syntax_error(testcase, statement):
    """Assert, via testcase, that compiling statement raises SyntaxError.

    statement is compiled in 'exec' mode under the filename
    '<test string>'."""
    with testcase.assertRaises(SyntaxError):
        compile(statement, '<test string>', 'exec')
 | 
						|
 | 
						|
def open_urlresource(url, check=None):
    """Return an open file for the resource at url, downloading it if needed.

    The file is cached in the "data" directory next to this module under
    the last path component of url.  A cached copy is used when it exists
    and passes check (a callable given the open file; a true result means
    valid).  An invalid cached copy is deleted and the resource is fetched
    again, which requires the 'urlfetch' resource.

    Raises TestFailed if the downloaded file does not pass check.
    """
    import urlparse
    import urllib2

    # Split on '/' rather than os.sep: this is a URL, not a local path.
    basename = urlparse.urlparse(url)[2].split('/')[-1]
    cache_path = os.path.join(os.path.dirname(__file__), "data", basename)

    def check_valid_file(path):
        # Return the open file if it is acceptable; otherwise close it and
        # return None implicitly.
        candidate = open(path)
        if check is None:
            return candidate
        elif check(candidate):
            candidate.seek(0)
            return candidate
        candidate.close()

    if os.path.exists(cache_path):
        cached = check_valid_file(cache_path)
        if cached is not None:
            return cached
        unlink(cache_path)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print >> get_original_stdout(), '\tfetching %s ...' % url
    response = urllib2.urlopen(url, timeout=15)
    try:
        with open(cache_path, "wb") as out:
            chunk = response.read()
            while chunk:
                out.write(chunk)
                chunk = response.read()
    finally:
        response.close()

    downloaded = check_valid_file(cache_path)
    if downloaded is not None:
        return downloaded
    raise TestFailed('invalid resource "%s"' % cache_path)
 | 
						|
 | 
						|
 | 
						|
class WarningsRecorder(object):
    """Thin wrapper around the list that warnings.catch_warnings() hands
       back on entry when recording is enabled.

       Attribute access is forwarded to the most recent warning; reset()
       marks everything recorded so far as already seen.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning that has not been reset() away.
        self._last = 0

    def __getattr__(self, attr):
        has_unseen = len(self._warnings) > self._last
        if has_unseen:
            return getattr(self._warnings[-1], attr)
        # With nothing new recorded, the standard warning detail
        # attributes read as None instead of raising.
        if attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        """The warnings recorded since the last reset()."""
        return self._warnings[self._last:]

    def reset(self):
        """Treat every warning recorded so far as seen."""
        self._last = len(self._warnings)
 | 
						|
 | 
						|
 | 
						|
def _filterwarnings(filters, quiet=False):
    """Generator behind check_warnings() and check_py3k_warnings().

    Records every warning raised while suspended at the yield, then
    matches the recorded warnings against filters, a sequence of
    (message regexp, category) pairs.  AssertionError is raised for the
    first warning that no filter matched and, unless 'quiet' is True, for
    the first filter that matched nothing.
    """
    # Clear the warning registry of the calling module in order to
    # re-raise the warnings.  The frame lookup must stay in this body: the
    # depth of 2 depends on how this generator is driven.
    caller_registry = sys._getframe(2).f_globals.get('__warningregistry__')
    if caller_registry:
        caller_registry.clear()
    with warnings.catch_warnings(record=True) as recorded:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swaps the module, we need to look it up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(recorded)
    # Filter the recorded warnings
    unmatched = [entry.message for entry in recorded]
    idle_filters = []
    for pattern, category in filters:
        matched_any = False
        still_unmatched = []
        for exc in unmatched:
            # Warnings that match this filter are dropped from the list.
            if (re.match(pattern, str(exc), re.I) and
                    issubclass(exc.__class__, category)):
                matched_any = True
            else:
                still_unmatched.append(exc)
        unmatched = still_unmatched
        if not (matched_any or quiet):
            # This filter caught nothing
            idle_filters.append((pattern, category.__name__))
    if unmatched:
        raise AssertionError("unhandled warning %r" % unmatched[0])
    if idle_filters:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             idle_filters[0])
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Positional arguments are 2-tuples:
        ("message regexp", WarningCategory)

    Keyword argument:
     - 'quiet': if True, a filter that catches nothing is not a failure
        (defaults to True when no filters are given,
         and to False when some filters are defined)

    Called without arguments it is equivalent to:
        check_warnings(("", Warning), quiet=True)
    """
    # This function is not itself a generator: it returns the generator
    # made by _filterwarnings(), which the contextmanager wrapper drives.
    quiet = kwargs.get('quiet')
    if filters:
        return _filterwarnings(filters, quiet)
    # Preserve backward compatibility
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def check_py3k_warnings(*filters, **kwargs):
    """Context manager to silence py3k warnings.

    Positional arguments are 2-tuples:
        ("message regexp", WarningCategory)

    Keyword argument:
     - 'quiet': if True, a filter that catches nothing is not a failure
        (default False)

    Called without arguments it is equivalent to:
        check_py3k_warnings(("", DeprecationWarning), quiet=False)

    When sys.py3kwarning is false the filters are discarded, so any
    warning raised inside the block is reported as unhandled.
    """
    if not sys.py3kwarning:
        # It should not raise any py3k warning
        filters = ()
    elif not filters:
        filters = (("", DeprecationWarning),)
    return _filterwarnings(filters, kwargs.get('quiet'))
 | 
						|
 | 
						|
 | 
						|
class CleanImport(object):
    """Context manager that makes the next import build a fresh module.

    Handy for exercising module-level behaviour such as a
    DeprecationWarning emitted at import time.

    Typical use:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        # Snapshot of the import cache, re-applied by __exit__.
        self.original_modules = sys.modules.copy()
        for requested in module_names:
            if requested not in sys.modules:
                continue
            real_name = sys.modules[requested].__name__
            # The requested name may only be an alias for another module
            # (e.g. a stub for a module renamed in 3.x).  In that case we
            # also need to delete the real module to clear the import cache.
            if real_name != requested:
                del sys.modules[real_name]
            del sys.modules[requested]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put back every entry recorded in the snapshot; entries added
        # since then under other names are left in place.
        sys.modules.update(self.original_modules)
 | 
						|
 | 
						|
 | 
						|
class EnvironmentVarGuard(UserDict.DictMixin):
 | 
						|
 | 
						|
    """Class to help protect the environment variable properly.  Can be used as
 | 
						|
    a context manager."""
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        self._environ = os.environ
 | 
						|
        self._changed = {}
 | 
						|
 | 
						|
    def __getitem__(self, envvar):
 | 
						|
        return self._environ[envvar]
 | 
						|
 | 
						|
    def __setitem__(self, envvar, value):
 | 
						|
        # Remember the initial value on the first access
 | 
						|
        if envvar not in self._changed:
 | 
						|
            self._changed[envvar] = self._environ.get(envvar)
 | 
						|
        self._environ[envvar] = value
 | 
						|
 | 
						|
    def __delitem__(self, envvar):
 | 
						|
        # Remember the initial value on the first access
 | 
						|
        if envvar not in self._changed:
 | 
						|
            self._changed[envvar] = self._environ.get(envvar)
 | 
						|
        if envvar in self._environ:
 | 
						|
            del self._environ[envvar]
 | 
						|
 | 
						|
    def keys(self):
 | 
						|
        return self._environ.keys()
 | 
						|
 | 
						|
    def set(self, envvar, value):
 | 
						|
        self[envvar] = value
 | 
						|
 | 
						|
    def unset(self, envvar):
 | 
						|
        del self[envvar]
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        return self
 | 
						|
 | 
						|
    def __exit__(self, *ignore_exc):
 | 
						|
        for (k, v) in self._changed.items():
 | 
						|
            if v is None:
 | 
						|
                if k in self._environ:
 | 
						|
                    del self._environ[k]
 | 
						|
            else:
 | 
						|
                self._environ[k] = v
 | 
						|
        os.environ = self._environ
 | 
						|
 | 
						|
 | 
						|
class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    On construction the current sys.path list object and a copy of its
    contents are remembered, then the directories passed as positional
    arguments are appended.  When the context ends, sys.path is rebound
    to the remembered list object and its contents are reset to the copy.

    As a consequence *every* change made to sys.path inside the block,
    including replacing the list object itself, is undone on exit.
    """

    def __init__(self, *paths):
        self.original_object = sys.path
        self.original_value = list(sys.path)
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        restored = self.original_object
        restored[:] = self.original_value
        sys.path = restored
 | 
						|
 | 
						|
 | 
						|
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        # exc:    exception class to watch for (subclasses match too)
        # kwargs: attribute name -> value pairs the exception instance must
        #         carry for it to be converted into ResourceDenied
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be the watched class or one of its
        # subclasses.  (The test used to be written the other way round,
        # which matched superclasses of self.exc and missed subclasses.)
        if type_ is None or not issubclass(type_, self.exc):
            return
        for attr, attr_value in self.attrs.items():
            if not hasattr(value, attr):
                break
            if getattr(value, attr) != attr_value:
                break
        else:
            # Every requested attribute matched (or none was requested).
            raise ResourceDenied("an optional resource is not available")
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def transient_internet(resource_name, timeout=30.0, errnos=()):
    """Context manager that raises ResourceDenied when various issues with
    the Internet connection manifest themselves as exceptions.

    resource_name is only used in the ResourceDenied message.  Unless
    timeout is None it is installed as the default socket timeout for the
    duration of the block; the previous default is always restored.
    errnos, when non-empty, replaces the built-in list of error numbers
    that are treated as a transient failure (and disables the built-in
    getaddrinfo error list)."""
    denied = ResourceDenied("Resource '%s' is not available" % resource_name)

    # (symbolic name, fallback number) pairs: the number is used when the
    # module queried below does not define the name.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    default_gai_errnos = [
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
    ]

    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise `denied` when err looks like a transient network failure.
        code = getattr(err, 'errno', None)
        timed_out = isinstance(err, socket.timeout)
        lookup_failed = (isinstance(err, socket.gaierror) and
                         code in gai_errnos)
        if timed_out or lookup_failed or code in captured_errnos:
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied

    previous_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except IOError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.  The wrapped error is looked
        # for in args[0] first and then in args[1], as produced by e.g.
        #    except socket.error as msg:
        #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
        innermost = err
        while True:
            nested = [arg for arg in innermost.args[:2]
                      if isinstance(arg, IOError)]
            if not nested:
                break
            innermost = nested[0]
        filter_error(innermost)
        # Not a transient failure: re-raise the exception that was caught.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(previous_timeout)
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def captured_output(stream_name):
    """Temporarily replace the sys stream named *stream_name* with a
    StringIO and yield the replacement; the original stream is put back
    when the block ends.  Helper for captured_stdout and captured_stdin."""
    from StringIO import StringIO
    saved_stream = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, saved_stream)
 | 
						|
 | 
						|
def captured_stdout():
    """Return a context manager capturing what is written to sys.stdout:

       with captured_stdout() as s:
           print "hello"
       self.assertEqual(s.getvalue(), "hello\n")
    """
    stream_name = "stdout"
    return captured_output(stream_name)
 | 
						|
 | 
						|
def captured_stdin():
    """Return a context manager replacing sys.stdin with a StringIO."""
    stream_name = "stdin"
    return captured_output(stream_name)
 | 
						|
 | 
						|
def gc_collect():
    """Force as many objects as possible to be collected.

    Outside CPython the garbage collector gives no guarantee of timely
    deallocation (and even CPython does not for reference cycles), so
    __del__ methods can run later than expected and weakrefs can stay
    alive longer than expected.  This function runs the collector three
    times, pausing for 0.1 second after the first pass on Jython, to do
    its best to make all garbage disappear.
    """
    for pass_number in range(3):
        gc.collect()
        if pass_number == 0 and is_jython:
            time.sleep(0.1)
 | 
						|
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Decorator for running a function in a different locale, correctly resetting
 | 
						|
# it afterwards.
 | 
						|
 | 
						|
def run_with_locale(catstr, *locales):
    """Decorator factory: run the decorated function in a different locale.

    catstr is the name of a category constant of the locale module (for
    example 'LC_ALL'); an unknown name makes the decorated function raise
    AttributeError when it is called.  locales are candidate locale names
    tried in order; the first one that can be set is used, and if none can
    be set the function simply runs in the current locale.  The original
    locale is restored afterwards, whether or not the function raises.
    """
    def decorator(func):
        # functools.wraps keeps the wrapped function's name and docstring,
        # like the other decorators in this file.
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                # (KeyboardInterrupt/SystemExit are deliberately not caught)
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                    except Exception:
                        # this candidate is not usable here; try the next
                        continue
                    break

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
 | 
						|
 | 
						|
# Some handy shorthands. Note that these are used for byte-limits as well
 | 
						|
# as size-limits, in the various bigmem tests
 | 
						|
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Set the memory limit used by the big-memory tests.

    limit is a string made of a number (optionally with a decimal
    fraction) followed by a unit letter K, M, G or T in either case and an
    optional trailing "b", e.g. '4G' or '2.5Gb'.

    On success the global real_max_memuse receives the requested number of
    bytes and the global max_memuse receives that number capped at
    MAX_Py_ssize_t.  ValueError is raised when the string cannot be parsed
    or when the capped limit is below 2 GiB - 1; in that case neither
    global is modified.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the blank inside the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    usable = min(requested, MAX_Py_ssize_t)
    if usable < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    # Publish both values only once the limit has been accepted, so that a
    # rejected limit leaves the previous settings untouched.
    real_max_memuse = requested
    max_memuse = usable
 | 
						|
 | 
						|
def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the smallest size (in units interpreted by the test) for
    which the test is useful.  'memuse' is the number of bytes needed per
    unit of size, or a good estimate of it.  'overhead' is a fixed amount
    of memory needed regardless of size; it defaults to 5Mb.

    The decorated test is called with a 'size' argument chosen from
    max_memuse.  When max_memuse does not allow for minsize the test
    returns without running; otherwise the size is raised towards what
    max_memuse allows, never going below minsize.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # max_memuse is 0 (the default): still run the test, with
                # size set to a few kb, to make sure it works.  Using too
                # much memory even at that size is reported noisily
                # through a failed assertion.
                trial_size = 5147
                self.assertFalse(trial_size * memuse + overhead > 20 * _1M)
                return f(self, trial_size)

            affordable = int((max_memuse - overhead) / memuse)
            if affordable < minsize:
                # Really ought to print 'test skipped' or something
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return
            # Try to keep some breathing room in memory use
            return f(self, max(affordable - 50 * _1M, minsize))

        # Expose the parameters on the wrapper itself.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
 | 
						|
 | 
						|
def precisionbigmemtest(size, memuse, overhead=5*_1M):
    """Decorator for bigmem tests that need an exact size.

    The decorated test is called with 'size' when real_max_memuse is set
    and is at least size * memuse, and with 5147 when real_max_memuse is
    not set (0).  When real_max_memuse is set but too small, the test
    returns without running.  'overhead' is only recorded on the wrapper.
    """
    def decorator(f):
        def wrapper(self):
            if not real_max_memuse:
                return f(self, 5147)

            if real_max_memuse < size * memuse:
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return

            return f(self, size)

        # Expose the parameters on the wrapper itself.
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
 | 
						|
 | 
						|
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse is at least MAX_Py_ssize_t;
    otherwise the wrapper returns None (after a note on stderr in verbose
    mode)."""
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if verbose:
            sys.stderr.write("Skipping %s because of memory "
                             "constraint\n" % (f.__name__,))
    return wrapper
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# unittest integration.
 | 
						|
 | 
						|
class BasicTestRunner:
    """Minimal test runner: no output, just collects the outcome."""

    def run(self, test):
        """Call *test* with a fresh unittest.TestResult and return it."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
 | 
						|
 | 
						|
def _id(obj):
    """Identity function; used as a decorator that changes nothing."""
    unchanged = obj
    return unchanged
 | 
						|
 | 
						|
def requires_resource(resource):
    """Return a decorator that leaves the test alone when *resource* is
    enabled and marks it as skipped (unittest.skip) otherwise."""
    if not is_resource_enabled(resource):
        reason = "resource {0!r} is not enabled".format(resource)
        return unittest.skip(reason)
    return _id
 | 
						|
 | 
						|
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    decorate = impl_detail(cpython=True)
    return decorate(test)
 | 
						|
 | 
						|
def impl_detail(msg=None, **guards):
    """Return a decorator for implementation-specific tests.

    The guards are interpreted by check_impl_detail(): when it returns
    true the decorator is the identity, otherwise it is unittest.skip(msg).
    When msg is None a message naming the guarded implementations is
    built.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            template = "implementation detail not available on {0}"
        else:
            template = "implementation detail specific to {0}"
        msg = template.format(' or '.join(sorted(guardnames.keys())))
    return unittest.skip(msg)
 | 
						|
 | 
						|
def _parse_guards(guards):
    """Return a tuple ({platform_name: run_me}, default_value).

    Without guards the result is ({'cpython': True}, False).  Otherwise
    the guards dict is returned unchanged together with the negation of
    the (common) guard value.
    """
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    first_flag = flags[0]
    assert flags == [first_flag] * len(guards)   # all True or all False
    return (guards, not first_flag)
 | 
						|
 | 
						|
# Use the following check to guard CPython's implementation-specific tests --
 | 
						|
# or to run them only on the implementation(s) guarded by the arguments.
 | 
						|
def check_impl_detail(**guards):
    """Return True or False depending on the running Python implementation.

       The lower-cased platform.python_implementation() is looked up in
       the guards; implementations that are not named get the default
       computed by _parse_guards().

       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    table, fallback = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return table.get(implementation, fallback)
 | 
						|
 | 
						|
 | 
						|
 | 
						|
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    A verbose text runner writing to sys.stdout is used in verbose mode,
    a silent BasicTestRunner otherwise.  TestFailed is raised when the
    run was not successful: with the recorded traceback text when exactly
    one error or failure occurred, with a generic message otherwise.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return

    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
 | 
						|
 | 
						|
 | 
						|
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite or TestCase
    instance, or the name of a module present in sys.modules (whose test
    cases are then collected).  ValueError is raised for a string that is
    not a key of sys.modules.
    """
    suite = unittest.TestSuite()
    for item in classes:
        if isinstance(item, str):
            if item not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            tests = unittest.findTestCases(sys.modules[item])
        elif isinstance(item, (unittest.TestSuite, unittest.TestCase)):
            # Already a suite or a test instance: add it as is.
            tests = item
        else:
            tests = unittest.makeSuite(item)
        suite.addTest(tests)
    _run_suite(suite)
 | 
						|
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# doctest driver.
 | 
						|
 | 
						|
def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    When verbosity is omitted (or None), test_support's own `verbose`
    setting is handed to doctest.  When a value is given, doctest is
    called with verbose=None so that its usual behavior applies (it
    searches sys.argv for -v).

    TestFailed is raised when at least one doctest fails.
    """

    import doctest

    verbosity = verbose if verbosity is None else None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Threading support to prevent reporting refleaks when running regrtest.py -R
 | 
						|
 | 
						|
# NOTE: we use thread._count() rather than threading.enumerate() (or the
 | 
						|
# moral equivalent thereof) because a threading.Thread object is still alive
 | 
						|
# until its __bootstrap() method has returned, even after it has been
 | 
						|
# unregistered from the threading module.
 | 
						|
# thread._count(), on the other hand, only gets decremented *after* the
 | 
						|
# __bootstrap() method has returned, which gives us reliable reference counts
 | 
						|
# at the end of a test run.
 | 
						|
 | 
						|
def threading_setup():
    """Return a 1-tuple holding thread._count(), or (1,) when the thread
    module is not available.  The tuple is meant to be passed on to
    threading_cleanup()."""
    if not thread:
        return (1,)
    return (thread._count(),)
 | 
						|
 | 
						|
def threading_cleanup(nb_threads):
    """Wait for thread._count() to come back to nb_threads.

    The count is checked up to 10 times with a 0.1 second pause after
    each mismatch; the function then returns regardless of the outcome.
    Nothing is done when the thread module is not available."""
    if thread:
        attempts_left = 10
        while attempts_left and thread._count() != nb_threads:
            time.sleep(0.1)
            attempts_left -= 1
    # XXX print a warning in case of failure?
 | 
						|
 | 
						|
def reap_threads(func):
    """Decorator for functions that use threads.

    The wrapper records the thread count with threading_setup() before
    calling the function and hands it to threading_cleanup() afterwards,
    even when the function raises.  The function is returned unchanged
    when threading is unavailable.
    """
    if not thread:
        return func

    @functools.wraps(func)
    def decorator(*args):
        snapshot = threading_setup()
        try:
            outcome = func(*args)
        finally:
            threading_cleanup(*snapshot)
        return outcome
    return decorator
 | 
						|
 | 
						|
def reap_children():
    """Reap finished child processes.

    Call this at the end of test_main() whenever sub-processes were
    started, so that no extra children (zombies) stay around hogging
    resources and disturbing the search for refleaks.
    """

    # Zombies hog resources and might be causing some of the buildbots to
    # die.  Nothing can be done where os.waitpid does not exist.
    if not hasattr(os, 'waitpid'):
        return

    any_process = -1
    while True:
        try:
            # This will raise an exception on Windows.  That's ok.
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except:
            # Any error (including "no child left") ends the loop.
            break
        if pid == 0:
            # Children remain but none of them has finished yet.
            break
 | 
						|
 | 
						|
def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    Three conversions are tried in turn: b.tobytes() (memoryview), joining
    chr() of every item (iterable of ints) and finally bytes(b).

    NOTE: This is only a best effort function.
    """
    try:
        # memoryview?
        converted = b.tobytes()
    except AttributeError:
        pass
    else:
        return converted
    try:
        # iterable of ints?
        return b"".join(map(chr, b))
    except TypeError:
        return bytes(b)
 | 
						|
 | 
						|
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags.

    Each flag with a positive value contributes one argument made of '-'
    followed by its option letter repeated as many times as the value."""
    flag_opt_map = {
        'bytes_warning': 'b',
        'dont_write_bytecode': 'B',
        'ignore_environment': 'E',
        'no_user_site': 's',
        'no_site': 'S',
        'optimize': 'O',
        'py3k_warning': '3',
        'verbose': 'v',
    }
    levels = ((letter, getattr(sys.flags, name))
              for name, letter in flag_opt_map.items())
    return ['-' + letter * level for letter, level in levels if level > 0]
 | 
						|
 | 
						|
def strip_python_stderr(stderr):
    """Remove interpreter debug output from the stderr of a Python process.

    A trailing "[<number> refs]" line is dropped and surrounding
    whitespace is stripped.  *stderr* is a byte string, typically obtained
    from the communicate() method of a subprocess.Popen object.
    """
    without_refs = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr)
    return without_refs.strip()
 |