1175 lines
40 KiB
Python
Executable File
1175 lines
40 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""PKCE OAuth2 client for the Gitea CLI app.
|
||
|
||
Implements RFC 7636 (PKCE) + RFC 6749 §4.1 using only the Python
|
||
standard library. Persists tokens to the shared auth file consumed by
|
||
``forge-stack-devpi-gateway-gitea`` so the orchestrator's downstream
|
||
tooling reuses the same session without a second login.
|
||
|
||
Auth-file path precedence:
|
||
``$FSDGG_AUTH_STORE_PATH``
|
||
→ ``$FSDGG_RUNTIME_DIR/client-auth.json``
|
||
→ ``~/.forge-stack-devpi-gateway-gitea/client-auth.json``
|
||
|
||
Gateway-owned fields (``access_token``, ``expires_in``,
|
||
``public_base_url``, ``index_name``) in the auth file are preserved
|
||
on every write. Writes are atomic (``<path>.tmp`` + ``rename``,
|
||
``chmod 0600``). CSRF ``state`` is HMAC-signed with a per-process
|
||
session key never written to disk.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import dataclasses
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import os
|
||
import secrets
|
||
import socket
|
||
import ssl
|
||
import sys
|
||
import threading
|
||
import time
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
import webbrowser
|
||
from dataclasses import dataclass, field
|
||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||
from pathlib import Path
|
||
from queue import Queue
|
||
from typing import Any
|
||
from urllib.parse import parse_qs, urlencode, urlparse
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Errors
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
class AuthError(RuntimeError):
|
||
"""Any non-transient OAuth/auth-file error. Always user-visible."""
|
||
|
||
|
||
class ScopeMismatchAuthError(AuthError):
|
||
"""Gitea ``server_error`` caused by a grant/scope conflict.
|
||
|
||
Raised when Gitea refuses an authorize request because a grant
|
||
already exists for ``client_id`` under a different scope set
|
||
(RFC 6749 §4.1.2.1; ``error_description`` contains "different
|
||
scope"). Recovery: revoke the grant under
|
||
``<gitea_base_url>/user/settings/applications`` and re-run.
|
||
|
||
Subclassing ``AuthError`` keeps existing ``except AuthError``
|
||
handlers (e.g., the credential helper) unchanged. Callers that
|
||
want to drive an interactive revoke-and-retry flow check for
|
||
this concrete subclass and read the structured attributes.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
message: str,
|
||
*,
|
||
gitea_base_url: str,
|
||
client_id: str,
|
||
scopes: str,
|
||
) -> None:
|
||
super().__init__(message)
|
||
self.gitea_base_url = gitea_base_url
|
||
self.client_id = client_id
|
||
self.scopes = scopes
|
||
|
||
@property
|
||
def revoke_url(self) -> str:
|
||
base = self.gitea_base_url.rstrip("/")
|
||
if not base:
|
||
return "<gitea-base-url>/user/settings/applications"
|
||
return f"{base}/user/settings/applications"
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# CLI output helpers. Tag scheme matches scripts/common.sh:
|
||
# cyan=info, green=ok, yellow=warn, red=err. ANSI disabled when stderr
|
||
# is not a TTY, NO_COLOR is set, or TERM is "dumb".
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
def _ansi_on() -> bool:
|
||
try:
|
||
tty = sys.stderr.isatty()
|
||
except (AttributeError, ValueError):
|
||
tty = False
|
||
return (
|
||
tty
|
||
and not os.environ.get("NO_COLOR")
|
||
and os.environ.get("TERM", "dumb") != "dumb"
|
||
)
|
||
|
||
|
||
def _c(code: str) -> str:
|
||
return code if _ansi_on() else ""
|
||
|
||
|
||
def _tag(color_code: str, label: str) -> str:
|
||
padded = f"[{label}]".ljust(6)
|
||
return f"{_c(color_code)}{padded}{_c(chr(0x1B) + '[0m')}"
|
||
|
||
|
||
def cli_info(msg: str) -> None:
|
||
print(f"{_tag(chr(0x1B) + '[36m', 'info')} {msg}", file=sys.stderr)
|
||
|
||
|
||
def cli_ok(msg: str) -> None:
|
||
print(f"{_tag(chr(0x1B) + '[32m', 'ok')} {msg}", file=sys.stderr)
|
||
|
||
|
||
def cli_warn(msg: str) -> None:
|
||
print(f"{_tag(chr(0x1B) + '[33m', 'warn')} {msg}", file=sys.stderr)
|
||
|
||
|
||
def cli_err(msg: str) -> None:
|
||
print(f"{_tag(chr(0x1B) + '[31m', 'err')} {msg}", file=sys.stderr)
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Paths (match forge-stack-devpi-gateway-gitea exactly)
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
AUTH_STORE_ENV_VAR = "FSDGG_AUTH_STORE_PATH"
|
||
RUNTIME_DIR_ENV_VAR = "FSDGG_RUNTIME_DIR"
|
||
DEFAULT_AUTH_DIR = Path.home() / ".forge-stack-devpi-gateway-gitea"
|
||
DEFAULT_AUTH_FILE = DEFAULT_AUTH_DIR / "client-auth.json"
|
||
|
||
|
||
def auth_store_path() -> Path:
|
||
configured = os.environ.get(AUTH_STORE_ENV_VAR, "").strip()
|
||
if configured:
|
||
return Path(configured).expanduser().resolve()
|
||
runtime_dir = os.environ.get(RUNTIME_DIR_ENV_VAR, "").strip()
|
||
if runtime_dir:
|
||
return Path(runtime_dir).expanduser().resolve() / "client-auth.json"
|
||
return DEFAULT_AUTH_FILE
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# PKCE primitives
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
def pkce_pair() -> tuple[str, str]:
|
||
"""Return ``(verifier, S256-challenge)`` suitable for Gitea.
|
||
|
||
RFC 7636 §4.1: verifier is 43–128 chars of unreserved URL chars.
|
||
`secrets.token_urlsafe(64)` yields ~86 chars, well within bounds.
|
||
RFC 7636 §4.2: challenge = BASE64URL(SHA256(verifier)), no padding.
|
||
"""
|
||
verifier = secrets.token_urlsafe(64)
|
||
digest = hashlib.sha256(verifier.encode("ascii")).digest()
|
||
challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
|
||
return verifier, challenge
|
||
|
||
|
||
def sign_state(session_key: bytes, nonce: str) -> str:
|
||
"""Return ``<nonce>.<hex-mac>`` using an in-memory HMAC-SHA256 key."""
|
||
mac = hmac.new(session_key, nonce.encode("ascii"), hashlib.sha256).hexdigest()
|
||
return f"{nonce}.{mac}"
|
||
|
||
|
||
def verify_state(session_key: bytes, value: str) -> str:
|
||
"""Return the nonce if the signature verifies; raise otherwise."""
|
||
if "." not in value:
|
||
raise AuthError("OAuth state has no signature separator")
|
||
nonce, _, got_mac = value.rpartition(".")
|
||
want_mac = hmac.new(
|
||
session_key, nonce.encode("ascii"), hashlib.sha256
|
||
).hexdigest()
|
||
if not hmac.compare_digest(got_mac, want_mac):
|
||
raise AuthError("OAuth state signature mismatch (possible CSRF)")
|
||
return nonce
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Config
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class ForgeAuthConfig:
|
||
gitea_base_url: str
|
||
client_id: str
|
||
redirect_uri: str
|
||
scopes: str = "openid profile email read:user read:organization read:repository write:repository"
|
||
insecure_tls: bool = False # only for dev Gitea with self-signed certs
|
||
expected_username: str = "" # from FORGE_GITEA_USERNAME; empty = no check
|
||
|
||
@classmethod
|
||
def from_env(cls) -> ForgeAuthConfig:
|
||
missing: list[str] = []
|
||
|
||
def must(name: str) -> str:
|
||
value = os.environ.get(name, "").strip()
|
||
if not value:
|
||
missing.append(name)
|
||
return value
|
||
|
||
base = must("FORGE_GITEA_URL")
|
||
client_id = must("FSDGG_CLI_CLIENT_ID")
|
||
redirect = os.environ.get(
|
||
"FSDGG_CLI_REDIRECT_URI", "http://127.0.0.1:38111/callback"
|
||
).strip()
|
||
if missing:
|
||
raise AuthError(
|
||
"missing required env vars: " + ", ".join(missing)
|
||
+ "\n → run 'just init-env' and edit .env"
|
||
)
|
||
parsed = urlparse(redirect)
|
||
if parsed.scheme != "http" or parsed.hostname not in {"127.0.0.1", "::1", "localhost"}:
|
||
raise AuthError(
|
||
"FSDGG_CLI_REDIRECT_URI must be a loopback URI per RFC 8252 §7.3: "
|
||
"http://127.0.0.1:<port>/<path>, http://[::1]:<port>/<path>, "
|
||
f"or http://localhost:<port>/<path>. Got: {redirect}. "
|
||
"This is the CLI's local callback listener, not the Gitea "
|
||
"server URL (that is FORGE_GITEA_URL)."
|
||
)
|
||
if parsed.port is None:
|
||
raise AuthError("FSDGG_CLI_REDIRECT_URI must include an explicit port")
|
||
return cls(
|
||
gitea_base_url=base.rstrip("/"),
|
||
client_id=client_id,
|
||
redirect_uri=redirect,
|
||
insecure_tls=os.environ.get("FORGE_INSECURE_TLS", "").strip() == "1",
|
||
expected_username=os.environ.get("FORGE_GITEA_USERNAME", "").strip(),
|
||
)
|
||
|
||
|
||
def build_gitea_logout_url(gitea_base_url: str) -> str:
|
||
"""Return ``<gitea>/user/logout?redirect_to=/user/login``."""
|
||
return (
|
||
f"{gitea_base_url.rstrip('/')}/user/logout"
|
||
f"?{urlencode({'redirect_to': '/user/login'})}"
|
||
)
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# OIDC discovery + token exchange (stdlib HTTP)
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
def _tls_context(insecure: bool) -> ssl.SSLContext:
|
||
ctx = ssl.create_default_context()
|
||
if insecure:
|
||
ctx.check_hostname = False
|
||
ctx.verify_mode = ssl.CERT_NONE
|
||
return ctx
|
||
|
||
|
||
def _http_get_json(url: str, *, insecure_tls: bool, timeout: float = 15.0) -> dict[str, Any]:
|
||
req = urllib.request.Request(url, method="GET", headers={"Accept": "application/json"})
|
||
with urllib.request.urlopen(req, timeout=timeout, context=_tls_context(insecure_tls)) as resp:
|
||
body = resp.read().decode("utf-8")
|
||
try:
|
||
data = json.loads(body)
|
||
except json.JSONDecodeError as exc:
|
||
raise AuthError(f"GET {url} returned non-JSON body: {exc}") from exc
|
||
if not isinstance(data, dict):
|
||
raise AuthError(f"GET {url} returned non-object JSON: {type(data).__name__}")
|
||
return data
|
||
|
||
|
||
def _http_post_form(
|
||
url: str,
|
||
form: dict[str, str],
|
||
*,
|
||
insecure_tls: bool,
|
||
timeout: float = 15.0,
|
||
auth_bearer: str | None = None,
|
||
) -> dict[str, Any]:
|
||
data = urlencode(form).encode("ascii")
|
||
headers = {
|
||
"Content-Type": "application/x-www-form-urlencoded",
|
||
"Accept": "application/json",
|
||
}
|
||
if auth_bearer:
|
||
headers["Authorization"] = f"Bearer {auth_bearer}"
|
||
req = urllib.request.Request(url, data=data, method="POST", headers=headers)
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=timeout, context=_tls_context(insecure_tls)) as resp:
|
||
body = resp.read().decode("utf-8")
|
||
except urllib.error.HTTPError as exc:
|
||
err_body = exc.read().decode("utf-8", errors="replace")
|
||
raise AuthError(
|
||
f"POST {url} failed with HTTP {exc.code}: {err_body.strip()}"
|
||
) from exc
|
||
try:
|
||
payload = json.loads(body)
|
||
except json.JSONDecodeError as exc:
|
||
raise AuthError(f"POST {url} returned non-JSON body: {exc}") from exc
|
||
if not isinstance(payload, dict):
|
||
raise AuthError(f"POST {url} returned non-object JSON: {type(payload).__name__}")
|
||
if "error" in payload:
|
||
raise AuthError(
|
||
f"OAuth error from {url}: {payload.get('error')} "
|
||
f"({payload.get('error_description', '')})"
|
||
)
|
||
return payload
|
||
|
||
|
||
def discover_endpoints(config: ForgeAuthConfig) -> dict[str, str]:
|
||
"""Call Gitea's ``/.well-known/openid-configuration`` and return the
|
||
three endpoints used by the CLI, validated against the issuer.
|
||
"""
|
||
url = f"{config.gitea_base_url}/.well-known/openid-configuration"
|
||
payload = _http_get_json(url, insecure_tls=config.insecure_tls)
|
||
issuer = payload.get("issuer")
|
||
if not isinstance(issuer, str) or issuer.rstrip("/") != config.gitea_base_url.rstrip("/"):
|
||
raise AuthError(
|
||
f"OIDC discovery issuer {issuer!r} does not match "
|
||
f"FORGE_GITEA_URL {config.gitea_base_url!r}"
|
||
)
|
||
out: dict[str, str] = {}
|
||
for key in ("authorization_endpoint", "token_endpoint", "userinfo_endpoint"):
|
||
value = payload.get(key)
|
||
if not isinstance(value, str) or not value.startswith(f"{config.gitea_base_url}/"):
|
||
raise AuthError(f"OIDC discovery missing/invalid {key}: {value!r}")
|
||
out[key] = value
|
||
return out
|
||
|
||
|
||
def build_authorize_url(
|
||
config: ForgeAuthConfig,
|
||
endpoints: dict[str, str],
|
||
*,
|
||
challenge: str,
|
||
state: str,
|
||
force_login_prompt: bool = True,
|
||
) -> str:
|
||
"""PKCE authorise URL with optional ``prompt=login`` and ``login_hint``.
|
||
|
||
``prompt=login`` (OIDC Core §3.1.2.1) is the default: it forces
|
||
Gitea to re-authenticate the user even when a session cookie is
|
||
already present, which is the right ergonomic for the first attempt
|
||
of ``just login``. Setting ``force_login_prompt=False`` drops the
|
||
parameter so the second attempt of an auto-retry (after a grant
|
||
revocation) reuses the established session cookie and only triggers
|
||
the consent screen, halving the browser-side prompts. The post-auth
|
||
``expected_username`` check in ``run_login`` still enforces the
|
||
wrong-user guard on either path. ``login_hint`` is unaffected.
|
||
"""
|
||
params: dict[str, str] = {
|
||
"client_id": config.client_id,
|
||
"redirect_uri": config.redirect_uri,
|
||
"response_type": "code",
|
||
"scope": config.scopes,
|
||
"state": state,
|
||
"code_challenge": challenge,
|
||
"code_challenge_method": "S256",
|
||
}
|
||
if force_login_prompt:
|
||
params["prompt"] = "login"
|
||
if config.expected_username:
|
||
params["login_hint"] = config.expected_username
|
||
return f"{endpoints['authorization_endpoint']}?{urlencode(params)}"
|
||
|
||
|
||
def exchange_code_for_tokens(
|
||
config: ForgeAuthConfig,
|
||
endpoints: dict[str, str],
|
||
*,
|
||
code: str,
|
||
verifier: str,
|
||
) -> dict[str, Any]:
|
||
return _http_post_form(
|
||
endpoints["token_endpoint"],
|
||
{
|
||
"grant_type": "authorization_code",
|
||
"code": code,
|
||
"redirect_uri": config.redirect_uri,
|
||
"client_id": config.client_id,
|
||
"code_verifier": verifier,
|
||
},
|
||
insecure_tls=config.insecure_tls,
|
||
)
|
||
|
||
|
||
def refresh_access_token(
|
||
config: ForgeAuthConfig,
|
||
endpoints: dict[str, str],
|
||
*,
|
||
refresh_token: str,
|
||
) -> dict[str, Any]:
|
||
return _http_post_form(
|
||
endpoints["token_endpoint"],
|
||
{
|
||
"grant_type": "refresh_token",
|
||
"refresh_token": refresh_token,
|
||
"client_id": config.client_id,
|
||
},
|
||
insecure_tls=config.insecure_tls,
|
||
)
|
||
|
||
|
||
def fetch_userinfo(
|
||
config: ForgeAuthConfig,
|
||
endpoints: dict[str, str],
|
||
*,
|
||
access_token: str,
|
||
) -> dict[str, Any]:
|
||
req = urllib.request.Request(
|
||
endpoints["userinfo_endpoint"],
|
||
method="GET",
|
||
headers={
|
||
"Authorization": f"Bearer {access_token}",
|
||
"Accept": "application/json",
|
||
},
|
||
)
|
||
with urllib.request.urlopen(req, timeout=15, context=_tls_context(config.insecure_tls)) as resp:
|
||
body = resp.read().decode("utf-8")
|
||
return json.loads(body)
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Loopback callback server (single-shot)
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
class _CallbackHandler(BaseHTTPRequestHandler):
|
||
def do_GET(self) -> None: # noqa: N802 (BaseHTTPRequestHandler API)
|
||
parsed = urlparse(self.path)
|
||
query = parse_qs(parsed.query)
|
||
code = (query.get("code") or [None])[0]
|
||
state = (query.get("state") or [None])[0]
|
||
error = (query.get("error") or [None])[0]
|
||
error_description = (query.get("error_description") or [None])[0]
|
||
server = self.server # type: ignore[attr-defined]
|
||
if error:
|
||
self.send_response(400)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.end_headers()
|
||
self.wfile.write(
|
||
b"<html><body><h1>Login failed</h1><p>Close this window.</p></body></html>"
|
||
)
|
||
server.result_queue.put(
|
||
_build_authorize_error(
|
||
error,
|
||
error_description,
|
||
server.gitea_base_url,
|
||
client_id=getattr(server, "client_id", ""),
|
||
scopes=getattr(server, "scopes", ""),
|
||
)
|
||
)
|
||
return
|
||
if not code or not state:
|
||
self.send_response(400)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.end_headers()
|
||
self.wfile.write(b"<html><body><h1>Missing code or state</h1></body></html>")
|
||
server.result_queue.put(AuthError("OAuth callback is missing code or state"))
|
||
return
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.end_headers()
|
||
self.wfile.write(
|
||
b"<html><body style='font-family:sans-serif'>"
|
||
b"<h1>Login complete.</h1>"
|
||
b"<p>Close this window. Return to the terminal.</p>"
|
||
b"</body></html>"
|
||
)
|
||
server.result_queue.put((code, state))
|
||
|
||
def log_message(self, format: str, *args: Any) -> None: # noqa: A003
|
||
return # silence default logging
|
||
|
||
|
||
class _LoopbackServer(HTTPServer):
|
||
def __init__(
|
||
self,
|
||
addr: tuple[str, int],
|
||
gitea_base_url: str = "",
|
||
client_id: str = "",
|
||
scopes: str = "",
|
||
) -> None:
|
||
super().__init__(addr, _CallbackHandler)
|
||
self.result_queue: Queue[tuple[str, str] | BaseException] = Queue(maxsize=1)
|
||
# Read by _CallbackHandler when assembling _build_authorize_error.
|
||
self.gitea_base_url = gitea_base_url
|
||
self.client_id = client_id
|
||
self.scopes = scopes
|
||
|
||
|
||
def _build_authorize_error(
|
||
error: str,
|
||
error_description: str | None,
|
||
gitea_base_url: str,
|
||
client_id: str = "",
|
||
scopes: str = "",
|
||
) -> AuthError:
|
||
"""Map an RFC 6749 §4.1.2.1 authorize-error redirect to ``AuthError``.
|
||
|
||
Parameters
|
||
----------
|
||
error : str
|
||
The ``error`` query parameter (``server_error``,
|
||
``access_denied``, ``invalid_request``, ...).
|
||
error_description : str or None
|
||
The ``error_description`` query parameter; human text set by
|
||
Gitea. ``None`` and empty string are treated identically.
|
||
gitea_base_url : str
|
||
Base URL of the Gitea server. Used to build the user-settings
|
||
URL in the remediation message. Empty string yields a
|
||
``<gitea-base-url>`` placeholder.
|
||
client_id : str, optional
|
||
OAuth client id requesting the authorization. Surfaced in the
|
||
"different scope" message to disambiguate the grant row in
|
||
Gitea's settings page.
|
||
scopes : str, optional
|
||
Space-separated scope set requested by this client. Surfaced
|
||
in the "different scope" message.
|
||
|
||
Returns
|
||
-------
|
||
AuthError
|
||
A user-facing error whose body is at most 5 lines (see RULE
|
||
#2 §D.3) and, for the ``different scope`` case, references
|
||
``docs/oauth-grant-scope-mismatch.md``.
|
||
"""
|
||
desc = (error_description or "").strip()
|
||
low = desc.lower()
|
||
base = gitea_base_url.rstrip("/")
|
||
settings_url = (
|
||
f"{base}/user/settings/applications"
|
||
if base
|
||
else "<gitea-base-url>/user/settings/applications"
|
||
)
|
||
|
||
if "different scope" in low or ("scope" in low and "grant" in low):
|
||
cid = (client_id or "<unknown-client-id>").strip()
|
||
scope_fragment = scopes or "<unknown-scopes>"
|
||
return ScopeMismatchAuthError(
|
||
f'Gitea server_error: "{desc or error}".\n'
|
||
f" Client ID: {cid}\n"
|
||
f" Revoke at: {settings_url} "
|
||
f"(\"Authorized OAuth2 Applications\").\n"
|
||
f" Requested: {scope_fragment}\n"
|
||
" See: docs/oauth-grant-scope-mismatch.md",
|
||
gitea_base_url=gitea_base_url,
|
||
client_id=cid,
|
||
scopes=scope_fragment,
|
||
)
|
||
|
||
if error == "access_denied" or "access_denied" in low or "denied" in low:
|
||
return AuthError(
|
||
f"access_denied: {desc or error}. Re-run 'just login' and approve."
|
||
)
|
||
|
||
if desc:
|
||
return AuthError(f"authorize endpoint returned error: {error} ({desc})")
|
||
return AuthError(f"authorize endpoint returned error: {error}")
|
||
|
||
|
||
def wait_for_callback(
|
||
host: str,
|
||
port: int,
|
||
*,
|
||
timeout_seconds: float = 300.0,
|
||
gitea_base_url: str = "",
|
||
client_id: str = "",
|
||
scopes: str = "",
|
||
) -> tuple[str, str]:
|
||
"""Bind a loopback HTTP server, serve one request, return ``(code, state)``.
|
||
|
||
Parameters
|
||
----------
|
||
host, port : str, int
|
||
Loopback address to bind.
|
||
timeout_seconds : float
|
||
Queue ``get`` timeout.
|
||
gitea_base_url, client_id, scopes : str
|
||
Forwarded to ``_LoopbackServer`` for the error path only; unused
|
||
on the success path.
|
||
|
||
Raises
|
||
------
|
||
AuthError
|
||
On bind failure, timeout, or an authorize-error redirect.
|
||
"""
|
||
try:
|
||
server = _LoopbackServer(
|
||
(host, port),
|
||
gitea_base_url=gitea_base_url,
|
||
client_id=client_id,
|
||
scopes=scopes,
|
||
)
|
||
except OSError as exc:
|
||
raise AuthError(
|
||
f"cannot bind loopback server on {host}:{port}: {exc}. "
|
||
f"Is another 'just login' still running? "
|
||
f"If port {port} is held by an unrelated process, override "
|
||
f"FSDGG_CLI_REDIRECT_URI in .env (note: the port must match "
|
||
f"the OAuth app registered in Gitea)."
|
||
) from exc
|
||
thread = threading.Thread(target=server.handle_request, daemon=True)
|
||
thread.start()
|
||
try:
|
||
item = server.result_queue.get(timeout=timeout_seconds)
|
||
except Exception as exc:
|
||
raise AuthError(
|
||
f"timed out after {int(timeout_seconds)}s waiting for OAuth "
|
||
f"callback. Complete browser login and retry."
|
||
) from exc
|
||
finally:
|
||
server.server_close()
|
||
if isinstance(item, BaseException):
|
||
raise item
|
||
return item
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Auth-file read / merge / write
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
GATEWAY_REQUIRED_FIELDS_DEFAULTS: dict[str, Any] = {
|
||
"username": "",
|
||
"access_token": "",
|
||
"expires_in": 0,
|
||
"issued_at": 0.0,
|
||
"public_base_url": "",
|
||
"index_name": "",
|
||
}
|
||
|
||
GATEWAY_OPTIONAL_FIELDS: frozenset[str] = frozenset(
|
||
{"gitea_access_token", "gitea_token_expires_at"}
|
||
)
|
||
|
||
# Welcome-repo-private extension fields. Kept in the same file for
|
||
# single-source-of-truth; the gateway's loader ignores unknown keys so
|
||
# these are forward-compatible.
|
||
FORGE_EXTRA_FIELDS: frozenset[str] = frozenset(
|
||
{
|
||
"_forge_refresh_token",
|
||
"_forge_client_id",
|
||
"_forge_gitea_base_url",
|
||
"_forge_issued_at",
|
||
}
|
||
)
|
||
|
||
|
||
@dataclass
|
||
class AuthFile:
|
||
"""Round-trippable view over the stored JSON. Unknown keys are
|
||
preserved verbatim for forward-compat with the gateway schema.
|
||
"""
|
||
|
||
raw: dict[str, Any] = field(default_factory=dict)
|
||
|
||
@classmethod
|
||
def read(cls, path: Path) -> AuthFile:
|
||
if not path.is_file():
|
||
return cls(raw={})
|
||
try:
|
||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||
except json.JSONDecodeError as exc:
|
||
raise AuthError(
|
||
f"{path} is not valid JSON: {exc}. "
|
||
f"Delete it and re-run 'just login' to rewrite it cleanly."
|
||
) from exc
|
||
if not isinstance(payload, dict):
|
||
raise AuthError(f"{path} does not contain a JSON object")
|
||
return cls(raw=payload)
|
||
|
||
# ---- accessors --------------------------------------------------
|
||
|
||
@property
|
||
def gitea_access_token(self) -> str:
|
||
value = self.raw.get("gitea_access_token") or ""
|
||
return str(value) if value else ""
|
||
|
||
@property
|
||
def gitea_token_expires_at(self) -> float | None:
|
||
value = self.raw.get("gitea_token_expires_at")
|
||
if value is None:
|
||
return None
|
||
try:
|
||
return float(value)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
@property
|
||
def refresh_token(self) -> str:
|
||
return str(self.raw.get("_forge_refresh_token") or "")
|
||
|
||
@property
|
||
def username(self) -> str:
|
||
return str(self.raw.get("username") or "")
|
||
|
||
# ---- predicates -------------------------------------------------
|
||
|
||
def has_live_gitea_token(self, *, leeway_seconds: float = 30.0) -> bool:
|
||
if not self.gitea_access_token:
|
||
return False
|
||
expires_at = self.gitea_token_expires_at
|
||
if expires_at is None:
|
||
return True # unknown expiry: trust the token; API will reject if stale
|
||
return time.time() + leeway_seconds < expires_at
|
||
|
||
# ---- merging ----------------------------------------------------
|
||
|
||
def merge_login(
|
||
self,
|
||
*,
|
||
username: str,
|
||
gitea_access_token: str,
|
||
gitea_token_expires_at: float | None,
|
||
refresh_token: str,
|
||
client_id: str,
|
||
gitea_base_url: str,
|
||
) -> None:
|
||
"""Write Gitea-side fields without clobbering gateway fields.
|
||
|
||
If an existing gateway bearer lives in the file, it is kept
|
||
intact so that a subsequent ``repos-login`` inside the
|
||
orchestrator does not have to re-run the browser flow.
|
||
"""
|
||
for key, default in GATEWAY_REQUIRED_FIELDS_DEFAULTS.items():
|
||
self.raw.setdefault(key, default)
|
||
# Always refresh the "issued_at" of the *Gitea* login side.
|
||
now = time.time()
|
||
self.raw["username"] = username
|
||
self.raw["gitea_access_token"] = gitea_access_token
|
||
if gitea_token_expires_at is not None:
|
||
self.raw["gitea_token_expires_at"] = gitea_token_expires_at
|
||
self.raw["_forge_refresh_token"] = refresh_token
|
||
self.raw["_forge_client_id"] = client_id
|
||
self.raw["_forge_gitea_base_url"] = gitea_base_url
|
||
self.raw["_forge_issued_at"] = now
|
||
|
||
def merge_refresh(
|
||
self,
|
||
*,
|
||
gitea_access_token: str,
|
||
gitea_token_expires_at: float | None,
|
||
refresh_token: str | None,
|
||
) -> None:
|
||
self.raw["gitea_access_token"] = gitea_access_token
|
||
if gitea_token_expires_at is not None:
|
||
self.raw["gitea_token_expires_at"] = gitea_token_expires_at
|
||
if refresh_token:
|
||
self.raw["_forge_refresh_token"] = refresh_token
|
||
|
||
def write(self, path: Path) -> None:
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
tmp = path.with_suffix(path.suffix + ".tmp")
|
||
tmp.write_text(json.dumps(self.raw, indent=2) + "\n", encoding="utf-8")
|
||
os.chmod(tmp, 0o600)
|
||
os.replace(tmp, path)
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Top-level flows (login, refresh, logout)
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
def _extract_expiry(payload: dict[str, Any]) -> float | None:
|
||
raw = payload.get("expires_in")
|
||
if raw is None:
|
||
return None
|
||
try:
|
||
return time.time() + int(raw)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
def _print_headless_guidance(auth_url: str, redirect) -> None:
|
||
"""Print the authorise URL and reachability guidance for headless mode.
|
||
|
||
Uses the actual loopback host/port from the parsed redirect URI (not
|
||
hardcoded) and the current hostname/user so the SSH-forward template
|
||
is copy-pasteable.
|
||
"""
|
||
cb_host = redirect.hostname or "127.0.0.1"
|
||
cb_port = redirect.port
|
||
path = redirect.path or "/callback"
|
||
try:
|
||
hostname = socket.getfqdn() or socket.gethostname() or "<this-host>"
|
||
except OSError:
|
||
hostname = "<this-host>"
|
||
user = os.environ.get("USER") or os.environ.get("LOGNAME") or "<user>"
|
||
in_ssh = bool(os.environ.get("SSH_CONNECTION") or os.environ.get("SSH_TTY"))
|
||
|
||
info_tag = _tag(chr(0x1B) + "[36m", "info")
|
||
|
||
lines = [
|
||
"",
|
||
f"{info_tag} paste this URL into a browser:",
|
||
"",
|
||
f" {auth_url}",
|
||
"",
|
||
f"{info_tag} after consent, Gitea will 302-redirect that browser to the",
|
||
f" local callback on THIS machine: http://{cb_host}:{cb_port}{path}",
|
||
" (loopback HTTP per RFC 8252 §7.3; not a public endpoint).",
|
||
"",
|
||
]
|
||
if in_ssh:
|
||
lines += [
|
||
f"{info_tag} this process is running inside an SSH session. From the",
|
||
" machine where the browser will open, run:",
|
||
"",
|
||
f" ssh -L {cb_port}:127.0.0.1:{cb_port} {user}@{hostname}",
|
||
"",
|
||
" and paste the URL above into THAT machine's browser.",
|
||
"",
|
||
]
|
||
else:
|
||
lines += [
|
||
f"{info_tag} if this machine is remote, from the machine with the",
|
||
" browser run (before pasting the URL):",
|
||
"",
|
||
f" ssh -L {cb_port}:127.0.0.1:{cb_port} {user}@{hostname}",
|
||
"",
|
||
]
|
||
lines += [
|
||
f"{info_tag} reachability probe (run on the browser-side machine).",
|
||
" Any response, including 'Missing code or state',",
|
||
" confirms the listener is reachable:",
|
||
"",
|
||
f" curl -sS -m 2 http://127.0.0.1:{cb_port}/",
|
||
"",
|
||
" 'Connection refused' or a timeout means the SSH forward",
|
||
" above is not active yet.",
|
||
"",
|
||
]
|
||
sys.stderr.write("\n".join(lines) + "\n")
|
||
sys.stderr.flush()
|
||
|
||
|
||
def run_login(
|
||
config: ForgeAuthConfig,
|
||
*,
|
||
open_browser: bool = True,
|
||
force: bool = False,
|
||
print_authorize_url: bool = True,
|
||
force_login_prompt: bool = True,
|
||
) -> AuthFile:
|
||
"""Run the full PKCE flow and persist the result.
|
||
|
||
Idempotent: if the stored file already carries a live Gitea token
|
||
and ``force`` is False, skip everything and return the existing
|
||
state. The caller is the one deciding when to force a refresh.
|
||
``force_login_prompt`` is forwarded to ``build_authorize_url`` and
|
||
is set to False by the auto-retry path after a grant revocation so
|
||
Gitea reuses the existing browser session.
|
||
"""
|
||
store = auth_store_path()
|
||
existing = AuthFile.read(store)
|
||
if not force and existing.has_live_gitea_token():
|
||
return existing
|
||
|
||
endpoints = discover_endpoints(config)
|
||
verifier, challenge = pkce_pair()
|
||
session_key = secrets.token_bytes(32)
|
||
nonce = secrets.token_urlsafe(24)
|
||
state = sign_state(session_key, nonce)
|
||
|
||
auth_url = build_authorize_url(
|
||
config,
|
||
endpoints,
|
||
challenge=challenge,
|
||
state=state,
|
||
force_login_prompt=force_login_prompt,
|
||
)
|
||
redirect = urlparse(config.redirect_uri)
|
||
assert redirect.hostname and redirect.port # validated upstream
|
||
|
||
if print_authorize_url:
|
||
if open_browser:
|
||
cli_info(
|
||
f"open this URL in the browser if automatic launch does not "
|
||
f"automatically:\n {auth_url}"
|
||
)
|
||
else:
|
||
_print_headless_guidance(auth_url, redirect)
|
||
if open_browser:
|
||
try:
|
||
webbrowser.open(auth_url, new=2)
|
||
except webbrowser.Error:
|
||
pass
|
||
|
||
code, returned_state = wait_for_callback(
|
||
redirect.hostname,
|
||
redirect.port,
|
||
gitea_base_url=config.gitea_base_url,
|
||
client_id=config.client_id,
|
||
scopes=config.scopes,
|
||
)
|
||
verify_state(session_key, returned_state)
|
||
|
||
token_payload = exchange_code_for_tokens(
|
||
config, endpoints, code=code, verifier=verifier
|
||
)
|
||
access_token = str(token_payload.get("access_token") or "")
|
||
refresh_token = str(token_payload.get("refresh_token") or "")
|
||
if not access_token:
|
||
raise AuthError(f"token endpoint returned no access_token: {token_payload}")
|
||
if not refresh_token:
|
||
# Gitea currently always returns a refresh_token for PKCE public
|
||
# clients; flag loudly if that ever changes rather than silently
|
||
# degrading the UX.
|
||
raise AuthError("token endpoint returned no refresh_token; cannot auto-renew")
|
||
|
||
userinfo = fetch_userinfo(config, endpoints, access_token=access_token)
|
||
username = str(
|
||
userinfo.get("preferred_username")
|
||
or userinfo.get("login")
|
||
or userinfo.get("name")
|
||
or ""
|
||
)
|
||
if not username:
|
||
raise AuthError(f"userinfo did not expose a username: {userinfo}")
|
||
|
||
# Post-auth identity check: prompt=login is a hint, not a guarantee.
|
||
if config.expected_username and username.lower() != config.expected_username.lower():
|
||
logout_url = build_gitea_logout_url(config.gitea_base_url)
|
||
if open_browser:
|
||
try:
|
||
webbrowser.open(logout_url, new=2)
|
||
except webbrowser.Error:
|
||
pass
|
||
raise AuthError(
|
||
"Gitea authorised the login as "
|
||
f"'{username}', but FORGE_GITEA_USERNAME in .env is "
|
||
f"'{config.expected_username}'. The stored auth file has "
|
||
"NOT been updated.\n"
|
||
" Recovery:\n"
|
||
f" 1. Open the Gitea logout URL: {logout_url}\n"
|
||
f" 2. Sign in as '{config.expected_username}'\n"
|
||
" 3. Re-run 'just login'"
|
||
)
|
||
|
||
existing.merge_login(
|
||
username=username,
|
||
gitea_access_token=access_token,
|
||
gitea_token_expires_at=_extract_expiry(token_payload),
|
||
refresh_token=refresh_token,
|
||
client_id=config.client_id,
|
||
gitea_base_url=config.gitea_base_url,
|
||
)
|
||
existing.write(store)
|
||
return existing
|
||
|
||
|
||
def run_refresh(config: ForgeAuthConfig, *, must_refresh: bool = False) -> AuthFile:
|
||
"""Refresh the stored access token using the stored refresh token.
|
||
|
||
If ``must_refresh`` is False and the access token is still live,
|
||
this is a no-op. If refresh fails, raises ``AuthError`` (the
|
||
credential helper surfaces this to git as "fall through to prompt",
|
||
and the CLI ``just refresh`` surfaces it with an instruction to
|
||
re-run ``just login``).
|
||
"""
|
||
store = auth_store_path()
|
||
existing = AuthFile.read(store)
|
||
if not must_refresh and existing.has_live_gitea_token():
|
||
return existing
|
||
rt = existing.refresh_token
|
||
if not rt:
|
||
raise AuthError(
|
||
"no refresh token stored; run 'just login' to authenticate from scratch"
|
||
)
|
||
endpoints = discover_endpoints(config)
|
||
token_payload = refresh_access_token(config, endpoints, refresh_token=rt)
|
||
new_access = str(token_payload.get("access_token") or "")
|
||
if not new_access:
|
||
raise AuthError(f"refresh returned no access_token: {token_payload}")
|
||
existing.merge_refresh(
|
||
gitea_access_token=new_access,
|
||
gitea_token_expires_at=_extract_expiry(token_payload),
|
||
refresh_token=str(token_payload.get("refresh_token") or ""),
|
||
)
|
||
existing.write(store)
|
||
return existing
|
||
|
||
|
||
def run_logout() -> Path | None:
|
||
"""Remove every codevalet-managed field from the auth file.
|
||
|
||
If the file only ever held welcome-managed fields, delete it entirely. If the
|
||
gateway has already populated its own fields (access_token etc.),
|
||
wipe only the Gitea + welcome-repo keys and leave the rest.
|
||
"""
|
||
store = auth_store_path()
|
||
if not store.is_file():
|
||
return None
|
||
try:
|
||
existing = AuthFile.read(store)
|
||
except AuthError:
|
||
store.unlink()
|
||
return store
|
||
for key in list(existing.raw.keys()):
|
||
if key in GATEWAY_OPTIONAL_FIELDS or key in FORGE_EXTRA_FIELDS:
|
||
existing.raw.pop(key, None)
|
||
# If nothing of value remains, remove the file outright.
|
||
if all(
|
||
not existing.raw.get(k)
|
||
for k in ("access_token", "public_base_url", "index_name")
|
||
):
|
||
store.unlink()
|
||
return store
|
||
existing.write(store)
|
||
return store
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# CLI
|
||
# --------------------------------------------------------------------
|
||
|
||
|
||
def _can_prompt_for_revoke() -> bool:
|
||
"""Return True only when the environment is interactive enough to
|
||
block on ``input()`` and have the operator click "Revoke".
|
||
|
||
Disabling vectors (any one of these returns False):
|
||
|
||
* ``FORGE_AUTO_REVOKE`` set to ``0``/``no``/``false``: explicit
|
||
opt-out for callers that want strict fail-fast behaviour.
|
||
* ``FORGE_SETUP_YES=1``: non-interactive auto-yes mode used by
|
||
``setup.sh`` and CI; never block on ``input()``.
|
||
* stderr or stdin is not a TTY: avoids deadlocks in piped or
|
||
backgrounded executions.
|
||
"""
|
||
val = os.environ.get("FORGE_AUTO_REVOKE", "1").strip().lower()
|
||
if val in {"0", "no", "false"}:
|
||
return False
|
||
if os.environ.get("FORGE_SETUP_YES", "0").strip() == "1":
|
||
return False
|
||
try:
|
||
return sys.stderr.isatty() and sys.stdin.isatty()
|
||
except (AttributeError, ValueError):
|
||
return False
|
||
|
||
|
||
def _prompt_revoke_and_wait(
|
||
exc: ScopeMismatchAuthError, *, open_browser: bool
|
||
) -> bool:
|
||
"""Drive the manual revoke step. Returns True iff the operator
|
||
pressed Enter (i.e., agreed to retry); False on EOF/Ctrl-C.
|
||
|
||
Side effects: prints the revoke URL and ``client_id`` to stderr;
|
||
when ``open_browser`` is True, also calls ``webbrowser.open`` on
|
||
the revoke URL (best-effort; failure is silent).
|
||
"""
|
||
cli_info(
|
||
'opening Gitea\'s "Authorized OAuth2 Applications" page so the '
|
||
"conflicting grant can be revoked."
|
||
)
|
||
cli_info(f" URL: {exc.revoke_url}")
|
||
cli_info(f" Client ID: {exc.client_id}")
|
||
if open_browser:
|
||
try:
|
||
webbrowser.open(exc.revoke_url, new=2)
|
||
except webbrowser.Error:
|
||
pass
|
||
cli_info(
|
||
'after clicking "Revoke" on the row matching the Client ID '
|
||
"above, press Enter here to retry login (or Ctrl-C to abort)."
|
||
)
|
||
try:
|
||
input("")
|
||
except (EOFError, KeyboardInterrupt):
|
||
return False
|
||
return True
|
||
|
||
|
||
def _cmd_login(argv: list[str]) -> int:
|
||
force = "--force" in argv
|
||
no_browser = "--no-browser" in argv
|
||
config = ForgeAuthConfig.from_env()
|
||
try:
|
||
state = run_login(config, open_browser=not no_browser, force=force)
|
||
except ScopeMismatchAuthError as exc:
|
||
cli_err(str(exc))
|
||
if not _can_prompt_for_revoke():
|
||
return 1
|
||
if not _prompt_revoke_and_wait(exc, open_browser=not no_browser):
|
||
return 1
|
||
# Single retry. ``force=True`` bypasses the live-token short-
|
||
# circuit; ``force_login_prompt=False`` reuses the browser
|
||
# session cookie established by the failed first attempt so
|
||
# Gitea only shows the consent screen on the retry.
|
||
state = run_login(
|
||
config,
|
||
open_browser=not no_browser,
|
||
force=True,
|
||
force_login_prompt=False,
|
||
)
|
||
cli_ok(f"authenticated as: {state.username}")
|
||
cli_info(f"auth file: {auth_store_path()}")
|
||
return 0
|
||
|
||
|
||
def _cmd_refresh(argv: list[str]) -> int:
|
||
must = "--force" in argv
|
||
config = ForgeAuthConfig.from_env()
|
||
state = run_refresh(config, must_refresh=must)
|
||
expires_in = (
|
||
int((state.gitea_token_expires_at or 0) - time.time())
|
||
if state.gitea_token_expires_at
|
||
else -1
|
||
)
|
||
if expires_in > 0:
|
||
cli_ok(f"token refreshed, valid for ~{expires_in}s")
|
||
else:
|
||
cli_ok("token refreshed")
|
||
return 0
|
||
|
||
|
||
def _cmd_logout(_: list[str]) -> int:
|
||
path = run_logout()
|
||
if path is None:
|
||
cli_info("no auth file to remove")
|
||
else:
|
||
cli_ok(f"cleared {path}")
|
||
return 0
|
||
|
||
|
||
def _cmd_status(_: list[str]) -> int:
|
||
path = auth_store_path()
|
||
state = AuthFile.read(path)
|
||
alive = state.has_live_gitea_token()
|
||
exp = state.gitea_token_expires_at
|
||
print(f"path: {path}")
|
||
print(f"file exists: {path.is_file()}")
|
||
print(f"has gitea_access_token: {bool(state.gitea_access_token)}")
|
||
print(f"has _forge_refresh_token: {bool(state.refresh_token)}")
|
||
print(f"username: {state.username or '<unset>'}")
|
||
print(f"expires_at: {exp if exp is not None else '<unknown>'}")
|
||
print(f"live: {alive}")
|
||
return 0 if alive else 1
|
||
|
||
|
||
_COMMANDS = {
|
||
"login": _cmd_login,
|
||
"refresh": _cmd_refresh,
|
||
"logout": _cmd_logout,
|
||
"status": _cmd_status,
|
||
}
|
||
|
||
|
||
def main(argv: list[str]) -> int:
|
||
if len(argv) < 2 or argv[1] not in _COMMANDS:
|
||
print(
|
||
"usage: forge_auth.py <login|refresh|logout|status> [--force] [--no-browser]",
|
||
file=sys.stderr,
|
||
)
|
||
return 2
|
||
try:
|
||
return _COMMANDS[argv[1]](argv[2:])
|
||
except AuthError as exc:
|
||
cli_err(str(exc))
|
||
return 1
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main(sys.argv))
|