730 lines
28 KiB
Python
Executable File
730 lines
28 KiB
Python
Executable File
"""Unit tests for scripts/forge_auth.py.

Pure-logic tests (no network, no browser). The full PKCE flow is
covered by tests/test_forge_auth_integration.sh, which stands up a
local mock Gitea OIDC server and drives the CLI end-to-end.

Run with: python3 -m unittest tests.test_forge_auth
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
from urllib.parse import urlparse
|
|
|
|
# ``forge_auth`` lives in the ``scripts`` directory next to this file's parent
# directory, not in an installed package, so that directory is put at the
# front of ``sys.path`` before the module is imported below.
HERE = Path(__file__).resolve().parent
ROOT = HERE.parent
sys.path.insert(0, str(ROOT / "scripts"))
|
|
|
|
import forge_auth as fa # noqa: E402
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# PKCE primitives
|
|
# --------------------------------------------------------------------
|
|
class PkcePairTests(unittest.TestCase):
    """Checks on the ``(verifier, challenge)`` pair returned by ``fa.pkce_pair``."""

    def test_verifier_length_in_range(self) -> None:
        """The verifier length stays within the bounds RFC 7636 allows."""
        verifier, _ = fa.pkce_pair()
        # RFC 7636: 43 <= len(verifier) <= 128
        self.assertGreaterEqual(len(verifier), 43)
        self.assertLessEqual(len(verifier), 128)

    def test_challenge_is_correct_s256_encoding(self) -> None:
        """The challenge equals unpadded base64url(SHA-256(verifier))."""
        verifier, challenge = fa.pkce_pair()
        digest = hashlib.sha256(verifier.encode("ascii")).digest()
        expected = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
        self.assertEqual(challenge, expected)

    def test_pairs_are_unique(self) -> None:
        """Ten consecutive calls yield ten distinct pairs."""
        pairs = {fa.pkce_pair() for _ in range(10)}
        self.assertEqual(len(pairs), 10)
|
|
|
|
|
|
class StateSigningTests(unittest.TestCase):
    """Round-trip and rejection tests for ``fa.sign_state`` / ``fa.verify_state``."""

    def setUp(self) -> None:
        # Fixed 32-byte key so every test signs with the same secret.
        self.key = b"\x01" * 32

    def test_roundtrip(self) -> None:
        """A signed nonce verifies back to the original nonce."""
        nonce = "abc.123"  # dot in nonce must still round-trip (rpartition)
        signed = fa.sign_state(self.key, nonce)
        self.assertEqual(fa.verify_state(self.key, signed), nonce)

    def test_tampered_mac_rejected(self) -> None:
        """Changing the final character of the signed state is rejected."""
        signed = fa.sign_state(self.key, "n1")
        # Swap the last character for a different one ("0", or "1" if it
        # already was "0") so the value is guaranteed to change.
        replacement = "0" if signed[-1] != "0" else "1"
        tampered = signed[:-1] + replacement
        with self.assertRaises(fa.AuthError):
            fa.verify_state(self.key, tampered)

    def test_wrong_key_rejected(self) -> None:
        """A state signed with one key does not verify under another."""
        signed = fa.sign_state(self.key, "n1")
        with self.assertRaises(fa.AuthError):
            fa.verify_state(b"\x02" * 32, signed)

    def test_missing_separator_raises(self) -> None:
        """A value with no signature separator is rejected."""
        with self.assertRaises(fa.AuthError):
            fa.verify_state(self.key, "nosignaturehere")
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# ForgeAuthConfig.from_env
|
|
# --------------------------------------------------------------------
|
|
class ConfigFromEnvTests(unittest.TestCase):
    """Tests for ``fa.ForgeAuthConfig.from_env`` under controlled environments.

    Every test replaces ``os.environ`` wholesale (``clear=True``) so the
    result never depends on variables set in the developer's shell.
    """

    def _env(self, **overrides: str) -> dict[str, str]:
        """Return a baseline valid environment with ``overrides`` applied."""
        env = {
            "FORGE_GITEA_URL": "https://gitea.example.com",
            "FSDGG_CLI_CLIENT_ID": "my-client-id",
            "FSDGG_CLI_REDIRECT_URI": "http://127.0.0.1:38111/callback",
        }
        env.update(overrides)
        return env

    def _load(self, **overrides: str) -> fa.ForgeAuthConfig:
        """Run ``from_env`` with only the baseline environment plus ``overrides``."""
        with mock.patch.dict(os.environ, self._env(**overrides), clear=True):
            return fa.ForgeAuthConfig.from_env()

    def test_happy_path(self) -> None:
        cfg = self._load()
        self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com")
        self.assertEqual(cfg.client_id, "my-client-id")
        self.assertEqual(cfg.redirect_uri, "http://127.0.0.1:38111/callback")
        self.assertFalse(cfg.insecure_tls)

    def test_trailing_slash_stripped(self) -> None:
        cfg = self._load(FORGE_GITEA_URL="https://gitea.example.com/")
        self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com")

    def test_localhost_redirect_allowed(self) -> None:
        cfg = self._load(FSDGG_CLI_REDIRECT_URI="http://localhost:45000/cb")
        self.assertEqual(cfg.redirect_uri, "http://localhost:45000/cb")

    def test_ipv6_loopback_redirect_allowed(self) -> None:
        cfg = self._load(FSDGG_CLI_REDIRECT_URI="http://[::1]:45000/cb")
        self.assertEqual(cfg.redirect_uri, "http://[::1]:45000/cb")

    def test_non_loopback_rejected(self) -> None:
        """A non-loopback redirect host fails with an RFC 8252 message."""
        with self.assertRaises(fa.AuthError) as ctx:
            self._load(FSDGG_CLI_REDIRECT_URI="https://example.com/cb")
        self.assertIn("RFC 8252", str(ctx.exception))

    def test_https_loopback_rejected(self) -> None:
        """An ``https`` loopback redirect fails with an RFC 8252 message."""
        with self.assertRaises(fa.AuthError) as ctx:
            self._load(FSDGG_CLI_REDIRECT_URI="https://127.0.0.1:38111/cb")
        self.assertIn("RFC 8252", str(ctx.exception))

    def test_missing_port_rejected(self) -> None:
        with self.assertRaises(fa.AuthError):
            self._load(FSDGG_CLI_REDIRECT_URI="http://127.0.0.1/cb")

    def test_missing_required_env_var_lists_all_missing(self) -> None:
        """Empty required variables are all named in a single error message."""
        with self.assertRaises(fa.AuthError) as ctx:
            self._load(FORGE_GITEA_URL="", FSDGG_CLI_CLIENT_ID="")
        msg = str(ctx.exception)
        self.assertIn("FORGE_GITEA_URL", msg)
        self.assertIn("FSDGG_CLI_CLIENT_ID", msg)

    def test_expected_username_read_from_env(self) -> None:
        cfg = self._load(FORGE_GITEA_USERNAME="CVJMAllaire")
        self.assertEqual(cfg.expected_username, "CVJMAllaire")

    def test_expected_username_defaults_to_empty(self) -> None:
        cfg = self._load()
        self.assertEqual(cfg.expected_username, "")

    def test_insecure_tls_flag(self) -> None:
        cfg = self._load(FORGE_INSECURE_TLS="1")
        self.assertTrue(cfg.insecure_tls)
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# build_authorize_url
|
|
# --------------------------------------------------------------------
|
|
class BuildAuthorizeUrlTests(unittest.TestCase):
    """Tests for the query string produced by ``fa.build_authorize_url``."""

    def setUp(self) -> None:
        # Config without ``expected_username``; tests needing one build their own.
        self.cfg = fa.ForgeAuthConfig(
            gitea_base_url="https://g.example",
            client_id="client-1",
            redirect_uri="http://127.0.0.1:38111/callback",
        )
        self.endpoints = {
            "authorization_endpoint": "https://g.example/login/oauth/authorize",
            "token_endpoint": "https://g.example/login/oauth/access_token",
            "userinfo_endpoint": "https://g.example/login/oauth/userinfo",
        }

    def test_url_contains_all_required_parameters(self) -> None:
        url = fa.build_authorize_url(
            self.cfg, self.endpoints, challenge="c123", state="s123"
        )
        self.assertTrue(url.startswith("https://g.example/login/oauth/authorize?"))
        # All required PKCE + OAuth2 params
        required_tokens = (
            "response_type=code",
            "client_id=client-1",
            "code_challenge=c123",
            "code_challenge_method=S256",
            "state=s123",
            "redirect_uri=http%3A%2F%2F127.0.0.1%3A38111%2Fcallback",
        )
        for token in required_tokens:
            # subTest reports every missing parameter, not just the first.
            with self.subTest(token=token):
                self.assertIn(token, url)

    def test_url_always_includes_prompt_login(self) -> None:
        url = fa.build_authorize_url(
            self.cfg, self.endpoints, challenge="c", state="s"
        )
        # prompt=login forces Gitea to display the login page even when a
        # session is already active: prevents silent auth under the
        # wrong identity.
        self.assertIn("prompt=login", url)

    def test_url_adds_login_hint_when_expected_username_set(self) -> None:
        cfg = fa.ForgeAuthConfig(
            gitea_base_url="https://g.example",
            client_id="client-1",
            redirect_uri="http://127.0.0.1:38111/callback",
            expected_username="CVJMAllaire",
        )
        url = fa.build_authorize_url(
            cfg, self.endpoints, challenge="c", state="s"
        )
        self.assertIn("login_hint=CVJMAllaire", url)

    def test_url_omits_login_hint_when_expected_username_unset(self) -> None:
        url = fa.build_authorize_url(
            self.cfg, self.endpoints, challenge="c", state="s"
        )
        self.assertNotIn("login_hint=", url)
|
|
|
|
|
|
class BuildGiteaLogoutUrlTests(unittest.TestCase):
    """Tests for ``fa.build_gitea_logout_url``."""

    def test_composes_logout_url_with_redirect_to_login(self) -> None:
        """The logout URL carries a URL-encoded ``redirect_to=/user/login``."""
        url = fa.build_gitea_logout_url("https://gitea.example.com")
        self.assertEqual(
            url,
            "https://gitea.example.com/user/logout?redirect_to=%2Fuser%2Flogin",
        )

    def test_trailing_slash_is_stripped(self) -> None:
        """A trailing slash on the base URL does not produce ``//user/logout``."""
        url = fa.build_gitea_logout_url("https://gitea.example.com/")
        self.assertTrue(url.startswith("https://gitea.example.com/user/logout"))
        self.assertNotIn("//user/logout", url)
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# AuthFile (read / merge / write / has_live_gitea_token)
|
|
# --------------------------------------------------------------------
|
|
class AuthFileTests(unittest.TestCase):
    """Tests for ``fa.AuthFile``: read, liveness check, merge and write."""

    # -- read ----------------------------------------------------------

    def test_read_missing_file_returns_empty(self) -> None:
        auth = fa.AuthFile.read(Path("/nonexistent/path/client-auth.json"))
        self.assertEqual(auth.raw, {})
        self.assertFalse(auth.has_live_gitea_token())

    def test_read_malformed_json_raises(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "bad.json"
            path.write_text("{ not json", encoding="utf-8")
            with self.assertRaises(fa.AuthError):
                fa.AuthFile.read(path)

    def test_read_non_object_raises(self) -> None:
        """Valid JSON whose top level is not an object is rejected."""
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "arr.json"
            path.write_text("[1,2,3]", encoding="utf-8")
            with self.assertRaises(fa.AuthError):
                fa.AuthFile.read(path)

    # -- has_live_gitea_token -------------------------------------------

    def test_has_live_gitea_token_requires_token(self) -> None:
        auth = fa.AuthFile(raw={"gitea_access_token": ""})
        self.assertFalse(auth.has_live_gitea_token())

    def test_has_live_gitea_token_expired(self) -> None:
        auth = fa.AuthFile(raw={
            "gitea_access_token": "t",
            "gitea_token_expires_at": time.time() - 10,
        })
        self.assertFalse(auth.has_live_gitea_token())

    def test_has_live_gitea_token_live(self) -> None:
        auth = fa.AuthFile(raw={
            "gitea_access_token": "t",
            "gitea_token_expires_at": time.time() + 3600,
        })
        self.assertTrue(auth.has_live_gitea_token())

    def test_has_live_gitea_token_unknown_expiry_is_trusted(self) -> None:
        """A token with no recorded expiry counts as live."""
        auth = fa.AuthFile(raw={"gitea_access_token": "t"})
        self.assertTrue(auth.has_live_gitea_token())

    # -- merge_login / merge_refresh ------------------------------------

    def test_merge_login_fills_required_gateway_fields(self) -> None:
        auth = fa.AuthFile()
        auth.merge_login(
            username="alice",
            gitea_access_token="AAA",
            gitea_token_expires_at=time.time() + 3600,
            refresh_token="RRR",
            client_id="client-1",
            gitea_base_url="https://g.example",
        )
        required_fields = (
            "username", "access_token", "expires_in",
            "issued_at", "public_base_url", "index_name",
        )
        for required in required_fields:
            # subTest reports every missing field, not just the first.
            with self.subTest(field=required):
                self.assertIn(required, auth.raw)
        self.assertEqual(auth.raw["username"], "alice")
        self.assertEqual(auth.raw["gitea_access_token"], "AAA")
        self.assertEqual(auth.raw["_forge_refresh_token"], "RRR")
        self.assertEqual(auth.raw["_forge_client_id"], "client-1")
        self.assertEqual(auth.raw["_forge_gitea_base_url"], "https://g.example")

    def test_merge_login_preserves_gateway_bearer(self) -> None:
        # Simulates the case where the orchestrator already ran
        # `auth login` and populated the gateway bearer. The code must not
        # overwrite those fields.
        auth = fa.AuthFile(raw={
            "username": "old-alice",
            "access_token": "GATEWAY-BEARER",
            "expires_in": 7200,
            "issued_at": 1000000.0,
            "public_base_url": "https://gateway.example",
            "index_name": "forge",
        })
        auth.merge_login(
            username="alice",
            gitea_access_token="NEW-GITEA",
            gitea_token_expires_at=time.time() + 3600,
            refresh_token="NEW-RT",
            client_id="client-1",
            gitea_base_url="https://g.example",
        )
        self.assertEqual(auth.raw["access_token"], "GATEWAY-BEARER")
        self.assertEqual(auth.raw["public_base_url"], "https://gateway.example")
        self.assertEqual(auth.raw["index_name"], "forge")
        # And the Gitea fields got refreshed
        self.assertEqual(auth.raw["gitea_access_token"], "NEW-GITEA")
        self.assertEqual(auth.raw["username"], "alice")

    def test_merge_refresh_only_touches_gitea_fields(self) -> None:
        auth = fa.AuthFile(raw={
            "username": "alice",
            "access_token": "GATEWAY-BEARER",
            "public_base_url": "https://gateway.example",
        })
        auth.merge_refresh(
            gitea_access_token="FRESH",
            gitea_token_expires_at=time.time() + 3600,
            refresh_token="ROTATED",
        )
        self.assertEqual(auth.raw["access_token"], "GATEWAY-BEARER")
        self.assertEqual(auth.raw["gitea_access_token"], "FRESH")
        self.assertEqual(auth.raw["_forge_refresh_token"], "ROTATED")

    def test_merge_refresh_empty_refresh_token_keeps_existing(self) -> None:
        """An empty ``refresh_token`` must not erase the stored one."""
        auth = fa.AuthFile(raw={"_forge_refresh_token": "KEEP"})
        auth.merge_refresh(
            gitea_access_token="NEW", gitea_token_expires_at=None, refresh_token=""
        )
        self.assertEqual(auth.raw["_forge_refresh_token"], "KEEP")

    # -- write ------------------------------------------------------------

    def test_write_is_atomic_and_0600(self) -> None:
        """Writing into a not-yet-existing subdirectory yields a 0600 file."""
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "sub" / "client-auth.json"
            auth = fa.AuthFile(raw={"username": "u", "gitea_access_token": "T"})
            auth.write(path)
            self.assertTrue(path.is_file())
            mode = path.stat().st_mode & 0o777
            self.assertEqual(mode, 0o600)
            # The tmp file must not still exist
            self.assertFalse((path.parent / "client-auth.json.tmp").exists())
            roundtrip = json.loads(path.read_text(encoding="utf-8"))
            self.assertEqual(roundtrip["username"], "u")

    def test_write_preserves_unknown_keys(self) -> None:
        # Forward-compat: the gateway might add new fields unknown to the
        # current schema. Writing must preserve them verbatim.
        raw = {"username": "u", "future_field": {"x": 1}, "access_token": "A"}
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "a.json"
            # Pass a copy so the comparison below uses an untouched original.
            fa.AuthFile(raw=dict(raw)).write(path)
            roundtrip = json.loads(path.read_text(encoding="utf-8"))
            self.assertEqual(roundtrip, raw)
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# auth_store_path
|
|
# --------------------------------------------------------------------
|
|
class AuthStorePathTests(unittest.TestCase):
    """Tests for how ``fa.auth_store_path`` picks the auth file location."""

    def test_explicit_env_var_wins(self) -> None:
        """``FSDGG_AUTH_STORE_PATH`` takes precedence over ``FSDGG_RUNTIME_DIR``."""
        env = {
            "FSDGG_AUTH_STORE_PATH": "/tmp/somewhere/a.json",
            "FSDGG_RUNTIME_DIR": "/other",
        }
        with mock.patch.dict(os.environ, env, clear=True):
            self.assertEqual(fa.auth_store_path(), Path("/tmp/somewhere/a.json"))

    def test_runtime_dir_fallback(self) -> None:
        """With only ``FSDGG_RUNTIME_DIR`` set, the file lives inside it."""
        with mock.patch.dict(
            os.environ, {"FSDGG_RUNTIME_DIR": "/tmp/rt"}, clear=True
        ):
            self.assertEqual(fa.auth_store_path(), Path("/tmp/rt/client-auth.json"))

    def test_default_path(self) -> None:
        """With neither variable set, ``fa.DEFAULT_AUTH_FILE`` is returned."""
        with mock.patch.dict(os.environ, {}, clear=True):
            self.assertEqual(fa.auth_store_path(), fa.DEFAULT_AUTH_FILE)
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# run_logout
|
|
# --------------------------------------------------------------------
|
|
class RunLogoutTests(unittest.TestCase):
    """Tests for ``fa.run_logout``.

    Note: all assertions happen *inside* the TemporaryDirectory
    context. Earlier iterations ran them after the context had exited,
    producing false positives on ``assertFalse(is_file)`` because
    cleanup had already deleted the file.
    """

    @staticmethod
    def _store_at(path: Path):
        """Return a patch that makes ``path`` the only configured auth store."""
        return mock.patch.dict(
            os.environ, {"FSDGG_AUTH_STORE_PATH": str(path)}, clear=True
        )

    def test_logout_no_file(self) -> None:
        """Logging out with no auth file returns ``None`` and creates nothing."""
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "client-auth.json"
            with self._store_at(path):
                result = fa.run_logout()
            self.assertIsNone(result)
            self.assertFalse(path.is_file())

    def test_logout_only_welcome_fields_removes_file(self) -> None:
        """With empty gateway fields, logout deletes the file and returns its path."""
        payload = {
            "username": "u", "access_token": "", "expires_in": 0,
            "issued_at": 0.0, "public_base_url": "", "index_name": "",
            "gitea_access_token": "A", "_forge_refresh_token": "R",
        }
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "client-auth.json"
            path.write_text(json.dumps(payload), encoding="utf-8")
            with self._store_at(path):
                result = fa.run_logout()
            self.assertEqual(result, path)
            self.assertFalse(path.is_file())

    def test_logout_preserves_gateway_bearer(self) -> None:
        """With a gateway bearer present, only the Gitea/forge keys are dropped."""
        payload = {
            "username": "u",
            "access_token": "GATEWAY-BEARER",
            "expires_in": 3600,
            "issued_at": 1000000.0,
            "public_base_url": "https://gateway.example",
            "index_name": "forge",
            "gitea_access_token": "GITEA-A",
            "gitea_token_expires_at": 2000000.0,
            "_forge_refresh_token": "R",
            "_forge_client_id": "c",
        }
        with tempfile.TemporaryDirectory() as tmp:
            path = Path(tmp) / "client-auth.json"
            path.write_text(json.dumps(payload), encoding="utf-8")
            with self._store_at(path):
                fa.run_logout()
            self.assertTrue(path.is_file())
            remaining = json.loads(path.read_text(encoding="utf-8"))
            self.assertEqual(remaining["access_token"], "GATEWAY-BEARER")
            self.assertEqual(remaining["public_base_url"], "https://gateway.example")
            self.assertNotIn("gitea_access_token", remaining)
            self.assertNotIn("_forge_refresh_token", remaining)
|
|
|
|
|
|
# --------------------------------------------------------------------
|
|
# main() dispatcher
|
|
# --------------------------------------------------------------------
|
|
class MainDispatcherTests(unittest.TestCase):
    """Return-code tests for the ``fa.main`` command dispatcher."""

    def test_unknown_command_rc_2(self) -> None:
        self.assertEqual(fa.main(["forge_auth.py", "nope"]), 2)

    def test_no_args_rc_2(self) -> None:
        self.assertEqual(fa.main(["forge_auth.py"]), 2)

    def test_status_on_empty_store_rc_1(self) -> None:
        """``status`` returns 1 when the configured auth file does not exist."""
        with tempfile.TemporaryDirectory() as tmp:
            store = Path(tmp) / "a.json"  # never created
            with mock.patch.dict(
                os.environ, {"FSDGG_AUTH_STORE_PATH": str(store)}, clear=True
            ):
                rc = fa.main(["forge_auth.py", "status"])
            self.assertEqual(rc, 1)

    def test_status_on_live_token_rc_0(self) -> None:
        """``status`` returns 0 when the store holds an unexpired Gitea token."""
        with tempfile.TemporaryDirectory() as tmp:
            store = Path(tmp) / "a.json"
            store.write_text(json.dumps({
                "username": "u",
                "gitea_access_token": "LIVE",
                "gitea_token_expires_at": time.time() + 3600,
            }), encoding="utf-8")
            with mock.patch.dict(
                os.environ, {"FSDGG_AUTH_STORE_PATH": str(store)}, clear=True
            ):
                rc = fa.main(["forge_auth.py", "status"])
            self.assertEqual(rc, 0)
|
|
|
|
|
|
class HeadlessGuidanceTests(unittest.TestCase):
    """Tests for the text ``fa._print_headless_guidance`` writes to stderr."""

    AUTH_URL = "https://gitea.example/login/oauth/authorize?x=1"
    # Deliberately not port 38111, so the tests can tell the configured
    # port apart from that value appearing in the output.
    REDIRECT = urlparse("http://127.0.0.1:54321/callback")

    def _capture(self, env_extra: dict[str, str] | None = None) -> str:
        """Run the guidance printer in a controlled environment.

        The process environment is replaced (``clear=True``) by
        ``USER=alice`` plus ``env_extra``, so real ``SSH_*`` variables from
        the developer's shell can never leak in. ``sys.stderr`` as seen by
        ``forge_auth`` is redirected to a buffer and the hostname lookups
        are pinned to ``box.example``.

        Returns:
            Everything written to the patched stderr.
        """
        env = {"USER": "alice", **(env_extra or {})}
        buf = io.StringIO()
        with mock.patch.dict(os.environ, env, clear=True), \
                mock.patch("forge_auth.sys.stderr", buf), \
                mock.patch("forge_auth.socket.getfqdn", return_value="box.example"), \
                mock.patch("forge_auth.socket.gethostname", return_value="box.example"):
            fa._print_headless_guidance(self.AUTH_URL, self.REDIRECT)
        return buf.getvalue()

    def test_prints_authorize_url(self) -> None:
        out = self._capture()
        self.assertIn(self.AUTH_URL, out)

    def test_prints_rfc_8252_reference(self) -> None:
        self.assertIn("RFC 8252", self._capture())

    def test_uses_configured_redirect_port(self) -> None:
        out = self._capture()
        self.assertIn("127.0.0.1:54321", out)
        self.assertNotIn("38111", out)

    def test_ssh_forward_template_uses_configured_port(self) -> None:
        out = self._capture()
        self.assertIn("ssh -L 54321:127.0.0.1:54321 alice@box.example", out)

    def test_reachability_probe_included(self) -> None:
        out = self._capture()
        self.assertIn("curl -sS -m 2 http://127.0.0.1:54321/", out)
        self.assertIn("Connection refused", out)

    def test_ssh_branch_when_SSH_CONNECTION_set(self) -> None:
        out = self._capture({"SSH_CONNECTION": "1.2.3.4 22 5.6.7.8 22", "USER": "alice"})
        self.assertIn("running inside an SSH session", out)

    def test_non_ssh_branch_when_no_ssh_env(self) -> None:
        out = self._capture()
        self.assertIn("if this machine is remote", out)
        self.assertNotIn("running inside an SSH session", out)
|
|
|
|
|
|
class BuildAuthorizeErrorTests(unittest.TestCase):
    """Contract tests for ``_build_authorize_error``."""

    BASE = "https://gitea.example.com"
    CID = "ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5"
    SCOPES = "openid profile email read:user read:repository write:repository"

    def _exc_different_scope(self, **overrides: str) -> fa.AuthError:
        """Build the error for Gitea's "grant exists with different scope" case.

        Args:
            **overrides: Replacements for any of ``error``,
                ``error_description``, ``gitea_base_url``, ``client_id`` or
                ``scopes``. An empty ``error_description`` is passed on as
                ``None``.
        """
        kw = {
            "error": "server_error",
            "error_description": "a grant exists with different scope",
            "gitea_base_url": self.BASE,
            "client_id": self.CID,
            "scopes": self.SCOPES,
            **overrides,
        }
        return fa._build_authorize_error(
            kw["error"],
            kw["error_description"] or None,
            kw["gitea_base_url"],
            client_id=kw["client_id"],
            scopes=kw["scopes"],
        )

    def _different_scope_message(self, **overrides: str) -> str:
        """Return the message text of the different-scope error."""
        return str(self._exc_different_scope(**overrides))

    def test_different_scope_returns_autherror(self) -> None:
        self.assertIsInstance(self._exc_different_scope(), fa.AuthError)

    def test_different_scope_message_is_five_lines_max(self) -> None:
        # RULE #2 §D.3: user-facing error bodies cap at 5 lines.
        message = self._different_scope_message()
        self.assertLessEqual(len(message.splitlines()), 5)

    def test_different_scope_surfaces_gitea_phrasing(self) -> None:
        self.assertIn("different scope", self._different_scope_message().lower())

    def test_different_scope_cites_settings_url(self) -> None:
        self.assertIn(
            f"{self.BASE}/user/settings/applications",
            self._different_scope_message(),
        )

    def test_different_scope_disambiguates_authorized_section(self) -> None:
        # Gitea's settings page has both "Authorized OAuth2 Applications"
        # (user grants) and "Manage OAuth2 Applications" (app registrations);
        # the message must point at the first.
        self.assertIn(
            '"Authorized OAuth2 Applications"',
            self._different_scope_message(),
        )

    def test_different_scope_includes_full_client_id(self) -> None:
        self.assertIn(self.CID, self._different_scope_message())

    def test_different_scope_includes_requested_scopes(self) -> None:
        self.assertIn(self.SCOPES, self._different_scope_message())

    def test_different_scope_links_to_remediation_doc(self) -> None:
        # External rationale lives in the doc per RULE #2 §D/E.
        self.assertIn(
            "docs/oauth-grant-scope-mismatch.md",
            self._different_scope_message(),
        )

    def test_different_scope_without_base_url_uses_placeholder(self) -> None:
        self.assertIn(
            "<gitea-base-url>/user/settings/applications",
            self._different_scope_message(gitea_base_url=""),
        )

    def test_different_scope_without_client_id_uses_placeholder(self) -> None:
        self.assertIn(
            "<unknown-client-id>",
            self._different_scope_message(client_id=""),
        )

    def test_different_scope_without_scopes_uses_placeholder(self) -> None:
        self.assertIn(
            "<unknown-scopes>",
            self._different_scope_message(scopes=""),
        )

    def test_access_denied_branch(self) -> None:
        exc = fa._build_authorize_error(
            "access_denied",
            "The resource owner or authorization server denied the request.",
            self.BASE,
        )
        message = str(exc)
        self.assertIn("access_denied", message)
        self.assertIn("just login", message)

    def test_fallback_preserves_raw_error_and_description(self) -> None:
        exc = fa._build_authorize_error(
            "invalid_request", "redirect_uri mismatch", self.BASE,
        )
        message = str(exc)
        self.assertIn("invalid_request", message)
        self.assertIn("redirect_uri mismatch", message)

    def test_fallback_with_no_description_only_includes_error_code(self) -> None:
        exc = fa._build_authorize_error("temporarily_unavailable", None, self.BASE)
        self.assertIn("temporarily_unavailable", str(exc))
|
|
|
|
|
|
class AuthorizeUrlIsHeadlessInvariantTests(unittest.TestCase):
    """Pin the invariant: authorize URL is independent of ``--no-browser``."""

    def setUp(self) -> None:
        self.cfg = fa.ForgeAuthConfig(
            gitea_base_url="https://gitea.example.com",
            client_id="ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5",
            redirect_uri="http://127.0.0.1:38111/callback",
            expected_username="alice",
        )
        self.endpoints = {
            "authorization_endpoint": "https://gitea.example.com/login/oauth/authorize",
            "token_endpoint": "https://gitea.example.com/login/oauth/access_token",
            "userinfo_endpoint": "https://gitea.example.com/login/oauth/userinfo",
        }

    def test_build_authorize_url_signature_has_no_headless_parameter(self) -> None:
        """The URL builder accepts no parameter that could encode browser mode."""
        import inspect

        params = inspect.signature(fa.build_authorize_url).parameters
        for forbidden in ("open_browser", "headless", "no_browser"):
            # subTest reports every offending parameter, not just the first.
            with self.subTest(parameter=forbidden):
                self.assertNotIn(forbidden, params)

    def test_url_is_deterministic_for_fixed_challenge_and_state(self) -> None:
        """Identical inputs produce an identical URL."""
        url_a = fa.build_authorize_url(
            self.cfg, self.endpoints, challenge="ch_1", state="st_1"
        )
        url_b = fa.build_authorize_url(
            self.cfg, self.endpoints, challenge="ch_1", state="st_1"
        )
        self.assertEqual(url_a, url_b)

    def test_scope_parameter_matches_config_scopes_exactly(self) -> None:
        """The decoded ``scope`` query value equals ``cfg.scopes`` verbatim."""
        # ``urlparse`` is already imported at module level; only
        # ``parse_qs`` needs bringing in here.
        from urllib.parse import parse_qs

        url = fa.build_authorize_url(
            self.cfg, self.endpoints, challenge="c", state="s"
        )
        params = parse_qs(urlparse(url).query)
        self.assertEqual(params["scope"], [self.cfg.scopes])
|
|
|
|
|
|
if __name__ == "__main__":
    # Lets the file be executed directly as well as via ``python3 -m unittest``.
    unittest.main()
|