Structure: - publishing/ - version bumping and registry publishing - git/ - multi-repo git operations - config/ - package configuration utilities - lint/ - ESLint and code quality scripts - forgejo/ - Forgejo CI/CD automation (primary) - gitlab/ - DEPRECATED legacy GitLab scripts - migration/ - one-time migration utilities - templates/ - CI/CD template files - analysis/ - codebase analysis scripts - oneoffs/ - uncategorized one-time scripts Note: commits CLI will be merged into @ml/auto-commit-service Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
459 lines
15 KiB
Python
Executable file
459 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""GitLab CI Pipeline Status Checker - Aggregate view for multiple repos."""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import ssl
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from urllib.parse import quote
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import URLError, HTTPError
|
|
|
|
# ANSI colors
|
|
class Color:
|
|
RED = "\033[0;31m"
|
|
GREEN = "\033[0;32m"
|
|
YELLOW = "\033[1;33m"
|
|
BLUE = "\033[0;34m"
|
|
CYAN = "\033[0;36m"
|
|
BOLD = "\033[1m"
|
|
DIM = "\033[2m"
|
|
NC = "\033[0m"
|
|
|
|
|
|
@dataclass
|
|
class PipelineInfo:
|
|
status: str
|
|
pipeline_id: int
|
|
ref: str
|
|
sha: str
|
|
created_at: Optional[datetime]
|
|
web_url: str
|
|
|
|
@property
|
|
def short_sha(self) -> str:
|
|
return self.sha[:8]
|
|
|
|
@property
|
|
def age(self) -> str:
|
|
if not self.created_at:
|
|
return "?"
|
|
now = datetime.now(timezone.utc)
|
|
diff = (now - self.created_at).total_seconds()
|
|
if diff < 3600:
|
|
return f"{int(diff / 60)}m"
|
|
elif diff < 86400:
|
|
return f"{int(diff / 3600)}h"
|
|
return f"{int(diff / 86400)}d"
|
|
|
|
|
|
@dataclass
|
|
class RepoStatus:
|
|
name: str
|
|
path: Path
|
|
current: Optional[PipelineInfo] = None
|
|
last_success: Optional[PipelineInfo] = None
|
|
error: Optional[str] = None
|
|
|
|
@property
|
|
def status_icon(self) -> str:
|
|
if self.error:
|
|
return f"{Color.DIM}· skip{Color.NC}"
|
|
if not self.current:
|
|
return f"{Color.DIM}- none{Color.NC}"
|
|
icons = {
|
|
"success": f"{Color.GREEN}✓ pass{Color.NC}",
|
|
"failed": f"{Color.RED}✗ FAIL{Color.NC}",
|
|
"running": f"{Color.BLUE}⟳ run {Color.NC}",
|
|
"pending": f"{Color.YELLOW}◔ wait{Color.NC}",
|
|
"canceled": f"{Color.YELLOW}⊘ canc{Color.NC}",
|
|
}
|
|
return icons.get(self.current.status, f"{Color.YELLOW}? {self.current.status}{Color.NC}")
|
|
|
|
|
|
@dataclass
|
|
class PackageRepoStatus:
|
|
"""Status of a Node.js package repo's CI hook setup."""
|
|
name: str
|
|
path: Path
|
|
version: str
|
|
is_private: bool
|
|
has_githooks_dir: bool
|
|
has_prepush_hook: bool
|
|
has_prepare_script: bool
|
|
hooks_path_configured: bool
|
|
|
|
@property
|
|
def needs_hook(self) -> bool:
|
|
"""Returns True if this publishable package is missing CI hooks."""
|
|
if self.is_private:
|
|
return False
|
|
return not (self.has_githooks_dir and self.has_prepush_hook and self.has_prepare_script)
|
|
|
|
@property
|
|
def missing_items(self) -> list[str]:
|
|
"""List of missing hook components."""
|
|
missing = []
|
|
if not self.has_githooks_dir:
|
|
missing.append(".githooks/")
|
|
elif not self.has_prepush_hook:
|
|
missing.append(".githooks/pre-push")
|
|
if not self.has_prepare_script:
|
|
missing.append("prepare script")
|
|
if not self.hooks_path_configured:
|
|
missing.append("core.hooksPath")
|
|
return missing
|
|
|
|
|
|
class GitLabClient:
|
|
API_BASE = "https://gitlab.com/api/v4"
|
|
|
|
def __init__(self, token: str):
|
|
self.token = token
|
|
self.ctx = ssl.create_default_context()
|
|
|
|
def _fetch(self, url: str) -> Optional[list]:
|
|
req = Request(url, headers={"PRIVATE-TOKEN": self.token})
|
|
try:
|
|
with urlopen(req, timeout=10, context=self.ctx) as resp:
|
|
return json.loads(resp.read().decode())
|
|
except (URLError, HTTPError, json.JSONDecodeError):
|
|
return None
|
|
|
|
def get_pipeline(self, project_path: str, status: Optional[str] = None) -> Optional[PipelineInfo]:
|
|
encoded = quote(project_path, safe="")
|
|
url = f"{self.API_BASE}/projects/{encoded}/pipelines?per_page=1"
|
|
if status:
|
|
url += f"&status={status}"
|
|
|
|
data = self._fetch(url)
|
|
if not data:
|
|
return None
|
|
|
|
p = data[0]
|
|
created = None
|
|
if p.get("created_at"):
|
|
created = datetime.fromisoformat(p["created_at"].replace("Z", "+00:00"))
|
|
|
|
return PipelineInfo(
|
|
status=p.get("status", "unknown"),
|
|
pipeline_id=p.get("id", 0),
|
|
ref=p.get("ref", ""),
|
|
sha=p.get("sha", ""),
|
|
created_at=created,
|
|
web_url=p.get("web_url", ""),
|
|
)
|
|
|
|
def get_latest(self, project_path: str) -> Optional[PipelineInfo]:
|
|
return self.get_pipeline(project_path)
|
|
|
|
def get_last_success(self, project_path: str) -> Optional[PipelineInfo]:
|
|
return self.get_pipeline(project_path, status="success")
|
|
|
|
|
|
class GitRepo:
|
|
def __init__(self, path: Path):
|
|
self.path = path
|
|
|
|
def _run_git(self, *args: str) -> Optional[str]:
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "-C", str(self.path), *args],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
return result.stdout.strip() if result.returncode == 0 else None
|
|
except Exception:
|
|
return None
|
|
|
|
def get_remote_url(self) -> Optional[str]:
|
|
return self._run_git("remote", "get-url", "origin")
|
|
|
|
def get_token_from_remote(self) -> Optional[str]:
|
|
url = self.get_remote_url()
|
|
if not url:
|
|
return None
|
|
# Extract oauth2 token from URL: https://oauth2:TOKEN@gitlab.com/...
|
|
if "oauth2:" in url and "@" in url:
|
|
start = url.find("oauth2:") + 7
|
|
end = url.find("@", start)
|
|
return url[start:end]
|
|
return None
|
|
|
|
def get_gitlab_project(self) -> Optional[str]:
|
|
url = self.get_remote_url()
|
|
if not url or "gitlab.com" not in url:
|
|
return None
|
|
# Extract: gitlab.com/org/project or gitlab.com:org/project
|
|
for sep in ["gitlab.com/", "gitlab.com:"]:
|
|
if sep in url:
|
|
path = url.split(sep, 1)[1]
|
|
path = path.removesuffix(".git")
|
|
# Handle oauth URLs
|
|
if "@gitlab.com" in url:
|
|
path = url.split("@gitlab.com", 1)[1].lstrip("/:")
|
|
path = path.removesuffix(".git")
|
|
return path
|
|
return None
|
|
|
|
|
|
def find_git_repos(base_path: Path) -> list[Path]:
|
|
repos = []
|
|
for git_dir in base_path.rglob(".git"):
|
|
if "node_modules" in str(git_dir):
|
|
continue
|
|
if git_dir.is_dir():
|
|
repos.append(git_dir.parent)
|
|
return sorted(repos)
|
|
|
|
|
|
def check_package_repo(repo_path: Path, base_path: Path) -> Optional[PackageRepoStatus]:
|
|
"""Check if a git repo has package.json and proper CI hooks."""
|
|
package_json = repo_path / "package.json"
|
|
if not package_json.exists():
|
|
return None
|
|
|
|
# Skip worktrees (they share hooks with main repo)
|
|
if "worktrees" in str(repo_path) or "-worktrees" in str(repo_path):
|
|
return None
|
|
|
|
# Skip non-GitLab repos (GitHub packages don't use our registry hooks)
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "-C", str(repo_path), "remote", "get-url", "origin"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
remote_url = result.stdout.strip() if result.returncode == 0 else ""
|
|
if "gitlab.com" not in remote_url:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
try:
|
|
with open(package_json) as f:
|
|
pkg = json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
return None
|
|
|
|
version = pkg.get("version", "0.0.0")
|
|
is_private = pkg.get("private", False)
|
|
scripts = pkg.get("scripts", {})
|
|
has_prepare_script = "prepare" in scripts and "hooksPath" in scripts.get("prepare", "")
|
|
|
|
# Check .githooks directory
|
|
githooks_dir = repo_path / ".githooks"
|
|
has_githooks_dir = githooks_dir.is_dir()
|
|
has_prepush_hook = (githooks_dir / "pre-push").is_file() if has_githooks_dir else False
|
|
|
|
# Check if git config core.hooksPath is set
|
|
hooks_path_configured = False
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "-C", str(repo_path), "config", "core.hooksPath"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5,
|
|
)
|
|
hooks_path_configured = result.returncode == 0 and result.stdout.strip() == ".githooks"
|
|
except Exception:
|
|
pass
|
|
|
|
rel_path = repo_path.relative_to(base_path)
|
|
return PackageRepoStatus(
|
|
name=str(rel_path),
|
|
path=repo_path,
|
|
version=version,
|
|
is_private=is_private,
|
|
has_githooks_dir=has_githooks_dir,
|
|
has_prepush_hook=has_prepush_hook,
|
|
has_prepare_script=has_prepare_script,
|
|
hooks_path_configured=hooks_path_configured,
|
|
)
|
|
|
|
|
|
def print_package_hook_report(statuses: list[PackageRepoStatus]) -> None:
|
|
"""Print report of packages missing CI hooks."""
|
|
c = Color
|
|
|
|
# Filter to only publishable packages missing hooks
|
|
missing_hooks = [s for s in statuses if s.needs_hook]
|
|
|
|
if not missing_hooks:
|
|
return
|
|
|
|
print()
|
|
print(f"{c.BOLD}{'═' * 80}{c.NC}")
|
|
print(f"{c.BOLD} {c.YELLOW}⚠ Publishable Packages Missing CI Hooks{c.NC}")
|
|
print(f"{c.BOLD}{'═' * 80}{c.NC}")
|
|
print()
|
|
|
|
max_name = min(40, max(len(s.name) for s in missing_hooks))
|
|
|
|
print(f"{c.BOLD}{'Package':<{max_name}} {'Version':<12} Missing{c.NC}")
|
|
print(f"{c.DIM}{'─' * max_name} {'─' * 12} {'─' * 30}{c.NC}")
|
|
|
|
for s in missing_hooks:
|
|
name = s.name[:max_name - 2] + ".." if len(s.name) > max_name else s.name
|
|
missing = ", ".join(s.missing_items)
|
|
print(f"{name:<{max_name}} {s.version:<12} {c.RED}{missing}{c.NC}")
|
|
|
|
print()
|
|
print(f"{c.BOLD}Total:{c.NC} {c.RED}{len(missing_hooks)} packages need CI hooks{c.NC}")
|
|
print()
|
|
print(f"{c.DIM}To fix: Add .githooks/pre-push hook and{c.NC}")
|
|
print(f"{c.DIM} \"prepare\": \"git config core.hooksPath .githooks\" to package.json{c.NC}")
|
|
|
|
|
|
def check_repo(repo_path: Path, base_path: Path, env_token: Optional[str]) -> RepoStatus:
|
|
rel_path = repo_path.relative_to(base_path)
|
|
name = str(rel_path)
|
|
|
|
repo = GitRepo(repo_path)
|
|
project = repo.get_gitlab_project()
|
|
if not project:
|
|
return RepoStatus(name=name, path=repo_path, error="not gitlab")
|
|
|
|
token = env_token or repo.get_token_from_remote()
|
|
if not token:
|
|
return RepoStatus(name=name, path=repo_path, error="no token")
|
|
|
|
client = GitLabClient(token)
|
|
current = client.get_latest(project)
|
|
last_success = client.get_last_success(project)
|
|
|
|
return RepoStatus(
|
|
name=name,
|
|
path=repo_path,
|
|
current=current,
|
|
last_success=last_success,
|
|
)
|
|
|
|
|
|
def print_aggregate_report(statuses: list[RepoStatus], base_name: str) -> None:
|
|
c = Color
|
|
|
|
print(f"{c.BOLD}{'═' * 80}{c.NC}")
|
|
print(f"{c.BOLD} GitLab CI Status: {c.CYAN}{base_name}{c.NC}")
|
|
print(f"{c.BOLD}{'═' * 80}{c.NC}")
|
|
print()
|
|
|
|
if not statuses:
|
|
print(f"{c.YELLOW}No repositories found{c.NC}")
|
|
return
|
|
|
|
# Calculate column widths
|
|
max_name = min(35, max(len(s.name) for s in statuses))
|
|
max_current = 12
|
|
max_success = 12
|
|
|
|
# Header
|
|
header = f"{'Repository':<{max_name}} {'Status':<8} {'Current':<{max_current}} {'Last Success':<{max_success}} Age"
|
|
print(f"{c.BOLD}{header}{c.NC}")
|
|
print(f"{c.DIM}{'─' * max_name} {'─' * 8} {'─' * max_current} {'─' * max_success} {'─' * 4}{c.NC}")
|
|
|
|
# Counters
|
|
counts = {"success": 0, "failed": 0, "running": 0, "pending": 0, "none": 0, "skip": 0}
|
|
failed_repos = []
|
|
|
|
for s in statuses:
|
|
name = s.name[:max_name - 2] + ".." if len(s.name) > max_name else s.name
|
|
|
|
if s.error:
|
|
current_ver = f"{c.DIM}-{c.NC}"
|
|
success_ver = f"{c.DIM}-{c.NC}"
|
|
age = s.error
|
|
counts["skip"] += 1
|
|
elif not s.current:
|
|
current_ver = f"{c.DIM}-{c.NC}"
|
|
success_ver = f"{c.DIM}-{c.NC}"
|
|
age = "-"
|
|
counts["none"] += 1
|
|
else:
|
|
current_ver = s.current.short_sha
|
|
success_ver = s.last_success.short_sha if s.last_success else f"{c.DIM}none{c.NC}"
|
|
age = s.current.age
|
|
|
|
status = s.current.status
|
|
counts[status] = counts.get(status, 0) + 1
|
|
|
|
if status == "failed":
|
|
failed_repos.append(s)
|
|
|
|
# Highlight version mismatch
|
|
if s.last_success and s.current.sha != s.last_success.sha:
|
|
if status == "failed":
|
|
current_ver = f"{c.RED}{current_ver}{c.NC}"
|
|
success_ver = f"{c.GREEN}{success_ver}{c.NC}"
|
|
elif status in ("running", "pending"):
|
|
current_ver = f"{c.YELLOW}{current_ver}{c.NC}"
|
|
|
|
print(f"{name:<{max_name}} {s.status_icon} {current_ver:<{max_current}} {success_ver:<{max_success}} {age}")
|
|
|
|
# Summary
|
|
print()
|
|
summary_parts = [
|
|
f"{c.GREEN}{counts['success']} pass{c.NC}",
|
|
f"{c.RED}{counts['failed']} fail{c.NC}",
|
|
f"{c.BLUE}{counts.get('running', 0)} running{c.NC}",
|
|
f"{c.YELLOW}{counts.get('pending', 0)} pending{c.NC}",
|
|
f"{c.DIM}{counts['none']} none{c.NC}",
|
|
f"{c.DIM}{counts['skip']} skip{c.NC}",
|
|
]
|
|
print(f"{c.BOLD}Summary:{c.NC} " + " · ".join(summary_parts))
|
|
|
|
# Failed details
|
|
if failed_repos:
|
|
print()
|
|
print(f"{c.RED}{c.BOLD}Failed pipelines:{c.NC}")
|
|
for s in failed_repos:
|
|
success_info = ""
|
|
if s.last_success:
|
|
success_info = f" (last success: {s.last_success.short_sha})"
|
|
print(f" {c.RED}✗{c.NC} {s.name}: {s.current.short_sha}{success_info}")
|
|
print(f" {c.CYAN}{s.current.web_url}{c.NC}")
|
|
|
|
|
|
def main() -> None:
|
|
env_token = os.environ.get("GITLAB_TOKEN")
|
|
base_path = Path.cwd()
|
|
|
|
# Check if we're in a git repo
|
|
git_check = subprocess.run(
|
|
["git", "rev-parse", "--git-dir"],
|
|
capture_output=True,
|
|
cwd=base_path,
|
|
)
|
|
|
|
if git_check.returncode == 0:
|
|
# Single repo mode
|
|
status = check_repo(base_path, base_path.parent, env_token)
|
|
print_aggregate_report([status], base_path.name)
|
|
|
|
# Check package hooks for single repo
|
|
pkg_status = check_package_repo(base_path, base_path.parent)
|
|
if pkg_status:
|
|
print_package_hook_report([pkg_status])
|
|
else:
|
|
# Scan mode
|
|
repos = find_git_repos(base_path)
|
|
print(f"{Color.DIM}Found {len(repos)} repositories, checking...{Color.NC}")
|
|
print()
|
|
|
|
statuses = [check_repo(r, base_path, env_token) for r in repos]
|
|
print_aggregate_report(statuses, base_path.name)
|
|
|
|
# Check all repos for package hook status
|
|
pkg_statuses = [s for r in repos if (s := check_package_repo(r, base_path))]
|
|
print_package_hook_report(pkg_statuses)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|