From 5a8a4059d646fd313e81365ef240562556d8bd3f Mon Sep 17 00:00:00 2001
From: Eliah Kagan <degeneracypressure@gmail.com>
Date: Fri, 6 Jun 2025 23:16:45 -0400
Subject: [PATCH] Refactor Git.{AutoInterrupt,CatFileContentStream} nesting

This makes `Git.AutoInterrupt` and `Git.CatFileContentStream`
transparent aliases to top-level nonpublic `_AutoInterrupt` and
`_CatFileContentStream` classes in the `cmd` module.

This does not change the "public" interface. It also does not
change metadata relevant to documentation: the `__name__` and
`__qualname__` attributes are set explicitly to the values they had
before when these classes were defined nested, so that Sphinx
continues to document them (and to do so in full) in `Git` and as
`Git.AutoInterrupt` and `Git.CatFileContentStream`.

The purpose of this is to increase readability. The `Git` class is
big and complex, with a number of long members and various forms of
nesting. Since these two classes can be understood even without
reading the code of the `Git` class, moving the definitions out of
the `Git` class into top-level nonpublic classes will hopefully
increase readability and help with maintenance.
---
 git/cmd.py | 440 +++++++++++++++++++++++++++--------------------------
 1 file changed, 226 insertions(+), 214 deletions(-)

diff --git a/git/cmd.py b/git/cmd.py
index 7f46edc8f..a6195880d 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -308,6 +308,230 @@ def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], exc
 
 ## -- End Utilities -- @}
 
+
+class _AutoInterrupt:
+    """Process wrapper that terminates the wrapped process on finalization.
+
+    This kills/interrupts the stored process instance once this instance goes out of
+    scope. It is used to prevent processes piling up in case iterators stop reading.
+
+    All attributes are wired through to the contained process object.
+
+    The wait method is overridden to perform automatic status code checking and possibly
+    raise.
+    """
+
+    __slots__ = ("proc", "args", "status")
+
+    # If this is non-zero it will override any status code during _terminate, used
+    # to prevent race conditions in testing.
+    _status_code_if_terminate: int = 0
+
+    def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
+        self.proc = proc
+        self.args = args
+        self.status: Union[int, None] = None
+
+    def _terminate(self) -> None:
+        """Terminate the underlying process."""
+        if self.proc is None:
+            return
+
+        proc = self.proc
+        self.proc = None
+        if proc.stdin:
+            proc.stdin.close()
+        if proc.stdout:
+            proc.stdout.close()
+        if proc.stderr:
+            proc.stderr.close()
+        # Did the process finish already so we have a return code?
+        try:
+            if proc.poll() is not None:
+                self.status = self._status_code_if_terminate or proc.poll()
+                return
+        except OSError as ex:
+            _logger.info("Ignored error after process had died: %r", ex)
+
+        # It can be that nothing really exists anymore...
+        if os is None or getattr(os, "kill", None) is None:
+            return
+
+        # Try to kill it.
+        try:
+            proc.terminate()
+            status = proc.wait()  # Ensure the process goes away.
+
+            self.status = self._status_code_if_terminate or status
+        except OSError as ex:
+            _logger.info("Ignored error after process had died: %r", ex)
+        # END exception handling
+
+    def __del__(self) -> None:
+        self._terminate()
+
+    def __getattr__(self, attr: str) -> Any:
+        return getattr(self.proc, attr)
+
+    # TODO: Bad choice to mimic `proc.wait()` but with different args.
+    def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
+        """Wait for the process and return its status code.
+
+        :param stderr:
+            Previously read value of stderr, in case stderr is already closed.
+
+        :warn:
+            May deadlock if output or error pipes are used and not handled separately.
+
+        :raise git.exc.GitCommandError:
+            If the return status is not 0.
+        """
+        if stderr is None:
+            stderr_b = b""
+        stderr_b = force_bytes(data=stderr, encoding="utf-8")
+        status: Union[int, None]
+        if self.proc is not None:
+            status = self.proc.wait()
+            p_stderr = self.proc.stderr
+        else:  # Assume the underlying proc was killed earlier or never existed.
+            status = self.status
+            p_stderr = None
+
+        def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
+            if stream:
+                try:
+                    return stderr_b + force_bytes(stream.read())
+                except (OSError, ValueError):
+                    return stderr_b or b""
+            else:
+                return stderr_b or b""
+
+        # END status handling
+
+        if status != 0:
+            errstr = read_all_from_possibly_closed_stream(p_stderr)
+            _logger.debug("AutoInterrupt wait stderr: %r" % (errstr,))
+            raise GitCommandError(remove_password_if_present(self.args), status, errstr)
+        return status
+
+
+_AutoInterrupt.__name__ = "AutoInterrupt"
+_AutoInterrupt.__qualname__ = "Git.AutoInterrupt"
+
+
+class _CatFileContentStream:
+    """Object representing a sized read-only stream returning the contents of
+    an object.
+
+    This behaves like a stream, but counts the data read and simulates an empty stream
+    once our sized content region is empty.
+
+    If not all data are read to the end of the object's lifetime, we read the rest to
+    ensure the underlying stream continues to work.
+    """
+
+    __slots__ = ("_stream", "_nbr", "_size")
+
+    def __init__(self, size: int, stream: IO[bytes]) -> None:
+        self._stream = stream
+        self._size = size
+        self._nbr = 0  # Number of bytes read.
+
+        # Special case: If the object is empty, has null bytes, get the final
+        # newline right away.
+        if size == 0:
+            stream.read(1)
+        # END handle empty streams
+
+    def read(self, size: int = -1) -> bytes:
+        bytes_left = self._size - self._nbr
+        if bytes_left == 0:
+            return b""
+        if size > -1:
+            # Ensure we don't try to read past our limit.
+            size = min(bytes_left, size)
+        else:
+            # They try to read all, make sure it's not more than what remains.
+            size = bytes_left
+        # END check early depletion
+        data = self._stream.read(size)
+        self._nbr += len(data)
+
+        # Check for depletion, read our final byte to make the stream usable by
+        # others.
+        if self._size - self._nbr == 0:
+            self._stream.read(1)  # final newline
+        # END finish reading
+        return data
+
+    def readline(self, size: int = -1) -> bytes:
+        if self._nbr == self._size:
+            return b""
+
+        # Clamp size to lowest allowed value.
+        bytes_left = self._size - self._nbr
+        if size > -1:
+            size = min(bytes_left, size)
+        else:
+            size = bytes_left
+        # END handle size
+
+        data = self._stream.readline(size)
+        self._nbr += len(data)
+
+        # Handle final byte.
+        if self._size - self._nbr == 0:
+            self._stream.read(1)
+        # END finish reading
+
+        return data
+
+    def readlines(self, size: int = -1) -> List[bytes]:
+        if self._nbr == self._size:
+            return []
+
+        # Leave all additional logic to our readline method, we just check the size.
+        out = []
+        nbr = 0
+        while True:
+            line = self.readline()
+            if not line:
+                break
+            out.append(line)
+            if size > -1:
+                nbr += len(line)
+                if nbr > size:
+                    break
+            # END handle size constraint
+        # END readline loop
+        return out
+
+    # skipcq: PYL-E0301
+    def __iter__(self) -> "Git.CatFileContentStream":
+        return self
+
+    def __next__(self) -> bytes:
+        line = self.readline()
+        if not line:
+            raise StopIteration
+
+        return line
+
+    next = __next__
+
+    def __del__(self) -> None:
+        bytes_left = self._size - self._nbr
+        if bytes_left:
+            # Read and discard - seeking is impossible within a stream.
+            # This includes any terminating newline.
+            self._stream.read(bytes_left + 1)
+        # END handle incomplete read
+
+
+_CatFileContentStream.__name__ = "CatFileContentStream"
+_CatFileContentStream.__qualname__ = "Git.CatFileContentStream"
+
+
 _USE_SHELL_DEFAULT_MESSAGE = (
     "Git.USE_SHELL is deprecated, because only its default value of False is safe. "
     "It will be removed in a future release."
@@ -728,221 +952,9 @@ def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) ->
                         f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it."
                     )
 
-    class AutoInterrupt:
-        """Process wrapper that terminates the wrapped process on finalization.
-
-        This kills/interrupts the stored process instance once this instance goes out of
-        scope. It is used to prevent processes piling up in case iterators stop reading.
-
-        All attributes are wired through to the contained process object.
-
-        The wait method is overridden to perform automatic status code checking and
-        possibly raise.
-        """
-
-        __slots__ = ("proc", "args", "status")
-
-        # If this is non-zero it will override any status code during _terminate, used
-        # to prevent race conditions in testing.
-        _status_code_if_terminate: int = 0
-
-        def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
-            self.proc = proc
-            self.args = args
-            self.status: Union[int, None] = None
-
-        def _terminate(self) -> None:
-            """Terminate the underlying process."""
-            if self.proc is None:
-                return
-
-            proc = self.proc
-            self.proc = None
-            if proc.stdin:
-                proc.stdin.close()
-            if proc.stdout:
-                proc.stdout.close()
-            if proc.stderr:
-                proc.stderr.close()
-            # Did the process finish already so we have a return code?
-            try:
-                if proc.poll() is not None:
-                    self.status = self._status_code_if_terminate or proc.poll()
-                    return
-            except OSError as ex:
-                _logger.info("Ignored error after process had died: %r", ex)
-
-            # It can be that nothing really exists anymore...
-            if os is None or getattr(os, "kill", None) is None:
-                return
-
-            # Try to kill it.
-            try:
-                proc.terminate()
-                status = proc.wait()  # Ensure the process goes away.
-
-                self.status = self._status_code_if_terminate or status
-            except OSError as ex:
-                _logger.info("Ignored error after process had died: %r", ex)
-            # END exception handling
-
-        def __del__(self) -> None:
-            self._terminate()
-
-        def __getattr__(self, attr: str) -> Any:
-            return getattr(self.proc, attr)
-
-        # TODO: Bad choice to mimic `proc.wait()` but with different args.
-        def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
-            """Wait for the process and return its status code.
-
-            :param stderr:
-                Previously read value of stderr, in case stderr is already closed.
-
-            :warn:
-                May deadlock if output or error pipes are used and not handled
-                separately.
-
-            :raise git.exc.GitCommandError:
-                If the return status is not 0.
-            """
-            if stderr is None:
-                stderr_b = b""
-            stderr_b = force_bytes(data=stderr, encoding="utf-8")
-            status: Union[int, None]
-            if self.proc is not None:
-                status = self.proc.wait()
-                p_stderr = self.proc.stderr
-            else:  # Assume the underlying proc was killed earlier or never existed.
-                status = self.status
-                p_stderr = None
-
-            def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
-                if stream:
-                    try:
-                        return stderr_b + force_bytes(stream.read())
-                    except (OSError, ValueError):
-                        return stderr_b or b""
-                else:
-                    return stderr_b or b""
-
-            # END status handling
-
-            if status != 0:
-                errstr = read_all_from_possibly_closed_stream(p_stderr)
-                _logger.debug("AutoInterrupt wait stderr: %r" % (errstr,))
-                raise GitCommandError(remove_password_if_present(self.args), status, errstr)
-            return status
-
-    # END auto interrupt
-
-    class CatFileContentStream:
-        """Object representing a sized read-only stream returning the contents of
-        an object.
-
-        This behaves like a stream, but counts the data read and simulates an empty
-        stream once our sized content region is empty.
-
-        If not all data are read to the end of the object's lifetime, we read the
-        rest to ensure the underlying stream continues to work.
-        """
-
-        __slots__ = ("_stream", "_nbr", "_size")
-
-        def __init__(self, size: int, stream: IO[bytes]) -> None:
-            self._stream = stream
-            self._size = size
-            self._nbr = 0  # Number of bytes read.
-
-            # Special case: If the object is empty, has null bytes, get the final
-            # newline right away.
-            if size == 0:
-                stream.read(1)
-            # END handle empty streams
-
-        def read(self, size: int = -1) -> bytes:
-            bytes_left = self._size - self._nbr
-            if bytes_left == 0:
-                return b""
-            if size > -1:
-                # Ensure we don't try to read past our limit.
-                size = min(bytes_left, size)
-            else:
-                # They try to read all, make sure it's not more than what remains.
-                size = bytes_left
-            # END check early depletion
-            data = self._stream.read(size)
-            self._nbr += len(data)
-
-            # Check for depletion, read our final byte to make the stream usable by
-            # others.
-            if self._size - self._nbr == 0:
-                self._stream.read(1)  # final newline
-            # END finish reading
-            return data
-
-        def readline(self, size: int = -1) -> bytes:
-            if self._nbr == self._size:
-                return b""
-
-            # Clamp size to lowest allowed value.
-            bytes_left = self._size - self._nbr
-            if size > -1:
-                size = min(bytes_left, size)
-            else:
-                size = bytes_left
-            # END handle size
-
-            data = self._stream.readline(size)
-            self._nbr += len(data)
-
-            # Handle final byte.
-            if self._size - self._nbr == 0:
-                self._stream.read(1)
-            # END finish reading
-
-            return data
-
-        def readlines(self, size: int = -1) -> List[bytes]:
-            if self._nbr == self._size:
-                return []
-
-            # Leave all additional logic to our readline method, we just check the size.
-            out = []
-            nbr = 0
-            while True:
-                line = self.readline()
-                if not line:
-                    break
-                out.append(line)
-                if size > -1:
-                    nbr += len(line)
-                    if nbr > size:
-                        break
-                # END handle size constraint
-            # END readline loop
-            return out
-
-        # skipcq: PYL-E0301
-        def __iter__(self) -> "Git.CatFileContentStream":
-            return self
-
-        def __next__(self) -> bytes:
-            line = self.readline()
-            if not line:
-                raise StopIteration
-
-            return line
-
-        next = __next__
+    AutoInterrupt = _AutoInterrupt
 
-        def __del__(self) -> None:
-            bytes_left = self._size - self._nbr
-            if bytes_left:
-                # Read and discard - seeking is impossible within a stream.
-                # This includes any terminating newline.
-                self._stream.read(bytes_left + 1)
-            # END handle incomplete read
+    CatFileContentStream = _CatFileContentStream
 
     def __init__(self, working_dir: Union[None, PathLike] = None) -> None:
         """Initialize this instance with: