#!/usr/bin/env python3 """GitLab CI Pipeline Status Checker - Aggregate view for multiple repos.""" import json import os import subprocess import ssl import sys from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Optional from urllib.parse import quote from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError # ANSI colors class Color: RED = "\033[0;31m" GREEN = "\033[0;32m" YELLOW = "\033[1;33m" BLUE = "\033[0;34m" CYAN = "\033[0;36m" BOLD = "\033[1m" DIM = "\033[2m" NC = "\033[0m" @dataclass class PipelineInfo: status: str pipeline_id: int ref: str sha: str created_at: Optional[datetime] web_url: str @property def short_sha(self) -> str: return self.sha[:8] @property def age(self) -> str: if not self.created_at: return "?" now = datetime.now(timezone.utc) diff = (now - self.created_at).total_seconds() if diff < 3600: return f"{int(diff / 60)}m" elif diff < 86400: return f"{int(diff / 3600)}h" return f"{int(diff / 86400)}d" @dataclass class RepoStatus: name: str path: Path current: Optional[PipelineInfo] = None last_success: Optional[PipelineInfo] = None error: Optional[str] = None @property def status_icon(self) -> str: if self.error: return f"{Color.DIM}· skip{Color.NC}" if not self.current: return f"{Color.DIM}- none{Color.NC}" icons = { "success": f"{Color.GREEN}✓ pass{Color.NC}", "failed": f"{Color.RED}✗ FAIL{Color.NC}", "running": f"{Color.BLUE}⟳ run {Color.NC}", "pending": f"{Color.YELLOW}◔ wait{Color.NC}", "canceled": f"{Color.YELLOW}⊘ canc{Color.NC}", } return icons.get(self.current.status, f"{Color.YELLOW}? {self.current.status}{Color.NC}") @dataclass class PackageRepoStatus: """Status of a Node.js package repo's CI hook setup.""" name: str path: Path version: str is_private: bool has_githooks_dir: bool has_prepush_hook: bool has_prepare_script: bool hooks_path_configured: bool @property def needs_hook(self) -> bool: """Returns True if this publishable package is missing CI hooks.""" if self.is_private: return False return not (self.has_githooks_dir and self.has_prepush_hook and self.has_prepare_script) @property def missing_items(self) -> list[str]: """List of missing hook components.""" missing = [] if not self.has_githooks_dir: missing.append(".githooks/") elif not self.has_prepush_hook: missing.append(".githooks/pre-push") if not self.has_prepare_script: missing.append("prepare script") if not self.hooks_path_configured: missing.append("core.hooksPath") return missing class GitLabClient: API_BASE = "https://gitlab.com/api/v4" def __init__(self, token: str): self.token = token self.ctx = ssl.create_default_context() def _fetch(self, url: str) -> Optional[list]: req = Request(url, headers={"PRIVATE-TOKEN": self.token}) try: with urlopen(req, timeout=10, context=self.ctx) as resp: return json.loads(resp.read().decode()) except (URLError, HTTPError, json.JSONDecodeError): return None def get_pipeline(self, project_path: str, status: Optional[str] = None) -> Optional[PipelineInfo]: encoded = quote(project_path, safe="") url = f"{self.API_BASE}/projects/{encoded}/pipelines?per_page=1" if status: url += f"&status={status}" data = self._fetch(url) if not data: return None p = data[0] created = None if p.get("created_at"): created = datetime.fromisoformat(p["created_at"].replace("Z", "+00:00")) return PipelineInfo( status=p.get("status", "unknown"), pipeline_id=p.get("id", 0), ref=p.get("ref", ""), sha=p.get("sha", ""), created_at=created, web_url=p.get("web_url", ""), ) def get_latest(self, project_path: str) -> Optional[PipelineInfo]: return self.get_pipeline(project_path) def get_last_success(self, project_path: str) -> Optional[PipelineInfo]: return self.get_pipeline(project_path, status="success") class GitRepo: def __init__(self, path: Path): self.path = path def _run_git(self, *args: str) -> Optional[str]: try: result = subprocess.run( ["git", "-C", str(self.path), *args], capture_output=True, text=True, timeout=5, ) return result.stdout.strip() if result.returncode == 0 else None except Exception: return None def get_remote_url(self) -> Optional[str]: return self._run_git("remote", "get-url", "origin") def get_token_from_remote(self) -> Optional[str]: url = self.get_remote_url() if not url: return None # Extract oauth2 token from URL: https://oauth2:TOKEN@gitlab.com/... if "oauth2:" in url and "@" in url: start = url.find("oauth2:") + 7 end = url.find("@", start) return url[start:end] return None def get_gitlab_project(self) -> Optional[str]: url = self.get_remote_url() if not url or "gitlab.com" not in url: return None # Extract: gitlab.com/org/project or gitlab.com:org/project for sep in ["gitlab.com/", "gitlab.com:"]: if sep in url: path = url.split(sep, 1)[1] path = path.removesuffix(".git") # Handle oauth URLs if "@gitlab.com" in url: path = url.split("@gitlab.com", 1)[1].lstrip("/:") path = path.removesuffix(".git") return path return None def find_git_repos(base_path: Path) -> list[Path]: repos = [] for git_dir in base_path.rglob(".git"): if "node_modules" in str(git_dir): continue if git_dir.is_dir(): repos.append(git_dir.parent) return sorted(repos) def check_package_repo(repo_path: Path, base_path: Path) -> Optional[PackageRepoStatus]: """Check if a git repo has package.json and proper CI hooks.""" package_json = repo_path / "package.json" if not package_json.exists(): return None # Skip worktrees (they share hooks with main repo) if "worktrees" in str(repo_path) or "-worktrees" in str(repo_path): return None # Skip non-GitLab repos (GitHub packages don't use our registry hooks) try: result = subprocess.run( ["git", "-C", str(repo_path), "remote", "get-url", "origin"], capture_output=True, text=True, timeout=5, ) remote_url = result.stdout.strip() if result.returncode == 0 else "" if "gitlab.com" not in remote_url: return None except Exception: return None try: with open(package_json) as f: pkg = json.load(f) except (json.JSONDecodeError, IOError): return None version = pkg.get("version", "0.0.0") is_private = pkg.get("private", False) scripts = pkg.get("scripts", {}) has_prepare_script = "prepare" in scripts and "hooksPath" in scripts.get("prepare", "") # Check .githooks directory githooks_dir = repo_path / ".githooks" has_githooks_dir = githooks_dir.is_dir() has_prepush_hook = (githooks_dir / "pre-push").is_file() if has_githooks_dir else False # Check if git config core.hooksPath is set hooks_path_configured = False try: result = subprocess.run( ["git", "-C", str(repo_path), "config", "core.hooksPath"], capture_output=True, text=True, timeout=5, ) hooks_path_configured = result.returncode == 0 and result.stdout.strip() == ".githooks" except Exception: pass rel_path = repo_path.relative_to(base_path) return PackageRepoStatus( name=str(rel_path), path=repo_path, version=version, is_private=is_private, has_githooks_dir=has_githooks_dir, has_prepush_hook=has_prepush_hook, has_prepare_script=has_prepare_script, hooks_path_configured=hooks_path_configured, ) def print_package_hook_report(statuses: list[PackageRepoStatus]) -> None: """Print report of packages missing CI hooks.""" c = Color # Filter to only publishable packages missing hooks missing_hooks = [s for s in statuses if s.needs_hook] if not missing_hooks: return print() print(f"{c.BOLD}{'═' * 80}{c.NC}") print(f"{c.BOLD} {c.YELLOW}⚠ Publishable Packages Missing CI Hooks{c.NC}") print(f"{c.BOLD}{'═' * 80}{c.NC}") print() max_name = min(40, max(len(s.name) for s in missing_hooks)) print(f"{c.BOLD}{'Package':<{max_name}} {'Version':<12} Missing{c.NC}") print(f"{c.DIM}{'─' * max_name} {'─' * 12} {'─' * 30}{c.NC}") for s in missing_hooks: name = s.name[:max_name - 2] + ".." if len(s.name) > max_name else s.name missing = ", ".join(s.missing_items) print(f"{name:<{max_name}} {s.version:<12} {c.RED}{missing}{c.NC}") print() print(f"{c.BOLD}Total:{c.NC} {c.RED}{len(missing_hooks)} packages need CI hooks{c.NC}") print() print(f"{c.DIM}To fix: Add .githooks/pre-push hook and{c.NC}") print(f"{c.DIM} \"prepare\": \"git config core.hooksPath .githooks\" to package.json{c.NC}") def check_repo(repo_path: Path, base_path: Path, env_token: Optional[str]) -> RepoStatus: rel_path = repo_path.relative_to(base_path) name = str(rel_path) repo = GitRepo(repo_path) project = repo.get_gitlab_project() if not project: return RepoStatus(name=name, path=repo_path, error="not gitlab") token = env_token or repo.get_token_from_remote() if not token: return RepoStatus(name=name, path=repo_path, error="no token") client = GitLabClient(token) current = client.get_latest(project) last_success = client.get_last_success(project) return RepoStatus( name=name, path=repo_path, current=current, last_success=last_success, ) def print_aggregate_report(statuses: list[RepoStatus], base_name: str) -> None: c = Color print(f"{c.BOLD}{'═' * 80}{c.NC}") print(f"{c.BOLD} GitLab CI Status: {c.CYAN}{base_name}{c.NC}") print(f"{c.BOLD}{'═' * 80}{c.NC}") print() if not statuses: print(f"{c.YELLOW}No repositories found{c.NC}") return # Calculate column widths max_name = min(35, max(len(s.name) for s in statuses)) max_current = 12 max_success = 12 # Header header = f"{'Repository':<{max_name}} {'Status':<8} {'Current':<{max_current}} {'Last Success':<{max_success}} Age" print(f"{c.BOLD}{header}{c.NC}") print(f"{c.DIM}{'─' * max_name} {'─' * 8} {'─' * max_current} {'─' * max_success} {'─' * 4}{c.NC}") # Counters counts = {"success": 0, "failed": 0, "running": 0, "pending": 0, "none": 0, "skip": 0} failed_repos = [] for s in statuses: name = s.name[:max_name - 2] + ".." if len(s.name) > max_name else s.name if s.error: current_ver = f"{c.DIM}-{c.NC}" success_ver = f"{c.DIM}-{c.NC}" age = s.error counts["skip"] += 1 elif not s.current: current_ver = f"{c.DIM}-{c.NC}" success_ver = f"{c.DIM}-{c.NC}" age = "-" counts["none"] += 1 else: current_ver = s.current.short_sha success_ver = s.last_success.short_sha if s.last_success else f"{c.DIM}none{c.NC}" age = s.current.age status = s.current.status counts[status] = counts.get(status, 0) + 1 if status == "failed": failed_repos.append(s) # Highlight version mismatch if s.last_success and s.current.sha != s.last_success.sha: if status == "failed": current_ver = f"{c.RED}{current_ver}{c.NC}" success_ver = f"{c.GREEN}{success_ver}{c.NC}" elif status in ("running", "pending"): current_ver = f"{c.YELLOW}{current_ver}{c.NC}" print(f"{name:<{max_name}} {s.status_icon} {current_ver:<{max_current}} {success_ver:<{max_success}} {age}") # Summary print() summary_parts = [ f"{c.GREEN}{counts['success']} pass{c.NC}", f"{c.RED}{counts['failed']} fail{c.NC}", f"{c.BLUE}{counts.get('running', 0)} running{c.NC}", f"{c.YELLOW}{counts.get('pending', 0)} pending{c.NC}", f"{c.DIM}{counts['none']} none{c.NC}", f"{c.DIM}{counts['skip']} skip{c.NC}", ] print(f"{c.BOLD}Summary:{c.NC} " + " · ".join(summary_parts)) # Failed details if failed_repos: print() print(f"{c.RED}{c.BOLD}Failed pipelines:{c.NC}") for s in failed_repos: success_info = "" if s.last_success: success_info = f" (last success: {s.last_success.short_sha})" print(f" {c.RED}✗{c.NC} {s.name}: {s.current.short_sha}{success_info}") print(f" {c.CYAN}{s.current.web_url}{c.NC}") def main() -> None: env_token = os.environ.get("GITLAB_TOKEN") base_path = Path.cwd() # Check if we're in a git repo git_check = subprocess.run( ["git", "rev-parse", "--git-dir"], capture_output=True, cwd=base_path, ) if git_check.returncode == 0: # Single repo mode status = check_repo(base_path, base_path.parent, env_token) print_aggregate_report([status], base_path.name) # Check package hooks for single repo pkg_status = check_package_repo(base_path, base_path.parent) if pkg_status: print_package_hook_report([pkg_status]) else: # Scan mode repos = find_git_repos(base_path) print(f"{Color.DIM}Found {len(repos)} repositories, checking...{Color.NC}") print() statuses = [check_repo(r, base_path, env_token) for r in repos] print_aggregate_report(statuses, base_path.name) # Check all repos for package hook status pkg_statuses = [s for r in repos if (s := check_package_repo(r, base_path))] print_package_hook_report(pkg_statuses) if __name__ == "__main__": main()