#!/usr/bin/env python3 """PKCE OAuth2 client for the Gitea CLI app. Implements RFC 7636 (PKCE) + RFC 6749 §4.1 using only the Python standard library. Persists tokens to the shared auth file consumed by ``forge-stack-devpi-gateway-gitea`` so the orchestrator's downstream tooling reuses the same session without a second login. Auth-file path precedence: ``$FSDGG_AUTH_STORE_PATH`` → ``$FSDGG_RUNTIME_DIR/client-auth.json`` → ``~/.forge-stack-devpi-gateway-gitea/client-auth.json`` Gateway-owned fields (``access_token``, ``expires_in``, ``public_base_url``, ``index_name``) in the auth file are preserved on every write. Writes are atomic (``.tmp`` + ``rename``, ``chmod 0600``). CSRF ``state`` is HMAC-signed with a per-process session key never written to disk. """ from __future__ import annotations import base64 import dataclasses import hashlib import hmac import json import os import secrets import socket import ssl import sys import threading import time import urllib.error import urllib.parse import urllib.request import webbrowser from dataclasses import dataclass, field from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from queue import Queue from typing import Any from urllib.parse import parse_qs, urlencode, urlparse # -------------------------------------------------------------------- # Errors # -------------------------------------------------------------------- class AuthError(RuntimeError): """Any non-transient OAuth/auth-file error. Always user-visible.""" class ScopeMismatchAuthError(AuthError): """Gitea ``server_error`` caused by a grant/scope conflict. Raised when Gitea refuses an authorize request because a grant already exists for ``client_id`` under a different scope set (RFC 6749 §4.1.2.1; ``error_description`` contains "different scope"). Recovery: revoke the grant under ``/user/settings/applications`` and re-run. Subclassing ``AuthError`` keeps existing ``except AuthError`` handlers (e.g., the credential helper) unchanged. Callers that want to drive an interactive revoke-and-retry flow check for this concrete subclass and read the structured attributes. """ def __init__( self, message: str, *, gitea_base_url: str, client_id: str, scopes: str, ) -> None: super().__init__(message) self.gitea_base_url = gitea_base_url self.client_id = client_id self.scopes = scopes @property def revoke_url(self) -> str: base = self.gitea_base_url.rstrip("/") if not base: return "/user/settings/applications" return f"{base}/user/settings/applications" # -------------------------------------------------------------------- # CLI output helpers. Tag scheme matches scripts/common.sh: # cyan=info, green=ok, yellow=warn, red=err. ANSI disabled when stderr # is not a TTY, NO_COLOR is set, or TERM is "dumb". # -------------------------------------------------------------------- def _ansi_on() -> bool: try: tty = sys.stderr.isatty() except (AttributeError, ValueError): tty = False return ( tty and not os.environ.get("NO_COLOR") and os.environ.get("TERM", "dumb") != "dumb" ) def _c(code: str) -> str: return code if _ansi_on() else "" def _tag(color_code: str, label: str) -> str: padded = f"[{label}]".ljust(6) return f"{_c(color_code)}{padded}{_c(chr(0x1B) + '[0m')}" def cli_info(msg: str) -> None: print(f"{_tag(chr(0x1B) + '[36m', 'info')} {msg}", file=sys.stderr) def cli_ok(msg: str) -> None: print(f"{_tag(chr(0x1B) + '[32m', 'ok')} {msg}", file=sys.stderr) def cli_warn(msg: str) -> None: print(f"{_tag(chr(0x1B) + '[33m', 'warn')} {msg}", file=sys.stderr) def cli_err(msg: str) -> None: print(f"{_tag(chr(0x1B) + '[31m', 'err')} {msg}", file=sys.stderr) # -------------------------------------------------------------------- # Paths (match forge-stack-devpi-gateway-gitea exactly) # -------------------------------------------------------------------- AUTH_STORE_ENV_VAR = "FSDGG_AUTH_STORE_PATH" RUNTIME_DIR_ENV_VAR = "FSDGG_RUNTIME_DIR" DEFAULT_AUTH_DIR = Path.home() / ".forge-stack-devpi-gateway-gitea" DEFAULT_AUTH_FILE = DEFAULT_AUTH_DIR / "client-auth.json" def auth_store_path() -> Path: configured = os.environ.get(AUTH_STORE_ENV_VAR, "").strip() if configured: return Path(configured).expanduser().resolve() runtime_dir = os.environ.get(RUNTIME_DIR_ENV_VAR, "").strip() if runtime_dir: return Path(runtime_dir).expanduser().resolve() / "client-auth.json" return DEFAULT_AUTH_FILE # -------------------------------------------------------------------- # PKCE primitives # -------------------------------------------------------------------- def pkce_pair() -> tuple[str, str]: """Return ``(verifier, S256-challenge)`` suitable for Gitea. RFC 7636 §4.1: verifier is 43–128 chars of unreserved URL chars. `secrets.token_urlsafe(64)` yields ~86 chars, well within bounds. RFC 7636 §4.2: challenge = BASE64URL(SHA256(verifier)), no padding. """ verifier = secrets.token_urlsafe(64) digest = hashlib.sha256(verifier.encode("ascii")).digest() challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") return verifier, challenge def sign_state(session_key: bytes, nonce: str) -> str: """Return ``.`` using an in-memory HMAC-SHA256 key.""" mac = hmac.new(session_key, nonce.encode("ascii"), hashlib.sha256).hexdigest() return f"{nonce}.{mac}" def verify_state(session_key: bytes, value: str) -> str: """Return the nonce if the signature verifies; raise otherwise.""" if "." not in value: raise AuthError("OAuth state has no signature separator") nonce, _, got_mac = value.rpartition(".") want_mac = hmac.new( session_key, nonce.encode("ascii"), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(got_mac, want_mac): raise AuthError("OAuth state signature mismatch (possible CSRF)") return nonce # -------------------------------------------------------------------- # Config # -------------------------------------------------------------------- @dataclass(frozen=True) class ForgeAuthConfig: gitea_base_url: str client_id: str redirect_uri: str scopes: str = "openid profile email read:user read:organization read:repository write:repository" insecure_tls: bool = False # only for dev Gitea with self-signed certs expected_username: str = "" # from FORGE_GITEA_USERNAME; empty = no check @classmethod def from_env(cls) -> ForgeAuthConfig: missing: list[str] = [] def must(name: str) -> str: value = os.environ.get(name, "").strip() if not value: missing.append(name) return value base = must("FORGE_GITEA_URL") client_id = must("FSDGG_CLI_CLIENT_ID") redirect = os.environ.get( "FSDGG_CLI_REDIRECT_URI", "http://127.0.0.1:38111/callback" ).strip() if missing: raise AuthError( "missing required env vars: " + ", ".join(missing) + "\n → run 'just init-env' and edit .env" ) parsed = urlparse(redirect) if parsed.scheme != "http" or parsed.hostname not in {"127.0.0.1", "::1", "localhost"}: raise AuthError( "FSDGG_CLI_REDIRECT_URI must be a loopback URI per RFC 8252 §7.3: " "http://127.0.0.1:/, http://[::1]:/, " f"or http://localhost:/. Got: {redirect}. " "This is the CLI's local callback listener, not the Gitea " "server URL (that is FORGE_GITEA_URL)." ) if parsed.port is None: raise AuthError("FSDGG_CLI_REDIRECT_URI must include an explicit port") return cls( gitea_base_url=base.rstrip("/"), client_id=client_id, redirect_uri=redirect, insecure_tls=os.environ.get("FORGE_INSECURE_TLS", "").strip() == "1", expected_username=os.environ.get("FORGE_GITEA_USERNAME", "").strip(), ) def build_gitea_logout_url(gitea_base_url: str) -> str: """Return ``/user/logout?redirect_to=/user/login``.""" return ( f"{gitea_base_url.rstrip('/')}/user/logout" f"?{urlencode({'redirect_to': '/user/login'})}" ) # -------------------------------------------------------------------- # OIDC discovery + token exchange (stdlib HTTP) # -------------------------------------------------------------------- def _tls_context(insecure: bool) -> ssl.SSLContext: ctx = ssl.create_default_context() if insecure: ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def _http_get_json(url: str, *, insecure_tls: bool, timeout: float = 15.0) -> dict[str, Any]: req = urllib.request.Request(url, method="GET", headers={"Accept": "application/json"}) with urllib.request.urlopen(req, timeout=timeout, context=_tls_context(insecure_tls)) as resp: body = resp.read().decode("utf-8") try: data = json.loads(body) except json.JSONDecodeError as exc: raise AuthError(f"GET {url} returned non-JSON body: {exc}") from exc if not isinstance(data, dict): raise AuthError(f"GET {url} returned non-object JSON: {type(data).__name__}") return data def _http_post_form( url: str, form: dict[str, str], *, insecure_tls: bool, timeout: float = 15.0, auth_bearer: str | None = None, ) -> dict[str, Any]: data = urlencode(form).encode("ascii") headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", } if auth_bearer: headers["Authorization"] = f"Bearer {auth_bearer}" req = urllib.request.Request(url, data=data, method="POST", headers=headers) try: with urllib.request.urlopen(req, timeout=timeout, context=_tls_context(insecure_tls)) as resp: body = resp.read().decode("utf-8") except urllib.error.HTTPError as exc: err_body = exc.read().decode("utf-8", errors="replace") raise AuthError( f"POST {url} failed with HTTP {exc.code}: {err_body.strip()}" ) from exc try: payload = json.loads(body) except json.JSONDecodeError as exc: raise AuthError(f"POST {url} returned non-JSON body: {exc}") from exc if not isinstance(payload, dict): raise AuthError(f"POST {url} returned non-object JSON: {type(payload).__name__}") if "error" in payload: raise AuthError( f"OAuth error from {url}: {payload.get('error')} " f"({payload.get('error_description', '')})" ) return payload def discover_endpoints(config: ForgeAuthConfig) -> dict[str, str]: """Call Gitea's ``/.well-known/openid-configuration`` and return the three endpoints used by the CLI, validated against the issuer. """ url = f"{config.gitea_base_url}/.well-known/openid-configuration" payload = _http_get_json(url, insecure_tls=config.insecure_tls) issuer = payload.get("issuer") if not isinstance(issuer, str) or issuer.rstrip("/") != config.gitea_base_url.rstrip("/"): raise AuthError( f"OIDC discovery issuer {issuer!r} does not match " f"FORGE_GITEA_URL {config.gitea_base_url!r}" ) out: dict[str, str] = {} for key in ("authorization_endpoint", "token_endpoint", "userinfo_endpoint"): value = payload.get(key) if not isinstance(value, str) or not value.startswith(f"{config.gitea_base_url}/"): raise AuthError(f"OIDC discovery missing/invalid {key}: {value!r}") out[key] = value return out def build_authorize_url( config: ForgeAuthConfig, endpoints: dict[str, str], *, challenge: str, state: str, force_login_prompt: bool = True, ) -> str: """PKCE authorise URL with optional ``prompt=login`` and ``login_hint``. ``prompt=login`` (OIDC Core §3.1.2.1) is the default: it forces Gitea to re-authenticate the user even when a session cookie is already present, which is the right ergonomic for the first attempt of ``just login``. Setting ``force_login_prompt=False`` drops the parameter so the second attempt of an auto-retry (after a grant revocation) reuses the established session cookie and only triggers the consent screen, halving the browser-side prompts. The post-auth ``expected_username`` check in ``run_login`` still enforces the wrong-user guard on either path. ``login_hint`` is unaffected. """ params: dict[str, str] = { "client_id": config.client_id, "redirect_uri": config.redirect_uri, "response_type": "code", "scope": config.scopes, "state": state, "code_challenge": challenge, "code_challenge_method": "S256", } if force_login_prompt: params["prompt"] = "login" if config.expected_username: params["login_hint"] = config.expected_username return f"{endpoints['authorization_endpoint']}?{urlencode(params)}" def exchange_code_for_tokens( config: ForgeAuthConfig, endpoints: dict[str, str], *, code: str, verifier: str, ) -> dict[str, Any]: return _http_post_form( endpoints["token_endpoint"], { "grant_type": "authorization_code", "code": code, "redirect_uri": config.redirect_uri, "client_id": config.client_id, "code_verifier": verifier, }, insecure_tls=config.insecure_tls, ) def refresh_access_token( config: ForgeAuthConfig, endpoints: dict[str, str], *, refresh_token: str, ) -> dict[str, Any]: return _http_post_form( endpoints["token_endpoint"], { "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": config.client_id, }, insecure_tls=config.insecure_tls, ) def fetch_userinfo( config: ForgeAuthConfig, endpoints: dict[str, str], *, access_token: str, ) -> dict[str, Any]: req = urllib.request.Request( endpoints["userinfo_endpoint"], method="GET", headers={ "Authorization": f"Bearer {access_token}", "Accept": "application/json", }, ) with urllib.request.urlopen(req, timeout=15, context=_tls_context(config.insecure_tls)) as resp: body = resp.read().decode("utf-8") return json.loads(body) # -------------------------------------------------------------------- # Loopback callback server (single-shot) # -------------------------------------------------------------------- class _CallbackHandler(BaseHTTPRequestHandler): def do_GET(self) -> None: # noqa: N802 (BaseHTTPRequestHandler API) parsed = urlparse(self.path) query = parse_qs(parsed.query) code = (query.get("code") or [None])[0] state = (query.get("state") or [None])[0] error = (query.get("error") or [None])[0] error_description = (query.get("error_description") or [None])[0] server = self.server # type: ignore[attr-defined] if error: self.send_response(400) self.send_header("Content-Type", "text/html; charset=utf-8") self.end_headers() self.wfile.write( b"

Login failed

Close this window.

" ) server.result_queue.put( _build_authorize_error( error, error_description, server.gitea_base_url, client_id=getattr(server, "client_id", ""), scopes=getattr(server, "scopes", ""), ) ) return if not code or not state: self.send_response(400) self.send_header("Content-Type", "text/html; charset=utf-8") self.end_headers() self.wfile.write(b"

Missing code or state

") server.result_queue.put(AuthError("OAuth callback is missing code or state")) return self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.end_headers() self.wfile.write( b"" b"

Login complete.

" b"

Close this window. Return to the terminal.

" b"" ) server.result_queue.put((code, state)) def log_message(self, format: str, *args: Any) -> None: # noqa: A003 return # silence default logging class _LoopbackServer(HTTPServer): def __init__( self, addr: tuple[str, int], gitea_base_url: str = "", client_id: str = "", scopes: str = "", ) -> None: super().__init__(addr, _CallbackHandler) self.result_queue: Queue[tuple[str, str] | BaseException] = Queue(maxsize=1) # Read by _CallbackHandler when assembling _build_authorize_error. self.gitea_base_url = gitea_base_url self.client_id = client_id self.scopes = scopes def _build_authorize_error( error: str, error_description: str | None, gitea_base_url: str, client_id: str = "", scopes: str = "", ) -> AuthError: """Map an RFC 6749 §4.1.2.1 authorize-error redirect to ``AuthError``. Parameters ---------- error : str The ``error`` query parameter (``server_error``, ``access_denied``, ``invalid_request``, ...). error_description : str or None The ``error_description`` query parameter; human text set by Gitea. ``None`` and empty string are treated identically. gitea_base_url : str Base URL of the Gitea server. Used to build the user-settings URL in the remediation message. Empty string yields a ```` placeholder. client_id : str, optional OAuth client id requesting the authorization. Surfaced in the "different scope" message to disambiguate the grant row in Gitea's settings page. scopes : str, optional Space-separated scope set requested by this client. Surfaced in the "different scope" message. Returns ------- AuthError A user-facing error whose body is at most 5 lines (see RULE #2 §D.3) and, for the ``different scope`` case, references ``docs/oauth-grant-scope-mismatch.md``. """ desc = (error_description or "").strip() low = desc.lower() base = gitea_base_url.rstrip("/") settings_url = ( f"{base}/user/settings/applications" if base else "/user/settings/applications" ) if "different scope" in low or ("scope" in low and "grant" in low): cid = (client_id or "").strip() scope_fragment = scopes or "" return ScopeMismatchAuthError( f'Gitea server_error: "{desc or error}".\n' f" Client ID: {cid}\n" f" Revoke at: {settings_url} " f"(\"Authorized OAuth2 Applications\").\n" f" Requested: {scope_fragment}\n" " See: docs/oauth-grant-scope-mismatch.md", gitea_base_url=gitea_base_url, client_id=cid, scopes=scope_fragment, ) if error == "access_denied" or "access_denied" in low or "denied" in low: return AuthError( f"access_denied: {desc or error}. Re-run 'just login' and approve." ) if desc: return AuthError(f"authorize endpoint returned error: {error} ({desc})") return AuthError(f"authorize endpoint returned error: {error}") def wait_for_callback( host: str, port: int, *, timeout_seconds: float = 300.0, gitea_base_url: str = "", client_id: str = "", scopes: str = "", ) -> tuple[str, str]: """Bind a loopback HTTP server, serve one request, return ``(code, state)``. Parameters ---------- host, port : str, int Loopback address to bind. timeout_seconds : float Queue ``get`` timeout. gitea_base_url, client_id, scopes : str Forwarded to ``_LoopbackServer`` for the error path only; unused on the success path. Raises ------ AuthError On bind failure, timeout, or an authorize-error redirect. """ try: server = _LoopbackServer( (host, port), gitea_base_url=gitea_base_url, client_id=client_id, scopes=scopes, ) except OSError as exc: raise AuthError( f"cannot bind loopback server on {host}:{port}: {exc}. " f"Is another 'just login' still running? " f"If port {port} is held by an unrelated process, override " f"FSDGG_CLI_REDIRECT_URI in .env (note: the port must match " f"the OAuth app registered in Gitea)." ) from exc thread = threading.Thread(target=server.handle_request, daemon=True) thread.start() try: item = server.result_queue.get(timeout=timeout_seconds) except Exception as exc: raise AuthError( f"timed out after {int(timeout_seconds)}s waiting for OAuth " f"callback. Complete browser login and retry." ) from exc finally: server.server_close() if isinstance(item, BaseException): raise item return item # -------------------------------------------------------------------- # Auth-file read / merge / write # -------------------------------------------------------------------- GATEWAY_REQUIRED_FIELDS_DEFAULTS: dict[str, Any] = { "username": "", "access_token": "", "expires_in": 0, "issued_at": 0.0, "public_base_url": "", "index_name": "", } GATEWAY_OPTIONAL_FIELDS: frozenset[str] = frozenset( {"gitea_access_token", "gitea_token_expires_at"} ) # Welcome-repo-private extension fields. Kept in the same file for # single-source-of-truth; the gateway's loader ignores unknown keys so # these are forward-compatible. FORGE_EXTRA_FIELDS: frozenset[str] = frozenset( { "_forge_refresh_token", "_forge_client_id", "_forge_gitea_base_url", "_forge_issued_at", } ) @dataclass class AuthFile: """Round-trippable view over the stored JSON. Unknown keys are preserved verbatim for forward-compat with the gateway schema. """ raw: dict[str, Any] = field(default_factory=dict) @classmethod def read(cls, path: Path) -> AuthFile: if not path.is_file(): return cls(raw={}) try: payload = json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: raise AuthError( f"{path} is not valid JSON: {exc}. " f"Delete it and re-run 'just login' to rewrite it cleanly." ) from exc if not isinstance(payload, dict): raise AuthError(f"{path} does not contain a JSON object") return cls(raw=payload) # ---- accessors -------------------------------------------------- @property def gitea_access_token(self) -> str: value = self.raw.get("gitea_access_token") or "" return str(value) if value else "" @property def gitea_token_expires_at(self) -> float | None: value = self.raw.get("gitea_token_expires_at") if value is None: return None try: return float(value) except (TypeError, ValueError): return None @property def refresh_token(self) -> str: return str(self.raw.get("_forge_refresh_token") or "") @property def username(self) -> str: return str(self.raw.get("username") or "") # ---- predicates ------------------------------------------------- def has_live_gitea_token(self, *, leeway_seconds: float = 30.0) -> bool: if not self.gitea_access_token: return False expires_at = self.gitea_token_expires_at if expires_at is None: return True # unknown expiry: trust the token; API will reject if stale return time.time() + leeway_seconds < expires_at # ---- merging ---------------------------------------------------- def merge_login( self, *, username: str, gitea_access_token: str, gitea_token_expires_at: float | None, refresh_token: str, client_id: str, gitea_base_url: str, ) -> None: """Write Gitea-side fields without clobbering gateway fields. If an existing gateway bearer lives in the file, it is kept intact so that a subsequent ``repos-login`` inside the orchestrator does not have to re-run the browser flow. """ for key, default in GATEWAY_REQUIRED_FIELDS_DEFAULTS.items(): self.raw.setdefault(key, default) # Always refresh the "issued_at" of the *Gitea* login side. now = time.time() self.raw["username"] = username self.raw["gitea_access_token"] = gitea_access_token if gitea_token_expires_at is not None: self.raw["gitea_token_expires_at"] = gitea_token_expires_at self.raw["_forge_refresh_token"] = refresh_token self.raw["_forge_client_id"] = client_id self.raw["_forge_gitea_base_url"] = gitea_base_url self.raw["_forge_issued_at"] = now def merge_refresh( self, *, gitea_access_token: str, gitea_token_expires_at: float | None, refresh_token: str | None, ) -> None: self.raw["gitea_access_token"] = gitea_access_token if gitea_token_expires_at is not None: self.raw["gitea_token_expires_at"] = gitea_token_expires_at if refresh_token: self.raw["_forge_refresh_token"] = refresh_token def write(self, path: Path) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(json.dumps(self.raw, indent=2) + "\n", encoding="utf-8") os.chmod(tmp, 0o600) os.replace(tmp, path) # -------------------------------------------------------------------- # Top-level flows (login, refresh, logout) # -------------------------------------------------------------------- def _extract_expiry(payload: dict[str, Any]) -> float | None: raw = payload.get("expires_in") if raw is None: return None try: return time.time() + int(raw) except (TypeError, ValueError): return None def _print_headless_guidance(auth_url: str, redirect) -> None: """Print the authorise URL and reachability guidance for headless mode. Uses the actual loopback host/port from the parsed redirect URI (not hardcoded) and the current hostname/user so the SSH-forward template is copy-pasteable. """ cb_host = redirect.hostname or "127.0.0.1" cb_port = redirect.port path = redirect.path or "/callback" try: hostname = socket.getfqdn() or socket.gethostname() or "" except OSError: hostname = "" user = os.environ.get("USER") or os.environ.get("LOGNAME") or "" in_ssh = bool(os.environ.get("SSH_CONNECTION") or os.environ.get("SSH_TTY")) info_tag = _tag(chr(0x1B) + "[36m", "info") lines = [ "", f"{info_tag} paste this URL into a browser:", "", f" {auth_url}", "", f"{info_tag} after consent, Gitea will 302-redirect that browser to the", f" local callback on THIS machine: http://{cb_host}:{cb_port}{path}", " (loopback HTTP per RFC 8252 §7.3; not a public endpoint).", "", ] if in_ssh: lines += [ f"{info_tag} this process is running inside an SSH session. From the", " machine where the browser will open, run:", "", f" ssh -L {cb_port}:127.0.0.1:{cb_port} {user}@{hostname}", "", " and paste the URL above into THAT machine's browser.", "", ] else: lines += [ f"{info_tag} if this machine is remote, from the machine with the", " browser run (before pasting the URL):", "", f" ssh -L {cb_port}:127.0.0.1:{cb_port} {user}@{hostname}", "", ] lines += [ f"{info_tag} reachability probe (run on the browser-side machine).", " Any response, including 'Missing code or state',", " confirms the listener is reachable:", "", f" curl -sS -m 2 http://127.0.0.1:{cb_port}/", "", " 'Connection refused' or a timeout means the SSH forward", " above is not active yet.", "", ] sys.stderr.write("\n".join(lines) + "\n") sys.stderr.flush() def run_login( config: ForgeAuthConfig, *, open_browser: bool = True, force: bool = False, print_authorize_url: bool = True, force_login_prompt: bool = True, ) -> AuthFile: """Run the full PKCE flow and persist the result. Idempotent: if the stored file already carries a live Gitea token and ``force`` is False, skip everything and return the existing state. The caller is the one deciding when to force a refresh. ``force_login_prompt`` is forwarded to ``build_authorize_url`` and is set to False by the auto-retry path after a grant revocation so Gitea reuses the existing browser session. """ store = auth_store_path() existing = AuthFile.read(store) if not force and existing.has_live_gitea_token(): return existing endpoints = discover_endpoints(config) verifier, challenge = pkce_pair() session_key = secrets.token_bytes(32) nonce = secrets.token_urlsafe(24) state = sign_state(session_key, nonce) auth_url = build_authorize_url( config, endpoints, challenge=challenge, state=state, force_login_prompt=force_login_prompt, ) redirect = urlparse(config.redirect_uri) assert redirect.hostname and redirect.port # validated upstream if print_authorize_url: if open_browser: cli_info( f"open this URL in the browser if automatic launch does not " f"automatically:\n {auth_url}" ) else: _print_headless_guidance(auth_url, redirect) if open_browser: try: webbrowser.open(auth_url, new=2) except webbrowser.Error: pass code, returned_state = wait_for_callback( redirect.hostname, redirect.port, gitea_base_url=config.gitea_base_url, client_id=config.client_id, scopes=config.scopes, ) verify_state(session_key, returned_state) token_payload = exchange_code_for_tokens( config, endpoints, code=code, verifier=verifier ) access_token = str(token_payload.get("access_token") or "") refresh_token = str(token_payload.get("refresh_token") or "") if not access_token: raise AuthError(f"token endpoint returned no access_token: {token_payload}") if not refresh_token: # Gitea currently always returns a refresh_token for PKCE public # clients; flag loudly if that ever changes rather than silently # degrading the UX. raise AuthError("token endpoint returned no refresh_token; cannot auto-renew") userinfo = fetch_userinfo(config, endpoints, access_token=access_token) username = str( userinfo.get("preferred_username") or userinfo.get("login") or userinfo.get("name") or "" ) if not username: raise AuthError(f"userinfo did not expose a username: {userinfo}") # Post-auth identity check: prompt=login is a hint, not a guarantee. if config.expected_username and username.lower() != config.expected_username.lower(): logout_url = build_gitea_logout_url(config.gitea_base_url) if open_browser: try: webbrowser.open(logout_url, new=2) except webbrowser.Error: pass raise AuthError( "Gitea authorised the login as " f"'{username}', but FORGE_GITEA_USERNAME in .env is " f"'{config.expected_username}'. The stored auth file has " "NOT been updated.\n" " Recovery:\n" f" 1. Open the Gitea logout URL: {logout_url}\n" f" 2. Sign in as '{config.expected_username}'\n" " 3. Re-run 'just login'" ) existing.merge_login( username=username, gitea_access_token=access_token, gitea_token_expires_at=_extract_expiry(token_payload), refresh_token=refresh_token, client_id=config.client_id, gitea_base_url=config.gitea_base_url, ) existing.write(store) return existing def run_refresh(config: ForgeAuthConfig, *, must_refresh: bool = False) -> AuthFile: """Refresh the stored access token using the stored refresh token. If ``must_refresh`` is False and the access token is still live, this is a no-op. If refresh fails, raises ``AuthError`` (the credential helper surfaces this to git as "fall through to prompt", and the CLI ``just refresh`` surfaces it with an instruction to re-run ``just login``). """ store = auth_store_path() existing = AuthFile.read(store) if not must_refresh and existing.has_live_gitea_token(): return existing rt = existing.refresh_token if not rt: raise AuthError( "no refresh token stored; run 'just login' to authenticate from scratch" ) endpoints = discover_endpoints(config) token_payload = refresh_access_token(config, endpoints, refresh_token=rt) new_access = str(token_payload.get("access_token") or "") if not new_access: raise AuthError(f"refresh returned no access_token: {token_payload}") existing.merge_refresh( gitea_access_token=new_access, gitea_token_expires_at=_extract_expiry(token_payload), refresh_token=str(token_payload.get("refresh_token") or ""), ) existing.write(store) return existing def run_logout() -> Path | None: """Remove every codevalet-managed field from the auth file. If the file only ever held welcome-managed fields, delete it entirely. If the gateway has already populated its own fields (access_token etc.), wipe only the Gitea + welcome-repo keys and leave the rest. """ store = auth_store_path() if not store.is_file(): return None try: existing = AuthFile.read(store) except AuthError: store.unlink() return store for key in list(existing.raw.keys()): if key in GATEWAY_OPTIONAL_FIELDS or key in FORGE_EXTRA_FIELDS: existing.raw.pop(key, None) # If nothing of value remains, remove the file outright. if all( not existing.raw.get(k) for k in ("access_token", "public_base_url", "index_name") ): store.unlink() return store existing.write(store) return store # -------------------------------------------------------------------- # CLI # -------------------------------------------------------------------- def _can_prompt_for_revoke() -> bool: """Return True only when the environment is interactive enough to block on ``input()`` and have the operator click "Revoke". Disabling vectors (any one of these returns False): * ``FORGE_AUTO_REVOKE`` set to ``0``/``no``/``false``: explicit opt-out for callers that want strict fail-fast behaviour. * ``FORGE_SETUP_YES=1``: non-interactive auto-yes mode used by ``setup.sh`` and CI; never block on ``input()``. * stderr or stdin is not a TTY: avoids deadlocks in piped or backgrounded executions. """ val = os.environ.get("FORGE_AUTO_REVOKE", "1").strip().lower() if val in {"0", "no", "false"}: return False if os.environ.get("FORGE_SETUP_YES", "0").strip() == "1": return False try: return sys.stderr.isatty() and sys.stdin.isatty() except (AttributeError, ValueError): return False def _prompt_revoke_and_wait( exc: ScopeMismatchAuthError, *, open_browser: bool ) -> bool: """Drive the manual revoke step. Returns True iff the operator pressed Enter (i.e., agreed to retry); False on EOF/Ctrl-C. Side effects: prints the revoke URL and ``client_id`` to stderr; when ``open_browser`` is True, also calls ``webbrowser.open`` on the revoke URL (best-effort; failure is silent). """ cli_info( 'opening Gitea\'s "Authorized OAuth2 Applications" page so the ' "conflicting grant can be revoked." ) cli_info(f" URL: {exc.revoke_url}") cli_info(f" Client ID: {exc.client_id}") if open_browser: try: webbrowser.open(exc.revoke_url, new=2) except webbrowser.Error: pass cli_info( 'after clicking "Revoke" on the row matching the Client ID ' "above, press Enter here to retry login (or Ctrl-C to abort)." ) try: input("") except (EOFError, KeyboardInterrupt): return False return True def _cmd_login(argv: list[str]) -> int: force = "--force" in argv no_browser = "--no-browser" in argv config = ForgeAuthConfig.from_env() try: state = run_login(config, open_browser=not no_browser, force=force) except ScopeMismatchAuthError as exc: cli_err(str(exc)) if not _can_prompt_for_revoke(): return 1 if not _prompt_revoke_and_wait(exc, open_browser=not no_browser): return 1 # Single retry. ``force=True`` bypasses the live-token short- # circuit; ``force_login_prompt=False`` reuses the browser # session cookie established by the failed first attempt so # Gitea only shows the consent screen on the retry. state = run_login( config, open_browser=not no_browser, force=True, force_login_prompt=False, ) cli_ok(f"authenticated as: {state.username}") cli_info(f"auth file: {auth_store_path()}") return 0 def _cmd_refresh(argv: list[str]) -> int: must = "--force" in argv config = ForgeAuthConfig.from_env() state = run_refresh(config, must_refresh=must) expires_in = ( int((state.gitea_token_expires_at or 0) - time.time()) if state.gitea_token_expires_at else -1 ) if expires_in > 0: cli_ok(f"token refreshed, valid for ~{expires_in}s") else: cli_ok("token refreshed") return 0 def _cmd_logout(_: list[str]) -> int: path = run_logout() if path is None: cli_info("no auth file to remove") else: cli_ok(f"cleared {path}") return 0 def _cmd_status(_: list[str]) -> int: path = auth_store_path() state = AuthFile.read(path) alive = state.has_live_gitea_token() exp = state.gitea_token_expires_at print(f"path: {path}") print(f"file exists: {path.is_file()}") print(f"has gitea_access_token: {bool(state.gitea_access_token)}") print(f"has _forge_refresh_token: {bool(state.refresh_token)}") print(f"username: {state.username or ''}") print(f"expires_at: {exp if exp is not None else ''}") print(f"live: {alive}") return 0 if alive else 1 _COMMANDS = { "login": _cmd_login, "refresh": _cmd_refresh, "logout": _cmd_logout, "status": _cmd_status, } def main(argv: list[str]) -> int: if len(argv) < 2 or argv[1] not in _COMMANDS: print( "usage: forge_auth.py [--force] [--no-browser]", file=sys.stderr, ) return 2 try: return _COMMANDS[argv[1]](argv[2:]) except AuthError as exc: cli_err(str(exc)) return 1 if __name__ == "__main__": sys.exit(main(sys.argv))