diff --git a/.github/workflows/cygwin-test.yml b/.github/workflows/cygwin-test.yml index 16b42f89c..0018e7dfc 100644 --- a/.github/workflows/cygwin-test.yml +++ b/.github/workflows/cygwin-test.yml @@ -1,16 +1,12 @@ name: test-cygwin -on: - push: - branches: - main - pull_request: - branches: - main +on: [push, pull_request, workflow_dispatch] jobs: build: runs-on: windows-latest + strategy: + fail-fast: false env: CHERE_INVOKING: 1 SHELLOPTS: igncr @@ -47,11 +43,6 @@ jobs: # If we rewrite the user's config by accident, we will mess it up # and cause subsequent tests to fail cat test/fixtures/.gitconfig >> ~/.gitconfig - - name: Lint with flake8 - shell: bash.exe -eo pipefail -o igncr "{0}" - run: | - set -x - /usr/bin/python -m flake8 - name: Test with pytest shell: bash.exe -eo pipefail -o igncr "{0}" run: | diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 000000000..c78a4053a --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,14 @@ +name: Lint + +on: [push, pull_request, workflow_dispatch] + +jobs: + lint: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.x" + - uses: pre-commit/action@v3.0.0 diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 5373dace6..6d6c67952 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -3,11 +3,7 @@ name: Python package -on: - push: - branches: [ main ] - pull_request: - branches: [ main ] +on: [push, pull_request, workflow_dispatch] permissions: contents: read @@ -17,6 +13,7 @@ jobs: runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: [3.7, 3.8, 3.9, "3.10", "3.11"] @@ -47,11 +44,6 @@ jobs: # and cause subsequent tests to fail cat test/fixtures/.gitconfig >> ~/.gitconfig - - name: Lint with flake8 - run: | - set -x - flake8 - - name: Check types with mypy # With new versions of pypi new issues might arise. This is a problem if there is nobody able to fix them, # so we have to ignore errors until that changes. diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 000000000..581cb69b2 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,19 @@ +repos: + - repo: https://github.com/PyCQA/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + additional_dependencies: + [ + flake8-bugbear==22.12.6, + flake8-comprehensions==3.10.1, + flake8-typing-imports==1.14.0, + ] + exclude: ^doc|^git/ext/|^test/ + + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: check-merge-conflict + - id: check-toml + - id: check-yaml diff --git a/README.md b/README.md index 54a735e53..82c5c9e0f 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,7 @@ with MINGW's. Ensure testing libraries are installed. In the root directory, run: `pip install -r test-requirements.txt` -To lint, run: `flake8` +To lint, run: `pre-commit run --all-files` To typecheck, run: `mypy -p git` diff --git a/VERSION b/VERSION index 51b450da3..339bdc849 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.1.30 +3.1.31 diff --git a/doc/source/changes.rst b/doc/source/changes.rst index 7cd09a1c5..4ee613bcc 100644 --- a/doc/source/changes.rst +++ b/doc/source/changes.rst @@ -2,6 +2,12 @@ Changelog ========= +3.1.31 +====== + +See the following for all changes. +https://github.com/gitpython-developers/gitpython/milestone/61?closed=1 + 3.1.30 ====== diff --git a/git/cmd.py b/git/cmd.py index 9ef1e3a65..dfce9024d 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -695,15 +695,14 @@ def __iter__(self) -> "Git.CatFileContentStream": return self def __next__(self) -> bytes: - return next(self) - - def next(self) -> bytes: line = self.readline() if not line: raise StopIteration return line + next = __next__ + def __del__(self) -> None: bytes_left = self._size - self._nbr if bytes_left: @@ -735,6 +734,7 @@ def __init__(self, working_dir: Union[None, PathLike] = None): def __getattr__(self, name: str) -> Any: """A convenience method as it allows to call the command as if it was an object. + :return: Callable object that will execute call _call_process with your arguments.""" if name[0] == "_": return LazyMixin.__getattr__(self, name) @@ -915,7 +915,7 @@ def execute( render the repository incapable of accepting changes until the lock is manually removed. :param strip_newline_in_stdout: - Whether to strip the trailing `\n` of the command stdout. + Whether to strip the trailing ``\\n`` of the command stdout. :return: * str(output) if extended_output = False (Default) * tuple(int(status), str(stdout), str(stderr)) if extended_output = True @@ -1384,7 +1384,8 @@ def get_object_header(self, ref: str) -> Tuple[str, str, int]: def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]: """As get_object_header, but returns object data as well - :return: (hexsha, type_string, size_as_int,data_string) + + :return: (hexsha, type_string, size_as_int, data_string) :note: not threadsafe""" hexsha, typename, size, stream = self.stream_object_data(ref) data = stream.read(size) diff --git a/git/config.py b/git/config.py index 71d7ea689..e05a297af 100644 --- a/git/config.py +++ b/git/config.py @@ -796,6 +796,7 @@ def get_values( :raise TypeError: in case the value could not be understood Otherwise the exceptions known to the ConfigParser will be raised.""" try: + self.sections() lst = self._sections[section].getall(option) except Exception: if default is not None: diff --git a/git/diff.py b/git/diff.py index c4424592f..c1a5bd26f 100644 --- a/git/diff.py +++ b/git/diff.py @@ -144,7 +144,10 @@ def diff( args.append("--abbrev=40") # we need full shas args.append("--full-index") # get full index paths, not only filenames - args.append("-M") # check for renames, in both formats + # remove default '-M' arg (check for renames) if user is overriding it + if not any(x in kwargs for x in ('find_renames', 'no_renames', 'M')): + args.append("-M") + if create_patch: args.append("-p") else: diff --git a/git/index/base.py b/git/index/base.py index 17d18db58..cda08de25 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -982,12 +982,12 @@ def move( Additional arguments you would like to pass to git-mv, such as dry_run or force. - :return:List(tuple(source_path_string, destination_path_string), ...) + :return: List(tuple(source_path_string, destination_path_string), ...) A list of pairs, containing the source file moved as well as its actual destination. Relative to the repository root. :raise ValueError: If only one item was given - GitCommandError: If git could not handle your request""" + :raise GitCommandError: If git could not handle your request""" args = [] if skip_errors: args.append("-k") diff --git a/git/index/fun.py b/git/index/fun.py index 4659ac898..d0925ed51 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -82,6 +82,7 @@ def _has_file_extension(path): def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. + :param name: name of hook, like 'pre-commit' :param index: IndexFile instance :param args: arguments passed to hook file @@ -234,11 +235,13 @@ def read_cache( stream: IO[bytes], ) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]: """Read a cache file from the given stream + :return: tuple(version, entries_dict, extension_data, content_sha) - * version is the integer version number - * entries dict is a dictionary which maps IndexEntry instances to a path at a stage - * extension_data is '' or 4 bytes of type + 4 bytes of size + size bytes - * content_sha is a 20 byte sha on all cache file contents""" + + * version is the integer version number + * entries dict is a dictionary which maps IndexEntry instances to a path at a stage + * extension_data is '' or 4 bytes of type + 4 bytes of size + size bytes + * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {} diff --git a/git/objects/base.py b/git/objects/base.py index 9d0057254..eb9a8ac3d 100644 --- a/git/objects/base.py +++ b/git/objects/base.py @@ -143,6 +143,7 @@ def data_stream(self) -> "OStream": def stream_data(self, ostream: "OStream") -> "Object": """Writes our data directly to the given output stream + :param ostream: File object compatible stream object. :return: self""" istream = self.repo.odb.stream(self.binsha) diff --git a/git/objects/commit.py b/git/objects/commit.py index 82d2387b3..547e8fe82 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -324,14 +324,14 @@ def stats(self) -> Stats: :return: git.Stats""" if not self.parents: - text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True) + text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, no_renames=True, root=True) text2 = "" for line in text.splitlines()[1:]: (insertions, deletions, filename) = line.split("\t") text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename) text = text2 else: - text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True) + text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True) return Stats._list_from_string(self.repo, text) @property diff --git a/git/objects/fun.py b/git/objects/fun.py index 001e10e47..e91403a8b 100644 --- a/git/objects/fun.py +++ b/git/objects/fun.py @@ -37,6 +37,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None: """Write the give list of entries into a stream using its write method + :param entries: **sorted** list of tuples with (binsha, mode, name) :param write: write method which takes a data string""" ord_zero = ord("0") @@ -68,6 +69,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer def tree_entries_from_data(data: bytes) -> List[EntryTup]: """Reads the binary representation of a tree and returns tuples of Tree items + :param data: data block with tree data (as bytes) :return: list(tuple(binsha, mode, tree_relative_path), ...)""" ord_zero = ord("0") diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index 9aa9deb27..7db64d705 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -287,7 +287,9 @@ def _clone_repo( :param url: url to clone from :param path: repository - relative path to the submodule checkout location :param name: canonical of the submodule - :param kwrags: additinoal arguments given to git.clone""" + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack + :param kwargs: additional arguments given to git.clone""" module_abspath = cls._module_abspath(repo, path, name) module_checkout_path = module_abspath if cls._need_gitfile_submodules(repo.git): @@ -411,6 +413,8 @@ def add( as its value. :param clone_multi_options: A list of Clone options. Please see ``git.repo.base.Repo.clone`` for details. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :return: The newly created submodule instance :note: works atomically, such that no change will be done if the repository update fails for instance""" @@ -581,6 +585,8 @@ def update( as its value. :param clone_multi_options: list of Clone options. Please see ``git.repo.base.Repo.clone`` for details. Only take effect with `init` option. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :note: does nothing in bare repositories :note: method is definitely not atomic if recurisve is True :return: self""" diff --git a/git/objects/tree.py b/git/objects/tree.py index b72e88c48..a9b491e23 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -128,6 +128,7 @@ def set_done(self) -> "TreeModifier": """Call this method once you are done modifying the tree information. It may be called several times, but be aware that each call will cause a sort operation + :return self:""" merge_sort(self._cache, git_cmp) return self @@ -175,6 +176,7 @@ def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None: """Add the given item to the tree, its correctness is assumed, which puts the caller into responsibility to assure the input is correct. For more information on the parameters, see ``add`` + :param binsha: 20 byte binary sha""" assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) tree_cache = (binsha, mode, name) @@ -259,8 +261,8 @@ def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[ def join(self, file: str) -> IndexObjUnion: """Find the named object in this tree's contents - :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule`` + :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule`` :raise KeyError: if given file or tree does not exist in tree""" msg = "Blob or Tree named %r not found" if "/" in file: diff --git a/git/objects/util.py b/git/objects/util.py index 636a58316..af279154c 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -137,21 +137,25 @@ def get_object_type_by_name( def utctz_to_altz(utctz: str) -> int: - """we convert utctz to the timezone in seconds, it is the format time.altzone - returns. Git stores it as UTC timezone which has the opposite sign as well, - which explains the -1 * ( that was made explicit here ) - :param utctz: git utc timezone string, i.e. +0200""" - return -1 * int(float(utctz) / 100 * 3600) - - -def altz_to_utctz_str(altz: float) -> str: - """As above, but inverses the operation, returning a string that can be used - in commit objects""" - utci = -1 * int((float(altz) / 3600) * 100) - utcs = str(abs(utci)) - utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and "-") or "+" - return prefix + utcs + """Convert a git timezone offset into a timezone offset west of + UTC in seconds (compatible with time.altzone). + + :param utctz: git utc timezone string, i.e. +0200 + """ + int_utctz = int(utctz) + seconds = ((abs(int_utctz) // 100) * 3600 + (abs(int_utctz) % 100) * 60) + return seconds if int_utctz < 0 else -seconds + + +def altz_to_utctz_str(altz: int) -> str: + """Convert a timezone offset west of UTC in seconds into a git timezone offset string + + :param altz: timezone offset in seconds west of UTC + """ + hours = abs(altz) // 3600 + minutes = (abs(altz) % 3600) // 60 + sign = "-" if altz >= 60 else "+" + return "{}{:02}{:02}".format(sign, hours, minutes) def verify_utctz(offset: str) -> str: diff --git a/git/refs/log.py b/git/refs/log.py index a5f4de58b..1f86356a4 100644 --- a/git/refs/log.py +++ b/git/refs/log.py @@ -253,6 +253,7 @@ def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry": def to_file(self, filepath: PathLike) -> None: """Write the contents of the reflog instance to a file at the given filepath. + :param filepath: path to file, parent directories are assumed to exist""" lfd = LockedFD(filepath) assure_directory_exists(filepath, is_file=True) @@ -326,6 +327,7 @@ def append_entry( def write(self) -> "RefLog": """Write this instance's data to the file we are originating from + :return: self""" if self._path is None: raise ValueError("Instance was not initialized with a path, use to_file(...) instead") diff --git a/git/refs/reference.py b/git/refs/reference.py index ca43cc430..4f9e3a0a7 100644 --- a/git/refs/reference.py +++ b/git/refs/reference.py @@ -49,8 +49,8 @@ class Reference(SymbolicReference, LazyMixin, IterableObj): def __init__(self, repo: "Repo", path: PathLike, check_path: bool = True) -> None: """Initialize this instance - :param repo: Our parent repository + :param repo: Our parent repository :param path: Path relative to the .git/ directory pointing to the ref in question, i.e. refs/heads/master @@ -73,6 +73,7 @@ def set_object( logmsg: Union[str, None] = None, ) -> "Reference": """Special version which checks if the head-log needs an update as well + :return: self""" oldbinsha = None if logmsg is not None: diff --git a/git/remote.py b/git/remote.py index 4240223e8..5886a69f0 100644 --- a/git/remote.py +++ b/git/remote.py @@ -641,6 +641,7 @@ def set_url( :param new_url: string being the URL to add as an extra remote URL :param old_url: when set, replaces this URL with new_url for the remote + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :return: self """ if not allow_unsafe_protocols: @@ -660,6 +661,7 @@ def add_url(self, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) multiple URLs for a single remote. :param url: string being the URL to add as an extra remote URL + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :return: self """ return self.set_url(url, add=True, allow_unsafe_protocols=allow_unsafe_protocols) @@ -756,9 +758,11 @@ def stale_refs(self) -> IterableList[Reference]: @classmethod def create(cls, repo: "Repo", name: str, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote": """Create a new remote to the given repository + :param repo: Repository instance that is to receive the new remote :param name: Desired name of the remote :param url: URL which corresponds to the remote's name + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext :param kwargs: Additional arguments to be passed to the git-remote add command :return: New Remote instance :raise GitCommandError: in case an origin with that name already exists""" @@ -778,6 +782,7 @@ def add(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote": @classmethod def remove(cls, repo: "Repo", name: str) -> str: """Remove the remote with the given name + :return: the passed remote name to remove """ repo.git.remote("rm", name) @@ -790,6 +795,7 @@ def remove(cls, repo: "Repo", name: str) -> str: def rename(self, new_name: str) -> "Remote": """Rename self to the given new_name + :return: self""" if self.name == new_name: return self @@ -975,6 +981,8 @@ def fetch( :param kill_after_timeout: To specify a timeout in seconds for the git command, after which the process should be killed. It is set to None by default. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: Additional arguments to be passed to git-fetch :return: IterableList(FetchInfo, ...) list of FetchInfo instances providing detailed @@ -1021,11 +1029,13 @@ def pull( """Pull changes from the given branch, being the same as a fetch followed by a merge of branch with your local branch. - :param refspec: see 'fetch' method - :param progress: see 'push' method - :param kill_after_timeout: see 'fetch' method + :param refspec: see :meth:`fetch` method + :param progress: see :meth:`push` method + :param kill_after_timeout: see :meth:`fetch` method + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: Additional arguments to be passed to git-pull - :return: Please see 'fetch' method""" + :return: Please see :meth:`fetch` method""" if refspec is None: # No argument refspec, then ensure the repo's config has a fetch refspec. self._assert_refspec() @@ -1074,6 +1084,8 @@ def push( :param kill_after_timeout: To specify a timeout in seconds for the git command, after which the process should be killed. It is set to None by default. + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --receive-pack :param kwargs: Additional arguments to be passed to git-push :return: A ``PushInfoList`` object, where each list member diff --git a/git/repo/base.py b/git/repo/base.py index d4463f1e1..2fc9cf1fe 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -9,6 +9,9 @@ import re import shlex import warnings + +from pathlib import Path + from gitdb.db.loose import LooseObjectDB from gitdb.exc import BadObject @@ -112,7 +115,7 @@ class Repo(object): 'working_dir' is the working directory of the git command, which is the working tree directory if available or the .git directory in case of bare repositories - 'working_tree_dir' is the working tree directory, but will raise AssertionError + 'working_tree_dir' is the working tree directory, but will return None if we are a bare repository. 'git_dir' is the .git repository directory, which is always set.""" @@ -120,9 +123,9 @@ class Repo(object): DAEMON_EXPORT_FILE = "git-daemon-export-ok" git = cast("Git", None) # Must exist, or __del__ will fail in case we raise on `__init__()` - working_dir: Optional[PathLike] = None + working_dir: PathLike _working_tree_dir: Optional[PathLike] = None - git_dir: PathLike = "" + git_dir: PathLike _common_dir: PathLike = "" # precompiled regex @@ -212,13 +215,14 @@ def __init__( ## Walk up the path to find the `.git` dir. # curpath = epath + git_dir = None while curpath: # ABOUT osp.NORMPATH # It's important to normalize the paths, as submodules will otherwise initialize their # repo instances with paths that depend on path-portions that will not exist after being # removed. It's just cleaner. if is_git_dir(curpath): - self.git_dir = curpath + git_dir = curpath # from man git-config : core.worktree # Set the path to the root of the working tree. If GIT_COMMON_DIR environment # variable is set, core.worktree is ignored and not used for determining the @@ -227,9 +231,9 @@ def __init__( # directory, which is either specified by GIT_DIR, or automatically discovered. # If GIT_DIR is specified but none of GIT_WORK_TREE and core.worktree is specified, # the current working directory is regarded as the top level of your working tree. - self._working_tree_dir = os.path.dirname(self.git_dir) + self._working_tree_dir = os.path.dirname(git_dir) if os.environ.get("GIT_COMMON_DIR") is None: - gitconf = self.config_reader("repository") + gitconf = self._config_reader("repository", git_dir) if gitconf.has_option("core", "worktree"): self._working_tree_dir = gitconf.get("core", "worktree") if "GIT_WORK_TREE" in os.environ: @@ -239,14 +243,14 @@ def __init__( dotgit = osp.join(curpath, ".git") sm_gitpath = find_submodule_git_dir(dotgit) if sm_gitpath is not None: - self.git_dir = osp.normpath(sm_gitpath) + git_dir = osp.normpath(sm_gitpath) sm_gitpath = find_submodule_git_dir(dotgit) if sm_gitpath is None: sm_gitpath = find_worktree_git_dir(dotgit) if sm_gitpath is not None: - self.git_dir = expand_path(sm_gitpath, expand_vars) + git_dir = expand_path(sm_gitpath, expand_vars) self._working_tree_dir = curpath break @@ -257,8 +261,9 @@ def __init__( break # END while curpath - if self.git_dir is None: + if git_dir is None: raise InvalidGitRepositoryError(epath) + self.git_dir = git_dir self._bare = False try: @@ -268,7 +273,7 @@ def __init__( pass try: - common_dir = open(osp.join(self.git_dir, "commondir"), "rt").readlines()[0].strip() + common_dir = (Path(self.git_dir) / "commondir").read_text().splitlines()[0].strip() self._common_dir = osp.join(self.git_dir, common_dir) except OSError: self._common_dir = "" @@ -279,7 +284,7 @@ def __init__( self._working_tree_dir = None # END working dir handling - self.working_dir: Optional[PathLike] = self._working_tree_dir or self.common_dir + self.working_dir: PathLike = self._working_tree_dir or self.common_dir self.git = self.GitCommandWrapperType(self.working_dir) # special handling, in special times @@ -317,7 +322,7 @@ def close(self) -> None: gc.collect() def __eq__(self, rhs: object) -> bool: - if isinstance(rhs, Repo) and self.git_dir: + if isinstance(rhs, Repo): return self.git_dir == rhs.git_dir return False @@ -329,14 +334,12 @@ def __hash__(self) -> int: # Description property def _get_description(self) -> str: - if self.git_dir: - filename = osp.join(self.git_dir, "description") + filename = osp.join(self.git_dir, "description") with open(filename, "rb") as fp: return fp.read().rstrip().decode(defenc) def _set_description(self, descr: str) -> None: - if self.git_dir: - filename = osp.join(self.git_dir, "description") + filename = osp.join(self.git_dir, "description") with open(filename, "wb") as fp: fp.write((descr + "\n").encode(defenc)) @@ -354,13 +357,7 @@ def common_dir(self) -> PathLike: """ :return: The git dir that holds everything except possibly HEAD, FETCH_HEAD, ORIG_HEAD, COMMIT_EDITMSG, index, and logs/.""" - if self._common_dir: - return self._common_dir - elif self.git_dir: - return self.git_dir - else: - # or could return "" - raise InvalidGitRepositoryError() + return self._common_dir or self.git_dir @property def bare(self) -> bool: @@ -403,6 +400,7 @@ def head(self) -> "HEAD": @property def remotes(self) -> "IterableList[Remote]": """A list of Remote objects allowing to access and manipulate remotes + :return: ``git.IterableList(Remote, ...)``""" return Remote.list_items(self) @@ -443,6 +441,7 @@ def create_submodule(self, *args: Any, **kwargs: Any) -> Submodule: def iter_submodules(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: """An iterator yielding Submodule instances, see Traversable interface for a description of args and kwargs + :return: Iterator""" return RootModule(self).traverse(*args, **kwargs) @@ -457,6 +456,7 @@ def submodule_update(self, *args: Any, **kwargs: Any) -> Iterator[Submodule]: @property def tags(self) -> "IterableList[TagReference]": """A list of ``Tag`` objects that are available in this repo + :return: ``git.IterableList(TagReference, ...)``""" return TagReference.list_items(self) @@ -526,7 +526,9 @@ def delete_remote(self, remote: "Remote") -> str: """Delete the given remote.""" return Remote.remove(self, remote) - def _get_config_path(self, config_level: Lit_config_levels) -> str: + def _get_config_path(self, config_level: Lit_config_levels, git_dir: Optional[PathLike] = None) -> str: + if git_dir is None: + git_dir = self.git_dir # we do not support an absolute path of the gitconfig on windows , # use the global config instead if is_win and config_level == "system": @@ -540,7 +542,7 @@ def _get_config_path(self, config_level: Lit_config_levels) -> str: elif config_level == "global": return osp.normpath(osp.expanduser("~/.gitconfig")) elif config_level == "repository": - repo_dir = self._common_dir or self.git_dir + repo_dir = self._common_dir or git_dir if not repo_dir: raise NotADirectoryError else: @@ -569,15 +571,21 @@ def config_reader( you know which file you wish to read to prevent reading multiple files. :note: On windows, system configuration cannot currently be read as the path is unknown, instead the global path will be used.""" - files = None + return self._config_reader(config_level=config_level) + + def _config_reader( + self, + config_level: Optional[Lit_config_levels] = None, + git_dir: Optional[PathLike] = None, + ) -> GitConfigParser: if config_level is None: files = [ - self._get_config_path(cast(Lit_config_levels, f)) + self._get_config_path(cast(Lit_config_levels, f), git_dir) for f in self.config_level if cast(Lit_config_levels, f) ] else: - files = [self._get_config_path(config_level)] + files = [self._get_config_path(config_level, git_dir)] return GitConfigParser(files, read_only=True, repo=self) def config_writer(self, config_level: Lit_config_levels = "repository") -> GitConfigParser: @@ -867,8 +875,15 @@ def ignored(self, *paths: PathLike) -> List[str]: """ try: proc: str = self.git.check_ignore(*paths) - except GitCommandError: - return [] + except GitCommandError as err: + # If return code is 1, this means none of the items in *paths + # are ignored by Git, so return an empty list. Raise the + # exception on all other return codes. + if err.status == 1: + return [] + else: + raise + return proc.replace("\\\\", "\\").replace('"', "").split("\n") @property @@ -1256,7 +1271,8 @@ def clone( option per list item which is passed exactly as specified to clone. For example ['--config core.filemode=false', '--config core.ignorecase', '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path'] - :param unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: * odbt = ObjectDatabase Type, allowing to determine the object database implementation used by the returned Repo instance @@ -1299,7 +1315,8 @@ def clone_from( If you want to unset some variable, consider providing empty string as its value. :param multi_options: See ``clone`` method - :param unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_protocols: Allow unsafe protocols to be used, like ext + :param allow_unsafe_options: Allow unsafe options to be used, like --upload-pack :param kwargs: see the ``clone`` method :return: Repo instance pointing to the cloned directory""" git = cls.GitCommandWrapperType(os.getcwd()) @@ -1380,4 +1397,6 @@ def currently_rebasing_on(self) -> Commit | None: rebase_head_file = osp.join(self.git_dir, "REBASE_HEAD") if not osp.isfile(rebase_head_file): return None - return self.commit(open(rebase_head_file, "rt").readline().strip()) + with open(rebase_head_file, "rt") as f: + content = f.readline().strip() + return self.commit(content) diff --git a/git/repo/fun.py b/git/repo/fun.py index 2ca2e3d6f..ae35aa81e 100644 --- a/git/repo/fun.py +++ b/git/repo/fun.py @@ -2,6 +2,7 @@ from __future__ import annotations import os import stat +from pathlib import Path from string import digits from git.exc import WorkTreeRepositoryUnsupported @@ -83,7 +84,7 @@ def find_worktree_git_dir(dotgit: "PathLike") -> Optional[str]: return None try: - lines = open(dotgit, "r").readlines() + lines = Path(dotgit).read_text().splitlines() for key, value in [line.strip().split(": ") for line in lines]: if key == "gitdir": return value diff --git a/git/util.py b/git/util.py index 6a4a65579..30028b1c2 100644 --- a/git/util.py +++ b/git/util.py @@ -131,7 +131,7 @@ def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: - """Methods with this decorator raise InvalidGitRepositoryError if they + """Methods with this decorator raise :class:`.exc.InvalidGitRepositoryError` if they encounter a bare repository""" from .exc import InvalidGitRepositoryError @@ -1152,7 +1152,7 @@ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: :note: Favor the iter_items method as it will - :return:list(Item,...) list of item instances""" + :return: list(Item,...) list of item instances""" out_list: Any = IterableList(cls._id_attribute_) out_list.extend(cls.iter_items(repo, *args, **kwargs)) return out_list @@ -1184,7 +1184,7 @@ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_I :note: Favor the iter_items method as it will - :return:list(Item,...) list of item instances""" + :return: list(Item,...) list of item instances""" out_list: IterableList = IterableList(cls._id_attribute_) out_list.extend(cls.iter_items(repo, *args, **kwargs)) return out_list diff --git a/setup.py b/setup.py index daad454d8..81ae0132d 100755 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ def make_release_tree(self, base_dir: str, files: Sequence) -> None: def _stamp_version(filename: str) -> None: found, out = False, [] try: - with open(filename, "r") as f: + with open(filename) as f: for line in f: if "__version__ =" in line: line = line.replace("\"git\"", "'%s'" % VERSION) @@ -82,7 +82,7 @@ def build_py_modules(basedir: str, excludes: Sequence = ()) -> Sequence: name="GitPython", cmdclass={"build_py": build_py, "sdist": sdist}, version=VERSION, - description="""GitPython is a python library used to interact with Git repositories""", + description="GitPython is a Python library used to interact with Git repositories", author="Sebastian Thiel, Michael Trier", author_email="byronimo@gmail.com, mtrier@gmail.com", license="BSD", @@ -95,7 +95,7 @@ def build_py_modules(basedir: str, excludes: Sequence = ()) -> Sequence: install_requires=requirements, tests_require=requirements + test_requirements, zip_safe=False, - long_description="""GitPython is a python library used to interact with Git repositories""", + long_description="""GitPython is a Python library used to interact with Git repositories""", long_description_content_type="text/markdown", classifiers=[ # Picked from @@ -121,5 +121,6 @@ def build_py_modules(basedir: str, excludes: Sequence = ()) -> Sequence: "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", ], ) diff --git a/test-requirements.txt b/test-requirements.txt index 6549f0fa0..6c6d57060 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,10 +3,7 @@ mypy black -flake8 -flake8-bugbear -flake8-comprehensions -flake8-typing-imports +pre-commit virtualenv diff --git a/test/test_base.py b/test/test_base.py index ccfdc8ed3..30029367d 100644 --- a/test/test_base.py +++ b/test/test_base.py @@ -9,6 +9,7 @@ import tempfile from unittest import SkipTest, skipIf +from git import Repo from git.objects import Blob, Tree, Commit, TagObject from git.compat import is_win from git.objects.util import get_object_type_by_name @@ -95,14 +96,18 @@ def test_object_resolution(self): self.assertEqual(self.rorepo.head.reference.object, self.rorepo.active_branch.object) @with_rw_repo("HEAD", bare=True) - def test_with_bare_rw_repo(self, bare_rw_repo): + def test_with_bare_rw_repo(self, bare_rw_repo: Repo): assert bare_rw_repo.config_reader("repository").getboolean("core", "bare") assert osp.isfile(osp.join(bare_rw_repo.git_dir, "HEAD")) + assert osp.isdir(bare_rw_repo.working_dir) + assert bare_rw_repo.working_tree_dir is None @with_rw_repo("0.1.6") - def test_with_rw_repo(self, rw_repo): + def test_with_rw_repo(self, rw_repo: Repo): assert not rw_repo.config_reader("repository").getboolean("core", "bare") + assert osp.isdir(rw_repo.working_tree_dir) assert osp.isdir(osp.join(rw_repo.working_tree_dir, "lib")) + assert osp.isdir(rw_repo.working_dir) @skipIf(HIDE_WINDOWS_FREEZE_ERRORS, "FIXME: Freezes! sometimes...") @with_rw_and_rw_remote_repo("0.1.6") diff --git a/test/test_commit.py b/test/test_commit.py index c5a43c94a..1efc68897 100644 --- a/test/test_commit.py +++ b/test/test_commit.py @@ -159,6 +159,37 @@ def check_entries(d): self.assertEqual(commit.committer_tz_offset, 14400, commit.committer_tz_offset) self.assertEqual(commit.message, "initial project\n") + def test_renames(self): + commit = self.rorepo.commit("185d847ec7647fd2642a82d9205fb3d07ea71715") + files = commit.stats.files + + # when a file is renamed, the output of git diff is like "dir/{old => new}" + # unless we disable rename with --no-renames, which produces two lines + # one with the old path deletes and another with the new added + self.assertEqual(len(files), 2) + + def check_entries(path, changes): + expected = { + ".github/workflows/Future.yml" : { + 'insertions': 57, + 'deletions': 0, + 'lines': 57 + }, + ".github/workflows/test_pytest.yml" : { + 'insertions': 0, + 'deletions': 55, + 'lines': 55 + }, + } + assert path in expected + assert isinstance(changes, dict) + for key in ("insertions", "deletions", "lines"): + assert changes[key] == expected[path][key] + + for path, changes in files.items(): + check_entries(path, changes) + # END for each stated file + def test_unicode_actor(self): # assure we can parse unicode actors correctly name = "Üäöß ÄußÉ" diff --git a/test/test_config.py b/test/test_config.py index 8bb2aa306..b159ebe2d 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -398,6 +398,17 @@ def test_empty_config_value(self): with self.assertRaises(cp.NoOptionError): cr.get_value("color", "ui") + def test_get_values_works_without_requiring_any_other_calls_first(self): + file_obj = self._to_memcache(fixture_path("git_config_multiple")) + cr = GitConfigParser(file_obj, read_only=True) + self.assertEqual(cr.get_values("section0", "option0"), ["value0"]) + file_obj.seek(0) + cr = GitConfigParser(file_obj, read_only=True) + self.assertEqual(cr.get_values("section1", "option1"), ["value1a", "value1b"]) + file_obj.seek(0) + cr = GitConfigParser(file_obj, read_only=True) + self.assertEqual(cr.get_values("section1", "other_option1"), ["other_value1"]) + def test_multiple_values(self): file_obj = self._to_memcache(fixture_path("git_config_multiple")) with GitConfigParser(file_obj, read_only=False) as cw: diff --git a/test/test_diff.py b/test/test_diff.py index 7065f0635..504337744 100644 --- a/test/test_diff.py +++ b/test/test_diff.py @@ -411,3 +411,73 @@ def test_diff_interface(self): cp = c.parents[0] diff_index = c.diff(cp, ["does/not/exist"]) self.assertEqual(len(diff_index), 0) + + @with_rw_directory + def test_rename_override(self, rw_dir): + """Test disabling of diff rename detection""" + + # create and commit file_a.txt + repo = Repo.init(rw_dir) + file_a = osp.join(rw_dir, "file_a.txt") + with open(file_a, "w", encoding='utf-8') as outfile: + outfile.write("hello world\n") + repo.git.add(Git.polish_url(file_a)) + repo.git.commit(message="Added file_a.txt") + + # remove file_a.txt + repo.git.rm(Git.polish_url(file_a)) + + # create and commit file_b.txt with similarity index of 52 + file_b = osp.join(rw_dir, "file_b.txt") + with open(file_b, "w", encoding='utf-8') as outfile: + outfile.write("hello world\nhello world") + repo.git.add(Git.polish_url(file_b)) + repo.git.commit(message="Removed file_a.txt. Added file_b.txt") + + commit_a = repo.commit('HEAD') + commit_b = repo.commit('HEAD~1') + + # check default diff command with renamed files enabled + diffs = commit_b.diff(commit_a) + self.assertEqual(1, len(diffs)) + diff = diffs[0] + self.assertEqual(True, diff.renamed_file) + self.assertEqual('file_a.txt', diff.rename_from) + self.assertEqual('file_b.txt', diff.rename_to) + + # check diff with rename files disabled + diffs = commit_b.diff(commit_a, no_renames=True) + self.assertEqual(2, len(diffs)) + + # check fileA.txt deleted + diff = diffs[0] + self.assertEqual(True, diff.deleted_file) + self.assertEqual('file_a.txt', diff.a_path) + + # check fileB.txt added + diff = diffs[1] + self.assertEqual(True, diff.new_file) + self.assertEqual('file_b.txt', diff.a_path) + + # check diff with high similarity index + diffs = commit_b.diff(commit_a, split_single_char_options=False, M='75%') + self.assertEqual(2, len(diffs)) + + # check fileA.txt deleted + diff = diffs[0] + self.assertEqual(True, diff.deleted_file) + self.assertEqual('file_a.txt', diff.a_path) + + # check fileB.txt added + diff = diffs[1] + self.assertEqual(True, diff.new_file) + self.assertEqual('file_b.txt', diff.a_path) + + # check diff with low similarity index + diffs = commit_b.diff(commit_a, split_single_char_options=False, M='40%') + self.assertEqual(1, len(diffs)) + diff = diffs[0] + self.assertEqual(True, diff.renamed_file) + self.assertEqual('file_a.txt', diff.rename_from) + self.assertEqual('file_b.txt', diff.rename_to) + diff --git a/test/test_git.py b/test/test_git.py index e7d236deb..c5d871f08 100644 --- a/test/test_git.py +++ b/test/test_git.py @@ -169,7 +169,7 @@ def test_refresh(self): self.assertRaises(GitCommandNotFound, refresh, "yada") # test a good path refresh - which_cmd = "where" if is_win else "which" + which_cmd = "where" if is_win else "command -v" path = os.popen("{0} git".format(which_cmd)).read().strip().split("\n")[0] refresh(path) diff --git a/test/test_remote.py b/test/test_remote.py index 3a47afab5..9636ca486 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -694,259 +694,279 @@ def test_push_error(self, repo): @with_rw_repo("HEAD") def test_set_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.set_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.set_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_set_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.set_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.set_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.add_url(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.add_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - remote.add_url(url, allow_unsafe_protocols=True) - assert list(remote.urls)[-1] == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + remote.add_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Remote.create(rw_repo, "origin", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Remote.create(rw_repo, "origin", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for i, url in enumerate(urls): - remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) - assert remote.url == url - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for i, url in enumerate(urls): + remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) + assert remote.url == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.fetch(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.fetch(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.fetch(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.fetch(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.fetch(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.fetch(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.fetch(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.fetch(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_pull_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.pull(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.pull(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.pull(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.pull(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - remote.pull(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.pull(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.pull(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.pull(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_push_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - remote.push(url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.push(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - remote = rw_repo.remote("origin") - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - remote.push(url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + remote = rw_repo.remote("origin") + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.push(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - assert not tmp_file.exists() - with self.assertRaises(UnsafeOptionError): - remote.push(**unsafe_option) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + assert not tmp_file.exists() + with self.assertRaises(UnsafeOptionError): + remote.push(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options_allowed(self, rw_repo): - remote = rw_repo.remote("origin") - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - { - "receive-pack": f"touch {tmp_file}", - "exec": f"touch {tmp_file}", - } - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - assert not tmp_file.exists() - with self.assertRaises(GitCommandError): - remote.push(**unsafe_option, allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() + with tempfile.TemporaryDirectory() as tdir: + remote = rw_repo.remote("origin") + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + remote.push(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() class TestTimeouts(TestBase): diff --git a/test/test_repo.py b/test/test_repo.py index 5874dbe6a..07c1e9adf 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -268,143 +268,176 @@ def test_leaking_password_in_clone_logs(self, rw_dir): @with_rw_repo("HEAD") def test_clone_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + assert destination.exists() @with_rw_repo("HEAD") def test_clone_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - rw_repo.clone(destination, multi_options=[option]) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + rw_repo.clone(destination, multi_options=[option]) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options_allowed(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not tmp_file.exists() - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Repo.clone_from( - rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert tmp_file.exists() - tmp_file.unlink() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for i, unsafe_option in enumerate(unsafe_options): - destination = tmp_dir / str(i) - assert not destination.exists() - Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) - assert destination.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not tmp_file.exists() + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Repo.clone_from( + rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert tmp_file.exists() + tmp_file.unlink() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() + Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert destination.exists() @with_rw_repo("HEAD") def test_clone_from_safe_options(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - options = [ - "--depth=1", - "--single-branch", - "-q", - ] - for option in options: - destination = tmp_dir / option - assert not destination.exists() - Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) - assert destination.exists() - - def test_clone_from_unsafe_procol(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::17/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Repo.clone_from(url, tmp_dir) - assert not tmp_file.exists() - - def test_clone_from_unsafe_procol_allowed(self): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - "ext::sh -c touch% /tmp/pwn", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) + assert destination.exists() + + def test_clone_from_unsafe_protocol(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Repo.clone_from(url, tmp_dir / "repo") + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Repo.clone_from(url, tmp_dir / "repo", allow_unsafe_protocols=True) + assert not tmp_file.exists() + + def test_clone_from_unsafe_protocol_allowed_and_enabled(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + ] + allow_ext = [ + "--config=protocol.ext.allow=always", + ] + for url in urls: + # The URL will be allowed into the command, and the protocol is enabled, + # but the command will fail since it can't read from the remote repo. + assert not tmp_file.exists() + with self.assertRaises(GitCommandError): + Repo.clone_from( + url, + tmp_dir / "repo", + multi_options=allow_ext, + allow_unsafe_protocols=True, + allow_unsafe_options=True, + ) + assert tmp_file.exists() + tmp_file.unlink() @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): @@ -1326,26 +1359,55 @@ def test_do_not_strip_newline_in_stdout(self, rw_dir): @with_rw_repo("HEAD") def test_clone_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - unexpected_file = tmp_dir / "pwn" - assert not unexpected_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + unexpected_file = tmp_dir / "pwn" + assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - rw_repo.clone(payload) + payload = f"--upload-pack=touch {unexpected_file}" + rw_repo.clone(payload) - assert not unexpected_file.exists() - # A repo was cloned with the payload as name - assert pathlib.Path(payload).exists() + assert not unexpected_file.exists() + # A repo was cloned with the payload as name + assert pathlib.Path(payload).exists() @with_rw_repo("HEAD") def test_clone_from_command_injection(self, rw_repo): - tmp_dir = pathlib.Path(tempfile.mkdtemp()) - temp_repo = Repo.init(tmp_dir / "repo") - unexpected_file = tmp_dir / "pwn" + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") + unexpected_file = tmp_dir / "pwn" + + assert not unexpected_file.exists() + payload = f"--upload-pack=touch {unexpected_file}" + with self.assertRaises(GitCommandError): + rw_repo.clone_from(payload, temp_repo.common_dir) + + assert not unexpected_file.exists() + + def test_ignored_items_reported(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") + + gi = tmp_dir / "repo" / ".gitignore" + + with open(gi, 'w') as file: + file.write('ignored_file.txt\n') + file.write('ignored_dir/\n') + + assert temp_repo.ignored(['included_file.txt', 'included_dir/file.txt']) == [] + assert temp_repo.ignored(['ignored_file.txt']) == ['ignored_file.txt'] + assert temp_repo.ignored(['included_file.txt', 'ignored_file.txt']) == ['ignored_file.txt'] + assert temp_repo.ignored(['included_file.txt', 'ignored_file.txt', 'included_dir/file.txt', 'ignored_dir/file.txt']) == ['ignored_file.txt', 'ignored_dir/file.txt'] + + def test_ignored_raises_error_w_symlink(self): + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = pathlib.Path(tdir) + temp_repo = Repo.init(tmp_dir / "repo") - assert not unexpected_file.exists() - payload = f"--upload-pack=touch {unexpected_file}" - with self.assertRaises(GitCommandError): - rw_repo.clone_from(payload, temp_repo.common_dir) + os.mkdir(tmp_dir / "target") + os.symlink(tmp_dir / "target", tmp_dir / "symlink") - assert not unexpected_file.exists() + with pytest.raises(GitCommandError): + temp_repo.ignored(tmp_dir / "symlink/file.txt") \ No newline at end of file diff --git a/test/test_submodule.py b/test/test_submodule.py index 13878df28..982226411 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -1101,139 +1101,147 @@ def test_add_no_clone_multi_options_argument(self, rwdir): @with_rw_repo("HEAD") def test_submodule_add_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - with self.assertRaises(UnsafeProtocolError): - Submodule.add(rw_repo, "new", "new", url) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Submodule.add(rw_repo, "new", "new", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - Submodule.add( - rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True - ) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) @with_rw_repo("HEAD") def test_submodule_update_unsafe_url(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - with self.assertRaises(UnsafeProtocolError): - submodule.update() - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + with self.assertRaises(UnsafeProtocolError): + submodule.update() + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_url_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - urls = [ - f"ext::sh -c touch% {tmp_file}", - "fd::/foo", - ] - for url in urls: - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) - # The URL will be allowed into the command, but the command will - # fail since we don't have that protocol enabled in the Git config file. - with self.assertRaises(GitCommandError): - submodule.update(allow_unsafe_protocols=True) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + urls = [ + f"ext::sh -c touch% {tmp_file}", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + submodule.update(allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(UnsafeOptionError): - submodule.update(clone_multi_options=[unsafe_option]) - assert not tmp_file.exists() + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + submodule.update(clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options_allowed(self, rw_repo): - tmp_dir = Path(tempfile.mkdtemp()) - tmp_file = tmp_dir / "pwn" - unsafe_options = [ - f"--upload-pack='touch {tmp_file}'", - f"-u 'touch {tmp_file}'", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - # The options will be allowed, but the command will fail. - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) - assert not tmp_file.exists() - - unsafe_options = [ - "--config=protocol.ext.allow=always", - "-c protocol.ext.allow=always", - ] - submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) - for unsafe_option in unsafe_options: - with self.assertRaises(GitCommandError): - submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + with tempfile.TemporaryDirectory() as tdir: + tmp_dir = Path(tdir) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + assert not tmp_file.exists() + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) diff --git a/test/test_util.py b/test/test_util.py index 90dd89a91..c17efce35 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -333,6 +333,27 @@ def test_iterable_list(self, case): self.assertRaises(IndexError, ilist.__delitem__, 0) self.assertRaises(IndexError, ilist.__delitem__, "something") + def test_utctz_to_altz(self): + self.assertEqual(utctz_to_altz("+0000"), 0) + self.assertEqual(utctz_to_altz("+1400"), -(14 * 3600)) + self.assertEqual(utctz_to_altz("-1200"), 12 * 3600) + self.assertEqual(utctz_to_altz("+0001"), -60) + self.assertEqual(utctz_to_altz("+0530"), -(5 * 3600 + 1800)) + self.assertEqual(utctz_to_altz("-0930"), 9 * 3600 + 1800) + + def test_altz_to_utctz_str(self): + self.assertEqual(altz_to_utctz_str(0), "+0000") + self.assertEqual(altz_to_utctz_str(-(14 * 3600)), "+1400") + self.assertEqual(altz_to_utctz_str(12 * 3600), "-1200") + self.assertEqual(altz_to_utctz_str(-60), "+0001") + self.assertEqual(altz_to_utctz_str(-(5 * 3600 + 1800)), "+0530") + self.assertEqual(altz_to_utctz_str(9 * 3600 + 1800), "-0930") + + self.assertEqual(altz_to_utctz_str(1), "+0000") + self.assertEqual(altz_to_utctz_str(59), "+0000") + self.assertEqual(altz_to_utctz_str(-1), "+0000") + self.assertEqual(altz_to_utctz_str(-59), "+0000") + def test_from_timestamp(self): # Correct offset: UTC+2, should return datetime + tzoffset(+2) altz = utctz_to_altz("+0200")