import contextlib
import io
import os
import shlex
import shutil
import sys
import tempfile
import typing as t
from types import TracebackType

from . import formatting
from . import termui
from . import utils
from ._compat import _find_binary_reader

if t.TYPE_CHECKING:
    from .core import BaseCommand


class EchoingStdin:
    """Binary input wrapper that copies everything it reads to ``output``.

    Data returned by :meth:`read`, :meth:`read1`, :meth:`readline`,
    :meth:`readlines` and iteration is also written to ``output`` unless
    echoing is paused (see :func:`_pause_echo`). Any other attribute is
    looked up on the wrapped input stream.
    """

    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
        self._input = input
        self._output = output
        # Toggled by ``_pause_echo`` to temporarily suppress echoing.
        self._paused = False

    def __getattr__(self, x: str) -> t.Any:
        # Only reached for attributes not defined on this class; delegate
        # them to the wrapped stream.
        return getattr(self._input, x)

    def _echo(self, rv: bytes) -> bytes:
        """Write ``rv`` to the output stream unless paused, and return it."""
        if not self._paused:
            self._output.write(rv)

        return rv

    def read(self, n: int = -1) -> bytes:
        return self._echo(self._input.read(n))

    def read1(self, n: int = -1) -> bytes:
        return self._echo(self._input.read1(n))  # type: ignore

    def readline(self, n: int = -1) -> bytes:
        return self._echo(self._input.readline(n))

    def readlines(self) -> t.List[bytes]:
        return [self._echo(x) for x in self._input.readlines()]

    def __iter__(self) -> t.Iterator[bytes]:
        # A generator is already an iterator; each line is echoed lazily
        # as it is consumed.
        return (self._echo(x) for x in self._input)

    def __repr__(self) -> str:
        return repr(self._input)


@contextlib.contextmanager
def _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]:
    """Suspend echoing on ``stream`` for the duration of the block.

    Does nothing when ``stream`` is ``None``. Echoing is re-enabled even
    if the wrapped code raises, so one failing prompt does not silence
    echoing for the remainder of the run.
    """
    if stream is None:
        yield
        return

    stream._paused = True

    try:
        yield
    finally:
        stream._paused = False


class _NamedTextIOWrapper(io.TextIOWrapper):
    """A :class:`io.TextIOWrapper` that reports a fixed ``name`` and ``mode``.

    :param buffer: the binary stream to wrap.
    :param name: value returned by the ``name`` property.
    :param mode: value returned by the ``mode`` property.
    :param kwargs: forwarded to :class:`io.TextIOWrapper`.
    """

    def __init__(
        self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
    ) -> None:
        super().__init__(buffer, **kwargs)
        self._name = name
        self._mode = mode

    @property
    def name(self) -> str:
        return self._name

    @property
    def mode(self) -> str:
        return self._mode


def make_input_stream(
    input: t.Optional[t.Union[str, bytes, t.IO]], charset: str
) -> t.BinaryIO:
    """Turn ``input`` into a binary stream.

    :param input: ``None`` (treated as empty input), a string (encoded
        with ``charset``), bytes, or an object with a ``read`` attribute.
    :param charset: the encoding used when ``input`` is a string.
    :raises TypeError: if ``input`` has a ``read`` attribute but
        ``_find_binary_reader`` returns ``None`` for it.
    """
    # Is already an input stream.
    if hasattr(input, "read"):
        rv = _find_binary_reader(t.cast(t.IO, input))

        if rv is not None:
            return rv

        raise TypeError("Could not find binary reader for input stream.")

    if input is None:
        input = b""
    elif isinstance(input, str):
        input = input.encode(charset)

    return io.BytesIO(t.cast(bytes, input))


def _set_environ(key: str, value: t.Optional[str]) -> None:
    """Set ``os.environ[key]`` to ``value``, or remove it if ``value`` is None."""
    if value is None:
        # Removal is deliberately best-effort: the variable may not be set
        # at all, and a failure to unset must not abort isolation setup or
        # teardown.
        with contextlib.suppress(Exception):
            del os.environ[key]
    else:
        os.environ[key] = value


class Result:
    """Holds the captured result of an invoked CLI script."""

    def __init__(
        self,
        runner: "CliRunner",
        stdout_bytes: bytes,
        stderr_bytes: t.Optional[bytes],
        return_value: t.Any,
        exit_code: int,
        exception: t.Optional[BaseException],
        exc_info: t.Optional[
            t.Tuple[t.Type[BaseException], BaseException, TracebackType]
        ] = None,
    ) -> None:
        #: The runner that created the result
        self.runner = runner
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or None if not available
        self.stderr_bytes = stderr_bytes
        #: The value returned from the invoked command.
        #:
        #: .. versionadded:: 8.0
        self.return_value = return_value
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    @property
    def output(self) -> str:
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self) -> str:
        """The standard output as unicode string."""
        return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
            "\r\n", "\n"
        )

    @property
    def stderr(self) -> str:
        """The standard error as unicode string.

        :raises ValueError: if stderr was not captured separately.
        """
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")

        return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
            "\r\n", "\n"
        )

    def __repr__(self) -> str:
        exc_str = repr(self.exception) if self.exception else "okay"
        return f"<{type(self).__name__} {exc_str}>"


class CliRunner:
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(
        self,
        charset: str = "utf-8",
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        echo_stdin: bool = False,
        mix_stderr: bool = True,
    ) -> None:
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli: "BaseCommand") -> str:
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(
        self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None
    ) -> t.Mapping[str, t.Optional[str]]:
        """Returns the environment overrides for invoking a script."""
        rv = dict(self.env)

        if overrides:
            rv.update(overrides)

        return rv

    @contextlib.contextmanager
    def isolation(
        self,
        input: t.Optional[t.Union[str, bytes, t.IO]] = None,
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        color: bool = False,
    ) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        The context manager yields a ``(stdout, stderr)`` tuple of byte
        buffers; ``stderr`` is ``None`` when ``mix_stderr`` is enabled.

        All patched global state is restored on exit, including when the
        setup itself fails part-way (for example with an unknown
        ``charset``).

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionchanged:: 8.0
            ``stderr`` is opened with ``errors="backslashreplace"``
            instead of the default ``"strict"``.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.
        """
        bytes_input = make_input_stream(input, self.charset)
        echo_input = None
        bytes_output = io.BytesIO()
        bytes_error = None

        # Snapshot every piece of global state before touching any of it,
        # so the ``finally`` clause below can always restore it.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore
        old_env: t.Dict[str, t.Optional[str]] = {}

        try:
            formatting.FORCED_WIDTH = 80

            env = self.make_env(env)

            if self.echo_stdin:
                bytes_input = echo_input = t.cast(
                    t.BinaryIO, EchoingStdin(bytes_input, bytes_output)
                )

            # NOTE(review): the three stream names below are empty strings;
            # CPython's real streams report "<stdin>", "<stdout>" and
            # "<stderr>" -- confirm whether these were meant to match.
            sys.stdin = text_input = _NamedTextIOWrapper(
                bytes_input, encoding=self.charset, name="", mode="r"
            )

            if self.echo_stdin:
                # Force unbuffered reads, otherwise TextIOWrapper reads a
                # large chunk which is echoed early.
                text_input._CHUNK_SIZE = 1  # type: ignore

            sys.stdout = _NamedTextIOWrapper(
                bytes_output, encoding=self.charset, name="", mode="w"
            )

            if self.mix_stderr:
                sys.stderr = sys.stdout
            else:
                bytes_error = io.BytesIO()
                sys.stderr = _NamedTextIOWrapper(
                    bytes_error,
                    encoding=self.charset,
                    name="",
                    mode="w",
                    errors="backslashreplace",
                )

            # The prompt replacements write the prompt (and, for visible
            # input, the entered value) themselves, so stdin echoing is
            # paused while they read to avoid duplicated output.
            @_pause_echo(echo_input)  # type: ignore
            def visible_input(prompt: t.Optional[str] = None) -> str:
                sys.stdout.write(prompt or "")
                val = text_input.readline().rstrip("\r\n")
                sys.stdout.write(f"{val}\n")
                sys.stdout.flush()
                return val

            @_pause_echo(echo_input)  # type: ignore
            def hidden_input(prompt: t.Optional[str] = None) -> str:
                sys.stdout.write(f"{prompt or ''}\n")
                sys.stdout.flush()
                return text_input.readline().rstrip("\r\n")

            @_pause_echo(echo_input)  # type: ignore
            def _getchar(echo: bool) -> str:
                char = sys.stdin.read(1)

                if echo:
                    sys.stdout.write(char)

                sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(
                stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
            ) -> bool:
                # An explicit ``color`` wins; otherwise fall back to the
                # value passed to ``isolation``.
                if color is None:
                    return not default_color
                return not color

            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi  # type: ignore

            for key, value in env.items():
                # Remember the previous value (``None`` if unset) before
                # overriding so it can be put back afterwards.
                old_env[key] = os.environ.get(key)
                _set_environ(key, value)

            yield (bytes_output, bytes_error)
        finally:
            for key, value in old_env.items():
                _set_environ(key, value)

            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli: "BaseCommand",
        args: t.Optional[t.Union[str, t.Sequence[str]]] = None,
        input: t.Optional[t.Union[str, bytes, t.IO]] = None,
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        catch_exceptions: bool = True,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.

        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.

        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None

        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: t.Optional[BaseException] = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            # An explicitly passed ``prog_name`` (even ``None``) is
            # forwarded as-is; the default is only computed when absent.
            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code)

                if e_code is None:
                    e_code = 0

                if e_code != 0:
                    exception = e

                # A non-integer exit code is written to the captured
                # stdout and reported as exit code 1.
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1

                exit_code = e_code

            except Exception as e:
                if not catch_exceptions:
                    raise

                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush the text wrapper so the byte buffers are complete
                # before they are read.
                sys.stdout.flush()
                stdout = outstreams[0].getvalue()

                if self.mix_stderr:
                    stderr = None
                else:
                    stderr = outstreams[1].getvalue()  # type: ignore

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            return_value=return_value,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,  # type: ignore
        )

    @contextlib.contextmanager
    def isolated_filesystem(
        self, temp_dir: t.Optional[t.Union[str, os.PathLike]] = None
    ) -> t.Iterator[str]:
        """A context manager that creates a temporary directory and
        changes the current working directory to it. This isolates tests
        that affect the contents of the CWD to prevent them from
        interfering with each other.

        :param temp_dir: Create the temporary directory under this
            directory. If given, the created directory is not removed
            when exiting.

        .. versionchanged:: 8.0
            Added the ``temp_dir`` parameter.
        """
        cwd = os.getcwd()
        dt = tempfile.mkdtemp(dir=temp_dir)  # type: ignore[type-var]

        try:
            # Inside the ``try`` so the directory is still cleaned up if
            # changing into it fails.
            os.chdir(dt)
            yield t.cast(str, dt)
        finally:
            os.chdir(cwd)

            if temp_dir is None:
                # Removal is best-effort; a leftover temporary directory
                # should not fail the test that used it.
                try:
                    shutil.rmtree(dt)
                except OSError:  # noqa: B014
                    pass