#!/usr/bin/env python3
"""Shared utilities for GitLab npm registry scripts."""

import json
import os
import ssl
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, TypeVar
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen

T = TypeVar("T")


class Color:
    """ANSI color codes for terminal output."""

    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    CYAN = "\033[0;36m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    NC = "\033[0m"  # reset ("no color")


GITLAB_NPM_REGISTRY = "https://gitlab.com/api/v4/packages/npm"
GITLAB_API_BASE = "https://gitlab.com/api/v4"
DEFAULT_SCOPE = "@transquinnftw"


class RateLimiter:
    """Rate limiter with exponential backoff for API calls.

    Enforces a minimum spacing between calls made through :meth:`execute`
    and retries failed calls with a capped exponential delay.
    """

    def __init__(
        self,
        min_interval: float = 0.5,
        max_retries: int = 5,
        base_backoff: float = 2.0,
        max_backoff: float = 60.0,
    ):
        self.min_interval = min_interval
        self.max_retries = max_retries
        self.base_backoff = base_backoff
        self.max_backoff = max_backoff
        # Monotonic timestamp of the last attempt; -inf means "never", so the
        # very first call is not delayed.
        self._last_request_time: float = float("-inf")

    def _wait_for_interval(self) -> None:
        """Sleep until at least ``min_interval`` has passed since the last attempt."""
        # The monotonic clock cannot go backwards, so a system clock
        # adjustment can never stretch the wait.
        elapsed = time.monotonic() - self._last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)

    def _get_backoff_time(self, attempt: int) -> float:
        """Return ``base_backoff ** attempt`` seconds, capped at ``max_backoff``."""
        return min(self.base_backoff ** attempt, self.max_backoff)

    def execute(
        self,
        func: Callable[[], T],
        retry_on: tuple[type[Exception], ...] = (HTTPError,),
        retry_codes: tuple[int, ...] = (401, 429, 500, 502, 503, 504),
    ) -> T:
        """Execute ``func`` with rate limiting and retry logic.

        Args:
            func: Zero-argument callable performing the request.
            retry_on: Exception types that are candidates for a retry.
            retry_codes: HTTP status codes for which an ``HTTPError`` is
                retried; an ``HTTPError`` with any other code is re-raised
                immediately. Non-HTTP exceptions listed in ``retry_on`` are
                always retried.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: The last exception raised by ``func`` once retries are
                exhausted or the error is not retryable.
            RuntimeError: If no attempt was made (``max_retries`` < 1).
        """
        last_error: Optional[Exception] = None

        for attempt in range(self.max_retries):
            self._wait_for_interval()
            try:
                return func()
            except retry_on as e:
                last_error = e
                if isinstance(e, HTTPError):
                    should_retry = e.code in retry_codes
                else:
                    should_retry = True

                is_last_attempt = attempt >= self.max_retries - 1
                if not should_retry or is_last_attempt:
                    raise

                backoff = self._get_backoff_time(attempt)
                code = getattr(e, "code", "N/A")
                print(
                    f"{Color.DIM} Rate limited (HTTP {code}), "
                    f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{self.max_retries})...{Color.NC}"
                )
                time.sleep(backoff)
            finally:
                # Record every attempt (success, retry or final failure) so
                # the next call is spaced correctly even after an error.
                self._last_request_time = time.monotonic()

        if last_error is not None:
            raise last_error
        raise RuntimeError("Rate limiter exhausted without result")


# Global rate limiter instance
_rate_limiter = RateLimiter()


def get_rate_limiter() -> RateLimiter:
    """Get the global rate limiter instance."""
    return _rate_limiter


def set_rate_limiter(limiter: RateLimiter) -> None:
    """Set a custom rate limiter as the global instance."""
    global _rate_limiter
    _rate_limiter = limiter


@dataclass
class Package:
    """Represents an npm package in the workspace."""

    name: str
    version: str
    path: Path
    is_private: bool
    has_publish_config: bool
    registry: Optional[str] = None
    dependencies: dict = field(default_factory=dict)
    dev_dependencies: dict = field(default_factory=dict)

    @property
    def can_publish(self) -> bool:
        """Return True when the package is not private and has a publishConfig."""
        return not self.is_private and self.has_publish_config

    @property
    def scoped_name(self) -> str:
        """Return URL-encoded package name for API queries."""
        return quote(self.name, safe="")

    @property
    def short_name(self) -> str:
        """Return name without scope."""
        if "/" in self.name:
            return self.name.split("/", 1)[1]
        return self.name


@dataclass
class RegistryPackageInfo:
    """Package metadata from GitLab npm registry."""

    name: str
    latest_version: Optional[str]
    all_versions: list[str]
    dist_tags: dict[str, str]
    exists: bool
    error: Optional[str] = None


@dataclass
class LinkReference:
    """A link: or file: reference in package.json."""

    package_json_path: Path
    dependency_name: str
    current_value: str
    dep_type: str  # dependencies, devDependencies, peerDependencies


class GitLabApiClient:
    """Client for GitLab REST API with rate limiting."""

    def __init__(self, token: Optional[str] = None):
        # Explicit token wins; otherwise fall back to the environment.
        self.token = token or os.environ.get("GITLAB_TOKEN") or os.environ.get("GITLAB_PAT")
        self.ctx = ssl.create_default_context()
        self.rate_limiter = get_rate_limiter()

    def _request(
        self,
        url: str,
        method: str = "GET",
        data: Optional[dict] = None,
    ) -> Optional[dict]:
        """Make an API request with rate limiting.

        Returns:
            The decoded JSON body (``{}`` for an empty body), or ``None`` on
            HTTP 404, on a ``URLError``, or when the body is not valid JSON.

        Raises:
            HTTPError: For any HTTP error other than 404 that survives the
                rate limiter's retries.
        """
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        if self.token:
            headers["PRIVATE-TOKEN"] = self.token

        body = json.dumps(data).encode() if data else None
        req = Request(url, headers=headers, data=body, method=method)

        def do_request() -> Optional[dict]:
            with urlopen(req, timeout=30, context=self.ctx) as resp:
                content = resp.read().decode()
                return json.loads(content) if content else {}

        try:
            return self.rate_limiter.execute(do_request)
        except HTTPError as e:
            if e.code == 404:
                return None
            raise
        except (URLError, json.JSONDecodeError):
            return None

    def _project_action(self, project_path: str, method: str, suffix: str = "") -> bool:
        """Send ``method`` to a project endpoint; True if a response was obtained.

        Any ``HTTPError`` raised by the request is reported as ``False``.
        """
        encoded_path = quote(project_path, safe="")
        try:
            result = self._request(
                f"{GITLAB_API_BASE}/projects/{encoded_path}{suffix}",
                method=method,
            )
        except HTTPError:
            return False
        return result is not None

    def create_project(
        self,
        name: str,
        visibility: str = "private",
        description: Optional[str] = None,
    ) -> Optional[dict]:
        """Create a new GitLab project and return the API response."""
        data = {"name": name, "visibility": visibility}
        if description:
            data["description"] = description
        return self._request(f"{GITLAB_API_BASE}/projects", method="POST", data=data)

    def delete_project(self, project_path: str) -> bool:
        """Delete a GitLab project. Returns True if successful."""
        return self._project_action(project_path, "DELETE")

    def get_project(self, project_path: str) -> Optional[dict]:
        """Get project info by path (e.g., 'TransQuinnFTW/ui')."""
        encoded_path = quote(project_path, safe="")
        return self._request(f"{GITLAB_API_BASE}/projects/{encoded_path}")

    def unarchive_project(self, project_path: str) -> bool:
        """Unarchive a project. Returns True if successful."""
        return self._project_action(project_path, "POST", "/unarchive")

    def restore_project(self, project_path: str) -> bool:
        """Restore a project scheduled for deletion. Returns True if successful."""
        return self._project_action(project_path, "POST", "/restore")


class GitLabNpmClient:
    """Client for GitLab npm registry API."""

    def __init__(self, token: Optional[str] = None):
        self.token = token or os.environ.get("GITLAB_NPM_TOKEN")
        self.ctx = ssl.create_default_context()
        self.rate_limiter = get_rate_limiter()

    def _fetch(self, url: str) -> Optional[dict]:
        """Fetch JSON from URL with optional auth and rate limiting.

        Returns ``None`` on HTTP 404, on a ``URLError``, or when the body is
        not valid JSON; other ``HTTPError``s propagate.
        """
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        req = Request(url, headers=headers)

        def do_fetch() -> Optional[dict]:
            with urlopen(req, timeout=15, context=self.ctx) as resp:
                return json.loads(resp.read().decode())

        try:
            return self.rate_limiter.execute(do_fetch)
        except HTTPError as e:
            if e.code == 404:
                return None
            raise
        except (URLError, json.JSONDecodeError):
            return None

    def get_package_info(self, name: str) -> RegistryPackageInfo:
        """Get package metadata from GitLab npm registry.

        Never raises: a failed lookup is returned with ``exists=False`` and,
        when an exception occurred, its text in ``error``.
        """
        encoded = quote(name, safe="")
        url = f"{GITLAB_NPM_REGISTRY}/{encoded}"

        try:
            data = self._fetch(url)
        except Exception as e:
            return RegistryPackageInfo(
                name=name,
                latest_version=None,
                all_versions=[],
                dist_tags={},
                exists=False,
                error=str(e),
            )

        if not data:
            return RegistryPackageInfo(
                name=name,
                latest_version=None,
                all_versions=[],
                dist_tags={},
                exists=False,
            )

        versions = list(data.get("versions", {}).keys())
        dist_tags = data.get("dist-tags", {})
        latest = dist_tags.get("latest")

        return RegistryPackageInfo(
            name=name,
            latest_version=latest,
            all_versions=versions,
            dist_tags=dist_tags,
            exists=True,
        )

    def get_latest_version(self, name: str) -> Optional[str]:
        """Get latest published version of a package, or None if not found."""
        info = self.get_package_info(name)
        return info.latest_version if info.exists else None


def _read_json_object(path: Path) -> Optional[dict]:
    """Read ``path`` as UTF-8 JSON; return the dict, or None if unusable."""
    try:
        # utf-8-sig also accepts a leading byte-order mark.
        with open(path, encoding="utf-8-sig") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return None
    return data if isinstance(data, dict) else None


def _dict_or_empty(value: object) -> dict:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _in_node_modules(pkg_json: Path, base_path: Path) -> bool:
    """Return True if a directory named node_modules lies between base and file."""
    # Compare whole path components below base_path, so directory names that
    # merely contain the text "node_modules" do not match.
    return "node_modules" in pkg_json.relative_to(base_path).parts


def load_package(pkg_path: Path) -> Optional[Package]:
    """Load package info from package.json.

    Returns ``None`` when the file cannot be read or parsed, is not a JSON
    object, has no usable ``name``, or its name ends with ``/root``.
    """
    data = _read_json_object(pkg_path)
    if data is None:
        return None

    name = data.get("name", "")
    if not isinstance(name, str) or not name or name.endswith("/root"):
        return None

    publish_config = _dict_or_empty(data.get("publishConfig"))
    # First publishConfig key mentioning "registry" (e.g. "@scope:registry").
    registry = next(
        (value for key, value in publish_config.items() if "registry" in key.lower()),
        None,
    )

    return Package(
        name=name,
        version=data.get("version", "0.0.0"),
        path=pkg_path.parent,
        is_private=data.get("private", False),
        has_publish_config=bool(publish_config),
        registry=registry,
        dependencies=_dict_or_empty(data.get("dependencies")),
        dev_dependencies=_dict_or_empty(data.get("devDependencies")),
    )


def find_packages(base_path: Path) -> list[Package]:
    """Find all packages in the workspace.

    With a ``pnpm-workspace.yaml`` present the tree is scanned; otherwise
    only ``base_path / "package.json"`` is considered.
    """
    if (base_path / "pnpm-workspace.yaml").exists():
        return find_workspace_packages(base_path)

    # Single package
    packages: list[Package] = []
    root_pkg = base_path / "package.json"
    if root_pkg.exists():
        pkg = load_package(root_pkg)
        if pkg:
            packages.append(pkg)
    return packages


def find_workspace_packages(base_path: Path) -> list[Package]:
    """Find all packages in a pnpm workspace, sorted by name.

    Skips anything under a ``node_modules`` directory and the workspace
    root's own package.json.
    """
    packages = []
    for pkg_json in base_path.rglob("package.json"):
        if _in_node_modules(pkg_json, base_path):
            continue
        if pkg_json.parent == base_path:
            continue
        pkg = load_package(pkg_json)
        if pkg:
            packages.append(pkg)
    return sorted(packages, key=lambda p: p.name)


def find_link_references(base_path: Path, scope: str = DEFAULT_SCOPE) -> list[LinkReference]:
    """Find all link: and file: references to scoped packages.

    Unreadable or malformed package.json files, and dependency entries whose
    value is not a string, are skipped.
    """
    refs = []
    for pkg_json in base_path.rglob("package.json"):
        if _in_node_modules(pkg_json, base_path):
            continue

        data = _read_json_object(pkg_json)
        if data is None:
            continue

        for dep_type in ("dependencies", "devDependencies", "peerDependencies"):
            deps = _dict_or_empty(data.get(dep_type))
            for name, value in deps.items():
                if not name.startswith(scope):
                    continue
                if isinstance(value, str) and value.startswith(("link:", "file:")):
                    refs.append(LinkReference(
                        package_json_path=pkg_json,
                        dependency_name=name,
                        current_value=value,
                        dep_type=dep_type,
                    ))
    return refs


def get_dependency_tiers(packages: list[Package], scope: str = DEFAULT_SCOPE) -> list[list[Package]]:
    """Sort packages into dependency tiers for ordered publishing.

    Tier 0: Packages with no internal dependencies
    Tier 1: Packages depending only on Tier 0
    Tier 2: Packages depending on Tier 1
    etc.

    Only dependencies (regular and dev) that start with ``scope`` and name a
    package in ``packages`` count as internal. If no progress can be made
    (e.g. a dependency cycle), all remaining packages form the final tier.
    """
    package_names = {p.name for p in packages}

    # Each package's internal dependency set never changes, so compute it
    # once. Package is an unhashable dataclass, hence the (pkg, deps) pairs.
    remaining = [
        (
            pkg,
            {
                dep
                for dep in {**pkg.dependencies, **pkg.dev_dependencies}
                if dep.startswith(scope) and dep in package_names
            },
        )
        for pkg in packages
    ]

    tiers: list[list[Package]] = []
    resolved: set[str] = set()

    while remaining:
        ready = [pkg for pkg, deps in remaining if deps.issubset(resolved)]
        if not ready:
            # Circular dependency or error - add all remaining
            tiers.append([pkg for pkg, _ in remaining])
            break

        remaining = [(pkg, deps) for pkg, deps in remaining if not deps.issubset(resolved)]
        resolved.update(pkg.name for pkg in ready)
        tiers.append(ready)

    return tiers


def check_token() -> bool:
    """Verify GITLAB_NPM_TOKEN is set, printing a hint when it is not."""
    token = os.environ.get("GITLAB_NPM_TOKEN")
    if not token:
        print(f"{Color.RED}Error: GITLAB_NPM_TOKEN environment variable not set{Color.NC}")
        print(f"{Color.DIM}Set it with: export GITLAB_NPM_TOKEN=glpat-...{Color.NC}")
        return False
    return True


def get_npmrc_registry_lines(scope: str = DEFAULT_SCOPE, token_env: str = "GITLAB_NPM_TOKEN") -> str:
    """Generate .npmrc registry configuration lines.

    Returns two newline-terminated lines: the scope-to-registry mapping and
    the auth-token entry, which refers to ``token_env`` as ``${NAME}``
    rather than embedding a token value.
    """
    return (
        f"{scope}:registry={GITLAB_NPM_REGISTRY}/\n"
        f"//gitlab.com/api/v4/packages/npm/:_authToken=${{{token_env}}}\n"
    )


def print_header(title: str, width: int = 80) -> None:
    """Print a formatted header: a rule, the title, a rule, then a blank line."""
    print(f"{Color.BOLD}{'═' * width}{Color.NC}")
    print(f"{Color.BOLD} {title}{Color.NC}")
    print(f"{Color.BOLD}{'═' * width}{Color.NC}")
    print()