"""End-to-end integration test for forge_auth.py against a mock OIDC server. Covers the full PKCE login flow (authorize → callback → token exchange → userinfo → persist), transparent refresh, logout, and the idempotent "already authenticated" short-circuit. No real network calls. No browser required: the test simulates the browser by doing an HTTP GET to the authorize endpoint; the mock server 302-redirects to the loopback callback, which `forge_auth.run_login` is already listening on. Run with: python3 -m unittest tests.test_forge_auth_integration """ from __future__ import annotations import json import os import socket import sys import tempfile import threading import time import unittest import urllib.request from http.client import HTTPResponse from pathlib import Path from unittest import mock HERE = Path(__file__).resolve().parent ROOT = HERE.parent sys.path.insert(0, str(ROOT / "scripts")) sys.path.insert(0, str(HERE)) import forge_auth as fa # noqa: E402 import mock_oidc_server # noqa: E402 def _free_loopback_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] class _MockBrowser: """Drive the authorize endpoint on a worker thread. Delay allows `run_login` to bind the loopback callback server. The GET then follows the mock server redirect to the callback. """ def __init__(self, authorize_url: str, delay_seconds: float = 0.2) -> None: self.authorize_url = authorize_url self.delay_seconds = delay_seconds self.exc: BaseException | None = None self._thread: threading.Thread | None = None def start(self) -> None: self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def join(self, timeout: float = 10.0) -> None: if self._thread is not None: self._thread.join(timeout) def _run(self) -> None: try: time.sleep(self.delay_seconds) with urllib.request.urlopen(self.authorize_url, timeout=5) as resp: resp.read() except BaseException as exc: # noqa: BLE001 self.exc = exc class ForgeAuthIntegrationTests(unittest.TestCase): def setUp(self) -> None: # Tempdir for auth store. self._tmp_ctx = tempfile.TemporaryDirectory() self.tmp = Path(self._tmp_ctx.name) # Mock Gitea on an ephemeral port. self.server, self.server_state, self.base_url = mock_oidc_server.make_server( username="integration-user" ) mock_oidc_server.serve_forever(self.server) # Loopback callback port (separate from the mock server port). self.loopback_port = _free_loopback_port() self.redirect_uri = f"http://127.0.0.1:{self.loopback_port}/callback" self.env = { "FORGE_GITEA_URL": self.base_url, "FSDGG_CLI_CLIENT_ID": "integration-client", "FSDGG_CLI_REDIRECT_URI": self.redirect_uri, "FSDGG_AUTH_STORE_PATH": str(self.tmp / "client-auth.json"), "HOME": str(self.tmp), } def tearDown(self) -> None: self.server.shutdown() self.server.server_close() self._tmp_ctx.cleanup() # ----------------------------------------------------------------- # Helpers # ----------------------------------------------------------------- def _login(self) -> fa.AuthFile: """Run `run_login()` with a patched browser opener.""" with mock.patch.dict(os.environ, self.env, clear=True): config = fa.ForgeAuthConfig.from_env() # Start the mock browser from the patched opener so the GET # occurs after URL construction and before callback wait. browser_holder: dict[str, _MockBrowser] = {} def fake_webbrowser_open(url: str, new: int = 0) -> bool: browser = _MockBrowser(url) browser.start() browser_holder["b"] = browser return True with mock.patch("forge_auth.webbrowser.open", side_effect=fake_webbrowser_open): state = fa.run_login(config, open_browser=True, force=False, print_authorize_url=False) if "b" in browser_holder: browser_holder["b"].join(timeout=5) if browser_holder["b"].exc is not None: raise browser_holder["b"].exc return state # ----------------------------------------------------------------- # Tests # ----------------------------------------------------------------- def test_login_persists_full_auth_file(self) -> None: state = self._login() self.assertEqual(state.username, "integration-user") self.assertTrue(state.gitea_access_token.startswith("access-")) self.assertTrue(state.refresh_token.startswith("refresh-")) self.assertTrue(state.has_live_gitea_token()) # The file must be mode 0600. store = Path(self.env["FSDGG_AUTH_STORE_PATH"]) self.assertTrue(store.is_file()) self.assertEqual(store.stat().st_mode & 0o777, 0o600) # All gateway-required fields are populated (with defaults). payload = json.loads(store.read_text(encoding="utf-8")) for key in ( "username", "access_token", "expires_in", "issued_at", "public_base_url", "index_name", ): self.assertIn(key, payload) # Welcome-managed fields are there too. self.assertEqual(payload["_forge_client_id"], "integration-client") self.assertEqual(payload["_forge_gitea_base_url"], self.base_url) def test_login_is_idempotent_when_token_live(self) -> None: first = self._login() first_access = first.gitea_access_token first_counter = self.server_state.access_token_counter # Second call with force=False must NOT talk to the mock server. with mock.patch.dict(os.environ, self.env, clear=True): config = fa.ForgeAuthConfig.from_env() with mock.patch("forge_auth.webbrowser.open") as wb: second = fa.run_login(config, open_browser=True, force=False, print_authorize_url=False) wb.assert_not_called() self.assertEqual(second.gitea_access_token, first_access) self.assertEqual(self.server_state.access_token_counter, first_counter) def test_refresh_rotates_tokens(self) -> None: state = self._login() original_access = state.gitea_access_token original_refresh = state.refresh_token # Force the refresh path. with mock.patch.dict(os.environ, self.env, clear=True): config = fa.ForgeAuthConfig.from_env() refreshed = fa.run_refresh(config, must_refresh=True) self.assertNotEqual(refreshed.gitea_access_token, original_access) self.assertNotEqual(refreshed.refresh_token, original_refresh) self.assertTrue(refreshed.has_live_gitea_token()) # Old refresh token is now revoked on the server; using it must fail. with mock.patch.dict(os.environ, self.env, clear=True): config = fa.ForgeAuthConfig.from_env() endpoints = fa.discover_endpoints(config) with self.assertRaises(fa.AuthError): fa.refresh_access_token( config, endpoints, refresh_token=original_refresh ) def test_logout_after_login_removes_fields(self) -> None: self._login() store = Path(self.env["FSDGG_AUTH_STORE_PATH"]) self.assertTrue(store.is_file()) with mock.patch.dict(os.environ, self.env, clear=True): fa.run_logout() # File should be gone (gateway never wrote its bearer in this test). self.assertFalse(store.is_file()) def test_logout_preserves_gateway_bearer(self) -> None: self._login() store = Path(self.env["FSDGG_AUTH_STORE_PATH"]) # Simulate the gateway subsequently writing its own bearer. payload = json.loads(store.read_text(encoding="utf-8")) payload.update({ "access_token": "GATEWAY-BEARER", "expires_in": 7200, "public_base_url": "https://gateway.example", "index_name": "forge", }) store.write_text(json.dumps(payload), encoding="utf-8") with mock.patch.dict(os.environ, self.env, clear=True): fa.run_logout() self.assertTrue(store.is_file()) remaining = json.loads(store.read_text(encoding="utf-8")) self.assertEqual(remaining["access_token"], "GATEWAY-BEARER") self.assertEqual(remaining["public_base_url"], "https://gateway.example") self.assertNotIn("gitea_access_token", remaining) self.assertNotIn("_forge_refresh_token", remaining) def test_callback_state_csrf_mismatch_raises(self) -> None: """Reject a tampered callback state via `verify_state()`.""" key = b"\x01" * 32 signed = fa.sign_state(key, "nonce") with self.assertRaises(fa.AuthError): fa.verify_state(b"\x02" * 32, signed) if __name__ == "__main__": unittest.main()