"""Unit tests for scripts/forge_auth.py. Pure-logic tests (no network, no browser). The full PKCE flow is covered by tests/test_forge_auth_integration.sh, which stands up a local mock Gitea OIDC server and drives the CLI end-to-end. Run with: python3 -m unittest tests.test_forge_auth """ from __future__ import annotations import base64 import hashlib import io import json import os import sys import tempfile import time import unittest from pathlib import Path from unittest import mock from urllib.parse import urlparse HERE = Path(__file__).resolve().parent ROOT = HERE.parent sys.path.insert(0, str(ROOT / "scripts")) import forge_auth as fa # noqa: E402 # -------------------------------------------------------------------- # PKCE primitives # -------------------------------------------------------------------- class PkcePairTests(unittest.TestCase): def test_verifier_length_in_range(self) -> None: v, _ = fa.pkce_pair() # RFC 7636: 43 <= len(verifier) <= 128 self.assertGreaterEqual(len(v), 43) self.assertLessEqual(len(v), 128) def test_challenge_is_correct_s256_encoding(self) -> None: v, c = fa.pkce_pair() expected = ( base64.urlsafe_b64encode(hashlib.sha256(v.encode("ascii")).digest()) .rstrip(b"=") .decode("ascii") ) self.assertEqual(c, expected) def test_pairs_are_unique(self) -> None: pairs = {fa.pkce_pair() for _ in range(10)} self.assertEqual(len(pairs), 10) class StateSigningTests(unittest.TestCase): def setUp(self) -> None: self.key = b"\x01" * 32 def test_roundtrip(self) -> None: nonce = "abc.123" # dot in nonce must still round-trip (rpartition) signed = fa.sign_state(self.key, nonce) self.assertEqual(fa.verify_state(self.key, signed), nonce) def test_tampered_mac_rejected(self) -> None: signed = fa.sign_state(self.key, "n1") tampered = signed[:-1] + ("0" if signed[-1] != "0" else "1") with self.assertRaises(fa.AuthError): fa.verify_state(self.key, tampered) def test_wrong_key_rejected(self) -> None: signed = fa.sign_state(self.key, "n1") with self.assertRaises(fa.AuthError): fa.verify_state(b"\x02" * 32, signed) def test_missing_separator_raises(self) -> None: with self.assertRaises(fa.AuthError): fa.verify_state(self.key, "nosignaturehere") # -------------------------------------------------------------------- # ForgeAuthConfig.from_env # -------------------------------------------------------------------- class ConfigFromEnvTests(unittest.TestCase): def _env(self, **overrides: str) -> dict[str, str]: env = { "FORGE_GITEA_URL": "https://gitea.example.com", "FSDGG_CLI_CLIENT_ID": "my-client-id", "FSDGG_CLI_REDIRECT_URI": "http://127.0.0.1:38111/callback", } env.update(overrides) return env def test_happy_path(self) -> None: with mock.patch.dict(os.environ, self._env(), clear=True): cfg = fa.ForgeAuthConfig.from_env() self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com") self.assertEqual(cfg.client_id, "my-client-id") self.assertEqual(cfg.redirect_uri, "http://127.0.0.1:38111/callback") self.assertFalse(cfg.insecure_tls) def test_trailing_slash_stripped(self) -> None: with mock.patch.dict( os.environ, self._env(FORGE_GITEA_URL="https://gitea.example.com/"), clear=True, ): cfg = fa.ForgeAuthConfig.from_env() self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com") def test_localhost_redirect_allowed(self) -> None: with mock.patch.dict( os.environ, self._env(FSDGG_CLI_REDIRECT_URI="http://localhost:45000/cb"), clear=True, ): cfg = fa.ForgeAuthConfig.from_env() self.assertEqual(cfg.redirect_uri, "http://localhost:45000/cb") def test_ipv6_loopback_redirect_allowed(self) -> None: with mock.patch.dict( os.environ, self._env(FSDGG_CLI_REDIRECT_URI="http://[::1]:45000/cb"), clear=True, ): cfg = fa.ForgeAuthConfig.from_env() self.assertEqual(cfg.redirect_uri, "http://[::1]:45000/cb") def test_non_loopback_rejected(self) -> None: with mock.patch.dict( os.environ, self._env(FSDGG_CLI_REDIRECT_URI="https://example.com/cb"), clear=True, ): with self.assertRaises(fa.AuthError) as ctx: fa.ForgeAuthConfig.from_env() self.assertIn("RFC 8252", str(ctx.exception)) def test_https_loopback_rejected(self) -> None: with mock.patch.dict( os.environ, self._env(FSDGG_CLI_REDIRECT_URI="https://127.0.0.1:38111/cb"), clear=True, ): with self.assertRaises(fa.AuthError) as ctx: fa.ForgeAuthConfig.from_env() self.assertIn("RFC 8252", str(ctx.exception)) def test_missing_port_rejected(self) -> None: with mock.patch.dict( os.environ, self._env(FSDGG_CLI_REDIRECT_URI="http://127.0.0.1/cb"), clear=True, ): with self.assertRaises(fa.AuthError): fa.ForgeAuthConfig.from_env() def test_missing_required_env_var_lists_all_missing(self) -> None: env = self._env() env["FORGE_GITEA_URL"] = "" env["FSDGG_CLI_CLIENT_ID"] = "" with mock.patch.dict(os.environ, env, clear=True): with self.assertRaises(fa.AuthError) as ctx: fa.ForgeAuthConfig.from_env() msg = str(ctx.exception) self.assertIn("FORGE_GITEA_URL", msg) self.assertIn("FSDGG_CLI_CLIENT_ID", msg) def test_expected_username_read_from_env(self) -> None: with mock.patch.dict( os.environ, self._env(FORGE_GITEA_USERNAME="CVJMAllaire"), clear=True, ): cfg = fa.ForgeAuthConfig.from_env() self.assertEqual(cfg.expected_username, "CVJMAllaire") def test_expected_username_defaults_to_empty(self) -> None: with mock.patch.dict(os.environ, self._env(), clear=True): cfg = fa.ForgeAuthConfig.from_env() self.assertEqual(cfg.expected_username, "") def test_insecure_tls_flag(self) -> None: with mock.patch.dict( os.environ, self._env(FORGE_INSECURE_TLS="1"), clear=True ): cfg = fa.ForgeAuthConfig.from_env() self.assertTrue(cfg.insecure_tls) # -------------------------------------------------------------------- # build_authorize_url # -------------------------------------------------------------------- class BuildAuthorizeUrlTests(unittest.TestCase): def setUp(self) -> None: self.cfg = fa.ForgeAuthConfig( gitea_base_url="https://g.example", client_id="client-1", redirect_uri="http://127.0.0.1:38111/callback", ) self.endpoints = { "authorization_endpoint": "https://g.example/login/oauth/authorize", "token_endpoint": "https://g.example/login/oauth/access_token", "userinfo_endpoint": "https://g.example/login/oauth/userinfo", } def test_url_contains_all_required_parameters(self) -> None: url = fa.build_authorize_url( self.cfg, self.endpoints, challenge="c123", state="s123" ) self.assertTrue(url.startswith("https://g.example/login/oauth/authorize?")) # All required PKCE + OAuth2 params for token in ( "response_type=code", "client_id=client-1", "code_challenge=c123", "code_challenge_method=S256", "state=s123", "redirect_uri=http%3A%2F%2F127.0.0.1%3A38111%2Fcallback", ): self.assertIn(token, url) def test_url_always_includes_prompt_login(self) -> None: url = fa.build_authorize_url( self.cfg, self.endpoints, challenge="c", state="s" ) # prompt=login forces Gitea to display the login page even when a # session is already active: prevents silent auth under the # wrong identity. self.assertIn("prompt=login", url) def test_url_adds_login_hint_when_expected_username_set(self) -> None: cfg = fa.ForgeAuthConfig( gitea_base_url="https://g.example", client_id="client-1", redirect_uri="http://127.0.0.1:38111/callback", expected_username="CVJMAllaire", ) url = fa.build_authorize_url( cfg, self.endpoints, challenge="c", state="s" ) self.assertIn("login_hint=CVJMAllaire", url) def test_url_omits_login_hint_when_expected_username_unset(self) -> None: url = fa.build_authorize_url( self.cfg, self.endpoints, challenge="c", state="s" ) self.assertNotIn("login_hint=", url) class BuildGiteaLogoutUrlTests(unittest.TestCase): def test_composes_logout_url_with_redirect_to_login(self) -> None: url = fa.build_gitea_logout_url("https://gitea.example.com") self.assertEqual( url, "https://gitea.example.com/user/logout?redirect_to=%2Fuser%2Flogin", ) def test_trailing_slash_is_stripped(self) -> None: url = fa.build_gitea_logout_url("https://gitea.example.com/") self.assertTrue(url.startswith("https://gitea.example.com/user/logout")) self.assertNotIn("//user/logout", url) # -------------------------------------------------------------------- # AuthFile (read / merge / write / has_live_gitea_token) # -------------------------------------------------------------------- class AuthFileTests(unittest.TestCase): def test_read_missing_file_returns_empty(self) -> None: f = fa.AuthFile.read(Path("/nonexistent/path/client-auth.json")) self.assertEqual(f.raw, {}) self.assertFalse(f.has_live_gitea_token()) def test_read_malformed_json_raises(self) -> None: with tempfile.TemporaryDirectory() as d: p = Path(d) / "bad.json" p.write_text("{ not json", encoding="utf-8") with self.assertRaises(fa.AuthError): fa.AuthFile.read(p) def test_read_non_object_raises(self) -> None: with tempfile.TemporaryDirectory() as d: p = Path(d) / "arr.json" p.write_text("[1,2,3]", encoding="utf-8") with self.assertRaises(fa.AuthError): fa.AuthFile.read(p) def test_has_live_gitea_token_requires_token(self) -> None: f = fa.AuthFile(raw={"gitea_access_token": ""}) self.assertFalse(f.has_live_gitea_token()) def test_has_live_gitea_token_expired(self) -> None: f = fa.AuthFile(raw={ "gitea_access_token": "t", "gitea_token_expires_at": time.time() - 10, }) self.assertFalse(f.has_live_gitea_token()) def test_has_live_gitea_token_live(self) -> None: f = fa.AuthFile(raw={ "gitea_access_token": "t", "gitea_token_expires_at": time.time() + 3600, }) self.assertTrue(f.has_live_gitea_token()) def test_has_live_gitea_token_unknown_expiry_is_trusted(self) -> None: f = fa.AuthFile(raw={"gitea_access_token": "t"}) self.assertTrue(f.has_live_gitea_token()) def test_merge_login_fills_required_gateway_fields(self) -> None: f = fa.AuthFile() f.merge_login( username="alice", gitea_access_token="AAA", gitea_token_expires_at=time.time() + 3600, refresh_token="RRR", client_id="client-1", gitea_base_url="https://g.example", ) for required in ( "username", "access_token", "expires_in", "issued_at", "public_base_url", "index_name", ): self.assertIn(required, f.raw) self.assertEqual(f.raw["username"], "alice") self.assertEqual(f.raw["gitea_access_token"], "AAA") self.assertEqual(f.raw["_forge_refresh_token"], "RRR") self.assertEqual(f.raw["_forge_client_id"], "client-1") self.assertEqual(f.raw["_forge_gitea_base_url"], "https://g.example") def test_merge_login_preserves_gateway_bearer(self) -> None: # Simulates the case where the orchestrator already ran # `auth login` and populated the gateway bearer. The code must not # overwrite those fields. f = fa.AuthFile(raw={ "username": "old-alice", "access_token": "GATEWAY-BEARER", "expires_in": 7200, "issued_at": 1000000.0, "public_base_url": "https://gateway.example", "index_name": "forge", }) f.merge_login( username="alice", gitea_access_token="NEW-GITEA", gitea_token_expires_at=time.time() + 3600, refresh_token="NEW-RT", client_id="client-1", gitea_base_url="https://g.example", ) self.assertEqual(f.raw["access_token"], "GATEWAY-BEARER") self.assertEqual(f.raw["public_base_url"], "https://gateway.example") self.assertEqual(f.raw["index_name"], "forge") # And the Gitea fields got refreshed self.assertEqual(f.raw["gitea_access_token"], "NEW-GITEA") self.assertEqual(f.raw["username"], "alice") def test_merge_refresh_only_touches_gitea_fields(self) -> None: f = fa.AuthFile(raw={ "username": "alice", "access_token": "GATEWAY-BEARER", "public_base_url": "https://gateway.example", }) f.merge_refresh( gitea_access_token="FRESH", gitea_token_expires_at=time.time() + 3600, refresh_token="ROTATED", ) self.assertEqual(f.raw["access_token"], "GATEWAY-BEARER") self.assertEqual(f.raw["gitea_access_token"], "FRESH") self.assertEqual(f.raw["_forge_refresh_token"], "ROTATED") def test_merge_refresh_empty_refresh_token_keeps_existing(self) -> None: f = fa.AuthFile(raw={"_forge_refresh_token": "KEEP"}) f.merge_refresh( gitea_access_token="NEW", gitea_token_expires_at=None, refresh_token="" ) self.assertEqual(f.raw["_forge_refresh_token"], "KEEP") def test_write_is_atomic_and_0600(self) -> None: with tempfile.TemporaryDirectory() as d: p = Path(d) / "sub" / "client-auth.json" f = fa.AuthFile(raw={"username": "u", "gitea_access_token": "T"}) f.write(p) self.assertTrue(p.is_file()) mode = p.stat().st_mode & 0o777 self.assertEqual(mode, 0o600) # The tmp file must not still exist self.assertFalse((p.parent / "client-auth.json.tmp").exists()) roundtrip = json.loads(p.read_text(encoding="utf-8")) self.assertEqual(roundtrip["username"], "u") def test_write_preserves_unknown_keys(self) -> None: # Forward-compat: the gateway might add new fields unknown to the # current schema. Writing must preserve them verbatim. raw = {"username": "u", "future_field": {"x": 1}, "access_token": "A"} with tempfile.TemporaryDirectory() as d: p = Path(d) / "a.json" fa.AuthFile(raw=dict(raw)).write(p) roundtrip = json.loads(p.read_text(encoding="utf-8")) self.assertEqual(roundtrip, raw) # -------------------------------------------------------------------- # auth_store_path # -------------------------------------------------------------------- class AuthStorePathTests(unittest.TestCase): def test_explicit_env_var_wins(self) -> None: with mock.patch.dict( os.environ, {"FSDGG_AUTH_STORE_PATH": "/tmp/somewhere/a.json", "FSDGG_RUNTIME_DIR": "/other"}, clear=True, ): self.assertEqual(fa.auth_store_path(), Path("/tmp/somewhere/a.json")) def test_runtime_dir_fallback(self) -> None: with mock.patch.dict( os.environ, {"FSDGG_RUNTIME_DIR": "/tmp/rt"}, clear=True ): self.assertEqual(fa.auth_store_path(), Path("/tmp/rt/client-auth.json")) def test_default_path(self) -> None: with mock.patch.dict(os.environ, {}, clear=True): self.assertEqual(fa.auth_store_path(), fa.DEFAULT_AUTH_FILE) # -------------------------------------------------------------------- # run_logout # -------------------------------------------------------------------- class RunLogoutTests(unittest.TestCase): """Note: all assertions happen *inside* the TemporaryDirectory context. Earlier iterations leaked the tempdir past the assertions, producing false positives on ``assertFalse(is_file)`` because cleanup had already deleted the file. """ def test_logout_no_file(self) -> None: with tempfile.TemporaryDirectory() as d: p = Path(d) / "client-auth.json" with mock.patch.dict( os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True ): result = fa.run_logout() self.assertIsNone(result) self.assertFalse(p.is_file()) def test_logout_only_welcome_fields_removes_file(self) -> None: payload = { "username": "u", "access_token": "", "expires_in": 0, "issued_at": 0.0, "public_base_url": "", "index_name": "", "gitea_access_token": "A", "_forge_refresh_token": "R", } with tempfile.TemporaryDirectory() as d: p = Path(d) / "client-auth.json" p.write_text(json.dumps(payload), encoding="utf-8") with mock.patch.dict( os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True ): result = fa.run_logout() self.assertEqual(result, p) self.assertFalse(p.is_file()) def test_logout_preserves_gateway_bearer(self) -> None: payload = { "username": "u", "access_token": "GATEWAY-BEARER", "expires_in": 3600, "issued_at": 1000000.0, "public_base_url": "https://gateway.example", "index_name": "forge", "gitea_access_token": "GITEA-A", "gitea_token_expires_at": 2000000.0, "_forge_refresh_token": "R", "_forge_client_id": "c", } with tempfile.TemporaryDirectory() as d: p = Path(d) / "client-auth.json" p.write_text(json.dumps(payload), encoding="utf-8") with mock.patch.dict( os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True ): fa.run_logout() self.assertTrue(p.is_file()) remaining = json.loads(p.read_text(encoding="utf-8")) self.assertEqual(remaining["access_token"], "GATEWAY-BEARER") self.assertEqual(remaining["public_base_url"], "https://gateway.example") self.assertNotIn("gitea_access_token", remaining) self.assertNotIn("_forge_refresh_token", remaining) # -------------------------------------------------------------------- # main() dispatcher # -------------------------------------------------------------------- class MainDispatcherTests(unittest.TestCase): def test_unknown_command_rc_2(self) -> None: self.assertEqual(fa.main(["forge_auth.py", "nope"]), 2) def test_no_args_rc_2(self) -> None: self.assertEqual(fa.main(["forge_auth.py"]), 2) def test_status_on_empty_store_rc_1(self) -> None: with tempfile.TemporaryDirectory() as d: with mock.patch.dict( os.environ, {"FSDGG_AUTH_STORE_PATH": str(Path(d) / "a.json")}, clear=True, ): rc = fa.main(["forge_auth.py", "status"]) self.assertEqual(rc, 1) def test_status_on_live_token_rc_0(self) -> None: with tempfile.TemporaryDirectory() as d: p = Path(d) / "a.json" p.write_text(json.dumps({ "username": "u", "gitea_access_token": "LIVE", "gitea_token_expires_at": time.time() + 3600, }), encoding="utf-8") with mock.patch.dict( os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True ): rc = fa.main(["forge_auth.py", "status"]) self.assertEqual(rc, 0) class HeadlessGuidanceTests(unittest.TestCase): AUTH_URL = "https://gitea.example/login/oauth/authorize?x=1" REDIRECT = urlparse("http://127.0.0.1:54321/callback") def _capture(self, env_extra: dict[str, str] | None = None) -> str: env = {"USER": "alice"} if env_extra is not None: for k in ("SSH_CONNECTION", "SSH_TTY"): env.pop(k, None) env.update(env_extra) buf = io.StringIO() with mock.patch.dict(os.environ, env, clear=True), \ mock.patch("forge_auth.sys.stderr", buf), \ mock.patch("forge_auth.socket.getfqdn", return_value="box.example"), \ mock.patch("forge_auth.socket.gethostname", return_value="box.example"): fa._print_headless_guidance(self.AUTH_URL, self.REDIRECT) return buf.getvalue() def test_prints_authorize_url(self) -> None: out = self._capture() self.assertIn(self.AUTH_URL, out) def test_prints_rfc_8252_reference(self) -> None: self.assertIn("RFC 8252", self._capture()) def test_uses_configured_redirect_port(self) -> None: out = self._capture() self.assertIn("127.0.0.1:54321", out) self.assertNotIn("38111", out) def test_ssh_forward_template_uses_configured_port(self) -> None: out = self._capture() self.assertIn("ssh -L 54321:127.0.0.1:54321 alice@box.example", out) def test_reachability_probe_included(self) -> None: out = self._capture() self.assertIn("curl -sS -m 2 http://127.0.0.1:54321/", out) self.assertIn("Connection refused", out) def test_ssh_branch_when_SSH_CONNECTION_set(self) -> None: out = self._capture({"SSH_CONNECTION": "1.2.3.4 22 5.6.7.8 22", "USER": "alice"}) self.assertIn("SSH session detected", out) def test_non_ssh_branch_when_no_ssh_env(self) -> None: out = self._capture() self.assertIn("Remote-host case", out) self.assertNotIn("SSH session detected", out) class BuildAuthorizeErrorTests(unittest.TestCase): """Contract tests for ``_build_authorize_error``.""" BASE = "https://gitea.example.com" CID = "ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5" SCOPES = "openid profile email read:user read:organization read:repository write:repository" def _exc_different_scope(self, **overrides: str) -> fa.AuthError: kw: dict[str, object] = { "error": "server_error", "error_description": "a grant exists with different scope", "gitea_base_url": self.BASE, "client_id": self.CID, "scopes": self.SCOPES, } kw.update(overrides) return fa._build_authorize_error( str(kw["error"]), str(kw["error_description"]) if kw["error_description"] else None, str(kw["gitea_base_url"]), client_id=str(kw["client_id"]), scopes=str(kw["scopes"]), ) def test_different_scope_returns_autherror(self) -> None: self.assertIsInstance(self._exc_different_scope(), fa.AuthError) def test_different_scope_message_is_five_lines_max(self) -> None: # RULE #2 §D.3: user-facing error bodies cap at 5 lines. s = str(self._exc_different_scope()) self.assertLessEqual(len(s.splitlines()), 5) def test_different_scope_surfaces_gitea_phrasing(self) -> None: self.assertIn("different scope", str(self._exc_different_scope()).lower()) def test_different_scope_cites_settings_url(self) -> None: self.assertIn( f"{self.BASE}/user/settings/applications", str(self._exc_different_scope()), ) def test_different_scope_disambiguates_authorized_section(self) -> None: # Gitea's settings page has both "Authorized OAuth2 Applications" # (user grants) and "Manage OAuth2 Applications" (app registrations); # the message must point at the first. self.assertIn( '"Authorized OAuth2 Applications"', str(self._exc_different_scope()), ) def test_different_scope_includes_full_client_id(self) -> None: self.assertIn(self.CID, str(self._exc_different_scope())) def test_different_scope_includes_requested_scopes(self) -> None: self.assertIn(self.SCOPES, str(self._exc_different_scope())) def test_different_scope_links_to_remediation_doc(self) -> None: # External rationale lives in the doc per RULE #2 §D/E. self.assertIn( "docs/oauth-grant-scope-mismatch.md", str(self._exc_different_scope()), ) def test_different_scope_without_base_url_uses_placeholder(self) -> None: self.assertIn( "/user/settings/applications", str(self._exc_different_scope(gitea_base_url="")), ) def test_different_scope_without_client_id_uses_placeholder(self) -> None: self.assertIn( "", str(self._exc_different_scope(client_id="")), ) def test_different_scope_without_scopes_uses_placeholder(self) -> None: self.assertIn( "", str(self._exc_different_scope(scopes="")), ) def test_access_denied_branch(self) -> None: exc = fa._build_authorize_error( "access_denied", "The resource owner or authorization server denied the request.", self.BASE, ) self.assertIn("access_denied", str(exc)) self.assertIn("just login", str(exc)) def test_fallback_preserves_raw_error_and_description(self) -> None: exc = fa._build_authorize_error( "invalid_request", "redirect_uri mismatch", self.BASE, ) s = str(exc) self.assertIn("invalid_request", s) self.assertIn("redirect_uri mismatch", s) def test_fallback_with_no_description_only_includes_error_code(self) -> None: exc = fa._build_authorize_error("temporarily_unavailable", None, self.BASE) self.assertIn("temporarily_unavailable", str(exc)) class AuthorizeUrlIsHeadlessInvariantTests(unittest.TestCase): """Pin the invariant: authorize URL is independent of ``--no-browser``.""" def setUp(self) -> None: self.cfg = fa.ForgeAuthConfig( gitea_base_url="https://gitea.example.com", client_id="ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5", redirect_uri="http://127.0.0.1:38111/callback", expected_username="alice", ) self.endpoints = { "authorization_endpoint": "https://gitea.example.com/login/oauth/authorize", "token_endpoint": "https://gitea.example.com/login/oauth/access_token", "userinfo_endpoint": "https://gitea.example.com/login/oauth/userinfo", } def test_build_authorize_url_signature_has_no_headless_parameter(self) -> None: import inspect sig = inspect.signature(fa.build_authorize_url) self.assertNotIn("open_browser", sig.parameters) self.assertNotIn("headless", sig.parameters) self.assertNotIn("no_browser", sig.parameters) def test_url_is_deterministic_for_fixed_challenge_and_state(self) -> None: url_a = fa.build_authorize_url( self.cfg, self.endpoints, challenge="ch_1", state="st_1" ) url_b = fa.build_authorize_url( self.cfg, self.endpoints, challenge="ch_1", state="st_1" ) self.assertEqual(url_a, url_b) def test_scope_parameter_matches_config_scopes_exactly(self) -> None: from urllib.parse import parse_qs, urlparse url = fa.build_authorize_url( self.cfg, self.endpoints, challenge="c", state="s" ) params = parse_qs(urlparse(url).query) self.assertEqual(params["scope"], [self.cfg.scopes]) def test_default_url_carries_prompt_login(self) -> None: from urllib.parse import parse_qs, urlparse url = fa.build_authorize_url( self.cfg, self.endpoints, challenge="c", state="s" ) params = parse_qs(urlparse(url).query) self.assertEqual(params["prompt"], ["login"]) def test_force_login_prompt_false_drops_prompt_param(self) -> None: from urllib.parse import parse_qs, urlparse url = fa.build_authorize_url( self.cfg, self.endpoints, challenge="c", state="s", force_login_prompt=False, ) params = parse_qs(urlparse(url).query) self.assertNotIn("prompt", params) # login_hint must still ride along; the retry path keeps it so # Gitea can pre-fill the username field if a fresh login screen # ever does appear (e.g., expired session cookie). self.assertEqual(params["login_hint"], [self.cfg.expected_username]) class ScopeMismatchAuthErrorTests(unittest.TestCase): """Contract tests for the ``ScopeMismatchAuthError`` subclass. The "different scope" branch of ``_build_authorize_error`` must return a ``ScopeMismatchAuthError`` so callers can drive a revoke-and-retry recovery flow instead of swallowing the error. The class is also a subclass of ``AuthError`` so existing ``except AuthError`` handlers (e.g., the credential helper) keep working unchanged. """ BASE = "https://gitea.example.com" CID = "ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5" SCOPES = "openid profile email read:user read:organization read:repository write:repository" def _build(self, **overrides): kw = { "error": "server_error", "error_description": "a grant exists with different scope", "gitea_base_url": self.BASE, "client_id": self.CID, "scopes": self.SCOPES, } kw.update(overrides) return fa._build_authorize_error( str(kw["error"]), str(kw["error_description"]) if kw["error_description"] else None, str(kw["gitea_base_url"]), client_id=str(kw["client_id"]), scopes=str(kw["scopes"]), ) def test_different_scope_returns_subclass(self) -> None: self.assertIsInstance(self._build(), fa.ScopeMismatchAuthError) def test_subclass_inherits_from_autherror(self) -> None: self.assertTrue(issubclass(fa.ScopeMismatchAuthError, fa.AuthError)) def test_attributes_carry_diagnostic_fields(self) -> None: exc = self._build() self.assertEqual(exc.gitea_base_url, self.BASE) self.assertEqual(exc.client_id, self.CID) self.assertEqual(exc.scopes, self.SCOPES) def test_revoke_url_with_base(self) -> None: exc = self._build() self.assertEqual( exc.revoke_url, f"{self.BASE}/user/settings/applications", ) def test_revoke_url_strips_trailing_slash(self) -> None: exc = self._build(gitea_base_url=self.BASE + "/") self.assertEqual( exc.revoke_url, f"{self.BASE}/user/settings/applications", ) def test_revoke_url_without_base_uses_placeholder(self) -> None: exc = self._build(gitea_base_url="") self.assertEqual( exc.revoke_url, "/user/settings/applications", ) def test_access_denied_branch_is_plain_autherror(self) -> None: # Negative case: only the scope-conflict branch upgrades to # the subclass; access_denied stays a generic AuthError. exc = fa._build_authorize_error( "access_denied", "denied by user", self.BASE, ) self.assertNotIsInstance(exc, fa.ScopeMismatchAuthError) self.assertIsInstance(exc, fa.AuthError) class CanPromptForRevokeTests(unittest.TestCase): """``_can_prompt_for_revoke`` gates the interactive retry path. Returns False whenever the contributor cannot reasonably be asked to press Enter: explicit opt-out via ``FORGE_AUTO_REVOKE``, non-interactive mode via ``FORGE_SETUP_YES``, or non-TTY stdio. """ def setUp(self) -> None: self._env_keys = ("FORGE_AUTO_REVOKE", "FORGE_SETUP_YES") self._env_backup = {k: os.environ.get(k) for k in self._env_keys} for k in self._env_keys: os.environ.pop(k, None) def tearDown(self) -> None: for k, v in self._env_backup.items(): if v is None: os.environ.pop(k, None) else: os.environ[k] = v def _run(self, *, stderr_tty: bool = True, stdin_tty: bool = True) -> bool: with mock.patch.object(sys.stderr, "isatty", lambda: stderr_tty), \ mock.patch.object(sys.stdin, "isatty", lambda: stdin_tty): return fa._can_prompt_for_revoke() def test_default_with_both_ttys_returns_true(self) -> None: self.assertTrue(self._run()) def test_force_auto_revoke_off_disables(self) -> None: for v in ("0", "no", "false", "FALSE", "No"): with self.subTest(value=v): os.environ["FORGE_AUTO_REVOKE"] = v self.assertFalse(self._run()) os.environ.pop("FORGE_AUTO_REVOKE", None) def test_setup_yes_disables(self) -> None: os.environ["FORGE_SETUP_YES"] = "1" self.assertFalse(self._run()) def test_no_stderr_tty_disables(self) -> None: self.assertFalse(self._run(stderr_tty=False)) def test_no_stdin_tty_disables(self) -> None: self.assertFalse(self._run(stdin_tty=False)) class CmdLoginRetryOnScopeMismatchTests(unittest.TestCase): """``_cmd_login`` auto-retries once after ``ScopeMismatchAuthError`` when the prompt path is enabled; otherwise exits 1 immediately. The retry must call ``run_login`` with ``force=True`` and ``force_login_prompt=False`` so the cached live-token short-circuit cannot mask a stale grant and Gitea can reuse the existing browser session cookie (only consent screen on retry). """ def setUp(self) -> None: # Capture stderr to keep cli_err/cli_ok/cli_info from polluting # the test runner output for the entire class. stderr_patch = mock.patch.object(sys, "stderr", new_callable=io.StringIO) stderr_patch.start() self.addCleanup(stderr_patch.stop) self.fake_cfg = fa.ForgeAuthConfig( gitea_base_url="https://gitea.example.com", client_id="ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5", redirect_uri="http://127.0.0.1:38111/callback", ) self.fake_state = mock.Mock(username="alice") self.scope_exc = fa.ScopeMismatchAuthError( "boom", gitea_base_url=self.fake_cfg.gitea_base_url, client_id=self.fake_cfg.client_id, scopes=self.fake_cfg.scopes, ) def _common_patches(self, *, run_login_side_effect): return [ mock.patch.object( fa.ForgeAuthConfig, "from_env", return_value=self.fake_cfg ), mock.patch.object( fa, "run_login", side_effect=run_login_side_effect ), mock.patch.object( fa, "auth_store_path", return_value=Path("/tmp/dummy.json") ), ] def _start(self, patches): for p in patches: p.start() self.addCleanup(lambda: [p.stop() for p in patches]) def test_retries_when_prompt_allowed(self) -> None: patches = self._common_patches( run_login_side_effect=[self.scope_exc, self.fake_state] ) + [ mock.patch.object(fa, "_can_prompt_for_revoke", return_value=True), mock.patch.object(fa, "_prompt_revoke_and_wait", return_value=True), ] self._start(patches) rc = fa._cmd_login([]) self.assertEqual(rc, 0) self.assertEqual(fa.run_login.call_count, 2) _, kwargs = fa.run_login.call_args_list[1] self.assertTrue(kwargs.get("force")) # The retry must drop ``prompt=login`` so Gitea reuses the # browser session cookie established by the failed first call. self.assertIs(kwargs.get("force_login_prompt"), False) def test_does_not_retry_when_prompt_disabled(self) -> None: prompt_mock = mock.Mock(return_value=True) patches = self._common_patches( run_login_side_effect=[self.scope_exc] ) + [ mock.patch.object(fa, "_can_prompt_for_revoke", return_value=False), mock.patch.object(fa, "_prompt_revoke_and_wait", new=prompt_mock), ] self._start(patches) rc = fa._cmd_login([]) self.assertEqual(rc, 1) self.assertEqual(fa.run_login.call_count, 1) prompt_mock.assert_not_called() def test_does_not_retry_when_user_aborts(self) -> None: patches = self._common_patches( run_login_side_effect=[self.scope_exc] ) + [ mock.patch.object(fa, "_can_prompt_for_revoke", return_value=True), mock.patch.object(fa, "_prompt_revoke_and_wait", return_value=False), ] self._start(patches) rc = fa._cmd_login([]) self.assertEqual(rc, 1) self.assertEqual(fa.run_login.call_count, 1) def test_unrelated_autherror_propagates(self) -> None: patches = self._common_patches( run_login_side_effect=[fa.AuthError("unrelated")] ) + [ mock.patch.object(fa, "_can_prompt_for_revoke", return_value=True), mock.patch.object(fa, "_prompt_revoke_and_wait", return_value=True), ] self._start(patches) with self.assertRaises(fa.AuthError): fa._cmd_login([]) if __name__ == "__main__": unittest.main()