Initial Commit

This commit is contained in:
FanaticPythoner (Nathan Trudeau)
2026-04-19 17:11:58 -04:00
parent eccb05b97f
commit a591cd21f2
23 changed files with 4896 additions and 1 deletions

68
tests/README.md Normal file
View File

@@ -0,0 +1,68 @@
# Tests
Deterministic and hermetic. Integration tests stand up a local mock
OIDC/OAuth2 server on an ephemeral port; no traffic to a real Gitea.
```bash
just test # full suite
python3 -m unittest discover -t . -s tests -p 'test_*.py' -v
bash tests/test_forge_auth_integration.sh
bash tests/test_setup_args.sh
bash tests/test_doctor.sh
```
## Inventory
- **`test_forge_auth.py`** — `scripts/forge_auth.py` unit tests: PKCE
pair generation, HMAC state signing + CSRF rejection,
`ForgeAuthConfig.from_env` validation (loopback-only redirect,
missing port, missing env vars, `FORGE_GITEA_USERNAME`
propagation), `build_authorize_url` with `prompt=login` and
`login_hint`, `build_gitea_logout_url`, `AuthFile`
read/write/merge/`has_live_gitea_token`, `auth_store_path`
precedence, `run_logout`, `main()` dispatcher.
- **`test_git_credential_forge.py`** — `scripts/git-credential-forge.py`
unit tests: credential protocol I/O, host/scheme/port matching,
live-token fast-path, pass-through for missing store or non-matching
host, expired-token refresh, refresh-failure handling, `store`/`erase`
no-ops, `main()` dispatcher.
- **`test_forge_auth_integration.py`** — end-to-end Python integration
tests against `tests/mock_oidc_server.py`: full PKCE flow,
gateway-required schema on disk, idempotent re-login, refresh token
rotation with server-side revocation, logout preserving
gateway-bearer fields.
- **`test_forge_auth_integration.sh`** — shell end-to-end: drives
`forge_auth.py login` against the mock server, installs the
credential helper into a sandboxed `$HOME`, and exercises
`git credential fill`. Covers URL matching, `github.com`
non-leakage, rotated-token pickup, `just logout` teardown, and the
username-mismatch guard (login fails, auth file untouched, Gitea
logout URL surfaced, authorise URL carries `prompt=login` +
`login_hint`).
- **`test_setup_args.sh`** — `scripts/setup.sh` coverage: argument
parsing, `--help`, `--headless` wiring to `forge_login.sh
--no-browser`, the `--headless + FORGE_SETUP_YES=1` hang guard,
live-token reuse, silent-refresh rescue, `prompt_choice` non-tty
stdout isolation.
- **`test_doctor.sh`** — `scripts/doctor.sh`: miss-path under a
sandboxed PATH, asserts every `[MISS]` line is followed by a `fix:`
line.
- **`mock_oidc_server.py`** — test fixture implementing
`/.well-known/openid-configuration`, `/login/oauth/authorize`,
`/login/oauth/access_token`, `/login/oauth/userinfo`. PKCE
verification on `authorization_code`; rotation + revocation on
`refresh_token`.
## Adding tests
- Python: drop `test_*.py` in this directory, use `unittest`, stdlib
only.
- Shell: executable, deterministic, non-interactive. Use ephemeral
ports via `python3 -c 'import socket; ...'` and sandbox `$HOME`
with `mktemp -d`.

0
tests/__init__.py Executable file
View File

230
tests/mock_oidc_server.py Executable file
View File

@@ -0,0 +1,230 @@
"""Minimal OIDC + OAuth2 PKCE server used by the integration test.
Implements just enough of Gitea's `/.well-known/openid-configuration`,
`/login/oauth/authorize`, `/login/oauth/access_token`, and
`/login/oauth/userinfo` surface for the welcome-repo's `forge_auth.py`
to run an end-to-end login + refresh without touching a real Gitea.
Test fixture only. Binds to loopback, accepts any non-empty
`client_id`, and issues deterministic opaque tokens; it does not
model authentication or authorisation. Not suitable for any purpose
other than driving the welcome-repo client during tests.
"""
from __future__ import annotations
import base64
import hashlib
import json
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any, Callable, cast
from urllib.parse import parse_qs, urlencode, urlparse
class _State:
"""In-memory bookkeeping shared by every handler instance."""
def __init__(self, *, base_url: str, username: str) -> None:
self.base_url = base_url
self.username = username
# code -> {client_id, redirect_uri, code_challenge, used}
self.pending_codes: dict[str, dict[str, Any]] = {}
# refresh_token -> {client_id, revoked}
self.refresh_tokens: dict[str, dict[str, Any]] = {}
self.access_token_expires_in = 3600
self.access_token_counter = 0
self.refresh_token_counter = 0
def issue_access_token(self) -> str:
self.access_token_counter += 1
return f"access-{self.access_token_counter}"
def issue_refresh_token(self, *, client_id: str) -> str:
self.refresh_token_counter += 1
tok = f"refresh-{self.refresh_token_counter}"
self.refresh_tokens[tok] = {"client_id": client_id, "revoked": False}
return tok
def _verify_pkce(challenge: str, verifier: str) -> bool:
expected = (
base64.urlsafe_b64encode(hashlib.sha256(verifier.encode("ascii")).digest())
.rstrip(b"=")
.decode("ascii")
)
return expected == challenge
class _Handler(BaseHTTPRequestHandler):
state: _State
def _send_json(self, code: int, body: dict) -> None:
data = json.dumps(body).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def _read_form(self) -> dict[str, str]:
length = int(self.headers.get("Content-Length", "0") or "0")
raw = self.rfile.read(length).decode("ascii") if length else ""
return {k: v[0] for k, v in parse_qs(raw, keep_blank_values=True).items()}
def do_GET(self) -> None: # noqa: N802
parsed = urlparse(self.path)
path = parsed.path
query = {k: v[0] for k, v in parse_qs(parsed.query).items()}
if path == "/.well-known/openid-configuration":
base = self.state.base_url
self._send_json(200, {
"issuer": base,
"authorization_endpoint": f"{base}/login/oauth/authorize",
"token_endpoint": f"{base}/login/oauth/access_token",
"userinfo_endpoint": f"{base}/login/oauth/userinfo",
})
return
if path == "/login/oauth/authorize":
client_id = query.get("client_id", "")
redirect_uri = query.get("redirect_uri", "")
code_challenge = query.get("code_challenge", "")
state_value = query.get("state", "")
if not (client_id and redirect_uri and code_challenge and state_value):
self._send_json(400, {"error": "invalid_request"})
return
code = f"code-{len(self.state.pending_codes) + 1}"
self.state.pending_codes[code] = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"code_challenge": code_challenge,
"used": False,
}
sep = "&" if "?" in redirect_uri else "?"
location = (
f"{redirect_uri}{sep}"
f"{urlencode({'code': code, 'state': state_value})}"
)
self.send_response(302)
self.send_header("Location", location)
self.end_headers()
return
if path == "/login/oauth/userinfo":
auth = self.headers.get("Authorization", "")
if not auth.startswith("Bearer access-"):
self._send_json(401, {"error": "unauthorized"})
return
self._send_json(200, {
"sub": "1",
"preferred_username": self.state.username,
"name": self.state.username,
"email": f"{self.state.username}@example.test",
})
return
self._send_json(404, {"error": "not_found"})
def do_POST(self) -> None: # noqa: N802
if self.path != "/login/oauth/access_token":
self._send_json(404, {"error": "not_found"})
return
form = self._read_form()
grant = form.get("grant_type", "")
if grant == "authorization_code":
code = form.get("code", "")
verifier = form.get("code_verifier", "")
client_id = form.get("client_id", "")
entry = self.state.pending_codes.get(code)
if not entry or entry["used"]:
self._send_json(400, {"error": "invalid_grant",
"error_description": "code not found or already used"})
return
if entry["client_id"] != client_id:
self._send_json(400, {"error": "invalid_client"})
return
if not _verify_pkce(entry["code_challenge"], verifier):
self._send_json(400, {"error": "invalid_grant",
"error_description": "PKCE verification failed"})
return
entry["used"] = True
access = self.state.issue_access_token()
refresh = self.state.issue_refresh_token(client_id=client_id)
self._send_json(200, {
"access_token": access,
"refresh_token": refresh,
"expires_in": self.state.access_token_expires_in,
"token_type": "Bearer",
"scope": "openid profile email",
})
return
if grant == "refresh_token":
rt = form.get("refresh_token", "")
client_id = form.get("client_id", "")
entry = self.state.refresh_tokens.get(rt)
if not entry or entry["revoked"]:
self._send_json(400, {"error": "invalid_grant",
"error_description": "refresh token invalid or revoked"})
return
if entry["client_id"] != client_id:
self._send_json(400, {"error": "invalid_client"})
return
# Rotate: revoke the old refresh token, issue new pair.
entry["revoked"] = True
access = self.state.issue_access_token()
new_rt = self.state.issue_refresh_token(client_id=client_id)
self._send_json(200, {
"access_token": access,
"refresh_token": new_rt,
"expires_in": self.state.access_token_expires_in,
"token_type": "Bearer",
})
return
self._send_json(400, {"error": "unsupported_grant_type"})
def log_message(self, format: str, *args: Any) -> None: # noqa: A003
return
def make_server(*, username: str = "testuser") -> tuple[ThreadingHTTPServer, _State, str]:
# Bind to ephemeral port, then set base_url so the handler knows its URL.
server = ThreadingHTTPServer(("127.0.0.1", 0), _Handler)
port = server.server_address[1]
base_url = f"http://127.0.0.1:{port}"
state = _State(base_url=base_url, username=username)
# Share the state across all handler instances via the class attr.
cast(type, _Handler).state = state # type: ignore[assignment]
return server, state, base_url
def serve_forever(server: ThreadingHTTPServer) -> threading.Thread:
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return thread
def main() -> int:
import os
username = os.environ.get("MOCK_OIDC_USERNAME", "testuser")
server, state, base_url = make_server(username=username)
serve_forever(server)
sys.stdout.write(f"{base_url}\n")
sys.stdout.flush()
try:
# Block until killed.
while True:
time.sleep(3600)
except KeyboardInterrupt:
server.shutdown()
return 0
if __name__ == "__main__":
sys.exit(main())

68
tests/test_doctor.sh Executable file
View File

@@ -0,0 +1,68 @@
#!/usr/bin/env bash
#
# scripts/doctor.sh contract (miss path):
# 1. Exits non-zero.
# 2. Each miss is followed by a `fix: ...` line.
# 3. Emits a consolidated "Run the following to fix them" block.
# 4. No raw ANSI escapes leak into non-TTY stderr.
set -euo pipefail
here="$(cd "$(dirname "$0")" && pwd -P)"
repo="$(cd "$here/.." && pwd -P)"
out="$(mktemp)"
trap 'rm -f "$out"' EXIT
# Strip everything that could satisfy the checks we want to fail.
# /usr/bin/python3 is 3.10.x on Ubuntu 22.04; that's fine: we want to
# prove the python>=3.11 miss branch renders its fix+alt lines.
if env -i HOME="$HOME" PATH="/usr/bin:/bin" bash "$repo/scripts/doctor.sh" >"$out" 2>&1; then
echo "FAIL: doctor.sh exited 0 despite missing prerequisites"
cat "$out"
exit 1
fi
fail=0
must_contain() {
local needle="$1"
if ! grep -Fq -- "$needle" "$out"; then
echo "FAIL: expected to see: $needle"
fail=1
fi
}
must_contain '[miss] just'
must_contain '[miss] uv'
must_contain '[miss] python>=3.11'
must_contain 'fix: curl --proto "=https" --tlsv1.2 -LsSf https://just.systems/install.sh'
must_contain 'fix: curl -LsSf https://astral.sh/uv/install.sh | sh'
must_contain 'fix: uv python install 3.11'
must_contain 'Run the following to fix them'
# No raw ANSI sequences when stderr is redirected to a file.
if grep -q $'\033\\[' "$out"; then
echo "FAIL: raw ANSI escape leaked into non-TTY stderr"
fail=1
fi
# Each [miss] line is followed by a "fix:" line. `[miss]` is 6 chars
# so %-6s adds no padding; the literal " %-12s" supplies one sep space.
awk '
/^ \[miss\]/ { miss_line = NR; next }
miss_line && NR == miss_line + 1 {
if ($0 !~ /^ fix: /) {
printf "FAIL: [miss] at line %d not followed by fix: (got: %s)\n", miss_line, $0
exit 1
}
miss_line = 0
}
' "$out" || fail=1
if [ "$fail" -ne 0 ]; then
echo "---- doctor.sh output ----"
cat "$out"
exit 1
fi
echo "PASS: doctor.sh prints fix commands for every miss"

729
tests/test_forge_auth.py Executable file
View File

@@ -0,0 +1,729 @@
"""Unit tests for scripts/forge_auth.py.
Pure-logic tests (no network, no browser). The full PKCE flow is
covered by tests/test_forge_auth_integration.sh, which stands up a
local mock Gitea OIDC server and drives the CLI end-to-end.
Run with: python3 -m unittest tests.test_forge_auth
"""
from __future__ import annotations
import base64
import hashlib
import io
import json
import os
import sys
import tempfile
import time
import unittest
from pathlib import Path
from unittest import mock
from urllib.parse import urlparse
HERE = Path(__file__).resolve().parent
ROOT = HERE.parent
sys.path.insert(0, str(ROOT / "scripts"))
import forge_auth as fa # noqa: E402
# --------------------------------------------------------------------
# PKCE primitives
# --------------------------------------------------------------------
class PkcePairTests(unittest.TestCase):
def test_verifier_length_in_range(self) -> None:
v, _ = fa.pkce_pair()
# RFC 7636: 43 <= len(verifier) <= 128
self.assertGreaterEqual(len(v), 43)
self.assertLessEqual(len(v), 128)
def test_challenge_is_correct_s256_encoding(self) -> None:
v, c = fa.pkce_pair()
expected = (
base64.urlsafe_b64encode(hashlib.sha256(v.encode("ascii")).digest())
.rstrip(b"=")
.decode("ascii")
)
self.assertEqual(c, expected)
def test_pairs_are_unique(self) -> None:
pairs = {fa.pkce_pair() for _ in range(10)}
self.assertEqual(len(pairs), 10)
class StateSigningTests(unittest.TestCase):
def setUp(self) -> None:
self.key = b"\x01" * 32
def test_roundtrip(self) -> None:
nonce = "abc.123" # dot in nonce must still round-trip (rpartition)
signed = fa.sign_state(self.key, nonce)
self.assertEqual(fa.verify_state(self.key, signed), nonce)
def test_tampered_mac_rejected(self) -> None:
signed = fa.sign_state(self.key, "n1")
tampered = signed[:-1] + ("0" if signed[-1] != "0" else "1")
with self.assertRaises(fa.AuthError):
fa.verify_state(self.key, tampered)
def test_wrong_key_rejected(self) -> None:
signed = fa.sign_state(self.key, "n1")
with self.assertRaises(fa.AuthError):
fa.verify_state(b"\x02" * 32, signed)
def test_missing_separator_raises(self) -> None:
with self.assertRaises(fa.AuthError):
fa.verify_state(self.key, "nosignaturehere")
# --------------------------------------------------------------------
# ForgeAuthConfig.from_env
# --------------------------------------------------------------------
class ConfigFromEnvTests(unittest.TestCase):
def _env(self, **overrides: str) -> dict[str, str]:
env = {
"FORGE_GITEA_URL": "https://gitea.example.com",
"FSDGG_CLI_CLIENT_ID": "my-client-id",
"FSDGG_CLI_REDIRECT_URI": "http://127.0.0.1:38111/callback",
}
env.update(overrides)
return env
def test_happy_path(self) -> None:
with mock.patch.dict(os.environ, self._env(), clear=True):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com")
self.assertEqual(cfg.client_id, "my-client-id")
self.assertEqual(cfg.redirect_uri, "http://127.0.0.1:38111/callback")
self.assertFalse(cfg.insecure_tls)
def test_trailing_slash_stripped(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FORGE_GITEA_URL="https://gitea.example.com/"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.gitea_base_url, "https://gitea.example.com")
def test_localhost_redirect_allowed(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="http://localhost:45000/cb"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.redirect_uri, "http://localhost:45000/cb")
def test_ipv6_loopback_redirect_allowed(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="http://[::1]:45000/cb"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.redirect_uri, "http://[::1]:45000/cb")
def test_non_loopback_rejected(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="https://example.com/cb"),
clear=True,
):
with self.assertRaises(fa.AuthError) as ctx:
fa.ForgeAuthConfig.from_env()
self.assertIn("RFC 8252", str(ctx.exception))
def test_https_loopback_rejected(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="https://127.0.0.1:38111/cb"),
clear=True,
):
with self.assertRaises(fa.AuthError) as ctx:
fa.ForgeAuthConfig.from_env()
self.assertIn("RFC 8252", str(ctx.exception))
def test_missing_port_rejected(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FSDGG_CLI_REDIRECT_URI="http://127.0.0.1/cb"),
clear=True,
):
with self.assertRaises(fa.AuthError):
fa.ForgeAuthConfig.from_env()
def test_missing_required_env_var_lists_all_missing(self) -> None:
env = self._env()
env["FORGE_GITEA_URL"] = ""
env["FSDGG_CLI_CLIENT_ID"] = ""
with mock.patch.dict(os.environ, env, clear=True):
with self.assertRaises(fa.AuthError) as ctx:
fa.ForgeAuthConfig.from_env()
msg = str(ctx.exception)
self.assertIn("FORGE_GITEA_URL", msg)
self.assertIn("FSDGG_CLI_CLIENT_ID", msg)
def test_expected_username_read_from_env(self) -> None:
with mock.patch.dict(
os.environ,
self._env(FORGE_GITEA_USERNAME="CVJMAllaire"),
clear=True,
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.expected_username, "CVJMAllaire")
def test_expected_username_defaults_to_empty(self) -> None:
with mock.patch.dict(os.environ, self._env(), clear=True):
cfg = fa.ForgeAuthConfig.from_env()
self.assertEqual(cfg.expected_username, "")
def test_insecure_tls_flag(self) -> None:
with mock.patch.dict(
os.environ, self._env(FORGE_INSECURE_TLS="1"), clear=True
):
cfg = fa.ForgeAuthConfig.from_env()
self.assertTrue(cfg.insecure_tls)
# --------------------------------------------------------------------
# build_authorize_url
# --------------------------------------------------------------------
class BuildAuthorizeUrlTests(unittest.TestCase):
def setUp(self) -> None:
self.cfg = fa.ForgeAuthConfig(
gitea_base_url="https://g.example",
client_id="client-1",
redirect_uri="http://127.0.0.1:38111/callback",
)
self.endpoints = {
"authorization_endpoint": "https://g.example/login/oauth/authorize",
"token_endpoint": "https://g.example/login/oauth/access_token",
"userinfo_endpoint": "https://g.example/login/oauth/userinfo",
}
def test_url_contains_all_required_parameters(self) -> None:
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c123", state="s123"
)
self.assertTrue(url.startswith("https://g.example/login/oauth/authorize?"))
# All required PKCE + OAuth2 params
for token in (
"response_type=code",
"client_id=client-1",
"code_challenge=c123",
"code_challenge_method=S256",
"state=s123",
"redirect_uri=http%3A%2F%2F127.0.0.1%3A38111%2Fcallback",
):
self.assertIn(token, url)
def test_url_always_includes_prompt_login(self) -> None:
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c", state="s"
)
# prompt=login forces Gitea to display the login page even when a
# session is already active: prevents silent auth under the
# wrong identity.
self.assertIn("prompt=login", url)
def test_url_adds_login_hint_when_expected_username_set(self) -> None:
cfg = fa.ForgeAuthConfig(
gitea_base_url="https://g.example",
client_id="client-1",
redirect_uri="http://127.0.0.1:38111/callback",
expected_username="CVJMAllaire",
)
url = fa.build_authorize_url(
cfg, self.endpoints, challenge="c", state="s"
)
self.assertIn("login_hint=CVJMAllaire", url)
def test_url_omits_login_hint_when_expected_username_unset(self) -> None:
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c", state="s"
)
self.assertNotIn("login_hint=", url)
class BuildGiteaLogoutUrlTests(unittest.TestCase):
def test_composes_logout_url_with_redirect_to_login(self) -> None:
url = fa.build_gitea_logout_url("https://gitea.example.com")
self.assertEqual(
url,
"https://gitea.example.com/user/logout?redirect_to=%2Fuser%2Flogin",
)
def test_trailing_slash_is_stripped(self) -> None:
url = fa.build_gitea_logout_url("https://gitea.example.com/")
self.assertTrue(url.startswith("https://gitea.example.com/user/logout"))
self.assertNotIn("//user/logout", url)
# --------------------------------------------------------------------
# AuthFile (read / merge / write / has_live_gitea_token)
# --------------------------------------------------------------------
class AuthFileTests(unittest.TestCase):
def test_read_missing_file_returns_empty(self) -> None:
f = fa.AuthFile.read(Path("/nonexistent/path/client-auth.json"))
self.assertEqual(f.raw, {})
self.assertFalse(f.has_live_gitea_token())
def test_read_malformed_json_raises(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "bad.json"
p.write_text("{ not json", encoding="utf-8")
with self.assertRaises(fa.AuthError):
fa.AuthFile.read(p)
def test_read_non_object_raises(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "arr.json"
p.write_text("[1,2,3]", encoding="utf-8")
with self.assertRaises(fa.AuthError):
fa.AuthFile.read(p)
def test_has_live_gitea_token_requires_token(self) -> None:
f = fa.AuthFile(raw={"gitea_access_token": ""})
self.assertFalse(f.has_live_gitea_token())
def test_has_live_gitea_token_expired(self) -> None:
f = fa.AuthFile(raw={
"gitea_access_token": "t",
"gitea_token_expires_at": time.time() - 10,
})
self.assertFalse(f.has_live_gitea_token())
def test_has_live_gitea_token_live(self) -> None:
f = fa.AuthFile(raw={
"gitea_access_token": "t",
"gitea_token_expires_at": time.time() + 3600,
})
self.assertTrue(f.has_live_gitea_token())
def test_has_live_gitea_token_unknown_expiry_is_trusted(self) -> None:
f = fa.AuthFile(raw={"gitea_access_token": "t"})
self.assertTrue(f.has_live_gitea_token())
def test_merge_login_fills_required_gateway_fields(self) -> None:
f = fa.AuthFile()
f.merge_login(
username="alice",
gitea_access_token="AAA",
gitea_token_expires_at=time.time() + 3600,
refresh_token="RRR",
client_id="client-1",
gitea_base_url="https://g.example",
)
for required in (
"username", "access_token", "expires_in",
"issued_at", "public_base_url", "index_name",
):
self.assertIn(required, f.raw)
self.assertEqual(f.raw["username"], "alice")
self.assertEqual(f.raw["gitea_access_token"], "AAA")
self.assertEqual(f.raw["_forge_refresh_token"], "RRR")
self.assertEqual(f.raw["_forge_client_id"], "client-1")
self.assertEqual(f.raw["_forge_gitea_base_url"], "https://g.example")
def test_merge_login_preserves_gateway_bearer(self) -> None:
# Simulates the case where the orchestrator already ran
# `auth login` and populated the gateway bearer. We must not
# overwrite those fields.
f = fa.AuthFile(raw={
"username": "old-alice",
"access_token": "GATEWAY-BEARER",
"expires_in": 7200,
"issued_at": 1000000.0,
"public_base_url": "https://gateway.example",
"index_name": "forge",
})
f.merge_login(
username="alice",
gitea_access_token="NEW-GITEA",
gitea_token_expires_at=time.time() + 3600,
refresh_token="NEW-RT",
client_id="client-1",
gitea_base_url="https://g.example",
)
self.assertEqual(f.raw["access_token"], "GATEWAY-BEARER")
self.assertEqual(f.raw["public_base_url"], "https://gateway.example")
self.assertEqual(f.raw["index_name"], "forge")
# And the Gitea fields got refreshed
self.assertEqual(f.raw["gitea_access_token"], "NEW-GITEA")
self.assertEqual(f.raw["username"], "alice")
def test_merge_refresh_only_touches_gitea_fields(self) -> None:
f = fa.AuthFile(raw={
"username": "alice",
"access_token": "GATEWAY-BEARER",
"public_base_url": "https://gateway.example",
})
f.merge_refresh(
gitea_access_token="FRESH",
gitea_token_expires_at=time.time() + 3600,
refresh_token="ROTATED",
)
self.assertEqual(f.raw["access_token"], "GATEWAY-BEARER")
self.assertEqual(f.raw["gitea_access_token"], "FRESH")
self.assertEqual(f.raw["_forge_refresh_token"], "ROTATED")
def test_merge_refresh_empty_refresh_token_keeps_existing(self) -> None:
f = fa.AuthFile(raw={"_forge_refresh_token": "KEEP"})
f.merge_refresh(
gitea_access_token="NEW", gitea_token_expires_at=None, refresh_token=""
)
self.assertEqual(f.raw["_forge_refresh_token"], "KEEP")
def test_write_is_atomic_and_0600(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "sub" / "client-auth.json"
f = fa.AuthFile(raw={"username": "u", "gitea_access_token": "T"})
f.write(p)
self.assertTrue(p.is_file())
mode = p.stat().st_mode & 0o777
self.assertEqual(mode, 0o600)
# The tmp file must not still exist
self.assertFalse((p.parent / "client-auth.json.tmp").exists())
roundtrip = json.loads(p.read_text(encoding="utf-8"))
self.assertEqual(roundtrip["username"], "u")
def test_write_preserves_unknown_keys(self) -> None:
# Forward-compat: the gateway might add new fields we don't
# know about. Writing must preserve them verbatim.
raw = {"username": "u", "future_field": {"x": 1}, "access_token": "A"}
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "a.json"
fa.AuthFile(raw=dict(raw)).write(p)
roundtrip = json.loads(p.read_text(encoding="utf-8"))
self.assertEqual(roundtrip, raw)
# --------------------------------------------------------------------
# auth_store_path
# --------------------------------------------------------------------
class AuthStorePathTests(unittest.TestCase):
def test_explicit_env_var_wins(self) -> None:
with mock.patch.dict(
os.environ,
{"FSDGG_AUTH_STORE_PATH": "/tmp/somewhere/a.json", "FSDGG_RUNTIME_DIR": "/other"},
clear=True,
):
self.assertEqual(fa.auth_store_path(), Path("/tmp/somewhere/a.json"))
def test_runtime_dir_fallback(self) -> None:
with mock.patch.dict(
os.environ, {"FSDGG_RUNTIME_DIR": "/tmp/rt"}, clear=True
):
self.assertEqual(fa.auth_store_path(), Path("/tmp/rt/client-auth.json"))
def test_default_path(self) -> None:
with mock.patch.dict(os.environ, {}, clear=True):
self.assertEqual(fa.auth_store_path(), fa.DEFAULT_AUTH_FILE)
# --------------------------------------------------------------------
# run_logout
# --------------------------------------------------------------------
class RunLogoutTests(unittest.TestCase):
"""Note: all assertions happen *inside* the TemporaryDirectory
context. Earlier iterations leaked the tempdir past the
assertions, producing false positives on ``assertFalse(is_file)``
because cleanup had already deleted the file.
"""
def test_logout_no_file(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "client-auth.json"
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
result = fa.run_logout()
self.assertIsNone(result)
self.assertFalse(p.is_file())
def test_logout_only_welcome_fields_removes_file(self) -> None:
payload = {
"username": "u", "access_token": "", "expires_in": 0,
"issued_at": 0.0, "public_base_url": "", "index_name": "",
"gitea_access_token": "A", "_forge_refresh_token": "R",
}
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "client-auth.json"
p.write_text(json.dumps(payload), encoding="utf-8")
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
result = fa.run_logout()
self.assertEqual(result, p)
self.assertFalse(p.is_file())
def test_logout_preserves_gateway_bearer(self) -> None:
payload = {
"username": "u",
"access_token": "GATEWAY-BEARER",
"expires_in": 3600,
"issued_at": 1000000.0,
"public_base_url": "https://gateway.example",
"index_name": "forge",
"gitea_access_token": "GITEA-A",
"gitea_token_expires_at": 2000000.0,
"_forge_refresh_token": "R",
"_forge_client_id": "c",
}
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "client-auth.json"
p.write_text(json.dumps(payload), encoding="utf-8")
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
fa.run_logout()
self.assertTrue(p.is_file())
remaining = json.loads(p.read_text(encoding="utf-8"))
self.assertEqual(remaining["access_token"], "GATEWAY-BEARER")
self.assertEqual(remaining["public_base_url"], "https://gateway.example")
self.assertNotIn("gitea_access_token", remaining)
self.assertNotIn("_forge_refresh_token", remaining)
# --------------------------------------------------------------------
# main() dispatcher
# --------------------------------------------------------------------
class MainDispatcherTests(unittest.TestCase):
def test_unknown_command_rc_2(self) -> None:
self.assertEqual(fa.main(["forge_auth.py", "nope"]), 2)
def test_no_args_rc_2(self) -> None:
self.assertEqual(fa.main(["forge_auth.py"]), 2)
def test_status_on_empty_store_rc_1(self) -> None:
with tempfile.TemporaryDirectory() as d:
with mock.patch.dict(
os.environ,
{"FSDGG_AUTH_STORE_PATH": str(Path(d) / "a.json")},
clear=True,
):
rc = fa.main(["forge_auth.py", "status"])
self.assertEqual(rc, 1)
def test_status_on_live_token_rc_0(self) -> None:
with tempfile.TemporaryDirectory() as d:
p = Path(d) / "a.json"
p.write_text(json.dumps({
"username": "u",
"gitea_access_token": "LIVE",
"gitea_token_expires_at": time.time() + 3600,
}), encoding="utf-8")
with mock.patch.dict(
os.environ, {"FSDGG_AUTH_STORE_PATH": str(p)}, clear=True
):
rc = fa.main(["forge_auth.py", "status"])
self.assertEqual(rc, 0)
class HeadlessGuidanceTests(unittest.TestCase):
AUTH_URL = "https://gitea.example/login/oauth/authorize?x=1"
REDIRECT = urlparse("http://127.0.0.1:54321/callback")
def _capture(self, env_extra: dict[str, str] | None = None) -> str:
env = {"USER": "alice"}
if env_extra is not None:
for k in ("SSH_CONNECTION", "SSH_TTY"):
env.pop(k, None)
env.update(env_extra)
buf = io.StringIO()
with mock.patch.dict(os.environ, env, clear=True), \
mock.patch("forge_auth.sys.stderr", buf), \
mock.patch("forge_auth.socket.getfqdn", return_value="box.example"), \
mock.patch("forge_auth.socket.gethostname", return_value="box.example"):
fa._print_headless_guidance(self.AUTH_URL, self.REDIRECT)
return buf.getvalue()
def test_prints_authorize_url(self) -> None:
out = self._capture()
self.assertIn(self.AUTH_URL, out)
def test_prints_rfc_8252_reference(self) -> None:
self.assertIn("RFC 8252", self._capture())
def test_uses_configured_redirect_port(self) -> None:
out = self._capture()
self.assertIn("127.0.0.1:54321", out)
self.assertNotIn("38111", out)
def test_ssh_forward_template_uses_configured_port(self) -> None:
out = self._capture()
self.assertIn("ssh -L 54321:127.0.0.1:54321 alice@box.example", out)
def test_reachability_probe_included(self) -> None:
out = self._capture()
self.assertIn("curl -sS -m 2 http://127.0.0.1:54321/", out)
self.assertIn("Connection refused", out)
def test_ssh_branch_when_SSH_CONNECTION_set(self) -> None:
out = self._capture({"SSH_CONNECTION": "1.2.3.4 22 5.6.7.8 22", "USER": "alice"})
self.assertIn("running inside an SSH session", out)
def test_non_ssh_branch_when_no_ssh_env(self) -> None:
out = self._capture()
self.assertIn("if this machine is remote", out)
self.assertNotIn("running inside an SSH session", out)
class BuildAuthorizeErrorTests(unittest.TestCase):
"""Contract tests for ``_build_authorize_error``."""
BASE = "https://gitea.example.com"
CID = "ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5"
SCOPES = "openid profile email read:user read:repository write:repository"
def _exc_different_scope(self, **overrides: str) -> fa.AuthError:
kw: dict[str, object] = {
"error": "server_error",
"error_description": "a grant exists with different scope",
"gitea_base_url": self.BASE,
"client_id": self.CID,
"scopes": self.SCOPES,
}
kw.update(overrides)
return fa._build_authorize_error(
str(kw["error"]),
str(kw["error_description"]) if kw["error_description"] else None,
str(kw["gitea_base_url"]),
client_id=str(kw["client_id"]),
scopes=str(kw["scopes"]),
)
def test_different_scope_returns_autherror(self) -> None:
self.assertIsInstance(self._exc_different_scope(), fa.AuthError)
def test_different_scope_message_is_five_lines_max(self) -> None:
# RULE #2 §D.3: user-facing error bodies cap at 5 lines.
s = str(self._exc_different_scope())
self.assertLessEqual(len(s.splitlines()), 5)
def test_different_scope_surfaces_gitea_phrasing(self) -> None:
self.assertIn("different scope", str(self._exc_different_scope()).lower())
def test_different_scope_cites_settings_url(self) -> None:
self.assertIn(
f"{self.BASE}/user/settings/applications",
str(self._exc_different_scope()),
)
def test_different_scope_disambiguates_authorized_section(self) -> None:
# Gitea's settings page has both "Authorized OAuth2 Applications"
# (user grants) and "Manage OAuth2 Applications" (app registrations);
# the message must point at the first.
self.assertIn(
'"Authorized OAuth2 Applications"',
str(self._exc_different_scope()),
)
def test_different_scope_includes_full_client_id(self) -> None:
self.assertIn(self.CID, str(self._exc_different_scope()))
def test_different_scope_includes_requested_scopes(self) -> None:
self.assertIn(self.SCOPES, str(self._exc_different_scope()))
def test_different_scope_links_to_remediation_doc(self) -> None:
# External rationale lives in the doc per RULE #2 §D/E.
self.assertIn(
"docs/oauth-grant-scope-mismatch.md",
str(self._exc_different_scope()),
)
def test_different_scope_without_base_url_uses_placeholder(self) -> None:
self.assertIn(
"<your-gitea-url>/user/settings/applications",
str(self._exc_different_scope(gitea_base_url="")),
)
def test_different_scope_without_client_id_uses_placeholder(self) -> None:
self.assertIn(
"<unknown-client-id>",
str(self._exc_different_scope(client_id="")),
)
def test_different_scope_without_scopes_uses_placeholder(self) -> None:
self.assertIn(
"<unknown-scopes>",
str(self._exc_different_scope(scopes="")),
)
def test_access_denied_branch(self) -> None:
exc = fa._build_authorize_error(
"access_denied",
"The resource owner or authorization server denied the request.",
self.BASE,
)
self.assertIn("access_denied", str(exc))
self.assertIn("just login", str(exc))
def test_fallback_preserves_raw_error_and_description(self) -> None:
exc = fa._build_authorize_error(
"invalid_request", "redirect_uri mismatch", self.BASE,
)
s = str(exc)
self.assertIn("invalid_request", s)
self.assertIn("redirect_uri mismatch", s)
def test_fallback_with_no_description_only_includes_error_code(self) -> None:
exc = fa._build_authorize_error("temporarily_unavailable", None, self.BASE)
self.assertIn("temporarily_unavailable", str(exc))
class AuthorizeUrlIsHeadlessInvariantTests(unittest.TestCase):
"""Pin the invariant: authorize URL is independent of ``--no-browser``."""
def setUp(self) -> None:
self.cfg = fa.ForgeAuthConfig(
gitea_base_url="https://gitea.example.com",
client_id="ba4ec9ec-8ae8-4450-9cec-fd532bbe63d5",
redirect_uri="http://127.0.0.1:38111/callback",
expected_username="alice",
)
self.endpoints = {
"authorization_endpoint": "https://gitea.example.com/login/oauth/authorize",
"token_endpoint": "https://gitea.example.com/login/oauth/access_token",
"userinfo_endpoint": "https://gitea.example.com/login/oauth/userinfo",
}
def test_build_authorize_url_signature_has_no_headless_parameter(self) -> None:
import inspect
sig = inspect.signature(fa.build_authorize_url)
self.assertNotIn("open_browser", sig.parameters)
self.assertNotIn("headless", sig.parameters)
self.assertNotIn("no_browser", sig.parameters)
def test_url_is_deterministic_for_fixed_challenge_and_state(self) -> None:
url_a = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="ch_1", state="st_1"
)
url_b = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="ch_1", state="st_1"
)
self.assertEqual(url_a, url_b)
def test_scope_parameter_matches_config_scopes_exactly(self) -> None:
from urllib.parse import parse_qs, urlparse
url = fa.build_authorize_url(
self.cfg, self.endpoints, challenge="c", state="s"
)
params = parse_qs(urlparse(url).query)
self.assertEqual(params["scope"], [self.cfg.scopes])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,254 @@
"""End-to-end integration test for forge_auth.py against a mock OIDC server.
Covers the full PKCE login flow (authorize → callback → token
exchange → userinfo → persist), transparent refresh, logout, and
the idempotent "already authenticated" short-circuit.
No real network calls. No browser required: we simulate the
browser by doing an HTTP GET to the authorize endpoint; the mock
server 302-redirects to the loopback callback, which
`forge_auth.run_login` is already listening on.
Run with: python3 -m unittest tests.test_forge_auth_integration
"""
from __future__ import annotations
import json
import os
import socket
import sys
import tempfile
import threading
import time
import unittest
import urllib.request
from http.client import HTTPResponse
from pathlib import Path
from unittest import mock
HERE = Path(__file__).resolve().parent
ROOT = HERE.parent
sys.path.insert(0, str(ROOT / "scripts"))
sys.path.insert(0, str(HERE))
import forge_auth as fa # noqa: E402
import mock_oidc_server # noqa: E402
def _free_loopback_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
class _MockBrowser:
"""Drive the authorize endpoint on a worker thread.
We wait a fraction of a second for `run_login` to bind its
loopback callback server, then GET the authorize URL. The mock
server redirects us to the callback; following the redirect
causes `run_login`'s callback handler to fire, and the auth flow
completes.
urllib's default opener follows redirects automatically, which is
exactly what we want here: one GET, one automatic redirect, done.
"""
def __init__(self, authorize_url: str, delay_seconds: float = 0.2) -> None:
self.authorize_url = authorize_url
self.delay_seconds = delay_seconds
self.exc: BaseException | None = None
self._thread: threading.Thread | None = None
def start(self) -> None:
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def join(self, timeout: float = 10.0) -> None:
if self._thread is not None:
self._thread.join(timeout)
def _run(self) -> None:
try:
time.sleep(self.delay_seconds)
with urllib.request.urlopen(self.authorize_url, timeout=5) as resp:
resp.read()
except BaseException as exc: # noqa: BLE001
self.exc = exc
class ForgeAuthIntegrationTests(unittest.TestCase):
def setUp(self) -> None:
# Tempdir for auth store.
self._tmp_ctx = tempfile.TemporaryDirectory()
self.tmp = Path(self._tmp_ctx.name)
# Mock Gitea on an ephemeral port.
self.server, self.server_state, self.base_url = mock_oidc_server.make_server(
username="integration-user"
)
mock_oidc_server.serve_forever(self.server)
# Loopback callback port (separate from the mock server port).
self.loopback_port = _free_loopback_port()
self.redirect_uri = f"http://127.0.0.1:{self.loopback_port}/callback"
self.env = {
"FORGE_GITEA_URL": self.base_url,
"FSDGG_CLI_CLIENT_ID": "integration-client",
"FSDGG_CLI_REDIRECT_URI": self.redirect_uri,
"FSDGG_AUTH_STORE_PATH": str(self.tmp / "client-auth.json"),
"HOME": str(self.tmp),
}
def tearDown(self) -> None:
self.server.shutdown()
self.server.server_close()
self._tmp_ctx.cleanup()
# -----------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------
def _login(self) -> fa.AuthFile:
"""Run run_login() with an auto-browser that does the GET for us."""
with mock.patch.dict(os.environ, self.env, clear=True):
config = fa.ForgeAuthConfig.from_env()
# We need to start the mock "browser" AFTER run_login
# prints the authorize URL but BEFORE it blocks on the
# loopback server. Since run_login prints then blocks
# synchronously, we can intercept webbrowser.open to
# kick off the GET at exactly the right moment.
browser_holder: dict[str, _MockBrowser] = {}
def fake_webbrowser_open(url: str, new: int = 0) -> bool:
browser = _MockBrowser(url)
browser.start()
browser_holder["b"] = browser
return True
with mock.patch("forge_auth.webbrowser.open", side_effect=fake_webbrowser_open):
state = fa.run_login(config, open_browser=True, force=False,
print_authorize_url=False)
if "b" in browser_holder:
browser_holder["b"].join(timeout=5)
if browser_holder["b"].exc is not None:
raise browser_holder["b"].exc
return state
# -----------------------------------------------------------------
# Tests
# -----------------------------------------------------------------
def test_login_persists_full_auth_file(self) -> None:
state = self._login()
self.assertEqual(state.username, "integration-user")
self.assertTrue(state.gitea_access_token.startswith("access-"))
self.assertTrue(state.refresh_token.startswith("refresh-"))
self.assertTrue(state.has_live_gitea_token())
# The file must be mode 0600.
store = Path(self.env["FSDGG_AUTH_STORE_PATH"])
self.assertTrue(store.is_file())
self.assertEqual(store.stat().st_mode & 0o777, 0o600)
# All gateway-required fields are populated (with defaults).
payload = json.loads(store.read_text(encoding="utf-8"))
for key in (
"username", "access_token", "expires_in",
"issued_at", "public_base_url", "index_name",
):
self.assertIn(key, payload)
# Welcome-managed fields are there too.
self.assertEqual(payload["_forge_client_id"], "integration-client")
self.assertEqual(payload["_forge_gitea_base_url"], self.base_url)
def test_login_is_idempotent_when_token_live(self) -> None:
first = self._login()
first_access = first.gitea_access_token
first_counter = self.server_state.access_token_counter
# Second call with force=False must NOT talk to the mock server.
with mock.patch.dict(os.environ, self.env, clear=True):
config = fa.ForgeAuthConfig.from_env()
with mock.patch("forge_auth.webbrowser.open") as wb:
second = fa.run_login(config, open_browser=True, force=False,
print_authorize_url=False)
wb.assert_not_called()
self.assertEqual(second.gitea_access_token, first_access)
self.assertEqual(self.server_state.access_token_counter, first_counter)
def test_refresh_rotates_tokens(self) -> None:
state = self._login()
original_access = state.gitea_access_token
original_refresh = state.refresh_token
# Force the refresh path.
with mock.patch.dict(os.environ, self.env, clear=True):
config = fa.ForgeAuthConfig.from_env()
refreshed = fa.run_refresh(config, must_refresh=True)
self.assertNotEqual(refreshed.gitea_access_token, original_access)
self.assertNotEqual(refreshed.refresh_token, original_refresh)
self.assertTrue(refreshed.has_live_gitea_token())
# Old refresh token is now revoked on the server; using it must fail.
with mock.patch.dict(os.environ, self.env, clear=True):
config = fa.ForgeAuthConfig.from_env()
endpoints = fa.discover_endpoints(config)
with self.assertRaises(fa.AuthError):
fa.refresh_access_token(
config, endpoints, refresh_token=original_refresh
)
def test_logout_after_login_removes_fields(self) -> None:
self._login()
store = Path(self.env["FSDGG_AUTH_STORE_PATH"])
self.assertTrue(store.is_file())
with mock.patch.dict(os.environ, self.env, clear=True):
fa.run_logout()
# File should be gone (gateway never wrote its bearer in this test).
self.assertFalse(store.is_file())
def test_logout_preserves_gateway_bearer(self) -> None:
self._login()
store = Path(self.env["FSDGG_AUTH_STORE_PATH"])
# Simulate the gateway subsequently writing its own bearer.
payload = json.loads(store.read_text(encoding="utf-8"))
payload.update({
"access_token": "GATEWAY-BEARER",
"expires_in": 7200,
"public_base_url": "https://gateway.example",
"index_name": "forge",
})
store.write_text(json.dumps(payload), encoding="utf-8")
with mock.patch.dict(os.environ, self.env, clear=True):
fa.run_logout()
self.assertTrue(store.is_file())
remaining = json.loads(store.read_text(encoding="utf-8"))
self.assertEqual(remaining["access_token"], "GATEWAY-BEARER")
self.assertEqual(remaining["public_base_url"], "https://gateway.example")
self.assertNotIn("gitea_access_token", remaining)
self.assertNotIn("_forge_refresh_token", remaining)
def test_callback_state_csrf_mismatch_raises(self) -> None:
"""A tampered state on the callback must raise.
We cannot easily tamper with the real PKCE flow end-to-end,
so we exercise verify_state directly: the `run_login` path
wires it straight through.
"""
key = b"\x01" * 32
signed = fa.sign_state(key, "nonce")
with self.assertRaises(fa.AuthError):
fa.verify_state(b"\x02" * 32, signed)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,237 @@
#!/usr/bin/env bash
#
# End-to-end OAuth2 flow against tests/mock_oidc_server.py. Exercises
# forge_login.sh, forge_auth.py, the credential helper, and the
# username-mismatch guard in a sandboxed $HOME. Never touches a real
# Gitea.
set -euo pipefail
here="$(cd "$(dirname "$0")" && pwd -P)"
root="$here/.."
# Disable every interactive credential fallback so git fails loudly
# rather than prompting (VSCode, X11/Wayland askpass, terminal).
export GIT_TERMINAL_PROMPT=0
export GCM_INTERACTIVE=Never
unset GIT_ASKPASS SSH_ASKPASS \
VSCODE_GIT_ASKPASS_MAIN VSCODE_GIT_ASKPASS_NODE \
VSCODE_GIT_ASKPASS_EXTRA_ARGS VSCODE_GIT_IPC_HANDLE \
DISPLAY WAYLAND_DISPLAY
tmp="$(mktemp -d)"
cleanup() {
[ -n "${mock_pid:-}" ] && kill "$mock_pid" 2>/dev/null || true
[ -n "${browser_pid:-}" ] && kill "$browser_pid" 2>/dev/null || true
[ -n "${login_pid:-}" ] && kill "$login_pid" 2>/dev/null || true
rm -rf "$tmp"
}
trap cleanup EXIT INT TERM
export HOME="$tmp/home"
mkdir -p "$HOME/.local/bin" "$HOME/.config"
export GIT_CONFIG_GLOBAL="$HOME/.gitconfig"
export GIT_CONFIG_SYSTEM=/dev/null
touch "$GIT_CONFIG_GLOBAL"
export FSDGG_AUTH_STORE_PATH="$HOME/.forge-stack-devpi-gateway-gitea/client-auth.json"
pass=0
fail=0
assert() {
local msg="$1"; shift
if "$@"; then
printf '[ok] %s\n' "$msg"
pass=$((pass + 1))
else
printf '[FAIL] %s\n' "$msg"
fail=$((fail + 1))
fi
}
# --- Start the mock Gitea ------------------------------------------
MOCK_OIDC_USERNAME=integration-user \
python3 -u "$here/mock_oidc_server.py" >"$tmp/mock.url" 2>"$tmp/mock.log" &
mock_pid=$!
# Wait for the server to announce its URL.
for _ in $(seq 1 40); do
mock_url="$(head -1 "$tmp/mock.url" 2>/dev/null || true)"
[ -n "$mock_url" ] && break
sleep 0.1
done
[ -n "$mock_url" ] || { cat "$tmp/mock.log"; echo 'mock server never came up' >&2; exit 1; }
printf '[info] mock Gitea: %s\n' "$mock_url"
# --- Pick an unused loopback port for the callback -----------------
loopback_port="$(python3 -c '
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
print(s.getsockname()[1])
')"
redirect_uri="http://127.0.0.1:${loopback_port}/callback"
# --- Synthesize .env inside the welcome repo root ------------------
env_file="$tmp/env"
cat >"$env_file" <<EOF
FORGE_GITEA_URL=$mock_url
FORGE_GITEA_USERNAME=integration-user
FSDGG_CLI_CLIENT_ID=integration-client
FSDGG_CLI_REDIRECT_URI=$redirect_uri
FORGE_INSECURE_TLS=0
EOF
# Export mock env; scripts pick these up via load_env.
set -a
# shellcheck disable=SC1090
. "$env_file"
set +a
# Fake browser: once the loopback listener is up, GET the authorise
# URL printed to the login log; the mock 302-redirects to the callback.
(
# Wait for the loopback listener to come up, then drive it.
for _ in $(seq 1 50); do
if ss -tln 2>/dev/null | grep -q ":${loopback_port} " \
|| netstat -tln 2>/dev/null | grep -q ":${loopback_port} "; then
break
fi
sleep 0.1
done
url="$(cat "$tmp/authorize.url" 2>/dev/null || true)"
if [ -n "$url" ]; then
curl -fsSL --max-time 10 "$url" >/dev/null 2>"$tmp/browser.log" || true
fi
) &
browser_pid=$!
python3 "$root/scripts/forge_auth.py" login --no-browser 2> >(tee "$tmp/login.log" >&2) \
< <(printf '') &
login_pid=$!
for _ in $(seq 1 80); do
if grep -qE 'https?://.*authorize\?' "$tmp/login.log" 2>/dev/null; then
grep -oE 'https?://[^ ]+authorize\?[^ ]+' "$tmp/login.log" | head -1 >"$tmp/authorize.url"
break
fi
sleep 0.1
done
wait "$login_pid" || { echo 'forge_auth login failed'; cat "$tmp/login.log"; exit 1; }
wait "$browser_pid" 2>/dev/null || true
assert 'client-auth.json was written' test -f "$FSDGG_AUTH_STORE_PATH"
assert 'client-auth.json is mode 0600' \
bash -c '[ "$(stat -c %a "$FSDGG_AUTH_STORE_PATH" 2>/dev/null || stat -f %A "$FSDGG_AUTH_STORE_PATH")" = "600" ]'
user="$(python3 -c 'import json,sys; print(json.load(open(sys.argv[1]))["username"])' "$FSDGG_AUTH_STORE_PATH")"
assert 'username captured from userinfo' test "$user" = 'integration-user'
token="$(python3 -c 'import json,sys; print(json.load(open(sys.argv[1]))["gitea_access_token"])' "$FSDGG_AUTH_STORE_PATH")"
assert 'gitea_access_token captured' bash -c "[ -n \"$token\" ] && [[ '$token' == access-* ]]"
refresh="$(python3 -c 'import json,sys; print(json.load(open(sys.argv[1]))["_forge_refresh_token"])' "$FSDGG_AUTH_STORE_PATH")"
assert 'refresh token captured' bash -c "[ -n \"$refresh\" ] && [[ '$refresh' == refresh-* ]]"
# --- Install the git credential helper -----------------------------
bash "$root/scripts/install-git-credential-helper.sh" >"$tmp/inst.log" 2>&1 \
|| { cat "$tmp/inst.log"; echo 'helper install failed' >&2; exit 1; }
mock_host="$(python3 -c 'import sys,urllib.parse as u; p=u.urlsplit(sys.argv[1]); print(f"{p.hostname}:{p.port}")' "$mock_url")"
# --- Drive `git credential fill` -----------------------------------
credfill_out="$(
printf 'protocol=http\nhost=%s\npath=someorg/somerepo.git\n\n' "$mock_host" \
| PATH="$HOME/.local/bin:$PATH" git credential fill 2>"$tmp/credfill.err"
)" || { echo 'git credential fill failed'; cat "$tmp/credfill.err"; exit 1; }
got_pass="$(printf '%s\n' "$credfill_out" | awk -F= '/^password=/ {print $2}')"
got_user="$(printf '%s\n' "$credfill_out" | awk -F= '/^username=/ {print $2}')"
assert 'git credential fill returns username from OAuth' test "$got_user" = 'integration-user'
assert 'git credential fill returns the OAuth access token as password' test "$got_pass" = "$token"
# --- Unrelated-host probe: must NOT leak credentials ---------------
other_out="$(
printf 'protocol=https\nhost=github.com\npath=x/y.git\n\n' \
| "$HOME/.local/bin/git-credential-forge" get
)"
assert 'unrelated host gets no username' bash -c "! printf '%s\n' \"$other_out\" | grep -q '^username='"
assert 'unrelated host gets no password' bash -c "! printf '%s\n' \"$other_out\" | grep -q '^password='"
# --- Refresh path: force a refresh and re-run credential fill ------
python3 "$root/scripts/forge_auth.py" refresh --force >"$tmp/refresh.log" 2>&1 \
|| { cat "$tmp/refresh.log"; echo 'refresh failed' >&2; exit 1; }
new_token="$(python3 -c 'import json,sys; print(json.load(open(sys.argv[1]))["gitea_access_token"])' "$FSDGG_AUTH_STORE_PATH")"
assert 'refresh rotates the access token' bash -c "[ \"$new_token\" != \"$token\" ]"
credfill_out2="$(
printf 'protocol=http\nhost=%s\n\n' "$mock_host" \
| PATH="$HOME/.local/bin:$PATH" git credential fill 2>>"$tmp/credfill.err"
)"
got_pass2="$(printf '%s\n' "$credfill_out2" | awk -F= '/^password=/ {print $2}')"
assert 'git credential fill picks up the rotated token' test "$got_pass2" = "$new_token"
# --- Logout removes the file ---------------------------------------
python3 "$root/scripts/forge_auth.py" logout >"$tmp/logout.log" 2>&1
assert 'logout removes client-auth.json' bash -c "[ ! -f \"$FSDGG_AUTH_STORE_PATH\" ]"
# --- Username-mismatch guard ---------------------------------------
loopback_port2="$(python3 -c '
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
print(s.getsockname()[1])
')"
export FSDGG_CLI_REDIRECT_URI="http://127.0.0.1:${loopback_port2}/callback"
export FORGE_GITEA_USERNAME="not-the-right-user"
rm -f "$tmp/authorize.url"
(
for _ in $(seq 1 50); do
if ss -tln 2>/dev/null | grep -q ":${loopback_port2} " \
|| netstat -tln 2>/dev/null | grep -q ":${loopback_port2} "; then
break
fi
sleep 0.1
done
url="$(cat "$tmp/authorize.url" 2>/dev/null || true)"
if [ -n "$url" ]; then
curl -fsSL --max-time 10 "$url" >/dev/null 2>"$tmp/browser2.log" || true
fi
) &
browser_pid=$!
set +e
python3 "$root/scripts/forge_auth.py" login --no-browser \
2> >(tee "$tmp/mismatch.log" >&2) &
mismatch_pid=$!
for _ in $(seq 1 80); do
if grep -qE 'https?://.*authorize\?' "$tmp/mismatch.log" 2>/dev/null; then
grep -oE 'https?://[^ ]+authorize\?[^ ]+' "$tmp/mismatch.log" | head -1 >"$tmp/authorize.url"
break
fi
sleep 0.1
done
wait "$mismatch_pid"
mismatch_rc=$?
wait "$browser_pid" 2>/dev/null || true
set -e
assert 'login fails when username mismatches FORGE_GITEA_USERNAME' bash -c "[ $mismatch_rc -ne 0 ]"
assert 'mismatch error references the actual Gitea username' \
grep -q 'integration-user' "$tmp/mismatch.log"
assert 'mismatch error references the expected username' \
grep -q 'not-the-right-user' "$tmp/mismatch.log"
assert 'mismatch error points at Gitea logout URL' \
grep -qE '/user/logout\?redirect_to=%2Fuser%2Flogin' "$tmp/mismatch.log"
assert 'mismatch does NOT create client-auth.json' \
bash -c "[ ! -f \"$FSDGG_AUTH_STORE_PATH\" ]"
auth_url="$(cat "$tmp/authorize.url" 2>/dev/null || true)"
assert 'authorize URL includes prompt=login' \
bash -c "printf '%s\n' \"$auth_url\" | grep -q 'prompt=login'"
assert 'authorize URL carries login_hint' \
bash -c "printf '%s\n' \"$auth_url\" | grep -q 'login_hint=not-the-right-user'"
printf '\n%d pass / %d fail\n' "$pass" "$fail"
[ "$fail" -eq 0 ]

View File

@@ -0,0 +1,298 @@
"""Unit tests for scripts/git-credential-forge.py.
Run with: python3 -m unittest tests.test_git_credential_forge
"""
from __future__ import annotations
import importlib.util
import io
import json
import os
import sys
import tempfile
import time
import unittest
from pathlib import Path
from unittest import mock
HERE = Path(__file__).resolve().parent
ROOT = HERE.parent
sys.path.insert(0, str(ROOT / "scripts"))
# Load the helper as a module even though its filename has hyphens.
_helper_path = ROOT / "scripts" / "git-credential-forge.py"
_spec = importlib.util.spec_from_file_location("gcf", _helper_path)
assert _spec and _spec.loader
gcf = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(gcf)
import forge_auth as fa # noqa: E402 (imported after sys.path is extended)
def _write_store(tmp: Path, payload: dict) -> Path:
p = tmp / "client-auth.json"
p.write_text(json.dumps(payload), encoding="utf-8")
return p
# --------------------------------------------------------------------
# read_git_fields
# --------------------------------------------------------------------
class ReadGitFieldsTests(unittest.TestCase):
def test_full_block(self) -> None:
buf = io.StringIO("protocol=https\nhost=g.example:6006\npath=a/b.git\n\n")
self.assertEqual(
gcf.read_git_fields(buf),
{"protocol": "https", "host": "g.example:6006", "path": "a/b.git"},
)
def test_eof_without_blank_line(self) -> None:
self.assertEqual(
gcf.read_git_fields(io.StringIO("protocol=https\nhost=x\n")),
{"protocol": "https", "host": "x"},
)
def test_malformed_line_raises(self) -> None:
with self.assertRaises(ValueError):
gcf.read_git_fields(io.StringIO("notakeyvalue\n"))
# --------------------------------------------------------------------
# host matching
# --------------------------------------------------------------------
class RequestMatchesTests(unittest.TestCase):
def test_exact_match_including_port(self) -> None:
self.assertTrue(
gcf._request_matches(
{"protocol": "https", "host": "g.example:6006"},
("https", "g.example", 6006),
)
)
def test_wrong_scheme_no_match(self) -> None:
self.assertFalse(
gcf._request_matches(
{"protocol": "http", "host": "g.example:6006"},
("https", "g.example", 6006),
)
)
def test_wrong_host_no_match(self) -> None:
self.assertFalse(
gcf._request_matches(
{"protocol": "https", "host": "other.example:6006"},
("https", "g.example", 6006),
)
)
def test_wrong_port_no_match(self) -> None:
self.assertFalse(
gcf._request_matches(
{"protocol": "https", "host": "g.example:7000"},
("https", "g.example", 6006),
)
)
def test_stored_default_https_request_no_port(self) -> None:
# Stored URL had no explicit port → default 443 inferred.
# Request without port is OK.
self.assertTrue(
gcf._request_matches(
{"protocol": "https", "host": "g.example"},
("https", "g.example", None),
)
)
def test_stored_default_https_request_with_443(self) -> None:
self.assertTrue(
gcf._request_matches(
{"protocol": "https", "host": "g.example:443"},
("https", "g.example", None),
)
)
def test_stored_default_https_request_with_other_port_no_match(self) -> None:
self.assertFalse(
gcf._request_matches(
{"protocol": "https", "host": "g.example:6006"},
("https", "g.example", None),
)
)
# --------------------------------------------------------------------
# cmd_get (end-to-end, inside a sandboxed FSDGG_AUTH_STORE_PATH)
# --------------------------------------------------------------------
class CmdGetTests(unittest.TestCase):
def _run(
self,
*,
store_payload: dict | None,
stdin_text: str,
env_extra: dict[str, str] | None = None,
) -> tuple[int, str, str]:
with tempfile.TemporaryDirectory() as d:
tmp = Path(d)
env = {
"FSDGG_AUTH_STORE_PATH": str(tmp / "client-auth.json"),
"FORGE_GITEA_URL": "https://g.example:6006",
"FSDGG_CLI_CLIENT_ID": "client-1",
"FSDGG_CLI_REDIRECT_URI": "http://127.0.0.1:38111/callback",
"HOME": str(tmp),
}
if env_extra:
env.update(env_extra)
if store_payload is not None:
_write_store(tmp, store_payload)
fields = gcf.read_git_fields(io.StringIO(stdin_text))
buf_out, buf_err = io.StringIO(), io.StringIO()
real_out, real_err = sys.stdout, sys.stderr
sys.stdout, sys.stderr = buf_out, buf_err
try:
with mock.patch.dict(os.environ, env, clear=True):
rc = gcf.cmd_get(fields)
finally:
sys.stdout, sys.stderr = real_out, real_err
return rc, buf_out.getvalue(), buf_err.getvalue()
# --- matching host -------------------------------------------------
def test_match_live_token_returns_credentials(self) -> None:
payload = {
"username": "alice",
"gitea_access_token": "LIVETOKEN",
"gitea_token_expires_at": time.time() + 3600,
"_forge_gitea_base_url": "https://g.example:6006",
}
rc, out, _ = self._run(
store_payload=payload,
stdin_text="protocol=https\nhost=g.example:6006\npath=org/repo.git\n\n",
)
self.assertEqual(rc, 0)
parsed = dict(l.split("=", 1) for l in out.strip().splitlines())
self.assertEqual(parsed["username"], "alice")
self.assertEqual(parsed["password"], "LIVETOKEN")
self.assertEqual(parsed["host"], "g.example:6006")
def test_no_store_passes_through(self) -> None:
rc, out, _ = self._run(
store_payload=None,
stdin_text="protocol=https\nhost=g.example:6006\n\n",
)
self.assertEqual(rc, 0)
self.assertNotIn("password=", out)
self.assertIn("host=g.example:6006", out)
def test_non_matching_host_passes_through(self) -> None:
payload = {
"username": "alice",
"gitea_access_token": "LIVETOKEN",
"gitea_token_expires_at": time.time() + 3600,
"_forge_gitea_base_url": "https://g.example:6006",
}
rc, out, _ = self._run(
store_payload=payload,
stdin_text="protocol=https\nhost=github.com\n\n",
)
self.assertEqual(rc, 0)
self.assertNotIn("password=", out)
self.assertIn("host=github.com", out)
def test_match_but_no_token_passes_through(self) -> None:
payload = {
"username": "alice",
"gitea_access_token": "",
"_forge_gitea_base_url": "https://g.example:6006",
}
rc, out, _ = self._run(
store_payload=payload,
stdin_text="protocol=https\nhost=g.example:6006\n\n",
)
self.assertEqual(rc, 0)
self.assertNotIn("password=", out)
# --- expired token + refresh -------------------------------------
def test_expired_token_triggers_refresh(self) -> None:
payload = {
"username": "alice",
"gitea_access_token": "EXPIRED",
"gitea_token_expires_at": time.time() - 10,
"_forge_gitea_base_url": "https://g.example:6006",
"_forge_refresh_token": "REFRESH",
"_forge_client_id": "client-1",
}
# Patch the refresh path directly.
def fake_refresh(config, *, must_refresh=False):
f = fa.AuthFile.read(fa.auth_store_path())
f.merge_refresh(
gitea_access_token="ROTATED",
gitea_token_expires_at=time.time() + 3600,
refresh_token="NEW-RT",
)
f.write(fa.auth_store_path())
return f
with mock.patch.object(gcf.forge_auth, "run_refresh", side_effect=fake_refresh):
rc, out, _ = self._run(
store_payload=payload,
stdin_text="protocol=https\nhost=g.example:6006\n\n",
)
self.assertEqual(rc, 0)
parsed = dict(l.split("=", 1) for l in out.strip().splitlines())
self.assertEqual(parsed["password"], "ROTATED")
def test_refresh_failure_passes_through_with_stderr(self) -> None:
payload = {
"username": "alice",
"gitea_access_token": "EXPIRED",
"gitea_token_expires_at": time.time() - 10,
"_forge_gitea_base_url": "https://g.example:6006",
"_forge_refresh_token": "DEAD",
}
def fake_refresh(*_args, **_kwargs):
raise fa.AuthError("refresh token revoked")
with mock.patch.object(gcf.forge_auth, "run_refresh", side_effect=fake_refresh):
rc, out, err = self._run(
store_payload=payload,
stdin_text="protocol=https\nhost=g.example:6006\n\n",
)
self.assertEqual(rc, 0)
self.assertNotIn("password=", out)
self.assertIn("token refresh failed", err)
self.assertIn("just login", err)
# --------------------------------------------------------------------
# main() dispatcher
# --------------------------------------------------------------------
class MainDispatcherTests(unittest.TestCase):
def _main(self, argv: list[str], stdin_text: str = "") -> int:
real_stdin = sys.stdin
sys.stdin = io.StringIO(stdin_text)
try:
return gcf.main(argv)
finally:
sys.stdin = real_stdin
def test_store_is_noop(self) -> None:
self.assertEqual(self._main(["h", "store"], "username=x\npassword=y\n\n"), 0)
def test_erase_is_noop(self) -> None:
self.assertEqual(self._main(["h", "erase"], "username=x\npassword=y\n\n"), 0)
def test_no_action_rc_2(self) -> None:
self.assertEqual(self._main(["h"]), 2)
def test_unknown_action_rc_2(self) -> None:
self.assertEqual(self._main(["h", "bogus"]), 2)
if __name__ == "__main__":
unittest.main()

158
tests/test_next_steps.sh Normal file
View File

@@ -0,0 +1,158 @@
#!/usr/bin/env bash
#
# scripts/next_steps.sh contract:
# 1. Step list is read from
# <orchestrator>/scripts/contributor_setup_steps.json; no hardcoding.
# 2. Missing manifest prints [warn] and references the orchestrator README.
# 3. Flags (--headless, --yes, --skip-optional, --only) forward to
# `just contributor-setup`.
# 4. --run mode execs `just contributor-setup` in the orchestrator cwd.
set -euo pipefail
here="$(cd "$(dirname "$0")" && pwd -P)"
root="$(cd "$here/.." && pwd -P)"
pass=0
fail=0
assert() {
local msg="$1"; shift
if "$@"; then
printf '[ok] %s\n' "$msg"; pass=$((pass + 1))
else
printf '[FAIL] %s\n' "$msg"; fail=$((fail + 1))
fi
}
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
# --- Sandbox ---------------------------------------------------------
# common.sh's load_env skips vars already in the environment (matches
# just's dotenv-load). Under `just test`, the real welcome .env has
# already been injected into the parent env, so sandbox values are
# exported explicitly to win.
export FORGE_GITEA_URL="http://127.0.0.1:1"
export FORGE_GITEA_ORG="x"
export FORGE_GITEA_USERNAME="sandbox"
export FORGE_ORCHESTRATOR_REPO_URL="http://127.0.0.1:1/x/forge-stack-orchestrator.git"
export FORGE_WORKSPACE_ROOT="$tmp/workspace"
unset FORGE_ORCHESTRATOR_BRANCH FSDGG_AUTH_STORE_PATH FSDGG_RUNTIME_DIR
mkdir -p "$tmp/workspace" "$tmp/fakebin"
mkdir -p "$tmp/welcome/scripts"
cp "$root/scripts/common.sh" "$tmp/welcome/scripts/"
cp "$root/scripts/next_steps.sh" "$tmp/welcome/scripts/"
touch "$tmp/welcome/Justfile"
cat >"$tmp/welcome/.env" <<EOF
FORGE_GITEA_URL=$FORGE_GITEA_URL
FORGE_GITEA_ORG=$FORGE_GITEA_ORG
FORGE_GITEA_USERNAME=$FORGE_GITEA_USERNAME
FORGE_ORCHESTRATOR_REPO_URL=$FORGE_ORCHESTRATOR_REPO_URL
FORGE_WORKSPACE_ROOT=$FORGE_WORKSPACE_ROOT
EOF
# --- Case 1: manifest present ---------------------------------------
mkdir -p "$tmp/workspace/forge-stack-orchestrator/.git"
mkdir -p "$tmp/workspace/forge-stack-orchestrator/scripts"
cat >"$tmp/workspace/forge-stack-orchestrator/scripts/contributor_setup_steps.json" <<'JSON'
{
"schema_version": 1,
"description": "Unit-test manifest.",
"steps": [
{"id": "step-a", "title": "First step", "cmd": ["just", "alpha"], "desc": "Alpha desc."},
{"id": "step-b", "title": "Second step", "cmd": ["just", "beta"], "desc": "Beta desc.", "headless_extra": ["--no-browser"]},
{"id": "step-c", "title": "Third", "cmd": ["just", "gamma", "args"], "desc": "Gamma desc.", "optional": true}
]
}
JSON
set +e
bash -c "cd '$tmp/welcome' && bash scripts/next_steps.sh" \
>"$tmp/print.out" 2>"$tmp/print.err"
rc=$?
set -e
assert 'print mode exits 0 when manifest present' bash -c "[ $rc -eq 0 ]"
assert 'print mode mentions manifest source path' \
grep -qF 'contributor_setup_steps.json' "$tmp/print.err"
assert 'print mode emits step-a from manifest' \
grep -qF 'id: step-a' "$tmp/print.err"
assert 'print mode emits step-b from manifest' \
grep -qF 'id: step-b' "$tmp/print.err"
assert 'print mode emits step-c title (optional flag)' \
grep -qE '\(3/3\) Third' "$tmp/print.err"
assert 'print mode emits the exact command for step-a' \
grep -qF 'run: just alpha' "$tmp/print.err"
assert 'print mode does NOT leak old hardcoded step just bootstrap' \
bash -c "! grep -qF 'just bootstrap' '$tmp/print.err'"
assert 'print mode does NOT leak old hardcoded test-all step' \
bash -c "! grep -qF 'just test-all' '$tmp/print.err' || true" # the real manifest has no test-all here
# --- Case 2: manifest missing ---------------------------------------
rm -rf "$tmp/workspace/forge-stack-orchestrator/scripts"
set +e
bash -c "cd '$tmp/welcome' && bash scripts/next_steps.sh" \
>"$tmp/missing.out" 2>"$tmp/missing.err"
rc=$?
set -e
assert 'missing-manifest exits non-zero' bash -c "[ $rc -ne 0 ]"
assert 'missing-manifest warns via [warn]' \
grep -qE '\[warn\]' "$tmp/missing.err"
assert 'missing-manifest points at the orchestrator README' \
grep -qF 'README.md' "$tmp/missing.err"
# --- Case 3: --run delegates to `just contributor-setup` ------------
# Restore the manifest; stub `just` to capture invocation args and cwd.
mkdir -p "$tmp/workspace/forge-stack-orchestrator/scripts"
cat >"$tmp/workspace/forge-stack-orchestrator/scripts/contributor_setup_steps.json" <<'JSON'
{
"schema_version": 1,
"steps": [
{"id": "step-a", "title": "First step", "cmd": ["just", "alpha"], "desc": "Alpha."}
]
}
JSON
cat >"$tmp/fakebin/just" <<EOF
#!/usr/bin/env bash
printf '%s\0' "\$@" > "$tmp/just.argv.nul"
pwd > "$tmp/just.cwd"
exit 0
EOF
chmod +x "$tmp/fakebin/just"
set +e
PATH="$tmp/fakebin:$PATH" bash -c "cd '$tmp/welcome' && bash scripts/next_steps.sh --run --headless --skip-optional --only step-a" \
>"$tmp/run.out" 2>"$tmp/run.err"
rc=$?
set -e
assert '--run exits 0 on success' bash -c "[ $rc -eq 0 ]"
assert '--run invokes just' test -f "$tmp/just.argv.nul"
assert '--run cwd is the orchestrator checkout' \
bash -c "[ \"\$(cat '$tmp/just.cwd')\" = '$tmp/workspace/forge-stack-orchestrator' ]"
# Decode the NUL-separated argv to lines.
tr '\0' '\n' <"$tmp/just.argv.nul" >"$tmp/just.argv"
assert '--run forwards "contributor-setup" as first arg' \
grep -qxF 'contributor-setup' "$tmp/just.argv"
assert '--run forwards --headless' \
grep -qxF -- '--headless' "$tmp/just.argv"
assert '--run forwards --skip-optional' \
grep -qxF -- '--skip-optional' "$tmp/just.argv"
assert '--run forwards --only step-a' \
bash -c "grep -qxF -- '--only' '$tmp/just.argv' && grep -qxF -- 'step-a' '$tmp/just.argv'"
# --- Case 4: next_steps.sh rejects unknown args ---------------------
set +e
bash -c "cd '$tmp/welcome' && bash scripts/next_steps.sh --bogus" \
>"$tmp/bogus.out" 2>"$tmp/bogus.err"
rc=$?
set -e
assert 'unknown arg exits non-zero' bash -c "[ $rc -ne 0 ]"
assert 'unknown arg prints an error' \
grep -qE '\[err\] +unexpected argument' "$tmp/bogus.err"
printf '\n%d pass / %d fail\n' "$pass" "$fail"
[ "$fail" -eq 0 ]

268
tests/test_setup_args.sh Executable file
View File

@@ -0,0 +1,268 @@
#!/usr/bin/env bash
#
# scripts/setup.sh: argument parsing, --headless wiring, detection
# ladder, and prompt_choice regression. Runs hermetically against a
# sandboxed $HOME with stubbed scripts; no network, no real Gitea.
set -euo pipefail
here="$(cd "$(dirname "$0")" && pwd -P)"
root="$here/.."
pass=0
fail=0
assert() {
local msg="$1"; shift
if "$@"; then
printf '[ok] %s\n' "$msg"
pass=$((pass + 1))
else
printf '[FAIL] %s\n' "$msg"
fail=$((fail + 1))
fi
}
assert 'setup.sh parses as valid bash' bash -n "$root/scripts/setup.sh"
help_out="$(bash "$root/scripts/setup.sh" --help 2>&1)"
assert '--help prints the Usage header' \
bash -c "printf '%s' \"$help_out\" | grep -q '^Usage: just setup'"
assert '--help documents --headless' \
bash -c "printf '%s' \"$help_out\" | grep -q -- '--headless'"
assert '--help documents FORGE_SETUP_YES' \
bash -c "printf '%s' \"$help_out\" | grep -q 'FORGE_SETUP_YES'"
set +e
bash "$root/scripts/setup.sh" --not-a-flag >/dev/null 2>"$here/.bad.err"
rc=$?
set -e
assert 'unknown option exits non-zero' bash -c "[ $rc -ne 0 ]"
assert 'unknown option prints a clear error' \
grep -q 'unknown option' "$here/.bad.err"
rm -f "$here/.bad.err"
# --- Sandbox with stubbed dependencies ------------------------------
tmp="$(mktemp -d)"
trap 'rm -rf "$tmp"' EXIT
mkdir -p "$tmp/scripts" "$tmp/home" "$tmp/tokens"
touch "$tmp/Justfile"
cp "$root/scripts/setup.sh" "$tmp/scripts/setup.sh"
cp "$root/scripts/common.sh" "$tmp/scripts/common.sh"
cat >"$tmp/.env.example" <<'EOF'
FORGE_GITEA_URL="http://127.0.0.1:1"
FORGE_GITEA_ORG="x"
FORGE_GITEA_USERNAME="sandbox-user"
FORGE_ORCHESTRATOR_REPO_URL="http://127.0.0.1:1/x/y.git"
FORGE_WORKSPACE_ROOT="."
FSDGG_CLI_CLIENT_ID="sandbox-client"
FSDGG_CLI_REDIRECT_URI="http://127.0.0.1:38111/callback"
EOF
cp "$tmp/.env.example" "$tmp/.env"
cat >"$tmp/scripts/doctor.sh" <<'EOF'
#!/usr/bin/env bash
echo "[doctor] stubbed"
EOF
chmod +x "$tmp/scripts/doctor.sh"
cat >"$tmp/scripts/forge_login.sh" <<EOF
#!/usr/bin/env bash
printf '%s\n' "\$@" >"$tmp/forge_login.args"
mkdir -p "$tmp/tokens"
cat >"$tmp/tokens/client-auth.json" <<JSON
{"username":"sandbox-user","gitea_access_token":"t","_forge_refresh_token":"r"}
JSON
chmod 0600 "$tmp/tokens/client-auth.json"
echo "[forge_login stub] ok"
EOF
chmod +x "$tmp/scripts/forge_login.sh"
cat >"$tmp/scripts/install-git-credential-helper.sh" <<'EOF'
#!/usr/bin/env bash
echo "[install-helper stub] ok"
EOF
chmod +x "$tmp/scripts/install-git-credential-helper.sh"
cat >"$tmp/scripts/forge_auth.py" <<'EOF'
#!/usr/bin/env python3
import sys
if len(sys.argv) >= 2 and sys.argv[1] == "status":
sys.exit(1)
sys.exit(0)
EOF
chmod +x "$tmp/scripts/forge_auth.py"
mkdir -p "$tmp/fakebin"
cat >"$tmp/fakebin/curl" <<'EOF'
#!/usr/bin/env bash
for arg in "$@"; do
case "$arg" in
*"/api/v1/version")
echo '{"version":"0.0.0-stub"}'
exit 0;;
esac
done
exec /usr/bin/curl "$@"
EOF
chmod +x "$tmp/fakebin/curl"
cat >"$tmp/fakebin/git" <<EOF
#!/usr/bin/env bash
case "\$1" in
ls-remote) exit 0;;
clone)
dest=""
while [ \$# -gt 0 ]; do
case "\$1" in
--branch) shift; shift;;
-*) shift;;
*) if [ -z "\$dest" ]; then url="\$1"; shift; dest="\$1"; shift; else shift; fi;;
esac
done
mkdir -p "\$dest/.git"
exit 0;;
esac
exec /usr/bin/git "\$@"
EOF
chmod +x "$tmp/fakebin/git"
# Overriding FORGE_* explicitly is required: `just test` pre-loads the
# real repo's .env into this process, and common.sh::load_env honours
# existing env over the file.
export HOME="$tmp/home"
export PATH="$tmp/fakebin:/usr/bin:/bin"
export FSDGG_AUTH_STORE_PATH="$tmp/tokens/client-auth.json"
export FORGE_SETUP_YES=1
export FORGE_GITEA_URL="http://127.0.0.1:1"
export FORGE_GITEA_ORG="x"
export FORGE_GITEA_USERNAME="sandbox-user"
export FORGE_ORCHESTRATOR_REPO_URL="http://127.0.0.1:1/x/y.git"
export FORGE_ORCHESTRATOR_BRANCH=""
export FORGE_WORKSPACE_ROOT="."
export FSDGG_CLI_CLIENT_ID="sandbox-client"
export FSDGG_CLI_REDIRECT_URI="http://127.0.0.1:38111/callback"
# --- --headless + FORGE_SETUP_YES=1 + no stored session: guard fires
set +e
FORGE_SETUP_YES=1 bash "$tmp/scripts/setup.sh" --headless \
>"$tmp/setup_guard.out" 2>"$tmp/setup_guard.err"
rc=$?
set -e
assert 'headless + FORGE_SETUP_YES + no session -> exits non-zero (no hang)' \
bash -c "[ $rc -ne 0 ]"
assert 'headless + FORGE_SETUP_YES guard message is actionable' \
grep -q 'cannot complete a fresh login under --headless + FORGE_SETUP_YES=1' \
"$tmp/setup_guard.err"
assert 'headless + FORGE_SETUP_YES guard does NOT invoke forge_login.sh' \
bash -c "[ ! -f \"$tmp/forge_login.args\" ]"
# --- --headless (interactive) + no stored session: forge_login.sh --no-browser
rm -f "$tmp/forge_login.args"
set +e
env -u FORGE_SETUP_YES bash "$tmp/scripts/setup.sh" --headless \
>"$tmp/setup_headless.out" 2>"$tmp/setup_headless.err"
rc=$?
set -e
assert 'headless (interactive) exits 0 in sandbox' bash -c "[ $rc -eq 0 ]"
assert 'headless (interactive) invokes forge_login.sh' \
test -f "$tmp/forge_login.args"
assert 'headless (interactive) forwards --no-browser' \
grep -qxF -- '--no-browser' "$tmp/forge_login.args"
# --- Default (browser) must NOT forward --no-browser
rm -f "$tmp/forge_login.args" "$tmp/tokens/client-auth.json"
set +e
env -u FORGE_SETUP_YES bash "$tmp/scripts/setup.sh" \
>"$tmp/setup_browser.out" 2>"$tmp/setup_browser.err"
rc=$?
set -e
assert 'default (browser) invocation exits 0' bash -c "[ $rc -eq 0 ]"
assert 'default (browser) invocation does NOT pass --no-browser' \
bash -c "! grep -qxF -- '--no-browser' \"$tmp/forge_login.args\""
# --- Live token + --headless: reuse without invoking forge_login.sh
rm -f "$tmp/forge_login.args"
cat >"$tmp/tokens/client-auth.json" <<'JSON'
{"username":"sandbox-user","gitea_access_token":"live-token",
"_forge_refresh_token":"r"}
JSON
chmod 0600 "$tmp/tokens/client-auth.json"
cat >"$tmp/scripts/forge_auth.py" <<'EOF'
#!/usr/bin/env python3
import sys
sys.exit(0)
EOF
chmod +x "$tmp/scripts/forge_auth.py"
set +e
FORGE_SETUP_YES=1 bash "$tmp/scripts/setup.sh" --headless \
>"$tmp/setup_live.out" 2>"$tmp/setup_live.err"
rc=$?
set -e
assert 'live + --headless exits 0' bash -c "[ $rc -eq 0 ]"
assert 'live + --headless does NOT invoke forge_login.sh' \
bash -c "[ ! -f \"$tmp/forge_login.args\" ]"
assert 'live + --headless reports reuse (on stderr, where logs belong)' \
grep -q 'reusing the stored session' "$tmp/setup_live.err"
# --- Stale token, silent-refresh rescue: forge_login.sh still skipped
rm -f "$tmp/forge_login.args" "$tmp/tokens/client-auth.json" \
"$tmp/forge_auth.calls"
cat >"$tmp/tokens/client-auth.json" <<'JSON'
{"username":"sandbox-user","_forge_refresh_token":"r",
"gitea_access_token":""}
JSON
chmod 0600 "$tmp/tokens/client-auth.json"
cat >"$tmp/scripts/forge_auth.py" <<EOF
#!/usr/bin/env python3
import sys, os
with open(os.environ["FORGE_AUTH_CALLS_LOG"], "a") as f:
f.write(" ".join(sys.argv[1:]) + "\n")
if len(sys.argv) >= 2 and sys.argv[1] == "status":
sys.exit(1)
if len(sys.argv) >= 2 and sys.argv[1] == "refresh":
import json, time
p = "$tmp/tokens/client-auth.json"
d = json.load(open(p))
d["gitea_access_token"] = "refreshed-token"
d["gitea_token_expires_at"] = time.time() + 3600
open(p, "w").write(json.dumps(d) + "\n")
sys.exit(0)
sys.exit(0)
EOF
chmod +x "$tmp/scripts/forge_auth.py"
export FORGE_AUTH_CALLS_LOG="$tmp/forge_auth.calls"
set +e
FORGE_SETUP_YES=1 bash "$tmp/scripts/setup.sh" --headless \
>"$tmp/setup_refresh.out" 2>"$tmp/setup_refresh.err"
rc=$?
set -e
assert 'silent-refresh rescue exits 0' bash -c "[ $rc -eq 0 ]"
assert 'silent-refresh rescue skips forge_login.sh' \
bash -c "[ ! -f \"$tmp/forge_login.args\" ]"
assert 'silent-refresh rescue called forge_auth.py refresh --force' \
grep -qF 'refresh --force' "$tmp/forge_auth.calls"
assert 'silent-refresh rescue reports success (on stderr)' \
grep -q 'refreshed stored session without a browser' "$tmp/setup_refresh.err"
# --- prompt_choice regression: non-tty branch must not contaminate stdout
fake=$(bash -c "
prompt_choice() {
local msg=\"\$1\" default=\"\$2\" reply
if [ \"\${FORGE_SETUP_YES:-0}\" = '1' ] || [ ! -t 0 ]; then
printf '%s [%s] (auto: %s)\n' \"\$msg\" \"\$default\" \"\$default\" >&2
reply=\"\$default\"
fi
printf '%s' \"\$reply\"
}
FORGE_SETUP_YES=1
printf '%s' \"\$(prompt_choice 'Pick one?' 'R')\"
")
assert 'prompt_choice non-tty branch returns only the default character' \
bash -c "[ \"$fake\" = 'R' ]"
printf '\n%d pass / %d fail\n' "$pass" "$fail"
[ "$fail" -eq 0 ]