Files
welcome-to-codevalet-as-a-p…/tests/test_forge_auth_integration.py
FanaticPythoner (Nathan Trudeau) 0c159e91fb Initial Commit
2026-04-27 15:56:43 -04:00

241 lines
9.1 KiB
Python
Executable File

"""End-to-end integration test for forge_auth.py against a mock OIDC server.
Covers the full PKCE login flow (authorize → callback → token
exchange → userinfo → persist), transparent refresh, logout, and
the idempotent "already authenticated" short-circuit.
No real network calls. No browser required: the test simulates the
browser by doing an HTTP GET to the authorize endpoint; the mock
server 302-redirects to the loopback callback, which
`forge_auth.run_login` is already listening on.
Run with: python3 -m unittest tests.test_forge_auth_integration
"""
from __future__ import annotations
import json
import os
import socket
import sys
import tempfile
import threading
import time
import unittest
import urllib.request
from http.client import HTTPResponse
from pathlib import Path
from unittest import mock
HERE = Path(__file__).resolve().parent
ROOT = HERE.parent
sys.path.insert(0, str(ROOT / "scripts"))
sys.path.insert(0, str(HERE))
import forge_auth as fa # noqa: E402
import mock_oidc_server # noqa: E402
def _free_loopback_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
class _MockBrowser:
"""Drive the authorize endpoint on a worker thread.
Delay allows `run_login` to bind the loopback callback server.
The GET then follows the mock server redirect to the callback.
"""
def __init__(self, authorize_url: str, delay_seconds: float = 0.2) -> None:
self.authorize_url = authorize_url
self.delay_seconds = delay_seconds
self.exc: BaseException | None = None
self._thread: threading.Thread | None = None
def start(self) -> None:
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def join(self, timeout: float = 10.0) -> None:
if self._thread is not None:
self._thread.join(timeout)
def _run(self) -> None:
try:
time.sleep(self.delay_seconds)
with urllib.request.urlopen(self.authorize_url, timeout=5) as resp:
resp.read()
except BaseException as exc: # noqa: BLE001
self.exc = exc
class ForgeAuthIntegrationTests(unittest.TestCase):
    """Drive forge_auth's login, refresh and logout flows against the mock server.

    Every test gets its own temporary auth store, its own mock server
    instance and its own loopback callback port.
    """

    def setUp(self) -> None:
        # Cleanups are registered as soon as each resource exists, so they
        # still run when a later setUp step fails (tearDown would be skipped
        # in that case). They run LIFO: shutdown -> server_close -> tempdir.

        # Tempdir for auth store.
        self._tmp_ctx = tempfile.TemporaryDirectory()
        self.addCleanup(self._tmp_ctx.cleanup)
        self.tmp = Path(self._tmp_ctx.name)

        # Mock Gitea on an ephemeral port.
        self.server, self.server_state, self.base_url = mock_oidc_server.make_server(
            username="integration-user"
        )
        self.addCleanup(self.server.server_close)
        mock_oidc_server.serve_forever(self.server)
        # Registered only once serving has been started.
        # NOTE(review): presumably a socketserver-style server, whose
        # shutdown() waits for a running serve loop -- confirm.
        self.addCleanup(self.server.shutdown)

        # Loopback callback port (separate from the mock server port).
        self.loopback_port = _free_loopback_port()
        self.redirect_uri = f"http://127.0.0.1:{self.loopback_port}/callback"
        self.env = {
            "FORGE_GITEA_URL": self.base_url,
            "FSDGG_CLI_CLIENT_ID": "integration-client",
            "FSDGG_CLI_REDIRECT_URI": self.redirect_uri,
            "FSDGG_AUTH_STORE_PATH": str(self.tmp / "client-auth.json"),
            "HOME": str(self.tmp),
        }

    # -----------------------------------------------------------------
    # Helpers
    # -----------------------------------------------------------------
    def _patched_env(self):
        """Return a context manager replacing ``os.environ`` with exactly ``self.env``."""
        return mock.patch.dict(os.environ, self.env, clear=True)

    def _login(self) -> fa.AuthFile:
        """Run `run_login()` with a patched browser opener.

        `webbrowser.open` is replaced by a function that starts a
        `_MockBrowser` for the authorize URL. Every browser started this way
        is joined even when `run_login` raises, so no worker thread outlives
        the call unnoticed.

        Returns:
            The value returned by `fa.run_login`.

        Raises:
            BaseException: the first exception captured by a mock browser
                thread, if `run_login` itself succeeded.
        """
        browsers: list[_MockBrowser] = []

        # Start the mock browser from the patched opener so the GET
        # occurs after URL construction and before callback wait.
        def fake_webbrowser_open(url: str, new: int = 0) -> bool:
            browser = _MockBrowser(url)
            browsers.append(browser)
            browser.start()
            return True

        with self._patched_env():
            config = fa.ForgeAuthConfig.from_env()
            try:
                with mock.patch(
                    "forge_auth.webbrowser.open", side_effect=fake_webbrowser_open
                ):
                    state = fa.run_login(
                        config,
                        open_browser=True,
                        force=False,
                        print_authorize_url=False,
                    )
            finally:
                for browser in browsers:
                    browser.join(timeout=5)

        for browser in browsers:
            if browser.exc is not None:
                raise browser.exc
        return state

    # -----------------------------------------------------------------
    # Tests
    # -----------------------------------------------------------------
    def test_login_persists_full_auth_file(self) -> None:
        """A successful login returns live tokens and writes a 0600 auth store."""
        state = self._login()
        self.assertEqual(state.username, "integration-user")
        self.assertTrue(state.gitea_access_token.startswith("access-"))
        self.assertTrue(state.refresh_token.startswith("refresh-"))
        self.assertTrue(state.has_live_gitea_token())

        # The file must be mode 0600.
        store = Path(self.env["FSDGG_AUTH_STORE_PATH"])
        self.assertTrue(store.is_file())
        self.assertEqual(store.stat().st_mode & 0o777, 0o600)

        # All gateway-required fields are populated (with defaults).
        payload = json.loads(store.read_text(encoding="utf-8"))
        required_keys = (
            "username", "access_token", "expires_in",
            "issued_at", "public_base_url", "index_name",
        )
        for key in required_keys:
            # subTest reports every missing key, not just the first one.
            with self.subTest(key=key):
                self.assertIn(key, payload)

        # Welcome-managed fields are there too.
        self.assertEqual(payload["_forge_client_id"], "integration-client")
        self.assertEqual(payload["_forge_gitea_base_url"], self.base_url)

    def test_login_is_idempotent_when_token_live(self) -> None:
        """A second non-forced login reuses the stored token without a browser."""
        first = self._login()
        first_access = first.gitea_access_token
        first_counter = self.server_state.access_token_counter

        # Second call with force=False must NOT talk to the mock server.
        with self._patched_env():
            config = fa.ForgeAuthConfig.from_env()
            with mock.patch("forge_auth.webbrowser.open") as wb:
                second = fa.run_login(
                    config,
                    open_browser=True,
                    force=False,
                    print_authorize_url=False,
                )
        wb.assert_not_called()
        self.assertEqual(second.gitea_access_token, first_access)
        self.assertEqual(self.server_state.access_token_counter, first_counter)

    def test_refresh_rotates_tokens(self) -> None:
        """A forced refresh issues new tokens and invalidates the old refresh token."""
        state = self._login()
        original_access = state.gitea_access_token
        original_refresh = state.refresh_token

        # Force the refresh path.
        with self._patched_env():
            config = fa.ForgeAuthConfig.from_env()
            refreshed = fa.run_refresh(config, must_refresh=True)
        self.assertNotEqual(refreshed.gitea_access_token, original_access)
        self.assertNotEqual(refreshed.refresh_token, original_refresh)
        self.assertTrue(refreshed.has_live_gitea_token())

        # Old refresh token is now revoked on the server; using it must fail.
        with self._patched_env():
            config = fa.ForgeAuthConfig.from_env()
            endpoints = fa.discover_endpoints(config)
            with self.assertRaises(fa.AuthError):
                fa.refresh_access_token(
                    config, endpoints, refresh_token=original_refresh
                )

    def test_logout_after_login_removes_fields(self) -> None:
        """Logging out straight after login deletes the auth store file."""
        self._login()
        store = Path(self.env["FSDGG_AUTH_STORE_PATH"])
        self.assertTrue(store.is_file())

        with self._patched_env():
            fa.run_logout()

        # File should be gone (gateway never wrote its bearer in this test).
        self.assertFalse(store.is_file())

    def test_logout_preserves_gateway_bearer(self) -> None:
        """Logout keeps gateway-written fields and drops the forge token fields."""
        self._login()
        store = Path(self.env["FSDGG_AUTH_STORE_PATH"])

        # Simulate the gateway subsequently writing its own bearer.
        payload = json.loads(store.read_text(encoding="utf-8"))
        payload.update({
            "access_token": "GATEWAY-BEARER",
            "expires_in": 7200,
            "public_base_url": "https://gateway.example",
            "index_name": "forge",
        })
        store.write_text(json.dumps(payload), encoding="utf-8")

        with self._patched_env():
            fa.run_logout()

        self.assertTrue(store.is_file())
        remaining = json.loads(store.read_text(encoding="utf-8"))
        self.assertEqual(remaining["access_token"], "GATEWAY-BEARER")
        self.assertEqual(remaining["public_base_url"], "https://gateway.example")
        self.assertNotIn("gitea_access_token", remaining)
        self.assertNotIn("_forge_refresh_token", remaining)

    def test_callback_state_csrf_mismatch_raises(self) -> None:
        """A state signed with one key must not verify under a different key."""
        key = b"\x01" * 32
        signed = fa.sign_state(key, "nonce")
        with self.assertRaises(fa.AuthError):
            fa.verify_state(b"\x02" * 32, signed)
if __name__ == "__main__":
    # Direct execution: hand control to unittest's command-line runner.
    unittest.main()