Initial Commit

This commit is contained in:
FanaticPythoner (Nathan Trudeau)
2026-04-27 15:56:43 -04:00
parent 81b23fb2b2
commit 0c159e91fb
25 changed files with 5729 additions and 0 deletions

976
tests/test_forge_auth.py Executable file
View File

@@ -0,0 +1,976 @@
"""Unit tests for scripts/forge_auth.py.
Pure-logic tests (no network, no browser). The full PKCE flow is
covered by tests/test_forge_auth_integration.sh, which stands up a
local mock Gitea OIDC server and drives the CLI end-to-end.
Run with: python3 -m unittest tests.test_forge_auth
"""
from __future__ import annotations
import base64
import hashlib
import io
import json
import os
import sys
import tempfile
import time
import unittest
from pathlib import Path
from unittest import mock
from urllib.parse import urlparse
HERE = Path(__file__).resolve().parent
ROOT = HERE.parent
sys.path.insert(0, str(ROOT / "scripts"))
import forge_auth as fa # noqa: E402
# --------------------------------------------------------------------
# PKCE primitives
# --------------------------------------------------------------------
class PkcePairTests(unittest.TestCase):
def test_verifier_length_in_range(self) -> None:
v, _ = fa.pkce_pair()
# RFC 7636: 43 <= len(verifier) <= 128
self.assertGreaterEqual(len(v), 43)
self.assertLessEqual(len(v), 128)
def test_challenge_is_correct_s256_encoding(self) -> None:
v, c = fa.pkce_pair()
expected = (
base64.urlsafe_b64encode(hashlib.sha256(v.encode("ascii")).digest())
.rstrip(b"=")
.decode("ascii")
)
self.assertEqual(c, expected)
def test_pairs_are_unique(self) -> None:
pairs = {fa.pkce_pair() for _ in range(10)}
self.assertEqual(len(pairs), 10)
class StateSigningTests(unittest.TestCase):
def setUp(self) -> None:
self.key = b"\x01" * 32
def test_roundtrip(self) -> None:
nonce = "abc.123" # dot in nonce must still round-trip (rpartition)
signed = fa.sign_state(self.key, nonce)
self.assertEqual(fa.verify_state(self.key, signed), nonce)
def test_tampered_mac_rejected(self) -> None:
signed = fa.sign_state(self.key, "n1")
tampered = signed[:-1] + ("0" if signed[-1] != "0" else "1")
with self.assertRaises(fa.AuthError):
fa.verify_state(self.key, tampered)
def test_wrong_key_rejected(self) -> None:
signed = fa.sign_state(self.key, "n1")
with self.assertRaises(fa.AuthError):
fa.verify_state(b"\x02" * 32, signed)
def test_missing_separator_raises(self) -> None:
with self.assertRaises(fa.AuthError):
fa.verify_state(self.key, "nosignaturehere")
# --------------------------------------------------------------------
# ForgeAuthConfig.from_env
# --------------------------------------------------------------------
class ConfigFromEnvTests(unittest.TestCase):
def _env(self, **overrides: str) -> dict[str, str]:
env = {
"FORGE_GITEA_URL": "https://gitea.example.com",
"FSDGG_CLI_CLIENT_ID": "my-client-id",
"FSDGG_CLI_REDIRECT_URI": "http://127.0.0.1:38111/callback",
}
env.update(overrides)
return env
def test_happy_path(self) -> None:
with mock.patch.dict(os.environ, self._env(), clear=True):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com")
self.assertEqual(cfg.client_id, "my-client-id")
self.assertEqual(cfg.redirect_uri, "http://127.0.0.1:38111/callback")
self.assertFalse(cfg.insecure_tls)
def test_trailing_slash_stripped(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FORGE_GITEA_URL="https://gitea.example.com/"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com")
def test_localhost_redirect_allowed(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="http://localhost:45000/cb"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.redirect_uri, "http://localhost:45000/cb")
def test_ipv6_loopback_redirect_allowed(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="http://[::1]:45000/cb"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.redirect_uri, "http://[::1]:45000/cb")
def test_non_loopback_rejected(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="https://example.com/cb"),
clear=True,
):
with self.assertRaises(fa.AuthError) as ctx:
fa.ForgeAuthConfig.from_env()
self.assertIn("RFC 8252", str(ctx.exception))
def test_https_loopback_rejected(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="https://127.0.0.1:38111/cb"),
clear=True,
):
with self.assertRaises(fa.AuthError) as ctx:
fa.ForgeAuthConfig.from_env()
self.assertIn("RFC 8252", str(ctx.exception))
def test_missing_port_rejected(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="http://127.0.0.1/cb"),
clear=True,
):
with self.assertRaises(fa.AuthError):
fa.ForgeAuthConfig.from_env()
def test_missing_required_env_var_lists_all_missing(self) -> None:
env = self._env()
env["FORGE_GITEA_URL"] = ""
env["FSDGG_CLI_CLIENT_ID"] = ""
with mock.patch.dict(os.environ, env, clear=True):
with self.assertRaises(fa.AuthError) as ctx:
fa.ForgeAuthConfig.from_env()
msg = str(ctx.exception)
self.assertIn("FORGE_GITEA_URL", msg)
self.assertIn("FSDGG_CLI_CLIENT_ID", msg)
def test_expected_username_read_from_env(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FORGE_GITEA_USERNAME="CVJMAllaire"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.expected_username, "CVJMAllaire")
def test_expected_username_defaults_to_empty(self) -> None:
with mock.patch.dict(os.environ, self._env(), clear=True):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.expected_username, "")
def test_insecure_tls_flag(self) -> None:
with mock.patch.dict(
os.environ, self._env(FORGE_INSECURE_TLS="1"), clear=True
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertTrue(cfg.insecure_tls)
# --------------------------------------------------------------------
# build_authorize_url
# --------------------------------------------------------------------
class BuildAuthorizeUrlTests(unittest.TestCase):
def setUp(self) -> None:
self.cfg = fa.ForgeAuthConfig(
gitea_base_url="https://g.example",
client_id="client-1",
redirect_uri="http://127.0.0.1:38111/callback",
)
self.endpoints = {
"authorization_endpoint": "https://g.example/login/oauth/authorize",
"token_endpoint": "https://g.example/login/oauth/access_token",
"userinfo_endpoint": "https://g.example/login/oauth/userinfo",
}
def test_url_contains_all_required_parameters(self) -> None:
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c123", state="s123"
)
self.assertTrue(url.startswith("https://g.example/login/oauth/authorize?"))
# All required PKCE + OAuth2 params
for token in (
"response_type=code",
"client_id=client-1",
"code_challenge=c123",
"code_challenge_method=S256",
"state=s123",
"redirect_uri=http%3A%2F%2F127.0.0.1%3A38111%2Fcallback",
):
self.assertIn(token, url)
def test_url_always_includes_prompt_login(self) -> None:
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c", state="s"
)
# prompt=login forces Gitea to display the login page even when a
# session is already active: prevents silent auth under the
# wrong identity.
self.assertIn("prompt=login", url)
def test_url_adds_login_hint_when_expected_username_set(self) -> None:
cfg = fa.ForgeAuthConfig(
gitea_base_url="https://g.example",
client_id="client-1",
redirect_uri="http://127.0.0.1:38111/callback",
expected_username="CVJMAllaire",
)
url = fa.build_authorize_url(
cfg, self.endpoints, challenge="c", state="s"
)
self.assertIn("login_hint=CVJMAllaire", url)
def test_url_omits_login_hint_when_expected_username_unset(self) -> None:
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c", state="s"
)
self.assertNotIn("login_hint=", url)
class BuildGiteaLogoutUrlTests(unittest.TestCase):
def test_composes_logout_url_with_redirect_to_login(self) -> None:
url = fa.build_gitea_logout_url("https://gitea.example.com")
self.assertEqual(
url,
"https://gitea.example.com/user/logout?redirect_to=%2Fuser%2Flogin",
)
def test_trailing_slash_is_stripped(self) -> None:
url = fa.build_gitea_logout_url("https://gitea.example.com/")
self.assertTrue(url.startswith("https://gitea.example.com/user/logout"))
self.assertNotIn("//user/logout", url)
# --------------------------------------------------------------------
# AuthFile (read / merge / write / has_live_gitea_token)
# --------------------------------------------------------------------
class AuthFileTests(unittest.TestCase):
def test_read_missing_file_returns_empty(self) -> None:
f = fa.AuthFile.read(Path("/nonexistent/path/client-auth.json"))
self.assertEqual(f.raw, {})
self.assertFalse(f.has_live_gitea_token())
def test_read_malformed_json_raises(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "bad.json"
p.write_text("{ not json", encoding="utf-8")
with self.assertRaises(fa.AuthError):
fa.AuthFile.read(p)
def test_read_non_object_raises(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "arr.json"
p.write_text("[1,2,3]", encoding="utf-8")
with self.assertRaises(fa.AuthError):
fa.AuthFile.read(p)
def test_has_live_gitea_token_requires_token(self) -> None:
f = fa.AuthFile(raw={"gitea_access_token": ""})
self.assertFalse(f.has_live_gitea_token())
def test_has_live_gitea_token_expired(self) -> None:
f = fa.AuthFile(raw={
"gitea_access_token": "t",
"gitea_token_expires_at": time.time() - 10,
})
self.assertFalse(f.has_live_gitea_token())
def test_has_live_gitea_token_live(self) -> None:
f = fa.AuthFile(raw={
"gitea_access_token": "t",
"gitea_token_expires_at": time.time() + 3600,
})
self.assertTrue(f.has_live_gitea_token())
def test_has_live_gitea_token_unknown_expiry_is_trusted(self) -> None:
f = fa.AuthFile(raw={"gitea_access_token": "t"})
self.assertTrue(f.has_live_gitea_token())
def test_merge_login_fills_required_gateway_fields(self) -> None:
f = fa.AuthFile()
f.merge_login(
username="alice",
gitea_access_token="AAA",
gitea_token_expires_at=time.time() + 3600,
refresh_token="RRR",
client_id="client-1",
gitea_base_url="https://g.example",
)
for required in (
"username", "access_token", "expires_in",
"issued_at", "public_base_url", "index_name",
):
self.assertIn(required, f.raw)
self.assertEqual(f.raw["username"], "alice")
self.assertEqual(f.raw["gitea_access_token"], "AAA")
self.assertEqual(f.raw["_forge_refresh_token"], "RRR")
self.assertEqual(f.raw["_forge_client_id"], "client-1")
self.assertEqual(f.raw["_forge_gitea_base_url"], "https://g.example")
def test_merge_login_preserves_gateway_bearer(self) -> None:
# Gateway bearer fields remain intact across a Gitea login write.
f = fa.AuthFile(raw={
"username": "old-alice",
"access_token": "GATEWAY-BEARER",
"expires_in": 7200,
"issued_at": 1000000.0,
"public_base_url": "https://gateway.example",
"index_name": "forge",
})
f.merge_login(
username="alice",
gitea_access_token="NEW-GITEA",
gitea_token_expires_at=time.time() + 3600,
refresh_token="NEW-RT",
client_id="client-1",
gitea_base_url="https://g.example",
)
self.assertEqual(f.raw["access_token"], "GATEWAY-BEARER")
self.assertEqual(f.raw["public_base_url"], "https://gateway.example")
self.assertEqual(f.raw["index_name"], "forge")
# And the Gitea fields got refreshed
self.assertEqual(f.raw["gitea_access_token"], "NEW-GITEA")
self.assertEqual(f.raw["username"], "alice")
def test_merge_refresh_only_touches_gitea_fields(self) -> None:
f = fa.AuthFile(raw={
"username": "alice",
"access_token": "GATEWAY-BEARER",
"public_base_url": "https://gateway.example",
})
f.merge_refresh(
gitea_access_token="FRESH",
gitea_token_expires_at=time.time() + 3600,
refresh_token="ROTATED",
)
self.assertEqual(f.raw["access_token"], "GATEWAY-BEARER")
self.assertEqual(f.raw["gitea_access_token"], "FRESH")
self.assertEqual(f.raw["_forge_refresh_token"], "ROTATED")
def test_merge_refresh_empty_refresh_token_keeps_existing(self) -> None:
f = fa.AuthFile(raw={"_forge_refresh_token": "KEEP"})
f.merge_refresh(
gitea_access_token="NEW", gitea_token_expires_at=None, refresh_token=""
)
self.assertEqual(f.raw["_forge_refresh_token"], "KEEP")
def test_write_is_atomic_and_0600(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "sub" / "client-auth.json"
f = fa.AuthFile(raw={"username": "u", "gitea_access_token": "T"})
f.write(p)
self.assertTrue(p.is_file())
mode = p.stat().st_mode & 0o777
self.assertEqual(mode, 0o600)
# The tmp file must not still exist
self.assertFalse((p.parent / "client-auth.json.tmp").exists())
roundtrip = json.loads(p.read_text(encoding="utf-8"))
self.assertEqual(roundtrip["username"], "u")
def test_write_preserves_unknown_keys(self) -> None:
# Forward-compat: preserve unknown gateway fields verbatim.
raw = {"username": "u", "future_field": {"x": 1}, "access_token": "A"}
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "a.json"
fa.AuthFile(raw=dict(raw)).write(p)
roundtrip = json.loads(p.read_text(encoding="utf-8"))
self.assertEqual(roundtrip, raw)
# --------------------------------------------------------------------
# auth_store_path
# --------------------------------------------------------------------
class AuthStorePathTests(unittest.TestCase):
def test_explicit_env_var_wins(self) -> None:
with mock.patch.dict(
os.environ,
{"FSDGG_AUTH_STORE_PATH": "/tmp/somewhere/a.json", "FSDGG_RUNTIME_DIR": "/other"},
clear=True,
):
self.assertEqual(fa.auth_store_path(), Path("/tmp/somewhere/a.json"))
def test_runtime_dir_fallback(self) -> None:
with mock.patch.dict(
os.environ, {"FSDGG_RUNTIME_DIR": "/tmp/rt"}, clear=True
):
self.assertEqual(fa.auth_store_path(), Path("/tmp/rt/client-auth.json"))
def test_default_path(self) -> None:
with mock.patch.dict(os.environ, {}, clear=True):
self.assertEqual(fa.auth_store_path(), fa.DEFAULT_AUTH_FILE)
# --------------------------------------------------------------------
# run_logout
# --------------------------------------------------------------------
class RunLogoutTests(unittest.TestCase):
"""Invariant: every assertion runs *inside* the TemporaryDirectory
context. Tempdir teardown after the assertions would delete the
file under test and produce a false positive on
``assertFalse(is_file)``.
"""
def test_logout_no_file(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "client-auth.json"
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
result = fa.run_logout()
self.assertIsNone(result)
self.assertFalse(p.is_file())
def test_logout_only_welcome_fields_removes_file(self) -> None:
payload = {
"username": "u", "access_token": "", "expires_in": 0,
"issued_at": 0.0, "public_base_url": "", "index_name": "",
"gitea_access_token": "A", "_forge_refresh_token": "R",
}
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "client-auth.json"
p.write_text(json.dumps(payload), encoding="utf-8")
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
result = fa.run_logout()
self.assertEqual(result, p)
self.assertFalse(p.is_file())
def test_logout_preserves_gateway_bearer(self) -> None:
payload = {
"username": "u",
"access_token": "GATEWAY-BEARER",
"expires_in": 3600,
"issued_at": 1000000.0,
"public_base_url": "https://gateway.example",
"index_name": "forge",
"gitea_access_token": "GITEA-A",
"gitea_token_expires_at": 2000000.0,
"_forge_refresh_token": "R",
"_forge_client_id": "c",
}
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "client-auth.json"
p.write_text(json.dumps(payload), encoding="utf-8")
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
fa.run_logout()
self.assertTrue(p.is_file())
remaining = json.loads(p.read_text(encoding="utf-8"))
self.assertEqual(remaining["access_token"], "GATEWAY-BEARER")
self.assertEqual(remaining["public_base_url"], "https://gateway.example")
self.assertNotIn("gitea_access_token", remaining)
self.assertNotIn("_forge_refresh_token", remaining)
# --------------------------------------------------------------------
# main() dispatcher
# --------------------------------------------------------------------
class MainDispatcherTests(unittest.TestCase):
def test_unknown_command_rc_2(self) -> None:
self.assertEqual(fa.main(["forge_auth.py", "nope"]), 2)
def test_no_args_rc_2(self) -> None:
self.assertEqual(fa.main(["forge_auth.py"]), 2)
def test_status_on_empty_store_rc_1(self) -> None:
with tempfile.TemporaryDirectory() as d:
with mock.patch.dict(
os.environ,
{"FSDGG_AUTH_STORE_PATH": str(Path(d) / "a.json")},
clear=True,
):
rc = fa.main(["forge_auth.py", "status"])
self.assertEqual(rc, 1)
def test_status_on_live_token_rc_0(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "a.json"
p.write_text(json.dumps({
"username": "u",
"gitea_access_token": "LIVE",
"gitea_token_expires_at": time.time() + 3600,
}), encoding="utf-8")
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
rc = fa.main(["forge_auth.py", "status"])
self.assertEqual(rc, 0)
class HeadlessGuidanceTests(unittest.TestCase):
AUTH_URL = "https://gitea.example/login/oauth/authorize?x=1"
REDIRECT = urlparse("http://127.0.0.1:54321/callback")
def _capture(self, env_extra: dict[str, str] | None = None) -> str:
env = {"USER": "alice"}
if env_extra is not None:
for k in ("SSH_CONNECTION", "SSH_TTY"):
env.pop(k, None)
env.update(env_extra)
buf = io.StringIO()
with mock.patch.dict(os.environ, env, clear=True), \
mock.patch("forge_auth.sys.stderr", buf), \
mock.patch("forge_auth.socket.getfqdn", return_value="box.example"), \
mock.patch("forge_auth.socket.gethostname", return_value="box.example"):
fa._print_headless_guidance(self.AUTH_URL, self.REDIRECT)
return buf.getvalue()
def test_prints_authorize_url(self) -> None:
out = self._capture()
self.assertIn(self.AUTH_URL, out)
def test_prints_rfc_8252_reference(self) -> None:
self.assertIn("RFC 8252", self._capture())
def test_uses_configured_redirect_port(self) -> None:
out = self._capture()
self.assertIn("127.0.0.1:54321", out)
self.assertNotIn("38111", out)
def test_ssh_forward_template_uses_configured_port(self) -> None:
out = self._capture()
self.assertIn("ssh -L 54321:127.0.0.1:54321 alice@box.example", out)
def test_reachability_probe_included(self) -> None:
out = self._capture()
self.assertIn("curl -sS -m 2 http://127.0.0.1:54321/", out)
self.assertIn("Connection refused", out)
def test_ssh_branch_when_SSH_CONNECTION_set(self) -> None:
out = self._capture({"SSH_CONNECTION": "1.2.3.4 22 5.6.7.8 22", "USER": "alice"})
self.assertIn("running inside an SSH session", out)
def test_non_ssh_branch_when_no_ssh_env(self) -> None:
out = self._capture()
self.assertIn("if this machine is remote", out)
self.assertNotIn("running inside an SSH session", out)
class BuildAuthorizeErrorTests(unittest.TestCase):
"""Contract tests for ``_build_authorize_error``."""
BASE = "https://gitea.example.com"
CID = "ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5"
SCOPES = "openid profile email read:user read:organization read:repository write:repository"
def _exc_different_scope(self, **overrides: str) -> fa.AuthError:
kw: dict[str, object] = {
"error": "server_error",
"error_description": "a grant exists with different scope",
"gitea_base_url": self.BASE,
"client_id": self.CID,
"scopes": self.SCOPES,
}
kw.update(overrides)
return fa._build_authorize_error(
str(kw["error"]),
str(kw["error_description"]) if kw["error_description"] else None,
str(kw["gitea_base_url"]),
client_id=str(kw["client_id"]),
scopes=str(kw["scopes"]),
)
def test_different_scope_returns_autherror(self) -> None:
self.assertIsInstance(self._exc_different_scope(), fa.AuthError)
def test_different_scope_message_is_five_lines_max(self) -> None:
# RULE #2 §D.3: user-facing error bodies cap at 5 lines.
s = str(self._exc_different_scope())
self.assertLessEqual(len(s.splitlines()), 5)
def test_different_scope_surfaces_gitea_phrasing(self) -> None:
self.assertIn("different scope", str(self._exc_different_scope()).lower())
def test_different_scope_cites_settings_url(self) -> None:
self.assertIn(
f"{self.BASE}/user/settings/applications",
str(self._exc_different_scope()),
)
def test_different_scope_disambiguates_authorized_section(self) -> None:
# Gitea's settings page has both "Authorized OAuth2 Applications"
# (user grants) and "Manage OAuth2 Applications" (app registrations);
# the message must point at the first.
self.assertIn(
'"Authorized OAuth2 Applications"',
str(self._exc_different_scope()),
)
def test_different_scope_includes_full_client_id(self) -> None:
self.assertIn(self.CID, str(self._exc_different_scope()))
def test_different_scope_includes_requested_scopes(self) -> None:
self.assertIn(self.SCOPES, str(self._exc_different_scope()))
def test_different_scope_links_to_remediation_doc(self) -> None:
# External rationale lives in the doc per RULE #2 §D/E.
self.assertIn(
"docs/oauth-grant-scope-mismatch.md",
str(self._exc_different_scope()),
)
def test_different_scope_without_base_url_uses_placeholder(self) -> None:
self.assertIn(
"<gitea-base-url>/user/settings/applications",
str(self._exc_different_scope(gitea_base_url="")),
)
def test_different_scope_without_client_id_uses_placeholder(self) -> None:
self.assertIn(
"<unknown-client-id>",
str(self._exc_different_scope(client_id="")),
)
def test_different_scope_without_scopes_uses_placeholder(self) -> None:
self.assertIn(
"<unknown-scopes>",
str(self._exc_different_scope(scopes="")),
)
def test_access_denied_branch(self) -> None:
exc = fa._build_authorize_error(
"access_denied",
"The resource owner or authorization server denied the request.",
self.BASE,
)
self.assertIn("access_denied", str(exc))
self.assertIn("just login", str(exc))
def test_fallback_preserves_raw_error_and_description(self) -> None:
exc = fa._build_authorize_error(
"invalid_request", "redirect_uri mismatch", self.BASE,
)
s = str(exc)
self.assertIn("invalid_request", s)
self.assertIn("redirect_uri mismatch", s)
def test_fallback_with_no_description_only_includes_error_code(self) -> None:
exc = fa._build_authorize_error("temporarily_unavailable", None, self.BASE)
self.assertIn("temporarily_unavailable", str(exc))
class AuthorizeUrlIsHeadlessInvariantTests(unittest.TestCase):
"""Pin the invariant: authorize URL is independent of ``--no-browser``."""
def setUp(self) -> None:
self.cfg = fa.ForgeAuthConfig(
gitea_base_url="https://gitea.example.com",
client_id="ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5",
redirect_uri="http://127.0.0.1:38111/callback",
expected_username="alice",
)
self.endpoints = {
"authorization_endpoint": "https://gitea.example.com/login/oauth/authorize",
"token_endpoint": "https://gitea.example.com/login/oauth/access_token",
"userinfo_endpoint": "https://gitea.example.com/login/oauth/userinfo",
}
def test_build_authorize_url_signature_has_no_headless_parameter(self) -> None:
import inspect
sig = inspect.signature(fa.build_authorize_url)
self.assertNotIn("open_browser", sig.parameters)
self.assertNotIn("headless", sig.parameters)
self.assertNotIn("no_browser", sig.parameters)
def test_url_is_deterministic_for_fixed_challenge_and_state(self) -> None:
url_a = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="ch_1", state="st_1"
)
url_b = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="ch_1", state="st_1"
)
self.assertEqual(url_a, url_b)
def test_scope_parameter_matches_config_scopes_exactly(self) -> None:
from urllib.parse import parse_qs, urlparse
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c", state="s"
)
params = parse_qs(urlparse(url).query)
self.assertEqual(params["scope"], [self.cfg.scopes])
def test_default_url_carries_prompt_login(self) -> None:
from urllib.parse import parse_qs, urlparse
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c", state="s"
)
params = parse_qs(urlparse(url).query)
self.assertEqual(params["prompt"], ["login"])
def test_force_login_prompt_false_drops_prompt_param(self) -> None:
from urllib.parse import parse_qs, urlparse
url = fa.build_authorize_url(
self.cfg,
self.endpoints,
challenge="c",
state="s",
force_login_prompt=False,
)
params = parse_qs(urlparse(url).query)
self.assertNotIn("prompt", params)
# login_hint must still ride along; the retry path keeps it so
# Gitea can pre-fill the username field if a fresh login screen
# ever does appear (e.g., expired session cookie).
self.assertEqual(params["login_hint"], [self.cfg.expected_username])
class ScopeMismatchAuthErrorTests(unittest.TestCase):
"""Contract tests for the ``ScopeMismatchAuthError`` subclass.
The "different scope" branch of ``_build_authorize_error`` must
return a ``ScopeMismatchAuthError`` so callers can drive a
revoke-and-retry recovery flow instead of swallowing the error.
The class is also a subclass of ``AuthError`` so existing
``except AuthError`` handlers (e.g., the credential helper) keep
working unchanged.
"""
BASE = "https://gitea.example.com"
CID = "ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5"
SCOPES = "openid profile email read:user read:organization read:repository write:repository"
def _build(self, **overrides):
kw = {
"error": "server_error",
"error_description": "a grant exists with different scope",
"gitea_base_url": self.BASE,
"client_id": self.CID,
"scopes": self.SCOPES,
}
kw.update(overrides)
return fa._build_authorize_error(
str(kw["error"]),
str(kw["error_description"]) if kw["error_description"] else None,
str(kw["gitea_base_url"]),
client_id=str(kw["client_id"]),
scopes=str(kw["scopes"]),
)
def test_different_scope_returns_subclass(self) -> None:
self.assertIsInstance(self._build(), fa.ScopeMismatchAuthError)
def test_subclass_inherits_from_autherror(self) -> None:
self.assertTrue(issubclass(fa.ScopeMismatchAuthError, fa.AuthError))
def test_attributes_carry_diagnostic_fields(self) -> None:
exc = self._build()
self.assertEqual(exc.gitea_base_url, self.BASE)
self.assertEqual(exc.client_id, self.CID)
self.assertEqual(exc.scopes, self.SCOPES)
def test_revoke_url_with_base(self) -> None:
exc = self._build()
self.assertEqual(
exc.revoke_url,
f"{self.BASE}/user/settings/applications",
)
def test_revoke_url_strips_trailing_slash(self) -> None:
exc = self._build(gitea_base_url=self.BASE + "/")
self.assertEqual(
exc.revoke_url,
f"{self.BASE}/user/settings/applications",
)
def test_revoke_url_without_base_uses_placeholder(self) -> None:
exc = self._build(gitea_base_url="")
self.assertEqual(
exc.revoke_url,
"<gitea-base-url>/user/settings/applications",
)
def test_access_denied_branch_is_plain_autherror(self) -> None:
# Negative case: only the scope-conflict branch upgrades to
# the subclass; access_denied stays a generic AuthError.
exc = fa._build_authorize_error(
"access_denied", "denied by user", self.BASE,
)
self.assertNotIsInstance(exc, fa.ScopeMismatchAuthError)
self.assertIsInstance(exc, fa.AuthError)
class CanPromptForRevokeTests(unittest.TestCase):
"""``_can_prompt_for_revoke`` gates the interactive retry path.
Returns False whenever the operator cannot reasonably be asked to
press Enter: explicit opt-out via ``FORGE_AUTO_REVOKE``,
non-interactive mode via ``FORGE_SETUP_YES``, or non-TTY stdio.
"""
def setUp(self) -> None:
self._env_keys = ("FORGE_AUTO_REVOKE", "FORGE_SETUP_YES")
self._env_backup = {k: os.environ.get(k) for k in self._env_keys}
for k in self._env_keys:
os.environ.pop(k, None)
def tearDown(self) -> None:
for k, v in self._env_backup.items():
if v is None:
os.environ.pop(k, None)
else:
os.environ[k] = v
def _run(self, *, stderr_tty: bool = True, stdin_tty: bool = True) -> bool:
with mock.patch.object(sys.stderr, "isatty", lambda: stderr_tty), \
mock.patch.object(sys.stdin, "isatty", lambda: stdin_tty):
return fa._can_prompt_for_revoke()
def test_default_with_both_ttys_returns_true(self) -> None:
self.assertTrue(self._run())
def test_force_auto_revoke_off_disables(self) -> None:
for v in ("0", "no", "false", "FALSE", "No"):
with self.subTest(value=v):
os.environ["FORGE_AUTO_REVOKE"] = v
self.assertFalse(self._run())
os.environ.pop("FORGE_AUTO_REVOKE", None)
def test_setup_yes_disables(self) -> None:
os.environ["FORGE_SETUP_YES"] = "1"
self.assertFalse(self._run())
def test_no_stderr_tty_disables(self) -> None:
self.assertFalse(self._run(stderr_tty=False))
def test_no_stdin_tty_disables(self) -> None:
self.assertFalse(self._run(stdin_tty=False))
class CmdLoginRetryOnScopeMismatchTests(unittest.TestCase):
"""``_cmd_login`` auto-retries once after ``ScopeMismatchAuthError``
when the prompt path is enabled; otherwise exits 1 immediately.
The retry must call ``run_login`` with ``force=True`` and
``force_login_prompt=False`` so the cached live-token short-circuit
cannot mask a stale grant and Gitea can reuse the existing browser
session cookie (only consent screen on retry).
"""
def setUp(self) -> None:
# Capture stderr to keep cli_err/cli_ok/cli_info from polluting
# the test runner output for the entire class.
stderr_patch = mock.patch.object(sys, "stderr", new_callable=io.StringIO)
stderr_patch.start()
self.addCleanup(stderr_patch.stop)
self.fake_cfg = fa.ForgeAuthConfig(
gitea_base_url="https://gitea.example.com",
client_id="ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5",
redirect_uri="http://127.0.0.1:38111/callback",
)
self.fake_state = mock.Mock(username="alice")
self.scope_exc = fa.ScopeMismatchAuthError(
"boom",
gitea_base_url=self.fake_cfg.gitea_base_url,
client_id=self.fake_cfg.client_id,
scopes=self.fake_cfg.scopes,
)
def _common_patches(self, *, run_login_side_effect):
return [
mock.patch.object(
fa.ForgeAuthConfig, "from_env", return_value=self.fake_cfg
),
mock.patch.object(
fa, "run_login", side_effect=run_login_side_effect
),
mock.patch.object(
fa, "auth_store_path", return_value=Path("/tmp/dummy.json")
),
]
def _start(self, patches):
for p in patches:
p.start()
self.addCleanup(lambda: [p.stop() for p in patches])
def test_retries_when_prompt_allowed(self) -> None:
patches = self._common_patches(
run_login_side_effect=[self.scope_exc, self.fake_state]
) + [
mock.patch.object(fa, "_can_prompt_for_revoke", return_value=True),
mock.patch.object(fa, "_prompt_revoke_and_wait", return_value=True),
]
self._start(patches)
rc = fa._cmd_login([])
self.assertEqual(rc, 0)
self.assertEqual(fa.run_login.call_count, 2)
_, kwargs = fa.run_login.call_args_list[1]
self.assertTrue(kwargs.get("force"))
# The retry must drop ``prompt=login`` so Gitea reuses the
# browser session cookie established by the failed first call.
self.assertIs(kwargs.get("force_login_prompt"), False)
def test_does_not_retry_when_prompt_disabled(self) -> None:
prompt_mock = mock.Mock(return_value=True)
patches = self._common_patches(
run_login_side_effect=[self.scope_exc]
) + [
mock.patch.object(fa, "_can_prompt_for_revoke", return_value=False),
mock.patch.object(fa, "_prompt_revoke_and_wait", new=prompt_mock),
]
self._start(patches)
rc = fa._cmd_login([])
self.assertEqual(rc, 1)
self.assertEqual(fa.run_login.call_count, 1)
prompt_mock.assert_not_called()
def test_does_not_retry_when_user_aborts(self) -> None:
patches = self._common_patches(
run_login_side_effect=[self.scope_exc]
) + [
mock.patch.object(fa, "_can_prompt_for_revoke", return_value=True),
mock.patch.object(fa, "_prompt_revoke_and_wait", return_value=False),
]
self._start(patches)
rc = fa._cmd_login([])
self.assertEqual(rc, 1)
self.assertEqual(fa.run_login.call_count, 1)
def test_unrelated_autherror_propagates(self) -> None:
patches = self._common_patches(
run_login_side_effect=[fa.AuthError("unrelated")]
) + [
mock.patch.object(fa, "_can_prompt_for_revoke", return_value=True),
mock.patch.object(fa, "_prompt_revoke_and_wait", return_value=True),
]
self._start(patches)
with self.assertRaises(fa.AuthError):
fa._cmd_login([])
if __name__ == "__main__":
unittest.main()