Structure:
- publishing/ - version bumping and registry publishing
- git/ - multi-repo git operations
- config/ - package configuration utilities
- lint/ - ESLint and code quality scripts
- forgejo/ - Forgejo CI/CD automation (primary)
- gitlab/ - DEPRECATED legacy GitLab scripts
- migration/ - one-time migration utilities
- templates/ - CI/CD template files
- analysis/ - codebase analysis scripts
- oneoffs/ - uncategorized one-time scripts

Note: the commits CLI will be merged into @ml/auto-commit-service.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
491 lines
15 KiB
Python
Executable file
491 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""Shared utilities for GitLab npm registry scripts."""
|
|
|
|
import json
|
|
import os
|
|
import ssl
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Callable, Optional, TypeVar
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import quote
|
|
from urllib.request import Request, urlopen
|
|
|
|
# Return type of the callable handed to RateLimiter.execute, which passes the
# callable's result straight through to its own caller.
T = TypeVar("T")
|
|
|
|
|
|
class Color:
    """ANSI escape sequences for styling terminal output.

    Prefix text with one of the attributes and finish with ``NC`` to reset,
    e.g. ``f"{Color.RED}failed{Color.NC}"``.
    """

    # Foreground colors.
    RED = "\x1b[0;31m"
    GREEN = "\x1b[0;32m"
    YELLOW = "\x1b[1;33m"  # bold attribute + yellow
    BLUE = "\x1b[0;34m"
    CYAN = "\x1b[0;36m"

    # Text attributes.
    BOLD = "\x1b[1m"
    DIM = "\x1b[2m"

    # "No color": resets every attribute set above.
    NC = "\x1b[0m"
|
|
|
|
|
|
# Root of the GitLab REST API (v4) on gitlab.com.
GITLAB_API_BASE = "https://gitlab.com/api/v4"
# npm package registry endpoint, which lives underneath the REST API root.
GITLAB_NPM_REGISTRY = f"{GITLAB_API_BASE}/packages/npm"
# npm scope assumed by helpers when the caller does not pass one.
DEFAULT_SCOPE = "@transquinnftw"
|
|
|
|
|
|
class RateLimiter:
    """Space out API calls and retry failed ones with exponential backoff.

    Calls made through :meth:`execute` are kept at least ``min_interval``
    seconds apart. Exceptions listed in ``retry_on`` are retried, up to
    ``max_retries`` attempts in total, sleeping ``base_backoff ** attempt``
    seconds (capped at ``max_backoff``) between attempts.
    """

    def __init__(
        self,
        min_interval: float = 0.5,
        max_retries: int = 5,
        base_backoff: float = 2.0,
        max_backoff: float = 60.0,
    ):
        """Configure the limiter.

        Args:
            min_interval: Minimum number of seconds between two calls.
            max_retries: Total number of attempts per :meth:`execute` call.
            base_backoff: Base of the exponential backoff, in seconds.
            max_backoff: Upper bound for a single backoff sleep, in seconds.
        """
        self.min_interval = min_interval
        self.max_retries = max_retries
        self.base_backoff = base_backoff
        self.max_backoff = max_backoff
        # Monotonic timestamp of the last attempt; -inf means "no call yet",
        # so the very first call never waits.
        self._last_request_time: float = float("-inf")

    def _wait_for_interval(self) -> None:
        """Sleep until ``min_interval`` seconds have passed since the last call."""
        # A monotonic clock keeps the spacing correct even if the wall clock
        # is adjusted while the script runs.
        elapsed = time.monotonic() - self._last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)

    def _get_backoff_time(self, attempt: int) -> float:
        """Return the backoff for a zero-based ``attempt``, capped at ``max_backoff``."""
        try:
            backoff = self.base_backoff ** attempt
        except OverflowError:
            # A float power that overflows is certainly above the cap.
            return self.max_backoff
        return min(backoff, self.max_backoff)

    def execute(
        self,
        func: Callable[[], T],
        retry_on: tuple[type[Exception], ...] = (HTTPError,),
        retry_codes: tuple[int, ...] = (401, 429, 500, 502, 503, 504),
    ) -> T:
        """Call ``func`` with rate limiting and retry logic.

        Args:
            func: Zero-argument callable performing the request.
            retry_on: Exception types that may trigger a retry.
            retry_codes: HTTP status codes that are retried when the exception
                is an ``HTTPError``. Other exception types listed in
                ``retry_on`` are always retried.
                NOTE(review): 401 is in the default set, presumably to ride
                out transient auth failures; confirm this is intended.

        Returns:
            Whatever ``func`` returns on its first successful attempt.

        Raises:
            Exception: The last error raised by ``func`` once it is not
                retryable or the attempts are used up. Exceptions outside
                ``retry_on`` propagate immediately.
            RuntimeError: If ``max_retries`` is not positive, so ``func`` was
                never called.
        """
        for attempt in range(self.max_retries):
            self._wait_for_interval()

            try:
                return func()
            except retry_on as e:
                retryable = not isinstance(e, HTTPError) or e.code in retry_codes
                if not retryable or attempt >= self.max_retries - 1:
                    raise

                backoff = self._get_backoff_time(attempt)
                code = getattr(e, "code", "N/A")
                print(
                    f"{Color.DIM} Rate limited (HTTP {code}), "
                    f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{self.max_retries})...{Color.NC}"
                )
                time.sleep(backoff)
            finally:
                # Runs on success, on a propagated failure and after a backoff
                # sleep. Failed calls (e.g. a 404) count against the rate
                # limit too, so the next call is still spaced out.
                self._last_request_time = time.monotonic()

        # Every loop iteration either returns or raises, so this point is only
        # reached when the loop body never ran (max_retries <= 0).
        raise RuntimeError("Rate limiter exhausted without result")
|
|
|
|
|
|
# Global rate limiter instance, created with the default RateLimiter settings.
# It is handed out by get_rate_limiter() and can be swapped with
# set_rate_limiter().
_rate_limiter = RateLimiter()
|
|
|
|
|
|
def get_rate_limiter() -> RateLimiter:
    """Return the module-wide rate limiter.

    This is whichever instance is currently bound to ``_rate_limiter``: the
    default ``RateLimiter()`` unless ``set_rate_limiter`` replaced it.
    """
    return _rate_limiter
|
|
|
|
|
|
def set_rate_limiter(limiter: RateLimiter) -> None:
    """Replace the module-wide rate limiter with ``limiter``.

    The API clients in this module read the limiter once in their
    constructor, so only clients created after this call pick it up.
    """
    global _rate_limiter
    _rate_limiter = limiter
|
|
|
|
|
|
@dataclass
class Package:
    """One npm package of the workspace, as described by its package.json."""

    name: str  # package name, possibly scoped ("@scope/name")
    version: str
    path: Path  # directory that contains the package.json
    is_private: bool  # value of the "private" flag
    has_publish_config: bool  # True when a non-empty "publishConfig" is present
    registry: Optional[str] = None  # registry URL taken from publishConfig, if any
    dependencies: dict = field(default_factory=dict)
    dev_dependencies: dict = field(default_factory=dict)

    @property
    def can_publish(self) -> bool:
        """Whether the package is public and carries a publishConfig."""
        return False if self.is_private else self.has_publish_config

    @property
    def scoped_name(self) -> str:
        """Full package name percent-encoded for use in API URLs.

        Every reserved character is escaped, including "@" and "/".
        """
        encoded = quote(self.name, safe="")
        return encoded

    @property
    def short_name(self) -> str:
        """Package name with the part up to the first "/" (the scope) removed."""
        _scope, slash, remainder = self.name.partition("/")
        if not slash:
            # Unscoped package: there is nothing to strip.
            return self.name
        return remainder
|
|
|
|
|
|
@dataclass
class RegistryPackageInfo:
    """Package metadata from GitLab npm registry.

    Built by ``GitLabNpmClient.get_package_info``; when the lookup fails or
    returns nothing, ``exists`` is False and the version fields are empty.
    """

    # Package name exactly as it was queried.
    name: str
    # Value of the "latest" dist-tag, or None when there is none.
    latest_version: Optional[str]
    # Keys of the registry document's "versions" mapping.
    all_versions: list[str]
    # The registry document's "dist-tags" mapping (tag -> version).
    dist_tags: dict[str, str]
    # True when the registry returned a non-empty document for the package.
    exists: bool
    # Text of the exception raised while fetching, if one occurred.
    error: Optional[str] = None
|
|
|
|
|
|
@dataclass
class LinkReference:
    """A dependency in a package.json whose specifier starts with link: or file:.

    Instances are produced by ``find_link_references``.
    """

    # The package.json file in which the reference was found.
    package_json_path: Path
    # Name of the dependency (the key in the dependency table).
    dependency_name: str
    # The raw specifier, e.g. a "link:..." or "file:..." value.
    current_value: str
    # Which table it came from.
    dep_type: str  # dependencies, devDependencies, peerDependencies
|
|
|
|
|
|
class GitLabApiClient:
    """Small GitLab REST API client whose requests go through the shared rate limiter."""

    def __init__(self, token: Optional[str] = None):
        """Create a client.

        Args:
            token: Access token. When omitted, ``GITLAB_TOKEN`` and then
                ``GITLAB_PAT`` are read from the environment.
        """
        env = os.environ
        self.token = token or env.get("GITLAB_TOKEN") or env.get("GITLAB_PAT")
        self.ctx = ssl.create_default_context()
        self.rate_limiter = get_rate_limiter()

    def _request(
        self,
        url: str,
        method: str = "GET",
        data: Optional[dict] = None,
    ) -> Optional[dict]:
        """Send one JSON request through the rate limiter.

        Args:
            url: Absolute URL to call.
            method: HTTP method.
            data: JSON payload; a falsy value sends no body.

        Returns:
            The decoded JSON response (``{}`` for an empty body), or None on
            HTTP 404, on a URL/connection error, or when the body is not
            valid JSON.

        Raises:
            HTTPError: For any HTTP error status other than 404.
        """
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        if self.token:
            headers["PRIVATE-TOKEN"] = self.token

        payload = None
        if data:
            payload = json.dumps(data).encode()
        request = Request(url, headers=headers, data=payload, method=method)

        def send() -> Optional[dict]:
            with urlopen(request, timeout=30, context=self.ctx) as response:
                text = response.read().decode()
            if not text:
                return {}
            return json.loads(text)

        try:
            return self.rate_limiter.execute(send)
        except HTTPError as err:
            # HTTPError must be handled before URLError, its base class.
            if err.code != 404:
                raise
            return None
        except (URLError, json.JSONDecodeError):
            return None

    @staticmethod
    def _project_url(project_path: str, suffix: str = "") -> str:
        """Build the API URL of a project, percent-encoding its full path."""
        return f"{GITLAB_API_BASE}/projects/{quote(project_path, safe='')}{suffix}"

    def _succeeds(self, url: str, method: str) -> bool:
        """Run a request and report True when it returned a response."""
        try:
            return self._request(url, method=method) is not None
        except HTTPError:
            return False

    def create_project(
        self,
        name: str,
        visibility: str = "private",
        description: Optional[str] = None,
    ) -> Optional[dict]:
        """Create a new GitLab project and return the API response."""
        payload = {"name": name, "visibility": visibility}
        if description:
            payload["description"] = description
        return self._request(f"{GITLAB_API_BASE}/projects", method="POST", data=payload)

    def delete_project(self, project_path: str) -> bool:
        """Delete a GitLab project. Returns True if successful."""
        return self._succeeds(self._project_url(project_path), "DELETE")

    def get_project(self, project_path: str) -> Optional[dict]:
        """Get project info by path (e.g., 'TransQuinnFTW/ui')."""
        return self._request(self._project_url(project_path))

    def unarchive_project(self, project_path: str) -> bool:
        """Unarchive a project. Returns True if successful."""
        return self._succeeds(self._project_url(project_path, "/unarchive"), "POST")

    def restore_project(self, project_path: str) -> bool:
        """Restore a project scheduled for deletion. Returns True if successful."""
        return self._succeeds(self._project_url(project_path, "/restore"), "POST")
|
|
|
|
|
|
class GitLabNpmClient:
    """Read-only client for the GitLab npm registry API."""

    def __init__(self, token: Optional[str] = None):
        """Create a client.

        Args:
            token: Registry token; falls back to the ``GITLAB_NPM_TOKEN``
                environment variable when omitted.
        """
        self.token = token or os.environ.get("GITLAB_NPM_TOKEN")
        self.ctx = ssl.create_default_context()
        self.rate_limiter = get_rate_limiter()

    def _fetch(self, url: str) -> Optional[dict]:
        """GET ``url`` through the rate limiter and decode the JSON body.

        Returns:
            The decoded document, or None on HTTP 404, on a URL/connection
            error, or when the body is not valid JSON.

        Raises:
            HTTPError: For any HTTP error status other than 404.
        """
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        request = Request(url, headers=headers)

        def fetch_once() -> Optional[dict]:
            with urlopen(request, timeout=15, context=self.ctx) as response:
                raw = response.read()
            return json.loads(raw.decode())

        try:
            return self.rate_limiter.execute(fetch_once)
        except HTTPError as err:
            # HTTPError must be handled before URLError, its base class.
            if err.code != 404:
                raise
            return None
        except (URLError, json.JSONDecodeError):
            return None

    def get_package_info(self, name: str) -> RegistryPackageInfo:
        """Look up a package in the GitLab npm registry.

        Never raises for fetch failures: any exception from the lookup is
        reported through the ``error`` field of a result whose ``exists`` is
        False. An empty or missing document also yields ``exists=False``.
        """

        def missing(error: Optional[str] = None) -> RegistryPackageInfo:
            # Result used whenever the registry gave us nothing usable.
            return RegistryPackageInfo(
                name=name,
                latest_version=None,
                all_versions=[],
                dist_tags={},
                exists=False,
                error=error,
            )

        url = f"{GITLAB_NPM_REGISTRY}/{quote(name, safe='')}"

        try:
            document = self._fetch(url)
        except Exception as exc:
            return missing(str(exc))

        if not document:
            return missing()

        published = list(document.get("versions", {}).keys())
        tags = document.get("dist-tags", {})
        return RegistryPackageInfo(
            name=name,
            latest_version=tags.get("latest"),
            all_versions=published,
            dist_tags=tags,
            exists=True,
        )

    def get_latest_version(self, name: str) -> Optional[str]:
        """Return the version tagged "latest", or None if the package is not found."""
        info = self.get_package_info(name)
        if not info.exists:
            return None
        return info.latest_version
|
|
|
|
|
|
def load_package(pkg_path: Path) -> Optional[Package]:
    """Load package info from a package.json file.

    Args:
        pkg_path: Path of the package.json to read.

    Returns:
        A ``Package`` whose ``path`` is the directory containing the file, or
        None when the file cannot be read or decoded, is not a JSON object,
        has no usable ``name``, or has a name ending in ``/root`` (such
        packages are skipped).
    """
    try:
        # package.json is UTF-8 regardless of the platform's locale encoding;
        # "utf-8-sig" additionally tolerates a leading byte-order mark.
        with open(pkg_path, encoding="utf-8-sig") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError, OSError):
        return None

    # Valid JSON is not necessarily an object (it could be a list, a string...).
    if not isinstance(data, dict):
        return None

    name = data.get("name", "")
    if not isinstance(name, str) or not name or name.endswith("/root"):
        return None

    def table(key: str) -> dict:
        # Treat a missing, null or otherwise malformed table as empty.
        value = data.get(key)
        return value if isinstance(value, dict) else {}

    publish_config = table("publishConfig")
    # The first key that mentions "registry" wins; this also covers scoped
    # keys such as "@scope:registry".
    registry = next(
        (value for key, value in publish_config.items() if "registry" in key.lower()),
        None,
    )

    return Package(
        name=name,
        version=data.get("version", "0.0.0"),
        path=pkg_path.parent,
        is_private=data.get("private", False),
        has_publish_config=bool(publish_config),
        registry=registry,
        dependencies=table("dependencies"),
        dev_dependencies=table("devDependencies"),
    )
|
|
|
|
|
|
def find_packages(base_path: Path) -> list[Package]:
    """Collect the packages found under ``base_path``.

    When ``pnpm-workspace.yaml`` exists, the workspace is scanned with
    ``find_workspace_packages``. Otherwise ``base_path`` is treated as a
    single package and the result holds at most the package loaded from its
    own package.json.
    """
    if (base_path / "pnpm-workspace.yaml").exists():
        return find_workspace_packages(base_path)

    # No workspace file: single-package layout.
    manifest = base_path / "package.json"
    if not manifest.exists():
        return []

    package = load_package(manifest)
    return [package] if package else []
|
|
|
|
|
|
def find_workspace_packages(base_path: Path) -> list[Package]:
    """Find all packages in a pnpm workspace.

    Walks ``base_path`` recursively for package.json files, skipping the
    workspace root's own package.json and anything inside a ``node_modules``
    directory.

    Args:
        base_path: Root directory of the workspace.

    Returns:
        The packages that ``load_package`` accepted, sorted by name.
    """
    packages = []

    for pkg_json in base_path.rglob("package.json"):
        # The workspace root manifest is not a publishable package.
        if pkg_json.parent == base_path:
            continue
        # Match whole path components below base_path only. A substring test
        # on the full path would also discard everything when the workspace
        # itself lives under a directory whose name contains "node_modules".
        if "node_modules" in pkg_json.relative_to(base_path).parts:
            continue

        pkg = load_package(pkg_json)
        if pkg:
            packages.append(pkg)

    return sorted(packages, key=lambda p: p.name)
|
|
|
|
|
|
def find_link_references(base_path: Path, scope: str = DEFAULT_SCOPE) -> list[LinkReference]:
    """Find all ``link:`` and ``file:`` references to scoped packages.

    Every package.json below ``base_path`` (outside ``node_modules``) is
    inspected; unreadable or malformed files are skipped.

    Args:
        base_path: Directory to search recursively.
        scope: Only dependencies whose name starts with this prefix are
            reported.

    Returns:
        One ``LinkReference`` per matching entry in ``dependencies``,
        ``devDependencies`` or ``peerDependencies``.
    """
    refs = []

    for pkg_json in base_path.rglob("package.json"):
        # Match whole path components below base_path only, so a base path
        # that merely contains "node_modules" in its name is still searched.
        if "node_modules" in pkg_json.relative_to(base_path).parts:
            continue

        try:
            # package.json is UTF-8 regardless of the platform's locale
            # encoding; "utf-8-sig" also tolerates a byte-order mark.
            with open(pkg_json, encoding="utf-8-sig") as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, OSError):
            continue

        # Valid JSON that is not an object carries no dependency tables.
        if not isinstance(data, dict):
            continue

        for dep_type in ("dependencies", "devDependencies", "peerDependencies"):
            deps = data.get(dep_type)
            if not isinstance(deps, dict):
                # Missing, null or malformed table.
                continue
            for name, value in deps.items():
                # NOTE(review): this is a plain prefix test, so scope "@foo"
                # also matches "@foobar/pkg" - confirm whether callers rely
                # on that before tightening it to f"{scope}/".
                if not name.startswith(scope):
                    continue
                if isinstance(value, str) and value.startswith(("link:", "file:")):
                    refs.append(LinkReference(
                        package_json_path=pkg_json,
                        dependency_name=name,
                        current_value=value,
                        dep_type=dep_type,
                    ))

    return refs
|
|
|
|
|
|
def get_dependency_tiers(packages: list[Package], scope: str = DEFAULT_SCOPE) -> list[list[Package]]:
    """Sort packages into dependency tiers for ordered publishing.

    Tier 0: Packages with no internal dependencies
    Tier 1: Packages depending only on Tier 0
    Tier 2: Packages depending on Tier 1
    etc.

    A dependency is "internal" when its name starts with ``scope`` and
    belongs to one of ``packages``; both ``dependencies`` and
    ``dev_dependencies`` are considered. A package's reference to itself is
    ignored, since it cannot affect the publishing order.

    Args:
        packages: Packages to order.
        scope: Name prefix identifying internal packages.

    Returns:
        The tiers in publishing order. If the remaining packages cannot be
        ordered (a dependency cycle), they are all returned together as one
        final tier.
    """
    package_names = {p.name for p in packages}

    # Pair every package with its internal dependency names once, up front,
    # instead of recomputing them on each pass.
    pending = []
    for pkg in packages:
        declared = set(pkg.dependencies) | set(pkg.dev_dependencies)
        internal = {
            dep
            for dep in declared
            if dep.startswith(scope) and dep in package_names and dep != pkg.name
        }
        pending.append((pkg, internal))

    tiers = []
    resolved = set()

    while pending:
        # Split on the names resolved by *previous* tiers only; `resolved`
        # is updated after the split so a tier never depends on itself.
        ready, blocked = [], []
        for entry in pending:
            (ready if entry[1] <= resolved else blocked).append(entry)

        if not ready:
            # Circular dependency or error - add all remaining
            tiers.append([pkg for pkg, _ in blocked])
            break

        tier = [pkg for pkg, _ in ready]
        resolved.update(pkg.name for pkg in tier)
        tiers.append(tier)
        pending = blocked

    return tiers
|
|
|
|
|
|
def check_token() -> bool:
    """Report whether GITLAB_NPM_TOKEN is set to a non-empty value.

    Prints an error and a hint on how to set the variable when it is missing.
    """
    if os.environ.get("GITLAB_NPM_TOKEN"):
        return True

    messages = (
        f"{Color.RED}Error: GITLAB_NPM_TOKEN environment variable not set{Color.NC}",
        f"{Color.DIM}Set it with: export GITLAB_NPM_TOKEN=glpat-...{Color.NC}",
    )
    for message in messages:
        print(message)
    return False
|
|
|
|
|
|
def get_npmrc_registry_lines(scope: str = DEFAULT_SCOPE, token_env: str = "GITLAB_NPM_TOKEN") -> str:
    """Build the two .npmrc lines that point ``scope`` at the GitLab registry.

    The first line maps the scope to the registry URL. The second supplies
    the auth token as a ``${token_env}`` placeholder, i.e. the variable name
    is written out literally rather than its value. Each line ends with a
    newline.
    """
    registry_line = f"{scope}:registry={GITLAB_NPM_REGISTRY}/"
    auth_line = f"//gitlab.com/api/v4/packages/npm/:_authToken=${{{token_env}}}"
    return f"{registry_line}\n{auth_line}\n"
|
|
|
|
|
|
def print_header(title: str, width: int = 80) -> None:
    """Print ``title`` in bold between two rules of ``width`` characters.

    A blank line is printed after the header.
    """
    rule = f"{Color.BOLD}{'═' * width}{Color.NC}"
    heading = f"{Color.BOLD} {title}{Color.NC}"
    for line in (rule, heading, rule):
        print(line)
    print()
|