Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 92 additions & 16 deletions tagbot/action/git.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
import subprocess

from datetime import datetime
from datetime import datetime, timezone
from tempfile import mkdtemp
from typing import Optional, cast
from urllib.parse import urlparse
Expand All @@ -10,6 +10,63 @@
from . import Abort


def parse_git_datetime(
date_str: str, _depth: int = 0, _max_depth: int = 2
) -> Optional[datetime]:
"""Parse Git date output into a naive UTC datetime.

Handles common Git formats and normalizes timezone offsets.
Returns None if parsing fails.
"""

def normalize_offset(s: str) -> str:
match = re.search(r"([+-]\d{2})(:?)(\d{2})$", s)
if match and not match.group(2):
# Build prefix separately to avoid an overly long formatted line
prefix = s[: -len(match.group(0))]
return f"{prefix}{match.group(1)}:{match.group(3)}"
return s

cleaned = date_str.strip()
attempts = [cleaned, normalize_offset(cleaned)]
formats = ["%Y-%m-%d %H:%M:%S %z", "%a %b %d %H:%M:%S %Y %z"]

for candidate in attempts:
try:
dt = datetime.fromisoformat(candidate)
except ValueError:
dt = None
if dt:
offset = dt.utcoffset()
if offset:
dt -= offset
return dt.replace(tzinfo=None)
for fmt in formats:
try:
dt = datetime.strptime(candidate, fmt)
except ValueError:
continue
return dt.astimezone(timezone.utc).replace(tzinfo=None)

match = re.search(
r"(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[+-]\d{2}:?\d{2})",
cleaned,
)
if match:
candidate = normalize_offset(match.group(1))
# Prevent infinite recursion: only recurse if normalization changed
# the matched string and we haven't exceeded a small depth cap.
if (
candidate != match.group(1)
and candidate != date_str
and _depth < _max_depth
):
return parse_git_datetime(candidate, _depth + 1, _max_depth)
return None

return None


class Git:
"""Provides access to a local Git repository."""

Expand Down Expand Up @@ -70,18 +127,36 @@ def command(self, *argv: str, repo: Optional[str] = "") -> str:
proc = subprocess.run(args, text=True, capture_output=True)
out = proc.stdout.strip()
if proc.returncode:
error_msg = f"Git command '{self._sanitize_command(cmd)}' failed"
err = proc.stderr.strip()
if out:
stdout = self._sanitize_command(out)
logger.error(f"stdout: {stdout}")
error_msg += f"\nstdout: {stdout}"
if proc.stderr:
stderr = self._sanitize_command(proc.stderr.strip())
logger.error(f"stderr: {stderr}")
error_msg += f"\nstderr: {stderr}"
raise Abort(error_msg)
logger.error(f"stdout: {self._sanitize_command(out)}")
if err:
logger.error(f"stderr: {self._sanitize_command(err)}")

detail = err or out
hint = self._hint_for_failure(detail)
message = f"Git command '{self._sanitize_command(cmd)}' failed"
if detail:
message = f"{message}: {self._sanitize_command(detail)}"
if hint:
message = f"{message} ({hint})"
raise Abort(message)
return out

def _hint_for_failure(self, detail: str) -> Optional[str]:
"""Return a user-facing hint for common git errors."""
lowered = detail.casefold()
if "permission to" in lowered and "denied" in lowered:
return "use a PAT with contents:write or a deploy key"
if "workflow" in lowered or "workflows" in lowered:
if "refusing" in lowered or "permission" in lowered:
return "provide workflow scope or avoid workflow changes"
if "publickey" in lowered or "permission denied (publickey)" in lowered:
return "configure SSH deploy key or switch to https with PAT"
if "bad credentials" in lowered or "authentication failed" in lowered:
return "token is invalid or lacks access"
return None
Comment on lines +146 to +158
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new _hint_for_failure method has multiple conditional branches for different error scenarios (permission denied, publickey, bad credentials), but the test suite only covers the workflow permission scenario. Consider adding tests for the other hint branches to ensure comprehensive test coverage.

Copilot uses AI. Check for mistakes.

def check(self, *argv: str, repo: Optional[str] = "") -> bool:
"""Run a Git command, but only return its success status."""
try:
Expand Down Expand Up @@ -177,9 +252,10 @@ def time_of_commit(self, sha: str, repo: str = "") -> datetime:
"""Get the time that a commit was made."""
# The format %cI is "committer date, strict ISO 8601 format".
date = self.command("show", "-s", "--format=%cI", sha, repo=repo)
dt = datetime.fromisoformat(date)
# Convert to UTC and remove time zone information.
offset = dt.utcoffset()
if offset:
dt -= offset
return dt.replace(tzinfo=None)
parsed = parse_git_datetime(date)
if not parsed:
logger.warning(
"Could not parse git date '%s', using current UTC", date.strip()
)
return datetime.now(timezone.utc).replace(tzinfo=None)
return parsed
62 changes: 46 additions & 16 deletions tagbot/action/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from .. import logger
from . import TAGBOT_WEB, Abort, InvalidProject
from .changelog import Changelog
from .git import Git
from .git import Git, parse_git_datetime

GitlabClient: Any = None
GitlabUnknown: Any = None
Expand Down Expand Up @@ -809,12 +809,10 @@ def _build_commit_datetime_cache(self, shas: List[str]) -> None:
if len(parts) == 2:
commit_sha, iso_date = parts
if commit_sha in sha_set:
# Parse ISO 8601 date and convert to UTC without timezone
dt = datetime.fromisoformat(iso_date)
offset = dt.utcoffset()
if offset:
dt = dt - offset
dt = dt.replace(tzinfo=None)
dt = parse_git_datetime(iso_date)
if not dt:
logger.debug("Could not parse git log date '%s'", iso_date)
continue
self.__commit_datetimes[commit_sha] = dt
found += 1
if found >= len(uncached):
Expand Down Expand Up @@ -1480,15 +1478,40 @@ def create_release(self, version: str, sha: str, is_latest: bool = True) -> None
# Use make_latest=False for backfilled old releases to avoid marking them
# as the "Latest" release on GitHub
make_latest_str = "true" if is_latest else "false"
self._repo.create_git_release(
version_tag,
version_tag,
log,
target_commitish=target,
draft=self._draft,
make_latest=make_latest_str,
generate_release_notes=(self._changelog_format == "github"),
)

def _release_already_exists(exc: GithubException) -> bool:
data = getattr(exc, "data", {}) or {}
for err in data.get("errors", []):
if isinstance(err, dict) and err.get("code") == "already_exists":
return True
return "already exists" in str(exc)

try:
self._repo.create_git_release(
version_tag,
version_tag,
log,
target_commitish=target,
draft=self._draft,
make_latest=make_latest_str,
generate_release_notes=(self._changelog_format == "github"),
)
except GithubException as e:
if e.status == 422 and _release_already_exists(e):
logger.info(f"Release for tag {version_tag} already exists, skipping")
return
elif e.status == 403 and "resource not accessible" in str(e).lower():
logger.error(
"Release creation blocked: token lacks required permissions. "
"Use a PAT with contents:write (and workflows if tagging "
"workflow changes)."
)
elif e.status == 401:
logger.error(
"Release creation failed: bad credentials. Refresh the token or "
"use a PAT with repo scope."
)
raise
logger.info(f"GitHub release {version_tag} created successfully")

def _check_rate_limit(self) -> None:
Expand Down Expand Up @@ -1523,6 +1546,13 @@ def handle_error(self, e: Exception, *, raise_abort: bool = True) -> None:
logger.warning("GitHub returned a 5xx error code")
logger.info(trace)
allowed = True
elif e.status == 401:
logger.error(
"GitHub returned 401 Bad credentials. Verify that your token "
"is valid and has access to the repository and registry."
)
internal = False
allowed = False
elif e.status == 403:
self._check_rate_limit()
logger.error(
Expand Down
33 changes: 22 additions & 11 deletions test/action/test_changelog.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import textwrap

from datetime import datetime, timedelta, timezone
from unittest.mock import Mock
from unittest.mock import Mock, patch

import yaml

Expand All @@ -12,7 +12,12 @@
from tagbot.action.repo import Repo


def _changelog(*, template="", ignore=set(), subdir=None):
@patch("tagbot.action.repo.Github")
def _changelog(mock_gh, *, template="", ignore=set(), subdir=None):
mock_gh_instance = Mock()
mock_gh.return_value = mock_gh_instance
mock_repo = Mock()
mock_gh_instance.get_repo.return_value = mock_repo
r = Repo(
repo="",
registry="",
Expand All @@ -31,6 +36,10 @@ def _changelog(*, template="", ignore=set(), subdir=None):
branch=None,
subdir=subdir,
)
# Mock get_all_tags to return empty list (tests override as needed)
r.get_all_tags = Mock(return_value=[])
# Mock _build_tags_cache to return empty dict
r._build_tags_cache = Mock(return_value={})
return r._changelog


Expand All @@ -41,12 +50,16 @@ def test_slug():

def test_previous_release():
c = _changelog()
tags = ["ignore", "v1.2.4-ignore", "v1.2.3", "v1.2.2", "v1.0.2", "v1.0.10"]
tags = [
"ignore",
"v1.2.4-ignore",
"v1.2.3",
"v1.2.2",
"v1.0.2",
"v1.0.10",
]
c._repo.get_all_tags = Mock(return_value=tags)
# Mock get_release to return a minimal release-like object
c._repo._repo.get_release = Mock(
side_effect=lambda tag: type("obj", (object,), {"tag_name": tag})()
)
c._repo._repo.get_release = Mock(side_effect=lambda t: Mock(tag_name=t))
assert c._previous_release("v1.0.0") is None
assert c._previous_release("v1.0.2") is None
rel = c._previous_release("v1.2.5")
Expand Down Expand Up @@ -95,10 +108,7 @@ def test_previous_release_subdir():
"Foo-v2.0.0",
]
c._repo.get_all_tags = Mock(return_value=tags)
# Mock get_release to return a minimal release-like object
c._repo._repo.get_release = Mock(
side_effect=lambda tag: type("obj", (object,), {"tag_name": tag})()
)
c._repo._repo.get_release = Mock(side_effect=lambda t: Mock(tag_name=t))
assert c._previous_release("Foo-v1.0.0") is None
assert c._previous_release("Foo-v1.0.2") is None
rel = c._previous_release("Foo-v1.2.5")
Expand Down Expand Up @@ -249,6 +259,7 @@ def test_collect_data():
c = _changelog()
c._repo._repo = Mock(full_name="A/B.jl", html_url="https://github.com/A/B.jl")
c._repo._project = Mock(return_value="B")
c._repo.is_version_yanked = Mock(return_value=False)
c._previous_release = Mock(
side_effect=[
Mock(tag_name="v1.2.2", created_at=datetime.now(timezone.utc)),
Expand Down
Loading