Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletions git/cmd.py
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
finalizer: Union[None,
Callable[[Union[subprocess.Popen, 'Git.AutoInterrupt']], None]] = None,
decode_streams: bool = True,
timeout: float = 10.0) -> None:
kill_after_timeout: Union[None, float] = None) -> None:
"""Registers for notifications to learn that process output is ready to read, and dispatches lines to
the respective line handlers.
This function returns once the finalizer returns
@@ -94,7 +94,10 @@ def handle_process_output(process: 'Git.AutoInterrupt' | Popen,
their contents to handlers.
Set it to False if `universal_newline == True` (then streams are in text-mode)
or if decoding must happen later (i.e. for Diffs).
:param timeout: float, timeout to pass to t.join() in case it hangs. Default = 10.0 seconds
:param kill_after_timeout:
float or None, Default = None
To specify a timeout in seconds for the git command, after which the process
should be killed.
"""
# Use 2 "pump" threads and wait for both to finish.
def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO], is_decode: bool,
@@ -108,9 +111,12 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO],
handler(line_str)
else:
handler(line)

except Exception as ex:
log.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex
if "I/O operation on closed file" not in str(ex):
# Only reraise if the error was not due to the stream closing
raise CommandError([f'<{name}-pump>'] + remove_password_if_present(cmdline), ex) from ex
finally:
stream.close()

@@ -146,9 +152,24 @@ def pump_stream(cmdline: List[str], name: str, stream: Union[BinaryIO, TextIO],
## FIXME: Why Join?? Will block if `stdin` needs feeding...
#
for t in threads:
t.join(timeout=timeout)
t.join(timeout=kill_after_timeout)
if t.is_alive():
raise RuntimeError(f"Thread join() timed out in cmd.handle_process_output(). Timeout={timeout} seconds")
if isinstance(process, Git.AutoInterrupt):
process._terminate()
else: # Don't want to deal with the other case
raise RuntimeError("Thread join() timed out in cmd.handle_process_output()."
f" kill_after_timeout={kill_after_timeout} seconds")
if stderr_handler:
error_str: Union[str, bytes] = (
"error: process killed because it timed out."
f" kill_after_timeout={kill_after_timeout} seconds")
if not decode_streams and isinstance(p_stderr, BinaryIO):
# Assume stderr_handler needs binary input
error_str = cast(str, error_str)
error_str = error_str.encode()
# We ignore typing on the next line because mypy does not like
# the way we inferred that stderr takes str or bytes
stderr_handler(error_str) # type: ignore

if finalizer:
return finalizer(process)
@@ -386,13 +407,19 @@ class AutoInterrupt(object):
The wait method was overridden to perform automatic status code checking
and possibly raise."""

__slots__ = ("proc", "args")
__slots__ = ("proc", "args", "status")

# If this is non-zero it will override any status code during
# _terminate, used to prevent race conditions in testing
_status_code_if_terminate: int = 0

def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
self.proc = proc
self.args = args
self.status: Union[int, None] = None

def __del__(self) -> None:
def _terminate(self) -> None:
"""Terminate the underlying process"""
if self.proc is None:
return

@@ -404,10 +431,10 @@ def __del__(self) -> None:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()

# did the process finish already so we have a return code ?
try:
if proc.poll() is not None:
self.status = self._status_code_if_terminate or proc.poll()
return None
except OSError as ex:
log.info("Ignored error after process had died: %r", ex)
@@ -419,7 +446,9 @@ def __del__(self) -> None:
# try to kill it
try:
proc.terminate()
proc.wait() # ensure process goes away
status = proc.wait() # ensure process goes away

self.status = self._status_code_if_terminate or status
except OSError as ex:
log.info("Ignored error after process had died: %r", ex)
except AttributeError:
@@ -431,6 +460,9 @@ def __del__(self) -> None:
call(("TASKKILL /F /T /PID %s 2>nul 1>nul" % str(proc.pid)), shell=True)
# END exception handling

def __del__(self) -> None:
self._terminate()

def __getattr__(self, attr: str) -> Any:
return getattr(self.proc, attr)

@@ -444,24 +476,29 @@ def wait(self, stderr: Union[None, str, bytes] = b'') -> int:
if stderr is None:
stderr_b = b''
stderr_b = force_bytes(data=stderr, encoding='utf-8')

status: Union[int, None]
if self.proc is not None:
status = self.proc.wait()
p_stderr = self.proc.stderr
else: # Assume the underlying proc was killed earlier or never existed
status = self.status
p_stderr = None

def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
except ValueError:
return stderr_b or b''
else:
def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr_b + force_bytes(stream.read())
except ValueError:
return stderr_b or b''
else:
return stderr_b or b''

if status != 0:
errstr = read_all_from_possibly_closed_stream(self.proc.stderr)
log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
raise GitCommandError(remove_password_if_present(self.args), status, errstr)
# END status handling

if status != 0:
errstr = read_all_from_possibly_closed_stream(p_stderr)
log.debug('AutoInterrupt wait stderr: %r' % (errstr,))
raise GitCommandError(remove_password_if_present(self.args), status, errstr)
return status

# END auto interrupt
@@ -694,7 +731,7 @@ def execute(self,
as_process: bool = False,
output_stream: Union[None, BinaryIO] = None,
stdout_as_string: bool = True,
kill_after_timeout: Union[None, int] = None,
kill_after_timeout: Union[None, float] = None,
with_stdout: bool = True,
universal_newlines: bool = False,
shell: Union[None, bool] = None,
@@ -817,7 +854,7 @@ def execute(self,

if is_win:
cmd_not_found_exception = OSError
if kill_after_timeout:
if kill_after_timeout is not None:
raise GitCommandError(redacted_command, '"kill_after_timeout" feature is not supported on Windows.')
else:
cmd_not_found_exception = FileNotFoundError # NOQA # exists, flake8 unknown @UndefinedVariable
@@ -884,7 +921,7 @@ def _kill_process(pid: int) -> None:
return
# end

if kill_after_timeout:
if kill_after_timeout is not None:
kill_check = threading.Event()
watchdog = threading.Timer(kill_after_timeout, _kill_process, args=(proc.pid,))

@@ -895,10 +932,10 @@ def _kill_process(pid: int) -> None:
newline = "\n" if universal_newlines else b"\n"
try:
if output_stream is None:
if kill_after_timeout:
if kill_after_timeout is not None:
watchdog.start()
stdout_value, stderr_value = proc.communicate()
if kill_after_timeout:
if kill_after_timeout is not None:
watchdog.cancel()
if kill_check.is_set():
stderr_value = ('Timeout: the command "%s" did not complete in %d '
40 changes: 31 additions & 9 deletions git/remote.py
Original file line number Diff line number Diff line change
@@ -707,7 +707,8 @@ def update(self, **kwargs: Any) -> 'Remote':
return self

def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
progress: Union[Callable[..., Any], RemoteProgress, None]
progress: Union[Callable[..., Any], RemoteProgress, None],
kill_after_timeout: Union[None, float] = None,
) -> IterableList['FetchInfo']:

progress = to_progress_instance(progress)
@@ -724,7 +725,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
cmds = set(FetchInfo._flag_map.keys())

progress_handler = progress.new_message_handler()
handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False)
handle_process_output(proc, None, progress_handler, finalizer=None, decode_streams=False,
kill_after_timeout=kill_after_timeout)

stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or ''
proc.wait(stderr=stderr_text)
@@ -769,7 +771,8 @@ def _get_fetch_info_from_stderr(self, proc: 'Git.AutoInterrupt',
return output

def _get_push_info(self, proc: 'Git.AutoInterrupt',
progress: Union[Callable[..., Any], RemoteProgress, None]) -> IterableList[PushInfo]:
progress: Union[Callable[..., Any], RemoteProgress, None],
kill_after_timeout: Union[None, float] = None) -> IterableList[PushInfo]:
progress = to_progress_instance(progress)

# read progress information from stderr
@@ -786,11 +789,14 @@ def stdout_handler(line: str) -> None:
# If an error happens, additional info is given which we parse below.
pass

handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False)
handle_process_output(proc, stdout_handler, progress_handler, finalizer=None, decode_streams=False,
kill_after_timeout=kill_after_timeout)
stderr_text = progress.error_lines and '\n'.join(progress.error_lines) or ''
try:
proc.wait(stderr=stderr_text)
except Exception:
# This is different than fetch (which fails if there is any std_err
# even if there is an output)
if not output:
raise
elif stderr_text:
@@ -813,7 +819,9 @@ def _assert_refspec(self) -> None:

def fetch(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, None, 'UpdateProgress'] = None,
verbose: bool = True, **kwargs: Any) -> IterableList[FetchInfo]:
verbose: bool = True,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[FetchInfo]:
"""Fetch the latest changes for this remote
:param refspec:
@@ -833,6 +841,9 @@ def fetch(self, refspec: Union[str, List[str], None] = None,
for 'refspec' will make use of this facility.
:param progress: See 'push' method
:param verbose: Boolean for verbose output
:param kill_after_timeout:
To specify a timeout in seconds for the git command, after which the process
should be killed. It is set to None by default.
:param kwargs: Additional arguments to be passed to git-fetch
:return:
IterableList(FetchInfo, ...) list of FetchInfo instances providing detailed
@@ -853,19 +864,22 @@ def fetch(self, refspec: Union[str, List[str], None] = None,

proc = self.repo.git.fetch(self, *args, as_process=True, with_stdout=False,
universal_newlines=True, v=verbose, **kwargs)
res = self._get_fetch_info_from_stderr(proc, progress)
res = self._get_fetch_info_from_stderr(proc, progress,
kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, 'update_cache'):
self.repo.odb.update_cache()
return res

def pull(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, 'UpdateProgress', None] = None,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[FetchInfo]:
"""Pull changes from the given branch, being the same as a fetch followed
by a merge of branch with your local branch.
:param refspec: see 'fetch' method
:param progress: see 'push' method
:param kill_after_timeout: see 'fetch' method
:param kwargs: Additional arguments to be passed to git-pull
:return: Please see 'fetch' method """
if refspec is None:
@@ -874,13 +888,15 @@ def pull(self, refspec: Union[str, List[str], None] = None,
kwargs = add_progress(kwargs, self.repo.git, progress)
proc = self.repo.git.pull(self, refspec, with_stdout=False, as_process=True,
universal_newlines=True, v=True, **kwargs)
res = self._get_fetch_info_from_stderr(proc, progress)
res = self._get_fetch_info_from_stderr(proc, progress,
kill_after_timeout=kill_after_timeout)
if hasattr(self.repo.odb, 'update_cache'):
self.repo.odb.update_cache()
return res

def push(self, refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, 'UpdateProgress', Callable[..., RemoteProgress], None] = None,
kill_after_timeout: Union[None, float] = None,
**kwargs: Any) -> IterableList[PushInfo]:
"""Push changes from source branch in refspec to target branch in refspec.
@@ -897,6 +913,9 @@ def push(self, refspec: Union[str, List[str], None] = None,
overrides the ``update()`` function.
:note: No further progress information is returned after push returns.
:param kill_after_timeout:
To specify a timeout in seconds for the git command, after which the process
should be killed. It is set to None by default.
:param kwargs: Additional arguments to be passed to git-push
:return:
list(PushInfo, ...) list of PushInfo instances, each
@@ -908,8 +927,11 @@ def push(self, refspec: Union[str, List[str], None] = None,
be 0."""
kwargs = add_progress(kwargs, self.repo.git, progress)
proc = self.repo.git.push(self, refspec, porcelain=True, as_process=True,
universal_newlines=True, **kwargs)
return self._get_push_info(proc, progress)
universal_newlines=True,
kill_after_timeout=kill_after_timeout,
**kwargs)
return self._get_push_info(proc, progress,
kill_after_timeout=kill_after_timeout)

@ property
def config_reader(self) -> SectionConstraint[GitConfigParser]:
30 changes: 25 additions & 5 deletions test/test_remote.py
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import random
import tempfile
import pytest
from unittest import skipIf

from git import (
@@ -164,7 +165,7 @@ def _commit_random_file(self, repo):
index.commit("Committing %s" % new_file)
return new_file

def _do_test_fetch(self, remote, rw_repo, remote_repo):
def _do_test_fetch(self, remote, rw_repo, remote_repo, **kwargs):
# specialized fetch testing to de-clutter the main test
self._do_test_fetch_info(rw_repo)

@@ -183,7 +184,7 @@ def get_info(res, remote, name):
# put remote head to master as it is guaranteed to exist
remote_repo.head.reference = remote_repo.heads.master

res = fetch_and_test(remote)
res = fetch_and_test(remote, **kwargs)
# all up to date
for info in res:
self.assertTrue(info.flags & info.HEAD_UPTODATE)
@@ -401,12 +402,12 @@ def _assert_push_and_pull(self, remote, rw_repo, remote_repo):
res = remote.push(all=True)
self._do_test_push_result(res, remote)

remote.pull('master')
remote.pull('master', kill_after_timeout=10.0)

# cleanup - delete created tags and branches as we are in an innerloop on
# the same repository
TagReference.delete(rw_repo, new_tag, other_tag)
remote.push(":%s" % other_tag.path)
remote.push(":%s" % other_tag.path, kill_after_timeout=10.0)

@skipIf(HIDE_WINDOWS_FREEZE_ERRORS, "FIXME: Freezes!")
@with_rw_and_rw_remote_repo('0.1.6')
@@ -467,7 +468,8 @@ def test_base(self, rw_repo, remote_repo):
# Only for remotes - local cases are the same or less complicated
# as additional progress information will never be emitted
if remote.name == "daemon_origin":
self._do_test_fetch(remote, rw_repo, remote_repo)
self._do_test_fetch(remote, rw_repo, remote_repo,
kill_after_timeout=10.0)
ran_fetch_test = True
# END fetch test

@@ -651,3 +653,21 @@ def test_push_error(self, repo):
rem = repo.remote('origin')
with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"):
rem.push('__BAD_REF__')


class TestTimeouts(TestBase):
@with_rw_repo('HEAD', bare=False)
def test_timeout_funcs(self, repo):
# Force error code to prevent a race condition if the python thread is
# slow
default = Git.AutoInterrupt._status_code_if_terminate
Git.AutoInterrupt._status_code_if_terminate = -15
for function in ["pull", "fetch"]: # can't get push to timeout
f = getattr(repo.remotes.origin, function)
assert f is not None # Make sure these functions exist
_ = f() # Make sure the function runs
with pytest.raises(GitCommandError,
match="kill_after_timeout=0 s"):
f(kill_after_timeout=0)

Git.AutoInterrupt._status_code_if_terminate = default