import os import sys import shutil import tempfile import contextlib import shlex from ._compat import iteritems, PY2, string_types # If someone wants to vendor click, we want to ensure the # correct package is discovered. Ideally we could use a # relative import here but unfortunately Python does not # support that. clickpkg = sys.modules[__name__.rsplit('.', 1)[0]] if PY2: from cStringIO import StringIO else: import io from ._compat import _find_binary_reader class EchoingStdin(object): def __init__(self, input, output): self._input = input self._output = output def __getattr__(self, x): return getattr(self._input, x) def _echo(self, rv): self._output.write(rv) return rv def read(self, n=-1): return self._echo(self._input.read(n)) def readline(self, n=-1): return self._echo(self._input.readline(n)) def readlines(self): return [self._echo(x) for x in self._input.readlines()] def __iter__(self): return iter(self._echo(x) for x in self._input) def __repr__(self): return repr(self._input) def make_input_stream(input, charset): # Is already an input stream. if hasattr(input, 'read'): if PY2: return input rv = _find_binary_reader(input) if rv is not None: return rv raise TypeError('Could not find binary reader for input stream.') if input is None: input = b'' elif not isinstance(input, bytes): input = input.encode(charset) if PY2: return StringIO(input) return io.BytesIO(input) class Result(object): """Holds the captured result of an invoked CLI script.""" def __init__(self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None): #: The runner that created the result self.runner = runner #: The standard output as bytes. self.stdout_bytes = stdout_bytes #: The standard error as bytes, or False(y) if not available self.stderr_bytes = stderr_bytes #: The exit code as integer. self.exit_code = exit_code #: The exception that happened if one did. self.exception = exception #: The traceback self.exc_info = exc_info @property def output(self): """The (standard) output as unicode string.""" return self.stdout @property def stdout(self): """The standard output as unicode string.""" return self.stdout_bytes.decode(self.runner.charset, 'replace') \ .replace('\r\n', '\n') @property def stderr(self): """The standard error as unicode string.""" if not self.stderr_bytes: raise ValueError("stderr not separately captured") return self.stderr_bytes.decode(self.runner.charset, 'replace') \ .replace('\r\n', '\n') def __repr__(self): return '<%s %s>' % ( type(self).__name__, self.exception and repr(self.exception) or 'okay', ) class CliRunner(object): """The CLI runner provides functionality to invoke a Click command line script for unittesting purposes in a isolated environment. This only works in single-threaded systems without any concurrency as it changes the global interpreter state. :param charset: the character set for the input and output data. This is UTF-8 by default and should not be changed currently as the reporting to Click only works in Python 2 properly. :param env: a dictionary with environment variables for overriding. :param echo_stdin: if this is set to `True`, then reading from stdin writes to stdout. This is useful for showing examples in some circumstances. Note that regular prompts will automatically echo the input. :param mix_stderr: if this is set to `False`, then stdout and stderr are preserved as independent streams. This is useful for Unix-philosophy apps that have predictable stdout and noisy stderr, such that each may be measured independently """ def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True): if charset is None: charset = 'utf-8' self.charset = charset self.env = env or {} self.echo_stdin = echo_stdin self.mix_stderr = mix_stderr def get_default_prog_name(self, cli): """Given a command object it will return the default program name for it. The default is the `name` attribute or ``"root"`` if not set. """ return cli.name or 'root' def make_env(self, overrides=None): """Returns the environment overrides for invoking a script.""" rv = dict(self.env) if overrides: rv.update(overrides) return rv @contextlib.contextmanager def isolation(self, input=None, env=None, color=False): """A context manager that sets up the isolation for invoking of a command line tool. This sets up stdin with the given input data and `os.environ` with the overrides from the given dictionary. This also rebinds some internals in Click to be mocked (like the prompt functionality). This is automatically done in the :meth:`invoke` method. .. versionadded:: 4.0 The ``color`` parameter was added. :param input: the input stream to put into sys.stdin. :param env: the environment overrides as dictionary. :param color: whether the output should contain color codes. The application can still override this explicitly. """ input = make_input_stream(input, self.charset) old_stdin = sys.stdin old_stdout = sys.stdout old_stderr = sys.stderr old_forced_width = clickpkg.formatting.FORCED_WIDTH clickpkg.formatting.FORCED_WIDTH = 80 env = self.make_env(env) if PY2: bytes_output = StringIO() if self.echo_stdin: input = EchoingStdin(input, bytes_output) sys.stdout = bytes_output if not self.mix_stderr: bytes_error = StringIO() sys.stderr = bytes_error else: bytes_output = io.BytesIO() if self.echo_stdin: input = EchoingStdin(input, bytes_output) input = io.TextIOWrapper(input, encoding=self.charset) sys.stdout = io.TextIOWrapper( bytes_output, encoding=self.charset) if not self.mix_stderr: bytes_error = io.BytesIO() sys.stderr = io.TextIOWrapper( bytes_error, encoding=self.charset) if self.mix_stderr: sys.stderr = sys.stdout sys.stdin = input def visible_input(prompt=None): sys.stdout.write(prompt or '') val = input.readline().rstrip('\r\n') sys.stdout.write(val + '\n') sys.stdout.flush() return val def hidden_input(prompt=None): sys.stdout.write((prompt or '') + '\n') sys.stdout.flush() return input.readline().rstrip('\r\n') def _getchar(echo): char = sys.stdin.read(1) if echo: sys.stdout.write(char) sys.stdout.flush() return char default_color = color def should_strip_ansi(stream=None, color=None): if color is None: return not default_color return not color old_visible_prompt_func = clickpkg.termui.visible_prompt_func old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func old__getchar_func = clickpkg.termui._getchar old_should_strip_ansi = clickpkg.utils.should_strip_ansi clickpkg.termui.visible_prompt_func = visible_input clickpkg.termui.hidden_prompt_func = hidden_input clickpkg.termui._getchar = _getchar clickpkg.utils.should_strip_ansi = should_strip_ansi old_env = {} try: for key, value in iteritems(env): old_env[key] = os.environ.get(key) if value is None: try: del os.environ[key] except Exception: pass else: os.environ[key] = value yield (bytes_output, not self.mix_stderr and bytes_error) finally: for key, value in iteritems(old_env): if value is None: try: del os.environ[key] except Exception: pass else: os.environ[key] = value sys.stdout = old_stdout sys.stderr = old_stderr sys.stdin = old_stdin clickpkg.termui.visible_prompt_func = old_visible_prompt_func clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func clickpkg.termui._getchar = old__getchar_func clickpkg.utils.should_strip_ansi = old_should_strip_ansi clickpkg.formatting.FORCED_WIDTH = old_forced_width def invoke(self, cli, args=None, input=None, env=None, catch_exceptions=True, color=False, mix_stderr=False, **extra): """Invokes a command in an isolated environment. The arguments are forwarded directly to the command line script, the `extra` keyword arguments are passed to the :meth:`~clickpkg.Command.main` function of the command. This returns a :class:`Result` object. .. versionadded:: 3.0 The ``catch_exceptions`` parameter was added. .. versionchanged:: 3.0 The result object now has an `exc_info` attribute with the traceback if available. .. versionadded:: 4.0 The ``color`` parameter was added. :param cli: the command to invoke :param args: the arguments to invoke. It may be given as an iterable or a string. When given as string it will be interpreted as a Unix shell command. More details at :func:`shlex.split`. :param input: the input data for `sys.stdin`. :param env: the environment overrides. :param catch_exceptions: Whether to catch any other exceptions than ``SystemExit``. :param extra: the keyword arguments to pass to :meth:`main`. :param color: whether the output should contain color codes. The application can still override this explicitly. """ exc_info = None with self.isolation(input=input, env=env, color=color) as outstreams: exception = None exit_code = 0 if isinstance(args, string_types): args = shlex.split(args) try: prog_name = extra.pop("prog_name") except KeyError: prog_name = self.get_default_prog_name(cli) try: cli.main(args=args or (), prog_name=prog_name, **extra) except SystemExit as e: exc_info = sys.exc_info() exit_code = e.code if exit_code is None: exit_code = 0 if exit_code != 0: exception = e if not isinstance(exit_code, int): sys.stdout.write(str(exit_code)) sys.stdout.write('\n') exit_code = 1 except Exception as e: if not catch_exceptions: raise exception = e exit_code = 1 exc_info = sys.exc_info() finally: sys.stdout.flush() stdout = outstreams[0].getvalue() stderr = outstreams[1] and outstreams[1].getvalue() return Result(runner=self, stdout_bytes=stdout, stderr_bytes=stderr, exit_code=exit_code, exception=exception, exc_info=exc_info) @contextlib.contextmanager def isolated_filesystem(self): """A context manager that creates a temporary folder and changes the current working directory to it for isolated filesystem tests. """ cwd = os.getcwd() t = tempfile.mkdtemp() os.chdir(t) try: yield t finally: os.chdir(cwd) try: shutil.rmtree(t) except (OSError, IOError): pass