packages-scripts/gitlab/gitlab_npm_common.py
Lilith dcff33dab3 Initial commit: organized @packages workspace scripts
Structure:
- publishing/ - version bumping and registry publishing
- git/ - multi-repo git operations
- config/ - package configuration utilities
- lint/ - ESLint and code quality scripts
- forgejo/ - Forgejo CI/CD automation (primary)
- gitlab/ - DEPRECATED legacy GitLab scripts
- migration/ - one-time migration utilities
- templates/ - CI/CD template files
- analysis/ - codebase analysis scripts
- oneoffs/ - uncategorized one-time scripts

Note: commits CLI will be merged into @ml/auto-commit-service

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 19:34:13 -08:00

491 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
"""Shared utilities for GitLab npm registry scripts."""
import json
import os
import ssl
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, TypeVar
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen
# Generic result type: RateLimiter.execute returns whatever the wrapped callable returns.
T = TypeVar("T")
class Color:
    """ANSI color codes for terminal output.

    Each attribute is a raw escape sequence intended to be embedded directly
    in printed text. Follow colored text with ``NC`` to reset the terminal
    style.
    """

    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    CYAN = "\033[0;36m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    NC = "\033[0m"  # "no color": resets all attributes
# Base URL of the npm registry endpoint on gitlab.com (no trailing slash).
GITLAB_NPM_REGISTRY = "https://gitlab.com/api/v4/packages/npm"
# Root URL of the gitlab.com REST API that GitLabApiClient builds its endpoints from.
GITLAB_API_BASE = "https://gitlab.com/api/v4"
# npm scope used by the helpers below when the caller does not pass one.
DEFAULT_SCOPE = "@transquinnftw"
class RateLimiter:
    """Space out calls and retry failures with exponential backoff.

    Attributes:
        min_interval: Minimum number of seconds between two calls.
        max_retries: Total number of attempts made by ``execute``.
        base_backoff: Base of the exponential backoff (``base ** attempt``).
        max_backoff: Upper bound, in seconds, for a single backoff sleep.
    """

    def __init__(
        self,
        min_interval: float = 0.5,
        max_retries: int = 5,
        base_backoff: float = 2.0,
        max_backoff: float = 60.0,
    ):
        self.min_interval = min_interval
        self.max_retries = max_retries
        self.base_backoff = base_backoff
        self.max_backoff = max_backoff
        # Wall-clock timestamp of the last call (or last backoff); 0 means "never".
        self._last_request_time: float = 0

    def _wait_for_interval(self) -> None:
        """Sleep for whatever is left of ``min_interval`` since the last call."""
        shortfall = self.min_interval - (time.time() - self._last_request_time)
        if shortfall > 0:
            time.sleep(shortfall)

    def _get_backoff_time(self, attempt: int) -> float:
        """Return ``base_backoff ** attempt`` capped at ``max_backoff``."""
        return min(self.base_backoff ** attempt, self.max_backoff)

    def execute(
        self,
        func: Callable[[], T],
        retry_on: tuple[type[Exception], ...] = (HTTPError,),
        retry_codes: tuple[int, ...] = (401, 429, 500, 502, 503, 504),
    ) -> T:
        """Call ``func`` under the rate limit, retrying eligible failures.

        Args:
            func: Zero-argument callable to run.
            retry_on: Exception types that are candidates for a retry.
            retry_codes: HTTP status codes that make an ``HTTPError`` retryable.
                Exceptions in ``retry_on`` that are not ``HTTPError`` are
                always retryable.

        Returns:
            Whatever ``func`` returns on its first successful call.

        Raises:
            Exception: The error from ``func`` when it is not retryable or the
                final attempt has failed.
            RuntimeError: If the loop ends without a result or a recorded error
                (e.g. ``max_retries`` is 0).
        """
        final_attempt = self.max_retries - 1
        last_error: Optional[Exception] = None

        for attempt in range(self.max_retries):
            self._wait_for_interval()
            try:
                outcome = func()
            except retry_on as exc:
                last_error = exc
                retryable = not isinstance(exc, HTTPError) or exc.code in retry_codes
                if not retryable or attempt >= final_attempt:
                    raise
                delay = self._get_backoff_time(attempt)
                code = getattr(exc, "code", "N/A")
                print(
                    f"{Color.DIM} Rate limited (HTTP {code}), "
                    f"retrying in {delay:.1f}s (attempt {attempt + 1}/{self.max_retries})...{Color.NC}"
                )
                time.sleep(delay)
                # The backoff counts as activity for the next interval check.
                self._last_request_time = time.time()
            else:
                self._last_request_time = time.time()
                return outcome

        if last_error:
            raise last_error
        raise RuntimeError("Rate limiter exhausted without result")
# Global rate limiter instance, built with RateLimiter's default settings.
# Read it through get_rate_limiter() and replace it through set_rate_limiter().
_rate_limiter = RateLimiter()
def get_rate_limiter() -> RateLimiter:
    """Get the global rate limiter instance.

    Returns:
        The object currently bound to the module-level ``_rate_limiter``.
    """
    return _rate_limiter
def set_rate_limiter(limiter: RateLimiter) -> None:
    """Set a custom rate limiter.

    Rebinds the module-level ``_rate_limiter``. Objects that already hold a
    reference to the previous limiter keep using that one.

    Args:
        limiter: The limiter that later ``get_rate_limiter()`` calls return.
    """
    global _rate_limiter
    _rate_limiter = limiter
@dataclass
class Package:
    """A workspace npm package, described by the fields of its package.json."""

    name: str
    version: str
    path: Path
    is_private: bool
    has_publish_config: bool
    registry: Optional[str] = None
    dependencies: dict = field(default_factory=dict)
    dev_dependencies: dict = field(default_factory=dict)

    @property
    def can_publish(self) -> bool:
        """Whether the package is non-private and carries a publish config."""
        if self.is_private:
            return False
        return self.has_publish_config

    @property
    def scoped_name(self) -> str:
        """Full package name percent-encoded for use in a URL path segment."""
        return quote(self.name, safe="")

    @property
    def short_name(self) -> str:
        """Package name with everything up to the first ``/`` removed."""
        _scope, slash, remainder = self.name.partition("/")
        return remainder if slash else self.name
@dataclass
class RegistryPackageInfo:
    """Package metadata from GitLab npm registry."""

    name: str  # package name the lookup was made for
    latest_version: Optional[str]  # the "latest" dist-tag, or None when absent
    all_versions: list[str]  # keys of the registry document's "versions" map
    dist_tags: dict[str, str]  # the registry document's "dist-tags" map
    exists: bool  # False when no registry document was obtained
    error: Optional[str] = None  # text of the exception when the lookup raised
@dataclass
class LinkReference:
    """A link: or file: reference in package.json."""

    package_json_path: Path  # the package.json that contains the reference
    dependency_name: str  # name of the dependency being referenced
    current_value: str  # the raw specifier, e.g. a "link:..." or "file:..." string
    dep_type: str  # dependencies, devDependencies, peerDependencies
class GitLabApiClient:
    """GitLab REST API client whose requests go through the shared rate limiter."""

    def __init__(self, token: Optional[str] = None):
        """Store credentials and transport settings.

        Args:
            token: Access token. When omitted, ``GITLAB_TOKEN`` and then
                ``GITLAB_PAT`` are read from the environment.
        """
        env = os.environ
        self.token = token or env.get("GITLAB_TOKEN") or env.get("GITLAB_PAT")
        self.ctx = ssl.create_default_context()
        self.rate_limiter = get_rate_limiter()

    def _request(
        self,
        url: str,
        method: str = "GET",
        data: Optional[dict] = None,
    ) -> Optional[dict]:
        """Send one JSON request through the rate limiter.

        Args:
            url: Absolute URL to call.
            method: HTTP verb.
            data: Optional payload; sent as a JSON body only when truthy.

        Returns:
            The decoded JSON response, ``{}`` for an empty response body, or
            ``None`` on HTTP 404, ``URLError`` or undecodable JSON.

        Raises:
            HTTPError: For any HTTP error status other than 404.
        """
        headers = {"Accept": "application/json", "Content-Type": "application/json"}
        if self.token:
            headers["PRIVATE-TOKEN"] = self.token

        payload = None
        if data:
            payload = json.dumps(data).encode()
        request = Request(url, headers=headers, data=payload, method=method)

        def send() -> Optional[dict]:
            with urlopen(request, timeout=30, context=self.ctx) as response:
                text = response.read().decode()
            if not text:
                return {}
            return json.loads(text)

        try:
            return self.rate_limiter.execute(send)
        except HTTPError as err:
            if err.code != 404:
                raise
            return None
        except (URLError, json.JSONDecodeError):
            return None

    def _project_url(self, project_path: str, suffix: str = "") -> str:
        """Build the API URL for a project, percent-encoding its path."""
        encoded_path = quote(project_path, safe="")
        return f"{GITLAB_API_BASE}/projects/{encoded_path}{suffix}"

    def _succeeded(self, url: str, method: str) -> bool:
        """Issue a body-less request and report whether it produced a response."""
        try:
            response = self._request(url, method=method)
        except HTTPError:
            return False
        return response is not None

    def create_project(
        self,
        name: str,
        visibility: str = "private",
        description: Optional[str] = None,
    ) -> Optional[dict]:
        """Create a new GitLab project and return the API response.

        ``description`` is included in the payload only when it is truthy.
        """
        payload = {"name": name, "visibility": visibility}
        if description:
            payload["description"] = description
        return self._request(f"{GITLAB_API_BASE}/projects", method="POST", data=payload)

    def delete_project(self, project_path: str) -> bool:
        """Delete a GitLab project. Returns True if the request got a response."""
        return self._succeeded(self._project_url(project_path), "DELETE")

    def get_project(self, project_path: str) -> Optional[dict]:
        """Get project info by path (e.g., 'TransQuinnFTW/ui')."""
        return self._request(self._project_url(project_path))

    def unarchive_project(self, project_path: str) -> bool:
        """Unarchive a project. Returns True if the request got a response."""
        return self._succeeded(self._project_url(project_path, "/unarchive"), "POST")

    def restore_project(self, project_path: str) -> bool:
        """Restore a project scheduled for deletion. Returns True on a response."""
        return self._succeeded(self._project_url(project_path, "/restore"), "POST")
class GitLabNpmClient:
    """Read-only client for package metadata in the GitLab npm registry."""

    def __init__(self, token: Optional[str] = None):
        """Store credentials and transport settings.

        Args:
            token: Registry token. When omitted, ``GITLAB_NPM_TOKEN`` is read
                from the environment.
        """
        self.token = token or os.environ.get("GITLAB_NPM_TOKEN")
        self.ctx = ssl.create_default_context()
        self.rate_limiter = get_rate_limiter()

    def _fetch(self, url: str) -> Optional[dict]:
        """GET ``url`` through the rate limiter and decode the JSON body.

        Returns:
            The decoded document, or ``None`` on HTTP 404, ``URLError`` or
            undecodable JSON.

        Raises:
            HTTPError: For any HTTP error status other than 404.
        """
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        request = Request(url, headers=headers)

        def fetch_once() -> Optional[dict]:
            with urlopen(request, timeout=15, context=self.ctx) as response:
                raw = response.read()
                return json.loads(raw.decode())

        try:
            return self.rate_limiter.execute(fetch_once)
        except HTTPError as err:
            if err.code != 404:
                raise
            return None
        except (URLError, json.JSONDecodeError):
            return None

    def get_package_info(self, name: str) -> RegistryPackageInfo:
        """Look up a package in the registry; never raises for fetch failures.

        Args:
            name: Full package name, including any scope.

        Returns:
            A ``RegistryPackageInfo``. ``exists`` is False when the fetch
            raised (the message is stored in ``error``) or returned nothing.
        """
        url = f"{GITLAB_NPM_REGISTRY}/{quote(name, safe='')}"

        document = None
        failure: Optional[str] = None
        try:
            document = self._fetch(url)
        except Exception as exc:
            failure = str(exc)

        # A raised fetch and an empty result differ only in the error text.
        if failure is not None or not document:
            return RegistryPackageInfo(
                name=name,
                latest_version=None,
                all_versions=[],
                dist_tags={},
                exists=False,
                error=failure,
            )

        published = list(document.get("versions", {}).keys())
        tags = document.get("dist-tags", {})
        return RegistryPackageInfo(
            name=name,
            latest_version=tags.get("latest"),
            all_versions=published,
            dist_tags=tags,
            exists=True,
        )

    def get_latest_version(self, name: str) -> Optional[str]:
        """Return the package's ``latest`` dist-tag, or None if it was not found."""
        info = self.get_package_info(name)
        if not info.exists:
            return None
        return info.latest_version
def load_package(pkg_path: Path) -> Optional[Package]:
    """Load package info from package.json.

    Args:
        pkg_path: Path to a ``package.json`` file.

    Returns:
        A ``Package`` whose ``path`` is the manifest's parent directory, or
        ``None`` when the file cannot be read or decoded, is not a JSON
        object, has no usable ``name``, or its name ends with ``/root``.
    """
    try:
        # package.json is UTF-8; do not depend on the platform's locale encoding.
        with open(pkg_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError, UnicodeDecodeError):
        return None

    # Valid JSON is not necessarily an object (e.g. a list or a string).
    if not isinstance(data, dict):
        return None

    name = data.get("name", "")
    if not isinstance(name, str) or not name or name.endswith("/root"):
        return None

    publish_config = data.get("publishConfig")
    if not isinstance(publish_config, dict):
        publish_config = {}

    # First publishConfig entry whose key mentions "registry", covering both
    # "registry" and scoped "@scope:registry" keys.
    registry = next(
        (value for key, value in publish_config.items() if "registry" in key.lower()),
        None,
    )

    return Package(
        name=name,
        version=data.get("version", "0.0.0"),
        path=pkg_path.parent,
        is_private=data.get("private", False),
        has_publish_config=bool(publish_config),
        registry=registry,
        # An explicit JSON null must not leak through as None.
        dependencies=data.get("dependencies") or {},
        dev_dependencies=data.get("devDependencies") or {},
    )
def find_packages(base_path: Path) -> list[Package]:
    """Collect the packages rooted at ``base_path``.

    A directory containing ``pnpm-workspace.yaml`` is treated as a workspace
    and delegated to ``find_workspace_packages``. Otherwise only the
    directory's own ``package.json`` is considered.

    Returns:
        The packages found; empty when there is no loadable manifest.
    """
    if (base_path / "pnpm-workspace.yaml").exists():
        return find_workspace_packages(base_path)

    # Not a workspace: at most one package, the directory itself.
    manifest = base_path / "package.json"
    if not manifest.exists():
        return []
    single = load_package(manifest)
    return [single] if single else []
def find_workspace_packages(base_path: Path) -> list[Package]:
    """Find all packages in a pnpm workspace.

    Walks ``base_path`` recursively for ``package.json`` files, skipping the
    workspace root's own manifest and anything below a ``node_modules``
    directory.

    Args:
        base_path: Workspace root directory.

    Returns:
        The loadable packages, sorted by name.
    """
    packages = []
    for pkg_json in base_path.rglob("package.json"):
        # Compare whole path components below the workspace root. A substring
        # test on the full path would also drop directories such as
        # "node_modules_tools" and every package of a workspace whose own
        # location contains that text.
        if "node_modules" in pkg_json.relative_to(base_path).parts:
            continue
        if pkg_json.parent == base_path:
            continue
        pkg = load_package(pkg_json)
        if pkg:
            packages.append(pkg)
    return sorted(packages, key=lambda p: p.name)
def find_link_references(base_path: Path, scope: str = DEFAULT_SCOPE) -> list[LinkReference]:
    """Find all ``link:`` and ``file:`` references to scoped packages.

    Scans every ``package.json`` below ``base_path`` (excluding anything under
    a ``node_modules`` directory) and inspects its ``dependencies``,
    ``devDependencies`` and ``peerDependencies`` sections.

    Args:
        base_path: Directory to scan recursively.
        scope: Only dependencies whose name starts with this prefix are
            reported.

    Returns:
        One ``LinkReference`` per matching dependency entry. Manifests that
        cannot be read, decoded, or are not JSON objects are skipped.
    """
    refs = []
    for pkg_json in base_path.rglob("package.json"):
        # Match whole path components below base_path; a substring test on the
        # full path would also skip look-alike directory names.
        if "node_modules" in pkg_json.relative_to(base_path).parts:
            continue
        try:
            # package.json is UTF-8; do not depend on the locale encoding.
            with open(pkg_json, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError, UnicodeDecodeError):
            continue
        if not isinstance(data, dict):
            continue

        for dep_type in ("dependencies", "devDependencies", "peerDependencies"):
            deps = data.get(dep_type)
            # Tolerate a missing section, an explicit null, or a malformed value.
            if not isinstance(deps, dict):
                continue
            for name, value in deps.items():
                if not name.startswith(scope):
                    continue
                if isinstance(value, str) and value.startswith(("link:", "file:")):
                    refs.append(LinkReference(
                        package_json_path=pkg_json,
                        dependency_name=name,
                        current_value=value,
                        dep_type=dep_type,
                    ))
    return refs
def get_dependency_tiers(packages: list[Package], scope: str = DEFAULT_SCOPE) -> list[list[Package]]:
    """Group packages into tiers that can be published in order.

    Tier 0 holds packages with no internal dependencies; each later tier
    holds packages whose internal dependencies all sit in earlier tiers.
    An internal dependency is a name from ``dependencies`` or
    ``dev_dependencies`` that starts with ``scope`` and belongs to one of
    ``packages``.

    If a pass makes no progress (for example a dependency cycle), every
    package still pending is emitted together as the final tier.
    """
    known_names = {pkg.name for pkg in packages}
    placed: set = set()
    pending = list(packages)
    tiers = []

    def is_ready(pkg: Package) -> bool:
        declared = {**pkg.dependencies, **pkg.dev_dependencies}
        internal = {dep for dep in declared if dep.startswith(scope) and dep in known_names}
        return internal <= placed

    while pending:
        # Readiness is judged against tiers completed so far, so `placed` is
        # only updated after the whole pass has been evaluated.
        ready = [pkg for pkg in pending if is_ready(pkg)]
        if ready:
            for pkg in ready:
                pending.remove(pkg)
                placed.add(pkg.name)
        else:
            # No progress: flush the remainder as one tier.
            ready, pending = pending, []
        tiers.append(ready)

    return tiers
def check_token() -> bool:
    """Report whether GITLAB_NPM_TOKEN is set to a non-empty value.

    Prints an error and a hint when it is not.
    """
    if os.environ.get("GITLAB_NPM_TOKEN"):
        return True

    print(f"{Color.RED}Error: GITLAB_NPM_TOKEN environment variable not set{Color.NC}")
    print(f"{Color.DIM}Set it with: export GITLAB_NPM_TOKEN=glpat-...{Color.NC}")
    return False
def get_npmrc_registry_lines(scope: str = DEFAULT_SCOPE, token_env: str = "GITLAB_NPM_TOKEN") -> str:
    """Build the two .npmrc lines that point ``scope`` at the GitLab registry.

    The auth line carries a ``${NAME}`` placeholder for the variable named by
    ``token_env``, not the token value itself.

    Returns:
        The registry line and the auth line, each terminated by a newline.
    """
    registry_line = f"{scope}:registry={GITLAB_NPM_REGISTRY}/"
    auth_line = f"//gitlab.com/api/v4/packages/npm/:_authToken=${{{token_env}}}"
    return f"{registry_line}\n{auth_line}\n"
def print_header(title: str, width: int = 80) -> None:
    """Print a formatted header: a rule, the title, a rule, then a blank line.

    Args:
        title: Text shown between the two rules.
        width: Length of each rule in characters.
    """
    # The rule was built from an empty string, so nothing was drawn and
    # `width` had no effect. Use a plain ASCII rule character instead.
    rule = "=" * width
    print(f"{Color.BOLD}{rule}{Color.NC}")
    print(f"{Color.BOLD} {title}{Color.NC}")
    print(f"{Color.BOLD}{rule}{Color.NC}")
    print()