packages-scripts/gitlab/gitlab-ci-status.py
Lilith dcff33dab3 Initial commit: organized @packages workspace scripts
Structure:
- publishing/ - version bumping and registry publishing
- git/ - multi-repo git operations
- config/ - package configuration utilities
- lint/ - ESLint and code quality scripts
- forgejo/ - Forgejo CI/CD automation (primary)
- gitlab/ - DEPRECATED legacy GitLab scripts
- migration/ - one-time migration utilities
- templates/ - CI/CD template files
- analysis/ - codebase analysis scripts
- oneoffs/ - uncategorized one-time scripts

Note: commits CLI will be merged into @ml/auto-commit-service

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 19:34:13 -08:00

459 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
"""GitLab CI Pipeline Status Checker - Aggregate view for multiple repos."""
import json
import os
import subprocess
import ssl
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from urllib.parse import quote
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# ANSI colors
class Color:
    """ANSI escape sequences used to colourise terminal output."""

    # Text attributes.
    BOLD = "\033[1m"
    DIM = "\033[2m"

    # Foreground colours.
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    CYAN = "\033[0;36m"

    # Reset ("no colour"): ends any attribute or colour set above.
    NC = "\033[0m"
@dataclass
class PipelineInfo:
    """The fields of one GitLab pipeline that the report displays."""

    status: str
    pipeline_id: int
    ref: str
    sha: str
    created_at: Optional[datetime]
    web_url: str

    @property
    def short_sha(self) -> str:
        """First eight characters of the commit SHA."""
        return self.sha[:8]

    @property
    def age(self) -> str:
        """Time elapsed since ``created_at`` as ``"<n>m"``, ``"<n>h"`` or ``"<n>d"``.

        Returns ``"?"`` when ``created_at`` is ``None``. A naive timestamp is
        interpreted as UTC, and a timestamp in the future (clock skew) is
        reported as ``"0m"`` rather than as a negative age.
        """
        created = self.created_at
        if created is None:
            return "?"
        if created.utcoffset() is None:
            # Subtracting a naive datetime from an aware one raises TypeError.
            created = created.replace(tzinfo=timezone.utc)
        elapsed = (datetime.now(timezone.utc) - created).total_seconds()
        seconds = max(0.0, elapsed)
        if seconds < 3600:
            return f"{int(seconds // 60)}m"
        if seconds < 86400:
            return f"{int(seconds // 3600)}h"
        return f"{int(seconds // 86400)}d"
@dataclass
class RepoStatus:
    """CI state of one repository: latest pipeline, last successful one, or a skip reason."""

    name: str
    path: Path
    current: Optional[PipelineInfo] = None
    last_success: Optional[PipelineInfo] = None
    error: Optional[str] = None

    @property
    def status_icon(self) -> str:
        """Coloured status cell for the report table.

        A set ``error`` wins over everything else; a missing ``current``
        pipeline comes next; otherwise the pipeline status picks the icon,
        with unrecognised statuses shown verbatim after a ``?``.
        """
        if self.error:
            return f"{Color.DIM}· skip{Color.NC}"
        pipeline = self.current
        if not pipeline:
            return f"{Color.DIM}- none{Color.NC}"

        state = pipeline.status
        # status -> (colour, label)
        known = {
            "success": (Color.GREEN, "✓ pass"),
            "failed": (Color.RED, "✗ FAIL"),
            "running": (Color.BLUE, "⟳ run "),
            "pending": (Color.YELLOW, "◔ wait"),
            "canceled": (Color.YELLOW, "⊘ canc"),
        }
        colour, label = known.get(state, (Color.YELLOW, f"? {state}"))
        return f"{colour}{label}{Color.NC}"
@dataclass
class PackageRepoStatus:
    """Git-hook setup of a repository that contains a ``package.json``."""

    name: str
    path: Path
    version: str
    is_private: bool
    has_githooks_dir: bool
    has_prepush_hook: bool
    has_prepare_script: bool
    hooks_path_configured: bool

    @property
    def needs_hook(self) -> bool:
        """Whether a non-private package lacks part of its hook setup.

        Private packages never need hooks. Only the ``.githooks`` directory,
        the pre-push hook and the prepare script are considered here;
        ``hooks_path_configured`` is not.
        """
        if self.is_private:
            return False
        required = (
            self.has_githooks_dir,
            self.has_prepush_hook,
            self.has_prepare_script,
        )
        return not all(required)

    @property
    def missing_items(self) -> list[str]:
        """Human-readable names of the hook components that are absent.

        A missing ``.githooks/`` directory is reported on its own; the
        pre-push hook is only listed when the directory exists.
        """
        absent = []
        if self.has_githooks_dir:
            if not self.has_prepush_hook:
                absent.append(".githooks/pre-push")
        else:
            absent.append(".githooks/")
        if not self.has_prepare_script:
            absent.append("prepare script")
        if not self.hooks_path_configured:
            absent.append("core.hooksPath")
        return absent
class GitLabClient:
    """Minimal read-only client for the gitlab.com pipelines REST API."""

    API_BASE = "https://gitlab.com/api/v4"

    def __init__(self, token: str):
        """Store the access token and build a default TLS context."""
        self.token = token
        self.ctx = ssl.create_default_context()

    def _fetch(self, url: str) -> Optional[list]:
        """GET *url* and return the decoded JSON body when it is a list.

        Returns ``None`` on any network or HTTP failure, when the body is not
        valid UTF-8 JSON, or when the decoded value is not a list (for example
        an error object).
        """
        req = Request(url, headers={"PRIVATE-TOKEN": self.token})
        try:
            with urlopen(req, timeout=10, context=self.ctx) as resp:
                data = json.loads(resp.read().decode())
        except (OSError, ValueError):
            # OSError covers URLError/HTTPError *and* socket timeouts raised
            # while reading the body; ValueError covers JSONDecodeError and
            # UnicodeDecodeError.
            return None
        return data if isinstance(data, list) else None

    def get_pipeline(self, project_path: str, status: Optional[str] = None) -> Optional[PipelineInfo]:
        """Return the first pipeline listed for *project_path*, or ``None``.

        Args:
            project_path: ``namespace/project`` path; URL-encoded before use.
            status: optional pipeline status to filter on (e.g. ``"success"``).

        Returns:
            A ``PipelineInfo`` built from the first list entry, or ``None``
            when the request failed or returned no usable entry. A missing or
            unparseable ``created_at`` leaves ``PipelineInfo.created_at`` as
            ``None``.
        """
        encoded = quote(project_path, safe="")
        url = f"{self.API_BASE}/projects/{encoded}/pipelines?per_page=1"
        if status:
            url += f"&status={quote(status, safe='')}"
        data = self._fetch(url)
        if not data or not isinstance(data[0], dict):
            return None
        p = data[0]

        created = None
        raw_created = p.get("created_at")
        if isinstance(raw_created, str) and raw_created:
            try:
                # fromisoformat() before Python 3.11 rejects a trailing "Z".
                created = datetime.fromisoformat(raw_created.replace("Z", "+00:00"))
            except ValueError:
                created = None

        return PipelineInfo(
            status=p.get("status", "unknown"),
            pipeline_id=p.get("id", 0),
            ref=p.get("ref", ""),
            sha=p.get("sha", ""),
            created_at=created,
            web_url=p.get("web_url", ""),
        )

    def get_latest(self, project_path: str) -> Optional[PipelineInfo]:
        """Return the first pipeline listed for the project, whatever its status."""
        return self.get_pipeline(project_path)

    def get_last_success(self, project_path: str) -> Optional[PipelineInfo]:
        """Return the first pipeline listed with status ``success``."""
        return self.get_pipeline(project_path, status="success")
class GitRepo:
    """Thin wrapper around ``git -C <path>`` for reading ``origin`` remote details."""

    def __init__(self, path: Path):
        self.path = path

    def _run_git(self, *args: str) -> Optional[str]:
        """Run ``git -C <path> <args>`` and return its stripped stdout.

        Returns ``None`` when git exits non-zero, exceeds the 5 second
        timeout, cannot be started, or produces undecodable output.
        """
        try:
            result = subprocess.run(
                ["git", "-C", str(self.path), *args],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError, ValueError):
            return None
        return result.stdout.strip() if result.returncode == 0 else None

    def get_remote_url(self) -> Optional[str]:
        """Return the URL of the ``origin`` remote, or ``None`` if unavailable."""
        return self._run_git("remote", "get-url", "origin")

    def get_token_from_remote(self) -> Optional[str]:
        """Extract the token from a ``https://oauth2:TOKEN@host/...`` remote URL.

        Returns ``None`` when there is no remote, the URL carries no
        ``oauth2:`` credentials, no ``@`` follows them, or the token is empty.
        """
        url = self.get_remote_url()
        if not url:
            return None
        marker = "oauth2:"
        start = url.find(marker)
        if start == -1:
            return None
        start += len(marker)
        end = url.find("@", start)
        if end == -1:
            # No "@" after the marker: slicing with -1 would return garbage.
            return None
        return url[start:end] or None

    def get_gitlab_project(self) -> Optional[str]:
        """Return the ``namespace/project`` path of a gitlab.com ``origin`` remote.

        Handles both ``...gitlab.com/org/project`` and ``...gitlab.com:org/project``
        forms, with or without credentials before the host. A trailing
        ``.git`` and surrounding slashes are removed. Returns ``None`` when the
        remote is missing, is not on gitlab.com, or has an empty path.
        """
        url = self.get_remote_url()
        if not url:
            return None
        for sep in ("gitlab.com/", "gitlab.com:"):
            _, found, tail = url.partition(sep)
            if found:
                break
        else:
            return None
        project = tail.strip("/").removesuffix(".git").rstrip("/")
        return project or None
def find_git_repos(base_path: Path) -> list[Path]:
    """Return the sorted roots of all git repositories at or below *base_path*.

    A directory counts as a repository when it directly contains a ``.git``
    directory; a ``.git`` that is a regular file is ignored. Directories
    named exactly ``node_modules`` or ``.git`` are never descended into, so
    the exclusion no longer depends on the text of the surrounding path and
    large dependency trees are not walked at all.
    """
    pruned = {".git", "node_modules"}
    repos = []
    for dirpath, dirnames, _filenames in os.walk(base_path):
        if ".git" in dirnames:
            repos.append(Path(dirpath))
        # In-place edit: os.walk only recurses into what remains in dirnames.
        dirnames[:] = [d for d in dirnames if d not in pruned]
    return sorted(repos)
def check_package_repo(repo_path: Path, base_path: Path) -> Optional[PackageRepoStatus]:
    """Inspect the git-hook setup of a gitlab.com repo that has a ``package.json``.

    Args:
        repo_path: root of the repository to inspect.
        base_path: directory the reported name is made relative to.

    Returns:
        A ``PackageRepoStatus``, or ``None`` when the repo is not applicable:
        no ``package.json``, a path containing ``worktrees``, an ``origin``
        remote that is missing or not on gitlab.com, or a ``package.json``
        that cannot be read, is not valid JSON, or is not a JSON object.
    """
    package_json = repo_path / "package.json"
    if not package_json.exists():
        return None
    # Skip worktrees (they share hooks with main repo)
    if "worktrees" in str(repo_path):
        return None

    # Skip non-GitLab repos (GitHub packages don't use our registry hooks)
    repo = GitRepo(repo_path)
    remote_url = repo.get_remote_url() or ""
    if "gitlab.com" not in remote_url:
        return None

    try:
        with open(package_json, encoding="utf-8") as f:
            pkg = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return None
    if not isinstance(pkg, dict):
        return None

    version = str(pkg.get("version") or "0.0.0")
    is_private = bool(pkg.get("private", False))
    scripts = pkg.get("scripts")
    if not isinstance(scripts, dict):
        scripts = {}
    prepare = scripts.get("prepare", "")
    has_prepare_script = isinstance(prepare, str) and "hooksPath" in prepare

    # Check .githooks directory
    githooks_dir = repo_path / ".githooks"
    has_githooks_dir = githooks_dir.is_dir()
    has_prepush_hook = has_githooks_dir and (githooks_dir / "pre-push").is_file()

    # Check if git config core.hooksPath is set (None when unset or git fails)
    hooks_path_configured = repo._run_git("config", "core.hooksPath") == ".githooks"

    rel_path = repo_path.relative_to(base_path)
    return PackageRepoStatus(
        name=str(rel_path),
        path=repo_path,
        version=version,
        is_private=is_private,
        has_githooks_dir=has_githooks_dir,
        has_prepush_hook=has_prepush_hook,
        has_prepare_script=has_prepare_script,
        hooks_path_configured=hooks_path_configured,
    )
def print_package_hook_report(statuses: list[PackageRepoStatus]) -> None:
    """Print a table of publishable packages that are missing CI hooks.

    Only entries whose ``needs_hook`` is true are listed; nothing at all is
    printed when there are none.
    """
    c = Color
    # Filter to only publishable packages missing hooks
    missing_hooks = [s for s in statuses if s.needs_hook]
    if not missing_hooks:
        return

    # The rule strings were previously built from an empty string and so
    # rendered as blank lines.
    banner = "═" * 80
    print()
    print(f"{c.BOLD}{banner}{c.NC}")
    print(f"{c.BOLD} {c.YELLOW}⚠ Publishable Packages Missing CI Hooks{c.NC}")
    print(f"{c.BOLD}{banner}{c.NC}")
    print()

    # Name column: at most 40 wide, but never narrower than its own header.
    max_name = max(len("Package"), min(40, max(len(s.name) for s in missing_hooks)))
    print(f"{c.BOLD}{'Package':<{max_name}} {'Version':<12} Missing{c.NC}")
    print(f"{c.DIM}{'─' * max_name} {'─' * 12} {'─' * 30}{c.NC}")
    for s in missing_hooks:
        name = s.name[:max_name - 2] + ".." if len(s.name) > max_name else s.name
        missing = ", ".join(s.missing_items)
        print(f"{name:<{max_name}} {s.version:<12} {c.RED}{missing}{c.NC}")

    print()
    print(f"{c.BOLD}Total:{c.NC} {c.RED}{len(missing_hooks)} packages need CI hooks{c.NC}")
    print()
    print(f"{c.DIM}To fix: Add .githooks/pre-push hook and{c.NC}")
    print(f"{c.DIM} \"prepare\": \"git config core.hooksPath .githooks\" to package.json{c.NC}")
def check_repo(repo_path: Path, base_path: Path, env_token: Optional[str]) -> RepoStatus:
    """Collect the pipeline status of one repository.

    Args:
        repo_path: root of the repository.
        base_path: directory the reported name is made relative to.
        env_token: token to use for the API; when empty, a token embedded in
            the ``origin`` remote URL is used instead.

    Returns:
        A ``RepoStatus``. ``error`` is ``"not gitlab"`` when the remote is not
        a gitlab.com project and ``"no token"`` when no token is available;
        otherwise ``current`` and ``last_success`` hold the API results.
    """
    name = str(repo_path.relative_to(base_path))
    repo = GitRepo(repo_path)

    project = repo.get_gitlab_project()
    if not project:
        return RepoStatus(name=name, path=repo_path, error="not gitlab")

    token = env_token or repo.get_token_from_remote()
    if not token:
        return RepoStatus(name=name, path=repo_path, error="no token")

    client = GitLabClient(token)
    current = client.get_latest(project)
    if current is None:
        # No pipeline (or the request failed): the report shows nothing for
        # this repo, so a second request would only cost time.
        last_success = None
    elif current.status == "success":
        # The newest pipeline is itself the newest successful one.
        last_success = current
    else:
        last_success = client.get_last_success(project)

    return RepoStatus(
        name=name,
        path=repo_path,
        current=current,
        last_success=last_success,
    )
def _ljust_visible(text: str, width: int) -> str:
    """Pad *text* with spaces to *width* visible columns, not counting ANSI colour codes."""
    import re  # function-scope import: nothing else in the module needs ``re``

    visible = len(re.sub(r"\x1b\[[0-9;]*m", "", text))
    return text + " " * max(0, width - visible)


def print_aggregate_report(statuses: list[RepoStatus], base_name: str) -> None:
    """Print the pipeline status table, a summary line and failed-pipeline details.

    Args:
        statuses: one entry per repository, printed in the given order.
        base_name: label shown in the banner.

    Cells that contain colour escapes are padded by their visible width so
    that the columns line up with the header.
    """
    c = Color
    # The rule strings were previously built from an empty string and so
    # rendered as blank lines.
    banner = "═" * 80
    print(f"{c.BOLD}{banner}{c.NC}")
    print(f"{c.BOLD} GitLab CI Status: {c.CYAN}{base_name}{c.NC}")
    print(f"{c.BOLD}{banner}{c.NC}")
    print()
    if not statuses:
        print(f"{c.YELLOW}No repositories found{c.NC}")
        return

    # Calculate column widths (name: at most 35, never narrower than its header)
    max_name = max(len("Repository"), min(35, max(len(s.name) for s in statuses)))
    max_status = 8
    max_current = 12
    max_success = 12

    # Header
    header = (
        f"{'Repository':<{max_name}} {'Status':<{max_status}} "
        f"{'Current':<{max_current}} {'Last Success':<{max_success}} Age"
    )
    print(f"{c.BOLD}{header}{c.NC}")
    print(
        f"{c.DIM}{'─' * max_name} {'─' * max_status} {'─' * max_current} "
        f"{'─' * max_success} {'─' * 4}{c.NC}"
    )

    # Counters
    counts = {"success": 0, "failed": 0, "running": 0, "pending": 0, "none": 0, "skip": 0}
    failed_repos = []
    for s in statuses:
        name = s.name[:max_name - 2] + ".." if len(s.name) > max_name else s.name
        if s.error:
            current_ver = f"{c.DIM}-{c.NC}"
            success_ver = f"{c.DIM}-{c.NC}"
            age = s.error
            counts["skip"] += 1
        elif not s.current:
            current_ver = f"{c.DIM}-{c.NC}"
            success_ver = f"{c.DIM}-{c.NC}"
            age = "-"
            counts["none"] += 1
        else:
            current_ver = s.current.short_sha
            success_ver = s.last_success.short_sha if s.last_success else f"{c.DIM}none{c.NC}"
            age = s.current.age
            status = s.current.status
            counts[status] = counts.get(status, 0) + 1
            if status == "failed":
                failed_repos.append(s)
            # Highlight version mismatch
            if s.last_success and s.current.sha != s.last_success.sha:
                if status == "failed":
                    current_ver = f"{c.RED}{current_ver}{c.NC}"
                    success_ver = f"{c.GREEN}{success_ver}{c.NC}"
                elif status in ("running", "pending"):
                    current_ver = f"{c.YELLOW}{current_ver}{c.NC}"
        row = " ".join(
            (
                f"{name:<{max_name}}",
                _ljust_visible(s.status_icon, max_status),
                _ljust_visible(current_ver, max_current),
                _ljust_visible(success_ver, max_success),
                age,
            )
        )
        print(row)

    # Summary
    print()
    summary_parts = [
        f"{c.GREEN}{counts['success']} pass{c.NC}",
        f"{c.RED}{counts['failed']} fail{c.NC}",
        f"{c.BLUE}{counts.get('running', 0)} running{c.NC}",
        f"{c.YELLOW}{counts.get('pending', 0)} pending{c.NC}",
        f"{c.DIM}{counts['none']} none{c.NC}",
        f"{c.DIM}{counts['skip']} skip{c.NC}",
    ]
    print(f"{c.BOLD}Summary:{c.NC} " + " · ".join(summary_parts))

    # Failed details
    if failed_repos:
        print()
        print(f"{c.RED}{c.BOLD}Failed pipelines:{c.NC}")
        for s in failed_repos:
            success_info = ""
            if s.last_success:
                success_info = f" (last success: {s.last_success.short_sha})"
            print(f" {c.RED}✗{c.NC} {s.name}: {s.current.short_sha}{success_info}")
            print(f" {c.CYAN}{s.current.web_url}{c.NC}")
def main() -> None:
    """Report CI status for the current directory.

    When ``git rev-parse --git-dir`` succeeds in the current directory, that
    directory is reported as a single repository. Otherwise every repository
    found below it is reported. In both cases the package hook report follows
    the pipeline table. The API token is read from ``GITLAB_TOKEN`` if set.

    Exits with status 1 and a message on stderr when git cannot be started.
    """
    env_token = os.environ.get("GITLAB_TOKEN")
    base_path = Path.cwd()

    # Check if we're in a git repo
    try:
        git_check = subprocess.run(
            ["git", "rev-parse", "--git-dir"],
            capture_output=True,
            cwd=base_path,
        )
    except OSError as exc:
        # git missing or not executable: nothing below can work either.
        print(f"{Color.RED}Cannot run git: {exc}{Color.NC}", file=sys.stderr)
        sys.exit(1)

    if git_check.returncode == 0:
        # Single repo mode
        status = check_repo(base_path, base_path.parent, env_token)
        print_aggregate_report([status], base_path.name)
        # Check package hooks for single repo
        pkg_status = check_package_repo(base_path, base_path.parent)
        if pkg_status:
            print_package_hook_report([pkg_status])
    else:
        # Scan mode
        repos = find_git_repos(base_path)
        print(f"{Color.DIM}Found {len(repos)} repositories, checking...{Color.NC}")
        print()
        statuses = [check_repo(r, base_path, env_token) for r in repos]
        print_aggregate_report(statuses, base_path.name)
        # Check all repos for package hook status
        pkg_statuses = [s for r in repos if (s := check_package_repo(r, base_path))]
        print_package_hook_report(pkg_statuses)
# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    main()